🚀 Oferta especial: 60% OFF no CrazyStack - Últimas vagas! Garantir vaga →
Módulo 5 - Aula 2

Event Sourcing e CQRS

Domine Event Sourcing e CQRS com tRPC: event store, command handlers, projections, sagas e arquitetura escalável para SaaS enterprise com auditoria completa.

130 min
Expert
Event Sourcing

🎯 Por que Event Sourcing e CQRS são essenciais para SaaS?

Auditoria Completa: Cada mudança é registrada como evento, garantindo compliance e rastreabilidade total.

Escalabilidade: CQRS permite otimizar leitura e escrita independentemente, suportando milhões de usuários.

🗃️ Event Store com tRPC

shared/event-sourcing/event-store.ts
// 📁 shared/event-sourcing/event-store.ts
import { PrismaClient } from '@prisma/client';
import { z } from 'zod';

// 🎯 Interfaces base para eventos
/**
 * Envelope shared by every event handled by the event store.
 *
 * `EventStore.saveEvents` persists `eventData` and `metadata` with
 * `JSON.stringify`, so both must be JSON-serializable.
 */
export interface DomainEvent {
  /** Unique identifier of this event record. */
  id: string;
  /** Identifier of the aggregate instance this event belongs to. */
  aggregateId: string;
  /** Kind of aggregate, e.g. `'User'`. */
  aggregateType: string;
  /** Event name; the event store looks up registered handlers by this value. */
  eventType: string;
  /** Version number carried by the event (presumably the payload schema version — confirm). */
  eventVersion: number;
  /** Event-specific payload; concrete event interfaces narrow this type. */
  eventData: any;
  /** Contextual information recorded alongside the payload. */
  metadata: {
    /** Id of the user associated with the event, when known. */
    userId?: string;
    /** Correlation id supplied by the caller that produced the event. */
    correlationId: string;
    /** Presumably the id of the message that caused this event — confirm with producers. */
    causationId?: string;
    /** Creation time as a string; the producers visible here use `new Date().toISOString()`. */
    timestamp: string;
    /** Name of the component that produced the event, e.g. `'user-service'`. */
    source: string;
    /** Optional tracing identifier. */
    traceId?: string;
  };
}

/** Result of loading one aggregate's events from the event store. */
export interface EventStream {
  /** Identifier of the aggregate the events belong to. */
  aggregateId: string;
  /** Kind of aggregate the events belong to. */
  aggregateType: string;
  /** Loaded events, ordered by ascending sequence number. */
  events: DomainEvent[];
  /** Version stored on the stream row, or 0 when no stream row exists. */
  version: number;
}

// 📊 Event Store principal
/**
 * Prisma-backed event store.
 *
 * Appends events to per-aggregate streams guarded by an optimistic version
 * check, reads them back, and notifies in-process handlers registered with
 * `registerEventHandler`.
 *
 * `eventData` and `metadata` are stored as JSON strings (`JSON.stringify` on
 * write, `JSON.parse` on read).
 */
export class EventStore {
  private prisma: PrismaClient;
  private eventHandlers: Map<string, ((event: DomainEvent) => Promise<void>)[]> = new Map();

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  /**
   * Appends `events` to the stream identified by `aggregateId` + `aggregateType`.
   *
   * The append and the stream-version update run in one transaction. Handlers
   * are notified only after that transaction has committed, so they never
   * observe events that end up rolled back and cannot hold the transaction
   * open while they run.
   *
   * @param aggregateId     Stream owner id.
   * @param aggregateType   Stream owner kind.
   * @param events          Events to append, in order. An empty array is a no-op.
   * @param expectedVersion Version the caller believes the stream has; 0 for a
   *                        stream that does not exist yet.
   * @throws Error when the stored version differs from `expectedVersion`.
   */
  async saveEvents(
    aggregateId: string,
    aggregateType: string,
    events: Omit<DomainEvent, 'id'>[],
    expectedVersion: number
  ): Promise<void> {
    // Nothing to append: avoid creating/updating a stream row with no last event.
    if (events.length === 0) {
      return;
    }

    const streamKey = {
      aggregateId_aggregateType: { aggregateId, aggregateType },
    };

    const committedEvents: DomainEvent[] = await this.prisma.$transaction(async (tx) => {
      const currentStream = await tx.eventStream.findUnique({ where: streamKey });

      // A missing stream counts as version 0, so a stale writer that expects
      // a later version is rejected even when the stream row is absent.
      const currentVersion = currentStream?.version ?? 0;
      if (currentVersion !== expectedVersion) {
        throw new Error(`Concurrency conflict. Expected version ${expectedVersion}, but current is ${currentVersion}`);
      }

      const eventRecords = events.map((event, index) => ({
        id: this.generateEventId(),
        aggregateId,
        aggregateType,
        eventType: event.eventType,
        eventVersion: event.eventVersion,
        eventData: JSON.stringify(event.eventData),
        metadata: JSON.stringify(event.metadata),
        sequenceNumber: currentVersion + index + 1,
        createdAt: new Date(),
      }));

      await tx.event.createMany({ data: eventRecords });

      const newVersion = currentVersion + events.length;
      const lastEventId = eventRecords[eventRecords.length - 1].id;

      await tx.eventStream.upsert({
        where: streamKey,
        create: {
          aggregateId,
          aggregateType,
          version: newVersion,
          lastEventId,
          updatedAt: new Date(),
        },
        update: {
          version: newVersion,
          lastEventId,
          updatedAt: new Date(),
        },
      });

      // Handlers receive the JSON round-tripped form, i.e. exactly what a
      // later load from the store would return.
      return eventRecords.map(EventStore.toDomainEvent);
    });

    // Publish sequentially, in append order, now that the write is durable.
    for (const event of committedEvents) {
      await this.publishEvent(event);
    }
  }

  /**
   * Loads the events of one aggregate with `sequenceNumber > fromVersion`,
   * ordered by ascending sequence number, together with the stream version.
   *
   * @returns The stream; `version` is 0 when no stream row exists.
   */
  async loadEvents(
    aggregateId: string,
    aggregateType: string,
    fromVersion: number = 0
  ): Promise<EventStream> {
    const [records, stream] = await Promise.all([
      this.prisma.event.findMany({
        where: {
          aggregateId,
          aggregateType,
          sequenceNumber: { gt: fromVersion },
        },
        orderBy: { sequenceNumber: 'asc' },
      }),
      this.prisma.eventStream.findUnique({
        where: {
          aggregateId_aggregateType: { aggregateId, aggregateType },
        },
      }),
    ]);

    return {
      aggregateId,
      aggregateType,
      events: records.map(EventStore.toDomainEvent),
      version: stream?.version ?? 0,
    };
  }

  /**
   * Loads up to `limit` events of a given type, oldest first.
   *
   * @param eventType     Event name to match.
   * @param fromTimestamp When given, only events created at or after this time.
   * @param limit         Maximum number of events returned (default 100).
   */
  async loadEventsByType(
    eventType: string,
    fromTimestamp?: Date,
    limit: number = 100
  ): Promise<DomainEvent[]> {
    const records = await this.prisma.event.findMany({
      where: {
        eventType,
        ...(fromTimestamp && {
          createdAt: { gte: fromTimestamp },
        }),
      },
      orderBy: { createdAt: 'asc' },
      take: limit,
    });

    return records.map(EventStore.toDomainEvent);
  }

  /**
   * Searches events with optional filters, newest first.
   *
   * `userId` and `correlationId` are matched against the serialized metadata
   * column with a parameterized `contains` filter; no SQL is assembled from
   * caller-supplied values.
   *
   * @returns The requested page (`offset`/`limit`, default 0/50) and the total
   *          number of events matching the filters.
   */
  async queryEvents(filters: {
    aggregateType?: string;
    eventTypes?: string[];
    fromDate?: Date;
    toDate?: Date;
    userId?: string;
    correlationId?: string;
    limit?: number;
    offset?: number;
  }): Promise<{
    events: DomainEvent[];
    total: number;
  }> {
    // The generated Prisma `where` input type is not available in this module,
    // so the filter object is assembled untyped.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const where: any = {};

    if (filters.aggregateType) {
      where.aggregateType = filters.aggregateType;
    }

    if (filters.eventTypes?.length) {
      where.eventType = { in: filters.eventTypes };
    }

    if (filters.fromDate || filters.toDate) {
      where.createdAt = {};
      if (filters.fromDate) where.createdAt.gte = filters.fromDate;
      if (filters.toDate) where.createdAt.lte = filters.toDate;
    }

    const metadataConditions: { metadata: { contains: string } }[] = [];
    if (filters.userId) {
      metadataConditions.push(EventStore.metadataFieldEquals('userId', filters.userId));
    }
    if (filters.correlationId) {
      metadataConditions.push(EventStore.metadataFieldEquals('correlationId', filters.correlationId));
    }
    if (metadataConditions.length > 0) {
      where.AND = metadataConditions;
    }

    const [records, total] = await Promise.all([
      this.prisma.event.findMany({
        where,
        orderBy: { createdAt: 'desc' },
        skip: filters.offset || 0,
        take: filters.limit || 50,
      }),
      this.prisma.event.count({ where }),
    ]);

    return {
      events: records.map(EventStore.toDomainEvent),
      total,
    };
  }

  /**
   * Registers `handler` to be called for every saved event whose `eventType`
   * matches. Several handlers may be registered for the same type.
   */
  registerEventHandler(
    eventType: string,
    handler: (event: DomainEvent) => Promise<void>
  ): void {
    const existing = this.eventHandlers.get(eventType);
    if (existing) {
      existing.push(handler);
    } else {
      this.eventHandlers.set(eventType, [handler]);
    }
  }

