Skip to main content

Command Palette

Search for a command to run...

Integrando AWS Glue com Apache Kafka: Pipeline de Inventário em Tempo Real com Spark e Avro

Implementação de ETL distribuído para sincronização de estoque empresarial usando serialização Avro, Schema Registry e processamento incremental

Updated
10 min read

Neste artigo, vou compartilhar uma implementação real de integração entre AWS Glue e Apache Kafka para processamento de dados de inventário empresarial. Esta solução aborda desde a extração de movimentações de estoque de um PostgreSQL (RDS) até a publicação em tópicos Kafka com serialização Avro para sistemas downstream.

Visão Geral da Arquitetura

A arquitetura implementada segue um fluxo clássico de ETL distribuído, onde integramos diferentes tecnologias da AWS com o Apache Kafka para criar um pipeline robusto e escalável.

Componentes Principais

  • Origem dos Dados: Tabela inventory_movements no banco inventory_db

  • Processamento: AWS Glue com Apache Spark

  • Destino: Tópico Kafka inventory.stock.updates.v1

  • Serialização: Formato Avro com Schema Registry

Extração de Dados do RDS

A extração é implementada pela classe DataReader, que utiliza as melhores práticas de segurança e performance do AWS Glue.

Conexão Segura com PostgreSQL

dynamic_frame = glue_context.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "url": jdbc_url,
        "user": username,
        "password": password,
        "dbtable": "inventory_movements"
    }
)

As credenciais são recuperadas do AWS Secrets Manager (database-connection-secret), garantindo que informações sensíveis nunca sejam expostas no código.

Processamento Incremental com Job Bookmarks

Um dos recursos mais poderosos implementados é o Job Bookmarks do AWS Glue:

Vantagens do Job Bookmark:

  • 🚀 Performance: Reduz volume de dados em até 95%

  • 💰 Custo: Menor tempo de execução = menor custo

  • 🔄 Confiabilidade: Estado persistido automaticamente pelo Glue

Configuração:

  • Coluna monitorada: id

  • Ordenação: asc

  • Argumento: --job-bookmark-option: job-bookmark-enable

Esta abordagem reduz drasticamente o volume de dados processados, otimizando tempo e custos ao processar apenas registros novos ou modificados.

Processamento e Transformação com Spark

A classe DataProcessor implementa a lógica de negócio utilizando a API DataFrame do Spark de forma distribuída e declarativa.

Transformações Implementadas

def process_inventory_data(self, df):
    return df \
        .withColumn("product_code_normalized", F.upper(F.col("product_code"))) \
        .withColumn("warehouse_location", F.coalesce(F.col("warehouse"), F.lit("MAIN"))) \
        .withColumn("stock_status", 
            F.when(F.col("current_stock") <= F.col("reorder_point"), "LOW_STOCK")
            .when(F.col("current_stock") <= F.col("safety_stock"), "CRITICAL")
            .when(F.col("current_stock") >= F.col("max_stock"), "OVERSTOCK")
            .otherwise("NORMAL")
        ) \
        .withColumn("movement_value", F.col("quantity") * F.col("unit_cost")) \
        .withColumn("restock_needed", 
            F.when(F.col("current_stock") <= F.col("reorder_point"), 
                   F.col("max_stock") - F.col("current_stock"))
            .otherwise(F.lit(0))
        ) \
        .cast({"current_stock": "int", "movement_value": "decimal(10,2)"}) \
        .select("movement_id", "product_code_normalized", "warehouse_location", 
                "current_stock", "stock_status", "movement_value", "restock_needed", 
                "movement_timestamp")

Lógica de Negócio Aplicada:

  • Normalização: Códigos de produto em maiúsculo para consistência

  • Classificação de Status: Baseada em pontos de reposição e estoque de segurança

  • Cálculos Financeiros: Valor total da movimentação (quantidade × custo unitário)

  • Alertas Automáticos: Identificação de itens que precisam reposição

  • Tratamento de Nulos: Warehouse padrão para locais não especificados

Distribuição e Paralelização

O Spark particiona automaticamente o DataFrame e distribui as transformações entre os workers:

