Processamento assíncrono sob demanda. APIs, bancos de dados e streams de dados em tempo real
Generators síncronos são poderosos, mas a magia real acontece quando combinamos com async/await. Async generators permitem processar streams infinitos de dados assíncronos: APIs, bancos de dados, WebSockets, arquivos gigantes.
Um async generator é uma função que pode pausar (yield) e aguardar operações assíncronas (await) ao mesmo tempo. Perfeito para processar dados que chegam ao longo do tempo: API calls, DB queries, file reads.
// 🔄 ASYNC GENERATOR BÁSICO: async function*
async function* simpleAsyncGenerator() {
console.log('🚀 Iniciando async generator...');
// ⏰ YIELD + AWAIT: Pausa e aguarda operação assíncrona
yield await new Promise(resolve =>
setTimeout(() => resolve('Primeiro resultado'), 1000)
);
yield await new Promise(resolve =>
setTimeout(() => resolve('Segundo resultado'), 500)
);
yield await new Promise(resolve =>
setTimeout(() => resolve('Terceiro resultado'), 800)
);
console.log('✅ Async generator finalizado');
}
// 🔄 CONSUMINDO com for await...of
async function demonstrateAsyncGenerator() {
console.log('=== DEMONSTRAÇÃO: for await...of ===');
// ⏰ PROCESSA resultados conforme chegam
for await (const result of simpleAsyncGenerator()) {
console.log(`📥 Recebido: ${result} às ${new Date().toLocaleTimeString()}`);
}
console.log('🏁 Processamento finalizado');
}
// 🚀 EXECUTAR demonstração
demonstrateAsyncGenerator();
// 📊 RESULTADO:
// - Cada yield aguarda operação assíncrona
// - for await...of processa conforme dados chegam
// - Controle total sobre timing e fluxo
// - Não bloqueia event loop
// 💡 DIFERENÇA FUNDAMENTAL:
// Generator normal: yield valor sincronamente
// Async generator: yield Promise que resolve assincronamente
Ao invés de fazer uma requisição gigante que demora 30 segundos e pode dar timeout, fazemos várias pequenas requisições conforme necessário. Rate limiting automático e controle total.
// 🌐 API STREAMING: Processa APIs sob demanda
class APIStreamer {
constructor(baseUrl, options = {}) {
this.baseUrl = baseUrl;
this.rateLimit = options.rateLimit || 5; // requests/segundo
this.timeout = options.timeout || 10000; // 10s timeout
this.retries = options.retries || 3;
this.requestCount = 0;
}
// ⏱️ RATE LIMITING: Controla velocidade de requisições
async waitForRateLimit() {
if (this.lastRequest) {
const minInterval = 1000 / this.rateLimit;
const elapsed = Date.now() - this.lastRequest;
if (elapsed < minInterval) {
const waitTime = minInterval - elapsed;
console.log(`⏸️ Rate limit: aguardando ${waitTime}ms`);
await new Promise(resolve => setTimeout(resolve, waitTime));
}
}
this.lastRequest = Date.now();
}
// 🌐 REQUISIÇÃO com retry e timeout
async fetchWithRetry(url) {
let lastError;
for (let attempt = 1; attempt <= this.retries; attempt++) {
try {
await this.waitForRateLimit();
console.log(`🌐 Request #${++this.requestCount}: ${url} (tentativa ${attempt})`);
// Simula requisição HTTP (em produção seria fetch real)
const response = await this.simulateAPICall(url);
if (response.ok) {
return response.data;
} else {
throw new Error(`HTTP ${response.status}: ${response.error}`);
}
} catch (error) {
lastError = error;
console.log(`❌ Tentativa ${attempt} falhou: ${error.message}`);
if (attempt < this.retries) {
const backoff = Math.pow(2, attempt) * 1000; // Exponential backoff
console.log(`⏰ Aguardando ${backoff}ms antes de tentar novamente...`);
await new Promise(resolve => setTimeout(resolve, backoff));
}
}
}
throw new Error(`Falha após ${this.retries} tentativas: ${lastError.message}`);
}
// 🎭 SIMULA API call (substitua por fetch real)
async simulateAPICall(url) {
// Simula latência da rede
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 200));
// Simula ocasionais falhas de rede (5% chance)
if (Math.random() < 0.05) {
throw new Error('Network timeout');
}
// Simula resposta da API
const page = parseInt(url.match(/page=(\d+)/)?.[1] || '1');
const limit = parseInt(url.match(/limit=(\d+)/)?.[1] || '10');
return {
ok: true,
data: {
page,
limit,
users: Array.from({ length: limit }, (_, i) => ({
id: (page - 1) * limit + i + 1,
name: `User ${(page - 1) * limit + i + 1}`,
email: `user${(page - 1) * limit + i + 1}@api.com`,
fetchedAt: new Date().toISOString()
})),
hasNext: page < 10 // Simula 10 páginas total
}
};
}
// 🔄 STREAM de usuários de API paginada
async* streamUsers(options = {}) {
const { startPage = 1, limit = 50, filter = null } = options;
let currentPage = startPage;
let hasNextPage = true;
console.log(`🚀 Iniciando stream de usuários (página ${startPage}, ${limit}/página)`);
while (hasNextPage) {
try {
// 🌐 FETCH página atual
const url = `${this.baseUrl}/users?page=${currentPage}&limit=${limit}`;
const response = await this.fetchWithRetry(url);
console.log(`📦 Página ${currentPage}: ${response.users.length} usuários`);
// 🔄 YIELD cada usuário individualmente
for (const user of response.users) {
// 🔍 FILTRO opcional
if (!filter || filter(user)) {
yield user;
}
}
// 📄 PRÓXIMA página
hasNextPage = response.hasNext;
currentPage++;
// ✅ ESTATÍSTICAS
console.log(`📊 Stream: página ${currentPage - 1}, total requests: ${this.requestCount}`);
} catch (error) {
console.error(`❌ Erro ao buscar página ${currentPage}: ${error.message}`);
// Decide se continua ou para baseado no tipo de erro
if (error.message.includes('timeout')) {
console.log('⏭️ Continuando com próxima página...');
currentPage++;
} else {
throw error; // Erro fatal, para o stream
}
}
}
console.log('🏁 Stream de API finalizado');
}
// 📊 ESTATÍSTICAS do stream
getStats() {
return {
requestCount: this.requestCount,
rateLimit: this.rateLimit,
avgRequestTime: this.lastRequest ? Date.now() - this.lastRequest : 0
};
}
}
// 🚀 EXEMPLO DE USO: Processando API em stream
async function apiStreamExample() {
console.log('🌐 DEMONSTRAÇÃO: API Streaming');
console.log('=' .repeat(40));
// 🔧 CONFIGURAÇÃO do streamer
const streamer = new APIStreamer('https://api.exemplo.com', {
rateLimit: 3, // 3 requests/segundo
timeout: 5000,
retries: 2
});
// 🔍 FILTRO: Só usuários com ID par
const evenUsersFilter = user => user.id % 2 === 0;
let processedCount = 0;
let evenCount = 0;
try {
// 🔄 PROCESSA stream com filtro
for await (const user of streamer.streamUsers({
startPage: 1,
limit: 20,
filter: evenUsersFilter
})) {
processedCount++;
if (user.id % 2 === 0) evenCount++;
console.log(`👤 Processando: ${user.name} (ID: ${user.id})`);
// 🎯 PROCESSAMENTO do usuário
// Aqui você faria: salvar no banco, enviar email, etc.
// 🛑 DEMO: Para após 50 usuários processados
if (processedCount >= 50) {
console.log('\n⏸️ Demo finalizada - stream funcionando perfeitamente!');
break;
}
}
} catch (error) {
console.error('❌ Erro no stream:', error.message);
} finally {
// 📊 RELATÓRIO
const stats = streamer.getStats();
console.log('\n📊 ESTATÍSTICAS DO STREAM:');
console.log(`📦 Usuários processados: ${processedCount}`);
console.log(`👥 Usuários pares: ${evenCount}`);
console.log(`🌐 Total requests: ${stats.requestCount}`);
console.log(`⚡ Rate limit: ${stats.rateLimit} req/sec`);
}
}
// 🚀 EXECUTAR exemplo
apiStreamExample();
// 💡 VANTAGENS DO API STREAMING:
//
// ✅ CONTROLE TOTAL:
// - Rate limiting automático
// - Retry com exponential backoff
// - Timeout configurável
// - Filtros aplicados sob demanda
//
// ✅ EFICIÊNCIA:
// - Só faz requests necessários
// - Para imediatamente quando encontra o que precisa
// - Não sobrecarrega APIs
// - Memory usage constante
//
// ✅ ROBUSTEZ:
// - Lida com falhas de rede
// - Não para por um erro
// - Métricas de monitoramento
// - Graceful degradation
Processar milhões de registros do banco sem explodir memória. Cursors automáticos, batch processing e controle de transações para operações que podem durar horas.
// 💾 DATABASE STREAMING: Processa milhões de registros
class DatabaseStreamer {
constructor(connection, options = {}) {
this.connection = connection;
this.batchSize = options.batchSize || 1000;
this.timeout = options.timeout || 30000;
this.maxRetries = options.maxRetries || 3;
this.processedCount = 0;
this.errorCount = 0;
}
// 🔄 STREAM de registros com cursor automático
async* streamRecords(query, params = []) {
let cursor = null;
let hasMore = true;
let batchNumber = 0;
console.log(`💾 Iniciando stream DB: "${query.substring(0, 50)}..."`);
console.log(`📦 Batch size: ${this.batchSize}`);
try {
while (hasMore) {
batchNumber++;
console.log(`\n📦 Processando batch #${batchNumber}...`);
// 🔍 QUERY com cursor/offset
const batchQuery = this.buildCursorQuery(query, cursor);
const startTime = Date.now();
// 💾 EXECUTA query com timeout
const records = await this.executeQueryWithTimeout(batchQuery, params);
const queryTime = Date.now() - startTime;
console.log(`⚡ Query executada em ${queryTime}ms: ${records.length} registros`);
if (records.length === 0) {
hasMore = false;
console.log('✅ Não há mais registros para processar');
break;
}
// 🔄 YIELD cada registro individualmente
for (const record of records) {
try {
// 🔄 YIELD com metadata útil
yield {
...record,
_meta: {
batchNumber,
batchPosition: records.indexOf(record),
totalProcessed: ++this.processedCount,
fetchedAt: new Date().toISOString()
}
};
} catch (error) {
this.errorCount++;
console.error(`❌ Erro ao processar registro ${record.id}: ${error.message}`);
}
}
// 📍 ATUALIZA cursor para próximo batch
cursor = this.extractCursor(records);
hasMore = records.length === this.batchSize; // Se retornou menos, acabou
// 📊 ESTATÍSTICAS do batch
console.log(`✅ Batch #${batchNumber} processado: ${records.length} registros`);
console.log(`📊 Total acumulado: ${this.processedCount} registros`);
// ⏸️ PAUSA entre batches para não sobrecarregar DB
if (hasMore) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
} catch (error) {
console.error('❌ Erro crítico no stream DB:', error.message);
throw error;
} finally {
console.log(`\n🏁 Stream DB finalizado: ${this.processedCount} registros, ${this.errorCount} erros`);
}
}
// 🔍 CONSTRÓI query com cursor
buildCursorQuery(baseQuery, cursor) {
if (!cursor) {
// Primeira query - adiciona ORDER BY e LIMIT
return `${baseQuery} ORDER BY id ASC LIMIT ${this.batchSize}`;
} else {
// Queries subsequentes - adiciona WHERE com cursor
return `${baseQuery} AND id > ${cursor} ORDER BY id ASC LIMIT ${this.batchSize}`;
}
}
// 📍 EXTRAI cursor do último registro
extractCursor(records) {
return records.length > 0 ? records[records.length - 1].id : null;
}
// ⏱️ EXECUTA query com timeout
async executeQueryWithTimeout(query, params) {
return new Promise(async (resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error(`Query timeout após ${this.timeout}ms`));
}, this.timeout);
try {
// 🎭 SIMULA query do banco (substitua por query real)
const result = await this.simulateDbQuery(query, params);
clearTimeout(timeoutId);
resolve(result);
} catch (error) {
clearTimeout(timeoutId);
reject(error);
}
});
}
// 🎭 SIMULA query do banco (substitua por implementação real)
async simulateDbQuery(query, params) {
// Simula latência do banco
await new Promise(resolve => setTimeout(resolve, Math.random() * 200 + 50));
// Extrai cursor da query
const cursorMatch = query.match(/id > (\d+)/);
const startId = cursorMatch ? parseInt(cursorMatch[1]) + 1 : 1;
// Simula registros do banco
const records = [];
for (let i = 0; i < this.batchSize && startId + i <= 50000; i++) {
const id = startId + i;
records.push({
id,
name: `Record ${id}`,
email: `record${id}@db.com`,
created_at: new Date(Date.now() - Math.random() * 365 * 24 * 60 * 60 * 1000),
data: `Sample data for record ${id}`.repeat(Math.floor(Math.random() * 5) + 1)
});
}
return records;
}
// 📊 ESTATÍSTICAS do processamento
getStats() {
return {
processedCount: this.processedCount,
errorCount: this.errorCount,
successRate: this.processedCount / (this.processedCount + this.errorCount) * 100,
batchSize: this.batchSize
};
}
}
// 🚀 EXEMPLO: Processamento massivo de dados
async function databaseStreamExample() {
console.log('💾 DEMONSTRAÇÃO: Database Streaming');
console.log('=' .repeat(50));
// 🔧 SETUP do streamer
const dbStreamer = new DatabaseStreamer(null, {
batchSize: 500, // 500 registros por batch
timeout: 10000, // 10s timeout por query
maxRetries: 2
});
// 📝 QUERY para buscar usuários ativos
const query = `
SELECT id, name, email, created_at, data
FROM users
WHERE active = true
`;
let processedUsers = 0;
let recentUsers = 0;
const oneYearAgo = Date.now() - 365 * 24 * 60 * 60 * 1000;
try {
console.log('🔄 Iniciando processamento de usuários ativos...');
// 🔄 PROCESSA cada registro conforme chega
for await (const user of dbStreamer.streamRecords(query)) {
processedUsers++;
// 📊 ANÁLISE: Usuários recentes (último ano)
if (new Date(user.created_at).getTime() > oneYearAgo) {
recentUsers++;
}
// 🔧 PROCESSAMENTO personalizado
if (user.id % 1000 === 0) {
console.log(`🎯 Marco: ${user.name} (ID: ${user.id}) - Total: ${user._meta.totalProcessed}`);
}
// 🎯 AQUI: Faria processamento real
// - Enviar email de marketing
// - Atualizar cache
// - Gerar relatórios
// - Sync com sistema externo
// 🛑 DEMO: Para após 5000 usuários
if (processedUsers >= 5000) {
console.log('\n⏸️ Demo finalizada - processamento funcionando!');
break;
}
}
} catch (error) {
console.error('❌ Erro no processamento:', error.message);
} finally {
// 📊 RELATÓRIO FINAL
const stats = dbStreamer.getStats();
console.log('\n📊 RELATÓRIO FINAL:');
console.log(`👥 Usuários processados: ${processedUsers}`);
console.log(`🆕 Usuários recentes: ${recentUsers} (${(recentUsers/processedUsers*100).toFixed(1)}%)`);
console.log(`✅ Taxa de sucesso: ${stats.successRate.toFixed(2)}%`);
console.log(`⚡ Batch size: ${stats.batchSize}`);
console.log(`❌ Erros: ${stats.errorCount}`);
}
}
// 🚀 EXECUTAR exemplo
databaseStreamExample();
// 💡 VANTAGENS DO DB STREAMING:
//
// ✅ MEMÓRIA CONTROLADA:
// - Nunca carrega mais que ${batchSize} registros
// - Memory usage constante independente do tamanho da tabela
// - Pode processar bilhões de registros
//
// ✅ PERFORMANCE:
// - Cursor automático (sem OFFSET caro)
// - Queries eficientes com índices
// - Timeout para queries lentas
// - Pausas entre batches
//
// ✅ ROBUSTEZ:
// - Continua de onde parou em caso de erro
// - Estatísticas em tempo real
// - Graceful handling de timeouts
// - Monitoring de progresso
//
// ✅ FLEXIBILIDADE:
// - Batch size configurável
// - Filtros aplicados sob demanda
// - Processamento personalizado por registro
// - Early termination quando necessário
WebSockets + Async Generators = streaming de dados em tempo real. Chat messages, notificações, atualizações de preços, logs - tudo processado sob demanda conforme chega.
// 🔌 WEBSOCKET STREAMING: Dados em tempo real
class WebSocketStreamer {
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.ws = null;
this.messageQueue = [];
this.connected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
this.reconnectDelay = options.reconnectDelay || 1000;
}
// 🔌 CONECTA ao WebSocket
async connect() {
return new Promise((resolve, reject) => {
console.log(`🔌 Conectando ao WebSocket: ${this.url}`);
// 🎭 SIMULA WebSocket (substitua por WebSocket real)
this.ws = this.createMockWebSocket();
this.ws.onopen = () => {
console.log('✅ WebSocket conectado');
this.connected = true;
this.reconnectAttempts = 0;
resolve();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.messageQueue.push({
...message,
receivedAt: Date.now()
});
};
this.ws.onclose = () => {
console.log('🔌 WebSocket desconectado');
this.connected = false;
this.attemptReconnect();
};
this.ws.onerror = (error) => {
console.error('❌ Erro no WebSocket:', error);
reject(error);
};
});
}
// 🔄 RECONNECT automático
async attemptReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('❌ Max tentativas de reconexão atingidas');
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`🔄 Tentativa de reconexão #${this.reconnectAttempts} em ${delay}ms`);
setTimeout(() => {
this.connect().catch(error => {
console.error('❌ Falha na reconexão:', error);
});
}, delay);
}
// 🎭 MOCK WebSocket para demonstração
createMockWebSocket() {
const mockWs = {
onopen: null,
onmessage: null,
onclose: null,
onerror: null,
readyState: 1
};
// Simula mensagens chegando
setTimeout(() => mockWs.onopen?.(), 100);
// Gera mensagens de exemplo
let messageId = 1;
const messageTypes = ['chat', 'notification', 'price_update', 'log'];
const sendMessage = () => {
if (mockWs.readyState === 1) {
const type = messageTypes[Math.floor(Math.random() * messageTypes.length)];
const message = this.generateMockMessage(type, messageId++);
mockWs.onmessage?.({
data: JSON.stringify(message)
});
// Próxima mensagem em 100-2000ms
setTimeout(sendMessage, Math.random() * 1900 + 100);
}
};
setTimeout(sendMessage, 500);
return mockWs;
}
// 🎭 GERA mensagens de exemplo
generateMockMessage(type, id) {
const messages = {
chat: {
type: 'chat',
id,
user: `User${Math.floor(Math.random() * 100)}`,
message: [
'Hello everyone!',
'How is everyone doing?',
'Great weather today!',
'Anyone working on cool projects?',
'Just deployed a new feature!'
][Math.floor(Math.random() * 5)],
timestamp: Date.now()
},
notification: {
type: 'notification',
id,
title: 'New Update Available',
body: 'Version 2.1.0 is now available for download',
priority: ['low', 'medium', 'high'][Math.floor(Math.random() * 3)],
timestamp: Date.now()
},
price_update: {
type: 'price_update',
id,
symbol: ['BTCUSD', 'ETHUSD', 'AAPL', 'GOOGL'][Math.floor(Math.random() * 4)],
price: (Math.random() * 1000 + 100).toFixed(2),
change: (Math.random() * 20 - 10).toFixed(2),
timestamp: Date.now()
},
log: {
type: 'log',
id,
level: ['info', 'warn', 'error', 'debug'][Math.floor(Math.random() * 4)],
message: `System message #${id}`,
service: ['api', 'auth', 'db', 'cache'][Math.floor(Math.random() * 4)],
timestamp: Date.now()
}
};
return messages[type];
}
// 🔄 STREAM de mensagens em tempo real
async* streamMessages(filter = null) {
if (!this.connected) {
await this.connect();
}
console.log('📡 Iniciando stream de mensagens em tempo real...');
let processedCount = 0;
let lastHeartbeat = Date.now();
while (this.connected || this.messageQueue.length > 0) {
// 📥 PROCESSA mensagens da queue
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
// 🔍 APLICA filtro se especificado
if (!filter || filter(message)) {
processedCount++;
yield {
...message,
_meta: {
processedCount,
queueSize: this.messageQueue.length,
connected: this.connected,
latency: Date.now() - message.receivedAt
}
};
}
}
// 💓 HEARTBEAT a cada 5 segundos
if (Date.now() - lastHeartbeat > 5000) {
console.log(`💓 Stream ativo: ${processedCount} mensagens processadas, queue: ${this.messageQueue.length}`);
lastHeartbeat = Date.now();
}
// ⏸️ Aguarda novas mensagens
await new Promise(resolve => setTimeout(resolve, 50));
}
console.log('🏁 Stream de WebSocket finalizado');
}
// 🔌 DESCONECTA
disconnect() {
if (this.ws) {
this.ws.close();
this.connected = false;
}
}
}
// 🚀 EXEMPLO: Stream de dados em tempo real
async function webSocketStreamExample() {
console.log('🔌 DEMONSTRAÇÃO: WebSocket Streaming');
console.log('=' .repeat(45));
const streamer = new WebSocketStreamer('wss://api.exemplo.com/stream', {
maxReconnectAttempts: 3,
reconnectDelay: 1000
});
// 🔍 FILTROS específicos
const chatFilter = msg => msg.type === 'chat';
const highPriorityFilter = msg => msg.priority === 'high' || msg.type === 'price_update';
let processedMessages = 0;
let chatMessages = 0;
let priceUpdates = 0;
try {
console.log('🔄 Iniciando stream de mensagens...');
// 🔄 PROCESSA mensagens conforme chegam
for await (const message of streamer.streamMessages()) {
processedMessages++;
// 📊 ESTATÍSTICAS por tipo
switch (message.type) {
case 'chat':
chatMessages++;
console.log(`💬 Chat: ${message.user}: ${message.message}`);
break;
case 'price_update':
priceUpdates++;
console.log(`📈 Price: ${message.symbol} = $${message.price} (${message.change})`);
break;
case 'notification':
if (message.priority === 'high') {
console.log(`🚨 High Priority: ${message.title}`);
}
break;
case 'log':
if (message.level === 'error') {
console.log(`❌ Error Log: ${message.message} [${message.service}]`);
}
break;
}
// 📊 ESTATÍSTICAS periódicas
if (processedMessages % 20 === 0) {
console.log(`\n📊 Estatísticas: ${processedMessages} mensagens, queue: ${message._meta.queueSize}`);
console.log(`💬 Chats: ${chatMessages}, 📈 Preços: ${priceUpdates}`);
console.log(`⚡ Latência: ${message._meta.latency}ms\n`);
}
// 🛑 DEMO: Para após 100 mensagens
if (processedMessages >= 100) {
console.log('\n⏸️ Demo finalizada - stream em tempo real funcionando!');
break;
}
}
} catch (error) {
console.error('❌ Erro no stream:', error);
} finally {
streamer.disconnect();
// 📊 RELATÓRIO FINAL
console.log('\n📊 RELATÓRIO FINAL:');
console.log(`📨 Total mensagens: ${processedMessages}`);
console.log(`💬 Mensagens de chat: ${chatMessages}`);
console.log(`📈 Atualizações de preço: ${priceUpdates}`);
}
}
// 🚀 EXECUTAR exemplo
webSocketStreamExample();
// 💡 VANTAGENS DO WEBSOCKET STREAMING:
//
// ✅ TEMPO REAL:
// - Processa dados conforme chegam
// - Latência mínima
// - Não perde mensagens
// - Queue automática
//
// ✅ ROBUSTEZ:
// - Reconexão automática
// - Exponential backoff
// - Error handling
// - Heartbeat monitoring
//
// ✅ FLEXIBILIDADE:
// - Filtros aplicados sob demanda
// - Múltiplos tipos de mensagem
// - Processamento personalizado
// - Estatísticas em tempo real
//
// ✅ ESCALABILIDADE:
// - Memory usage constante
// - Não bloqueia event loop
// - Múltiplos consumers
// - Rate limiting natural
Agora você domina processamento assíncrono em tempo real com controle total sobre recursos!