  /**
   * Runs all handlers registered for the event's type in parallel.
   *
   * Delivery is best-effort: a failing handler does not affect the others or
   * the caller, but its failure is logged instead of being dropped silently.
   */
  private async publishEvent(event: DomainEvent): Promise<void> {
    const handlers = this.eventHandlers.get(event.eventType) || [];

    const outcomes = await Promise.allSettled(
      handlers.map(handler => handler(event))
    );

    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        console.error(
          `Event handler failed for ${event.eventType} (${event.id}):`,
          outcome.reason
        );
      }
    }
  }

  /** Builds an event id from the current time plus a random base-36 suffix. */
  private generateEventId(): string {
    return `evt_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
  }

  /** Converts a stored event row back into a `DomainEvent`, parsing its JSON columns. */
  private static toDomainEvent(record: {
    id: string;
    aggregateId: string;
    aggregateType: string;
    eventType: string;
    eventVersion: number;
    eventData: string;
    metadata: string;
  }): DomainEvent {
    return {
      id: record.id,
      aggregateId: record.aggregateId,
      aggregateType: record.aggregateType,
      eventType: record.eventType,
      eventVersion: record.eventVersion,
      eventData: JSON.parse(record.eventData),
      metadata: JSON.parse(record.metadata),
    };
  }

  /**
   * Builds a filter matching rows whose serialized metadata contains the
   * top-level pair `"key":"value"`.
   *
   * The fragment is produced by `JSON.stringify`, the same serializer used
   * when the metadata was written, so quoting and escaping agree.
   */
  private static metadataFieldEquals(
    key: string,
    value: string
  ): { metadata: { contains: string } } {
    // '{"key":"value"}' -> '"key":"value"'
    const fragment = JSON.stringify({ [key]: value }).slice(1, -1);
    return { metadata: { contains: fragment } };
  }

  /**
   * Returns aggregate counters for the store: total events and streams, the
   * number of events per type, and hourly event counts for the last 24 hours.
   */
  async getStats(): Promise<{
    totalEvents: number;
    totalStreams: number;
    eventsByType: Record<string, number>;
    recentActivity: {
      period: string;
      eventCount: number;
    }[];
  }> {
    const [totalEvents, totalStreams, groupedByType, hourlyRows] = await Promise.all([
      this.prisma.event.count(),
      this.prisma.eventStream.count(),
      this.prisma.event.groupBy({
        by: ['eventType'],
        _count: { id: true },
      }),
      // The alias is double-quoted because PostgreSQL folds unquoted
      // identifiers to lower case, which would return the column as `eventcount`.
      this.prisma.$queryRaw`
        SELECT 
          DATE_TRUNC('hour', "createdAt") as period,
          COUNT(*) as "eventCount"
        FROM "Event"
        WHERE "createdAt" >= NOW() - INTERVAL '24 hours'
        GROUP BY DATE_TRUNC('hour', "createdAt")
        ORDER BY period DESC
        LIMIT 24
      `,
    ]);

    const eventsByType: Record<string, number> = {};
    for (const group of groupedByType) {
      eventsByType[group.eventType] = group._count.id;
    }

    const activityRows = hourlyRows as {
      period: Date | string;
      eventCount: bigint | number | string;
    }[];

    return {
      totalEvents,
      totalStreams,
      eventsByType,
      recentActivity: activityRows.map(row => ({
        period: new Date(row.period).toISOString(),
        // COUNT(*) may come back as a bigint; Number() handles bigint, number and string.
        eventCount: Number(row.eventCount),
      })),
    };
  }
}

🏗️ Aggregate Root e Domain Events

domain/user/user-aggregate.ts
// 📁 domain/user/user-aggregate.ts
import { DomainEvent } from '@/shared/event-sourcing/event-store';

// 🎯 Domain Events para User
/** Event raised by `UserAggregate.create` when a user is created. */
export interface UserCreatedEvent extends DomainEvent {
  eventType: 'UserCreated';
  eventData: {
    /** Email address the user was created with. */
    email: string;
    /** Display name the user was created with. */
    name: string;
    /** Organization the user belongs to, when supplied at creation. */
    organizationId?: string;
    /** Role assigned at creation. */
    role: string;
    /** Caller-supplied extra data; `UserAggregate.create` stores `{}` when none is given. */
    metadata: Record<string, any>;
  };
}

/** Event raised by `UserAggregate.changeEmail` when the email actually changes. */
export interface UserEmailChangedEvent extends DomainEvent {
  eventType: 'UserEmailChanged';
  eventData: {
    /** Email held by the aggregate before the change. */
    previousEmail: string;
    /** Email the aggregate holds after the event is applied. */
    newEmail: string;
    /** Verification flag; `changeEmail` always sets it to `true`. */
    verificationRequired: boolean;
  };
}

/** Event raised by `UserAggregate.changeRole` when the role actually changes. */
export interface UserRoleChangedEvent extends DomainEvent {
  eventType: 'UserRoleChanged';
  eventData: {
    /** Role held by the aggregate before the change. */
    previousRole: string;
    /** Role the aggregate holds after the event is applied. */
    newRole: string;
    /** `userId` passed to `changeRole`. */
    changedBy: string;
    /** Optional free-text justification passed to `changeRole`. */
    reason?: string;
  };
}

/** Event raised by `UserAggregate.deactivate`. */
export interface UserDeactivatedEvent extends DomainEvent {
  eventType: 'UserDeactivated';
  eventData: {
    /** Free-text reason passed to `deactivate`. */
    reason: string;
    /** `userId` passed to `deactivate`. */
    deactivatedBy: string;
    /** ISO string of the `scheduledDeletion` date passed to `deactivate`, when one was given. */
    scheduledDeletion?: string;
  };
}

// 🏗️ User Aggregate Root
/**
 * Event-sourced User aggregate.
 *
 * State is changed only by applying events. Command methods validate their
 * input, build an event, apply it, and queue it as uncommitted until the
 * caller persists it and calls `markEventsAsCommitted`.
 */
export class UserAggregate {
  /** Roles accepted by `create` and `changeRole`. */
  private static readonly VALID_ROLES: readonly string[] = ['USER', 'ADMIN', 'SUPER_ADMIN'];

  /** Minimal shape check: something@something.something without whitespace. */
  private static readonly EMAIL_PATTERN = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

  private readonly id: string;
  // The next three fields are populated by applying a `UserCreated` event and
  // are left unassigned (undefined at runtime) until then.
  private email!: string;
  private name!: string;
  private role!: string;
  private organizationId?: string;
  private isActive: boolean;
  private version: number;
  private uncommittedEvents: DomainEvent[] = [];

  /** Creates an empty aggregate (version 0, active) ready to have events applied. */
  constructor(id: string) {
    this.id = id;
    this.version = 0;
    this.isActive = true;
  }

  /**
   * Applies one event to the in-memory state and increments the version.
   * Used both when replaying history and when a command raises a new event.
   *
   * @throws Error for an event type this aggregate does not know; state and
   *         version are left untouched in that case.
   */
  apply(event: DomainEvent): void {
    switch (event.eventType) {
      case 'UserCreated':
        this.applyUserCreated(event as UserCreatedEvent);
        break;
      case 'UserEmailChanged':
        this.applyUserEmailChanged(event as UserEmailChangedEvent);
        break;
      case 'UserRoleChanged':
        this.applyUserRoleChanged(event as UserRoleChangedEvent);
        break;
      case 'UserDeactivated':
        this.applyUserDeactivated();
        break;
      default:
        throw new Error(`Unknown event type: ${event.eventType}`);
    }

    this.version++;
  }

  /**
   * Command: create a user.
   *
   * @returns A new aggregate holding one uncommitted `UserCreated` event.
   * @throws Error when the email is malformed, the trimmed name is shorter
   *         than 2 characters, or the role is not a known role.
   */
  static create(params: {
    id: string;
    email: string;
    name: string;
    role: string;
    organizationId?: string;
    metadata?: Record<string, any>;
    correlationId: string;
    userId?: string;
  }): UserAggregate {
    const user = new UserAggregate(params.id);

    if (!params.email || !UserAggregate.isValidEmailAddress(params.email)) {
      throw new Error('Invalid email address');
    }

    if (!params.name || params.name.trim().length < 2) {
      throw new Error('Name must be at least 2 characters');
    }

    if (!UserAggregate.VALID_ROLES.includes(params.role)) {
      throw new Error('Invalid role');
    }

    const event: UserCreatedEvent = {
      ...user.newEnvelope(params),
      eventType: 'UserCreated',
      eventData: {
        email: params.email,
        name: params.name,
        organizationId: params.organizationId,
        role: params.role,
        metadata: params.metadata || {},
      },
    };

    user.raiseEvent(event);
    return user;
  }

  /**
   * Command: change the email address. A no-op when the email is unchanged.
   *
   * @throws Error when the user is inactive or the new email is malformed.
   */
  changeEmail(params: {
    newEmail: string;
    correlationId: string;
    userId: string;
  }): void {
    if (!this.isActive) {
      throw new Error('Cannot change email of inactive user');
    }

    if (!params.newEmail || !UserAggregate.isValidEmailAddress(params.newEmail)) {
      throw new Error('Invalid email address');
    }

    if (params.newEmail === this.email) {
      return;
    }

    const event: UserEmailChangedEvent = {
      ...this.newEnvelope(params),
      eventType: 'UserEmailChanged',
      eventData: {
        previousEmail: this.email,
        newEmail: params.newEmail,
        verificationRequired: true,
      },
    };

    this.raiseEvent(event);
  }

  /**
   * Command: change the role. A no-op when the role is unchanged.
   *
   * @throws Error when the user is inactive, the role is unknown, or the
   *         change would downgrade a SUPER_ADMIN.
   */
  changeRole(params: {
    newRole: string;
    reason?: string;
    correlationId: string;
    userId: string;
  }): void {
    if (!this.isActive) {
      throw new Error('Cannot change role of inactive user');
    }

    if (!UserAggregate.VALID_ROLES.includes(params.newRole)) {
      throw new Error('Invalid role');
    }

    if (params.newRole === this.role) {
      return;
    }

    // Business rule: a SUPER_ADMIN can never be moved to a lower role.
    if (this.role === 'SUPER_ADMIN' && params.newRole !== 'SUPER_ADMIN') {
      throw new Error('Cannot downgrade SUPER_ADMIN role');
    }

    const event: UserRoleChangedEvent = {
      ...this.newEnvelope(params),
      eventType: 'UserRoleChanged',
      eventData: {
        previousRole: this.role,
        newRole: params.newRole,
        changedBy: params.userId,
        reason: params.reason,
      },
    };

    this.raiseEvent(event);
  }

  /**
   * Command: deactivate the user.
   *
   * @throws Error when the user is already inactive or the trimmed reason is
   *         shorter than 5 characters.
   */
  deactivate(params: {
    reason: string;
    scheduledDeletion?: Date;
    correlationId: string;
    userId: string;
  }): void {
    if (!this.isActive) {
      throw new Error('User is already inactive');
    }

    if (!params.reason || params.reason.trim().length < 5) {
      throw new Error('Deactivation reason must be at least 5 characters');
    }

    const event: UserDeactivatedEvent = {
      ...this.newEnvelope(params),
      eventType: 'UserDeactivated',
      eventData: {
        reason: params.reason,
        deactivatedBy: params.userId,
        scheduledDeletion: params.scheduledDeletion?.toISOString(),
      },
    };

    this.raiseEvent(event);
  }

  /**
   * Applies a newly raised event and queues it as uncommitted.
   *
   * The event is applied first so that a rejected event (one `apply` throws
   * on) is never left behind in the uncommitted list.
   */
  private raiseEvent(event: DomainEvent): void {
    this.apply(event);
    this.uncommittedEvents.push(event);
  }

  /** Returns a copy of the events raised since the last commit. */
  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  /** Clears the uncommitted list; call after the events have been persisted. */
  markEventsAsCommitted(): void {
    this.uncommittedEvents = [];
  }

  /** Builds the fields every event raised by this aggregate shares. */
  private newEnvelope(context: { correlationId: string; userId?: string }) {
    return {
      id: this.generateEventId(),
      aggregateId: this.id,
      aggregateType: 'User',
      eventVersion: 1,
      metadata: {
        correlationId: context.correlationId,
        timestamp: new Date().toISOString(),
        source: 'user-service',
        userId: context.userId,
      },
    };
  }

  // State transitions, one per event type.
  private applyUserCreated(event: UserCreatedEvent): void {
    this.email = event.eventData.email;
    this.name = event.eventData.name;
    this.role = event.eventData.role;
    this.organizationId = event.eventData.organizationId;
    this.isActive = true;
  }

  private applyUserEmailChanged(event: UserEmailChangedEvent): void {
    this.email = event.eventData.newEmail;
  }

  private applyUserRoleChanged(event: UserRoleChangedEvent): void {
    this.role = event.eventData.newRole;
  }

  private applyUserDeactivated(): void {
    this.isActive = false;
  }

  /** Shape check used by the commands; see `EMAIL_PATTERN`. */
  private static isValidEmailAddress(email: string): boolean {
    return UserAggregate.EMAIL_PATTERN.test(email);
  }

  /** Instance-level alias of the email shape check. */
  private isValidEmail(email: string): boolean {
    return UserAggregate.isValidEmailAddress(email);
  }

  /** Builds an event id from the current time plus a random base-36 suffix. */
  private generateEventId(): string {
    return `evt_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
  }

  // Read-only accessors.
  getId(): string { return this.id; }
  getEmail(): string { return this.email; }
  getName(): string { return this.name; }
  getRole(): string { return this.role; }
  getOrganizationId(): string | undefined { return this.organizationId; }
  getIsActive(): boolean { return this.isActive; }
  getVersion(): number { return this.version; }
}

