Domine Event Sourcing e CQRS com tRPC: event store, command handlers, projections, sagas e arquitetura escalável para SaaS enterprise com auditoria completa.
Auditoria Completa: Cada mudança é registrada como evento, garantindo compliance e rastreabilidade total.
Escalabilidade: CQRS permite otimizar leitura e escrita independentemente, suportando milhões de usuários.
// 📁 shared/event-sourcing/event-store.ts
import { PrismaClient } from '@prisma/client';
import { z } from 'zod';
// 🎯 Base interfaces for events
/**
 * Envelope shared by every domain event in this module.
 *
 * EventStore persists `eventData` and `metadata` as JSON text
 * (JSON.stringify on write, JSON.parse on read), so both must be
 * JSON-serializable.
 */
export interface DomainEvent {
// Identifier of this individual event. EventStore.saveEvents assigns a freshly
// generated id when it persists the event.
id: string;
// Identifier of the aggregate instance the event belongs to.
aggregateId: string;
// Aggregate kind, e.g. 'User'.
aggregateType: string;
// Event name, e.g. 'UserCreated'; also the key used to look up handlers.
eventType: string;
// Version number of the event's payload format.
eventVersion: number;
// Event-specific payload; narrowed by the concrete event interfaces.
eventData: any;
metadata: {
// Id of the acting user, when known.
userId?: string;
// Id shared by everything triggered by the same request/command.
correlationId: string;
// Not set anywhere in the visible code — presumably the id of the event or
// command that caused this one; TODO confirm.
causationId?: string;
// Creation time as a string; the aggregates in this file use Date#toISOString().
timestamp: string;
// Name of the producing service, e.g. 'user-service'.
source: string;
// Optional tracing identifier.
traceId?: string;
};
}
/**
 * Result of loading the events of one aggregate from the event store.
 */
export interface EventStream {
// Identifier of the aggregate the events belong to.
aggregateId: string;
// Aggregate kind, e.g. 'User'.
aggregateType: string;
// Events in ascending sequence order. This may be only the tail of the stream
// when the caller asked for events after a given version.
events: DomainEvent[];
// Version recorded for the stream; EventStore.loadEvents reports 0 when no
// stream record exists.
version: number;
}
// 📊 Main event store
/**
 * Append-only event store persisted through Prisma.
 *
 * Events are written to the `event` model and the per-aggregate version is
 * tracked in the `eventStream` model. Payload and metadata are stored as JSON
 * text (JSON.stringify on write, JSON.parse on read). Handlers registered with
 * `registerEventHandler` are notified in-process after a successful save.
 */
export class EventStore {
  private prisma: PrismaClient;
  private eventHandlers: Map<string, ((event: DomainEvent) => Promise<void>)[]> = new Map();

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  /**
   * Appends `events` to an aggregate's stream using optimistic concurrency.
   *
   * @param aggregateId     Aggregate instance id.
   * @param aggregateType   Aggregate kind, e.g. 'User'.
   * @param events          Events to append; ids are generated by the store.
   * @param expectedVersion Stream version the caller based its decision on
   *                        (0 for a stream that does not exist yet).
   * @throws Error when the stored version differs from `expectedVersion`.
   *
   * An empty `events` array is a no-op. Handlers are invoked only after the
   * transaction has committed, so they never observe events that were rolled
   * back and never keep the database transaction open.
   */
  async saveEvents(
    aggregateId: string,
    aggregateType: string,
    events: Omit<DomainEvent, 'id'>[],
    expectedVersion: number
  ): Promise<void> {
    if (events.length === 0) {
      return; // Nothing to append; also avoids indexing into an empty batch.
    }

    const persisted: DomainEvent[] = await this.prisma.$transaction(async (tx) => {
      // 🔒 Version check. A missing stream counts as version 0, so a caller
      // holding a stale version of a not-yet-existing stream is rejected too.
      const currentStream = await tx.eventStream.findUnique({
        where: {
          aggregateId_aggregateType: { aggregateId, aggregateType },
        },
      });
      const currentVersion = currentStream?.version ?? 0;
      if (currentVersion !== expectedVersion) {
        throw new Error(`Concurrency conflict. Expected version ${expectedVersion}, but current is ${currentVersion}`);
      }
      // NOTE(review): this read-then-write check alone does not stop two
      // concurrent transactions from both passing it; presumably a unique
      // constraint on (aggregateId, aggregateType, sequenceNumber) backs it —
      // confirm against the schema.

      // 💾 Build and store the event rows.
      const eventRecords = events.map((event, index) => ({
        id: this.generateEventId(),
        aggregateId,
        aggregateType,
        eventType: event.eventType,
        eventVersion: event.eventVersion,
        eventData: JSON.stringify(event.eventData),
        metadata: JSON.stringify(event.metadata),
        sequenceNumber: currentVersion + index + 1,
        createdAt: new Date(),
      }));
      await tx.event.createMany({
        data: eventRecords,
      });

      // 🔄 Advance the stream pointer.
      const newVersion = currentVersion + events.length;
      const lastEventId = eventRecords[eventRecords.length - 1].id;
      await tx.eventStream.upsert({
        where: {
          aggregateId_aggregateType: { aggregateId, aggregateType },
        },
        create: {
          aggregateId,
          aggregateType,
          version: newVersion,
          lastEventId,
          updatedAt: new Date(),
        },
        update: {
          version: newVersion,
          lastEventId,
          updatedAt: new Date(),
        },
      });

      return eventRecords.map((record) => this.toDomainEvent(record));
    });

    // 📢 Publish after commit, one event at a time, in stream order.
    for (const domainEvent of persisted) {
      await this.publishEvent(domainEvent);
    }
  }

  /**
   * Loads the events of one aggregate with sequence numbers greater than
   * `fromVersion`, in ascending order, together with the stream's version.
   */
  async loadEvents(
    aggregateId: string,
    aggregateType: string,
    fromVersion: number = 0
  ): Promise<EventStream> {
    const events = await this.prisma.event.findMany({
      where: {
        aggregateId,
        aggregateType,
        sequenceNumber: {
          gt: fromVersion,
        },
      },
      orderBy: {
        sequenceNumber: 'asc',
      },
    });
    const stream = await this.prisma.eventStream.findUnique({
      where: {
        aggregateId_aggregateType: { aggregateId, aggregateType },
      },
    });
    return {
      aggregateId,
      aggregateType,
      events: events.map((event) => this.toDomainEvent(event)),
      version: stream?.version ?? 0,
    };
  }

  /**
   * Loads up to `limit` events of one type, oldest first, optionally starting
   * at `fromTimestamp` (inclusive, compared against the row's `createdAt`).
   */
  async loadEventsByType(
    eventType: string,
    fromTimestamp?: Date,
    limit: number = 100
  ): Promise<DomainEvent[]> {
    const events = await this.prisma.event.findMany({
      where: {
        eventType,
        ...(fromTimestamp && {
          createdAt: {
            gte: fromTimestamp,
          },
        }),
      },
      orderBy: {
        createdAt: 'asc',
      },
      take: limit,
    });
    return events.map((event) => this.toDomainEvent(event));
  }

  /**
   * Searches events with optional filters and returns one page (newest first)
   * plus the total number of matches.
   *
   * `userId` and `correlationId` are matched against the serialized metadata
   * through Prisma's `contains` filter, so caller input is always sent as a
   * bound value and never spliced into SQL text.
   */
  async queryEvents(filters: {
    aggregateType?: string;
    eventTypes?: string[];
    fromDate?: Date;
    toDate?: Date;
    userId?: string;
    correlationId?: string;
    limit?: number;
    offset?: number;
  }): Promise<{
    events: DomainEvent[];
    total: number;
  }> {
    // Kept loosely typed: the generated Prisma where-input type is not in scope here.
    const where: any = {};
    if (filters.aggregateType) {
      where.aggregateType = filters.aggregateType;
    }
    if (filters.eventTypes?.length) {
      where.eventType = { in: filters.eventTypes };
    }
    if (filters.fromDate || filters.toDate) {
      where.createdAt = {};
      if (filters.fromDate) where.createdAt.gte = filters.fromDate;
      if (filters.toDate) where.createdAt.lte = filters.toDate;
    }

    // 🔍 Metadata filters.
    // NOTE(review): this is a substring match on the JSON text written by
    // saveEvents; a native JSON column with path filters would be more precise.
    const metadataFilters: { metadata: { contains: string } }[] = [];
    if (filters.userId) {
      metadataFilters.push({
        metadata: { contains: this.metadataFragment('userId', filters.userId) },
      });
    }
    if (filters.correlationId) {
      metadataFilters.push({
        metadata: { contains: this.metadataFragment('correlationId', filters.correlationId) },
      });
    }
    if (metadataFilters.length > 0) {
      where.AND = metadataFilters;
    }

    const [events, total] = await Promise.all([
      this.prisma.event.findMany({
        where,
        orderBy: { createdAt: 'desc' },
        skip: filters.offset || 0,
        take: filters.limit || 50,
      }),
      this.prisma.event.count({ where }),
    ]);
    return {
      events: events.map((event) => this.toDomainEvent(event)),
      total,
    };
  }

  /** Subscribes `handler` to every event of type `eventType` saved through this store. */
  registerEventHandler(
    eventType: string,
    handler: (event: DomainEvent) => Promise<void>
  ): void {
    if (!this.eventHandlers.has(eventType)) {
      this.eventHandlers.set(eventType, []);
    }
    this.eventHandlers.get(eventType)!.push(handler);
  }

