Implemente sistemas real-time com tRPC: WebSocket server, subscriptions type-safe, event-driven architecture e live notifications para experiências instantâneas.
Experiência Instantânea: Usuários esperam atualizações em tempo real sem refresh, aumentando engajamento em 40%.
Colaboração: Features como chat, edição simultânea e notificações live são diferenciais competitivos.
Performance: WebSockets eliminam polling, reduzindo latência em 90% e carga do servidor em 70%.
Type Safety: tRPC subscriptions mantêm type safety end-to-end mesmo em conexões real-time.
WebSocket Connection:
Conexão bidirecional persistente que permite comunicação real-time entre client/server.
tRPC Subscriptions:
Sistema de subscriptions type-safe que funciona sobre WebSockets ou Server-Sent Events.
Event-Driven Architecture:
Padrão onde componentes se comunicam através de eventos, permitindo desacoplamento.
Connection Management:
Gerenciamento de estado de conexões, reconnect automático e cleanup de recursos.
React + tRPC subscriptions
WebSocket + Event Router
Event publishers/subscribers
Fluxo: Client subscreve → Gateway roteia → Services emitem eventos → Gateway distribui → Client recebe updates
// 📁 server/websocket-server.ts
import { WebSocketServer } from 'ws';
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { appRouter } from './routers/_app';
import { createContext } from './context';
import { Logger } from '@/infrastructure/logging/logger';
// 🎯 WebSocket Server Configuration
/**
 * Configuration accepted by TRPCWebSocketServer.
 */
export interface WSServerConfig {
// Port passed to the underlying WebSocketServer.
port: number;
// Upgrade path; start() uses '/trpc' when this is not set.
path?: string;
// When set, verifyClient rejects new connections once this many clients are connected.
maxConnections?: number;
// Keep-alive ping period in milliseconds; start() falls back to 30000.
pingInterval?: number;
// Time in milliseconds to wait for a pong; start() falls back to 5000.
pongTimeout?: number;
// Per-message deflate is enabled unless this is explicitly false.
compression?: boolean;
// When truthy, start() begins periodic metrics collection and health checks.
enableMetrics?: boolean;
}
// 🌐 WebSocket Server Manager
/**
 * Owns a `ws` WebSocketServer with the tRPC handler applied, tracks per-connection
 * bookkeeping and exposes broadcast / status helpers.
 */