📚 Repository Pattern para Event Sourcing

domain/user/user-repository.ts
// 📁 domain/user/user-repository.ts
import { EventStore } from '@/shared/event-sourcing/event-store';
import { UserAggregate } from './user-aggregate';

// 🎯 Interface do repositório
/** Persistence contract for `UserAggregate`. */
export interface IUserRepository {
  /** Persists the aggregate's uncommitted events. */
  save(user: UserAggregate): Promise<void>;
  /** Loads an aggregate by id; resolves to `null` when it does not exist. */
  getById(id: string): Promise<UserAggregate | null>;
  /** Resolves to whether an aggregate with this id exists. */
  exists(id: string): Promise<boolean>;
  /** Loads an aggregate by email address; resolves to `null` when none matches. */
  getByEmail(email: string): Promise<UserAggregate | null>;
}

// 📚 Implementação do repositório
/**
 * Event-store backed repository for `UserAggregate` with an in-memory,
 * per-instance cache of rebuilt aggregates.
 */
export class UserRepository implements IUserRepository {
  private eventStore: EventStore;
  private cache: Map<string, UserAggregate> = new Map();
  // Counters behind getCacheStats(); only getById() lookups are counted.
  private cacheLookups = 0;
  private cacheHits = 0;

  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  /**
   * Persists the aggregate's uncommitted events and caches the aggregate.
   *
   * The expected stream version is the aggregate's version before the
   * uncommitted events were applied. When the write fails, the aggregate is
   * evicted from the cache so later reads rebuild it from the store instead of
   * returning an instance carrying changes that were never persisted.
   *
   * @throws Whatever `EventStore.saveEvents` throws.
   */
  async save(user: UserAggregate): Promise<void> {
    const uncommittedEvents = user.getUncommittedEvents();

    if (uncommittedEvents.length === 0) {
      return;
    }

    try {
      await this.eventStore.saveEvents(
        user.getId(),
        'User',
        uncommittedEvents,
        user.getVersion() - uncommittedEvents.length
      );
    } catch (error) {
      this.cache.delete(user.getId());
      throw error;
    }

    user.markEventsAsCommitted();
    this.cache.set(user.getId(), user);
  }

  /**
   * Returns the aggregate with the given id, from the cache when present,
   * otherwise rebuilt by replaying its events.
   *
   * @returns `null` when the store holds no events for this id.
   */
  async getById(id: string): Promise<UserAggregate | null> {
    this.cacheLookups++;

    const cached = this.cache.get(id);
    if (cached) {
      this.cacheHits++;
      return cached;
    }

    const eventStream = await this.eventStore.loadEvents(id, 'User');

    if (eventStream.events.length === 0) {
      return null;
    }

    const user = new UserAggregate(id);
    for (const event of eventStream.events) {
      user.apply(event);
    }

    this.cache.set(id, user);
    return user;
  }

  /** Resolves to `true` when the aggregate is cached or has at least one stored event. */
  async exists(id: string): Promise<boolean> {
    if (this.cache.has(id)) {
      return true;
    }

    const eventStream = await this.eventStore.loadEvents(id, 'User');
    return eventStream.events.length > 0;
  }

  /**
   * Finds the user whose *current* email equals `email`.
   *
   * Candidates are taken from `UserCreated` events first and then from
   * `UserEmailChanged` events. Each candidate is loaded and its present email
   * checked, so a user who has since moved to a different address is not
   * returned for the old one.
   *
   * NOTE(review): `loadEventsByType` is called with its default limit, so only
   * the oldest events of each type are inspected; a read model is the proper
   * source for this lookup.
   */
  async getByEmail(email: string): Promise<UserAggregate | null> {
    const createdEvents = await this.eventStore.loadEventsByType('UserCreated');
    const fromCreation = await this.findCurrentOwner(
      email,
      createdEvents.filter(event => event.eventData.email === email)
    );
    if (fromCreation) {
      return fromCreation;
    }

    const emailChangeEvents = await this.eventStore.loadEventsByType('UserEmailChanged');
    return this.findCurrentOwner(
      email,
      emailChangeEvents.filter(event => event.eventData.newEmail === email)
    );
  }

  /** Loads each distinct candidate aggregate and returns the first that currently holds `email`. */
  private async findCurrentOwner(
    email: string,
    candidates: { aggregateId: string }[]
  ): Promise<UserAggregate | null> {
    const candidateIds = new Set(candidates.map(candidate => candidate.aggregateId));

    for (const id of candidateIds) {
      const user = await this.getById(id);
      if (user && user.getEmail() === email) {
        return user;
      }
    }

    return null;
  }

  /** Drops every cached aggregate. Hit/lookup counters are kept. */
  clearCache(): void {
    this.cache.clear();
  }

  /**
   * Returns the number of cached aggregates and the measured hit rate of
   * `getById` lookups (0 when no lookup has happened yet).
   */
  getCacheStats(): {
    size: number;
    hitRate: number;
  } {
    return {
      size: this.cache.size,
      hitRate: this.cacheLookups === 0 ? 0 : this.cacheHits / this.cacheLookups,
    };
  }
}

// 📁 infrastructure/projections/user-read-model.ts
// 🎯 Read Model para queries otimizadas

/** Row shape of the user read model returned by `UserQueryService`. */
export interface UserReadModel {
  /** Same value as the aggregate id of the user. */
  id: string;
  /** Current email address. */
  email: string;
  /** Display name. */
  name: string;
  /** Current role. */
  role: string;
  /** Organization the user belongs to, when any. */
  organizationId?: string;
  /** `false` once a `UserDeactivated` event has been projected. */
  isActive: boolean;
  /** Taken from the `UserCreated` event's `metadata.timestamp` by the projection. */
  createdAt: string;
  /** Taken from the `metadata.timestamp` of the most recently projected event. */
  updatedAt: string;
  /** Last login time; not written by any projection handler visible here. */
  lastLoginAt?: string;
  /** Copy of the `metadata` payload of the `UserCreated` event. */
  metadata: Record<string, any>;
}

// 📊 Projection Handler para User
/**
 * Keeps the `userReadModel` table in sync with user events.
 *
 * Each handler performs one write, logs the outcome, and rethrows any
 * failure to its caller.
 */
export class UserProjectionHandler {
  private prisma: PrismaClient;

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  /** Subscribes this projection to the four user event types on `eventStore`. */
  setupEventHandlers(eventStore: EventStore): void {
    const subscriptions: [string, (event: DomainEvent) => Promise<void>][] = [
      ['UserCreated', (event) => this.handleUserCreated(event)],
      ['UserEmailChanged', (event) => this.handleUserEmailChanged(event)],
      ['UserRoleChanged', (event) => this.handleUserRoleChanged(event)],
      ['UserDeactivated', (event) => this.handleUserDeactivated(event)],
    ];

    for (const [eventType, handler] of subscriptions) {
      eventStore.registerEventHandler(eventType, handler);
    }
  }

  /**
   * Runs one projection write, logging success or failure.
   * Both callbacks are evaluated lazily inside the guarded section, so an
   * error while reading the event is logged and rethrown like a write error.
   */
  private async runProjection(
    eventName: string,
    write: () => Promise<unknown>,
    successMessage: () => string
  ): Promise<void> {
    try {
      await write();
      console.log(successMessage());
    } catch (error) {
      console.error(`Failed to handle ${eventName} event:`, error);
      throw error;
    }
  }

  /** Inserts the read-model row for a newly created user. */
  private async handleUserCreated(event: DomainEvent): Promise<void> {
    return this.runProjection(
      'UserCreated',
      () => {
        const payload = event.eventData;
        const occurredAt = event.metadata.timestamp;
        return this.prisma.userReadModel.create({
          data: {
            id: event.aggregateId,
            email: payload.email,
            name: payload.name,
            role: payload.role,
            organizationId: payload.organizationId,
            isActive: true,
            createdAt: occurredAt,
            updatedAt: occurredAt,
            metadata: payload.metadata,
          },
        });
      },
      () => `✅ User read model created: ${event.aggregateId}`
    );
  }

