Building your first real-time CDC pipeline can feel overwhelming with the abundance of tools and architectural choices available. This hands-on guide walks through a complete, production-ready example that streams changes from a PostgreSQL database through Kafka to a data warehouse, demonstrating every step from initial setup to monitoring. Rather than abstract concepts, you’ll see actual commands, configuration files, and code that you can adapt for your own use cases. By the end, you’ll understand not just how to build a CDC pipeline, but why specific design decisions matter and how to troubleshoot common issues.
The Example Scenario and Architecture
Our example implements a common real-world scenario: an e-commerce platform needs to synchronize its operational PostgreSQL database to a Redshift data warehouse for analytics. The operational database handles customer orders, constantly inserting new orders, updating order statuses, and occasionally deleting cancelled orders. The analytics team needs this data in near real-time—within minutes of changes occurring—to power dashboards showing current sales metrics and inventory levels.
The architecture uses Debezium as the CDC connector, Kafka as the message broker, and a custom Python consumer that applies changes to Redshift. This stack provides excellent reliability, scalability, and operational visibility. Debezium connects to PostgreSQL’s replication slot, decodes the write-ahead log, and produces change events to Kafka. The Python consumer reads from Kafka topics, transforms the events, and uses SQL MERGE operations to update Redshift tables.
Here’s the complete data flow:
- Application writes to PostgreSQL (order placed, status updated)
- PostgreSQL records changes in write-ahead log
- Debezium reads WAL through replication slot
- Debezium produces change events to the Kafka topic ecommerce.public.orders
- Python consumer reads from the Kafka topic
- Consumer transforms and batches changes
- Consumer executes MERGE statement in Redshift
- Analytics queries reflect updated data
This architecture separates concerns cleanly—Debezium focuses on reliable capture, Kafka provides durable buffering, and the consumer handles transformation and loading. If any component fails, the others continue operating normally, and processing resumes from the last checkpoint when the failed component recovers.
[Diagram: Real-Time CDC Pipeline Flow]
Setting Up PostgreSQL for CDC
The first step is configuring PostgreSQL to support logical replication. This requires database-level settings and creating the appropriate database objects. Start by modifying postgresql.conf:
# Enable logical replication
wal_level = logical
# Allow enough replication connections
max_replication_slots = 4
max_wal_senders = 4
# Retain extra WAL so the pipeline can tolerate short outages
wal_keep_size = 2GB
After modifying the configuration, restart PostgreSQL for changes to take effect. The wal_level = logical setting is crucial—it instructs PostgreSQL to include enough information in the write-ahead log for logical decoding. The retention settings ensure that if your CDC pipeline experiences downtime, PostgreSQL won’t discard WAL segments before they’re processed.
Next, create a dedicated database user for replication with appropriate permissions:
-- Create replication user
CREATE USER debezium_user WITH REPLICATION PASSWORD 'secure_password_here';
-- Grant necessary permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;
-- Allow future table access
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO debezium_user;
The REPLICATION attribute allows this user to create replication slots and consume WAL data. The SELECT permissions enable Debezium to perform the initial snapshot of existing data before starting to stream changes.
Create a publication that defines which tables to replicate:
-- Create publication for order-related tables
CREATE PUBLICATION debezium_publication FOR TABLE
orders,
order_items,
customers;
Publications are PostgreSQL’s way of defining what data should be available for replication. By specifying individual tables, you avoid replicating unnecessary data like internal audit tables or temporary tables. You can also create publications for ALL TABLES, but explicit table lists provide better control and clarity.
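As the schema evolves, the publication and grants need to evolve with it. Here is a minimal psycopg2 sketch of that maintenance step; the connection details and the refunds table are illustrative assumptions, and remember that new tables must also be added to the connector's table.include.list (shown in the next section).

import psycopg2

# Connect as a role that owns the tables (or a superuser); credentials are placeholders
conn = psycopg2.connect(host="postgres-host.example.com", dbname="ecommerce",
                        user="admin_user", password="admin_password")
conn.autocommit = True

with conn.cursor() as cur:
    # Add a newly created (hypothetical) table to the existing publication
    cur.execute("ALTER PUBLICATION debezium_publication ADD TABLE refunds;")
    # The default privileges above only cover tables created by the granting role,
    # so grant SELECT explicitly to be safe
    cur.execute("GRANT SELECT ON refunds TO debezium_user;")

conn.close()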
Finally, verify your configuration by checking that logical replication is enabled:
-- Check WAL level
SHOW wal_level; -- Should return 'logical'
-- Verify publication exists
SELECT * FROM pg_publication WHERE pubname = 'debezium_publication';
-- Check that tables are included
SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_publication';
Deploying Debezium with Kafka Connect
Debezium runs as a connector within Kafka Connect, which provides the distributed runtime and REST API. For this example, we’ll use Docker Compose to deploy Kafka, Zookeeper, and Kafka Connect with Debezium. Here’s the complete docker-compose.yml:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for containers on the Compose network, one for tools on the host
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_statuses
Start the stack with docker-compose up -d. Once running, configure the Debezium PostgreSQL connector by submitting this JSON to the Connect REST API:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "ecommerce-postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres-host.example.com",
      "database.port": "5432",
      "database.user": "debezium_user",
      "database.password": "secure_password_here",
      "database.dbname": "ecommerce",
      "topic.prefix": "ecommerce",
      "table.include.list": "public.orders,public.order_items,public.customers",
      "plugin.name": "pgoutput",
      "publication.name": "debezium_publication",
      "slot.name": "debezium_slot",
      "snapshot.mode": "initial",
      "heartbeat.interval.ms": "10000"
    }
  }'
Let’s examine the key configuration parameters:
- topic.prefix (which replaced database.server.name in Debezium 2.x): Forms the prefix for Kafka topic names. Topics will be named ecommerce.public.orders, ecommerce.public.order_items, etc.
- plugin.name: Specifies pgoutput, PostgreSQL's native logical decoding plugin
- snapshot.mode: Set to initial to capture existing data before streaming changes
- heartbeat.interval.ms: Sends periodic heartbeat messages even when no data changes, helping monitor connector health
After submitting the connector configuration, Debezium immediately begins the initial snapshot, reading all existing data from the specified tables and producing it to Kafka. You can monitor progress using the Connect REST API:
# Check connector status
curl http://localhost:8083/connectors/ecommerce-postgres-connector/status
# View connector tasks
curl http://localhost:8083/connectors/ecommerce-postgres-connector/tasks
Once the snapshot completes, Debezium switches to streaming mode, continuously reading from PostgreSQL’s replication slot and producing change events to Kafka.
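Before writing the full consumer, it helps to peek at a few raw events to confirm the connector is producing. The sketch below uses the same confluent-kafka library as the consumer built later; the group id is an arbitrary throwaway, and note that with Kafka Connect's default JSON converter (schemas enabled) the change data is nested under a payload envelope, while the examples in the next section show the unwrapped form.

from confluent_kafka import Consumer
import json

# Throwaway consumer purely for inspecting raw change events
peek = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'debug-peek',  # arbitrary inspection group
    'auto.offset.reset': 'earliest'
})
peek.subscribe(['ecommerce.public.orders'])

for _ in range(5):
    msg = peek.poll(timeout=10.0)
    if msg is None or msg.error():
        continue
    event = json.loads(msg.value().decode('utf-8'))
    # Unwrap the 'payload' envelope if the converter includes schemas
    payload = event.get('payload', event)
    print(payload.get('op'), payload.get('after'))

peek.close()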
Understanding the Change Event Structure
Before building the consumer, it’s important to understand the structure of change events Debezium produces. Here’s an example event for an order update:
{
  "before": {
    "order_id": 12345,
    "customer_id": 678,
    "status": "pending",
    "total_amount": 129.99,
    "created_at": "2024-10-20T10:30:00Z",
    "updated_at": "2024-10-20T10:30:00Z"
  },
  "after": {
    "order_id": 12345,
    "customer_id": 678,
    "status": "shipped",
    "total_amount": 129.99,
    "created_at": "2024-10-20T10:30:00Z",
    "updated_at": "2024-10-23T14:22:15Z"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "ecommerce",
    "ts_ms": 1698068535000,
    "snapshot": "false",
    "db": "ecommerce",
    "sequence": "[\"23456789\",\"23456800\"]",
    "schema": "public",
    "table": "orders",
    "txId": 567890,
    "lsn": 23456800,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1698068535150,
  "transaction": null
}
The event structure contains everything needed to understand and apply the change:
- before: The row state before the change (null for inserts)
- after: The row state after the change (null for deletes)
- source: Metadata including database position, transaction ID, and timestamp
- op: Operation type ('c' for create, 'u' for update, 'd' for delete, 'r' for read during snapshot)
- ts_ms: When Debezium processed this event
The before and after fields enable sophisticated change tracking. You can compute exactly what changed, implement slowly changing dimensions, or maintain full history tables. The source.lsn (log sequence number) provides a total ordering of changes, allowing you to handle out-of-order delivery.
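As a small illustration of what the before/after pair enables, the sketch below (not part of the pipeline itself) computes which columns changed in an update event; the field names match the orders example above.

def changed_columns(event):
    """Return {column: (old, new)} for an update event (op == 'u')."""
    before = event.get('before') or {}
    after = event.get('after') or {}
    return {
        col: (before.get(col), after.get(col))
        for col in after
        if before.get(col) != after.get(col)
    }

# Applied to the sample event above, this reports:
# {'status': ('pending', 'shipped'),
#  'updated_at': ('2024-10-20T10:30:00Z', '2024-10-23T14:22:15Z')}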
Building the Python Consumer
The Python consumer reads from Kafka, transforms events, and applies changes to Redshift. The confluent-kafka library provides high-performance consumption and fine-grained control over offset commits:
from confluent_kafka import Consumer, KafkaError
import json
import psycopg2
from datetime import datetime


class CDCConsumer:
    def __init__(self, kafka_config, redshift_config):
        self.consumer = Consumer(kafka_config)
        self.redshift_conn = psycopg2.connect(**redshift_config)
        self.batch = []
        self.batch_size = 100
        self.batch_timeout = 5  # seconds
        self.last_commit = datetime.now()

    def run(self):
        self.consumer.subscribe(['ecommerce.public.orders'])
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    # Check if we should flush batch on timeout
                    if self.batch and self._should_flush():
                        self._flush_batch()
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        print(f"Consumer error: {msg.error()}")
                        break
                # Process message
                event = json.loads(msg.value().decode('utf-8'))
                self._process_event(event)
                # Flush batch if size threshold reached
                if len(self.batch) >= self.batch_size:
                    self._flush_batch()
        finally:
            self.consumer.close()
            self.redshift_conn.close()

    def _process_event(self, event):
        """Transform event and add to batch"""
        op = event['op']
        if op == 'c' or op == 'r':  # Insert or snapshot read
            row = event['after']
            self.batch.append(('INSERT', row))
        elif op == 'u':  # Update
            row = event['after']
            self.batch.append(('UPDATE', row))
        elif op == 'd':  # Delete
            row = event['before']
            self.batch.append(('DELETE', row))

    def _flush_batch(self):
        """Apply batched changes to Redshift"""
        if not self.batch:
            return
        cursor = self.redshift_conn.cursor()
        try:
            # Create a fresh temporary staging table; drop any leftover one
            # from a previous batch in this session
            cursor.execute("DROP TABLE IF EXISTS orders_staging")
            cursor.execute("CREATE TEMP TABLE orders_staging (LIKE orders)")
            # Keep only the latest change per key (Kafka delivers changes for a
            # given key in order), then separate inserts/updates from deletes
            latest = {}
            for op, row in self.batch:
                latest[row['order_id']] = (op, row)
            upserts = [row for op, row in latest.values() if op != 'DELETE']
            deletes = [row for op, row in latest.values() if op == 'DELETE']
            # Load upserts to staging
            if upserts:
                values = []
                for row in upserts:
                    values.append(cursor.mogrify(
                        "(%s, %s, %s, %s, %s, %s)",
                        (row['order_id'], row['customer_id'],
                         row['status'], row['total_amount'],
                         row['created_at'], row['updated_at'])
                    ).decode('utf-8'))
                cursor.execute(
                    f"INSERT INTO orders_staging VALUES {','.join(values)}"
                )
            # Merge staging into target: replace existing versions of the rows
            cursor.execute("""
                DELETE FROM orders
                WHERE order_id IN (SELECT order_id FROM orders_staging);
                INSERT INTO orders
                SELECT * FROM orders_staging;
            """)
            # Handle deletes
            if deletes:
                delete_ids = tuple(row['order_id'] for row in deletes)
                cursor.execute(
                    "DELETE FROM orders WHERE order_id IN %s",
                    (delete_ids,)
                )
            self.redshift_conn.commit()
            self.consumer.commit()  # Commit Kafka offsets only after the Redshift commit
            print(f"Flushed batch: {len(self.batch)} changes")
            self.batch = []
            self.last_commit = datetime.now()
        except Exception as e:
            self.redshift_conn.rollback()
            print(f"Error flushing batch: {e}")
            raise
        finally:
            cursor.close()

    def _should_flush(self):
        """Check if batch timeout has elapsed"""
        elapsed = (datetime.now() - self.last_commit).total_seconds()
        return elapsed >= self.batch_timeout


# Configuration
kafka_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'redshift-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # Manual commit after successful write
}

redshift_config = {
    'host': 'redshift-cluster.example.com',
    'port': 5439,
    'database': 'analytics',
    'user': 'etl_user',
    'password': 'secure_password'
}

# Run consumer
consumer = CDCConsumer(kafka_config, redshift_config)
consumer.run()
This consumer implements several important patterns:
Batching: Accumulates changes before writing to Redshift, reducing network overhead and improving throughput. Batches flush either when size threshold is reached or after a timeout period.
Staging Tables: Loads changes to temporary staging tables first, then merges them into target tables. This pattern is more efficient than individual row operations and provides transactional atomicity.
Manual Offset Commits: Only commits Kafka offsets after successfully writing to Redshift. This ensures at-least-once semantics—if the consumer crashes after reading from Kafka but before writing to Redshift, those events will be reprocessed.
Error Handling: Rolls back database transactions on errors and allows exceptions to propagate, which will cause the consumer to restart and retry from the last committed offset.
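The restart itself happens outside the class. In production that is usually a process supervisor (systemd, Kubernetes, ECS), but a minimal in-process sketch of the same idea, with arbitrary backoff values, looks like this:

import time

def run_with_restarts(kafka_config, redshift_config, max_backoff=60):
    """Re-create and re-run the consumer after a crash, with exponential backoff."""
    backoff = 1
    while True:
        try:
            CDCConsumer(kafka_config, redshift_config).run()
            break  # run() returned normally, e.g. on planned shutdown
        except Exception as exc:
            print(f"Consumer crashed: {exc}; restarting in {backoff}s")
            time.sleep(backoff)
            backoff = min(backoff * 2, max_backoff)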
Testing the Complete Pipeline
With all components deployed, test the pipeline end-to-end. Insert a new order in PostgreSQL:
INSERT INTO orders (order_id, customer_id, status, total_amount, created_at, updated_at)
VALUES (12346, 679, 'pending', 249.99, NOW(), NOW());
Monitor the flow through each component:
# Watch Kafka topic for new events
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic ecommerce.public.orders --from-beginning
# Check consumer lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group redshift-consumer
# Verify data in Redshift
psql -h redshift-cluster.example.com -U etl_user -d analytics \
-c "SELECT * FROM orders WHERE order_id = 12346;"
The event should appear in Kafka within milliseconds, and in Redshift within seconds (depending on your batch timeout). This end-to-end latency—typically under 10 seconds—enables near real-time analytics.
Test update and delete operations:
-- Update order status
UPDATE orders SET status = 'shipped', updated_at = NOW()
WHERE order_id = 12346;
-- Delete cancelled order
DELETE FROM orders WHERE order_id = 12346;
Each operation produces a corresponding change event that flows through the pipeline. The Python consumer applies updates using the MERGE pattern and executes DELETE statements for deleted records.
Monitoring and Troubleshooting
Production CDC pipelines require comprehensive monitoring. Key metrics include:
Debezium Connector Metrics: Monitor through JMX or Kafka Connect REST API
- Snapshot progress during initial load
- Number of events produced per second
- Queue size (backlog of events waiting to be sent to Kafka)
- Last event timestamp (detect stale connectors)
Kafka Metrics: Monitor through Kafka JMX or management tools
- Topic partition lag (events produced vs consumed)
- Consumer group lag (how far behind real-time)
- Broker disk usage (ensure topics don’t exhaust storage)
Consumer Application Metrics: Instrument your Python consumer (a minimal instrumentation sketch follows this list)
- Batch processing time
- Events processed per second
- Error rate and types
- Redshift write latency
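A lightweight starting point is to emit counters and timings from the consumer itself and ship them to whatever metrics backend you already use. The sketch below wraps the existing _flush_batch; the plain-text metric lines are illustrative placeholders for a Prometheus, StatsD, or CloudWatch client.

import time

class InstrumentedCDCConsumer(CDCConsumer):
    """Same consumer, with basic metrics recorded around each Redshift write."""

    def _flush_batch(self):
        batch_size = len(self.batch)
        started = time.monotonic()
        try:
            super()._flush_batch()
        except Exception:
            print("metric cdc.flush.errors 1")  # error rate
            raise
        elapsed = time.monotonic() - started
        if batch_size:
            print(f"metric cdc.flush.seconds {elapsed:.3f}")  # Redshift write latency
            print(f"metric cdc.flush.events {batch_size}")    # events per batch
            print(f"metric cdc.events_per_second {batch_size / max(elapsed, 1e-6):.1f}")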
Common issues and solutions:
Growing Consumer Lag: Consumer can’t keep up with event rate. Solution: Increase batch size, add more consumer instances, or optimize Redshift writes.
Connector Stops Producing: Replication slot may have been dropped or connector crashed. Check connector status and logs. Restart connector if needed.
Duplicate Events in Target: The consumer crashed after writing to Redshift but before committing Kafka offsets, so the same events were re-read and re-applied. Solution: Keep writes idempotent with key-based UPSERT/merge logic, as the staging-table merge above already does for inserts and updates.
Missing Events: Replication slot fell too far behind and WAL was purged. Solution: Increase wal_keep_size, perform new snapshot.
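For connector and WAL-retention issues it also pays to watch the replication slot from the PostgreSQL side. Here is a minimal psycopg2 sketch (connection details are placeholders) that reports how much WAL each slot is holding back:

import psycopg2

# Connection details are placeholders; pg_replication_slots is a standard system view
conn = psycopg2.connect(host="postgres-host.example.com", dbname="ecommerce",
                        user="debezium_user", password="secure_password_here")

with conn.cursor() as cur:
    cur.execute("""
        SELECT slot_name,
               active,
               pg_size_pretty(
                   pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
               ) AS retained_wal
        FROM pg_replication_slots;
    """)
    for slot_name, active, retained_wal in cur.fetchall():
        # An inactive slot or steadily growing retained_wal means the pipeline
        # is falling behind and WAL is piling up on the primary
        print(slot_name, active, retained_wal)

conn.close()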
Conclusion
This complete example demonstrates that building a production-ready CDC pipeline is achievable with the right architecture and tooling. By leveraging PostgreSQL’s logical replication, Debezium’s reliable change capture, Kafka’s durable messaging, and custom Python consumers, you create a pipeline that streams database changes in real-time while maintaining data consistency and operational resilience.
The patterns shown here—batching for efficiency, staging tables for atomicity, manual offset management for consistency—apply broadly across CDC implementations regardless of specific technologies. Understanding these fundamentals positions you to adapt this example to your specific databases, message brokers, and target systems while maintaining the reliability and performance characteristics that production workloads demand.