export class TRPCWebSocketServer {
// Underlying ws server; only assigned inside start().
private wss: WebSocketServer;
private logger: Logger;
private config: WSServerConfig;
// Per-connection records keyed by the id from generateConnectionId()
// (socket, connect time, client IP, user agent, message count, last pong).
private connections: Map<string, any> = new Map();
// Counters updated by the connection handlers and the send helpers.
// NOTE(review): `reconnections` is never written in the code visible here.
private metrics = {
totalConnections: 0,
activeConnections: 0,
messagesSent: 0,
messagesReceived: 0,
reconnections: 0,
};
/**
 * Creates the logger and stores the configuration.
 * No socket is opened here; call start() to begin listening.
 *
 * @param config Server options.
 */
constructor(config: WSServerConfig) {
  this.logger = new Logger('WebSocket Server');
  this.config = config;
}
// 🚀 Start WebSocket Server
/**
 * Creates the ws server, applies the tRPC WebSocket handler, installs the
 * connection event handlers and, when enabled, starts metrics collection.
 *
 * Incoming upgrades are rejected when the Origin header is not in the
 * comma-separated ALLOWED_ORIGINS allow-list (only enforced when the list has
 * at least one non-empty entry) or when `maxConnections` is reached.
 */
start(): void {
  // Resolve the effective path once so the server and the start-up log agree.
  const path = this.config.path || '/trpc';

  this.wss = new WebSocketServer({
    port: this.config.port,
    path,
    perMessageDeflate: this.config.compression !== false,
    maxPayload: 16 * 1024 * 1024, // 16MB
    clientTracking: true,
    verifyClient: (info) => {
      const origin = info.origin;
      // Trim entries and drop empty ones: ''.split(',') yields [''], which
      // would otherwise turn an empty ALLOWED_ORIGINS into "reject everything",
      // and "a, b" would never match because of the leading space.
      const allowedOrigins = (process.env.ALLOWED_ORIGINS ?? '')
        .split(',')
        .map((entry) => entry.trim())
        .filter((entry) => entry.length > 0);

      if (allowedOrigins.length > 0 && !allowedOrigins.includes(origin)) {
        this.logger.warn('Connection rejected: invalid origin', { origin });
        return false;
      }

      // Refuse new clients once the configured cap is reached.
      if (this.config.maxConnections &&
          this.wss.clients.size >= this.config.maxConnections) {
        this.logger.warn('Connection rejected: max connections reached', {
          current: this.wss.clients.size,
          max: this.config.maxConnections,
        });
        return false;
      }

      return true;
    },
  });

  // Route WebSocket traffic through the tRPC router.
  applyWSSHandler({
    wss: this.wss,
    router: appRouter,
    createContext: async ({ req }) => {
      const context = await createContext({ req });
      return {
        ...context,
        // WebSocket-specific context data.
        // NOTE(review): this id is generated independently of the one stored
        // by setupConnectionHandlers(), so the two do not match.
        connectionId: this.generateConnectionId(),
        connectedAt: new Date(),
        type: 'websocket',
      };
    },
    keepAlive: {
      enabled: true,
      pingMs: this.config.pingInterval || 30000,
      pongWaitMs: this.config.pongTimeout || 5000,
    },
    onError: (opts) => {
      this.logger.error('WebSocket error', {
        error: opts.error.message,
        type: opts.type,
        connectionId: opts.ctx?.connectionId,
      });
    },
  });

  this.setupConnectionHandlers();

  this.logger.info('🌐 WebSocket server started', {
    port: this.config.port,
    path,
    maxConnections: this.config.maxConnections,
  });

  if (this.config.enableMetrics) {
    this.startMetricsCollection();
  }
}
// 🔌 Setup connection event handlers
/**
 * Registers the per-connection bookkeeping: stores a record for each new
 * socket, counts messages, tracks the last pong and cleans up on close.
 */
private setupConnectionHandlers(): void {
  this.wss.on('connection', (ws, req) => {
    const connectionId = this.generateConnectionId();
    const clientIp = req.socket.remoteAddress;
    const userAgent = req.headers['user-agent'];

    this.connections.set(connectionId, {
      ws,
      connectedAt: new Date(),
      clientIp,
      userAgent,
      messageCount: 0,
    });

    this.metrics.totalConnections++;
    this.metrics.activeConnections++;

    this.logger.info('🔌 New WebSocket connection', {
      connectionId,
      clientIp,
      totalConnections: this.metrics.totalConnections,
      activeConnections: this.metrics.activeConnections,
    });

    ws.on('message', () => {
      this.metrics.messagesReceived++;
      const connection = this.connections.get(connectionId);
      if (!connection) {
        return;
      }
      connection.messageCount++;

      // Warn once per 100 messages rather than on every message past the
      // threshold, so a chatty client cannot flood the log.
      if (connection.messageCount % 100 === 0) {
        this.logger.warn('High message frequency detected', {
          connectionId,
          messageCount: connection.messageCount,
          clientIp,
        });
      }
    });

    ws.on('close', (code, reason) => {
      // delete() returns false when the record was already removed (for
      // example by the health check), so the gauge is decremented only once.
      if (this.connections.delete(connectionId)) {
        this.metrics.activeConnections--;
      }
      this.logger.info('🔌 WebSocket disconnected', {
        connectionId,
        code,
        reason: reason.toString(),
        activeConnections: this.metrics.activeConnections,
      });
    });

    ws.on('error', (error) => {
      this.logger.error('WebSocket connection error', {
        connectionId,
        error: error.message,
        clientIp,
      });
    });

    // No 'ping' handler: the ws library replies to pings with a pong on its
    // own, so answering here as well would send a duplicate pong frame.

    ws.on('pong', () => {
      // Record liveness for the health check.
      const connection = this.connections.get(connectionId);
      if (connection) {
        connection.lastPong = new Date();
      }
    });
  });
}
// 📊 Start metrics collection
/**
 * Schedules the two recurring jobs: metrics collection every minute and the
 * connection health check every 30 seconds.
 */
private startMetricsCollection(): void {
  const metricsPeriodMs = 60000;
  const healthCheckPeriodMs = 30000;

  setInterval(() => this.collectMetrics(), metricsPeriodMs);
  setInterval(() => this.healthCheck(), healthCheckPeriodMs);
}
// 📊 Collect WebSocket metrics
/**
 * Builds a metrics snapshot from the tracked connections, logs it, resets the
 * `totalConnections` counter and forwards the snapshot to monitoring.
 */
private collectMetrics(): void {
  const nowMs = new Date().getTime();

  // Single pass over the tracked connections: accumulate age and messages.
  let ageSumMs = 0;
  let totalMessages = 0;
  let trackedCount = 0;
  for (const conn of this.connections.values()) {
    ageSumMs += nowMs - conn.connectedAt.getTime();
    totalMessages += conn.messageCount;
    trackedCount += 1;
  }
  const avgConnectionAgeMs = trackedCount > 0 ? ageSumMs / trackedCount : 0;

  const snapshot = {
    ...this.metrics,
    avgConnectionAge: Math.round(avgConnectionAgeMs / 1000), // seconds
    totalMessages,
    // Connections opened since the previous collection (counter is reset below).
    connectionsPerMinute: this.metrics.totalConnections,
    memoryUsage: process.memoryUsage(),
    uptime: process.uptime(),
  };

  this.logger.info('📊 WebSocket metrics', snapshot);

  this.metrics.totalConnections = 0;

  this.sendMetricsToMonitoring(snapshot);
}
// 🏥 Health check for connections
/**
 * Scans tracked connections, warns about idle ones and terminates those whose
 * last pong is older than two minutes.
 *
 * Connections that have never recorded a pong are not treated as stale.
 */
private healthCheck(): void {
  const nowMs = Date.now();
  const staleConnections: string[] = [];

  for (const [connectionId, connection] of this.connections.entries()) {
    // Stale: a pong was seen before, but not within the last 2 minutes.
    if (connection.lastPong &&
        nowMs - connection.lastPong.getTime() > 120000) {
      staleConnections.push(connectionId);
    }

    // Idle: connected for over 10 minutes without a single message.
    if (nowMs - connection.connectedAt.getTime() > 600000 &&
        connection.messageCount === 0) {
      this.logger.warn('Idle connection detected', {
        connectionId,
        connectedAt: connection.connectedAt,
        messageCount: connection.messageCount,
      });
    }
  }

  for (const connectionId of staleConnections) {
    const connection = this.connections.get(connectionId);
    if (!connection?.ws) {
      continue;
    }
    // Only terminate here. The socket's 'close' handler removes the record and
    // decrements activeConnections; doing it here as well counted every stale
    // connection twice.
    connection.ws.terminate();
    this.logger.warn('Terminated stale connection', { connectionId });
  }
}
// 🛑 Graceful shutdown
/**
 * Gracefully stops the server: notifies open clients, waits two seconds, then
 * closes the server and terminates any client that is still connected.
 *
 * Safe to call when start() was never invoked.
 *
 * @returns A promise that resolves once the server's close callback has run.
 */
async shutdown(): Promise<void> {
  this.logger.info('🛑 Shutting down WebSocket server...');

  const server = this.wss;
  if (!server) {
    // start() never ran, so there is nothing to close.
    this.logger.info('✅ WebSocket server shut down');
    return;
  }

  const shutdownMessage = JSON.stringify({
    type: 'server.shutdown',
    message: 'Server is shutting down. Please reconnect.',
    reconnectIn: 5000,
  });

  server.clients.forEach(ws => {
    if (ws.readyState === ws.OPEN) {
      ws.send(shutdownMessage);
    }
  });

  // Give clients a short window to disconnect on their own.
  await new Promise<void>((resolve) => {
    setTimeout(() => {
      server.close(() => {
        this.logger.info('✅ WebSocket server shut down');
        resolve();
      });
      // In ws v8+, close() stops accepting connections but leaves existing
      // sockets open; drop the stragglers so the close callback can fire.
      server.clients.forEach(ws => ws.terminate());
    }, 2000);
  });
}
// 📤 Send metrics to monitoring system
/**
 * Forwards a metrics snapshot to each monitoring backend whose environment
 * flag (PROMETHEUS_ENABLED / DATADOG_ENABLED) is exactly the string 'true'.
 *
 * @param metrics Snapshot produced by collectMetrics().
 */
private sendMetricsToMonitoring(metrics: any): void {
  const isOn = (flag: string | undefined): boolean => flag === 'true';

  if (isOn(process.env.PROMETHEUS_ENABLED)) {
    this.sendToPrometheus(metrics);
  }
  if (isOn(process.env.DATADOG_ENABLED)) {
    this.sendToDatadog(metrics);
  }
}
// 🔑 Generate unique connection ID
/**
 * Builds an id of the form `ws_<epoch ms>_<up to 7 base-36 chars>`.
 * The suffix comes from Math.random(), so it is not cryptographically strong.
 */
private generateConnectionId(): string {
  const timestamp = Date.now();
  const randomSuffix = Math.random().toString(36).slice(2, 9);
  return 'ws_' + timestamp + '_' + randomSuffix;
}
// 📊 Get current server status
/**
 * Returns a snapshot of the server state.
 *
 * `isRunning` only reflects whether a server instance has been created;
 * `metrics` is the live counters object, not a copy.
 */
getStatus(): {
  isRunning: boolean;
  connections: number;
  metrics: any;
  uptime: number;
} {
  const isRunning = Boolean(this.wss);
  const { activeConnections } = this.metrics;

  return {
    isRunning,
    connections: activeConnections,
    metrics: this.metrics,
    uptime: process.uptime(),
  };
}
// 📢 Broadcast to all connections
/**
 * Sends a JSON-serialized message to every open client, optionally restricted
 * by a filter.
 *
 * @param message Value to serialize with JSON.stringify and send.
 * @param filter Optional predicate. It receives the tracked connection record
 *   for the socket (which includes `ws` plus the metadata stored at connect
 *   time), or `{ ws }` when no record is tracked for that socket.
 */
broadcast(message: any, filter?: (connection: any) => boolean): void {
  const data = JSON.stringify(message);

  // Index tracked records by socket so the filter can see connection metadata
  // (clientIp, userAgent, ...) instead of a bare `{ ws }` wrapper.
  const recordBySocket = new Map<unknown, any>();
  if (filter) {
    for (const record of this.connections.values()) {
      recordBySocket.set(record.ws, record);
    }
  }

  this.wss.clients.forEach(ws => {
    if (ws.readyState !== ws.OPEN) {
      return;
    }
    if (filter && !filter(recordBySocket.get(ws) ?? { ws })) {
      return;
    }
    ws.send(data);
    this.metrics.messagesSent++;
  });
}
// 🎯 Send to specific connections
/**
 * Sends a JSON-serialized message to the given connection ids.
 * Ids that are unknown or whose socket is not open are skipped silently.
 *
 * @param connectionIds Ids as produced by generateConnectionId().
 * @param message Value to serialize with JSON.stringify and send.
 */
sendToConnections(connectionIds: string[], message: any): void {
  const payload = JSON.stringify(message);

  for (const id of connectionIds) {
    const socket = this.connections.get(id)?.ws;
    if (!socket || socket.readyState !== socket.OPEN) {
      continue;
    }
    socket.send(payload);
    this.metrics.messagesSent++;
  }
}
// 📁 server/production-ws.ts
// 🚀 Production WebSocket Server Setup
import { TRPCWebSocketServer } from './websocket-server';
import { Logger } from '@/infrastructure/logging/logger';
const logger = new Logger('Production WebSocket');
// 🔧 Production configuration
/**
 * Production settings for the WebSocket server.
 * Port and connection cap come from WS_PORT / WS_MAX_CONNECTIONS, falling back
 * to 3001 and 10000 when the variable is unset or empty.
 */
const wsConfig = {
  port: Number.parseInt(process.env.WS_PORT || '3001'),
  path: '/trpc',
  maxConnections: Number.parseInt(process.env.WS_MAX_CONNECTIONS || '10000'),
  pingInterval: 30000, // milliseconds
  pongTimeout: 5000, // milliseconds
  compression: true,
  enableMetrics: true,
};
// 🌐 Start WebSocket server
/**
 * Boots the production WebSocket server, installs SIGTERM/SIGINT handlers for
 * graceful shutdown and logs the server status every five minutes.
 *
 * Exits the process with code 1 if start-up throws.
 */
async function startWebSocketServer(): Promise<void> {
  const wsServer = new TRPCWebSocketServer(wsConfig);

  try {
    wsServer.start();

    logger.info('🎉 WebSocket server started successfully', {
      port: wsConfig.port,
      maxConnections: wsConfig.maxConnections,
      environment: process.env.NODE_ENV,
    });

    // Guard against a second signal re-entering shutdown while one is running.
    let shuttingDown = false;
    const shutdown = async (signal: string): Promise<void> => {
      if (shuttingDown) {
        return;
      }
      shuttingDown = true;
      logger.info(`📛 Received ${signal}, shutting down gracefully`);
      try {
        await wsServer.shutdown();
        process.exit(0);
      } catch (caught) {
        // A failed shutdown must not become an unhandled rejection.
        const failure = caught instanceof Error ? caught : new Error(String(caught));
        logger.error('💥 Graceful shutdown failed', {
          error: failure.message,
          stack: failure.stack,
        });
        process.exit(1);
      }
    };

    process.on('SIGTERM', () => void shutdown('SIGTERM'));
    process.on('SIGINT', () => void shutdown('SIGINT'));

    // Periodic status log, usable as a liveness signal.
    setInterval(() => {
      const status = wsServer.getStatus();
      logger.info('🏥 WebSocket server status', status);
    }, 300000); // Every 5 minutes
  } catch (caught) {
    // Catch variables are `unknown` under strict mode; narrow before reading.
    const failure = caught instanceof Error ? caught : new Error(String(caught));
    logger.error('💥 Failed to start WebSocket server', {
      error: failure.message,
      stack: failure.stack,
    });
    process.exit(1);
  }
}
// 🎬 Execute if main module
// Start the server only when this file is the process entry point,
// not when it is imported by another module.
if (require.main === module) {
  void startWebSocketServer();
}
export { startWebSocketServer, wsConfig };
// 📁 docker/websocket.Dockerfile
// 🐳 Docker setup for WebSocket server
# Shared base image for all stages.
FROM node:18-alpine AS base
RUN apk add --no-cache libc6-compat
WORKDIR /app

# Production-only dependencies (copied into the final image).
FROM base AS deps
COPY package.json package-lock.json ./
# --omit=dev replaces the deprecated --only=production flag.
RUN npm ci --omit=dev

# Full install plus build of the WebSocket server bundle.
FROM base AS builder
COPY package.json package-lock.json ./
RUN npm ci
COPY . .
RUN npm run build:websocket

# Minimal runtime image running as an unprivileged user.
FROM base AS runner
WORKDIR /app
ENV NODE_ENV=production
RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 websocket
COPY --from=deps /app/node_modules ./node_modules
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/package.json ./package.json
USER websocket
EXPOSE 3001

# Health check: succeed when a TCP connection to port 3001 can be opened.
# The script is kept on one line because a Dockerfile instruction cannot span
# lines without a trailing backslash on each line.
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD node -e "const c=require('net').createConnection({port:3001},()=>{c.end();process.exit(0)});c.on('error',()=>process.exit(1));"

CMD ["node", "dist/server/production-ws.js"]
// 📁 server/routers/realtime-router.ts
import { z } from 'zod';
import { router, protectedProcedure } from '../trpc';
import { observable } from '@trpc/server/observable';
import { EventEmitter } from 'events';
// 🎯 Event types
/**
 * Payload shape for each real-time event, keyed by the event name used on
 * realtimeEmitter.
 */
type RealtimeEvents = {
// Presence change of a user within an organization.
userOnline: { userId: string; organizationId: string; status: 'online' | 'away' | 'offline' };
// A chat message delivered to a chat.
messageReceived: { messageId: string; chatId: string; userId: string; content: string };
// A notification addressed to a single user.
notificationSent: { userId: string; type: string; title: string; data: any };
// A change made to a document by a user.
documentUpdated: { documentId: string; userId: string; changes: any };
};
// 📡 Event emitter for real-time events
/**
 * In-process emitter that carries real-time events to the subscriptions.
 */
export const realtimeEmitter = new EventEmitter();
// Each active subscription registers one listener per event name, so the
// default cap of 10 listeners would trigger MaxListenersExceededWarning as
// soon as an eleventh client subscribes. 0 removes the cap.
realtimeEmitter.setMaxListeners(0);
// 🚀 Realtime Router
/**
 * tRPC router exposing the real-time subscriptions. Each subscription attaches
 * a listener to realtimeEmitter, forwards matching events to the client and
 * detaches the listener when the client unsubscribes.
 *
 * NOTE(review): none of the subscriptions verifies that the caller may access
 * the requested organization / chat / document — confirm this is enforced
 * elsewhere (for example in protectedProcedure).
 */
export const realtimeRouter = router({
  // Presence of users within one organization.
  userPresence: protectedProcedure
    .input(z.object({
      organizationId: z.string().uuid(),
    }))
    .subscription(async ({ input, ctx }) => {
      return observable<RealtimeEvents['userOnline']>((emit) => {
        const userId = ctx.session.user.id;

        const onUserPresence = (data: RealtimeEvents['userOnline']) => {
          if (data.organizationId === input.organizationId) {
            emit.next(data);
          }
        };
        realtimeEmitter.on('userOnline', onUserPresence);

        // Announce "online" through the emitter rather than emit.next() so
        // that other subscribers of the organization see it too. The listener
        // registered above still delivers it to this client.
        realtimeEmitter.emit('userOnline', {
          userId,
          organizationId: input.organizationId,
          status: 'online',
        });

        // Cleanup: detach first, then tell the remaining subscribers.
        return () => {
          realtimeEmitter.off('userOnline', onUserPresence);
          realtimeEmitter.emit('userOnline', {
            userId,
            organizationId: input.organizationId,
            status: 'offline',
          });
        };
      });
    }),

  // Messages of a single chat.
  chatMessages: protectedProcedure
    .input(z.object({
      chatId: z.string().uuid(),
    }))
    .subscription(async ({ input }) => {
      return observable<RealtimeEvents['messageReceived']>((emit) => {
        const onMessage = (data: RealtimeEvents['messageReceived']) => {
          if (data.chatId === input.chatId) {
            emit.next(data);
          }
        };
        realtimeEmitter.on('messageReceived', onMessage);
        return () => {
          realtimeEmitter.off('messageReceived', onMessage);
        };
      });
    }),

  // Notifications addressed to the calling user.
  notifications: protectedProcedure
    .subscription(async ({ ctx }) => {
      return observable<RealtimeEvents['notificationSent']>((emit) => {
        const onNotification = (data: RealtimeEvents['notificationSent']) => {
          if (data.userId === ctx.session.user.id) {
            emit.next(data);
          }
        };
        realtimeEmitter.on('notificationSent', onNotification);
        return () => {
          realtimeEmitter.off('notificationSent', onNotification);
        };
      });
    }),

  // Changes to one document made by other users.
  documentChanges: protectedProcedure
    .input(z.object({
      documentId: z.string().uuid(),
    }))
    .subscription(async ({ input, ctx }) => {
      return observable<RealtimeEvents['documentUpdated']>((emit) => {
        const onDocumentChange = (data: RealtimeEvents['documentUpdated']) => {
          // Skip the caller's own edits; they are not echoed back.
          if (data.documentId === input.documentId && data.userId !== ctx.session.user.id) {
            emit.next(data);
          }
        };
        realtimeEmitter.on('documentUpdated', onDocumentChange);
        return () => {
          realtimeEmitter.off('documentUpdated', onDocumentChange);
        };
      });
    }),
});
// 📁 client/hooks/use-realtime.ts
import { useEffect, useRef, useState } from 'react';
import { trpc } from '@/lib/trpc';
// 👥 User presence hook
/**
 * Subscribes to presence events of an organization and exposes the ids of the
 * users currently considered online.
 *
 * Any status other than 'online' removes the user from the list.
 *
 * @param organizationId Organization whose presence events are followed.
 * @returns `{ onlineUsers }` as an array of user ids.
 */
export function useUserPresence(organizationId: string) {
  const [onlineUsers, setOnlineUsers] = useState<Set<string>>(new Set());

  trpc.realtime.userPresence.useSubscription(
    { organizationId },
    {
      onData: (data) => {
        setOnlineUsers(prev => {
          const next = new Set(prev);
          if (data.status === 'online') {
            next.add(data.userId);
          } else {
            next.delete(data.userId);
          }
          return next;
        });
      },
      onError: (error) => {
        console.error('User presence subscription error:', error);
      },
    }
  );

  // Drop users of the previous organization when the organization changes,
  // mirroring how useChatMessages clears its list on chat change.
  useEffect(() => {
    setOnlineUsers(new Set());
  }, [organizationId]);

  return { onlineUsers: Array.from(onlineUsers) };
}
// 💬 Chat messages hook
/**
 * Subscribes to the messages of a chat and accumulates them in arrival order.
 * The list is cleared whenever `chatId` changes.
 *
 * @param chatId Chat to follow.
 * @returns `messages` plus `addMessage` for appending a message locally.
 */
export function useChatMessages(chatId: string) {
  const [messages, setMessages] = useState<any[]>([]);

  trpc.realtime.chatMessages.useSubscription(
    { chatId },
    {
      onData: (newMessage) => {
        // Pure updater: the previous version also wrote to a ref that was
        // never read, which was a side effect inside a state updater.
        setMessages(prev => [...prev, newMessage]);
      },
      onError: (error) => {
        console.error('Chat subscription error:', error);
      },
    }
  );

  // Start from an empty list when switching chats.
  useEffect(() => {
    setMessages([]);
  }, [chatId]);

  return {
    messages,
    addMessage: (message: any) => {
      setMessages(prev => [...prev, message]);
    }
  };
}
// 🔔 Notifications hook
/**
 * Subscribes to the caller's notifications, keeps them newest-first and shows
 * a browser notification when permission has been granted.
 *
 * Each received notification is given a local `id`, a `timestamp` and
 * `read: false` unless the payload already carries those fields, so that
 * markAsRead() can target a single entry.
 *
 * @returns `notifications`, the derived `unreadCount`, `markAsRead(id)` and
 *   `clearAll()`.
 */
export function useNotifications() {
  const [notifications, setNotifications] = useState<any[]>([]);
  // Monotonic counter that keeps local ids unique within the same millisecond.
  const nextLocalId = useRef(0);

  trpc.realtime.notifications.useSubscription(undefined, {
    onData: (notification) => {
      const received = {
        id: `local_${Date.now()}_${nextLocalId.current++}`,
        timestamp: Date.now(),
        read: false,
        // Fields sent by the server win over the local defaults above.
        ...notification,
      };
      setNotifications(prev => [received, ...prev]);

      // The Notification API is not available in every environment.
      if (typeof Notification !== 'undefined' && Notification.permission === 'granted') {
        new Notification(notification.title, {
          body: notification.data?.body,
          icon: '/favicon.ico',
        });
      }
    },
    onError: (error) => {
      console.error('Notifications subscription error:', error);
    },
  });

  // Derived rather than stored, so it cannot drift from the list (the stored
  // counter was decremented even for unknown or already-read ids).
  const unreadCount = notifications.filter(n => !n.read).length;

  const markAsRead = (notificationId: string) => {
    setNotifications(prev =>
      prev.map(n => n.id === notificationId ? { ...n, read: true } : n)
    );
  };

  const clearAll = () => {
    setNotifications([]);
  };

  return {
    notifications,
    unreadCount,
    markAsRead,
    clearAll,
  };
}
// 📁 components/realtime/user-presence.tsx
import { useUserPresence } from '@/hooks/use-realtime';
import { Avatar } from '@/components/ui/avatar';
/** Props of the UserPresence component. */
interface UserPresenceProps {
// Organization whose online users are listed.
organizationId: string;
// Id of the viewing user; that entry is labelled 'You'.
currentUserId: string;
}
/**
 * Lists the users currently online in an organization, each with an avatar
 * and a green status dot. The viewing user is labelled 'You'; everyone else is
 * shown as 'User ' followed by the first eight characters of their id.
 */
export function UserPresence({ organizationId, currentUserId }: UserPresenceProps) {
  const { onlineUsers } = useUserPresence(organizationId);

  const labelFor = (userId: string): string =>
    userId === currentUserId ? 'You' : `User ${userId.slice(0, 8)}`;

  const rows = onlineUsers.map((userId) => (
    <div key={userId} className="flex items-center gap-2">
      <div className="relative">
        <Avatar userId={userId} size="sm" />
        <div className="absolute -bottom-1 -right-1 w-3 h-3 bg-green-500 rounded-full border-2 border-gray-900" />
      </div>
      <span className="text-gray-300 text-sm">
        {labelFor(userId)}
      </span>
    </div>
  ));

  return (
    <div className="bg-gray-900/50 rounded-lg p-4 border border-gray-800">
      <h3 className="text-white font-semibold mb-3">
        👥 Online ({onlineUsers.length})
      </h3>
      <div className="space-y-2">
        {rows}
      </div>
    </div>
  );
}
// 📁 components/realtime/live-notifications.tsx
import { useNotifications } from '@/hooks/use-realtime';
import { Bell, X } from 'lucide-react';
/**
 * Notification bell with an unread badge and a dropdown panel listing the
 * live notifications. Clicking an entry marks it as read.
 */
export function LiveNotifications() {
  const { notifications, unreadCount, markAsRead, clearAll } = useNotifications();
  const [isOpen, setIsOpen] = useState(false);

  return (
    <div className="relative">
      {/* Notification bell */}
      <button
        onClick={() => setIsOpen(!isOpen)}
        className="relative p-2 rounded-lg bg-gray-800 hover:bg-gray-700 transition-colors"
      >
        <Bell className="w-5 h-5 text-gray-300" />
        {unreadCount > 0 && (
          <span className="absolute -top-1 -right-1 bg-red-500 text-white text-xs rounded-full w-5 h-5 flex items-center justify-center">
            {unreadCount > 9 ? '9+' : unreadCount}
          </span>
        )}
      </button>

      {/* Notifications panel */}
      {isOpen && (
        <div className="absolute top-full right-0 mt-2 w-80 bg-gray-900 rounded-lg border border-gray-700 shadow-xl z-50">
          <div className="flex items-center justify-between p-4 border-b border-gray-700">
            <h3 className="text-white font-semibold">Notifications</h3>
            <div className="flex items-center gap-2">
              {notifications.length > 0 && (
                <button
                  onClick={clearAll}
                  className="text-gray-400 hover:text-white text-xs"
                >
                  Clear all
                </button>
              )}
              <button
                onClick={() => setIsOpen(false)}
                className="text-gray-400 hover:text-white"
              >
                <X className="w-4 h-4" />
              </button>
            </div>
          </div>

          <div className="max-h-96 overflow-y-auto">
            {notifications.length === 0 ? (
              <div className="p-4 text-center text-gray-500">
                No notifications
              </div>
            ) : (
              notifications.map((notification, index) => (
                <div
                  // New notifications are prepended, so an index key would be
                  // reassigned to a different item on every arrival; prefer
                  // the notification id and fall back to the index.
                  key={notification.id ?? index}
                  className={`p-4 border-b border-gray-700 last:border-b-0 ${
                    !notification.read ? 'bg-blue-900/20' : ''
                  }`}
                  onClick={() => markAsRead(notification.id)}
                >
                  <div className="flex items-start gap-3">
                    <div className="flex-1">
                      <h4 className="text-white font-medium text-sm">
                        {notification.title}
                      </h4>
                      {notification.data?.body && (
                        <p className="text-gray-400 text-sm mt-1">
                          {notification.data.body}
                        </p>
                      )}
                      {/* Without a timestamp this would render "Invalid Date". */}
                      {notification.timestamp != null && (
                        <p className="text-gray-500 text-xs mt-2">
                          {new Date(notification.timestamp).toLocaleTimeString()}
                        </p>
                      )}
                    </div>
                    {!notification.read && (
                      <div className="w-2 h-2 bg-blue-500 rounded-full mt-1"></div>
                    )}
                  </div>
                </div>
              ))
            )}
          </div>
        </div>
      )}
    </div>
  );
}
// 📁 events/event-bus.ts
import { EventEmitter } from 'events';
import { Logger } from '@/infrastructure/logging/logger';
// 🎯 Event types
/**
 * Envelope for a domain event published on the event bus.
 *
 * @typeParam T Shape of the event payload. Defaults to `any` so existing code
 *   that uses `DomainEvent` without a type argument keeps compiling; the
 *   parameter exists because handlers are typed as `DomainEvent<T>`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- default kept for backward compatibility
export interface DomainEvent<T = any> {
  /** Identifier of this event occurrence. */
  id: string;
  /** Event name; handlers are looked up by this value. */
  type: string;
  /** Identifier of the entity the event is about. */
  aggregateId: string;
  /** Kind of entity the event is about, e.g. 'User'. */
  aggregateType: string;
  /** Event-specific payload. */
  data: T;
  metadata: {
    userId?: string;
    correlationId: string;
    timestamp: string;
    source: string;
    version: number;
  };
}
/**
 * A handler registration for the event bus.
 */