  /**
   * Runs all handlers registered for the event's type in parallel.
   * A failing handler never fails the save, but its error is logged instead
   * of being silently dropped.
   */
  private async publishEvent(event: DomainEvent): Promise<void> {
    const handlers = this.eventHandlers.get(event.eventType) || [];
    // The async wrapper turns a synchronous throw into a rejection so one
    // broken handler cannot prevent the others from running.
    const outcomes = await Promise.allSettled(
      handlers.map(async (handler) => handler(event))
    );
    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        console.error(
          `Event handler failed for ${event.eventType} (${event.id}):`,
          outcome.reason
        );
      }
    }
  }

  /** Converts a stored row (JSON text columns) back into a DomainEvent. */
  private toDomainEvent(row: {
    id: string;
    aggregateId: string;
    aggregateType: string;
    eventType: string;
    eventVersion: number;
    eventData: string;
    metadata: string;
  }): DomainEvent {
    return {
      id: row.id,
      aggregateId: row.aggregateId,
      aggregateType: row.aggregateType,
      eventType: row.eventType,
      eventVersion: row.eventVersion,
      eventData: JSON.parse(row.eventData),
      metadata: JSON.parse(row.metadata),
    };
  }

  /**
   * Returns the exact `"key":"value"` text JSON.stringify produces for a
   * string property, with the surrounding braces removed.
   */
  private metadataFragment(key: string, value: string): string {
    return JSON.stringify({ [key]: value }).slice(1, -1);
  }

  /** Generates an event id from the current time plus a random base-36 suffix. */
  private generateEventId(): string {
    return `evt_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
  }

  /**
   * Returns aggregate counters: total events and streams, events per type, and
   * hourly event counts for the last 24 hours (PostgreSQL syntax).
   */
  async getStats(): Promise<{
    totalEvents: number;
    totalStreams: number;
    eventsByType: Record<string, number>;
    recentActivity: {
      period: string;
      eventCount: number;
    }[];
  }> {
    const [totalEvents, totalStreams, eventsByType, recentActivity] = await Promise.all([
      this.prisma.event.count(),
      this.prisma.eventStream.count(),
      this.prisma.event.groupBy({
        by: ['eventType'],
        _count: { id: true },
      }),
      // The alias is quoted because PostgreSQL folds unquoted identifiers to
      // lower case, which would return the column as `eventcount`.
      this.prisma.$queryRaw`
        SELECT
          DATE_TRUNC('hour', "createdAt") AS period,
          COUNT(*) AS "eventCount"
        FROM "Event"
        WHERE "createdAt" >= NOW() - INTERVAL '24 hours'
        GROUP BY DATE_TRUNC('hour', "createdAt")
        ORDER BY period DESC
        LIMIT 24
      `,
    ]);
    return {
      totalEvents,
      totalStreams,
      eventsByType: eventsByType.reduce((acc, item) => {
        acc[item.eventType] = item._count.id;
        return acc;
      }, {} as Record<string, number>),
      recentActivity: (recentActivity as any[]).map((item) => ({
        period: item.period.toISOString(),
        // COUNT(*) is returned as a bigint by the raw query.
        eventCount: Number(item.eventCount),
      })),
    };
  }
}
// 📁 domain/user/user-aggregate.ts
import { DomainEvent } from '@/shared/event-sourcing/event-store';
// 🎯 Domain events for User
/**
 * Raised when a user is created; carries the user's complete initial state.
 */
export interface UserCreatedEvent extends DomainEvent {
eventType: 'UserCreated';
eventData: {
// Initial email address.
email: string;
// Display name.
name: string;
// Owning organization, if any.
organizationId?: string;
// Initial role; UserAggregate.create only accepts 'USER', 'ADMIN' or 'SUPER_ADMIN'.
role: string;
// Free-form extra attributes; UserAggregate.create defaults this to {}.
metadata: Record<string, any>;
};
}
/**
 * Raised when a user's email address is replaced.
 */
export interface UserEmailChangedEvent extends DomainEvent {
eventType: 'UserEmailChanged';
eventData: {
// Address held before the change.
previousEmail: string;
// Address held after the change.
newEmail: string;
// UserAggregate.changeEmail always sets this to true.
verificationRequired: boolean;
};
}
/**
 * Raised when a user's role is changed.
 */
export interface UserRoleChangedEvent extends DomainEvent {
eventType: 'UserRoleChanged';
eventData: {
// Role held before the change.
previousRole: string;
// Role held after the change.
newRole: string;
// Id of the user who performed the change.
changedBy: string;
// Optional free-text justification.
reason?: string;
};
}
/**
 * Raised when a user is deactivated.
 */
export interface UserDeactivatedEvent extends DomainEvent {
eventType: 'UserDeactivated';
eventData: {
// Justification; UserAggregate.deactivate requires at least 5 characters after trimming.
reason: string;
// Id of the user who performed the deactivation.
deactivatedBy: string;
// Optional deletion date, serialized with Date#toISOString().
scheduledDeletion?: string;
};
}
// 🏗️ User aggregate root
/**
 * Event-sourced aggregate root for a user.
 *
 * State is only ever changed by applying events. Command methods validate
 * their input, create the matching event, record it as uncommitted and apply
 * it immediately. `version` counts the events applied so far.
 */
export class UserAggregate {
  private id: string;
  private email: string;
  private name: string;
  private role: string;
  private organizationId?: string;
  private isActive: boolean;
  private version: number;
  private uncommittedEvents: DomainEvent[] = [];

  constructor(id: string) {
    this.id = id;
    this.version = 0;
    this.isActive = true;
  }

  /**
   * Applies one event to the in-memory state and bumps the version.
   * Used both when replaying history and right after a command raised an event.
   *
   * @throws Error for an event type this aggregate does not know; the version
   *         is left untouched in that case.
   */
  apply(event: DomainEvent): void {
    const kind = event.eventType;
    if (kind === 'UserCreated') {
      const created = event as UserCreatedEvent;
      this.email = created.eventData.email;
      this.name = created.eventData.name;
      this.role = created.eventData.role;
      this.organizationId = created.eventData.organizationId;
      this.isActive = true;
    } else if (kind === 'UserEmailChanged') {
      this.email = (event as UserEmailChangedEvent).eventData.newEmail;
    } else if (kind === 'UserRoleChanged') {
      this.role = (event as UserRoleChangedEvent).eventData.newRole;
    } else if (kind === 'UserDeactivated') {
      this.isActive = false;
    } else {
      throw new Error(`Unknown event type: ${event.eventType}`);
    }
    this.version += 1;
  }

  /**
   * Command: create a user.
   *
   * @returns A new aggregate holding one uncommitted `UserCreated` event.
   * @throws Error when the email, the name (fewer than 2 characters after
   *         trimming) or the role is invalid.
   */
  static create(params: {
    id: string;
    email: string;
    name: string;
    role: string;
    organizationId?: string;
    metadata?: Record<string, any>;
    correlationId: string;
    userId?: string;
  }): UserAggregate {
    const user = new UserAggregate(params.id);

    // ✅ Domain validation
    const emailOk = Boolean(params.email) && user.isValidEmail(params.email);
    if (!emailOk) {
      throw new Error('Invalid email address');
    }
    const nameOk = Boolean(params.name) && params.name.trim().length >= 2;
    if (!nameOk) {
      throw new Error('Name must be at least 2 characters');
    }
    if (!user.isKnownRole(params.role)) {
      throw new Error('Invalid role');
    }

    // 📢 Raise the event
    const created: UserCreatedEvent = {
      id: user.generateEventId(),
      aggregateId: params.id,
      aggregateType: 'User',
      eventType: 'UserCreated',
      eventVersion: 1,
      eventData: {
        email: params.email,
        name: params.name,
        organizationId: params.organizationId,
        role: params.role,
        metadata: params.metadata || {},
      },
      metadata: user.stamp(params.correlationId, params.userId),
    };
    user.raiseEvent(created);
    return user;
  }

  /**
   * Command: change the email address. Does nothing when the address is
   * unchanged.
   *
   * @throws Error when the user is inactive or the address is invalid.
   */
  changeEmail(params: {
    newEmail: string;
    correlationId: string;
    userId: string;
  }): void {
    if (!this.isActive) {
      throw new Error('Cannot change email of inactive user');
    }
    const emailOk = Boolean(params.newEmail) && this.isValidEmail(params.newEmail);
    if (!emailOk) {
      throw new Error('Invalid email address');
    }
    if (this.email === params.newEmail) {
      return; // No change required
    }

    const changed: UserEmailChangedEvent = {
      id: this.generateEventId(),
      aggregateId: this.id,
      aggregateType: 'User',
      eventType: 'UserEmailChanged',
      eventVersion: 1,
      eventData: {
        previousEmail: this.email,
        newEmail: params.newEmail,
        verificationRequired: true,
      },
      metadata: this.stamp(params.correlationId, params.userId),
    };
    this.raiseEvent(changed);
  }

  /**
   * Command: change the role. Does nothing when the role is unchanged.
   *
   * @throws Error when the user is inactive, the role is unknown, or the
   *         current role is SUPER_ADMIN (which cannot be downgraded).
   */
  changeRole(params: {
    newRole: string;
    reason?: string;
    correlationId: string;
    userId: string;
  }): void {
    if (!this.isActive) {
      throw new Error('Cannot change role of inactive user');
    }
    if (!this.isKnownRole(params.newRole)) {
      throw new Error('Invalid role');
    }
    if (this.role === params.newRole) {
      return; // No change required
    }
    // 🔒 Business rule: the new role differs from the current one at this
    // point, so any change away from SUPER_ADMIN is a downgrade.
    if (this.role === 'SUPER_ADMIN') {
      throw new Error('Cannot downgrade SUPER_ADMIN role');
    }

    const changed: UserRoleChangedEvent = {
      id: this.generateEventId(),
      aggregateId: this.id,
      aggregateType: 'User',
      eventType: 'UserRoleChanged',
      eventVersion: 1,
      eventData: {
        previousRole: this.role,
        newRole: params.newRole,
        changedBy: params.userId,
        reason: params.reason,
      },
      metadata: this.stamp(params.correlationId, params.userId),
    };
    this.raiseEvent(changed);
  }

  /**
   * Command: deactivate the user.
   *
   * @throws Error when the user is already inactive or the reason has fewer
   *         than 5 characters after trimming.
   */
  deactivate(params: {
    reason: string;
    scheduledDeletion?: Date;
    correlationId: string;
    userId: string;
  }): void {
    if (!this.isActive) {
      throw new Error('User is already inactive');
    }
    const reasonOk = Boolean(params.reason) && params.reason.trim().length >= 5;
    if (!reasonOk) {
      throw new Error('Deactivation reason must be at least 5 characters');
    }

    const deactivated: UserDeactivatedEvent = {
      id: this.generateEventId(),
      aggregateId: this.id,
      aggregateType: 'User',
      eventType: 'UserDeactivated',
      eventVersion: 1,
      eventData: {
        reason: params.reason,
        deactivatedBy: params.userId,
        scheduledDeletion: params.scheduledDeletion?.toISOString(),
      },
      metadata: this.stamp(params.correlationId, params.userId),
    };
    this.raiseEvent(deactivated);
  }

  /** Records a new event as uncommitted and applies it to the current state. */
  private raiseEvent(event: DomainEvent): void {
    this.uncommittedEvents.push(event);
    this.apply(event);
  }

  /** Returns a copy of the events raised since the last commit. */
  getUncommittedEvents(): DomainEvent[] {
    return this.uncommittedEvents.slice();
  }

  /** Clears the list of uncommitted events after they have been persisted. */
  markEventsAsCommitted(): void {
    this.uncommittedEvents = [];
  }

  // 🔧 Utilities

  /** Builds the metadata block shared by every event this aggregate raises. */
  private stamp(correlationId: string, userId: string | undefined) {
    return {
      correlationId,
      timestamp: new Date().toISOString(),
      source: 'user-service',
      userId,
    };
  }

  /** True when `role` is one of the three roles the domain accepts. */
  private isKnownRole(role: string): boolean {
    return ['USER', 'ADMIN', 'SUPER_ADMIN'].some((known) => known === role);
  }

  /** Loose syntactic check: something@something.something with no whitespace. */
  private isValidEmail(email: string): boolean {
    return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
  }

  /** Generates an event id from the current time plus a random base-36 suffix. */
  private generateEventId(): string {
    const millis = Date.now();
    const suffix = Math.random().toString(36).substring(2, 9);
    return `evt_${millis}_${suffix}`;
  }

  // 📊 Getters
  getId(): string {
    return this.id;
  }
  getEmail(): string {
    return this.email;
  }
  getName(): string {
    return this.name;
  }
  getRole(): string {
    return this.role;
  }
  getOrganizationId(): string | undefined {
    return this.organizationId;
  }
  getIsActive(): boolean {
    return this.isActive;
  }
  getVersion(): number {
    return this.version;
  }
}
// 📁 domain/user/user-repository.ts
import { EventStore } from '@/shared/event-sourcing/event-store';
import { UserAggregate } from './user-aggregate';
// 🎯 Repository interface
/**
 * Persistence contract for UserAggregate instances.
 */
export interface IUserRepository {
// Persists the aggregate's uncommitted events.
save(user: UserAggregate): Promise<void>;
// Resolves to the aggregate with this id, or null when it does not exist.
getById(id: string): Promise<UserAggregate | null>;
// Resolves to true when an aggregate with this id exists.
exists(id: string): Promise<boolean>;
// Resolves to the aggregate found for this email address, or null.
getByEmail(email: string): Promise<UserAggregate | null>;
}
// 📚 Repository implementation
/**
 * Event-sourced repository for UserAggregate with a per-instance in-memory
 * cache of rebuilt aggregates.
 */
export class UserRepository implements IUserRepository {
  private eventStore: EventStore;
  private cache: Map<string, UserAggregate> = new Map();
  // Counters behind getCacheStats(); only getById lookups are counted.
  private cacheHits = 0;
  private cacheMisses = 0;

  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  /**
   * Persists the aggregate's uncommitted events.
   *
   * The expected stream version is the aggregate's version before the
   * uncommitted events were applied. When the store rejects the write (for
   * example on a concurrency conflict) the aggregate is evicted from the
   * cache: it still carries the rejected state, so the next load must replay
   * from the store. The original error is rethrown.
   */
  async save(user: UserAggregate): Promise<void> {
    const uncommittedEvents = user.getUncommittedEvents();
    if (uncommittedEvents.length === 0) {
      return; // Nothing to persist
    }
    try {
      await this.eventStore.saveEvents(
        user.getId(),
        'User',
        uncommittedEvents,
        user.getVersion() - uncommittedEvents.length
      );
    } catch (error) {
      this.cache.delete(user.getId());
      throw error;
    }
    user.markEventsAsCommitted();
    this.cache.set(user.getId(), user);
  }

  /**
   * Returns the aggregate for `id`, from the cache when present, otherwise
   * rebuilt by replaying its events. Resolves to null when the stream is empty.
   */
  async getById(id: string): Promise<UserAggregate | null> {
    const cached = this.cache.get(id);
    if (cached) {
      this.cacheHits += 1;
      return cached;
    }
    this.cacheMisses += 1;

    const eventStream = await this.eventStore.loadEvents(id, 'User');
    if (eventStream.events.length === 0) {
      return null; // Aggregate does not exist
    }

    // 🏗️ Rebuild the aggregate from its history.
    const user = new UserAggregate(id);
    for (const event of eventStream.events) {
      user.apply(event);
    }
    this.cache.set(id, user);
    return user;
  }

  /** True when the aggregate is cached or has at least one stored event. */
  async exists(id: string): Promise<boolean> {
    if (this.cache.has(id)) {
      return true;
    }
    const eventStream = await this.eventStore.loadEvents(id, 'User');
    return eventStream.events.length > 0;
  }

  /**
   * Finds the user that currently owns `email`.
   *
   * Candidate aggregates are collected from `UserCreated` and
   * `UserEmailChanged` events, but a candidate is returned only when its
   * current email matches — a user who has since moved to another address is
   * no longer reported for the old one.
   *
   * NOTE(review): loadEventsByType is called with its default limit (100), so
   * only the oldest 100 events of each type are scanned. A projection/read
   * model lookup should replace this scan before it is relied on for
   * uniqueness checks.
   */
  async getByEmail(email: string): Promise<UserAggregate | null> {
    const inspected = new Set<string>();

    const createdEvents = await this.eventStore.loadEventsByType('UserCreated');
    for (const event of createdEvents) {
      if (event.eventData.email === email) {
        const owner = await this.currentOwner(event.aggregateId, email, inspected);
        if (owner) {
          return owner;
        }
      }
    }

    // 🔍 Addresses acquired through a later email change.
    const emailChangeEvents = await this.eventStore.loadEventsByType('UserEmailChanged');
    for (const event of emailChangeEvents) {
      if (event.eventData.newEmail === email) {
        const owner = await this.currentOwner(event.aggregateId, email, inspected);
        if (owner) {
          return owner;
        }
      }
    }
    return null;
  }

  /**
   * Loads a candidate aggregate once and returns it only if it still holds
   * `email`; `inspected` prevents loading the same aggregate twice.
   */
  private async currentOwner(
    aggregateId: string,
    email: string,
    inspected: Set<string>
  ): Promise<UserAggregate | null> {
    if (inspected.has(aggregateId)) {
      return null;
    }
    inspected.add(aggregateId);
    const user = await this.getById(aggregateId);
    return user && user.getEmail() === email ? user : null;
  }

  /** Drops every cached aggregate. */
  clearCache(): void {
    this.cache.clear();
  }

  /**
   * Returns the current cache size and the share of getById lookups served
   * from the cache (0 when no lookup has happened yet).
   */
  getCacheStats(): {
    size: number;
    hitRate: number;
  } {
    const lookups = this.cacheHits + this.cacheMisses;
    return {
      size: this.cache.size,
      hitRate: lookups === 0 ? 0 : this.cacheHits / lookups,
    };
  }
}
// 📁 infrastructure/projections/user-read-model.ts
// 🎯 Read model for optimized queries
/**
 * Denormalized, query-side view of a user, maintained by UserProjectionHandler.
 */
export interface UserReadModel {
// Same value as the aggregate id.
id: string;
// Current email address.
email: string;
// Display name.
name: string;
// Current role.
role: string;
// Owning organization, if any.
organizationId?: string;
// False once a UserDeactivated event has been projected.
isActive: boolean;
// Timestamp of the UserCreated event.
createdAt: string;
// Timestamp of the most recently projected event.
updatedAt: string;
// Not written by any projection handler visible in this file.
lastLoginAt?: string;
// Free-form attributes copied from the UserCreated event.
metadata: Record<string, any>;
}
// 📊 Projection handler for User
/**
 * Keeps the `userReadModel` table in sync with user domain events.
 * Every handler logs its outcome and rethrows on failure.
 */
export class UserProjectionHandler {
  private prisma: PrismaClient;

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  /** Subscribes this projection's handlers to the four user event types. */
  setupEventHandlers(eventStore: EventStore): void {
    const subscriptions: [string, (event: DomainEvent) => Promise<void>][] = [
      ['UserCreated', this.handleUserCreated.bind(this)],
      ['UserEmailChanged', this.handleUserEmailChanged.bind(this)],
      ['UserRoleChanged', this.handleUserRoleChanged.bind(this)],
      ['UserDeactivated', this.handleUserDeactivated.bind(this)],
    ];
    for (const [eventType, handler] of subscriptions) {
      eventStore.registerEventHandler(eventType, handler);
    }
  }

  /**
   * Runs one projection step: logs the line returned by `work` on success,
   * logs `failureLabel` with the error and rethrows on failure.
   */
  private async project(failureLabel: string, work: () => Promise<string>): Promise<void> {
    try {
      const successLine = await work();
      console.log(successLine);
    } catch (error) {
      console.error(failureLabel, error);
      throw error;
    }
  }

  // 👤 Handler: user created — inserts the read-model row.
  private handleUserCreated(event: DomainEvent): Promise<void> {
    return this.project('Failed to handle UserCreated event:', async () => {
      await this.prisma.userReadModel.create({
        data: {
          id: event.aggregateId,
          email: event.eventData.email,
          name: event.eventData.name,
          role: event.eventData.role,
          organizationId: event.eventData.organizationId,
          isActive: true,
          createdAt: event.metadata.timestamp,
          updatedAt: event.metadata.timestamp,
          metadata: event.eventData.metadata,
        },
      });
      return `✅ User read model created: ${event.aggregateId}`;
    });
  }

  // 📧 Handler: email changed — stores the new address.
  private handleUserEmailChanged(event: DomainEvent): Promise<void> {
    return this.project('Failed to handle UserEmailChanged event:', async () => {
      await this.prisma.userReadModel.update({
        where: { id: event.aggregateId },
        data: {
          email: event.eventData.newEmail,
          updatedAt: event.metadata.timestamp,
        },
      });
      return `📧 User email updated: ${event.aggregateId}`;
    });
  }

  // 👑 Handler: role changed — stores the new role.
  private handleUserRoleChanged(event: DomainEvent): Promise<void> {
    return this.project('Failed to handle UserRoleChanged event:', async () => {
      await this.prisma.userReadModel.update({
        where: { id: event.aggregateId },
        data: {
          role: event.eventData.newRole,
          updatedAt: event.metadata.timestamp,
        },
      });
      return `👑 User role updated: ${event.aggregateId}`;
    });
  }

  // 🗑️ Handler: user deactivated — flags the row as inactive.
  private handleUserDeactivated(event: DomainEvent): Promise<void> {
    return this.project('Failed to handle UserDeactivated event:', async () => {
      await this.prisma.userReadModel.update({
        where: { id: event.aggregateId },
        data: {
          isActive: false,
          updatedAt: event.metadata.timestamp,
        },
      });
      return `🗑️ User deactivated: ${event.aggregateId}`;
    });
  }
}
// 🔍 Query service for read models
/**
 * Read-side queries over the `userReadModel` table.
 */
export class UserQueryService {
  // Columns callers may sort by — the scalar fields declared on UserReadModel.
  // Anything else falls back to 'createdAt' so a caller-supplied string is
  // never used as a column name unchecked.
  private static readonly SORTABLE_FIELDS: ReadonlySet<string> = new Set([
    'id',
    'email',
    'name',
    'role',
    'organizationId',
    'isActive',
    'createdAt',
    'updatedAt',
    'lastLoginAt',
  ]);

  private prisma: PrismaClient;

  constructor(prisma: PrismaClient) {
    this.prisma = prisma;
  }

  /** Looks up one user by id; resolves to null when not found. */
  async getById(id: string): Promise<UserReadModel | null> {
    return this.prisma.userReadModel.findUnique({
      where: { id },
    });
  }

  /** Looks up one user by email; resolves to null when not found. */
  async getByEmail(email: string): Promise<UserReadModel | null> {
    return this.prisma.userReadModel.findUnique({
      where: { email },
    });
  }

  /**
   * Returns one page of users matching the optional filters.
   *
   * `page` and `limit` are normalized to integers >= 1, so a page of 0 or a
   * negative page can no longer produce a negative `skip`, and a limit of 0
   * can no longer produce an infinite or NaN `totalPages`. `search` matches
   * name or email case-insensitively. An unknown `sortBy` falls back to
   * 'createdAt'; the default order is descending.
   *
   * @returns The page of users, the total match count, the effective page
   *          number and the number of pages.
   */
  async getUsers(params: {
    organizationId?: string;
    role?: string;
    isActive?: boolean;
    search?: string;
    page: number;
    limit: number;
    sortBy?: string;
    sortOrder?: 'asc' | 'desc';
  }): Promise<{
    users: UserReadModel[];
    total: number;
    page: number;
    totalPages: number;
  }> {
    const page = UserQueryService.toPositiveInt(params.page);
    const limit = UserQueryService.toPositiveInt(params.limit);
    const sortField =
      params.sortBy !== undefined && UserQueryService.SORTABLE_FIELDS.has(params.sortBy)
        ? params.sortBy
        : 'createdAt';

    // Kept loosely typed: the generated Prisma where-input type is not in scope here.
    const where: any = {};
    if (params.organizationId) {
      where.organizationId = params.organizationId;
    }
    if (params.role) {
      where.role = params.role;
    }
    if (params.isActive !== undefined) {
      where.isActive = params.isActive;
    }
    if (params.search) {
      where.OR = [
        { name: { contains: params.search, mode: 'insensitive' } },
        { email: { contains: params.search, mode: 'insensitive' } },
      ];
    }

    const [users, total] = await Promise.all([
      this.prisma.userReadModel.findMany({
        where,
        skip: (page - 1) * limit,
        take: limit,
        orderBy: {
          [sortField]: params.sortOrder || 'desc',
        },
      }),
      this.prisma.userReadModel.count({ where }),
    ]);
    return {
      users,
      total,
      page,
      totalPages: Math.ceil(total / limit),
    };
  }

  /**
   * Returns user counters, optionally restricted to one organization:
   * total, active, inactive, per-role counts and sign-ups in the last 7 days.
   */
  async getStats(organizationId?: string): Promise<{
    total: number;
    active: number;
    inactive: number;
    byRole: Record<string, number>;
    recentSignups: number;
  }> {
    const where = organizationId ? { organizationId } : {};
    const [total, active, byRole, recentSignups] = await Promise.all([
      this.prisma.userReadModel.count({ where }),
      this.prisma.userReadModel.count({
        where: { ...where, isActive: true },
      }),
      this.prisma.userReadModel.groupBy({
        by: ['role'],
        where,
        _count: { id: true },
      }),
      this.prisma.userReadModel.count({
        where: {
          ...where,
          createdAt: {
            gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), // 7 days
          },
        },
      }),
    ]);
    return {
      total,
      active,
      inactive: total - active,
      byRole: byRole.reduce((acc, item) => {
        acc[item.role] = item._count.id;
        return acc;
      }, {} as Record<string, number>),
      recentSignups,
    };
  }

  /** Floors `value` to an integer; anything non-finite or below 1 becomes 1. */
  private static toPositiveInt(value: number): number {
    return Number.isFinite(value) && value >= 1 ? Math.floor(value) : 1;
  }
}
// 📁 application/user/commands.ts
import { z } from 'zod';
// 🎯 Command schemas
/**
 * Input schema for the "create user" command: a UUID id, a valid email, a
 * name of 2–100 characters, one of the three roles, and optionally an
 * organization UUID and a free-form metadata record.
 */
export const createUserCommandSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
name: z.string().min(2).max(100),
role: z.enum(['USER', 'ADMIN', 'SUPER_ADMIN']),
organizationId: z.string().uuid().optional(),
metadata: z.record(z.any()).optional(),
});
/**
 * Input schema for the "change email" command: the target user's UUID and
 * the new, syntactically valid email address.
 */
export const changeUserEmailCommandSchema = z.object({
id: z.string().uuid(),
newEmail: z.string().email(),
});
/**
 * Input schema for the "change role" command: the target user's UUID, the
 * new role, and an optional reason of 5–500 characters.
 */
export const changeUserRoleCommandSchema = z.object({
id: z.string().uuid(),
newRole: z.enum(['USER', 'ADMIN', 'SUPER_ADMIN']),
reason: z.string().min(5).max(500).optional(),
});
/**
 * Input schema for the "deactivate user" command: the target user's UUID, a
 * mandatory reason of 5–500 characters, and an optional deletion date.
 */
export const deactivateUserCommandSchema = z.object({
id: z.string().uuid(),
reason: z.string().min(5).max(500),
// NOTE(review): z.date() validates Date instances; if this input arrives as
// plain JSON (ISO string) it needs a transformer or coercion — confirm how
// the router deserializes input.
scheduledDeletion: z.date().optional(),
});
// 🎯 Command types — each is inferred from the schema of the same name, so
// the static type and the runtime validation cannot drift apart.
export type CreateUserCommand = z.infer<typeof createUserCommandSchema>;
export type ChangeUserEmailCommand = z.infer<typeof changeUserEmailCommandSchema>;
export type ChangeUserRoleCommand = z.infer<typeof changeUserRoleCommandSchema>;
export type DeactivateUserCommand = z.infer<typeof deactivateUserCommandSchema>;
// 📁 application/user/user-application-service.ts
import { IUserRepository } from '@/domain/user/user-repository';
import { UserAggregate } from '@/domain/user/user-aggregate';
import { EmailService } from '@/infrastructure/email/email-service';
import { Logger } from '@/infrastructure/logging/logger';
// 📋 Context for commands
/**
 * Caller information passed alongside every command.
 */
export interface CommandContext {
// Id of the user issuing the command; recorded in the metadata of raised events.
userId: string;
// Id tying together everything triggered by the same request.
correlationId: string;
// Optional tracing id, attached to log entries and outgoing emails.
traceId?: string;
// Organization of the caller; compared with the target user's organization
// in the permission checks.
organizationId?: string;
// Permission strings held by the caller, e.g. 'user:create_admin',
// 'user:manage', 'user:deactivate'.
permissions: string[];
}
// ⚡ Application Service principal
export class UserApplicationService {
private userRepository: IUserRepository;
private emailService: EmailService;
private logger: Logger;
constructor(
userRepository: IUserRepository,
emailService: EmailService,
logger: Logger
) {
this.userRepository = userRepository;
this.emailService = emailService;
this.logger = logger;
}
// 👤 Command: Criar usuário
async createUser(
command: CreateUserCommand,
context: CommandContext
): Promise<{ id: string; success: boolean }> {
this.logger.info('Creating user', {
command,
context,
traceId: context.traceId,
});
try {
// ✅ Validações de negócio
await this.validateCreateUser(command, context);
// 🏗️ Criar aggregate
const user = UserAggregate.create({
id: command.id,
email: command.email,
name: command.name,
role: command.role,
organizationId: command.organizationId,
metadata: command.metadata,
correlationId: context.correlationId,
userId: context.userId,
});
// 💾 Salvar no repositório
await this.userRepository.save(user);
// 📧 Enviar email de boas-vindas (async)
this.sendWelcomeEmail(user, context).catch(error => {
this.logger.error('Failed to send welcome email', {
userId: user.getId(),
error: error.message,
traceId: context.traceId,
});
});
this.logger.info('User created successfully', {
userId: user.getId(),
email: command.email,
traceId: context.traceId,
});
return { id: user.getId(), success: true };
} catch (error) {
this.logger.error('Failed to create user', {
command,
error: error.message,
traceId: context.traceId,
});
throw error;
}
}
// 📧 Command: Alterar email
async changeUserEmail(
command: ChangeUserEmailCommand,
context: CommandContext
): Promise<{ success: boolean }> {
this.logger.info('Changing user email', {
userId: command.id,
newEmail: command.newEmail,
traceId: context.traceId,
});
try {
// 🔍 Carregar aggregate
const user = await this.userRepository.getById(command.id);
if (!user) {
throw new Error('User not found');
}
// ✅ Verificar permissões
this.validateEmailChangePermissions(user, context);
// ✅ Verificar se email já está em uso
const existingUser = await this.userRepository.getByEmail(command.newEmail);
if (existingUser && existingUser.getId() !== user.getId()) {
throw new Error('Email already in use');
}
// 🔄 Executar comando
user.changeEmail({
newEmail: command.newEmail,
correlationId: context.correlationId,
userId: context.userId,
});
// 💾 Salvar mudanças
await this.userRepository.save(user);
// 📧 Enviar email de confirmação
this.sendEmailVerification(user, command.newEmail, context).catch(error => {
this.logger.error('Failed to send email verification', {
userId: user.getId(),
error: error.message,
traceId: context.traceId,
});
});
this.logger.info('User email changed successfully', {
userId: user.getId(),
newEmail: command.newEmail,
traceId: context.traceId,
});
return { success: true };
} catch (error) {
this.logger.error('Failed to change user email', {
command,
error: error.message,
traceId: context.traceId,
});
throw error;
}
}
// 👑 Command: Alterar role
async changeUserRole(
command: ChangeUserRoleCommand,
context: CommandContext
): Promise<{ success: boolean }> {
this.logger.info('Changing user role', {
userId: command.id,
newRole: command.newRole,
traceId: context.traceId,
});
try {
// 🔍 Carregar aggregate
const user = await this.userRepository.getById(command.id);
if (!user) {
throw new Error('User not found');
}
// ✅ Verificar permissões
this.validateRoleChangePermissions(user, command.newRole, context);
// 🔄 Executar comando
user.changeRole({
newRole: command.newRole,
reason: command.reason,
correlationId: context.correlationId,
userId: context.userId,
});
// 💾 Salvar mudanças
await this.userRepository.save(user);
// 📧 Notificar mudança de role
this.notifyRoleChange(user, command.newRole, context).catch(error => {
this.logger.error('Failed to notify role change', {
userId: user.getId(),
error: error.message,
traceId: context.traceId,
});
});
this.logger.info('User role changed successfully', {
userId: user.getId(),
newRole: command.newRole,
reason: command.reason,
traceId: context.traceId,
});
return { success: true };
} catch (error) {
this.logger.error('Failed to change user role', {
command,
error: error.message,
traceId: context.traceId,
});
throw error;
}
}
// 🗑️ Command: Desativar usuário
async deactivateUser(
command: DeactivateUserCommand,
context: CommandContext
): Promise<{ success: boolean }> {
this.logger.info('Deactivating user', {
userId: command.id,
reason: command.reason,
traceId: context.traceId,
});
try {
// 🔍 Carregar aggregate
const user = await this.userRepository.getById(command.id);
if (!user) {
throw new Error('User not found');
}
// ✅ Verificar permissões
this.validateDeactivationPermissions(user, context);
// 🔄 Executar comando
user.deactivate({
reason: command.reason,
scheduledDeletion: command.scheduledDeletion,
correlationId: context.correlationId,
userId: context.userId,
});
// 💾 Salvar mudanças
await this.userRepository.save(user);
// 📧 Notificar desativação
this.notifyUserDeactivation(user, command.reason, context).catch(error => {
this.logger.error('Failed to notify user deactivation', {
userId: user.getId(),
error: error.message,
traceId: context.traceId,
});
});
this.logger.info('User deactivated successfully', {
userId: user.getId(),
reason: command.reason,
traceId: context.traceId,
});
return { success: true };
} catch (error) {
this.logger.error('Failed to deactivate user', {
command,
error: error.message,
traceId: context.traceId,
});
throw error;
}
}
// ✅ Validações privadas
private async validateCreateUser(
command: CreateUserCommand,
context: CommandContext
): Promise<void> {
// 🔍 Verificar se email já existe
const existingUser = await this.userRepository.getByEmail(command.email);
if (existingUser) {
throw new Error('Email already in use');
}
// 🔍 Verificar se ID já existe
const userExists = await this.userRepository.exists(command.id);
if (userExists) {
throw new Error('User ID already exists');
}
// 👑 Verificar permissões para criar admin
if (['ADMIN', 'SUPER_ADMIN'].includes(command.role)) {
if (!context.permissions.includes('user:create_admin')) {
throw new Error('Insufficient permissions to create admin user');
}
}
}
private validateEmailChangePermissions(
user: UserAggregate,
context: CommandContext
): void {
// 👤 Usuário pode alterar próprio email
if (user.getId() === context.userId) {
return;
}
// 👑 Admin pode alterar email de usuários da mesma organização
if (context.permissions.includes('user:manage') &&
user.getOrganizationId() === context.organizationId) {
return;
}
throw new Error('Insufficient permissions to change user email');
}
private validateRoleChangePermissions(
user: UserAggregate,
newRole: string,
context: CommandContext
): void {
// 🚫 Não pode alterar próprio role
if (user.getId() === context.userId) {
throw new Error('Cannot change your own role');
}
// 👑 Verificar permissões baseadas no role target
if (newRole === 'SUPER_ADMIN' && !context.permissions.includes('user:create_super_admin')) {
throw new Error('Insufficient permissions to assign SUPER_ADMIN role');
}
if (newRole === 'ADMIN' && !context.permissions.includes('user:create_admin')) {
throw new Error('Insufficient permissions to assign ADMIN role');
}
// 🏢 Verificar mesma organização
if (user.getOrganizationId() !== context.organizationId) {
throw new Error('Cannot change role of user from different organization');
}
}
/**
 * Authorizes deactivating `user` for the caller described by `context`.
 *
 * Rules, checked in order: callers cannot deactivate themselves, SUPER_ADMIN
 * accounts cannot be deactivated, and the caller needs `user:deactivate`.
 *
 * NOTE(review): unlike the role-change and email-change checks in this class,
 * no organization comparison is made here — confirm whether that is intended.
 *
 * @param user - Aggregate about to be deactivated.
 * @param context - Caller identity and permissions.
 * @throws Error describing the first rule that is violated.
 */
private validateDeactivationPermissions(
  user: UserAggregate,
  context: CommandContext
): void {
  let denialReason: string | undefined;

  if (user.getId() === context.userId) {
    denialReason = 'Cannot deactivate yourself';
  } else if (user.getRole() === 'SUPER_ADMIN') {
    denialReason = 'Cannot deactivate SUPER_ADMIN user';
  } else if (!context.permissions.includes('user:deactivate')) {
    denialReason = 'Insufficient permissions to deactivate user';
  }

  if (denialReason !== undefined) {
    throw new Error(denialReason);
  }
}
// 📧 Notificações privadas
/**
 * Sends the `user-welcome` email to the user's current address.
 *
 * @param user - Recipient; provides the address, display name and id.
 * @param context - Supplies the correlation and trace ids attached as metadata.
 */
private async sendWelcomeEmail(
  user: UserAggregate,
  context: CommandContext
): Promise<void> {
  const recipient = user.getEmail();
  const templateData = {
    name: user.getName(),
    // Login page of the deployment configured through APP_URL.
    loginUrl: process.env.APP_URL + '/login',
  };
  const tracing = {
    userId: user.getId(),
    correlationId: context.correlationId,
    traceId: context.traceId,
  };

  await this.emailService.send({
    to: recipient,
    template: 'user-welcome',
    data: templateData,
    metadata: tracing,
  });
}
/**
 * Sends the `email-verification` email to the address the user is switching to.
 *
 * NOTE(review): the verification URL carries the literal placeholder `token=...`;
 * no real token is generated in this method.
 *
 * @param user - User requesting the change; provides display name and id.
 * @param newEmail - Address that receives the verification message.
 * @param context - Supplies the correlation and trace ids attached as metadata.
 */
private async sendEmailVerification(
  user: UserAggregate,
  newEmail: string,
  context: CommandContext
): Promise<void> {
  const templateData = {
    name: user.getName(),
    verificationUrl: process.env.APP_URL + `/verify-email?token=...`,
  };
  const tracing = {
    userId: user.getId(),
    correlationId: context.correlationId,
    traceId: context.traceId,
  };

  await this.emailService.send({
    to: newEmail,
    template: 'email-verification',
    data: templateData,
    metadata: tracing,
  });
}
/**
 * Sends the `role-changed` email telling the user which role they now hold
 * and who changed it.
 *
 * @param user - Affected user; provides the address, display name and id.
 * @param newRole - Role that was assigned.
 * @param context - Caller id (reported as `changedBy`) plus correlation/trace ids.
 */
private async notifyRoleChange(
  user: UserAggregate,
  newRole: string,
  context: CommandContext
): Promise<void> {
  const recipient = user.getEmail();
  const templateData = {
    name: user.getName(),
    newRole,
    changedBy: context.userId,
  };
  const tracing = {
    userId: user.getId(),
    correlationId: context.correlationId,
    traceId: context.traceId,
  };

  await this.emailService.send({
    to: recipient,
    template: 'role-changed',
    data: templateData,
    metadata: tracing,
  });
}
/**
 * Sends the `account-deactivated` email with the deactivation reason and the
 * support contact read from `SUPPORT_EMAIL`.
 *
 * @param user - Deactivated user; provides the address, display name and id.
 * @param reason - Explanation shown to the user.
 * @param context - Supplies the correlation and trace ids attached as metadata.
 */
private async notifyUserDeactivation(
  user: UserAggregate,
  reason: string,
  context: CommandContext
): Promise<void> {
  const recipient = user.getEmail();
  const templateData = {
    name: user.getName(),
    reason,
    supportEmail: process.env.SUPPORT_EMAIL,
  };
  const tracing = {
    userId: user.getId(),
    correlationId: context.correlationId,
    traceId: context.traceId,
  };

  await this.emailService.send({
    to: recipient,
    template: 'account-deactivated',
    data: templateData,
    metadata: tracing,
  });
}
}
// 📁 server/routers/user-cqrs-router.ts
import { z } from 'zod';
import { router, protectedProcedure, adminProcedure } from '../trpc';
import { UserApplicationService } from '@/application/user/user-application-service';
import { UserQueryService } from '@/infrastructure/projections/user-read-model';
import {
createUserCommandSchema,
changeUserEmailCommandSchema,
changeUserRoleCommandSchema,
deactivateUserCommandSchema,
} from '@/application/user/commands';
/**
 * 🎯 User CQRS router.
 *
 * Groups the user API into four sub-routers:
 * - `commands`: write side, delegating to `ctx.userApplicationService`;
 * - `queries`: read side, delegating to `ctx.userQueryService`;
 * - `events`: audit access to the event store;
 * - `projections`: admin-only projection maintenance.
 *
 * Every command builds a command context from the session: the caller id, the
 * request trace id (used both as `correlationId` and `traceId`), the caller's
 * organization and their permissions.
 */
export const userCQRSRouter = router({
  // 📝 COMMANDS (write side)
  commands: router({
    // 👤 Create user
    create: adminProcedure
      .input(createUserCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const context = {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: ctx.session.user.permissions,
        };
        return ctx.userApplicationService.createUser(input, context);
      }),
    // 📧 Change email
    changeEmail: protectedProcedure
      .input(changeUserEmailCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const context = {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: ctx.session.user.permissions,
        };
        return ctx.userApplicationService.changeUserEmail(input, context);
      }),
    // 👑 Change role
    changeRole: adminProcedure
      .input(changeUserRoleCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const context = {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: ctx.session.user.permissions,
        };
        return ctx.userApplicationService.changeUserRole(input, context);
      }),
    // 🗑️ Deactivate user
    deactivate: adminProcedure
      .input(deactivateUserCommandSchema)
      .mutation(async ({ input, ctx }) => {
        const context = {
          userId: ctx.session.user.id,
          correlationId: ctx.traceId,
          traceId: ctx.traceId,
          organizationId: ctx.organization?.id,
          permissions: ctx.session.user.permissions,
        };
        return ctx.userApplicationService.deactivateUser(input, context);
      }),
  }),
  // 🔍 QUERIES (read side)
  queries: router({
    // 👤 Fetch a user by id
    // NOTE(review): no organization scoping is applied here, unlike `list` and
    // `stats` — confirm the query service enforces tenant isolation.
    getById: protectedProcedure
      .input(z.object({ id: z.string().uuid() }))
      .query(async ({ input, ctx }) => {
        return ctx.userQueryService.getById(input.id);
      }),
    // 📧 Fetch a user by email
    // NOTE(review): same caveat as `getById` regarding organization scoping.
    getByEmail: protectedProcedure
      .input(z.object({ email: z.string().email() }))
      .query(async ({ input, ctx }) => {
        return ctx.userQueryService.getByEmail(input.email);
      }),
    // 📊 Paginated user listing
    list: protectedProcedure
      .input(z.object({
        organizationId: z.string().uuid().optional(),
        role: z.enum(['USER', 'ADMIN', 'SUPER_ADMIN']).optional(),
        isActive: z.boolean().optional(),
        search: z.string().optional(),
        page: z.number().min(1).default(1),
        limit: z.number().min(1).max(100).default(20),
        sortBy: z.string().optional(),
        sortOrder: z.enum(['asc', 'desc']).default('desc'),
      }))
      .query(async ({ input, ctx }) => {
        // 🔒 Only SUPER_ADMIN may choose the organization (or omit it to see
        // all); everyone else is pinned to their own organization.
        const isSuperAdmin = ctx.session.user.role === 'SUPER_ADMIN';
        const organizationId = isSuperAdmin
          ? input.organizationId
          : ctx.organization?.id;
        // Fail closed: an undefined organization means "no filter", which must
        // never be reachable for a non-SUPER_ADMIN caller.
        if (!isSuperAdmin && !organizationId) {
          throw new TRPCError({
            code: 'FORBIDDEN',
            message: 'Organization context is required',
          });
        }
        return ctx.userQueryService.getUsers({
          ...input,
          organizationId,
        });
      }),
    // 📊 User statistics
    stats: protectedProcedure
      .input(z.object({
        organizationId: z.string().uuid().optional(),
      }))
      .query(async ({ input, ctx }) => {
        // 🔒 Same scoping rule as `list`: non-SUPER_ADMIN callers are pinned
        // to their own organization and rejected when they have none.
        const isSuperAdmin = ctx.session.user.role === 'SUPER_ADMIN';
        const organizationId = isSuperAdmin
          ? input.organizationId
          : ctx.organization?.id;
        if (!isSuperAdmin && !organizationId) {
          throw new TRPCError({
            code: 'FORBIDDEN',
            message: 'Organization context is required',
          });
        }
        return ctx.userQueryService.getStats(organizationId);
      }),
  }),
  // 📊 EVENT HISTORY (for auditing)
  events: router({
    // 📜 Event history of a single user
    getHistory: protectedProcedure
      .input(z.object({
        userId: z.string().uuid(),
        fromDate: z.date().optional(),
        toDate: z.date().optional(),
        eventTypes: z.array(z.string()).optional(),
        limit: z.number().min(1).max(100).default(50),
        offset: z.number().min(0).default(0),
      }))
      .query(async ({ input, ctx }) => {
        // 🔒 Callers may always read their own history; reading someone
        // else's requires `user:view_history`.
        if (input.userId !== ctx.session.user.id &&
            !ctx.utils.hasPermission('user:view_history')) {
          throw new TRPCError({
            code: 'FORBIDDEN',
            message: 'Insufficient permissions to view user history',
          });
        }
        return ctx.eventStore.queryEvents({
          aggregateType: 'User',
          aggregateId: input.userId,
          fromDate: input.fromDate,
          toDate: input.toDate,
          eventTypes: input.eventTypes,
          limit: input.limit,
          offset: input.offset,
        });
      }),
    // 📊 Event statistics
    getEventStats: adminProcedure
      .input(z.object({
        organizationId: z.string().uuid().optional(),
        fromDate: z.date().optional(),
        toDate: z.date().optional(),
      }))
      .query(async ({ input, ctx }) => {
        return ctx.eventStore.queryEvents({
          aggregateType: 'User',
          fromDate: input.fromDate,
          toDate: input.toDate,
          // `input.organizationId` is accepted but not applied: filtering by
          // organization through event metadata would be more complex. A real
          // implementation would use a dedicated projection.
        });
      }),
  }),
  // 📋 PROJECTIONS MANAGEMENT (admin only)
  projections: router({
    // 🔄 Rebuild a projection
    rebuild: adminProcedure
      .input(z.object({
        projectionName: z.enum(['UserReadModel', 'UserStatistics']),
        fromEventId: z.string().optional(),
      }))
      .mutation(async ({ input, ctx }) => {
        // ⚡ Rebuild the requested projection, optionally from a given event
        return ctx.projectionService.rebuild(input.projectionName, {
          fromEventId: input.fromEventId,
          correlationId: ctx.traceId,
        });
      }),
    // 📊 Projection status
    getStatus: adminProcedure
      .query(async ({ ctx }) => {
        return ctx.projectionService.getStatus();
      }),
  }),
});
// 📁 server/routers/event-store-router.ts
/**
 * 🎯 Admin-only router for inspecting the Event Store: aggregate statistics,
 * ad-hoc event queries, per-aggregate streams and a health probe.
 */
export const eventStoreRouter = router({
  // 📊 Event Store statistics
  stats: adminProcedure
    .query(async ({ ctx }) => {
      return ctx.eventStore.getStats();
    }),
  // 🔍 Query events with optional filters and pagination
  queryEvents: adminProcedure
    .input(z.object({
      aggregateType: z.string().optional(),
      aggregateId: z.string().uuid().optional(),
      eventTypes: z.array(z.string()).optional(),
      fromDate: z.date().optional(),
      toDate: z.date().optional(),
      userId: z.string().uuid().optional(),
      correlationId: z.string().optional(),
      limit: z.number().min(1).max(1000).default(100),
      offset: z.number().min(0).default(0),
    }))
    .query(async ({ input, ctx }) => {
      return ctx.eventStore.queryEvents(input);
    }),
  // 📜 Event stream of one aggregate, starting at `fromVersion`
  getEventStream: adminProcedure
    .input(z.object({
      aggregateId: z.string().uuid(),
      aggregateType: z.string(),
      fromVersion: z.number().min(0).default(0),
    }))
    .query(async ({ input, ctx }) => {
      return ctx.eventStore.loadEvents(
        input.aggregateId,
        input.aggregateType,
        input.fromVersion
      );
    }),
  // 📊 Event Store health check: reports `healthy` with stats when the store
  // answers, `unhealthy` with the failure message otherwise (never throws).
  health: adminProcedure
    .query(async ({ ctx }) => {
      try {
        const stats = await ctx.eventStore.getStats();
        return {
          status: 'healthy',
          timestamp: new Date().toISOString(),
          stats,
        };
      } catch (error: unknown) {
        // Catch variables are `unknown` under strict mode and may be
        // non-Error values; narrow before reading `.message`.
        const message = error instanceof Error ? error.message : String(error);
        return {
          status: 'unhealthy',
          timestamp: new Date().toISOString(),
          error: message,
        };
      }
    }),
});
// 📁 server/context.ts - Context atualizado
import { EventStore } from '@/shared/event-sourcing/event-store';
import { UserApplicationService } from '@/application/user/user-application-service';
import { UserQueryService } from '@/infrastructure/projections/user-read-model';
/**
 * Request context handed to tRPC procedures. Only the event-sourcing related
 * members are listed here; the remaining context fields are elided.
 */
export interface Context {
// ... other context fields
// 📊 Event Sourcing services
// Event store instance shared by the services below.
eventStore: EventStore;
// Write side: executes user commands.
userApplicationService: UserApplicationService;
// Read side: serves user queries.
userQueryService: UserQueryService;
// Projection maintenance service.
projectionService: ProjectionService;
}
/**
 * Builds the request context, wiring the event-sourcing services together:
 * one `EventStore`, a `UserRepository` on top of it, the user application
 * service (write side) and the user query service (read side).
 *
 * NOTE(review): `prisma`, `emailService`, `logger` and `projectionService` are
 * referenced but not declared in the visible body — presumably they come from
 * the elided base setup; confirm. `req` is not used in the visible part.
 *
 * @param req - Incoming request.
 * @returns The context object exposed to procedures.
 */
export async function createContext({ req }: { req: NextRequest }): Promise<Context> {
// ... base context setup
// 🔧 Set up the Event Sourcing services
const eventStore = new EventStore(prisma);
const userRepository = new UserRepository(eventStore);
const userApplicationService = new UserApplicationService(
userRepository,
emailService,
logger
);
const userQueryService = new UserQueryService(prisma);
return {
// ... other fields
eventStore,
userApplicationService,
userQueryService,
projectionService,
};
}
// 📁 domain/sagas/user-onboarding-saga.ts
import { DomainEvent } from '@/shared/event-sourcing/event-store';
import { Logger } from '@/infrastructure/logging/logger';
/**
 * 🎯 Snapshot of a saga's progress, as kept by `BaseSaga` and returned from
 * `getState()`.
 */
export interface SagaState {
// Identifier of this saga instance.
sagaId: string;
// Correlation id shared by the events belonging to this saga.
correlationId: string;
// Lifecycle status; `BaseSaga` starts at 'started'.
status: 'started' | 'completed' | 'failed' | 'compensating';
// ISO-8601 timestamp taken when the saga was constructed.
startedAt: string;
// ISO-8601 timestamp set when the saga completes.
completedAt?: string;
// Name of the step being executed; 'initial' at construction.
currentStep: string;
// Free-form data accumulated while the saga runs.
data: Record<string, any>;
// Names of the compensation steps registered so far.
compensationSteps: string[];
}
/**
 * 🔄 Base class for sagas: owns the saga state and the helpers that move it
 * through its lifecycle. Subclasses implement event handling and compensation.
 */
export abstract class BaseSaga {
  protected state: SagaState;

  /**
   * @param sagaId - Identifier of this saga instance.
   * @param correlationId - Correlation id shared by the saga's events.
   * @param logger - Logger used to report failures.
   */
  constructor(
    sagaId: string,
    correlationId: string,
    protected logger: Logger
  ) {
    const startedAt = new Date().toISOString();
    this.state = {
      sagaId,
      correlationId,
      status: 'started',
      startedAt,
      currentStep: 'initial',
      data: {},
      compensationSteps: [],
    };
  }

  /** Reacts to a domain event belonging to this saga. */
  abstract handle(event: DomainEvent): Promise<void>;

  /** Undoes the work performed so far. */
  abstract compensate(): Promise<void>;

  /** Shallow-merges `updates` into the current state, replacing the state object. */
  protected updateState(updates: Partial<SagaState>): void {
    this.state = Object.assign({}, this.state, updates);
  }

  /** Marks the saga as completed and stamps the completion time. */
  protected complete(): void {
    const completedAt = new Date().toISOString();
    this.updateState({ status: 'completed', completedAt });
  }

  /** Marks the saga as failed and logs the failure with the step it happened in. */
  protected fail(error: string): void {
    this.updateState({ status: 'failed' });
    const { sagaId, correlationId, currentStep } = this.state;
    this.logger.error('Saga failed', {
      sagaId,
      correlationId,
      error,
      currentStep,
    });
  }

  /** Returns a shallow copy of the state; nested `data` and `compensationSteps` are shared. */
  getState(): SagaState {
    return Object.assign({}, this.state);
  }
}
/**
 * 👤 User onboarding saga.
 *
 * Drives onboarding as a chain of event reactions:
 * 1. `OnboardingStarted`    → create the organization;
 * 2. `OrganizationCreated`  → create the owner user (role ADMIN);
 * 3. `UserCreated`          → send the onboarding welcome email;
 * 4. `WelcomeEmailSent`     → track analytics, then wait for the profile;
 * 5. `UserProfileCompleted` → final tracking + completion email, saga completes;
 * 6. `OnboardingTaskFailed` → fail and compensate.
 *
 * Any error thrown by a step fails the saga and runs the registered
 * compensation steps in reverse order. Once compensation has run, the saga
 * ends in the terminal `failed` status.
 */
export class UserOnboardingSaga extends BaseSaga {
  private userApplicationService: UserApplicationService;
  private organizationService: OrganizationService;
  private emailService: EmailService;
  private analyticsService: AnalyticsService;

  /**
   * @param sagaId - Identifier of this saga instance.
   * @param correlationId - Correlation id shared by the saga's events.
   * @param logger - Logger used for progress and failure reporting.
   * @param services - Collaborators the saga orchestrates.
   */
  constructor(
    sagaId: string,
    correlationId: string,
    logger: Logger,
    services: {
      userApplicationService: UserApplicationService;
      organizationService: OrganizationService;
      emailService: EmailService;
      analyticsService: AnalyticsService;
    }
  ) {
    super(sagaId, correlationId, logger);
    this.userApplicationService = services.userApplicationService;
    this.organizationService = services.organizationService;
    this.emailService = services.emailService;
    this.analyticsService = services.analyticsService;
  }

  // 🎯 Handle incoming events
  /**
   * Routes an event to its step handler. Unknown event types are logged and
   * ignored. A throwing step fails the saga and triggers compensation; this
   * method itself does not reject for step failures.
   */
  async handle(event: DomainEvent): Promise<void> {
    this.logger.info('Saga handling event', {
      sagaId: this.state.sagaId,
      eventType: event.eventType,
      currentStep: this.state.currentStep,
    });
    try {
      switch (event.eventType) {
        case 'OnboardingStarted':
          await this.handleOnboardingStarted(event);
          break;
        case 'UserCreated':
          await this.handleUserCreated(event);
          break;
        case 'OrganizationCreated':
          await this.handleOrganizationCreated(event);
          break;
        case 'WelcomeEmailSent':
          await this.handleWelcomeEmailSent(event);
          break;
        case 'UserProfileCompleted':
          await this.handleUserProfileCompleted(event);
          break;
        case 'OnboardingTaskFailed':
          await this.handleTaskFailure(event);
          break;
        default:
          this.logger.warn('Unhandled event type in saga', {
            sagaId: this.state.sagaId,
            eventType: event.eventType,
          });
      }
    } catch (error: unknown) {
      this.fail(this.describeError(error));
      await this.compensate();
    }
  }

  // 🚀 Step 1: onboarding started — remember the request data and create the organization
  private async handleOnboardingStarted(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'creating-organization',
      data: {
        ...this.state.data,
        userEmail: event.eventData.userEmail,
        userName: event.eventData.userName,
        organizationName: event.eventData.organizationName,
        plan: event.eventData.plan || 'FREE',
      },
    });
    // 🏢 Create the organization
    try {
      await this.organizationService.create({
        name: this.state.data.organizationName,
        plan: this.state.data.plan,
        ownerId: event.eventData.userId,
        correlationId: this.state.correlationId,
      });
      this.state.compensationSteps.push('deleteOrganization');
    } catch (error: unknown) {
      throw new Error(`Failed to create organization: ${this.describeError(error)}`);
    }
  }

  // 🏢 Step 2: organization created — create its owner user
  private async handleOrganizationCreated(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'creating-user',
      data: {
        ...this.state.data,
        organizationId: event.aggregateId,
      },
    });
    // 👤 Create the user
    const userId = this.generateUserId();
    try {
      await this.userApplicationService.createUser({
        id: userId,
        email: this.state.data.userEmail,
        name: this.state.data.userName,
        role: 'ADMIN', // Organization owner
        organizationId: this.state.data.organizationId,
        metadata: {
          onboardingSagaId: this.state.sagaId,
          source: 'onboarding',
        },
      }, {
        userId: 'system',
        correlationId: this.state.correlationId,
        permissions: ['user:create_admin'],
      });
      // Remember the id right away so the `deleteUser` compensation can act
      // even if the saga fails before `UserCreated` is handled. An id already
      // recorded from that event takes precedence.
      if (!this.state.data.userId) {
        this.updateState({ data: { ...this.state.data, userId } });
      }
      this.state.compensationSteps.push('deleteUser');
    } catch (error: unknown) {
      throw new Error(`Failed to create user: ${this.describeError(error)}`);
    }
  }

  // 👤 Step 3: user created — send the welcome email (best effort)
  private async handleUserCreated(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'sending-welcome-email',
      data: {
        ...this.state.data,
        userId: event.aggregateId,
      },
    });
    // 📧 Send the welcome email
    try {
      await this.emailService.send({
        to: this.state.data.userEmail,
        template: 'onboarding-welcome',
        data: {
          userName: this.state.data.userName,
          organizationName: this.state.data.organizationName,
          setupUrl: `${process.env.APP_URL}/onboarding/setup?token=${this.generateSetupToken()}`,
        },
        metadata: {
          sagaId: this.state.sagaId,
          correlationId: this.state.correlationId,
          userId: this.state.data.userId,
        },
      });
    } catch (error: unknown) {
      // Email failure is not critical, continue saga
      this.logger.warn('Failed to send welcome email', {
        sagaId: this.state.sagaId,
        error: this.describeError(error),
      });
    }
  }

  // 📧 Step 4: welcome email sent — track analytics (best effort)
  private async handleWelcomeEmailSent(event: DomainEvent): Promise<void> {
    this.updateState({
      currentStep: 'tracking-analytics',
    });
    // 📊 Track onboarding metrics
    try {
      await this.analyticsService.track({
        event: 'onboarding_user_created',
        userId: this.state.data.userId,
        properties: {
          organizationId: this.state.data.organizationId,
          plan: this.state.data.plan,
          sagaId: this.state.sagaId,
          onboardingSource: 'web',
          timestamp: new Date().toISOString(),
        },
      });
      this.updateState({ currentStep: 'waiting-profile-completion' });
    } catch (error: unknown) {
      this.logger.warn('Failed to track analytics', {
        sagaId: this.state.sagaId,
        error: this.describeError(error),
      });
    }
  }

  // ✅ Step 5: profile completed (saga completion)
  private async handleUserProfileCompleted(event: DomainEvent): Promise<void> {
    this.updateState({ currentStep: 'finalizing-onboarding' });
    // 📊 Track successful onboarding
    try {
      await this.analyticsService.track({
        event: 'onboarding_completed',
        userId: this.state.data.userId,
        properties: {
          organizationId: this.state.data.organizationId,
          duration: Date.now() - new Date(this.state.startedAt).getTime(),
          sagaId: this.state.sagaId,
          completedSteps: ['organization', 'user', 'email', 'profile'],
        },
      });
      // 📧 Send completion email
      await this.emailService.send({
        to: this.state.data.userEmail,
        template: 'onboarding-completed',
        data: {
          userName: this.state.data.userName,
          dashboardUrl: `${process.env.APP_URL}/dashboard`,
        },
      });
      this.complete();
      this.logger.info('User onboarding completed successfully', {
        sagaId: this.state.sagaId,
        userId: this.state.data.userId,
        organizationId: this.state.data.organizationId,
        duration: Date.now() - new Date(this.state.startedAt).getTime(),
      });
    } catch (error: unknown) {
      this.logger.warn('Failed to finalize onboarding', {
        sagaId: this.state.sagaId,
        error: this.describeError(error),
      });
      // Still complete the saga as core onboarding is done
      this.complete();
    }
  }

  // ❌ Handle task failures reported through `OnboardingTaskFailed`
  private async handleTaskFailure(event: DomainEvent): Promise<void> {
    this.logger.error('Onboarding task failed', {
      sagaId: this.state.sagaId,
      failedStep: event.eventData.step,
      error: event.eventData.error,
    });
    this.fail(`Task failed at step ${event.eventData.step}: ${event.eventData.error}`);
    await this.compensate();
  }

  // 🔄 Compensate (rollback) saga
  /**
   * Runs the registered compensation steps newest-first. A failing step is
   * logged and skipped so the remaining steps still run. The saga is left in
   * the terminal `failed` status afterwards, rather than staying in
   * `compensating`.
   */
  async compensate(): Promise<void> {
    this.updateState({ status: 'compensating' });
    this.logger.info('Starting saga compensation', {
      sagaId: this.state.sagaId,
      compensationSteps: this.state.compensationSteps,
    });
    // 🔄 Execute compensation steps in reverse order
    for (let i = this.state.compensationSteps.length - 1; i >= 0; i--) {
      const step = this.state.compensationSteps[i];
      try {
        switch (step) {
          case 'deleteUser':
            if (this.state.data.userId) {
              await this.userApplicationService.deactivateUser({
                id: this.state.data.userId,
                reason: 'Onboarding saga compensation',
              }, {
                userId: 'system',
                correlationId: this.state.correlationId,
                permissions: ['user:deactivate'],
              });
            }
            break;
          case 'deleteOrganization':
            if (this.state.data.organizationId) {
              await this.organizationService.delete({
                id: this.state.data.organizationId,
                reason: 'Onboarding saga compensation',
              });
            }
            break;
          default:
            this.logger.warn('Unknown compensation step', {
              sagaId: this.state.sagaId,
              step,
            });
        }
        this.logger.info('Compensation step completed', {
          sagaId: this.state.sagaId,
          step,
        });
      } catch (error: unknown) {
        this.logger.error('Compensation step failed', {
          sagaId: this.state.sagaId,
          step,
          error: this.describeError(error),
        });
        // Continue with other compensation steps
      }
    }
    // Compensation is over: settle on the terminal status so observers that
    // look for 'completed' / 'failed' can tell the saga has finished.
    this.updateState({ status: 'failed' });
    this.logger.info('Saga compensation completed', {
      sagaId: this.state.sagaId,
    });
  }

  // 🔧 Utilities

  /** Extracts a printable message from a caught value of unknown type. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  // NOTE(review): this id is not a UUID, while the user routers in this file
  // validate ids with `z.string().uuid()` — confirm the expected id format.
  private generateUserId(): string {
    return `user_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
  }

  // NOTE(review): the token is derived from the saga id and the clock, so it
  // is predictable; a production setup token should come from a CSPRNG.
  private generateSetupToken(): string {
    return `setup_${this.state.sagaId}_${Date.now()}`;
  }
}
// 📁 infrastructure/sagas/saga-manager.ts
/**
 * 🎯 Saga manager: keeps the active sagas in memory, keyed by correlation id,
 * and routes event-store events to the saga that owns them.
 *
 * Sagas are removed once they report `completed` or `failed`, or when
 * `cleanupExpiredSagas` finds them older than the allowed age.
 */