  /** Stores the new email on the user's read-model row. */
  private async handleUserEmailChanged(event: DomainEvent): Promise<void> {
    return this.runProjection(
      'UserEmailChanged',
      () =>
        this.prisma.userReadModel.update({
          where: { id: event.aggregateId },
          data: {
            email: event.eventData.newEmail,
            updatedAt: event.metadata.timestamp,
          },
        }),
      () => `📧 User email updated: ${event.aggregateId}`
    );
  }

  /** Stores the new role on the user's read-model row. */
  private async handleUserRoleChanged(event: DomainEvent): Promise<void> {
    return this.runProjection(
      'UserRoleChanged',
      () =>
        this.prisma.userReadModel.update({
          where: { id: event.aggregateId },
          data: {
            role: event.eventData.newRole,
            updatedAt: event.metadata.timestamp,
          },
        }),
      () => `👑 User role updated: ${event.aggregateId}`
    );
  }

  /** Marks the user's read-model row as inactive. */
  private async handleUserDeactivated(event: DomainEvent): Promise<void> {
    return this.runProjection(
      'UserDeactivated',
      () =>
        this.prisma.userReadModel.update({
          where: { id: event.aggregateId },
          data: {
            isActive: false,
            updatedAt: event.metadata.timestamp,
          },
        }),
      () => `🗑️ User deactivated: ${event.aggregateId}`
    );
  }
}

// 🔍 Query Service para read models
/** Read-side queries over the `userReadModel` table. */
export class UserQueryService {
  /** Columns callers may sort by; anything else falls back to `createdAt`. */
  private static readonly SORTABLE_FIELDS: readonly string[] = [
    'id',
    'email',
    'name',
    'role',
    'organizationId',
    'isActive',
    'createdAt',
    'updatedAt',
    'lastLoginAt',
  ];

  private prisma: PrismaClient;

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  /** Returns the read model with the given id, or `null` when there is none. */
  async getById(id: string): Promise<UserReadModel | null> {
    return this.prisma.userReadModel.findUnique({
      where: { id },
    });
  }

  /** Returns the read model with the given email, or `null` when there is none. */
  async getByEmail(email: string): Promise<UserReadModel | null> {
    return this.prisma.userReadModel.findUnique({
      where: { email },
    });
  }

  /**
   * Lists users with optional filters, sorting and pagination.
   *
   * `page` and `limit` are normalized to integers >= 1, so a zero, negative or
   * non-finite value can no longer produce a negative `skip` or an
   * infinite/NaN `totalPages`. `sortBy` is honoured only for known read-model
   * columns; any other value sorts by `createdAt`.
   *
   * @param params.search    Case-insensitive substring matched against name or email.
   * @param params.sortOrder Defaults to `'desc'`.
   * @returns The page of users, the total match count, the normalized page
   *          number and the number of pages.
   */
  async getUsers(params: {
    organizationId?: string;
    role?: string;
    isActive?: boolean;
    search?: string;
    page: number;
    limit: number;
    sortBy?: string;
    sortOrder?: 'asc' | 'desc';
  }): Promise<{
    users: UserReadModel[];
    total: number;
    page: number;
    totalPages: number;
  }> {
    const page = UserQueryService.toPositiveInteger(params.page);
    const limit = UserQueryService.toPositiveInteger(params.limit);

    const sortField =
      params.sortBy !== undefined && UserQueryService.SORTABLE_FIELDS.includes(params.sortBy)
        ? params.sortBy
        : 'createdAt';

    // The generated Prisma `where` input type is not available in this module,
    // so the filter object is assembled untyped.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const where: any = {};

    if (params.organizationId) {
      where.organizationId = params.organizationId;
    }

    if (params.role) {
      where.role = params.role;
    }

    if (params.isActive !== undefined) {
      where.isActive = params.isActive;
    }

    if (params.search) {
      where.OR = [
        { name: { contains: params.search, mode: 'insensitive' } },
        { email: { contains: params.search, mode: 'insensitive' } },
      ];
    }

    const [users, total] = await Promise.all([
      this.prisma.userReadModel.findMany({
        where,
        skip: (page - 1) * limit,
        take: limit,
        orderBy: {
          [sortField]: params.sortOrder || 'desc',
        },
      }),
      this.prisma.userReadModel.count({ where }),
    ]);

    return {
      users,
      total,
      page,
      totalPages: Math.ceil(total / limit),
    };
  }

  /**
   * Returns user counters, optionally restricted to one organization: total,
   * active, inactive, per-role counts, and users created in the last 7 days.
   */
  async getStats(organizationId?: string): Promise<{
    total: number;
    active: number;
    inactive: number;
    byRole: Record<string, number>;
    recentSignups: number;
  }> {
    const where = organizationId ? { organizationId } : {};
    const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);

    const [total, active, groupedByRole, recentSignups] = await Promise.all([
      this.prisma.userReadModel.count({ where }),
      this.prisma.userReadModel.count({
        where: { ...where, isActive: true },
      }),
      this.prisma.userReadModel.groupBy({
        by: ['role'],
        where,
        _count: { id: true },
      }),
      this.prisma.userReadModel.count({
        where: {
          ...where,
          createdAt: { gte: sevenDaysAgo },
        },
      }),
    ]);

    const byRole: Record<string, number> = {};
    for (const group of groupedByRole) {
      byRole[group.role] = group._count.id;
    }

    return {
      total,
      active,
      inactive: total - active,
      byRole,
      recentSignups,
    };
  }

  /** Truncates `value` to an integer and clamps it to >= 1; non-finite input yields 1. */
  private static toPositiveInteger(value: number): number {
    if (!Number.isFinite(value)) {
      return 1;
    }
    return Math.max(1, Math.trunc(value));
  }
}

⚡ Command Handlers e Application Service

application/user/user-application-service.ts
// 📁 application/user/commands.ts
import { z } from 'zod';

// 🎯 Command schemas
// Field factories shared by the command schemas below; each call returns a
// fresh zod schema equivalent to the inline definition it replaces.
const uuidField = () => z.string().uuid();
const userRoleField = () => z.enum(['USER', 'ADMIN', 'SUPER_ADMIN']);
const reasonField = () => z.string().min(5).max(500);

/** Input accepted by the "create user" command. */
export const createUserCommandSchema = z.object({
  id: uuidField(),
  email: z.string().email(),
  name: z.string().min(2).max(100),
  role: userRoleField(),
  organizationId: uuidField().optional(),
  metadata: z.record(z.any()).optional(),
});

/** Input accepted by the "change user email" command. */
export const changeUserEmailCommandSchema = z.object({
  id: uuidField(),
  newEmail: z.string().email(),
});

/** Input accepted by the "change user role" command; `reason` is optional. */
export const changeUserRoleCommandSchema = z.object({
  id: uuidField(),
  newRole: userRoleField(),
  reason: reasonField().optional(),
});

/** Input accepted by the "deactivate user" command; `reason` is mandatory. */
export const deactivateUserCommandSchema = z.object({
  id: uuidField(),
  reason: reasonField(),
  scheduledDeletion: z.date().optional(),
});

// 🎯 Command types
/** Static type inferred from `createUserCommandSchema`. */
export type CreateUserCommand = z.infer<typeof createUserCommandSchema>;
/** Static type inferred from `changeUserEmailCommandSchema`. */
export type ChangeUserEmailCommand = z.infer<typeof changeUserEmailCommandSchema>;
/** Static type inferred from `changeUserRoleCommandSchema`. */
export type ChangeUserRoleCommand = z.infer<typeof changeUserRoleCommandSchema>;
/** Static type inferred from `deactivateUserCommandSchema`. */
export type DeactivateUserCommand = z.infer<typeof deactivateUserCommandSchema>;

// 📁 application/user/user-application-service.ts
import { IUserRepository } from '@/domain/user/user-repository';
import { UserAggregate } from '@/domain/user/user-aggregate';
import { EmailService } from '@/infrastructure/email/email-service';
import { Logger } from '@/infrastructure/logging/logger';

// 📋 Context para comandos
/** Caller information passed alongside every command. */
export interface CommandContext {
  /** Id of the user issuing the command; forwarded to the aggregate as `userId`. */
  userId: string;
  /** Correlation id forwarded to the aggregate and stored in event metadata. */
  correlationId: string;
  /** Optional tracing id included in the service's log entries. */
  traceId?: string;
  /** Organization of the caller (presumably used for authorization — not read in the code visible here). */
  organizationId?: string;
  /** Permissions held by the caller (not read in the code visible here — confirm usage). */
  permissions: string[];
}

// ⚡ Application Service principal
/**
 * Application service for the write side of the user aggregate.
 *
 * Every public method handles one command: it runs the authorization and
 * business checks, applies the change to the aggregate, persists it through
 * the repository and finally triggers an e-mail notification without awaiting
 * it. Command failures are logged and rethrown unchanged; notification
 * failures are only logged and never fail the command.
 */
export class UserApplicationService {
  private userRepository: IUserRepository;
  private emailService: EmailService;
  private logger: Logger;

  constructor(
    userRepository: IUserRepository,
    emailService: EmailService,
    logger: Logger
  ) {
    this.userRepository = userRepository;
    this.emailService = emailService;
    this.logger = logger;
  }

  /**
   * Command: create a user.
   *
   * @param command Validated creation payload.
   * @param context Caller information used for authorization and tracing.
   * @returns The id of the created user and a success flag.
   * @throws Error when the caller may not create the requested role, or when
   *   the e-mail or id is already taken; repository errors are rethrown.
   */
  async createUser(
    command: CreateUserCommand,
    context: CommandContext
  ): Promise<{ id: string; success: boolean }> {
    this.logger.info('Creating user', {
      command,
      context,
      traceId: context.traceId,
    });

    try {
      // Authorization and business validation
      await this.validateCreateUser(command, context);

      // Build the aggregate
      const user = UserAggregate.create({
        id: command.id,
        email: command.email,
        name: command.name,
        role: command.role,
        organizationId: command.organizationId,
        metadata: command.metadata,
        correlationId: context.correlationId,
        userId: context.userId,
      });

      // Persist through the repository
      await this.userRepository.save(user);

      // Welcome e-mail is sent in the background
      this.notifyInBackground(
        this.sendWelcomeEmail(user, context),
        'Failed to send welcome email',
        user.getId(),
        context
      );

      this.logger.info('User created successfully', {
        userId: user.getId(),
        email: command.email,
        traceId: context.traceId,
      });

      return { id: user.getId(), success: true };
    } catch (error: unknown) {
      this.logger.error('Failed to create user', {
        command,
        error: this.describeError(error),
        traceId: context.traceId,
      });
      throw error;
    }
  }