export interface EventHandler<T = any> {
// Event name this handler reacts to; matched against DomainEvent.type.
eventType: string;
// Async callback invoked with the published event.
handler: (event: DomainEvent<T>) => Promise<void>;
options?: {
// Maximum number of attempts; the bus uses 3 when unset or 0.
retry?: number;
// Per-attempt timeout in milliseconds; the bus uses 30000 when unset or 0.
timeout?: number;
// Scheduling group; handlers without a priority are treated as 'normal'.
priority?: 'low' | 'normal' | 'high';
};
}
// 🚌 Event Bus
/**
 * In-process event bus. publish() notifies EventEmitter listeners, runs the
 * registered handlers by priority (with timeout, retry and exponential
 * backoff) and forwards mapped events to the real-time emitter.
 */
export class EventBus extends EventEmitter {
  private logger: Logger;
  // Registered handlers keyed by event type.
  private handlers: Map<string, EventHandler[]> = new Map();
  private metrics = {
    eventsPublished: 0,
    eventsProcessed: 0,
    eventsFailed: 0,
    handlersRegistered: 0,
  };

  constructor() {
    super();
    this.logger = new Logger('EventBus');
    this.setMaxListeners(1000); // Support many listeners
  }

  /**
   * Adds a handler for its event type. Several handlers may share a type.
   */
  registerHandler<T>(handler: EventHandler<T>): void {
    const existingHandlers = this.handlers.get(handler.eventType) || [];
    existingHandlers.push(handler);
    this.handlers.set(handler.eventType, existingHandlers);
    this.metrics.handlersRegistered++;

    this.logger.info('📝 Event handler registered', {
      eventType: handler.eventType,
      totalHandlers: existingHandlers.length,
    });
  }