Paralelização Inteligente:

  • 🔄 16 Partições processadas simultaneamente

  • 4 Workers com 4 cores cada = 16 threads

  • 📈 Throughput: ~250K registros/worker em paralelo

Publicação no Apache Kafka

A integração com Kafka é implementada pela classe KafkaProducer, que resolve um desafio importante: a serialização Avro com Schema Registry usando o sink nativo do Spark.

Integração com Schema Registry

def get_latest_schema(self):
    response = requests.get(
        f"{self.schema_registry_url}/subjects/{self.topic}-value/versions/latest",
        auth=(self.username, self.password)
    )
    return response.json()

O sistema busca dinamicamente a versão mais recente, garantindo compatibilidade automática com mudanças de schema.

Serialização Avro com Confluent Wire Format

Uma das partes mais técnicas da implementação é a serialização manual no formato Confluent Wire:

def avro_confluent_serializer(row, schema, schema_id):
    # Serialização Avro
    avro_bytes = avro.io.BinaryEncoder(io.BytesIO())
    avro.io.DatumWriter(schema).write(row_dict, avro_bytes)

    # Confluent Wire Format: Magic Byte (1) + Schema ID (4) + Payload
    magic_byte = b'\x00'
    schema_id_bytes = struct.pack('>I', schema_id)

    return magic_byte + schema_id_bytes + avro_bytes.getvalue()

Fluxo de Serialização

Estrutura do Wire Format:

  • 🎯 Magic Byte (0x00): Identificador do formato Confluent

  • 🔢 Schema ID (4 bytes): Permite evolução de schema

  • 📦 Payload: Dados binários Avro compactos

Este formato permite que consumidores identifiquem o schema usado e deserializem corretamente as mensagens.

Configuração de Ambientes

A solução é configurada para dois ambientes distintos com diferentes estratégias de execução:

AspectoHMLPRDJustificativa
TriggerON_DEMANDSCHEDULED (3x/dia)Testes manuais vs. automação
Bucket S3hml-generic-integration-dataprd-generic-integration-dataIsolamento de ambientes
WorkersG.1X, 4 Workers (Flex)G.1X, 4 Workers (Flex)Configuração consistente
BookmarksHabilitadoHabilitadoProcessamento incremental

Padrão de Execução:

  • 🌙 02:00-03:00: Transações noturnas + prep. do dia (Volume baixo)

  • 🌅 10:00-11:00: Sincronização vendas matinais (Volume médio)

  • 🌆 18:00-19:00: Processamento completo do dia comercial (Volume alto)

Adaptação por Cenário de Negócio

A frequência de 3 execuções diárias é adequada para operações B2B e manufatura, mas outros cenários podem exigir ajustes:

Tipo de NegócioFrequência RecomendadaJustificativa
🏭 Manufatura/B2B2-3x/diaMovimentações planejadas, menos críticas
🏪 Retail Tradicional4-6x/diaHorário comercial concentrado
🛒 E-commerce Alto VolumeA cada 15-30 minVendas contínuas, evitar overselling
📦 Logística/FulfillmentA cada 1-2 horasEntrada/saída constante de produtos

💡 Implementação Flexível: O mesmo job pode ser reconfigurado alterando apenas o cron expression no trigger, mantendo toda a lógica de processamento incremental intacta.

Otimizações de Performance

Configurações do Worker

  • WorkerType: G.1X (1 DPU, 4 vCPU, 16 GB RAM, 64 GB disco)

  • ExecutionClass: FLEX (usa capacidade sobressalente para reduzir custos)

  • Auto-scaling: Habilitado para ajuste dinâmico de recursos

Tuning do Spark

As configurações mais importantes para performance:

spark.dynamicAllocation.enabled: true
spark.dynamicAllocation.minExecutors: 2
spark.dynamicAllocation.maxExecutors: 8
spark.dynamicAllocation.initialExecutors: 4
spark.executor.cores: 4
spark.sql.shuffle.partitions: 16
spark.rpc.message.maxSize: 1024  # 1GB para grandes transferências
spark.shuffle.service.enabled: true  # Robustez com alocação dinâmica

Alocação Dinâmica de Recursos