  /**
   * Command: change a user's e-mail address.
   *
   * @param command Target user id and the new address.
   * @param context Caller information used for authorization and tracing.
   * @returns `{ success: true }` once the change has been saved.
   * @throws Error when the user does not exist, the caller lacks permission,
   *   or the address belongs to another user; repository errors are rethrown.
   */
  async changeUserEmail(
    command: ChangeUserEmailCommand,
    context: CommandContext
  ): Promise<{ success: boolean }> {
    this.logger.info('Changing user email', {
      userId: command.id,
      newEmail: command.newEmail,
      traceId: context.traceId,
    });

    try {
      // Load the aggregate
      const user = await this.userRepository.getById(command.id);
      if (!user) {
        throw new Error('User not found');
      }

      // Authorization comes before the uniqueness lookup
      this.validateEmailChangePermissions(user, context);

      // The address must not belong to a different user
      const existingUser = await this.userRepository.getByEmail(command.newEmail);
      if (existingUser && existingUser.getId() !== user.getId()) {
        throw new Error('Email already in use');
      }

      // Apply the change to the aggregate
      user.changeEmail({
        newEmail: command.newEmail,
        correlationId: context.correlationId,
        userId: context.userId,
      });

      // Persist
      await this.userRepository.save(user);

      // Verification e-mail is sent in the background
      this.notifyInBackground(
        this.sendEmailVerification(user, command.newEmail, context),
        'Failed to send email verification',
        user.getId(),
        context
      );

      this.logger.info('User email changed successfully', {
        userId: user.getId(),
        newEmail: command.newEmail,
        traceId: context.traceId,
      });

      return { success: true };
    } catch (error: unknown) {
      this.logger.error('Failed to change user email', {
        command,
        error: this.describeError(error),
        traceId: context.traceId,
      });
      throw error;
    }
  }

  /**
   * Command: change a user's role.
   *
   * @param command Target user id, the new role and an optional reason.
   * @param context Caller information used for authorization and tracing.
   * @returns `{ success: true }` once the change has been saved.
   * @throws Error when the user does not exist or the caller lacks
   *   permission; repository errors are rethrown.
   */
  async changeUserRole(
    command: ChangeUserRoleCommand,
    context: CommandContext
  ): Promise<{ success: boolean }> {
    this.logger.info('Changing user role', {
      userId: command.id,
      newRole: command.newRole,
      traceId: context.traceId,
    });

    try {
      // Load the aggregate
      const user = await this.userRepository.getById(command.id);
      if (!user) {
        throw new Error('User not found');
      }

      // Authorization
      this.validateRoleChangePermissions(user, command.newRole, context);

      // Apply the change to the aggregate
      user.changeRole({
        newRole: command.newRole,
        reason: command.reason,
        correlationId: context.correlationId,
        userId: context.userId,
      });

      // Persist
      await this.userRepository.save(user);

      // Role-change notification is sent in the background
      this.notifyInBackground(
        this.notifyRoleChange(user, command.newRole, context),
        'Failed to notify role change',
        user.getId(),
        context
      );

      this.logger.info('User role changed successfully', {
        userId: user.getId(),
        newRole: command.newRole,
        reason: command.reason,
        traceId: context.traceId,
      });

      return { success: true };
    } catch (error: unknown) {
      this.logger.error('Failed to change user role', {
        command,
        error: this.describeError(error),
        traceId: context.traceId,
      });
      throw error;
    }
  }

  /**
   * Command: deactivate a user.
   *
   * @param command Target user id, the reason and an optional deletion date.
   * @param context Caller information used for authorization and tracing.
   * @returns `{ success: true }` once the change has been saved.
   * @throws Error when the user does not exist or the caller lacks
   *   permission; repository errors are rethrown.
   */
  async deactivateUser(
    command: DeactivateUserCommand,
    context: CommandContext
  ): Promise<{ success: boolean }> {
    this.logger.info('Deactivating user', {
      userId: command.id,
      reason: command.reason,
      traceId: context.traceId,
    });

    try {
      // Load the aggregate
      const user = await this.userRepository.getById(command.id);
      if (!user) {
        throw new Error('User not found');
      }

      // Authorization
      this.validateDeactivationPermissions(user, context);

      // Apply the change to the aggregate
      user.deactivate({
        reason: command.reason,
        scheduledDeletion: command.scheduledDeletion,
        correlationId: context.correlationId,
        userId: context.userId,
      });

      // Persist
      await this.userRepository.save(user);

      // Deactivation notice is sent in the background
      this.notifyInBackground(
        this.notifyUserDeactivation(user, command.reason, context),
        'Failed to notify user deactivation',
        user.getId(),
        context
      );

      this.logger.info('User deactivated successfully', {
        userId: user.getId(),
        reason: command.reason,
        traceId: context.traceId,
      });

      return { success: true };
    } catch (error: unknown) {
      this.logger.error('Failed to deactivate user', {
        command,
        error: this.describeError(error),
        traceId: context.traceId,
      });
      throw error;
    }
  }

  // ---------------------------------------------------------------------
  // Shared helpers
  // ---------------------------------------------------------------------

  /**
   * Turns any thrown value into a loggable message. Caught values are not
   * guaranteed to be `Error` instances, so `.message` cannot be read blindly.
   */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Lets a notification run without awaiting it. A rejection is logged with
   * `failureMessage` and never propagates to the command that started it.
   */
  private notifyInBackground(
    task: Promise<void>,
    failureMessage: string,
    userId: string,
    context: CommandContext
  ): void {
    task.catch((error: unknown) => {
      this.logger.error(failureMessage, {
        userId,
        error: this.describeError(error),
        traceId: context.traceId,
      });
    });
  }

  // ---------------------------------------------------------------------
  // Private validations
  // ---------------------------------------------------------------------

  /**
   * Checks that the caller may create the requested user and that neither the
   * e-mail nor the id is taken.
   *
   * The permission check runs first so that a caller who is not allowed to
   * create the user cannot use the "already in use" errors to probe which
   * e-mails or ids exist.
   */
  private async validateCreateUser(
    command: CreateUserCommand,
    context: CommandContext
  ): Promise<void> {
    // Creating an admin-level user requires a dedicated permission
    if (['ADMIN', 'SUPER_ADMIN'].includes(command.role)) {
      if (!context.permissions.includes('user:create_admin')) {
        throw new Error('Insufficient permissions to create admin user');
      }
    }

    // The e-mail must be unused
    const existingUser = await this.userRepository.getByEmail(command.email);
    if (existingUser) {
      throw new Error('Email already in use');
    }

    // The id must be unused
    const userExists = await this.userRepository.exists(command.id);
    if (userExists) {
      throw new Error('User ID already exists');
    }
  }

  /**
   * Allows the change when the caller edits their own e-mail, or holds
   * `user:manage` and belongs to the same organization as the target.
   *
   * NOTE(review): when both organization ids are `undefined` the comparison
   * also succeeds; confirm that this is intended for users without an
   * organization.
   */
  private validateEmailChangePermissions(
    user: UserAggregate,
    context: CommandContext
  ): void {
    // A user may change their own e-mail
    if (user.getId() === context.userId) {
      return;
    }

    // A manager may change e-mails of users in the same organization
    if (context.permissions.includes('user:manage') &&
        user.getOrganizationId() === context.organizationId) {
      return;
    }

    throw new Error('Insufficient permissions to change user email');
  }

  /**
   * Rejects the role change when the caller targets themselves, lacks the
   * permission required for the new role, or belongs to another organization.
   *
   * NOTE(review): the current role of the target is not inspected, and
   * assigning `USER` needs no specific permission; confirm whether demoting a
   * `SUPER_ADMIN` should be restricted here.
   */
  private validateRoleChangePermissions(
    user: UserAggregate,
    newRole: string,
    context: CommandContext
  ): void {
    // Nobody may change their own role
    if (user.getId() === context.userId) {
      throw new Error('Cannot change your own role');
    }

    // Permission required depends on the role being assigned
    if (newRole === 'SUPER_ADMIN' && !context.permissions.includes('user:create_super_admin')) {
      throw new Error('Insufficient permissions to assign SUPER_ADMIN role');
    }

    if (newRole === 'ADMIN' && !context.permissions.includes('user:create_admin')) {
      throw new Error('Insufficient permissions to assign ADMIN role');
    }

    // Caller and target must share the organization
    if (user.getOrganizationId() !== context.organizationId) {
      throw new Error('Cannot change role of user from different organization');
    }
  }

  /**
   * Rejects the deactivation when the caller targets themselves, the target
   * is a `SUPER_ADMIN`, or the caller lacks `user:deactivate`.
   */
  private validateDeactivationPermissions(
    user: UserAggregate,
    context: CommandContext
  ): void {
    // Nobody may deactivate themselves
    if (user.getId() === context.userId) {
      throw new Error('Cannot deactivate yourself');
    }

    // SUPER_ADMIN accounts cannot be deactivated
    if (user.getRole() === 'SUPER_ADMIN') {
      throw new Error('Cannot deactivate SUPER_ADMIN user');
    }

    // Dedicated permission required
    if (!context.permissions.includes('user:deactivate')) {
      throw new Error('Insufficient permissions to deactivate user');
    }
  }

  // ---------------------------------------------------------------------
  // Private notifications
  // ---------------------------------------------------------------------

  /** Sends the `user-welcome` template to the user's address. */
  private async sendWelcomeEmail(
    user: UserAggregate,
    context: CommandContext
  ): Promise<void> {
    await this.emailService.send({
      to: user.getEmail(),
      template: 'user-welcome',
      data: {
        name: user.getName(),
        loginUrl: process.env.APP_URL + '/login',
      },
      metadata: {
        userId: user.getId(),
        correlationId: context.correlationId,
        traceId: context.traceId,
      },
    });
  }

  /**
   * Sends the `email-verification` template to the new address.
   *
   * NOTE(review): the verification URL carries the literal placeholder
   * `token=...`; a real token still has to be generated.
   */
  private async sendEmailVerification(
    user: UserAggregate,
    newEmail: string,
    context: CommandContext
  ): Promise<void> {
    await this.emailService.send({
      to: newEmail,
      template: 'email-verification',
      data: {
        name: user.getName(),
        verificationUrl: process.env.APP_URL + `/verify-email?token=...`,
      },
      metadata: {
        userId: user.getId(),
        correlationId: context.correlationId,
        traceId: context.traceId,
      },
    });
  }