  /**
   * Publishes an event: emits it to listeners of its type and of '*', runs
   * the registered handlers, then forwards it to real-time subscribers.
   *
   * Resolves after high- and normal-priority handlers have settled;
   * low-priority handlers run in the background.
   */
  async publish(event: DomainEvent): Promise<void> {
    this.metrics.eventsPublished++;

    this.logger.debug('📤 Publishing event', {
      eventId: event.id,
      eventType: event.type,
      aggregateId: event.aggregateId,
    });

    this.emit(event.type, event);
    this.emit('*', event); // Global listener

    await this.processHandlers(event);
    await this.emitToRealtime(event);
  }

  /**
   * Runs the handlers for an event: high priority sequentially, normal
   * priority in parallel, low priority in parallel on a later tick.
   */
  private async processHandlers(event: DomainEvent): Promise<void> {
    const handlers = this.handlers.get(event.type) || [];

    if (handlers.length === 0) {
      this.logger.warn('⚠️ No handlers found for event', {
        eventType: event.type,
        eventId: event.id,
      });
      return;
    }

    const priorityGroups = {
      high: handlers.filter(h => h.options?.priority === 'high'),
      normal: handlers.filter(h => !h.options?.priority || h.options.priority === 'normal'),
      low: handlers.filter(h => h.options?.priority === 'low'),
    };

    for (const handler of priorityGroups.high) {
      await this.executeHandler(handler, event);
    }

    await Promise.allSettled(
      priorityGroups.normal.map(handler => this.executeHandler(handler, event))
    );

    // Fire-and-forget: executeHandler never rejects, so nothing is awaited.
    setImmediate(() => {
      void Promise.allSettled(
        priorityGroups.low.map(handler => this.executeHandler(handler, event))
      );
    });
  }

