Analytics e Múltiplos Eventos

Consumindo múltiplos eventos

Implementando consumer Kafka com idempotência para múltiplos tipos de evento.

Avançado 40 min 35 pontos Leitura 0%

Nesta aula você vai

  • Consumir múltiplos tópicos do Kafka no analytics-service
  • Processar order.created payment.approved e notification.sent com dispatcher por tipo
  • Garantir idempotência usando eventId

Consumindo múltiplos eventos

Objetivos

  • Assinar mais de um tópico no Kafka com um único consumer group.
  • Rotear cada evento para o handler correto.
  • Evitar contagem duplicada quando o mesmo evento for reentregue.

Pré-requisitos

  • Aula anterior concluída (papel-analytics-ecossistema).
  • Kafka acessível em kafka:9092.
  • Dependências kafkajs e ioredis instaladas no analytics-service.

Conceito

Consumir múltiplos eventos é simples; o difícil é manter consistência. Em sistemas distribuídos, duplicidade acontece. A solução é tratar o eventId como chave de idempotência e registrar processamento em Redis (SETNX com TTL).

Estrutura de arquivos

services/analytics-service/src/
├── consumers/
│   └── kafka-consumer.js
├── handlers/
│   ├── order-created-handler.js
│   ├── payment-approved-handler.js
│   └── notification-sent-handler.js
└── infra/
    └── idempotency-store.js

Passo a passo

  1. Implemente o store de idempotência em services/analytics-service/src/infra/idempotency-store.js:
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL ?? "redis://redis:6379");
const KEY_PREFIX = "analytics:processed-event:";
const ONE_DAY_SECONDS = 60 * 60 * 24;

export async function tryMarkAsProcessed(eventId) {
  const key = `${KEY_PREFIX}${eventId}`;
  const wasSet = await redis.set(key, "1", "EX", ONE_DAY_SECONDS, "NX");
  return wasSet === "OK";
}
  1. Configure o consumer Kafka em services/analytics-service/src/consumers/kafka-consumer.js:
import { Kafka } from "kafkajs";
import { handlersByType } from "./analytics-consumer.js";
import { tryMarkAsProcessed } from "../infra/idempotency-store.js";

const kafka = new Kafka({
  clientId: "analytics-service",
  brokers: [process.env.KAFKA_BROKER ?? "kafka:9092"],
});

const consumer = kafka.consumer({ groupId: "analytics-group-v1" });

export async function startKafkaConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: "order.events", fromBeginning: false });
  await consumer.subscribe({ topic: "payment.events", fromBeginning: false });
  await consumer.subscribe({ topic: "notification.events", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message, topic }) => {
      const event = JSON.parse(message.value.toString("utf-8"));
      const eventId = event.eventId ?? `${topic}-${message.offset}`;
      const shouldProcess = await tryMarkAsProcessed(eventId);
      if (!shouldProcess) return;

      const handler = handlersByType[event.type];
      if (!handler) {
        console.warn("Evento sem handler", { type: event.type, eventId });
        return;
      }
      await handler(event);
    },
  });
}
  1. Inicialize o consumer no bootstrap da aplicação:
import express from "express";
import { startKafkaConsumer } from "./consumers/kafka-consumer.js";

const app = express();
app.get("/health", (_, res) => res.json({ status: "ok" }));

await startKafkaConsumer();
app.listen(8085, () => console.log("analytics-service em 8085"));

Como testar

Publice um evento duplicado e valide que só conta uma vez:

docker compose up -d kafka redis analytics-service
curl -s -X POST http://localhost:8002/orders \
  -H "Content-Type: application/json" \
  -d '{"customerId":"c-1","amountCents":15000}'

Repita a publicação com mesmo eventId no producer de teste e confirme nos logs do analytics que o segundo processamento foi ignorado.

Dicas de projeto

  • Mantenha groupId versionado para migrações controladas.
  • Use DLQ para evento inválido (vamos usar nas matérias 9 e 10).
  • Sempre logue eventId, type e topic.

Erros comuns

  • Usar só offset como chave de idempotência global.
  • Fazer JSON.parse sem try/catch.
  • Misturar regras de retry no mesmo arquivo do handler.

Resumo

Você implementou um consumer Kafka robusto para múltiplos eventos com idempotência real baseada em eventId, incluindo notification.sent no pipeline analítico.