export class SagaManager {
  private sagas: Map<string, BaseSaga> = new Map();
  private eventStore: EventStore;
  private logger: Logger;

  /**
   * @param eventStore - Store on which the saga event handlers are registered.
   * @param logger - Logger used for lifecycle and error reporting.
   */
  constructor(eventStore: EventStore, logger: Logger) {
    this.eventStore = eventStore;
    this.logger = logger;
    this.setupEventHandlers();
  }

  // 🔧 Subscribe to every event type that sagas react to
  private setupEventHandlers(): void {
    const sagaEventTypes = [
      'OnboardingStarted',
      'UserCreated',
      'OrganizationCreated',
      'WelcomeEmailSent',
      'UserProfileCompleted',
      'OnboardingTaskFailed',
    ];
    for (const eventType of sagaEventTypes) {
      this.eventStore.registerEventHandler(eventType, this.handleEvent.bind(this));
    }
  }

  // 🚀 Start new saga
  /**
   * Creates a saga, registers it under `correlationId` and feeds it the
   * initial event. The initial event is processed asynchronously; failures
   * are logged instead of surfacing as unhandled promise rejections.
   *
   * @param sagaType - Saga constructor; receives `(sagaId, correlationId, logger, ...args)`.
   * @param correlationId - Key under which the saga is tracked.
   * @param initialEvent - First event handed to the saga.
   * @param args - Extra constructor arguments forwarded to `sagaType`.
   */
  startSaga<T extends BaseSaga>(
    sagaType: new (...args: any[]) => T,
    correlationId: string,
    initialEvent: DomainEvent,
    ...args: any[]
  ): void {
    const sagaId = this.generateSagaId();
    const saga = new sagaType(sagaId, correlationId, this.logger, ...args);
    this.sagas.set(correlationId, saga);
    this.logger.info('Saga started', {
      sagaId,
      correlationId,
      sagaType: sagaType.name,
      initialEvent: initialEvent.eventType,
    });
    // Handle initial event. `dispatch` never rejects, so discarding the
    // promise here is safe.
    void this.dispatch(correlationId, saga, initialEvent);
  }