  /**
   * Runs one handler with a per-attempt timeout, retrying with exponential
   * backoff (1s, 2s, 4s ... capped at 10s). After the last failed attempt the
   * event is handed to the dead letter queue. Never rejects.
   *
   * `eventsFailed` counts failed attempts, not failed events.
   */
  private async executeHandler(handler: EventHandler, event: DomainEvent): Promise<void> {
    const startTime = Date.now();
    const timeout = handler.options?.timeout || 30000; // 30s default
    const maxRetries = handler.options?.retry || 3;

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      let timer: ReturnType<typeof setTimeout> | undefined;
      try {
        try {
          await Promise.race([
            handler.handler(event),
            new Promise<never>((_, reject) => {
              timer = setTimeout(() => reject(new Error('Handler timeout')), timeout);
            }),
          ]);
        } finally {
          // Always cancel the timeout; otherwise every attempt leaves a live
          // timer behind for up to `timeout` ms, which keeps the process alive.
          if (timer !== undefined) {
            clearTimeout(timer);
          }
        }

        this.metrics.eventsProcessed++;

        this.logger.debug('✅ Event handler executed', {
          eventType: event.type,
          eventId: event.id,
          duration: Date.now() - startTime,
          attempt,
        });

        return; // Success, exit retry loop
      } catch (caught) {
        const error = EventBus.toError(caught);
        this.metrics.eventsFailed++;

        this.logger.error('❌ Event handler failed', {
          eventType: event.type,
          eventId: event.id,
          error: error.message,
          attempt,
          maxRetries,
        });

        if (attempt < maxRetries) {
          const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000); // Exponential backoff
          await new Promise(resolve => setTimeout(resolve, delay));
        } else {
          await this.sendToDeadLetterQueue(event, handler, error);
        }
      }
    }
  }

  /**
   * Forwards the event to the real-time emitter when a mapping exists.
   * Failures are logged and swallowed so they never fail publish().
   */
  private async emitToRealtime(event: DomainEvent): Promise<void> {
    try {
      const realtimeEvent = this.mapToRealtimeEvent(event);

      if (realtimeEvent) {
        // Imported lazily, at the point of use.
        const { realtimeEmitter } = await import('@/server/routers/realtime-router');
        realtimeEmitter.emit(realtimeEvent.type, realtimeEvent.data);
      }
    } catch (caught) {
      this.logger.error('Failed to emit to real-time', {
        eventId: event.id,
        error: EventBus.toError(caught).message,
      });
    }
  }

  /**
   * Translates a domain event into the real-time event name and payload, or
   * returns null when the event type has no real-time counterpart.
   */
  private mapToRealtimeEvent(event: DomainEvent): { type: string; data: any } | null {
    switch (event.type) {
      case 'UserCreated':
      case 'UserUpdated':
      case 'UserDeactivated':
        return {
          type: 'userOnline',
          data: {
            userId: event.aggregateId,
            organizationId: event.data.organizationId,
            status: event.type === 'UserDeactivated' ? 'offline' : 'online',
          },
        };

      case 'MessageSent':
        return {
          type: 'messageReceived',
          data: {
            messageId: event.id,
            chatId: event.data.chatId,
            userId: event.metadata.userId,
            content: event.data.content,
          },
        };

      case 'NotificationCreated':
        return {
          type: 'notificationSent',
          data: {
            userId: event.data.targetUserId,
            type: event.data.type,
            title: event.data.title,
            data: event.data.payload,
          },
        };

      case 'DocumentUpdated':
        return {
          type: 'documentUpdated',
          data: {
            documentId: event.aggregateId,
            userId: event.metadata.userId,
            changes: event.data.changes,
          },
        };

      default:
        return null;
    }
  }

  /**
   * Records an event whose handler exhausted its retries. Currently this only
   * logs the failure; persistent storage is still to be implemented.
   */
  private async sendToDeadLetterQueue(
    event: DomainEvent,
    handler: EventHandler,
    error: Error
  ): Promise<void> {
    const deadLetterEvent = {
      originalEvent: event,
      handler: handler.eventType,
      error: error.message,
      failedAt: new Date().toISOString(),
      attempts: handler.options?.retry || 3,
    };

    this.logger.error('💀 Event sent to dead letter queue', deadLetterEvent);

    // TODO: Implement actual dead letter queue storage
    // await this.deadLetterQueue.add(deadLetterEvent);
  }

  /** Returns the counters plus the current handler and event-type counts. */
  getMetrics() {
    return {
      ...this.metrics,
      handlersCount: Array.from(this.handlers.values()).reduce(
        (sum, handlers) => sum + handlers.length, 0
      ),
      eventTypesRegistered: this.handlers.size,
    };
  }

  /** Removes all listeners and registered handlers. */
  async shutdown(): Promise<void> {
    this.logger.info('🧹 Shutting down event bus');
    this.removeAllListeners();
    this.handlers.clear();
  }

  /** Normalizes a caught value (typed `unknown`) into an Error. */
  private static toError(caught: unknown): Error {
    return caught instanceof Error ? caught : new Error(String(caught));
  }
}
// 📁 events/handlers/user-handlers.ts
import { EventHandler, DomainEvent } from '../event-bus';
import { EmailService } from '@/services/email-service';
import { AnalyticsService } from '@/services/analytics-service';
// 👤 User event handlers
/**
 * Handlers reacting to 'UserCreated': a welcome email (high priority),
 * an analytics event (normal priority) and a welcome notification published
 * back onto the event bus (low priority).
 */