  /** Sends the `role-changed` template to the user's address. */
  private async notifyRoleChange(
    user: UserAggregate,
    newRole: string,
    context: CommandContext
  ): Promise<void> {
    await this.emailService.send({
      to: user.getEmail(),
      template: 'role-changed',
      data: {
        name: user.getName(),
        newRole,
        changedBy: context.userId,
      },
      metadata: {
        userId: user.getId(),
        correlationId: context.correlationId,
        traceId: context.traceId,
      },
    });
  }

  /** Sends the `account-deactivated` template to the user's address. */
  private async notifyUserDeactivation(
    user: UserAggregate,
    reason: string,
    context: CommandContext
  ): Promise<void> {
    await this.emailService.send({
      to: user.getEmail(),
      template: 'account-deactivated',
      data: {
        name: user.getName(),
        reason,
        supportEmail: process.env.SUPPORT_EMAIL,
      },
      metadata: {
        userId: user.getId(),
        correlationId: context.correlationId,
        traceId: context.traceId,
      },
    });
  }
}

🌐 tRPC Router com CQRS

server/routers/user-cqrs-router.ts
// 📁 server/routers/user-cqrs-router.ts
import { z } from 'zod';
import { router, protectedProcedure, adminProcedure } from '../trpc';
import { UserApplicationService } from '@/application/user/user-application-service';
import { UserQueryService } from '@/infrastructure/projections/user-read-model';
import {
  createUserCommandSchema,
  changeUserEmailCommandSchema,
  changeUserRoleCommandSchema,
  deactivateUserCommandSchema,
} from '@/application/user/commands';

// 🎯 User CQRS Router
/**
 * tRPC router exposing the user aggregate in CQRS style.
 *
 * - `commands`: mutations that build a command context from the request
 *   context and delegate to the application service (write side).
 * - `queries`: reads served by the query service (read side).
 * - `events`: event history read from the event store.
 * - `projections`: rebuild and status of projections.
 *
 * NOTE(review): `TRPCError` is thrown in `events.getHistory` but is not part
 * of the import list visible next to this router; confirm it is imported.
 */
export const userCQRSRouter = router({
  // COMMANDS (write side)
  commands: router({
    // Create a user
    create: adminProcedure
      .input(createUserCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const { user: actor } = ctx.session;
        const commandContext = {
          userId: actor.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: actor.permissions,
        };

        return ctx.userApplicationService.createUser(input, commandContext);
      }),

    // Change e-mail
    changeEmail: protectedProcedure
      .input(changeUserEmailCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const { user: actor } = ctx.session;
        const commandContext = {
          userId: actor.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: actor.permissions,
        };

        return ctx.userApplicationService.changeUserEmail(input, commandContext);
      }),

    // Change role
    changeRole: adminProcedure
      .input(changeUserRoleCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const { user: actor } = ctx.session;
        const commandContext = {
          userId: actor.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: actor.permissions,
        };

        return ctx.userApplicationService.changeUserRole(input, commandContext);
      }),

    // Deactivate a user
    deactivate: adminProcedure
      .input(deactivateUserCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const { user: actor } = ctx.session;
        const commandContext = {
          userId: actor.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: actor.permissions,
        };

        return ctx.userApplicationService.deactivateUser(input, commandContext);
      }),
  }),

  // QUERIES (read side)
  queries: router({
    // Fetch one user by id
    // NOTE(review): no organization filter is applied in this handler; confirm
    // that the query service restricts results to the caller's tenant.
    getById: protectedProcedure
      .input(z.object({ id: z.string().uuid() }))
      .query(async ({ input, ctx }) => ctx.userQueryService.getById(input.id)),

    // Fetch one user by e-mail
    getByEmail: protectedProcedure
      .input(z.object({ email: z.string().email() }))
      .query(async ({ input, ctx }) => ctx.userQueryService.getByEmail(input.email)),

    // Paginated user listing
    list: protectedProcedure
      .input(z.object({
        organizationId: z.string().uuid().optional(),
        role: z.enum(['USER', 'ADMIN', 'SUPER_ADMIN']).optional(),
        isActive: z.boolean().optional(),
        search: z.string().optional(),
        page: z.number().min(1).default(1),
        limit: z.number().min(1).max(100).default(20),
        sortBy: z.string().optional(),
        sortOrder: z.enum(['asc', 'desc']).default('desc'),
      }))
      .query(async ({ input, ctx }) => {
        // Only a SUPER_ADMIN may choose the organization; everyone else is
        // pinned to the organization of the request context.
        const isSuperAdmin = ctx.session.user.role === 'SUPER_ADMIN';
        const organizationId = isSuperAdmin
          ? input.organizationId
          : ctx.organization?.id;

        return ctx.userQueryService.getUsers({ ...input, organizationId });
      }),

    // User statistics
    stats: protectedProcedure
      .input(z.object({
        organizationId: z.string().uuid().optional(),
      }))
      .query(async ({ input, ctx }) => {
        // Same organization rule as `list`
        const isSuperAdmin = ctx.session.user.role === 'SUPER_ADMIN';
        const organizationId = isSuperAdmin
          ? input.organizationId
          : ctx.organization?.id;

        return ctx.userQueryService.getStats(organizationId);
      }),
  }),

  // EVENT HISTORY (for auditing)
  events: router({
    // Event history of one user
    getHistory: protectedProcedure
      .input(z.object({
        userId: z.string().uuid(),
        fromDate: z.date().optional(),
        toDate: z.date().optional(),
        eventTypes: z.array(z.string()).optional(),
        limit: z.number().min(1).max(100).default(50),
        offset: z.number().min(0).default(0),
      }))
      .query(async ({ input, ctx }) => {
        // Users may read their own history; anyone else needs the permission
        const isOwnHistory = input.userId === ctx.session.user.id;
        if (!isOwnHistory && !ctx.utils.hasPermission('user:view_history')) {
          throw new TRPCError({
            code: 'FORBIDDEN',
            message: 'Insufficient permissions to view user history',
          });
        }

        const { userId, fromDate, toDate, eventTypes, limit, offset } = input;
        return ctx.eventStore.queryEvents({
          aggregateType: 'User',
          aggregateId: userId,
          fromDate,
          toDate,
          eventTypes,
          limit,
          offset,
        });
      }),

    // Event statistics
    // NOTE(review): this returns the result of `queryEvents` and does not use
    // `input.organizationId`.
    getEventStats: adminProcedure
      .input(z.object({
        organizationId: z.string().uuid().optional(),
        fromDate: z.date().optional(),
        toDate: z.date().optional(),
      }))
      .query(async ({ input, ctx }) => {
        const { fromDate, toDate } = input;
        return ctx.eventStore.queryEvents({
          aggregateType: 'User',
          fromDate,
          toDate,
          // Filtering by organization through metadata would be more complex;
          // a real implementation would use a dedicated projection.
        });
      }),
  }),

  // PROJECTIONS MANAGEMENT (admin only)
  projections: router({
    // Rebuild one projection
    rebuild: adminProcedure
      .input(z.object({
        projectionName: z.enum(['UserReadModel', 'UserStatistics']),
        fromEventId: z.string().optional(),
      }))
      .mutation(async ({ input, ctx }) => {
        const rebuildOptions = {
          fromEventId: input.fromEventId,
          correlationId: ctx.traceId,
        };
        return ctx.projectionService.rebuild(input.projectionName, rebuildOptions);
      }),

    // Status of all projections
    getStatus: adminProcedure
      .query(async ({ ctx }) => ctx.projectionService.getStatus()),
  }),
});

// 📁 server/routers/event-store-router.ts
// 🎯 Router para administração do Event Store

/**
 * Admin-only tRPC router for inspecting the event store: statistics, event
 * queries, a single aggregate's stream, and a health probe.
 */
export const eventStoreRouter = router({
  // Event store statistics
  stats: adminProcedure
    .query(async ({ ctx }) => {
      return ctx.eventStore.getStats();
    }),

  // Query events; the validated input is passed to the store unchanged
  queryEvents: adminProcedure
    .input(z.object({
      aggregateType: z.string().optional(),
      aggregateId: z.string().uuid().optional(),
      eventTypes: z.array(z.string()).optional(),
      fromDate: z.date().optional(),
      toDate: z.date().optional(),
      userId: z.string().uuid().optional(),
      correlationId: z.string().optional(),
      limit: z.number().min(1).max(1000).default(100),
      offset: z.number().min(0).default(0),
    }))
    .query(async ({ input, ctx }) => {
      return ctx.eventStore.queryEvents(input);
    }),

  // Event stream of one aggregate, starting at `fromVersion`
  getEventStream: adminProcedure
    .input(z.object({
      aggregateId: z.string().uuid(),
      aggregateType: z.string(),
      fromVersion: z.number().min(0).default(0),
    }))
    .query(async ({ input, ctx }) => {
      return ctx.eventStore.loadEvents(
        input.aggregateId,
        input.aggregateType,
        input.fromVersion
      );
    }),

  // Health probe: reports 'healthy' with the store statistics when they can
  // be read, otherwise 'unhealthy' with the failure message. It never throws.
  health: adminProcedure
    .query(async ({ ctx }) => {
      try {
        const stats = await ctx.eventStore.getStats();
        return {
          status: 'healthy',
          timestamp: new Date().toISOString(),
          stats,
        };
      } catch (error: unknown) {
        // A caught value is not guaranteed to be an Error instance, so the
        // message is extracted defensively instead of reading `.message`.
        return {
          status: 'unhealthy',
          timestamp: new Date().toISOString(),
          error: error instanceof Error ? error.message : String(error),
        };
      }
    }),
});

// 📁 server/context.ts - Context atualizado
import { EventStore } from '@/shared/event-sourcing/event-store';
import { UserApplicationService } from '@/application/user/user-application-service';
import { UserQueryService } from '@/infrastructure/projections/user-read-model';

/**
 * Request context handed to tRPC procedures, extended with the
 * event-sourcing services used by the CQRS routers.
 */
export interface Context {
  // ... other context fields
  
  // Event Sourcing services
  /** Event store used for event queries and stream loading. */
  eventStore: EventStore;
  /** Write-side service that handles user commands. */
  userApplicationService: UserApplicationService;
  /** Read-side service that answers user queries. */
  userQueryService: UserQueryService;
  /** Service used by the projections router (rebuild / status). */
  projectionService: ProjectionService;
}

