Analytics e Múltiplos Eventos
Consumindo múltiplos eventos
Implementando consumer Kafka com idempotência para múltiplos tipos de evento.
Nesta aula você vai
- Consumir múltiplos tópicos do Kafka no analytics-service
- Processar order.created payment.approved e notification.sent com dispatcher por tipo
- Garantir idempotência usando eventId
Consumindo múltiplos eventos
Objetivos
- Assinar mais de um tópico no Kafka com um único consumer group.
- Rotear cada evento para o handler correto.
- Evitar contagem duplicada quando o mesmo evento for reentregue.
Pré-requisitos
- Aula anterior concluída (
papel-analytics-ecossistema). - Kafka acessível em
kafka:9092. - Dependências
kafkajseioredisinstaladas noanalytics-service.
Conceito
Consumir múltiplos eventos é simples; o difícil é manter consistência. Em sistemas distribuídos, duplicidade acontece. A solução é tratar o eventId como chave de idempotência e registrar processamento em Redis (SETNX com TTL).
Estrutura de arquivos
services/analytics-service/src/
├── consumers/
│ └── kafka-consumer.js
├── handlers/
│ ├── order-created-handler.js
│ ├── payment-approved-handler.js
│ └── notification-sent-handler.js
└── infra/
└── idempotency-store.js
Passo a passo
- Implemente o store de idempotência em
services/analytics-service/src/infra/idempotency-store.js:
import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL ?? "redis://redis:6379");
const KEY_PREFIX = "analytics:processed-event:";
const ONE_DAY_SECONDS = 60 * 60 * 24;
export async function tryMarkAsProcessed(eventId) {
const key = `${KEY_PREFIX}${eventId}`;
const wasSet = await redis.set(key, "1", "EX", ONE_DAY_SECONDS, "NX");
return wasSet === "OK";
}
- Configure o consumer Kafka em
services/analytics-service/src/consumers/kafka-consumer.js:
import { Kafka } from "kafkajs";
import { handlersByType } from "./analytics-consumer.js";
import { tryMarkAsProcessed } from "../infra/idempotency-store.js";
const kafka = new Kafka({
clientId: "analytics-service",
brokers: [process.env.KAFKA_BROKER ?? "kafka:9092"],
});
const consumer = kafka.consumer({ groupId: "analytics-group-v1" });
export async function startKafkaConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: "order.events", fromBeginning: false });
await consumer.subscribe({ topic: "payment.events", fromBeginning: false });
await consumer.subscribe({ topic: "notification.events", fromBeginning: false });
await consumer.run({
eachMessage: async ({ message, topic }) => {
const event = JSON.parse(message.value.toString("utf-8"));
const eventId = event.eventId ?? `${topic}-${message.offset}`;
const shouldProcess = await tryMarkAsProcessed(eventId);
if (!shouldProcess) return;
const handler = handlersByType[event.type];
if (!handler) {
console.warn("Evento sem handler", { type: event.type, eventId });
return;
}
await handler(event);
},
});
}
- Inicialize o consumer no bootstrap da aplicação:
import express from "express";
import { startKafkaConsumer } from "./consumers/kafka-consumer.js";
const app = express();
app.get("/health", (_, res) => res.json({ status: "ok" }));
await startKafkaConsumer();
app.listen(8085, () => console.log("analytics-service em 8085"));
Como testar
Publice um evento duplicado e valide que só conta uma vez:
docker compose up -d kafka redis analytics-service
curl -s -X POST http://localhost:8002/orders \
-H "Content-Type: application/json" \
-d '{"customerId":"c-1","amountCents":15000}'
Repita a publicação com mesmo eventId no producer de teste e confirme nos logs do analytics que o segundo processamento foi ignorado.
Dicas de projeto
- Mantenha
groupIdversionado para migrações controladas. - Use DLQ para evento inválido (vamos usar nas matérias 9 e 10).
- Sempre logue
eventId,typeetopic.
Erros comuns
- Usar só
offsetcomo chave de idempotência global. - Fazer
JSON.parsesemtry/catch. - Misturar regras de retry no mesmo arquivo do handler.
Resumo
Você implementou um consumer Kafka robusto para múltiplos eventos com idempotência real baseada em eventId, incluindo notification.sent no pipeline analítico.