Designing a Change Data Capture (CDC) pipeline that reliably delivers incremental data loads requires more than just connecting a CDC tool to your database and hoping for the best. Production-grade CDC pipelines must handle edge cases, maintain consistency during failures, scale with data volume growth, and provide visibility into their operation. The difference between a prototype that works in development and a system that runs reliably in production lies in the design decisions you make and the best practices you follow. This article explores proven patterns and anti-patterns learned from operating CDC pipelines at scale, focusing on practical design choices that prevent data loss, ensure consistency, and maintain performance under real-world conditions.
Establishing State Management and Checkpointing
State management forms the foundation of reliable CDC pipelines. Your pipeline must track which changes have been processed to avoid duplicating work or skipping data. This tracking mechanism—often called checkpointing or bookmarking—determines whether your pipeline can recover correctly after failures and whether it provides exactly-once or at-least-once semantics.
The checkpoint represents your pipeline’s position in the source change stream. For log-based CDC, this might be a log sequence number (LSN) in PostgreSQL, a binlog position in MySQL, or a System Change Number (SCN) in Oracle. The checkpoint must be stored durably and updated transactionally with the application of changes to prevent inconsistencies.
Never store checkpoints in memory only. A common anti-pattern is tracking position in application state that’s lost when the process restarts. This forces you to either re-process all changes from the beginning or guess at a restart position, both unacceptable in production. Instead, persist checkpoints to durable storage—database tables, distributed coordination services like ZooKeeper, or the metadata facilities provided by your message broker.
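As a minimal sketch of durable checkpoint storage, the example below keeps the position in a one-row-per-pipeline table; the table name and helper functions are illustrative, and SQLite stands in here for whatever durable store you actually use.

import sqlite3

conn = sqlite3.connect("cdc_state.db")
conn.execute("""
    CREATE TABLE IF NOT EXISTS cdc_checkpoints (
        pipeline_name TEXT PRIMARY KEY,
        position      TEXT NOT NULL
    )
""")

def save_checkpoint(pipeline_name, position):
    # INSERT OR REPLACE keeps exactly one row per pipeline
    conn.execute(
        "INSERT OR REPLACE INTO cdc_checkpoints (pipeline_name, position) VALUES (?, ?)",
        (pipeline_name, position),
    )
    conn.commit()

def load_checkpoint(pipeline_name):
    row = conn.execute(
        "SELECT position FROM cdc_checkpoints WHERE pipeline_name = ?",
        (pipeline_name,),
    ).fetchone()
    return row[0] if row else None

Storing the checkpoint in the target database also simplifies the transactional pattern shown below, since the checkpoint row and the data changes can share one transaction.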
Update checkpoints transactionally with data writes. Consider this sequence:
# Anti-pattern: Non-transactional checkpoint update
def process_batch(changes):
    for change in changes:
        apply_change_to_target(change)
    # If this fails, we've applied changes but not updated position
    update_checkpoint(changes[-1].position)
The problem: if update_checkpoint fails or the process crashes between applying changes and updating the checkpoint, those changes will be reprocessed on restart, creating duplicates. The correct pattern couples checkpoint updates with data writes:
# Best practice: Transactional checkpoint update
def process_batch(changes):
    with transaction():
        for change in changes:
            apply_change_to_target(change)
        # Commit checkpoint and data changes atomically
        update_checkpoint(changes[-1].position)
This atomic coupling means either both succeed or both fail. On restart, the checkpoint reflects exactly which changes have been safely applied.
Implement checkpoint validation on startup. When your pipeline restarts, verify that the stored checkpoint position is still valid in the source system. Transaction logs have retention limits—if your pipeline was down longer than the retention period, the checkpoint position may no longer exist. Detect this condition and trigger a full refresh rather than attempting to continue from an invalid position.
def initialize_pipeline():
    checkpoint = load_checkpoint()
    if not is_position_valid(source_db, checkpoint):
        logging.error(f"Checkpoint {checkpoint} no longer available in source")
        trigger_full_refresh()
        return
    start_streaming_from(checkpoint)
Consider checkpoint granularity carefully. Checkpointing after every individual change provides the finest-grained recovery but incurs significant overhead. Checkpointing after each batch balances recovery granularity with performance. For extremely high-volume systems, you might checkpoint based on time intervals (every 10 seconds) or byte thresholds (every 100MB processed), accepting that recovery might reprocess a bounded amount of data.
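A minimal sketch of such a policy, checkpointing whenever a count, time, or byte threshold is crossed (the change attributes and the save_checkpoint helper are assumptions for illustration, not part of any particular CDC library):

import time

def stream_with_checkpoints(changes, save_checkpoint,
                            max_changes=10_000, max_seconds=10,
                            max_bytes=100 * 1024 * 1024):
    """Apply changes and checkpoint when any threshold is crossed."""
    pending_changes, pending_bytes = 0, 0
    last_checkpoint = time.time()
    position = None
    for change in changes:
        apply_change_to_target(change)      # helper assumed from the earlier examples
        position = change.position
        pending_changes += 1
        pending_bytes += change.size_bytes  # hypothetical size attribute
        if (pending_changes >= max_changes
                or pending_bytes >= max_bytes
                or time.time() - last_checkpoint >= max_seconds):
            save_checkpoint(position)
            pending_changes, pending_bytes = 0, 0
            last_checkpoint = time.time()
    if position is not None:
        save_checkpoint(position)           # flush the final partial batch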
[Figure: CDC Pipeline Design Principles]
Designing for Idempotency and Duplicate Handling
At-least-once delivery is far easier to implement than exactly-once delivery in distributed systems. Your CDC pipeline will likely deliver some changes multiple times due to retries, restarts, or network issues. Rather than attempting to guarantee exactly-once delivery (which is complex and brittle), design your pipeline to be idempotent—making duplicate delivery harmless.
Use upsert operations instead of pure inserts or updates. When applying changes to target systems, use merge or upsert semantics that handle both cases:
-- Instead of INSERT (fails on duplicates) or UPDATE (fails on missing rows)
-- Use MERGE that handles both cases idempotently
MERGE INTO target_table t
USING staging_table s
    ON t.id = s.id
WHEN MATCHED THEN
    UPDATE SET t.value = s.value, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (id, value, created_at, updated_at)
    VALUES (s.id, s.value, s.created_at, s.updated_at);
This operation succeeds whether the row exists or not, making it safe to apply multiple times. PostgreSQL’s INSERT ... ON CONFLICT, MySQL’s INSERT ... ON DUPLICATE KEY UPDATE, and most data warehouses provide similar capabilities.
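As a runnable illustration of why upserts make duplicates harmless, the sketch below uses SQLite's ON CONFLICT upsert syntax as a stand-in for your warehouse's MERGE; applying the same change twice leaves the target in the same state:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (id INTEGER PRIMARY KEY, value TEXT, updated_at TEXT)")

def upsert(change):
    # Idempotent: inserting a new row and re-applying the same change both succeed
    conn.execute(
        "INSERT INTO target_table (id, value, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT (id) DO UPDATE SET value = excluded.value, "
        "updated_at = excluded.updated_at",
        (change["id"], change["value"], change["updated_at"]),
    )
    conn.commit()

change = {"id": 1, "value": "shipped", "updated_at": "2024-01-01T00:00:00Z"}
upsert(change)
upsert(change)  # duplicate delivery is harmless: still exactly one row, same values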
Include version information or timestamps in your data model. When you must prevent stale updates from overwriting fresher data, include version numbers or timestamps:
-- Only apply update if source timestamp is newer
UPDATE target_table
SET value = $1, updated_at = $2
WHERE id = $3
AND (updated_at IS NULL OR updated_at < $2);
This prevents out-of-order changes from corrupting data. If an older update arrives after a newer one has been applied, the condition prevents the update from executing.
Deduplicate at the source when possible. If your pipeline accumulates changes in a staging area before applying them, deduplicate within the batch:
def deduplicate_batch(changes):
    """Keep only the latest change per key"""
    deduped = {}
    for change in sorted(changes, key=lambda c: c.timestamp):
        key = change.table_name + ':' + change.primary_key
        # Keep the change with the highest timestamp
        if key not in deduped or change.timestamp > deduped[key].timestamp:
            deduped[key] = change
    return deduped.values()
This reduces the number of operations applied to target systems and ensures each entity reflects its latest state.
Keep a short-lived record of processed events for exact duplicate detection. When you need to recognize the same change delivered twice, maintain a recent history of processed event IDs:
def process_change(change):
    # Check if we've seen this event ID recently
    if redis_client.exists(f"processed:{change.event_id}"):
        logging.info(f"Skipping duplicate event {change.event_id}")
        return
    # Process the change
    apply_change(change)
    # Mark as processed with TTL (e.g., 1 hour)
    redis_client.setex(f"processed:{change.event_id}", 3600, "1")
This pattern works well when CDC tools provide unique event identifiers and duplicate windows are bounded.
Handling Schema Evolution Without Downtime
Schema changes in source databases pose significant challenges for CDC pipelines. Your pipeline must accommodate new columns, modified types, and restructured tables without data loss or downtime. The key is building flexibility into your design from the start.
Implement schema registry integration early. Rather than hardcoding schema expectations, use a schema registry (like Confluent Schema Registry or AWS Glue Data Catalog) to manage schema versions:
from confluent_kafka.avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'cdc-consumer',
    'schema.registry.url': 'http://schema-registry:8081'
})
consumer.subscribe(['cdc.orders'])  # subscribe to your CDC topic (name is illustrative)

# Consumer automatically fetches and validates schemas
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    # Deserialization handles schema evolution automatically
    record = msg.value()
Schema registries enforce compatibility rules that prevent breaking changes and manage schema versions, enabling gradual rollouts of schema changes across pipeline components.
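For example, you can ask the registry whether a proposed schema change is compatible before deploying it. This is a sketch that assumes a Confluent Schema Registry reachable at the URL used above; the subject name and the evolved schema are illustrative.

import json
import requests

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"  # same URL as the consumer config above

def is_compatible(subject, new_schema):
    """Ask the registry whether a proposed schema is compatible with the latest version."""
    resp = requests.post(
        f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/{subject}/versions/latest",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        data=json.dumps({"schema": json.dumps(new_schema)}),
    )
    resp.raise_for_status()
    return resp.json()["is_compatible"]

# Example: check an evolved record schema before rolling it out to producers
new_schema = {
    "type": "record", "name": "Order",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "status", "type": "string"},
        {"name": "coupon_code", "type": ["null", "string"], "default": None},  # added field
    ],
}
# print(is_compatible("orders-value", new_schema))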
Design target schemas with flexibility. Use semi-structured column types when appropriate to handle evolving schemas:
-- Flexible schema using JSONB for extensibility
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    status VARCHAR(50) NOT NULL,
    -- Core structured fields above
    -- Extended attributes in JSONB for flexibility
    attributes JSONB,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);
New source columns can be added to the attributes JSONB field without altering the target table structure. You can later promote frequently-queried fields to proper columns through controlled migrations.
Implement schema change detection and alerting. Monitor for schema changes and alert teams before they break pipelines:
def monitor_schema_changes():
    current_schema = fetch_table_schema(source_db, 'orders')
    cached_schema = load_from_cache('orders_schema')
    if current_schema != cached_schema:
        diff = compute_schema_diff(cached_schema, current_schema)
        if is_breaking_change(diff):
            alert("CRITICAL: Breaking schema change detected", diff)
        else:
            alert("INFO: Additive schema change detected", diff)
        update_cache('orders_schema', current_schema)
This proactive monitoring gives you time to adjust pipeline code before changes cause failures.
Handle different change types with appropriate strategies:
- Added columns: Automatically include in CDC events and either map to target columns or store in flexible attributes (see the sketch after this list)
- Dropped columns: Continue accepting old events with the column, simply ignore it in processing
- Renamed columns: Treat as drop + add; maintain both names temporarily during transition
- Type changes: Most challenging—often requires adding new column, migrating consumers, then dropping old column
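For the added-column case referenced above, one approach (a sketch assuming the JSONB-style target table shown earlier and a flat event dict) is to map known fields to columns and route anything unrecognized into the attributes blob:

import json

# Columns that exist in the target table; anything else goes into `attributes`
KNOWN_COLUMNS = {"order_id", "customer_id", "status", "created_at", "updated_at"}

def split_event(event):
    """Split a CDC event into structured columns plus an attributes overflow dict."""
    columns = {k: v for k, v in event.items() if k in KNOWN_COLUMNS}
    extras = {k: v for k, v in event.items() if k not in KNOWN_COLUMNS}
    columns["attributes"] = json.dumps(extras) if extras else None
    return columns

# A source event that has grown a new `gift_wrap` column still loads cleanly
event = {"order_id": 42, "customer_id": 7, "status": "paid",
         "created_at": "2024-01-01", "updated_at": "2024-01-02",
         "gift_wrap": True}
print(split_event(event))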
Maintain backward compatibility windows. When making breaking changes, support both old and new schemas for a defined transition period:
def transform_event(event, schema_version):
    if schema_version < 2:
        # Handle old schema format
        return transform_v1(event)
    elif schema_version < 3:
        # Handle intermediate schema
        return transform_v2(event)
    else:
        # Handle current schema
        return transform_v3(event)
This allows gradual migration of components without requiring simultaneous updates across the entire pipeline.
Optimizing for Throughput and Latency
CDC pipeline performance has two primary dimensions: throughput (how many changes per second) and latency (how quickly changes propagate). These often conflict—optimizations that improve throughput (like larger batches) typically increase latency. Design your pipeline to explicitly target your requirements.
Batch appropriately for your latency SLA. If your SLA allows 5-minute latency, batching changes for 60 seconds dramatically improves throughput:
class BatchedConsumer:
    def __init__(self, batch_size=1000, batch_timeout=60):
        self.batch = []
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.last_flush = time.time()

    def consume(self):
        while True:
            msg = self.poll(timeout=1.0)
            if msg:
                self.batch.append(msg)
            # Flush on size or time threshold
            should_flush = (
                len(self.batch) >= self.batch_size or
                time.time() - self.last_flush >= self.batch_timeout
            )
            if should_flush and self.batch:
                self.flush_batch()
This bounds the added latency to the batch timeout, keeping you within the SLA while still letting batches grow large enough for efficient writes.
Parallelize based on independence guarantees. Changes to different entities can process in parallel, but changes to the same entity must maintain order:
from concurrent.futures import ThreadPoolExecutor

# Partition messages by entity ID to enable parallelism
def get_partition_key(change):
    return change.primary_key  # All changes to same entity go to same partition

# Each partition processes independently in order
with ThreadPoolExecutor(max_workers=len(partitions)) as executor:
    for partition in partitions:
        executor.submit(process_partition, partition)
Kafka’s partitioning naturally enables this pattern—partition CDC topics by primary key to ensure changes to the same entity remain ordered while different entities process in parallel.
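For example, when your pipeline publishes CDC events to Kafka with confluent_kafka, keying each message by the row's primary key lets the default partitioner preserve per-entity order; the topic name and change fields here are illustrative.

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})

def publish_change(change):
    # Keying by primary key sends all changes for an entity to the same partition,
    # so consumers see them in order while other entities flow through other partitions.
    producer.produce(
        topic="cdc.orders",               # illustrative topic name
        key=str(change.primary_key),
        value=change.serialized_payload,  # assumed to be bytes or str
    )

# Call producer.flush() before shutdown to deliver any buffered messages.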
Implement connection pooling and batched writes. Instead of opening a connection per change, maintain connection pools and batch database operations:
from contextlib import contextmanager

@contextmanager
def get_connection(pool):
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        pool.return_connection(conn)

def apply_batch(changes):
    with get_connection(target_pool) as conn:
        cursor = conn.cursor()
        # Use executemany for batch operations
        cursor.executemany(
            "INSERT INTO target VALUES (%s, %s, %s) ON CONFLICT ...",
            [(c.id, c.value, c.timestamp) for c in changes]
        )
        conn.commit()
Batch writes reduce network round trips and database overhead significantly compared to individual operations.
Monitor and optimize bottlenecks systematically. Instrument each pipeline stage to identify where time is spent:
import time
from dataclasses import dataclass

@dataclass
class Metrics:
    read_time: float = 0
    transform_time: float = 0
    write_time: float = 0
    total_count: int = 0

def process_with_metrics(changes):
    metrics = Metrics()
    # Read from source
    read_start = time.time()
    events = read_changes(changes)
    metrics.read_time = time.time() - read_start
    # Transform
    transform_start = time.time()
    transformed = transform_events(events)
    metrics.transform_time = time.time() - transform_start
    # Write to target
    write_start = time.time()
    write_to_target(transformed)
    metrics.write_time = time.time() - write_start
    metrics.total_count = len(changes)
    log_metrics(metrics)
This visibility shows whether you’re bottlenecked on source reads, transformation logic, or target writes, guiding optimization efforts.
Performance Optimization Checklist
✓ Use bulk write APIs
✓ Parallelize across partitions
✓ Compress data in transit (see the producer sketch after this checklist)
✓ Use connection pooling
✓ Decrease polling intervals
✓ Optimize transformation logic
✓ Use fast network paths
✓ Minimize serialization overhead
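For instance, compression in transit is often a one-line producer configuration. This sketch uses confluent_kafka with standard librdkafka settings; the specific values are illustrative starting points, not tuned recommendations.

from confluent_kafka import Producer

# lz4 typically offers a good ratio/CPU tradeoff for CDC payloads; the batching
# settings control how many messages get compressed together.
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "compression.type": "lz4",     # alternatives: gzip, snappy, zstd
    "linger.ms": 50,               # wait briefly to form larger, better-compressing batches
    "batch.num.messages": 10000,
})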
Building Comprehensive Monitoring and Alerting
Reliable CDC pipelines require visibility into their operation. Unlike batch ETL where success or failure is obvious, CDC issues often manifest as subtle degradation—gradually increasing lag or slowly growing error rates that compound over time.
Track end-to-end latency as your primary health metric. Measure the time between when a change occurs in the source database and when it’s visible in the target system:
def calculate_latency(change):
    source_timestamp = change.source_metadata['ts_ms']
    processing_timestamp = time.time() * 1000  # Current time in milliseconds
    latency_ms = processing_timestamp - source_timestamp
    # Emit metric to monitoring system
    metrics.gauge('cdc.latency.ms', latency_ms, tags=[
        f'source:{change.database}',
        f'table:{change.table}'
    ])
    return latency_ms
Alert when latency exceeds SLA thresholds or when it trends upward, indicating the pipeline can’t keep up with change rate.
Monitor queue depths and lag at each stage. For message broker-based pipelines, track consumer lag:
from confluent_kafka import TopicPartition

def check_consumer_lag(consumer, topics):
    # consumer: a confluent_kafka.Consumer belonging to the CDC consumer group
    # topics: mapping of topic name -> list of partition numbers
    for topic, partitions in topics.items():
        for partition in partitions:
            tp = TopicPartition(topic, partition)
            # Latest offset available in the partition (high watermark)
            low, high_water = consumer.get_watermark_offsets(tp)
            # The consumer group's committed position
            committed = consumer.committed([tp])[0].offset
            lag = high_water - committed
            if lag > 10000:  # Alert threshold
                alert(f"High lag on {topic}:{partition}: {lag} messages")
Growing lag indicates your consumers can’t keep up with producers, requiring scaling or optimization.
Implement data quality checks and reconciliation. Periodically verify that target systems match source systems:
def reconcile_counts():
    source_count = execute_query(
        source_db,
        "SELECT COUNT(*) FROM orders WHERE updated_at > NOW() - INTERVAL '1 day'"
    )
    target_count = execute_query(
        target_db,
        "SELECT COUNT(*) FROM orders WHERE updated_at > NOW() - INTERVAL '1 day'"
    )
    discrepancy = abs(source_count - target_count)
    if discrepancy > 100:  # Threshold based on expected lag
        alert(f"Count mismatch: source={source_count}, target={target_count}")
Regular reconciliation catches issues like silent failures or stuck pipelines that might not trigger other alerts.
Log structured data for debugging. When failures occur, structured logs enable quick diagnosis:
import structlog

logger = structlog.get_logger()

def process_change(change):
    log = logger.bind(
        event_id=change.event_id,
        table=change.table,
        operation=change.operation,
        primary_key=change.primary_key
    )
    try:
        log.info("processing_change")
        apply_change(change)
        log.info("change_applied")
    except Exception as e:
        log.error("change_failed", error=str(e))
        raise
Structured logs allow querying by specific events, tables, or error types, dramatically speeding up troubleshooting.
Create dashboards that tell a story. Good monitoring dashboards answer key questions at a glance:
- Is the pipeline healthy? (green/yellow/red status indicator)
- How far behind are we? (lag metrics and trends)
- What’s the throughput? (events per second)
- Are there errors? (error rate and recent error samples)
- What’s the resource usage? (CPU, memory, disk I/O)
Organize dashboards by audience—operational dashboards for on-call engineers focus on immediate health, while analytical dashboards for capacity planning show trends over weeks or months.
Conclusion
Reliable CDC pipelines don’t happen by accident—they result from deliberate design choices that anticipate failure modes, handle edge cases, and provide operational visibility. The best practices covered here—transactional checkpointing, idempotent operations, flexible schema handling, performance optimization, and comprehensive monitoring—represent lessons learned from running CDC pipelines at scale across diverse environments and use cases.
Building production-grade CDC infrastructure requires balancing competing concerns: consistency versus performance, flexibility versus simplicity, real-time versus cost. By following these established patterns and understanding the tradeoffs inherent in each design decision, you can build CDC pipelines that reliably deliver incremental data loads while remaining maintainable, debuggable, and adaptable to changing requirements.