/**
 * Builds the request context, wiring the event-sourcing services on top of
 * the base context.
 *
 * NOTE(review): `prisma`, `emailService`, `logger` and `projectionService`
 * are expected to come from the elided base setup; they are not defined in
 * the visible part of this function.
 * NOTE(review): a new `EventStore` is created on every call; confirm that
 * handlers registered on another instance are not expected to fire here.
 *
 * @param req Incoming request (unused in the visible part).
 * @returns The context consumed by the tRPC routers.
 */
export async function createContext({ req }: { req: NextRequest }): Promise<Context> {
  // ... base context setup

  // Event store first: the repository below is built on top of it
  const eventStore = new EventStore(prisma);

  // Write side: application service backed by an event-sourced repository
  const userApplicationService = new UserApplicationService(
    new UserRepository(eventStore),
    emailService,
    logger
  );

  // Read side: queries go straight to the database client
  const userQueryService = new UserQueryService(prisma);

  const context: Context = {
    // ... other fields
    eventStore,
    userApplicationService,
    userQueryService,
    projectionService,
  };
  return context;
}

🔄 Sagas e Process Managers

domain/sagas/user-onboarding-saga.ts
// 📁 domain/sagas/user-onboarding-saga.ts
import { randomUUID } from 'node:crypto';

import { DomainEvent } from '@/shared/event-sourcing/event-store';
import { Logger } from '@/infrastructure/logging/logger';

// 🎯 Interface para Saga State
/** Mutable bookkeeping record carried by every saga instance. */
export interface SagaState {
  /** Identifier of this saga instance. */
  sagaId: string;
  /** Correlation id of the saga. */
  correlationId: string;
  /** Lifecycle status of the saga. */
  status: 'started' | 'completed' | 'failed' | 'compensating';
  /** ISO-8601 timestamp taken when the saga was constructed. */
  startedAt: string;
  /** ISO-8601 timestamp set when the saga completes. */
  completedAt?: string;
  /** Name of the step the saga is currently in. */
  currentStep: string;
  /** Free-form data accumulated while the saga progresses. */
  data: Record<string, any>;
  /** Names of compensation steps recorded so far. */
  compensationSteps: string[];
}

// 🔄 Base Saga class
/**
 * Common base for sagas: owns the saga state and offers helpers to update
 * it, mark it completed, and mark it failed (with an error log entry).
 * Subclasses supply the event handling and the compensation logic.
 */
export abstract class BaseSaga {
  protected logger: Logger;
  protected state: SagaState;

  /**
   * @param sagaId Identifier of this saga instance.
   * @param correlationId Correlation id stored in the saga state.
   * @param logger Logger used to report failures.
   */
  constructor(sagaId: string, correlationId: string, logger: Logger) {
    const startedAt = new Date().toISOString();

    this.logger = logger;
    this.state = {
      sagaId,
      correlationId,
      status: 'started',
      startedAt,
      currentStep: 'initial',
      data: {},
      compensationSteps: [],
    };
  }

  /** Reacts to one domain event. */
  abstract handle(event: DomainEvent): Promise<void>;

  /** Undoes the work performed so far. */
  abstract compensate(): Promise<void>;

  /** Replaces the state with a copy that has `updates` applied on top. */
  protected updateState(updates: Partial<SagaState>): void {
    const next: SagaState = Object.assign({}, this.state, updates);
    this.state = next;
  }

  /** Marks the saga as completed and stamps the completion time. */
  protected complete(): void {
    const completedAt = new Date().toISOString();
    this.updateState({ status: 'completed', completedAt });
  }

  /** Marks the saga as failed and logs the error with the saga's identifiers. */
  protected fail(error: string): void {
    this.updateState({ status: 'failed' });

    const { sagaId, correlationId, currentStep } = this.state;
    this.logger.error('Saga failed', {
      sagaId,
      correlationId,
      error,
      currentStep,
    });
  }

  /** Returns a shallow copy of the current state. */
  getState(): SagaState {
    const snapshot: SagaState = { ...this.state };
    return snapshot;
  }
}

// 👤 User Onboarding Saga
/**
 * Saga that drives user onboarding as a sequence of event-triggered steps:
 *
 * 1. `OnboardingStarted`    -> create the organization
 * 2. `OrganizationCreated`  -> create the owner user (role `ADMIN`)
 * 3. `UserCreated`          -> send the onboarding welcome e-mail
 * 4. `WelcomeEmailSent`     -> track analytics
 * 5. `UserProfileCompleted` -> track completion, send final e-mail, complete
 *
 * `OnboardingTaskFailed`, or any error thrown by a step, marks the saga as
 * failed and runs the recorded compensation steps in reverse order.
 */
export class UserOnboardingSaga extends BaseSaga {
  private userApplicationService: UserApplicationService;
  private organizationService: OrganizationService;
  private emailService: EmailService;
  private analyticsService: AnalyticsService;

  /**
   * @param sagaId Identifier of this saga instance.
   * @param correlationId Correlation id shared by all events of this onboarding.
   * @param logger Logger used for progress and failure reporting.
   * @param services Collaborators invoked by the individual steps.
   */
  constructor(
    sagaId: string,
    correlationId: string,
    logger: Logger,
    services: {
      userApplicationService: UserApplicationService;
      organizationService: OrganizationService;
      emailService: EmailService;
      analyticsService: AnalyticsService;
    }
  ) {
    super(sagaId, correlationId, logger);

    this.userApplicationService = services.userApplicationService;
    this.organizationService = services.organizationService;
    this.emailService = services.emailService;
    this.analyticsService = services.analyticsService;
  }

  /**
   * Dispatches an event to the matching step. Unknown event types are logged
   * and ignored. Any error thrown by a step fails the saga and triggers
   * compensation; the error is not rethrown.
   */
  async handle(event: DomainEvent): Promise<void> {
    this.logger.info('Saga handling event', {
      sagaId: this.state.sagaId,
      eventType: event.eventType,
      currentStep: this.state.currentStep,
    });

    try {
      switch (event.eventType) {
        case 'OnboardingStarted':
          await this.handleOnboardingStarted(event);
          break;
        case 'UserCreated':
          await this.handleUserCreated(event);
          break;
        case 'OrganizationCreated':
          await this.handleOrganizationCreated(event);
          break;
        case 'WelcomeEmailSent':
          await this.handleWelcomeEmailSent(event);
          break;
        case 'UserProfileCompleted':
          await this.handleUserProfileCompleted(event);
          break;
        case 'OnboardingTaskFailed':
          await this.handleTaskFailure(event);
          break;
        default:
          this.logger.warn('Unhandled event type in saga', {
            sagaId: this.state.sagaId,
            eventType: event.eventType,
          });
      }
    } catch (error: unknown) {
      this.fail(this.describeError(error));
      await this.compensate();
    }
  }