export const userEventHandlers: EventHandler[] = [
  // Welcome email.
  {
    eventType: 'UserCreated',
    options: { priority: 'high', timeout: 10000 },
    handler: async (event: DomainEvent) => {
      const { data, aggregateId, metadata } = event;
      await new EmailService().send({
        to: data.email,
        template: 'user-welcome',
        data: {
          name: data.name,
          loginUrl: `${process.env.APP_URL}/login`,
        },
        metadata: {
          userId: aggregateId,
          correlationId: metadata.correlationId,
        },
      });
    },
  },

  // Analytics tracking.
  {
    eventType: 'UserCreated',
    options: { priority: 'normal' },
    handler: async (event: DomainEvent) => {
      const { data, aggregateId, metadata } = event;
      await new AnalyticsService().track({
        event: 'user_created',
        userId: aggregateId,
        properties: {
          email: data.email,
          organizationId: data.organizationId,
          source: metadata.source,
          timestamp: metadata.timestamp,
        },
      });
    },
  },

  // Welcome notification, published as a follow-up event.
  {
    eventType: 'UserCreated',
    options: { priority: 'low' },
    handler: async (event: DomainEvent) => {
      // Imported lazily inside the handler rather than at module load.
      const { eventBus } = await import('../event-bus-instance');
      const newUserId = event.aggregateId;

      await eventBus.publish({
        id: 'notif_' + Date.now(),
        type: 'NotificationCreated',
        aggregateId: `notification_${newUserId}`,
        aggregateType: 'Notification',
        data: {
          targetUserId: newUserId,
          type: 'welcome',
          title: 'Welcome to the platform!',
          payload: {
            body: 'Get started by completing your profile.',
            actionUrl: '/profile/setup',
          },
        },
        // Keep the originating metadata but mark this handler as the source.
        metadata: {
          ...event.metadata,
          source: 'user-handler',
        },
      });
    },
  },
];
// 📁 events/handlers/notification-handlers.ts
// 🔔 Notification event handlers
/**
 * Handlers reacting to 'NotificationCreated': a push notification for every
 * event (high priority) and an email for 'critical' or 'security'
 * notifications (normal priority).
 */
