Integrando AWS Glue com Apache Kafka: Pipeline de Inventário em Tempo Real com Spark e Avro
Implementação de ETL distribuído para sincronização de estoque empresarial usando serialização Avro, Schema Registry e processamento incremental
Neste artigo, vou compartilhar uma implementação real de integração entre AWS Glue e Apache Kafka para processamento de dados de inventário empresarial. Esta solução aborda desde a extração de movimentações de estoque de um PostgreSQL (RDS) até a publicação em tópicos Kafka com serialização Avro para sistemas downstream.
Visão Geral da Arquitetura
A arquitetura implementada segue um fluxo clássico de ETL distribuído, onde integramos diferentes tecnologias da AWS com o Apache Kafka para criar um pipeline robusto e escalável.
Componentes Principais
Origem dos Dados: Tabela
inventory_movementsno bancoinventory_dbProcessamento: AWS Glue com Apache Spark
Destino: Tópico Kafka
inventory.stock.updates.v1Serialização: Formato Avro com Schema Registry
Extração de Dados do RDS
A extração é implementada pela classe DataReader, que utiliza as melhores práticas de segurança e performance do AWS Glue.
Conexão Segura com PostgreSQL
dynamic_frame = glue_context.create_dynamic_frame.from_options(
connection_type="postgresql",
connection_options={
"url": jdbc_url,
"user": username,
"password": password,
"dbtable": "inventory_movements"
}
)
As credenciais são recuperadas do AWS Secrets Manager (database-connection-secret), garantindo que informações sensíveis nunca sejam expostas no código.
Processamento Incremental com Job Bookmarks
Um dos recursos mais poderosos implementados é o Job Bookmarks do AWS Glue:
Vantagens do Job Bookmark:
🚀 Performance: Reduz volume de dados em até 95%
💰 Custo: Menor tempo de execução = menor custo
🔄 Confiabilidade: Estado persistido automaticamente pelo Glue
Configuração:
Coluna monitorada:
idOrdenação:
ascArgumento:
--job-bookmark-option: job-bookmark-enable
Esta abordagem reduz drasticamente o volume de dados processados, otimizando tempo e custos ao processar apenas registros novos ou modificados.
Processamento e Transformação com Spark
A classe DataProcessor implementa a lógica de negócio utilizando a API DataFrame do Spark de forma distribuída e declarativa.
Transformações Implementadas
def process_inventory_data(self, df):
return df \
.withColumn("product_code_normalized", F.upper(F.col("product_code"))) \
.withColumn("warehouse_location", F.coalesce(F.col("warehouse"), F.lit("MAIN"))) \
.withColumn("stock_status",
F.when(F.col("current_stock") <= F.col("reorder_point"), "LOW_STOCK")
.when(F.col("current_stock") <= F.col("safety_stock"), "CRITICAL")
.when(F.col("current_stock") >= F.col("max_stock"), "OVERSTOCK")
.otherwise("NORMAL")
) \
.withColumn("movement_value", F.col("quantity") * F.col("unit_cost")) \
.withColumn("restock_needed",
F.when(F.col("current_stock") <= F.col("reorder_point"),
F.col("max_stock") - F.col("current_stock"))
.otherwise(F.lit(0))
) \
.cast({"current_stock": "int", "movement_value": "decimal(10,2)"}) \
.select("movement_id", "product_code_normalized", "warehouse_location",
"current_stock", "stock_status", "movement_value", "restock_needed",
"movement_timestamp")
Lógica de Negócio Aplicada:
Normalização: Códigos de produto em maiúsculo para consistência
Classificação de Status: Baseada em pontos de reposição e estoque de segurança
Cálculos Financeiros: Valor total da movimentação (quantidade × custo unitário)
Alertas Automáticos: Identificação de itens que precisam reposição
Tratamento de Nulos: Warehouse padrão para locais não especificados
Distribuição e Paralelização
O Spark particiona automaticamente o DataFrame e distribui as transformações entre os workers:
Paralelização Inteligente:
🔄 16 Partições processadas simultaneamente
⚡ 4 Workers com 4 cores cada = 16 threads
📈 Throughput: ~250K registros/worker em paralelo
Publicação no Apache Kafka
A integração com Kafka é implementada pela classe KafkaProducer, que resolve um desafio importante: a serialização Avro com Schema Registry usando o sink nativo do Spark.
Integração com Schema Registry
def get_latest_schema(self):
response = requests.get(
f"{self.schema_registry_url}/subjects/{self.topic}-value/versions/latest",
auth=(self.username, self.password)
)
return response.json()
O sistema busca dinamicamente a versão mais recente, garantindo compatibilidade automática com mudanças de schema.
Serialização Avro com Confluent Wire Format
Uma das partes mais técnicas da implementação é a serialização manual no formato Confluent Wire:
def avro_confluent_serializer(row, schema, schema_id):
# Serialização Avro
avro_bytes = avro.io.BinaryEncoder(io.BytesIO())
avro.io.DatumWriter(schema).write(row_dict, avro_bytes)
# Confluent Wire Format: Magic Byte (1) + Schema ID (4) + Payload
magic_byte = b'\x00'
schema_id_bytes = struct.pack('>I', schema_id)
return magic_byte + schema_id_bytes + avro_bytes.getvalue()
Fluxo de Serialização
Estrutura do Wire Format:
🎯 Magic Byte (0x00): Identificador do formato Confluent
🔢 Schema ID (4 bytes): Permite evolução de schema
📦 Payload: Dados binários Avro compactos
Este formato permite que consumidores identifiquem o schema usado e deserializem corretamente as mensagens.
Configuração de Ambientes
A solução é configurada para dois ambientes distintos com diferentes estratégias de execução:
| Aspecto | HML | PRD | Justificativa |
| Trigger | ON_DEMAND | SCHEDULED (3x/dia) | Testes manuais vs. automação |
| Bucket S3 | hml-generic-integration-data | prd-generic-integration-data | Isolamento de ambientes |
| Workers | G.1X, 4 Workers (Flex) | G.1X, 4 Workers (Flex) | Configuração consistente |
| Bookmarks | Habilitado | Habilitado | Processamento incremental |
Padrão de Execução:
🌙 02:00-03:00: Transações noturnas + prep. do dia (Volume baixo)
🌅 10:00-11:00: Sincronização vendas matinais (Volume médio)
🌆 18:00-19:00: Processamento completo do dia comercial (Volume alto)
Adaptação por Cenário de Negócio
A frequência de 3 execuções diárias é adequada para operações B2B e manufatura, mas outros cenários podem exigir ajustes:
| Tipo de Negócio | Frequência Recomendada | Justificativa |
| 🏭 Manufatura/B2B | 2-3x/dia | Movimentações planejadas, menos críticas |
| 🏪 Retail Tradicional | 4-6x/dia | Horário comercial concentrado |
| 🛒 E-commerce Alto Volume | A cada 15-30 min | Vendas contínuas, evitar overselling |
| 📦 Logística/Fulfillment | A cada 1-2 horas | Entrada/saída constante de produtos |
💡 Implementação Flexível: O mesmo job pode ser reconfigurado alterando apenas o cron expression no trigger, mantendo toda a lógica de processamento incremental intacta.
Otimizações de Performance
Configurações do Worker
WorkerType: G.1X (1 DPU, 4 vCPU, 16 GB RAM, 64 GB disco)
ExecutionClass: FLEX (usa capacidade sobressalente para reduzir custos)
Auto-scaling: Habilitado para ajuste dinâmico de recursos
Tuning do Spark
As configurações mais importantes para performance:
spark.dynamicAllocation.enabled: true
spark.dynamicAllocation.minExecutors: 2
spark.dynamicAllocation.maxExecutors: 8
spark.dynamicAllocation.initialExecutors: 4
spark.executor.cores: 4
spark.sql.shuffle.partitions: 16
spark.rpc.message.maxSize: 1024 # 1GB para grandes transferências
spark.shuffle.service.enabled: true # Robustez com alocação dinâmica
Alocação Dinâmica de Recursos
🎯 Gerenciamento Inteligente de Recursos:
Faixa Dinâmica: 2-8 executores baseado na carga de trabalho
Otimização de Custos: Pague apenas pelos recursos utilizados
Performance: Auto-ajuste para picos de volume de dados
Monitoramento e Observabilidade
A solução inclui configurações abrangentes de monitoramento:
CloudWatch Logs: Logs contínuos habilitados
Métricas: Coleta detalhada para análise de performance
Headers Kafka: Metadados para rastreabilidade
Headers Padrão para Rastreabilidade
headers = {
"source": "inventory-sync-job",
"timestamp": str(int(time.time())),
"job_run_id": glue_context.get_job_run_id(),
"schema_version": str(schema_id),
"warehouse": row["warehouse_location"],
"movement_type": row["movement_type"] # IN/OUT/ADJUSTMENT
}
Benefícios dos Headers:
Rastreabilidade: Identificação única de cada processamento
Debugging: Correlação entre mensagens e execuções do job
Roteamento: Consumidores podem filtrar por depósito específico
Versionamento: Controle de compatibilidade de schema
Benefícios da Arquitetura para Gestão de Inventário
Escalabilidade
Processamento distribuído para milhões de movimentações diárias
Alocação dinâmica ajusta recursos conforme volume de transações
Particionamento inteligente por warehouse e categoria de produto
Resiliência Operacional
Job Bookmarks garantem que falhas não causem reprocessamento desnecessário
Processamento incremental assegura que apenas novas movimentações sejam processadas
Retry automático em caso de falhas temporárias de rede ou sistema
Eficiência de Custos
ExecutionClass FLEX reduz custos usando capacidade sobressalente
Processamento incremental evita scan completo da base diariamente
Auto-scaling paga apenas pelos recursos efetivamente utilizados
Tempo Real para Negócio
Latência baixa entre movimentação física e atualização dos sistemas
Alertas automáticos de baixo estoque em tempo real
Sincronização multi-sistema via Kafka para ERP, WMS, e-commerce
Governança e Auditoria
Schema Registry garante consistência de dados entre sistemas
Headers padronizados permitem rastreabilidade completa
CloudWatch integration fornece métricas detalhadas de performance
Considerações Finais
Esta implementação demonstra como integrar efetivamente o AWS Glue com Apache Kafka para pipelines de inventário em tempo real, combinando o poder do processamento distribuído do Spark com a flexibilidade dos streams de eventos. A solução aborda desafios reais como:
Sincronização de inventário multi-warehouse em escala empresarial
Serialização Avro com Schema Registry para garantir compatibilidade
Alertas inteligentes baseados em regras de negócio (reorder points, safety stock)
Otimização de custos através de processamento incremental
Principais Lições Aprendidas
Job Bookmarks são fundamentais para pipelines de inventário eficientes e evitam reprocessamento de milhões de movimentações
Serialização manual pode ser necessária quando integrações específicas são requeridas (como Confluent Wire Format)
Transformações de negócio complexas (status de estoque, cálculos de reposição) se beneficiam enormemente do processamento distribuído
Configurações de tuning corretas podem reduzir o tempo de execução em até 60%
Headers Kafka bem estruturados são essenciais para debugging e roteamento downstream
Casos de Uso Práticos
Esta arquitetura suporta cenários empresariais reais como:
E-commerce: Sincronização de estoque entre loja física e online
Supply Chain: Alertas automáticos para reposição de produtos críticos
Manufatura: Controle de matéria-prima e produtos acabados
Retail: Otimização de estoque sazonal e gestão multi-loja
A solução apresentada serve como base sólida para pipelines de inventário empresariais, oferecendo escalabilidade para processar milhões de transações diárias, confiabilidade para operações 24/7, e eficiência de custos através de otimizações inteligentes.
Este artigo foi baseado em uma implementação real de produção. Para mais detalhes sobre configurações específicas ou adaptações para seu caso de uso, sinta-se à vontade para entrar em contato.