  /** Step 1: remember the onboarding data and create the organization. */
  private async handleOnboardingStarted(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'creating-organization',
      data: {
        ...this.state.data,
        userEmail: event.eventData.userEmail,
        userName: event.eventData.userName,
        organizationName: event.eventData.organizationName,
        plan: event.eventData.plan || 'FREE',
      },
    });

    try {
      await this.organizationService.create({
        name: this.state.data.organizationName,
        plan: this.state.data.plan,
        ownerId: event.eventData.userId,
        correlationId: this.state.correlationId,
      });

      // Recorded only after success so compensation never deletes an
      // organization that was not created.
      this.state.compensationSteps.push('deleteOrganization');
    } catch (error: unknown) {
      throw new Error(`Failed to create organization: ${this.describeError(error)}`);
    }
  }

  /** Step 2: remember the organization id and create its owner user. */
  private async handleOrganizationCreated(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'creating-user',
      data: {
        ...this.state.data,
        organizationId: event.aggregateId,
      },
    });

    const userId = this.generateUserId();

    try {
      await this.userApplicationService.createUser({
        id: userId,
        email: this.state.data.userEmail,
        name: this.state.data.userName,
        role: 'ADMIN', // Owner of the organization
        organizationId: this.state.data.organizationId,
        metadata: {
          onboardingSagaId: this.state.sagaId,
          source: 'onboarding',
        },
      }, {
        userId: 'system',
        correlationId: this.state.correlationId,
        permissions: ['user:create_admin'],
      });

      // Keep the id as soon as the user exists: the 'deleteUser' compensation
      // needs it even if the saga fails before a UserCreated event arrives.
      this.updateState({
        data: {
          ...this.state.data,
          userId,
        },
      });
      this.state.compensationSteps.push('deleteUser');
    } catch (error: unknown) {
      throw new Error(`Failed to create user: ${this.describeError(error)}`);
    }
  }

  /** Step 3: remember the user id and send the onboarding welcome e-mail. */
  private async handleUserCreated(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'sending-welcome-email',
      data: {
        ...this.state.data,
        userId: event.aggregateId,
      },
    });

    try {
      await this.emailService.send({
        to: this.state.data.userEmail,
        template: 'onboarding-welcome',
        data: {
          userName: this.state.data.userName,
          organizationName: this.state.data.organizationName,
          setupUrl: `${process.env.APP_URL}/onboarding/setup?token=${this.generateSetupToken()}`,
        },
        metadata: {
          sagaId: this.state.sagaId,
          correlationId: this.state.correlationId,
          userId: this.state.data.userId,
        },
      });
    } catch (error: unknown) {
      // An e-mail failure is not critical; the saga continues
      this.logger.warn('Failed to send welcome email', {
        sagaId: this.state.sagaId,
        error: this.describeError(error),
      });
    }
  }

  /** Step 4: track the onboarding metric, then wait for profile completion. */
  private async handleWelcomeEmailSent(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'tracking-analytics',
    });

    try {
      await this.analyticsService.track({
        event: 'onboarding_user_created',
        userId: this.state.data.userId,
        properties: {
          organizationId: this.state.data.organizationId,
          plan: this.state.data.plan,
          sagaId: this.state.sagaId,
          onboardingSource: 'web',
          timestamp: new Date().toISOString(),
        },
      });

      this.updateState({ currentStep: 'waiting-profile-completion' });
    } catch (error: unknown) {
      // Analytics failures are logged and otherwise ignored
      this.logger.warn('Failed to track analytics', {
        sagaId: this.state.sagaId,
        error: this.describeError(error),
      });
    }
  }

  /** Step 5: track completion, send the final e-mail and complete the saga. */
  private async handleUserProfileCompleted(event: DomainEvent): Promise<void> {
    this.updateState({ currentStep: 'finalizing-onboarding' });

    try {
      await this.analyticsService.track({
        event: 'onboarding_completed',
        userId: this.state.data.userId,
        properties: {
          organizationId: this.state.data.organizationId,
          duration: Date.now() - new Date(this.state.startedAt).getTime(),
          sagaId: this.state.sagaId,
          completedSteps: ['organization', 'user', 'email', 'profile'],
        },
      });

      await this.emailService.send({
        to: this.state.data.userEmail,
        template: 'onboarding-completed',
        data: {
          userName: this.state.data.userName,
          dashboardUrl: `${process.env.APP_URL}/dashboard`,
        },
      });

      this.complete();

      this.logger.info('User onboarding completed successfully', {
        sagaId: this.state.sagaId,
        userId: this.state.data.userId,
        organizationId: this.state.data.organizationId,
        duration: Date.now() - new Date(this.state.startedAt).getTime(),
      });
    } catch (error: unknown) {
      this.logger.warn('Failed to finalize onboarding', {
        sagaId: this.state.sagaId,
        error: this.describeError(error),
      });
      // The core onboarding is done, so the saga still completes
      this.complete();
    }
  }

  /** Handles an explicit task-failure event: fail the saga and compensate. */
  private async handleTaskFailure(event: DomainEvent): Promise<void> {
    this.logger.error('Onboarding task failed', {
      sagaId: this.state.sagaId,
      failedStep: event.eventData.step,
      error: event.eventData.error,
    });

    this.fail(`Task failed at step ${event.eventData.step}: ${event.eventData.error}`);
    await this.compensate();
  }

  /**
   * Rolls the saga back by running the recorded compensation steps in reverse
   * order. A failing step is logged and does not stop the remaining steps.
   * When the loop is done the saga is left in the terminal `failed` status
   * instead of staying in `compensating`.
   */
  async compensate(): Promise<void> {
    this.updateState({ status: 'compensating' });

    this.logger.info('Starting saga compensation', {
      sagaId: this.state.sagaId,
      compensationSteps: this.state.compensationSteps,
    });

    for (let i = this.state.compensationSteps.length - 1; i >= 0; i--) {
      const step = this.state.compensationSteps[i];

      try {
        switch (step) {
          case 'deleteUser':
            if (this.state.data.userId) {
              await this.userApplicationService.deactivateUser({
                id: this.state.data.userId,
                reason: 'Onboarding saga compensation',
              }, {
                userId: 'system',
                correlationId: this.state.correlationId,
                permissions: ['user:deactivate'],
              });
            }
            break;

          case 'deleteOrganization':
            if (this.state.data.organizationId) {
              await this.organizationService.delete({
                id: this.state.data.organizationId,
                reason: 'Onboarding saga compensation',
              });
            }
            break;

          default:
            this.logger.warn('Unknown compensation step', {
              sagaId: this.state.sagaId,
              step,
            });
        }

        this.logger.info('Compensation step completed', {
          sagaId: this.state.sagaId,
          step,
        });
      } catch (error: unknown) {
        this.logger.error('Compensation step failed', {
          sagaId: this.state.sagaId,
          step,
          error: this.describeError(error),
        });
        // Continue with the remaining compensation steps
      }
    }

    // Compensation only runs for a failed saga; restore the terminal status
    // so the saga is not reported as 'compensating' forever.
    this.updateState({ status: 'failed' });

    this.logger.info('Saga compensation completed', {
      sagaId: this.state.sagaId,
    });
  }

  // Utilities

  /**
   * Caught values are not guaranteed to be `Error` instances; this extracts
   * a message from any of them.
   */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Generates the id for the owner user. It must be a UUID because the user
   * command and query schemas validate ids with `z.string().uuid()`.
   */
  private generateUserId(): string {
    return randomUUID();
  }

  /**
   * Builds the token embedded in the setup URL.
   *
   * NOTE(review): the token is derived from the saga id and the current time,
   * so it is guessable, and it is not stored anywhere in this class; confirm
   * how the setup endpoint is expected to verify it.
   */
  private generateSetupToken(): string {
    return `setup_${this.state.sagaId}_${Date.now()}`;
  }
}

// 📁 infrastructure/sagas/saga-manager.ts
// 🎯 Saga Manager para orquestrar sagas

/**
 * Keeps track of running sagas in memory and routes domain events to them.
 *
 * Sagas are indexed by correlation ID. On construction the manager registers
 * itself with the event store for a fixed list of event types; every such
 * event is forwarded to the saga whose correlation ID matches
 * `event.metadata.correlationId`. Sagas whose state becomes `completed` or
 * `failed` after handling an event are removed from the index.
 */
export class SagaManager {
  /** Event types this manager subscribes to on the event store. */
  private static readonly HANDLED_EVENT_TYPES: readonly string[] = [
    'OnboardingStarted',
    'UserCreated',
    'OrganizationCreated',
    'WelcomeEmailSent',
    'UserProfileCompleted',
    'OnboardingTaskFailed',
  ];

  /** Active sagas keyed by correlation ID. */
  private readonly sagas: Map<string, BaseSaga> = new Map();
  private readonly eventStore: EventStore;
  private readonly logger: Logger;

  /**
   * @param eventStore Event store on which the event handlers are registered.
   * @param logger Logger used by the manager and passed to every saga it creates.
   */
  constructor(eventStore: EventStore, logger: Logger) {
    this.eventStore = eventStore;
    this.logger = logger;
    this.setupEventHandlers();
  }

  /** Registers `handleEvent` on the event store for every handled event type. */
  private setupEventHandlers(): void {
    for (const eventType of SagaManager.HANDLED_EVENT_TYPES) {
      // A separate bound function per event type, as in the original wiring.
      this.eventStore.registerEventHandler(eventType, this.handleEvent.bind(this));
    }
  }

  /**
   * Creates a saga, registers it under `correlationId` and feeds it its
   * initial event.
   *
   * The saga constructor is called as
   * `new sagaType(sagaId, correlationId, logger, ...args)`. An existing saga
   * registered under the same correlation ID is replaced.
   *
   * The initial event is handled asynchronously: `saga.handle` is invoked
   * before this method returns, but its outcome is observed later. A failure
   * is logged (same as for any subsequent event) instead of surfacing as an
   * unhandled promise rejection, and a saga that finishes on its first event
   * is cleaned up.
   *
   * @param sagaType Saga class to instantiate.
   * @param correlationId Key under which the saga receives later events.
   * @param initialEvent First event passed to the saga.
   * @param args Extra constructor arguments forwarded to the saga.
   */
  startSaga<T extends BaseSaga>(
    sagaType: new (...args: any[]) => T,
    correlationId: string,
    initialEvent: DomainEvent,
    ...args: any[]
  ): void {
    const sagaId = this.generateSagaId();
    const saga = new sagaType(sagaId, correlationId, this.logger, ...args);

    this.sagas.set(correlationId, saga);

    this.logger.info('Saga started', {
      sagaId,
      correlationId,
      sagaType: sagaType.name,
      initialEvent: initialEvent.eventType,
    });

    // `dispatch` catches and logs handler failures, so the promise can be
    // dropped here without risking an unhandled rejection.
    void this.dispatch(correlationId, saga, initialEvent);
  }

  /**
   * Event-store callback: forwards the event to the saga registered for the
   * event's correlation ID. Events without a matching saga are ignored.
   */
  private async handleEvent(event: DomainEvent): Promise<void> {
    const correlationId = event.metadata.correlationId;
    const saga = this.sagas.get(correlationId);

    if (!saga) {
      // No saga found for this correlation ID
      return;
    }

    await this.dispatch(correlationId, saga, event);
  }

  /**
   * Lets `saga` handle `event`, then removes the saga from the index if it
   * reached a terminal state (`completed` or `failed`). Errors thrown by the
   * saga are logged, not rethrown.
   */
  private async dispatch(
    correlationId: string,
    saga: BaseSaga,
    event: DomainEvent,
  ): Promise<void> {
    try {
      await saga.handle(event);

      // Clean up completed or failed sagas
      const state = saga.getState();
      if (state.status === 'completed' || state.status === 'failed') {
        this.sagas.delete(correlationId);

        this.logger.info('Saga finished and cleaned up', {
          sagaId: state.sagaId,
          correlationId,
          status: state.status,
          duration: Date.now() - new Date(state.startedAt).getTime(),
        });
      }
    } catch (error: unknown) {
      this.logger.error('Error handling event in saga', {
        correlationId,
        eventType: event.eventType,
        error: SagaManager.describeError(error),
      });
    }
  }

  /**
   * Returns the state of the saga registered under `correlationId`, or `null`
   * when no such saga is active.
   */
  getSagaStatus(correlationId: string): SagaState | null {
    const saga = this.sagas.get(correlationId);
    return saga ? saga.getState() : null;
  }

  /** Returns the state of every saga currently registered. */
  getActiveSagas(): SagaState[] {
    return Array.from(this.sagas.values(), (saga) => saga.getState());
  }

  /** Generates a saga ID of the form `saga_<epoch millis>_<random base-36 chars>`. */
  private generateSagaId(): string {
    return `saga_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
  }

  /**
   * Removes every saga that started more than `maxAge` milliseconds ago,
   * regardless of its status, logging a warning per removed saga.
   *
   * @param maxAge Maximum saga age in milliseconds (default: 24 hours).
   */
  async cleanupExpiredSagas(maxAge: number = 24 * 60 * 60 * 1000): Promise<void> {
    const now = Date.now();
    const expired: Array<{ correlationId: string; sagaId: string; age: number }> = [];

    // Collect first, delete afterwards, so the map is not mutated mid-scan.
    for (const [correlationId, saga] of this.sagas) {
      const state = saga.getState();
      const age = now - new Date(state.startedAt).getTime();

      if (age > maxAge) {
        expired.push({ correlationId, sagaId: state.sagaId, age });
      }
    }

    for (const { correlationId, sagaId, age } of expired) {
      this.logger.warn('Cleaning up expired saga', {
        sagaId,
        correlationId,
        age,
      });

      this.sagas.delete(correlationId);
    }

    this.logger.info('Saga cleanup completed', {
      expiredSagas: expired.length,
      activeSagas: this.sagas.size,
    });
  }

  /** Turns any thrown value into a loggable message. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}

💡 Melhores Práticas para Event Sourcing e CQRS

Eventos Imutáveis: Nunca altere eventos já salvos. Crie novos eventos para correções.

Versionamento de Eventos: Planeje a evolução dos esquemas mantendo backward compatibility.

Projections Idempotentes: Garanta que reprocessar eventos não cause inconsistências.

Snapshots para Performance: Use snapshots para aggregates com muitos eventos.