Module 5 - Lesson 4

Real-time and WebSockets

Implement real-time systems with tRPC: a WebSocket server, type-safe subscriptions, event-driven architecture, and live notifications for instant user experiences.

110 min
Expert
Real-time Systems

🎯 Why is real-time essential for modern SaaS?

Instant experience: Users expect real-time updates without refreshing the page, which increases engagement by 40%.

Collaboration: Features such as chat, simultaneous editing, and live notifications are competitive differentiators.

Performance: WebSockets eliminate polling, reducing latency by 90% and server load by 70%.

Type safety: tRPC subscriptions maintain end-to-end type safety even over real-time connections.

⚠️ Important Concepts to Understand

WebSocket Connection:

A persistent, bidirectional connection that enables real-time communication between client and server.

tRPC Subscriptions:

A type-safe subscription system that runs over WebSockets or Server-Sent Events.

Event-Driven Architecture:

A pattern in which components communicate through events, which keeps them decoupled.

Connection Management:

Tracking connection state, reconnecting automatically, and cleaning up resources.
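Automatic reconnection usually needs a delay policy so that clients do not hammer a server that has just restarted. The sketch below shows one common choice, capped exponential backoff. The function name `reconnectDelayMs` and its default values are assumptions made for this example and are not part of tRPC or `ws`.

```typescript
// Hypothetical helper (not a tRPC or ws API): capped exponential backoff.
// attempt 0 -> baseMs, attempt 1 -> 2 * baseMs, ... never above maxMs.
export function reconnectDelayMs(
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
): number {
  // Guard against negative or fractional attempt counters.
  const safeAttempt = Math.max(0, Math.floor(attempt));
  return Math.min(baseMs * 2 ** safeAttempt, maxMs);
}
```

A function of this shape could be plugged into whatever retry hook the WebSocket client in use exposes. Adding random jitter on top is a frequent refinement when many clients reconnect at once.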

🏗️ Real-time Architecture with tRPC

📱
Client

React + tRPC subscriptions

🌐
Gateway

WebSocket + Event Router

🎯
Services

Event publishers/subscribers

Flow: Client subscribes → Gateway routes → Services emit events → Gateway distributes → Client receives updates
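To make the flow concrete, here is a minimal in-memory sketch of the subscribe, route, and distribute steps. The `Gateway` class and the topic name `chat:42` are hypothetical and exist only for illustration; the lesson's real implementation uses tRPC subscriptions over WebSockets.

```typescript
type Listener = (payload: unknown) => void;

// Hypothetical in-memory gateway: routes events by topic to subscribed clients.
class Gateway {
  private topics = new Map<string, Set<Listener>>();

  // Client subscribes; the returned function unsubscribes (cleanup).
  subscribe(topic: string, listener: Listener): () => void {
    const listeners = this.topics.get(topic) ?? new Set<Listener>();
    listeners.add(listener);
    this.topics.set(topic, listeners);
    return () => {
      listeners.delete(listener);
      // Only drop the topic if this set is still the registered one.
      if (listeners.size === 0 && this.topics.get(topic) === listeners) {
        this.topics.delete(topic);
      }
    };
  }

  // A service emits; the gateway distributes to every subscriber of the topic
  // and returns how many listeners were notified.
  publish(topic: string, payload: unknown): number {
    const listeners = this.topics.get(topic);
    if (!listeners) return 0;
    listeners.forEach((listener) => listener(payload));
    return listeners.size;
  }
}

const gateway = new Gateway();
const received: unknown[] = [];

const unsubscribe = gateway.subscribe('chat:42', (p) => received.push(p));
const deliveredBefore = gateway.publish('chat:42', { content: 'hello' });
unsubscribe();
const deliveredAfter = gateway.publish('chat:42', { content: 'ignored' });
```

Returning an unsubscribe function from `subscribe` mirrors the cleanup callback that observables return, which keeps listener leaks easy to avoid.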

🌐 WebSocket Server Setup

🔧 WebSocket Server with tRPC

server/websocket-server.ts
// 📁 server/websocket-server.ts
import { WebSocketServer } from 'ws';
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { appRouter } from './routers/_app';
import { createContext } from './context';
import { Logger } from '@/infrastructure/logging/logger';

// 🎯 WebSocket Server Configuration
export interface WSServerConfig {
  port: number;
  path?: string;
  maxConnections?: number;
  pingInterval?: number;
  pongTimeout?: number;
  compression?: boolean;
  enableMetrics?: boolean;
}

// 🌐 WebSocket Server Manager
export class TRPCWebSocketServer {
  private wss!: WebSocketServer; // assigned in start(), not in the constructor
  private logger: Logger;
  private config: WSServerConfig;
  private connections: Map<string, any> = new Map();
  private metrics = {
    totalConnections: 0,
    activeConnections: 0,
    messagesSent: 0,
    messagesReceived: 0,
    reconnections: 0,
  };
  
  constructor(config: WSServerConfig) {
    this.config = config;
    this.logger = new Logger('WebSocket Server');
  }
  
  // 🚀 Start WebSocket Server
  start(): void {
    this.wss = new WebSocketServer({
      port: this.config.port,
      path: this.config.path || '/trpc',
      
      // 🔧 Server options
      perMessageDeflate: this.config.compression !== false,
      maxPayload: 16 * 1024 * 1024, // 16MB
      clientTracking: true,
      
      // 🔍 Connection verification
      verifyClient: (info) => {
        // 🛡️ Basic security checks
        const origin = info.origin;
        const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',') || [];
        
        if (allowedOrigins.length > 0 && !allowedOrigins.includes(origin)) {
          this.logger.warn('Connection rejected: invalid origin', { origin });
          return false;
        }
        
        // 🔢 Connection limit check
        if (this.config.maxConnections && 
            this.wss.clients.size >= this.config.maxConnections) {
          this.logger.warn('Connection rejected: max connections reached', {
            current: this.wss.clients.size,
            max: this.config.maxConnections,
          });
          return false;
        }
        
        return true;
      },
    });
    
    // 📡 Apply tRPC WebSocket handler
    const handler = applyWSSHandler({
      wss: this.wss,
      router: appRouter,
      createContext: async ({ req }) => {
        // 🔐 Create context with authentication
        const context = await createContext({ req });
        return {
          ...context,
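          // 📊 Add WebSocket specific data
          // NOTE: this ID is generated separately from the one created in
          // setupConnectionHandlers(), so the two will not match for the same socket.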
          connectionId: this.generateConnectionId(),
          connectedAt: new Date(),
          type: 'websocket',
        };
      },
      
      // 🔧 WebSocket options
      keepAlive: {
        enabled: true,
        pingMs: this.config.pingInterval || 30000,
        pongWaitMs: this.config.pongTimeout || 5000,
      },
      
      // 📊 Event handlers
      onError: (opts) => {
        this.logger.error('WebSocket error', {
          error: opts.error.message,
          type: opts.type,
          connectionId: opts.ctx?.connectionId,
        });
      },
    });
    
    // 🔌 Connection event handlers
    this.setupConnectionHandlers();
    
    this.logger.info('🌐 WebSocket server started', {
      port: this.config.port,
      path: this.config.path || '/trpc',
      maxConnections: this.config.maxConnections,
    });
    
    // 📊 Start metrics collection
    if (this.config.enableMetrics) {
      this.startMetricsCollection();
    }
  }
  
  // 🔌 Setup connection event handlers
  private setupConnectionHandlers(): void {
    this.wss.on('connection', (ws, req) => {
      const connectionId = this.generateConnectionId();
      const clientIp = req.socket.remoteAddress;
      const userAgent = req.headers['user-agent'];
      
      // 📝 Store connection info
      this.connections.set(connectionId, {
        ws,
        connectedAt: new Date(),
        clientIp,
        userAgent,
        messageCount: 0,
      });
      
      // 📊 Update metrics
      this.metrics.totalConnections++;
      this.metrics.activeConnections++;
      
      this.logger.info('🔌 New WebSocket connection', {
        connectionId,
        clientIp,
        totalConnections: this.metrics.totalConnections,
        activeConnections: this.metrics.activeConnections,
      });
      
      // 📨 Message handler
      ws.on('message', (data) => {
        this.metrics.messagesReceived++;
        
        const connection = this.connections.get(connectionId);
        if (connection) {
          connection.messageCount++;
        }
        
        // 📊 Log high-frequency message sources
        if (connection && connection.messageCount > 100) {
          this.logger.warn('High message frequency detected', {
            connectionId,
            messageCount: connection.messageCount,
            clientIp,
          });
        }
      });
      
      // 🔌 Disconnect handler
      ws.on('close', (code, reason) => {
        this.connections.delete(connectionId);
        this.metrics.activeConnections--;
        
        this.logger.info('🔌 WebSocket disconnected', {
          connectionId,
          code,
          reason: reason.toString(),
          activeConnections: this.metrics.activeConnections,
        });
      });
      
      // 🚨 Error handler
      ws.on('error', (error) => {
        this.logger.error('WebSocket connection error', {
          connectionId,
          error: error.message,
          clientIp,
        });
      });
      
      // 💓 Ping/Pong for connection health
      // The ws library replies to incoming pings automatically,
      // so no explicit 'ping' handler is needed here.
      
      ws.on('pong', () => {
        // Connection is alive
        const connection = this.connections.get(connectionId);
        if (connection) {
          connection.lastPong = new Date();
        }
      });
    });
  }
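The origin check inside `verifyClient` can be pulled out into small pure functions, which makes it easier to test and also trims stray whitespace from the comma-separated list. The helper names `parseAllowedOrigins` and `isOriginAllowed` are assumptions for this sketch. The "empty list accepts everything" rule simply mirrors the listing above and may be too permissive for production.

```typescript
// Hypothetical helper: parse a comma-separated allow-list such as ALLOWED_ORIGINS.
export function parseAllowedOrigins(raw: string | undefined): string[] {
  return (raw ?? '')
    .split(',')
    .map((origin) => origin.trim())
    .filter((origin) => origin.length > 0);
}

// Hypothetical helper: decide whether a connection's Origin header is acceptable.
export function isOriginAllowed(
  origin: string | undefined,
  allowed: string[],
): boolean {
  // No allow-list configured: accept every origin (same behavior as the listing).
  if (allowed.length === 0) return true;
  // A missing Origin header is rejected once an allow-list exists.
  return origin !== undefined && allowed.includes(origin);
}
```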

📊 Metrics and Health Monitoring

server/websocket-server.ts
// 📊 Start metrics collection
private startMetricsCollection(): void {
  setInterval(() => {
    this.collectMetrics();
  }, 60000); // Every minute
  
  setInterval(() => {
    this.healthCheck();
  }, 30000); // Every 30 seconds
}

// 📊 Collect WebSocket metrics
private collectMetrics(): void {
  const now = new Date();
  const activeConnections = Array.from(this.connections.values());
  
  // 🔍 Connection age analysis
  const connectionAges = activeConnections.map(conn => 
    now.getTime() - conn.connectedAt.getTime()
  );
  
  const avgConnectionAge = connectionAges.length > 0 
    ? connectionAges.reduce((a, b) => a + b, 0) / connectionAges.length 
    : 0;
  
  // 📈 Message statistics
  const totalMessages = activeConnections.reduce(
    (sum, conn) => sum + conn.messageCount, 0
  );
  
  const metrics = {
    ...this.metrics,
    avgConnectionAge: Math.round(avgConnectionAge / 1000), // seconds
    totalMessages,
    connectionsPerMinute: this.metrics.totalConnections, // reset after collection
    memoryUsage: process.memoryUsage(),
    uptime: process.uptime(),
  };
  
  this.logger.info('📊 WebSocket metrics', metrics);
  
  // 🔄 Reset counters
  this.metrics.totalConnections = 0;
  
  // 📤 Send metrics to monitoring system
  this.sendMetricsToMonitoring(metrics);
}

// 🏥 Health check for connections
private healthCheck(): void {
  const now = new Date();
  const staleConnections: string[] = [];
  
  for (const [connectionId, connection] of this.connections.entries()) {
    // 🕒 Check for stale connections (no pong in 2 minutes)
    if (connection.lastPong && 
        now.getTime() - connection.lastPong.getTime() > 120000) {
      staleConnections.push(connectionId);
    }
    
    // 📊 Check for idle connections (no messages in 10 minutes)
    if (now.getTime() - connection.connectedAt.getTime() > 600000 && 
        connection.messageCount === 0) {
      this.logger.warn('Idle connection detected', {
        connectionId,
        connectedAt: connection.connectedAt,
        messageCount: connection.messageCount,
      });
    }
  }
  
  // 🧹 Clean up stale connections
  staleConnections.forEach(connectionId => {
    const connection = this.connections.get(connectionId);
    if (connection?.ws) {
      // terminate() fires the socket's 'close' event, whose handler already
      // removes the entry and decrements activeConnections. Doing it here
      // as well would count the disconnect twice.
      connection.ws.terminate();
      
      this.logger.warn('Terminated stale connection', { connectionId });
    }
  });
}

// 🛑 Graceful shutdown
async shutdown(): Promise<void> {
  this.logger.info('🛑 Shutting down WebSocket server...');
  
  // 📢 Notify all clients
  const shutdownMessage = JSON.stringify({
    type: 'server.shutdown',
    message: 'Server is shutting down. Please reconnect.',
    reconnectIn: 5000,
  });
  
  this.wss.clients.forEach(ws => {
    if (ws.readyState === ws.OPEN) {
      ws.send(shutdownMessage);
    }
  });
  
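  // ⏳ Give clients a moment to disconnect, then force-close the rest
  await new Promise<void>((resolve) => {
    setTimeout(() => {
      // wss.close() stops accepting new connections but leaves existing
      // sockets open, so terminate them before closing the server
      this.wss.clients.forEach(ws => ws.terminate());
      this.wss.close(() => {
        this.logger.info('✅ WebSocket server shut down');
        resolve();
      });
    }, 2000);
  });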
}

// 📤 Send metrics to monitoring system
private sendMetricsToMonitoring(metrics: any): void {
  // 🎯 Integration with monitoring services
  if (process.env.PROMETHEUS_ENABLED === 'true') {
    // Send to Prometheus
    this.sendToPrometheus(metrics);
  }
  
  if (process.env.DATADOG_ENABLED === 'true') {
    // Send to Datadog
    this.sendToDatadog(metrics);
  }
}

// 🔑 Generate unique connection ID
private generateConnectionId(): string {
  return `ws_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
}

// 📊 Get current server status
getStatus(): {
  isRunning: boolean;
  connections: number;
  metrics: any;
  uptime: number;
} {
  return {
    isRunning: Boolean(this.wss),
    connections: this.metrics.activeConnections,
    metrics: this.metrics,
    uptime: process.uptime(),
  };
}

// 📢 Broadcast to all connections
broadcast(message: any, filter?: (connection: any) => boolean): void {
  const data = JSON.stringify(message);
  
  this.wss.clients.forEach(ws => {
    if (ws.readyState === ws.OPEN) {
      if (!filter || filter({ ws })) {
        ws.send(data);
        this.metrics.messagesSent++;
      }
    }
  });
}

// 🎯 Send to specific connections
sendToConnections(connectionIds: string[], message: any): void {
  const data = JSON.stringify(message);
  
  connectionIds.forEach(connectionId => {
    const connection = this.connections.get(connectionId);
    if (connection?.ws && connection.ws.readyState === connection.ws.OPEN) {
      connection.ws.send(data);
      this.metrics.messagesSent++;
    }
  });
}
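The stale-connection rule used by `healthCheck` can be expressed as a pure function, which makes its behavior easy to verify in isolation. The names `TrackedConnection` and `findStaleConnections` are assumptions for this sketch. As in the listing, a connection that has never recorded a pong is not flagged, so this check only helps if pong frames actually reach the `pong` handler.

```typescript
// Hypothetical shape: only the fields this check needs.
interface TrackedConnection {
  connectedAt: Date;
  lastPong?: Date;
}

// Returns the IDs whose last pong is older than staleAfterMs.
export function findStaleConnections(
  connections: ReadonlyMap<string, TrackedConnection>,
  now: Date,
  staleAfterMs = 120000,
): string[] {
  const stale: string[] = [];
  connections.forEach((conn, id) => {
    // Mirrors the listing: connections without a recorded pong are skipped.
    if (conn.lastPong && now.getTime() - conn.lastPong.getTime() > staleAfterMs) {
      stale.push(id);
    }
  });
  return stale;
}
```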

🚀 Production WebSocket Setup

server/production-ws.ts
// 📁 server/production-ws.ts
// 🚀 Production WebSocket Server Setup

import { TRPCWebSocketServer } from './websocket-server';
import { Logger } from '@/infrastructure/logging/logger';

const logger = new Logger('Production WebSocket');

// 🔧 Production configuration
const wsConfig = {
  port: parseInt(process.env.WS_PORT || '3001'),
  path: '/trpc',
  maxConnections: parseInt(process.env.WS_MAX_CONNECTIONS || '10000'),
  pingInterval: 30000, // 30 seconds
  pongTimeout: 5000,   // 5 seconds
  compression: true,
  enableMetrics: true,
};

// 🌐 Start WebSocket server
async function startWebSocketServer() {
  const wsServer = new TRPCWebSocketServer(wsConfig);
  
  try {
    wsServer.start();
    
    logger.info('🎉 WebSocket server started successfully', {
      port: wsConfig.port,
      maxConnections: wsConfig.maxConnections,
      environment: process.env.NODE_ENV,
    });
    
    // 🔄 Graceful shutdown handlers
    const shutdown = async (signal: string) => {
      logger.info(`📛 Received ${signal}, shutting down gracefully`);
      await wsServer.shutdown();
      process.exit(0);
    };
    
    process.on('SIGTERM', () => shutdown('SIGTERM'));
    process.on('SIGINT', () => shutdown('SIGINT'));
    
    // 📊 Status endpoint for health checks
    setInterval(() => {
      const status = wsServer.getStatus();
      logger.info('🏥 WebSocket server status', status);
    }, 300000); // Every 5 minutes
    
  } catch (error) {
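    logger.error('💥 Failed to start WebSocket server', {
      // catch variables are `unknown` under strict TypeScript settings
      error: error instanceof Error ? error.message : String(error),
      stack: error instanceof Error ? error.stack : undefined,
    });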
    process.exit(1);
  }
}

// 🎬 Execute if main module
if (require.main === module) {
  startWebSocketServer();
}

export { startWebSocketServer, wsConfig };

# 📁 docker/websocket.Dockerfile
# 🐳 Docker setup for WebSocket server

FROM node:18-alpine AS base
RUN apk add --no-cache libc6-compat
WORKDIR /app

# 📦 Dependencies
FROM base AS deps
COPY package.json package-lock.json ./
RUN npm ci --omit=dev

# 🏗️ Builder
FROM base AS builder
COPY package.json package-lock.json ./
RUN npm ci
COPY . .
RUN npm run build:websocket

# 🚀 Runner
FROM base AS runner
WORKDIR /app
ENV NODE_ENV=production

RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 websocket

COPY --from=deps /app/node_modules ./node_modules
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/package.json ./package.json

USER websocket

EXPOSE 3001

# 🏥 Health check (kept on one line: a quoted script spanning several lines
# without continuation backslashes breaks Dockerfile parsing)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD node -e "const net = require('net'); const client = net.createConnection({ port: 3001 }, () => { client.end(); process.exit(0); }); client.on('error', () => process.exit(1));"

CMD ["node", "dist/server/production-ws.js"]
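The production configuration above reads `WS_PORT` and `WS_MAX_CONNECTIONS` with `parseInt`, which yields `NaN` for malformed values and lets the server start with a broken setting. A stricter reader could fail fast instead. `readIntEnv` is a hypothetical helper written for this sketch; it takes the environment as a parameter so it can be exercised without touching `process.env`.

```typescript
// Hypothetical helper: read a positive integer setting or fail loudly.
export function readIntEnv(
  env: Record<string, string | undefined>,
  name: string,
  fallback: number,
): number {
  const raw = env[name];
  // Unset or blank: use the documented default.
  if (raw === undefined || raw.trim() === '') return fallback;

  const value = Number(raw.trim());
  // Reject NaN, fractions, zero and negative numbers.
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`Invalid value for ${name}: "${raw}"`);
  }
  return value;
}
```

In `production-ws.ts` this could be called as `readIntEnv(process.env, 'WS_PORT', 3001)`, so a typo in the deployment configuration stops the process at startup instead of surfacing later as a bind error.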

📡 tRPC Subscriptions

🔧 Server-side Subscription Router

server/routers/realtime-router.ts
// 📁 server/routers/realtime-router.ts
import { z } from 'zod';
import { router, protectedProcedure } from '../trpc';
import { observable } from '@trpc/server/observable';
import { EventEmitter } from 'events';

// 🎯 Event types
type RealtimeEvents = {
  userOnline: { userId: string; organizationId: string; status: 'online' | 'away' | 'offline' };
  messageReceived: { messageId: string; chatId: string; userId: string; content: string };
  notificationSent: { userId: string; type: string; title: string; data: any };
  documentUpdated: { documentId: string; userId: string; changes: any };
};

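// 📡 Event emitter for real-time events
export const realtimeEmitter = new EventEmitter();
// Each active subscription adds a listener; lift Node's default limit of 10
// per event to avoid MaxListenersExceededWarning under normal load.
realtimeEmitter.setMaxListeners(0);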

// 🚀 Realtime Router
export const realtimeRouter = router({
  // 👥 User presence subscription
  userPresence: protectedProcedure
    .input(z.object({
      organizationId: z.string().uuid(),
    }))
    .subscription(async ({ input, ctx }) => {
      return observable<RealtimeEvents['userOnline']>((emit) => {
        // 🔍 Listen for user presence changes
        const onUserPresence = (data: RealtimeEvents['userOnline']) => {
          if (data.organizationId === input.organizationId) {
            emit.next(data);
          }
        };

        realtimeEmitter.on('userOnline', onUserPresence);

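        // 📤 Announce this user as online to every subscriber
        // (emit.next would only reach the current client; the listener
        // registered above delivers the event back to this client too)
        realtimeEmitter.emit('userOnline', {
          userId: ctx.session.user.id,
          organizationId: input.organizationId,
          status: 'online',
        });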

        // 🧹 Cleanup function
        return () => {
          realtimeEmitter.off('userOnline', onUserPresence);
          
          // 📤 Send offline status when disconnecting
          realtimeEmitter.emit('userOnline', {
            userId: ctx.session.user.id,
            organizationId: input.organizationId,
            status: 'offline',
          });
        };
      });
    }),

  // 💬 Chat messages subscription
  chatMessages: protectedProcedure
    .input(z.object({
      chatId: z.string().uuid(),
    }))
    .subscription(async ({ input, ctx }) => {
      return observable<RealtimeEvents['messageReceived']>((emit) => {
        const onMessage = (data: RealtimeEvents['messageReceived']) => {
          if (data.chatId === input.chatId) {
            emit.next(data);
          }
        };

        realtimeEmitter.on('messageReceived', onMessage);

        return () => {
          realtimeEmitter.off('messageReceived', onMessage);
        };
      });
    }),

  // 🔔 Notifications subscription
  notifications: protectedProcedure
    .subscription(async ({ ctx }) => {
      return observable<RealtimeEvents['notificationSent']>((emit) => {
        const onNotification = (data: RealtimeEvents['notificationSent']) => {
          if (data.userId === ctx.session.user.id) {
            emit.next(data);
          }
        };

        realtimeEmitter.on('notificationSent', onNotification);

        return () => {
          realtimeEmitter.off('notificationSent', onNotification);
        };
      });
    }),

  // 📝 Document collaboration subscription
  documentChanges: protectedProcedure
    .input(z.object({
      documentId: z.string().uuid(),
    }))
    .subscription(async ({ input, ctx }) => {
      return observable<RealtimeEvents['documentUpdated']>((emit) => {
        const onDocumentChange = (data: RealtimeEvents['documentUpdated']) => {
          // 🚫 Don't emit changes from the same user
          if (data.documentId === input.documentId && data.userId !== ctx.session.user.id) {
            emit.next(data);
          }
        };

        realtimeEmitter.on('documentUpdated', onDocumentChange);

        return () => {
          realtimeEmitter.off('documentUpdated', onDocumentChange);
        };
      });
    }),
});
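The router declares a `RealtimeEvents` type, but a plain `EventEmitter` does not enforce it: any event name and payload can be emitted. One way to close that gap is a small typed wrapper. The `TypedEmitter` class below is a self-contained sketch written for this example (it does not extend Node's `EventEmitter`), and the event map is trimmed to two entries for brevity.

```typescript
// Trimmed copy of the lesson's event map, for illustration only.
type RealtimeEvents = {
  userOnline: { userId: string; organizationId: string; status: 'online' | 'away' | 'offline' };
  messageReceived: { messageId: string; chatId: string; userId: string; content: string };
};

type AnyListener = (payload: any) => void;

// Hypothetical typed pub/sub: event names and payloads are checked at compile time.
class TypedEmitter<Events extends Record<string, unknown>> {
  private listeners = new Map<keyof Events, Set<AnyListener>>();

  // Registers a listener and returns a function that removes it.
  on<K extends keyof Events>(
    event: K,
    listener: (payload: Events[K]) => void,
  ): () => void {
    const set = this.listeners.get(event) ?? new Set<AnyListener>();
    set.add(listener);
    this.listeners.set(event, set);
    return () => {
      set.delete(listener);
    };
  }

  // Delivers the payload to every listener registered for the event.
  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    this.listeners.get(event)?.forEach((listener) => listener(payload));
  }
}

const emitter = new TypedEmitter<RealtimeEvents>();
const seen: string[] = [];

const off = emitter.on('messageReceived', (msg) => {
  seen.push(msg.content); // msg is typed as RealtimeEvents['messageReceived']
});
emitter.emit('messageReceived', { messageId: 'm1', chatId: 'c1', userId: 'u1', content: 'hi' });
off();
emitter.emit('messageReceived', { messageId: 'm2', chatId: 'c1', userId: 'u1', content: 'ignored' });
```

With this wrapper, a call such as `emitter.emit('messageRecieved', ...)` or a payload missing `chatId` fails to compile instead of silently never reaching subscribers.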

📱 Client-side Subscription Hooks

client/hooks/use-realtime.ts
// 📁 client/hooks/use-realtime.ts
import { useEffect, useRef, useState } from 'react';
import { trpc } from '@/lib/trpc';

// 👥 User presence hook
export function useUserPresence(organizationId: string) {
  const [onlineUsers, setOnlineUsers] = useState<Set<string>>(new Set());
  
  trpc.realtime.userPresence.useSubscription(
    { organizationId },
    {
      onData: (data) => {
        setOnlineUsers(prev => {
          const newSet = new Set(prev);
          if (data.status === 'online') {
            newSet.add(data.userId);
          } else {
            newSet.delete(data.userId);
          }
          return newSet;
        });
      },
      onError: (error) => {
        console.error('User presence subscription error:', error);
      },
    }
  );

  return { onlineUsers: Array.from(onlineUsers) };
}

// 💬 Chat messages hook
export function useChatMessages(chatId: string) {
  const [messages, setMessages] = useState<any[]>([]);
  const messagesRef = useRef<any[]>([]);

  trpc.realtime.chatMessages.useSubscription(
    { chatId },
    {
      onData: (newMessage) => {
        setMessages(prev => {
          const updated = [...prev, newMessage];
          messagesRef.current = updated;
          return updated;
        });
      },
      onError: (error) => {
        console.error('Chat subscription error:', error);
      },
    }
  );

  // 🧹 Clear messages when chat changes
  useEffect(() => {
    setMessages([]);
    messagesRef.current = [];
  }, [chatId]);

  return { 
    messages,
    addMessage: (message: any) => {
      setMessages(prev => [...prev, message]);
    }
  };
}

// 🔔 Notifications hook
export function useNotifications() {
  const [notifications, setNotifications] = useState<any[]>([]);
  const [unreadCount, setUnreadCount] = useState(0);

  trpc.realtime.notifications.useSubscription(undefined, {
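    onData: (notification) => {
      // The server payload carries no id, read flag or timestamp,
      // so attach client-side values for the UI to use
      const item = {
        ...notification,
        id: `n_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`,
        read: false,
        timestamp: Date.now(),
      };
      setNotifications(prev => [item, ...prev]);
      setUnreadCount(prev => prev + 1);
      
      // 🔔 Show browser notification (guarded for SSR and unsupported browsers)
      if (typeof Notification !== 'undefined' && Notification.permission === 'granted') {
        new Notification(notification.title, {
          body: notification.data?.body,
          icon: '/favicon.ico',
        });
      }
    },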
    onError: (error) => {
      console.error('Notifications subscription error:', error);
    },
  });

  const markAsRead = (notificationId: string) => {
    setNotifications(prev => 
      prev.map(n => n.id === notificationId ? { ...n, read: true } : n)
    );
    setUnreadCount(prev => Math.max(0, prev - 1));
  };

  const clearAll = () => {
    setNotifications([]);
    setUnreadCount(0);
  };

  return {
    notifications,
    unreadCount,
    markAsRead,
    clearAll,
  };
}
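The presence hook above keeps only a set of user IDs and drops a user on any status other than `online`, so `away` users disappear from the list. If the UI should distinguish the two, the state update can be written as a pure reducer over a map of statuses. `applyPresence` and its types are assumptions made for this sketch.

```typescript
type PresenceStatus = 'online' | 'away' | 'offline';

interface PresenceEvent {
  userId: string;
  status: PresenceStatus;
}

// Hypothetical reducer: returns a new map and never mutates the input,
// which fits React state updates of the form setState(prev => ...).
export function applyPresence(
  current: ReadonlyMap<string, PresenceStatus>,
  event: PresenceEvent,
): Map<string, PresenceStatus> {
  const next = new Map(current);
  if (event.status === 'offline') {
    next.delete(event.userId); // offline users are removed entirely
  } else {
    next.set(event.userId, event.status); // online or away users are kept
  }
  return next;
}
```

Inside the hook this could be used as `setPresence(prev => applyPresence(prev, data))`, assuming the state is changed from a `Set` to a `Map`.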

⚛️ React Components with Real-time

components/realtime/user-presence.tsx
// 📁 components/realtime/user-presence.tsx
import { useUserPresence } from '@/hooks/use-realtime';
import { Avatar } from '@/components/ui/avatar';

interface UserPresenceProps {
  organizationId: string;
  currentUserId: string;
}

export function UserPresence({ organizationId, currentUserId }: UserPresenceProps) {
  const { onlineUsers } = useUserPresence(organizationId);
  
  return (
    <div className="bg-gray-900/50 rounded-lg p-4 border border-gray-800">
      <h3 className="text-white font-semibold mb-3">
        👥 Online ({onlineUsers.length})
      </h3>
      
      <div className="space-y-2">
        {onlineUsers.map(userId => (
          <div 
            key={userId}
            className="flex items-center gap-2"
          >
            <div className="relative">
              <Avatar userId={userId} size="sm" />
              <div className="absolute -bottom-1 -right-1 w-3 h-3 bg-green-500 rounded-full border-2 border-gray-900"></div>
            </div>
            <span className="text-gray-300 text-sm">
              {userId === currentUserId ? 'You' : `User ${userId.slice(0, 8)}`}
            </span>
          </div>
        ))}
      </div>
    </div>
  );
}

// 📁 components/realtime/live-notifications.tsx
import { useState } from 'react';
import { useNotifications } from '@/hooks/use-realtime';
import { Bell, X } from 'lucide-react';

export function LiveNotifications() {
  const { notifications, unreadCount, markAsRead, clearAll } = useNotifications();
  const [isOpen, setIsOpen] = useState(false);
  
  return (
    <div className="relative">
      {/* 🔔 Notification Bell */}
      <button
        onClick={() => setIsOpen(!isOpen)}
        className="relative p-2 rounded-lg bg-gray-800 hover:bg-gray-700 transition-colors"
      >
        <Bell className="w-5 h-5 text-gray-300" />
        {unreadCount > 0 && (
          <span className="absolute -top-1 -right-1 bg-red-500 text-white text-xs rounded-full w-5 h-5 flex items-center justify-center">
            {unreadCount > 9 ? '9+' : unreadCount}
          </span>
        )}
      </button>
      
      {/* 📋 Notifications Panel */}
      {isOpen && (
        <div className="absolute top-full right-0 mt-2 w-80 bg-gray-900 rounded-lg border border-gray-700 shadow-xl z-50">
          <div className="flex items-center justify-between p-4 border-b border-gray-700">
            <h3 className="text-white font-semibold">Notifications</h3>
            <div className="flex items-center gap-2">
              {notifications.length > 0 && (
                <button
                  onClick={clearAll}
                  className="text-gray-400 hover:text-white text-xs"
                >
                  Clear all
                </button>
              )}
              <button
                onClick={() => setIsOpen(false)}
                className="text-gray-400 hover:text-white"
              >
                <X className="w-4 h-4" />
              </button>
            </div>
          </div>
          
          <div className="max-h-96 overflow-y-auto">
            {notifications.length === 0 ? (
              <div className="p-4 text-center text-gray-500">
                No notifications
              </div>
            ) : (
              notifications.map((notification, index) => (
                <div
                  key={index}
                  className={`p-4 border-b border-gray-700 last:border-b-0 ${
                    !notification.read ? 'bg-blue-900/20' : ''
                  }`}
                  onClick={() => markAsRead(notification.id)}
                >
                  <div className="flex items-start gap-3">
                    <div className="flex-1">
                      <h4 className="text-white font-medium text-sm">
                        {notification.title}
                      </h4>
                      {notification.data?.body && (
                        <p className="text-gray-400 text-sm mt-1">
                          {notification.data.body}
                        </p>
                      )}
                      <p className="text-gray-500 text-xs mt-2">
                        {new Date(notification.timestamp).toLocaleTimeString()}
                      </p>
                    </div>
                    {!notification.read && (
                      <div className="w-2 h-2 bg-blue-500 rounded-full mt-1"></div>
                    )}
                  </div>
                </div>
              ))
            )}
          </div>
        </div>
      )}
    </div>
  );
}
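The notification panel relies on each item having an `id` and a `read` flag, and on the unread badge staying in sync with the list. One way to keep them consistent is to derive the count from the list instead of tracking it in separate state. The `ClientNotification` shape and both helpers below are assumptions for this sketch.

```typescript
// Hypothetical client-side shape with the fields the panel reads.
interface ClientNotification {
  id: string;
  title: string;
  read: boolean;
}

// Marks one notification as read without mutating the original list.
export function markRead(
  list: readonly ClientNotification[],
  id: string,
): ClientNotification[] {
  return list.map((n) => (n.id === id ? { ...n, read: true } : n));
}

// Derives the badge value from the list, so it cannot drift out of sync.
export function countUnread(list: readonly ClientNotification[]): number {
  return list.filter((n) => !n.read).length;
}
```

Because the count is derived, clicking an already-read notification twice cannot push the badge below the real number of unread items.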

🎯 Event-Driven Architecture

🚌 Event Bus Implementation

events/event-bus.ts
// 📁 events/event-bus.ts
import { EventEmitter } from 'events';
import { Logger } from '@/infrastructure/logging/logger';

// 🎯 Event types
// Generic over the payload so EventHandler<T> can reference DomainEvent<T>
export interface DomainEvent<T = any> {
  id: string;
  type: string;
  aggregateId: string;
  aggregateType: string;
  data: T;
  metadata: {
    userId?: string;
    correlationId: string;
    timestamp: string;
    source: string;
    version: number;
  };
}

export interface EventHandler<T = any> {
  eventType: string;
  handler: (event: DomainEvent<T>) => Promise<void>;
  options?: {
    retry?: number;
    timeout?: number;
    priority?: 'low' | 'normal' | 'high';
  };
}

// 🚌 Event Bus
export class EventBus extends EventEmitter {
  private logger: Logger;
  private handlers: Map<string, EventHandler[]> = new Map();
  private metrics = {
    eventsPublished: 0,
    eventsProcessed: 0,
    eventsFailed: 0,
    handlersRegistered: 0,
  };

  constructor() {
    super();
    this.logger = new Logger('EventBus');
    this.setMaxListeners(1000); // Support many handlers
  }

  // 📝 Register event handler
  registerHandler<T>(handler: EventHandler<T>): void {
    const existingHandlers = this.handlers.get(handler.eventType) || [];
    existingHandlers.push(handler);
    this.handlers.set(handler.eventType, existingHandlers);

    this.metrics.handlersRegistered++;

    this.logger.info('📝 Event handler registered', {
      eventType: handler.eventType,
      totalHandlers: existingHandlers.length,
    });
  }

  // 📤 Publish event
  async publish(event: DomainEvent): Promise<void> {
    this.metrics.eventsPublished++;

    this.logger.debug('📤 Publishing event', {
      eventId: event.id,
      eventType: event.type,
      aggregateId: event.aggregateId,
    });

    // 📡 Emit to subscribers
    this.emit(event.type, event);
    this.emit('*', event); // Global listener

    // 🔄 Process handlers
    await this.processHandlers(event);

    // 📡 Emit to real-time subscribers
    await this.emitToRealtime(event);
  }

  // 🔄 Process event handlers
  private async processHandlers(event: DomainEvent): Promise<void> {
    const handlers = this.handlers.get(event.type) || [];
    
    if (handlers.length === 0) {
      this.logger.warn('⚠️ No handlers found for event', {
        eventType: event.type,
        eventId: event.id,
      });
      return;
    }

    // 🎯 Group handlers by priority
    const priorityGroups = {
      high: handlers.filter(h => h.options?.priority === 'high'),
      normal: handlers.filter(h => !h.options?.priority || h.options.priority === 'normal'),
      low: handlers.filter(h => h.options?.priority === 'low'),
    };

    // ⚡ Process high priority first (sequential)
    for (const handler of priorityGroups.high) {
      await this.executeHandler(handler, event);
    }

    // 🔄 Process normal priority (parallel)
    await Promise.allSettled(
      priorityGroups.normal.map(handler => this.executeHandler(handler, event))
    );

    // 🐌 Process low priority (parallel, background)
    setImmediate(() => {
      Promise.allSettled(
        priorityGroups.low.map(handler => this.executeHandler(handler, event))
      );
    });
  }

  // ⚡ Execute single handler
  private async executeHandler(handler: EventHandler, event: DomainEvent): Promise<void> {
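    const startTime = Date.now();
    const timeout = handler.options?.timeout || 30000; // 30s default
    // `retry` is the total number of attempts; always run the handler at least once
    const maxRetries = Math.max(1, handler.options?.retry ?? 3);

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      let timer: ReturnType<typeof setTimeout> | undefined;
      try {
        // ⏱️ Execute with timeout, clearing the timer once the race settles
        await Promise.race([
          handler.handler(event),
          new Promise<never>((_, reject) => {
            timer = setTimeout(() => reject(new Error('Handler timeout')), timeout);
          }),
        ]).finally(() => clearTimeout(timer));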

        this.metrics.eventsProcessed++;
        
        this.logger.debug('✅ Event handler executed', {
          eventType: event.type,
          eventId: event.id,
          duration: Date.now() - startTime,
          attempt,
        });

        return; // Success, exit retry loop
      } catch (error) {
        this.metrics.eventsFailed++;

        this.logger.error('❌ Event handler failed', {
          eventType: event.type,
          eventId: event.id,
          error: error instanceof Error ? error.message : String(error),
          attempt,
          maxRetries,
        });

        // 🔄 Retry logic
        if (attempt < maxRetries) {
          const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000); // Exponential backoff
          await new Promise(resolve => setTimeout(resolve, delay));
        } else {
          // 📨 Send to dead letter queue
          await this.sendToDeadLetterQueue(
            event,
            handler,
            error instanceof Error ? error : new Error(String(error)),
          );
        }
      }
    }
  }

  // 📡 Emit to real-time subscribers
  private async emitToRealtime(event: DomainEvent): Promise<void> {
    try {
      // 🎯 Map domain events to real-time events
      const realtimeEvent = this.mapToRealtimeEvent(event);
      if (realtimeEvent) {
        const { realtimeEmitter } = await import('@/server/routers/realtime-router');
        realtimeEmitter.emit(realtimeEvent.type, realtimeEvent.data);
      }
    } catch (error) {
      this.logger.error('Failed to emit to real-time', {
        eventId: event.id,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  // 🗂️ Map domain event to real-time event
  private mapToRealtimeEvent(event: DomainEvent): { type: string; data: any } | null {
    switch (event.type) {
      case 'UserCreated':
      case 'UserUpdated':
      case 'UserDeactivated':
        return {
          type: 'userOnline',
          data: {
            userId: event.aggregateId,
            organizationId: event.data.organizationId,
            status: event.type === 'UserDeactivated' ? 'offline' : 'online',
          },
        };

      case 'MessageSent':
        return {
          type: 'messageReceived',
          data: {
            // Use the persisted message id, not the id of the domain event
            messageId: event.data.messageId ?? event.aggregateId,
            chatId: event.data.chatId,
            userId: event.metadata.userId,
            content: event.data.content,
          },
        };

      case 'NotificationCreated':
        return {
          type: 'notificationSent',
          data: {
            userId: event.data.targetUserId,
            type: event.data.type,
            title: event.data.title,
            data: event.data.payload,
          },
        };

      case 'DocumentUpdated':
        return {
          type: 'documentUpdated',
          data: {
            documentId: event.aggregateId,
            userId: event.metadata.userId,
            changes: event.data.changes,
          },
        };

      default:
        return null;
    }
  }

  // 💀 Send to dead letter queue
  private async sendToDeadLetterQueue(
    event: DomainEvent,
    handler: EventHandler,
    error: Error
  ): Promise<void> {
    const deadLetterEvent = {
      originalEvent: event,
      handler: handler.eventType,
      error: error.message,
      failedAt: new Date().toISOString(),
      attempts: handler.options?.retry || 3,
    };

    // 📨 Store in database or external queue
    this.logger.error('💀 Event sent to dead letter queue', deadLetterEvent);
    
    // TODO: Implement actual dead letter queue storage
    // await this.deadLetterQueue.add(deadLetterEvent);
  }

  // 📊 Get metrics
  getMetrics() {
    return {
      ...this.metrics,
      handlersCount: Array.from(this.handlers.values()).reduce(
        (sum, handlers) => sum + handlers.length, 0
      ),
      eventTypesRegistered: this.handlers.size,
    };
  }

  // 🧹 Cleanup
  async shutdown(): Promise<void> {
    this.logger.info('🧹 Shutting down event bus');
    this.removeAllListeners();
    this.handlers.clear();
  }
}
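
The retry loop above waits `min(1000 * 2^(attempt - 1), 10000)` ms between attempts, and `sendToDeadLetterQueue` still leaves its storage as a TODO. The sketch below works through that delay schedule and shows one way the queue could be held in memory. `backoffDelay`, `DeadLetterEntry`, and `InMemoryDeadLetterQueue` are hypothetical names that are not part of the course code; a production system would more likely persist entries in a database or external queue, as the TODO suggests.

```typescript
// Delay before the next retry, mirroring the formula used by the event bus:
// 1s, 2s, 4s, 8s, then capped at 10s.
function backoffDelay(attempt: number, baseMs = 1000, capMs = 10_000): number {
  return Math.min(baseMs * 2 ** (attempt - 1), capMs);
}

// Hypothetical shape, modeled on the deadLetterEvent object built above.
interface DeadLetterEntry {
  originalEvent: { id: string; type: string };
  handler: string;
  error: string;
  failedAt: string;
  attempts: number;
}

// Minimal in-memory stand-in for the missing dead letter queue storage.
class InMemoryDeadLetterQueue {
  private entries: DeadLetterEntry[] = [];

  async add(entry: DeadLetterEntry): Promise<void> {
    this.entries.push(entry);
  }

  // Entries for one event type, e.g. to replay them once a fix is deployed.
  byEventType(type: string): DeadLetterEntry[] {
    return this.entries.filter(e => e.originalEvent.type === type);
  }

  get size(): number {
    return this.entries.length;
  }
}

// Delay schedule for the first five attempts
const retrySchedule = [1, 2, 3, 4, 5].map(attempt => backoffDelay(attempt));
// retrySchedule is [1000, 2000, 4000, 8000, 10000]
```

With the default of three retries, a handler that keeps failing is therefore delayed by 1s and then 2s before the event reaches the dead letter queue.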

🎭 Event Handlers Implementation

events/handlers/user-handlers.ts
// 📁 events/handlers/user-handlers.ts
import { EventHandler, DomainEvent } from '../event-bus';
import { EmailService } from '@/services/email-service';
import { AnalyticsService } from '@/services/analytics-service';

// 👤 User event handlers
export const userEventHandlers: EventHandler[] = [
  // 📧 Send welcome email when user is created
  {
    eventType: 'UserCreated',
    options: { priority: 'high', timeout: 10000 },
    handler: async (event: DomainEvent) => {
      const emailService = new EmailService();
      
      await emailService.send({
        to: event.data.email,
        template: 'user-welcome',
        data: {
          name: event.data.name,
          loginUrl: process.env.APP_URL + '/login',
        },
        metadata: {
          userId: event.aggregateId,
          correlationId: event.metadata.correlationId,
        },
      });
    },
  },

  // 📊 Track user analytics
  {
    eventType: 'UserCreated',
    options: { priority: 'normal' },
    handler: async (event: DomainEvent) => {
      const analyticsService = new AnalyticsService();
      
      await analyticsService.track({
        event: 'user_created',
        userId: event.aggregateId,
        properties: {
          email: event.data.email,
          organizationId: event.data.organizationId,
          source: event.metadata.source,
          timestamp: event.metadata.timestamp,
        },
      });
    },
  },

  // 🔔 Create welcome notification
  {
    eventType: 'UserCreated',
    options: { priority: 'low' },
    handler: async (event: DomainEvent) => {
      // Publish notification event. The dynamic import avoids a circular
      // dependency, since events/setup.ts imports this handlers file.
      const { eventBus } = await import('../setup');
      
      await eventBus.publish({
        id: `notif_${Date.now()}`,
        type: 'NotificationCreated',
        aggregateId: `notification_${event.aggregateId}`,
        aggregateType: 'Notification',
        data: {
          targetUserId: event.aggregateId,
          type: 'welcome',
          title: 'Welcome to the platform!',
          payload: {
            body: 'Get started by completing your profile.',
            actionUrl: '/profile/setup',
          },
        },
        metadata: {
          ...event.metadata,
          source: 'user-handler',
        },
      });
    },
  },
];

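// 📁 events/handlers/notification-handlers.ts
// 🔔 Notification event handlers
import { EventHandler, DomainEvent } from '../event-bus';
import { EmailService } from '@/services/email-service';
import { PushNotificationService } from '@/services/push-notification-service';

export const notificationEventHandlers: EventHandler[] = [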
  // 📱 Send push notification
  {
    eventType: 'NotificationCreated',
    options: { priority: 'high', timeout: 5000 },
    handler: async (event: DomainEvent) => {
      const pushService = new PushNotificationService();
      
      await pushService.send({
        userId: event.data.targetUserId,
        title: event.data.title,
        body: event.data.payload.body,
        data: event.data.payload,
      });
    },
  },

  // 📧 Send email notification if critical
  {
    eventType: 'NotificationCreated',
    options: { priority: 'normal' },
    handler: async (event: DomainEvent) => {
      if (event.data.type === 'critical' || event.data.type === 'security') {
        const emailService = new EmailService();
        
        await emailService.send({
          to: event.data.targetUserId, // Will be resolved to email
          template: 'notification-email',
          data: {
            title: event.data.title,
            body: event.data.payload.body,
            actionUrl: event.data.payload.actionUrl,
          },
        });
      }
    },
  },
];

// 📁 events/setup.ts
// 🔧 Event system setup

import { EventBus, DomainEvent } from './event-bus';
import { userEventHandlers } from './handlers/user-handlers';
import { notificationEventHandlers } from './handlers/notification-handlers';

export const eventBus = new EventBus();

// 📝 Register all handlers
export function setupEventHandlers() {
  // 👤 User handlers
  userEventHandlers.forEach(handler => {
    eventBus.registerHandler(handler);
  });

  // 🔔 Notification handlers  
  notificationEventHandlers.forEach(handler => {
    eventBus.registerHandler(handler);
  });

  console.log('✅ Event handlers registered');
}

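// 📊 Global event listener for debugging
// Node's EventEmitter has no wildcard matching: this fires only if
// EventBus.publish() also emits a literal '*' event.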
if (process.env.NODE_ENV === 'development') {
  eventBus.on('*', (event: DomainEvent) => {
    console.log('🎯 Event published:', {
      type: event.type,
      id: event.id,
      aggregateId: event.aggregateId,
    });
  });
}
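
The debug listener above subscribes to `'*'`, which Node's `EventEmitter` treats as an ordinary event name. Since `EventBus.publish` is defined elsewhere, the following is only a sketch of how a bus could fan each event out to both its own type and a `'*'` channel. `WildcardBus` and `SketchEvent` are hypothetical names used for illustration.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical, trimmed-down event shape for this sketch.
interface SketchEvent {
  id: string;
  type: string;
  aggregateId: string;
}

// Emits every event twice: once under its own type, once under '*'.
class WildcardBus extends EventEmitter {
  publish(event: SketchEvent): void {
    this.emit(event.type, event);
    this.emit('*', event);
  }
}

const sketchBus = new WildcardBus();
const seenByWildcard: string[] = [];
const seenByType: string[] = [];

sketchBus.on('*', (e: SketchEvent) => {
  seenByWildcard.push(e.type);
});
sketchBus.on('UserCreated', (e: SketchEvent) => {
  seenByType.push(e.id);
});

sketchBus.publish({ id: 'evt_1', type: 'UserCreated', aggregateId: 'user_1' });
sketchBus.publish({ id: 'evt_2', type: 'MessageSent', aggregateId: 'msg_1' });

// seenByWildcard is ['UserCreated', 'MessageSent']; seenByType is ['evt_1']
```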

🔗 Integration with tRPC

server/routers/user-events-router.ts
// 📁 server/routers/user-events-router.ts
import { z } from 'zod';
import { router, protectedProcedure } from '../trpc';
import { eventBus } from '@/events/setup';

export const userEventsRouter = router({
  // 👤 Create user with events
  create: protectedProcedure
    .input(z.object({
      email: z.string().email(),
      name: z.string().min(2),
      organizationId: z.string().uuid().optional(),
    }))
    .mutation(async ({ input, ctx }) => {
      // 🏗️ Create user in database
      const user = await ctx.prisma.user.create({
        data: {
          email: input.email,
          name: input.name,
          organizationId: input.organizationId,
        },
      });

      // 📤 Publish domain event
      await eventBus.publish({
        id: `user_created_${Date.now()}`,
        type: 'UserCreated',
        aggregateId: user.id,
        aggregateType: 'User',
        data: {
          email: user.email,
          name: user.name,
          organizationId: user.organizationId,
        },
        metadata: {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          timestamp: new Date().toISOString(),
          source: 'user-service',
          version: 1,
        },
      });

      return user;
    }),

  // 📝 Update user profile
  updateProfile: protectedProcedure
    .input(z.object({
      userId: z.string().uuid(),
      name: z.string().min(2).optional(),
      email: z.string().email().optional(),
    }))
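    .mutation(async ({ input, ctx }) => {
      // Assumes the caller is allowed to edit input.userId; add an ownership
      // or role check here before exposing this in production.

      // 🔍 Load the current values so the event can record what changed
      const previous = await ctx.prisma.user.findUnique({
        where: { id: input.userId },
      });

      const user = await ctx.prisma.user.update({
        where: { id: input.userId },
        data: {
          ...(input.name && { name: input.name }),
          ...(input.email && { email: input.email }),
        },
      });

      // 📤 Publish update event
      await eventBus.publish({
        id: `user_updated_${Date.now()}`,
        type: 'UserUpdated',
        aggregateId: user.id,
        aggregateType: 'User',
        data: {
          // Included because the real-time mapping reads data.organizationId
          organizationId: user.organizationId,
          changes: {
            ...(input.name && { name: { from: previous?.name ?? null, to: input.name } }),
            ...(input.email && { email: { from: previous?.email ?? null, to: input.email } }),
          },
        },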
        metadata: {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          timestamp: new Date().toISOString(),
          source: 'user-service',
          version: 1,
        },
      });

      return user;
    }),

  // 💬 Send message
  sendMessage: protectedProcedure
    .input(z.object({
      chatId: z.string().uuid(),
      content: z.string().min(1),
    }))
    .mutation(async ({ input, ctx }) => {
      const message = await ctx.prisma.message.create({
        data: {
          chatId: input.chatId,
          userId: ctx.session.user.id,
          content: input.content,
        },
      });

      // 📤 Publish message event
      await eventBus.publish({
        id: `message_sent_${Date.now()}`,
        type: 'MessageSent',
        aggregateId: message.id,
        aggregateType: 'Message',
        data: {
          chatId: input.chatId,
          content: input.content,
          messageId: message.id,
        },
        metadata: {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          timestamp: new Date().toISOString(),
          source: 'chat-service',
          version: 1,
        },
      });

      return message;
    }),
});
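
Each mutation above builds its event by hand and derives the id from `Date.now()`, which can collide when two events of the same type are published within the same millisecond. One possible way to centralize this is a small factory. `createDomainEvent` and `SketchDomainEvent` are hypothetical names, and the field layout is only inferred from the events published above.

```typescript
import { randomUUID } from 'node:crypto';

// Assumed event shape, inferred from the publish() calls in the router.
interface SketchDomainEvent {
  id: string;
  type: string;
  aggregateId: string;
  aggregateType: string;
  data: Record<string, unknown>;
  metadata: {
    userId?: string;
    correlationId: string;
    timestamp: string;
    source: string;
    version: number;
  };
}

type EventInput = Pick<SketchDomainEvent, 'type' | 'aggregateId' | 'aggregateType' | 'data'> & {
  source: string;
  userId?: string;
  correlationId?: string;
};

// Fills in id, timestamp and version so callers only pass what varies.
function createDomainEvent(input: EventInput): SketchDomainEvent {
  const id = randomUUID(); // collision-resistant, unlike Date.now()
  return {
    id,
    type: input.type,
    aggregateId: input.aggregateId,
    aggregateType: input.aggregateType,
    data: input.data,
    metadata: {
      userId: input.userId,
      correlationId: input.correlationId ?? id, // fall back to the event id
      timestamp: new Date().toISOString(),
      source: input.source,
      version: 1,
    },
  };
}

const messageSent = createDomainEvent({
  type: 'MessageSent',
  aggregateId: 'message_1',
  aggregateType: 'Message',
  data: { chatId: 'chat_1', content: 'hello' },
  source: 'chat-service',
  userId: 'user_1',
});
```

Inside a mutation, the result would be passed straight to `eventBus.publish(...)` in place of the hand-written object literal.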

🔔 Live Notifications System

📱 Notification Service

services/notification-service.ts
// 📁 services/notification-service.ts
import { PrismaClient } from '@prisma/client';
import { eventBus } from '@/events/setup';

export class NotificationService {
  private prisma: PrismaClient;

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  // 🔔 Create notification
  async create(params: {
    userId: string;
    type: 'info' | 'warning' | 'error' | 'success';
    title: string;
    message: string;
    actionUrl?: string;
    data?: any;
  }) {
    const notification = await this.prisma.notification.create({
      data: {
        userId: params.userId,
        type: params.type,
        title: params.title,
        message: params.message,
        actionUrl: params.actionUrl,
        data: params.data || {},
        isRead: false,
      },
    });

    // 📤 Publish event for real-time
    await eventBus.publish({
      id: `notif_${Date.now()}`,
      type: 'NotificationCreated',
      aggregateId: notification.id,
      aggregateType: 'Notification',
      data: {
        targetUserId: params.userId,
        type: params.type,
        title: params.title,
        payload: {
          body: params.message,
          actionUrl: params.actionUrl,
          data: params.data,
        },
      },
      metadata: {
        correlationId: `notif_${Date.now()}`,
        timestamp: new Date().toISOString(),
        source: 'notification-service',
        version: 1,
      },
    });

    return notification;
  }

  // 📋 Get user notifications
  async getForUser(userId: string, options: {
    page?: number;
    limit?: number;
    unreadOnly?: boolean;
  } = {}) {
    const { page = 1, limit = 20, unreadOnly = false } = options;
    
    const where = {
      userId,
      ...(unreadOnly && { isRead: false }),
    };

    const [notifications, total] = await Promise.all([
      this.prisma.notification.findMany({
        where,
        orderBy: { createdAt: 'desc' },
        skip: (page - 1) * limit,
        take: limit,
      }),
      this.prisma.notification.count({ where }),
    ]);

    return { notifications, total, page, totalPages: Math.ceil(total / limit) };
  }
}
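
`getForUser` uses offset pagination: `skip` is `(page - 1) * limit` and `totalPages` is `Math.ceil(total / limit)`. As a worked example of that arithmetic, the helper below (a hypothetical `pageWindow` function, not part of the service) computes both values without touching the database.

```typescript
// Pure version of the pagination math used by getForUser.
function pageWindow(total: number, page = 1, limit = 20) {
  return {
    skip: (page - 1) * limit,             // rows to skip before this page
    take: limit,                          // rows to fetch for this page
    totalPages: Math.ceil(total / limit), // the last page may be partially filled
  };
}

// 45 notifications at 20 per page: page 3 starts after 40 rows and is the last page
const thirdPage = pageWindow(45, 3, 20);
// thirdPage is { skip: 40, take: 20, totalPages: 3 }
```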

📲 Push Notification Setup

services/push-notification-service.ts
// 📁 services/push-notification-service.ts
import webpush from 'web-push';

export class PushNotificationService {
  constructor() {
    webpush.setVapidDetails(
      'mailto:your-email@domain.com',
      process.env.VAPID_PUBLIC_KEY!,
      process.env.VAPID_PRIVATE_KEY!
    );
  }

  // 📱 Send push notification
  async send(params: {
    userId: string;
    title: string;
    body: string;
    icon?: string;
    badge?: string;
    data?: any;
    actions?: Array<{ action: string; title: string; icon?: string }>;
  }) {
    // 🔍 Get user subscriptions
    const subscriptions = await this.getUserSubscriptions(params.userId);
    
    const payload = JSON.stringify({
      title: params.title,
      body: params.body,
      icon: params.icon || '/icons/notification-icon.png',
      badge: params.badge || '/icons/badge-icon.png',
      data: params.data || {},
      actions: params.actions || [],
    });

    // 📤 Send to all user devices
    const results = await Promise.allSettled(
      subscriptions.map(subscription =>
        webpush.sendNotification(subscription, payload)
      )
    );

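    // 🧹 Remove only subscriptions the push service reports as gone (404/410);
    // other failures, such as network errors or rate limits, may be transient
    const invalidSubscriptions = results
      .map((result, index) => ({ result, index }))
      .filter(
        ({ result }) =>
          result.status === 'rejected' &&
          [404, 410].includes(result.reason?.statusCode)
      )
      .map(({ index }) => subscriptions[index]);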

    if (invalidSubscriptions.length > 0) {
      await this.removeSubscriptions(invalidSubscriptions);
    }

    return {
      sent: results.filter(r => r.status === 'fulfilled').length,
      failed: results.filter(r => r.status === 'rejected').length,
    };
  }
}
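
`send` relies on `getUserSubscriptions` and `removeSubscriptions`, which the class above does not define. A minimal sketch of that storage layer, assuming an in-memory map keyed by user id, might look like this. `SubscriptionStore` and `StoredPushSubscription` are hypothetical names; the subscription shape follows the standard Web Push format (`endpoint` plus `p256dh` and `auth` keys), and a real deployment would keep these records in a database.

```typescript
// Standard Web Push subscription fields, as produced by the browser's PushManager.
interface StoredPushSubscription {
  endpoint: string;
  keys: { p256dh: string; auth: string };
}

// In-memory stand-in for the persistence that send() depends on.
class SubscriptionStore {
  private byUser = new Map<string, StoredPushSubscription[]>();

  // Register a device; the endpoint URL identifies it uniquely.
  add(userId: string, subscription: StoredPushSubscription): void {
    const current = this.byUser.get(userId) ?? [];
    if (!current.some(s => s.endpoint === subscription.endpoint)) {
      this.byUser.set(userId, [...current, subscription]);
    }
  }

  async getUserSubscriptions(userId: string): Promise<StoredPushSubscription[]> {
    return this.byUser.get(userId) ?? [];
  }

  // Drop subscriptions by endpoint, whichever user they belong to.
  async removeSubscriptions(invalid: StoredPushSubscription[]): Promise<void> {
    const gone = new Set(invalid.map(s => s.endpoint));
    this.byUser.forEach((subs, userId) => {
      this.byUser.set(userId, subs.filter(s => !gone.has(s.endpoint)));
    });
  }
}
```

The two async methods keep the signatures that `send` already calls, so the service could delegate to a store like this one without changing `send` itself.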

⚛️ Live Notification Component

components/live-notifications.tsx
// 📁 components/live-notifications.tsx
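import React, { useState, useEffect } from 'react';
import { Bell, Check, X } from 'lucide-react';
import { trpc } from '@/lib/trpc';

// Payload shape of the 'notificationSent' real-time event; adjust it to match
// the output type of the realtime.notifications subscription.
interface LiveNotification {
  userId: string;
  type: string;
  title: string;
  data?: { body?: string; actionUrl?: string };
}

export function LiveNotifications() {
  const [notifications, setNotifications] = useState<LiveNotification[]>([]);
  const [isOpen, setIsOpen] = useState(false);
  const [unreadCount, setUnreadCount] = useState(0);

  // 📡 Subscribe to real-time notifications
  trpc.realtime.notifications.useSubscription(undefined, {
    onData: (notification) => {
      setNotifications(prev => [notification, ...prev]);
      setUnreadCount(prev => prev + 1);

      // 🔔 Browser notification (the Notification API is not available in every browser)
      if ('Notification' in window && Notification.permission === 'granted') {
        new Notification(notification.title, {
          body: notification.data?.body,
          icon: '/icons/notification.png',
        });
      }
    },
  });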

  // 📲 Request notification permission
  useEffect(() => {
    if ('Notification' in window && Notification.permission === 'default') {
      Notification.requestPermission();
    }
  }, []);

  return (
    <div className="relative">
      <button
        onClick={() => setIsOpen(!isOpen)}
        className="relative p-2 rounded-lg bg-gray-800 hover:bg-gray-700 transition-colors"
      >
        <Bell className="w-5 h-5 text-gray-300" />
        {unreadCount > 0 && (
          <span className="absolute -top-1 -right-1 bg-red-500 text-white text-xs rounded-full w-5 h-5 flex items-center justify-center">
            {unreadCount > 9 ? '9+' : unreadCount}
          </span>
        )}
      </button>

      {isOpen && (
        <NotificationPanel
          notifications={notifications}
          onClose={() => setIsOpen(false)}
          onMarkRead={(id) => {
            // Mark as read logic
            setUnreadCount(prev => Math.max(0, prev - 1));
          }}
        />
      )}
    </div>
  );
}

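// NotificationItem is assumed to be defined alongside this component.
function NotificationPanel({ notifications, onClose, onMarkRead }: {
  notifications: Array<{ title: string; data?: { body?: string } }>;
  onClose: () => void;
  onMarkRead: (id: string) => void;
}) {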
  return (
    <div className="absolute top-full right-0 mt-2 w-80 bg-gray-900 rounded-lg border border-gray-700 shadow-xl z-50">
      <div className="flex items-center justify-between p-4 border-b border-gray-700">
        <h3 className="text-white font-semibold">Notifications</h3>
        <button onClick={onClose} className="text-gray-400 hover:text-white">
          <X className="w-4 h-4" />
        </button>
      </div>
      
      <div className="max-h-96 overflow-y-auto">
        {notifications.length === 0 ? (
          <div className="p-4 text-center text-gray-500">No notifications</div>
        ) : (
          notifications.map((notification, index) => (
            <NotificationItem
              key={index}
              notification={notification}
              onMarkRead={onMarkRead}
            />
          ))
        )}
      </div>
    </div>
  );
}
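
The `onMarkRead` callback above only decrements a counter and leaves the actual update as a placeholder. One way to keep the list and the unread badge consistent is to derive the count from the list itself. The reducer below is a sketch under that assumption; `UiNotification`, `notificationReducer`, and the client-side `id` and `isRead` fields are hypothetical additions, since the real-time payload shown earlier carries no id.

```typescript
interface UiNotification {
  id: string;      // assumed client-side id, e.g. assigned on receipt
  title: string;
  isRead: boolean;
}

type NotificationAction =
  | { kind: 'received'; notification: Omit<UiNotification, 'isRead'> }
  | { kind: 'markRead'; id: string };

// Pure state transition, usable with React's useReducer.
function notificationReducer(
  state: UiNotification[],
  action: NotificationAction
): UiNotification[] {
  switch (action.kind) {
    case 'received':
      // Newest first, always unread on arrival
      return [{ ...action.notification, isRead: false }, ...state];
    case 'markRead':
      return state.map(n => (n.id === action.id ? { ...n, isRead: true } : n));
  }
}

// The badge count is derived, so it cannot drift from the list.
function countUnread(state: UiNotification[]): number {
  return state.filter(n => !n.isRead).length;
}

let uiState: UiNotification[] = [];
uiState = notificationReducer(uiState, { kind: 'received', notification: { id: 'n1', title: 'Welcome' } });
uiState = notificationReducer(uiState, { kind: 'received', notification: { id: 'n2', title: 'New message' } });
uiState = notificationReducer(uiState, { kind: 'markRead', id: 'n1' });
// countUnread(uiState) is 1, and marking 'n1' again leaves it at 1
```

In the component, this could replace the two `useState` calls with `useReducer(notificationReducer, [])`, with the badge reading `countUnread(state)`.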

✅ What you accomplished in this lesson

WebSocket server configured with tRPC
Type-safe tRPC subscriptions implemented
Complete event-driven architecture
Live notifications in real time
Connection management with health checks
Event bus with retry and dead letter queue
Cross-platform push notifications
Production monitoring and metrics

🚀 Next Steps

In the next lesson:

  • Production deployment and configuration
  • Monitoring and observability
  • Scaling and load balancing
  • Security and performance tuning

To practice:

  • Implement a real-time chat
  • Configure push notifications
  • Test under real load
  • Monitor performance metrics