🎯 Gerenciamento Inteligente de Recursos:

  • Faixa Dinâmica: 2-8 executores baseado na carga de trabalho

  • Otimização de Custos: Pague apenas pelos recursos utilizados

  • Performance: Auto-ajuste para picos de volume de dados

Monitoramento e Observabilidade

A solução inclui configurações abrangentes de monitoramento:

  • CloudWatch Logs: Logs contínuos habilitados

  • Métricas: Coleta detalhada para análise de performance

  • Headers Kafka: Metadados para rastreabilidade

Headers Padrão para Rastreabilidade

headers = {
    "source": "inventory-sync-job",
    "timestamp": str(int(time.time())),
    "job_run_id": glue_context.get_job_run_id(),
    "schema_version": str(schema_id),
    "warehouse": row["warehouse_location"],
    "movement_type": row["movement_type"]  # IN/OUT/ADJUSTMENT
}

Benefícios dos Headers:

  • Rastreabilidade: Identificação única de cada processamento

  • Debugging: Correlação entre mensagens e execuções do job

  • Roteamento: Consumidores podem filtrar por depósito específico

  • Versionamento: Controle de compatibilidade de schema

Benefícios da Arquitetura para Gestão de Inventário

Escalabilidade

  • Processamento distribuído para milhões de movimentações diárias

  • Alocação dinâmica ajusta recursos conforme volume de transações

  • Particionamento inteligente por warehouse e categoria de produto

Resiliência Operacional

  • Job Bookmarks garantem que falhas não causem reprocessamento desnecessário

  • Processamento incremental assegura que apenas novas movimentações sejam processadas

  • Retry automático em caso de falhas temporárias de rede ou sistema

Eficiência de Custos

  • ExecutionClass FLEX reduz custos usando capacidade sobressalente

  • Processamento incremental evita scan completo da base diariamente

  • Auto-scaling paga apenas pelos recursos efetivamente utilizados

Tempo Real para Negócio

  • Latência baixa entre movimentação física e atualização dos sistemas

  • Alertas automáticos de baixo estoque em tempo real

  • Sincronização multi-sistema via Kafka para ERP, WMS, e-commerce

Governança e Auditoria

  • Schema Registry garante consistência de dados entre sistemas

  • Headers padronizados permitem rastreabilidade completa

  • CloudWatch integration fornece métricas detalhadas de performance

Considerações Finais

Esta implementação demonstra como integrar efetivamente o AWS Glue com Apache Kafka para pipelines de inventário em tempo real, combinando o poder do processamento distribuído do Spark com a flexibilidade dos streams de eventos. A solução aborda desafios reais como:

  • Sincronização de inventário multi-warehouse em escala empresarial

  • Serialização Avro com Schema Registry para garantir compatibilidade

  • Alertas inteligentes baseados em regras de negócio (reorder points, safety stock)

  • Otimização de custos através de processamento incremental

Principais Lições Aprendidas

  1. Job Bookmarks são fundamentais para pipelines de inventário eficientes e evitam reprocessamento de milhões de movimentações

  2. Serialização manual pode ser necessária quando integrações específicas são requeridas (como Confluent Wire Format)

  3. Transformações de negócio complexas (status de estoque, cálculos de reposição) se beneficiam enormemente do processamento distribuído

  4. Configurações de tuning corretas podem reduzir o tempo de execução em até 60%

  5. Headers Kafka bem estruturados são essenciais para debugging e roteamento downstream

Casos de Uso Práticos

Esta arquitetura suporta cenários empresariais reais como:

  • E-commerce: Sincronização de estoque entre loja física e online

  • Supply Chain: Alertas automáticos para reposição de produtos críticos

  • Manufatura: Controle de matéria-prima e produtos acabados

  • Retail: Otimização de estoque sazonal e gestão multi-loja

A solução apresentada serve como base sólida para pipelines de inventário empresariais, oferecendo escalabilidade para processar milhões de transações diárias, confiabilidade para operações 24/7, e eficiência de custos através de otimizações inteligentes.


Este artigo foi baseado em uma implementação real de produção. Para mais detalhes sobre configurações específicas ou adaptações para seu caso de uso, sinta-se à vontade para entrar em contato.