export const notificationEventHandlers: EventHandler[] = [
  // Push notification.
  {
    eventType: 'NotificationCreated',
    options: { priority: 'high', timeout: 5000 },
    handler: async (event: DomainEvent) => {
      const { targetUserId, title, payload } = event.data;
      await new PushNotificationService().send({
        userId: targetUserId,
        title,
        body: payload.body,
        data: payload,
      });
    },
  },

  // Email, only for critical or security notifications.
  {
    eventType: 'NotificationCreated',
    options: { priority: 'normal' },
    handler: async (event: DomainEvent) => {
      const kind = event.data.type;
      if (kind !== 'critical' && kind !== 'security') {
        return;
      }

      await new EmailService().send({
        to: event.data.targetUserId, // Will be resolved to email
        template: 'notification-email',
        data: {
          title: event.data.title,
          body: event.data.payload.body,
          actionUrl: event.data.payload.actionUrl,
        },
      });
    },
  },
];
// 📁 events/setup.ts
// 🔧 Event system setup
import { EventBus } from './event-bus';
import { userEventHandlers } from './handlers/user-handlers';
import { notificationEventHandlers } from './handlers/notification-handlers';
// Event bus instance exported by this module; the handlers below are registered on it.
export const eventBus = new EventBus();
// 📝 Register all handlers
/**
 * Registers every user handler, then every notification handler, on the
 * event bus.
 */
export function setupEventHandlers() {
  const allHandlers = [...userEventHandlers, ...notificationEventHandlers];
  for (const handler of allHandlers) {
    eventBus.registerHandler(handler);
  }

  console.log('✅ Event handlers registered');
}
// 📊 Global event listener for debugging
// In development, log a short summary of every published event via the
// wildcard ('*') listener.
if (process.env.NODE_ENV === 'development') {
  eventBus.on('*', ({ type, id, aggregateId }: DomainEvent) => {
    console.log('🎯 Event published:', { type, id, aggregateId });
  });
}
// 📁 server/routers/user-events-router.ts
import { z } from 'zod';
import { router, protectedProcedure } from '../trpc';
import { eventBus } from '@/events/setup';
/**
 * Mutations that persist a change and then publish the matching domain event
 * on the event bus. Each mutation awaits publish() before returning.
 *
 * NOTE(review): event ids are built from Date.now() and can collide when two
 * events of the same kind are published within one millisecond.
 */
export const userEventsRouter = router({
  // Creates a user and publishes 'UserCreated'.
  create: protectedProcedure
    .input(z.object({
      email: z.string().email(),
      name: z.string().min(2),
      organizationId: z.string().uuid().optional(),
    }))
    .mutation(async ({ input, ctx }) => {
      const user = await ctx.prisma.user.create({
        data: {
          email: input.email,
          name: input.name,
          organizationId: input.organizationId,
        },
      });

      await eventBus.publish({
        id: `user_created_${Date.now()}`,
        type: 'UserCreated',
        aggregateId: user.id,
        aggregateType: 'User',
        data: {
          email: user.email,
          name: user.name,
          organizationId: user.organizationId,
        },
        metadata: {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          timestamp: new Date().toISOString(),
          source: 'user-service',
          version: 1,
        },
      });

      return user;
    }),

  // Updates name and/or email and publishes 'UserUpdated'.
  // NOTE(review): there is no check that the caller may edit `input.userId`
  // (for example that it equals ctx.session.user.id) — confirm authorization
  // is enforced elsewhere.
  updateProfile: protectedProcedure
    .input(z.object({
      userId: z.string().uuid(),
      name: z.string().min(2).optional(),
      email: z.string().email().optional(),
    }))
    .mutation(async ({ input, ctx }) => {
      const user = await ctx.prisma.user.update({
        where: { id: input.userId },
        data: {
          ...(input.name && { name: input.name }),
          ...(input.email && { email: input.email }),
        },
      });

      await eventBus.publish({
        id: `user_updated_${Date.now()}`,
        type: 'UserUpdated',
        aggregateId: user.id,
        aggregateType: 'User',
        data: {
          // The event bus maps 'UserUpdated' to a presence event and reads
          // data.organizationId; without it the presence event carries
          // `undefined` and matches no organization subscription.
          organizationId: user.organizationId,
          // Previous values are not loaded, so `from` is always null.
          changes: {
            ...(input.name && { name: { from: null, to: input.name } }),
            ...(input.email && { email: { from: null, to: input.email } }),
          },
        },
        metadata: {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          timestamp: new Date().toISOString(),
          source: 'user-service',
          version: 1,
        },
      });

      return user;
    }),

  // Stores a chat message and publishes 'MessageSent'.
  sendMessage: protectedProcedure
    .input(z.object({
      chatId: z.string().uuid(),
      content: z.string().min(1),
    }))
    .mutation(async ({ input, ctx }) => {
      const message = await ctx.prisma.message.create({
        data: {
          chatId: input.chatId,
          userId: ctx.session.user.id,
          content: input.content,
        },
      });

      await eventBus.publish({
        id: `message_sent_${Date.now()}`,
        type: 'MessageSent',
        aggregateId: message.id,
        aggregateType: 'Message',
        data: {
          chatId: input.chatId,
          content: input.content,
          messageId: message.id,
        },
        metadata: {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          timestamp: new Date().toISOString(),
          source: 'chat-service',
          version: 1,
        },
      });

      return message;
    }),
});
// 📁 services/notification-service.ts
import { PrismaClient } from '@prisma/client';
import { eventBus } from '@/events/setup';
/**
 * Stores notifications through Prisma and announces each newly created one on
 * the shared event bus as a `NotificationCreated` event.
 */
export class NotificationService {
  private prisma: PrismaClient;

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  /**
   * Coerces a paging value to an integer >= 1.
   * Missing or non-finite values yield `fallback`; anything below 1 becomes 1.
   */
  private static toPositiveInt(value: number | undefined, fallback: number): number {
    if (value === undefined || !Number.isFinite(value)) {
      return fallback;
    }
    return Math.max(1, Math.trunc(value));
  }

  /**
   * 🔔 Creates an unread notification row and then publishes a
   * `NotificationCreated` event addressed to `params.userId`.
   *
   * @param params Recipient, severity, title/message and optional action URL
   *   and extra data for the notification.
   * @returns The record returned by `prisma.notification.create`.
   */
  async create(params: {
    userId: string;
    type: 'info' | 'warning' | 'error' | 'success';
    title: string;
    message: string;
    actionUrl?: string;
    data?: any;
  }) {
    const notification = await this.prisma.notification.create({
      data: {
        userId: params.userId,
        type: params.type,
        title: params.title,
        message: params.message,
        actionUrl: params.actionUrl,
        // `??` keeps legitimate falsy payloads (0, '', false); only a missing
        // value is replaced by an empty object.
        data: params.data ?? {},
        isRead: false,
      },
    });

    // Computed once so the event id and its correlation id always match.
    const eventId = `notif_${Date.now()}`;

    // 📤 Publish event for real-time delivery
    await eventBus.publish({
      id: eventId,
      type: 'NotificationCreated',
      aggregateId: notification.id,
      aggregateType: 'Notification',
      data: {
        targetUserId: params.userId,
        type: params.type,
        title: params.title,
        payload: {
          body: params.message,
          actionUrl: params.actionUrl,
          data: params.data,
        },
      },
      metadata: {
        correlationId: eventId,
        timestamp: new Date().toISOString(),
        source: 'notification-service',
        version: 1,
      },
    });

    return notification;
  }