  // 📨 Handle incoming events: route to the saga with the event's correlation id
  private async handleEvent(event: DomainEvent): Promise<void> {
    const correlationId = event.metadata.correlationId;
    const saga = this.sagas.get(correlationId);
    if (!saga) {
      // No saga found for this correlation ID
      return;
    }
    await this.dispatch(correlationId, saga, event);
  }

  /**
   * Lets `saga` handle `event`, then removes the saga if it reached a
   * terminal status. Errors are logged, never rethrown.
   */
  private async dispatch(
    correlationId: string,
    saga: BaseSaga,
    event: DomainEvent
  ): Promise<void> {
    try {
      await saga.handle(event);
      // Clean up completed or failed sagas
      const state = saga.getState();
      if (state.status === 'completed' || state.status === 'failed') {
        // Only drop the entry if it still refers to this saga; a newer saga
        // may have been started under the same correlation id meanwhile.
        if (this.sagas.get(correlationId) === saga) {
          this.sagas.delete(correlationId);
        }
        this.logger.info('Saga finished and cleaned up', {
          sagaId: state.sagaId,
          correlationId,
          status: state.status,
          duration: Date.now() - new Date(state.startedAt).getTime(),
        });
      }
    } catch (error: unknown) {
      this.logger.error('Error handling event in saga', {
        correlationId,
        eventType: event.eventType,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  // 📊 Get saga status; `null` when no saga is tracked for the correlation id
  getSagaStatus(correlationId: string): SagaState | null {
    const saga = this.sagas.get(correlationId);
    return saga ? saga.getState() : null;
  }

  // 📋 Get all active sagas
  getActiveSagas(): SagaState[] {
    return Array.from(this.sagas.values()).map(saga => saga.getState());
  }

  // 🔧 Generate unique saga ID
  private generateSagaId(): string {
    return `saga_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
  }

  // 🧹 Cleanup expired sagas
  /**
   * Drops every saga that started more than `maxAge` milliseconds ago.
   *
   * @param maxAge - Maximum saga age in milliseconds (default: 24 hours).
   */
  async cleanupExpiredSagas(maxAge: number = 24 * 60 * 60 * 1000): Promise<void> {
    const now = Date.now();
    let expiredCount = 0;
    // Deleting the current entry while iterating a Map is safe.
    for (const [correlationId, saga] of this.sagas.entries()) {
      const state = saga.getState();
      const age = now - new Date(state.startedAt).getTime();
      if (age <= maxAge) {
        continue;
      }
      this.logger.warn('Cleaning up expired saga', {
        sagaId: state.sagaId,
        correlationId,
        age,
      });
      this.sagas.delete(correlationId);
      expiredCount += 1;
    }
    this.logger.info('Saga cleanup completed', {
      expiredSagas: expiredCount,
      activeSagas: this.sagas.size,
    });
  }
}
Eventos Imutáveis: Nunca altere eventos já salvos. Crie novos eventos para correções.
Versionamento de Eventos: Planeje a evolução de esquemas com backward compatibility.
Projections Idempotentes: Garanta que reprocessar eventos não cause inconsistências.
Snapshots para Performance: Use snapshots para aggregates com muitos eventos.