  /**
   * 📋 Returns one page of a user's notifications, newest first, together
   * with the total count of matching rows.
   *
   * @param userId Owner of the notifications.
   * @param options `page` (1-based, default 1), `limit` (default 20) and
   *   `unreadOnly` (default false). `page` and `limit` are normalised to
   *   integers >= 1 so that `skip` is never negative and `totalPages` is
   *   never NaN or Infinity.
   * @returns `{ notifications, total, page, totalPages }`, where `page` is the
   *   normalised page number.
   */
  async getForUser(userId: string, options: {
    page?: number;
    limit?: number;
    unreadOnly?: boolean;
  } = {}) {
    const page = NotificationService.toPositiveInt(options.page, 1);
    const limit = NotificationService.toPositiveInt(options.limit, 20);
    const unreadOnly = options.unreadOnly ?? false;

    const where = {
      userId,
      ...(unreadOnly && { isRead: false }),
    };

    // The page query and the count are independent, so run them together.
    const [notifications, total] = await Promise.all([
      this.prisma.notification.findMany({
        where,
        orderBy: { createdAt: 'desc' },
        skip: (page - 1) * limit,
        take: limit,
      }),
      this.prisma.notification.count({ where }),
    ]);

    return { notifications, total, page, totalPages: Math.ceil(total / limit) };
  }
}
// 📁 services/push-notification-service.ts
import webpush from 'web-push';
/**
 * Sends Web Push notifications to every stored subscription of a user via the
 * `web-push` library.
 *
 * NOTE(review): `getUserSubscriptions` and `removeSubscriptions` are called on
 * `this` but are not defined in this class as shown — they must be provided
 * elsewhere.
 */
export class PushNotificationService {
  /**
   * Configures the VAPID identity used to sign push requests.
   *
   * @throws Error when VAPID_PUBLIC_KEY or VAPID_PRIVATE_KEY is not set.
   */
  constructor() {
    const publicKey = process.env.VAPID_PUBLIC_KEY;
    const privateKey = process.env.VAPID_PRIVATE_KEY;
    if (!publicKey || !privateKey) {
      throw new Error(
        'VAPID_PUBLIC_KEY and VAPID_PRIVATE_KEY must be set to send push notifications'
      );
    }
    webpush.setVapidDetails('mailto:your-email@domain.com', publicKey, privateKey);
  }

  /**
   * True when a send failure carries HTTP status 404 or 410, which push
   * services use to signal that the subscription no longer exists.
   */
  private static isExpiredSubscription(reason: unknown): boolean {
    if (typeof reason !== 'object' || reason === null) {
      return false;
    }
    const statusCode = (reason as { statusCode?: unknown }).statusCode;
    return statusCode === 404 || statusCode === 410;
  }

  /**
   * 📱 Sends one notification to all of the user's subscriptions.
   *
   * Sends are attempted independently, so one failure does not stop the
   * others. Only subscriptions rejected with 404/410 are removed; any other
   * failure (network error, rate limiting, 5xx) counts as failed but the
   * subscription is kept, because it may still be valid.
   *
   * @param params Target user and notification content; `icon` and `badge`
   *   fall back to default asset paths when absent or empty.
   * @returns Counts of fulfilled (`sent`) and rejected (`failed`) sends.
   */
  async send(params: {
    userId: string;
    title: string;
    body: string;
    icon?: string;
    badge?: string;
    data?: any;
    actions?: Array<{ action: string; title: string; icon?: string }>;
  }) {
    // 🔍 Get user subscriptions
    const subscriptions = await this.getUserSubscriptions(params.userId);

    const payload = JSON.stringify({
      title: params.title,
      body: params.body,
      icon: params.icon || '/icons/notification-icon.png',
      badge: params.badge || '/icons/badge-icon.png',
      data: params.data ?? {},
      actions: params.actions ?? [],
    });

    // 📤 Send to all user devices
    const results = await Promise.allSettled(
      subscriptions.map(subscription =>
        webpush.sendNotification(subscription, payload)
      )
    );

    // 🧹 Remove only subscriptions the push service reported as gone;
    // results[i] corresponds to subscriptions[i].
    const expiredSubscriptions = subscriptions.filter((_subscription, index) => {
      const outcome = results[index];
      return (
        outcome?.status === 'rejected' &&
        PushNotificationService.isExpiredSubscription(outcome.reason)
      );
    });
    if (expiredSubscriptions.length > 0) {
      await this.removeSubscriptions(expiredSubscriptions);
    }

    const sent = results.filter(r => r.status === 'fulfilled').length;
    return {
      sent,
      failed: results.length - sent,
    };
  }
}
// 📁 components/live-notifications.tsx
import React, { useState, useEffect } from 'react';
import { Bell, Check, X } from 'lucide-react';
import { trpc } from '@/lib/trpc';
/**
 * Bell button with an unread badge and a toggleable notification panel.
 *
 * Subscribes to `trpc.realtime.notifications`; each incoming item is
 * prepended to local state, bumps the unread counter and, when the browser
 * Notification API is available and permission is granted, is mirrored as a
 * browser notification.
 */
export function LiveNotifications() {
  const [notifications, setNotifications] = useState([]);
  const [isOpen, setIsOpen] = useState(false);
  const [unreadCount, setUnreadCount] = useState(0);

  // 📡 Subscribe to real-time notifications
  trpc.realtime.notifications.useSubscription(undefined, {
    onData: (notification) => {
      setNotifications(prev => [notification, ...prev]);
      setUnreadCount(prev => prev + 1);

      // 🔔 Browser notification (best effort). The API is feature-detected
      // here just like in the permission effect below, and construction is
      // wrapped because `new Notification()` can throw on some platforms;
      // that must not break the in-app list update above.
      if ('Notification' in window && Notification.permission === 'granted') {
        try {
          new Notification(notification.title, {
            body: notification.data?.body,
            icon: '/icons/notification.png',
          });
        } catch (error) {
          console.warn('Browser notification could not be shown:', error);
        }
      }
    },
  });

  // 📲 Ask for notification permission once, only if the user has not decided yet
  useEffect(() => {
    if ('Notification' in window && Notification.permission === 'default') {
      Notification.requestPermission();
    }
  }, []);

  return (
    <div className="relative">
      <button
        onClick={() => setIsOpen(!isOpen)}
        className="relative p-2 rounded-lg bg-gray-800 hover:bg-gray-700 transition-colors"
      >
        <Bell className="w-5 h-5 text-gray-300" />
        {unreadCount > 0 && (
          <span className="absolute -top-1 -right-1 bg-red-500 text-white text-xs rounded-full w-5 h-5 flex items-center justify-center">
            {unreadCount > 9 ? '9+' : unreadCount}
          </span>
        )}
      </button>
      {isOpen && (
        <NotificationPanel
          notifications={notifications}
          onClose={() => setIsOpen(false)}
          onMarkRead={(id) => {
            // Mark as read logic — currently only decrements the badge,
            // never below zero; `id` is not used yet.
            setUnreadCount(prev => Math.max(0, prev - 1));
          }}
        />
      )}
    </div>
  );
}
/**
 * Dropdown listing the given notifications, or a "No notifications"
 * placeholder when the list is empty.
 *
 * @param notifications Items to render, in the order given.
 * @param onClose Called when the header close button is clicked.
 * @param onMarkRead Passed through unchanged to every NotificationItem.
 */
function NotificationPanel({ notifications, onClose, onMarkRead }) {
  return (
    <div className="absolute top-full right-0 mt-2 w-80 bg-gray-900 rounded-lg border border-gray-700 shadow-xl z-50">
      <div className="flex items-center justify-between p-4 border-b border-gray-700">
        <h3 className="text-white font-semibold">Notifications</h3>
        <button onClick={onClose} className="text-gray-400 hover:text-white">
          <X className="w-4 h-4" />
        </button>
      </div>
      <div className="max-h-96 overflow-y-auto">
        {notifications.length === 0 ? (
          <div className="p-4 text-center text-gray-500">No notifications</div>
        ) : (
          // New items are prepended by the parent, so a pure index key would
          // shift on every arrival. Prefer the item's own id when it has one
          // (assumes `id` is unique per notification — TODO confirm) and fall
          // back to the index otherwise.
          notifications.map((notification, index) => (
            <NotificationItem
              key={notification?.id ?? index}
              notification={notification}
              onMarkRead={onMarkRead}
            />
          ))
        )}
      </div>
    </div>
  );
}