Building a Change Data Capture (CDC) pipeline with Apache Airflow and PostgreSQL creates a powerful data integration solution that balances real-time requirements with operational simplicity. While Airflow is traditionally known for batch orchestration, its extensible architecture and support for sensors, custom operators, and dynamic DAG generation make it surprisingly capable for near real-time CDC workloads. PostgreSQL’s logical replication capabilities provide reliable change tracking without the overhead of trigger-based solutions or application-level change detection. This article explores how to architect and implement a CDC pipeline that leverages these technologies to maintain synchronized data across systems with minimal latency and maximum reliability.
Understanding PostgreSQL Logical Replication for CDC
PostgreSQL’s logical replication feature forms the foundation of an effective CDC pipeline. Unlike physical replication that copies raw disk blocks, logical replication decodes the write-ahead log (WAL) into logical change events representing individual row operations. This means you can selectively replicate specific tables, transform data during replication, and even replicate to non-PostgreSQL targets.
The architecture revolves around publications and subscriptions. A publication defines which tables and operations (INSERT, UPDATE, DELETE) should be captured on the source database. A replication slot maintains the position in the WAL, ensuring no changes are lost even if your consumer temporarily disconnects. The replication slot acts as a bookmark, preventing PostgreSQL from discarding WAL segments until they’ve been consumed.
Enabling logical replication requires specific PostgreSQL configuration. In your postgresql.conf, set these parameters:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
The wal_level = logical setting instructs PostgreSQL to include enough information in the WAL for logical decoding. The max_replication_slots parameter determines how many concurrent CDC consumers you can support, and max_wal_senders caps the number of WAL sender processes that serve them. After changing these settings, restart PostgreSQL to apply them.
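If you want to confirm the new values after the restart, a quick query against pg_settings shows what the server is actually running with. This is a minimal sketch using psycopg2 directly; the connection string is a placeholder for your source database:
# Sanity-check the replication settings after restarting PostgreSQL
import psycopg2

conn = psycopg2.connect("dbname=sourcedb user=postgres host=localhost")  # placeholder DSN
with conn.cursor() as cur:
    cur.execute("""
        SELECT name, setting
        FROM pg_settings
        WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders')
    """)
    for name, setting in cur.fetchall():
        print(f"{name} = {setting}")
conn.close()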
Creating a publication is straightforward SQL:
-- Create publication for specific tables
CREATE PUBLICATION cdc_publication FOR TABLE
customers, orders, order_items
WITH (publish = 'insert, update, delete');
-- Create replication slot
SELECT pg_create_logical_replication_slot('airflow_cdc_slot', 'pgoutput');
The pgoutput plugin is PostgreSQL’s native logical decoding plugin. It produces standardized output that’s compatible with PostgreSQL’s replication protocol. Alternative plugins like wal2json produce JSON output, which can be easier to work with in some scenarios but require additional installation.
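If you want to see what wal2json emits before committing to it, you can create a throwaway slot and peek at pending changes without consuming them. A rough sketch, assuming the wal2json extension is installed on the server; the slot name and connection string are placeholders:
# Peek at wal2json output from a throwaway slot (slot name and DSN are placeholders)
import psycopg2

conn = psycopg2.connect("dbname=sourcedb user=postgres host=localhost")
conn.autocommit = True  # run slot management functions outside an explicit transaction
with conn.cursor() as cur:
    cur.execute(
        "SELECT pg_create_logical_replication_slot('wal2json_test_slot', 'wal2json')"
    )
    # ...generate some INSERT/UPDATE/DELETE activity on the source tables...
    cur.execute(
        "SELECT lsn, data FROM pg_logical_slot_peek_changes('wal2json_test_slot', NULL, NULL)"
    )
    for lsn, data in cur.fetchall():
        print(lsn, data)  # data is a JSON document describing each transaction
    cur.execute("SELECT pg_drop_replication_slot('wal2json_test_slot')")
conn.close()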
[Figure: CDC Pipeline Architecture]
Building Custom Airflow Operators for CDC
Airflow’s operator-based architecture allows you to build custom operators specifically designed for CDC workloads. A CDC operator needs to connect to the replication slot, consume change events, and process them idempotently. Here’s a skeleton implementation to build on; the plugin-specific message parsing is left as a stub:
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, StopReplication
import json
import time
class PostgresCDCOperator(BaseOperator):
@apply_defaults
def __init__(
self,
source_conn_id,
slot_name,
publication_name,
target_conn_id,
tables,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.source_conn_id = source_conn_id
self.slot_name = slot_name
self.publication_name = publication_name
self.target_conn_id = target_conn_id
self.tables = tables
def execute(self, context):
# Get connection details from Airflow connections
source_hook = PostgresHook(postgres_conn_id=self.source_conn_id)
target_hook = PostgresHook(postgres_conn_id=self.target_conn_id)
# Connect with replication protocol
conn = psycopg2.connect(
source_hook.get_uri(),
connection_factory=LogicalReplicationConnection
)
cur = conn.cursor()
try:
# Start replication from current position
            cur.start_replication(
                slot_name=self.slot_name,
                decode=False,  # pgoutput emits a binary protocol; keep the raw bytes
                options={
                    'proto_version': '1',
                    'publication_names': self.publication_name
                }
            )
            changes = []
            deadline = time.time() + 30  # process for ~30 seconds, then return

            def consume(msg):
                self._process_message(msg, changes)
                # consume_stream blocks indefinitely; raising StopReplication once
                # the deadline passes returns control to this task
                if time.time() >= deadline:
                    raise StopReplication

            # Consume messages until the deadline (if the stream is idle the consumer
            # is never called, so pair this with a sensible execution_timeout)
            try:
                cur.consume_stream(consume)
            except StopReplication:
                pass
# Apply changes to target
if changes:
self._apply_changes(target_hook, changes)
# Acknowledge processing
cur.send_feedback(flush_lsn=cur.wal_end)
finally:
cur.close()
conn.close()
return len(changes)
def _process_message(self, msg, changes):
"""Process individual replication message"""
if msg.data_start:
# Parse change event
change = self._parse_change(msg.payload)
if change and change['table'] in self.tables:
changes.append(change)
            # Acknowledge receipt only; the flush (which lets the server discard WAL)
            # is sent from execute() after the batch has been applied to the target
            msg.cursor.send_feedback(write_lsn=msg.data_start)
def _parse_change(self, payload):
"""Parse replication protocol message"""
# Implementation depends on output plugin
# For pgoutput, parse the binary protocol
# For wal2json, parse JSON directly
pass
def _apply_changes(self, target_hook, changes):
"""Apply changes to target database"""
conn = target_hook.get_conn()
cur = conn.cursor()
try:
for change in changes:
if change['action'] == 'INSERT':
self._handle_insert(cur, change)
elif change['action'] == 'UPDATE':
self._handle_update(cur, change)
elif change['action'] == 'DELETE':
self._handle_delete(cur, change)
conn.commit()
except Exception as e:
conn.rollback()
raise
finally:
cur.close()
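The _parse_change stub is where the plugin-specific decoding lives, and the implementation above deliberately leaves it open. As one illustration only (not the pgoutput binary parser): if the slot were created with wal2json and start_replication were called with decode=True, each payload arrives as a JSON string describing one transaction, and the parsing might look roughly like the sketch below. The commit timestamp assumes wal2json’s include-timestamp option, _primary_keys_for is a hypothetical helper (for example, a cached lookup against information_schema), and _process_message would extend the batch with the returned list rather than appending a single change:
    def _parse_change(self, payload):
        """Sketch: parse one wal2json (format 1) transaction into change dicts."""
        doc = json.loads(payload)
        parsed = []
        for entry in doc.get('change', []):
            oldkeys = entry.get('oldkeys', {})
            parsed.append({
                'table': entry['table'],
                'action': entry['kind'].upper(),             # 'INSERT' / 'UPDATE' / 'DELETE'
                'columns': entry.get('columnnames', []),     # absent for DELETE events
                'values': entry.get('columnvalues', []),
                'old_values': oldkeys.get('keyvalues', []),  # needs REPLICA IDENTITY for UPDATE/DELETE
                # _primary_keys_for is a hypothetical lookup, e.g. against information_schema
                'primary_keys': self._primary_keys_for(entry['table']),
                'commit_timestamp': doc.get('timestamp'),    # requires the include-timestamp option
            })
        return parsed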
This operator encapsulates the complexity of consuming logical replication streams. The execute method connects to the replication slot, consumes messages for a defined period, and applies them to the target database. The timeout-based approach ensures the operator completes within Airflow’s task execution model while still processing changes frequently enough for near real-time requirements.
The key design decision is how long to consume messages before returning. Shorter timeouts (30-60 seconds) provide lower latency but require more frequent task executions. Longer timeouts reduce orchestration overhead but increase end-to-end latency. For truly real-time requirements, you might consume for just a few seconds and schedule the DAG to run continuously.
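Newer Airflow releases offer another option for that last case: version 2.6 added the @continuous schedule, which starts a new DAG run as soon as the previous one finishes rather than polling on a cron expression. A minimal sketch of that variant (the DAG id is illustrative):
# Sketch: continuously re-triggered CDC DAG (requires Airflow 2.6+ and max_active_runs=1)
from datetime import datetime
from airflow import DAG

dag = DAG(
    'postgres_cdc_pipeline_continuous',   # illustrative DAG id
    start_date=datetime(2024, 1, 1),
    schedule='@continuous',   # start the next run as soon as the current one completes
    max_active_runs=1,
    catchup=False,
)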
Implementing Idempotent Change Application
CDC pipelines must handle scenarios where the same change event is processed multiple times. Network failures, task retries, and replication slot rewinds can all cause duplicate deliveries. Your change application logic must be idempotent—applying the same change multiple times produces the same result as applying it once.
For INSERT operations, use INSERT ON CONFLICT to handle duplicates:
def _handle_insert(self, cursor, change):
table = change['table']
columns = change['columns']
values = change['values']
pk_columns = change['primary_keys']
# Build column and value lists
col_names = ', '.join(columns)
placeholders = ', '.join(['%s'] * len(values))
# Build conflict resolution
updates = ', '.join([
f"{col} = EXCLUDED.{col}"
for col in columns if col not in pk_columns
])
sql = f"""
INSERT INTO {table} ({col_names})
VALUES ({placeholders})
ON CONFLICT ({', '.join(pk_columns)})
DO UPDATE SET {updates}
"""
cursor.execute(sql, values)
This pattern inserts the row if it doesn’t exist, or updates it if it does—making the operation idempotent. Even if you process the same INSERT event multiple times, the final state is correct.
UPDATE operations require checking that you’re not applying stale changes:
def _handle_update(self, cursor, change):
table = change['table']
pk_columns = change['primary_keys']
pk_values = [change['values'][change['columns'].index(pk)]
for pk in pk_columns]
# Include version/timestamp check
set_clause = ', '.join([
f"{col} = %s"
for col in change['columns']
if col not in pk_columns
])
where_clause = ' AND '.join([f"{pk} = %s" for pk in pk_columns])
# Only update if change is newer than current data
sql = f"""
UPDATE {table}
SET {set_clause}, updated_at = %s
WHERE {where_clause}
AND (updated_at IS NULL OR updated_at < %s)
"""
values = [v for k, v in zip(change['columns'], change['values'])
if k not in pk_columns]
values.append(change['commit_timestamp'])
values.extend(pk_values)
values.append(change['commit_timestamp'])
cursor.execute(sql, values)
The timestamp check prevents older changes from overwriting newer data. This is crucial when changes arrive out of order or when replaying from an earlier replication slot position.
DELETE operations are naturally idempotent—deleting a non-existent row is a no-op:
def _handle_delete(self, cursor, change):
table = change['table']
pk_columns = change['primary_keys']
pk_values = [change['old_values'][change['columns'].index(pk)]
for pk in pk_columns]
where_clause = ' AND '.join([f"{pk} = %s" for pk in pk_columns])
sql = f"DELETE FROM {table} WHERE {where_clause}"
cursor.execute(sql, pk_values)
Designing Airflow DAGs for Continuous CDC
Traditional Airflow DAGs run on schedules—hourly, daily, or at specific times. CDC pipelines need a different approach that processes changes continuously while respecting Airflow’s task-based execution model. The solution is a continuously running DAG with short cycles.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
# PostgresCDCOperator: import the custom operator defined above from wherever it
# lives in your plugins or project package
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(seconds=30),
}
dag = DAG(
'postgres_cdc_pipeline',
default_args=default_args,
description='Real-time CDC from Postgres',
schedule_interval='*/2 * * * *', # Every 2 minutes
catchup=False,
max_active_runs=1, # Prevent concurrent runs
tags=['cdc', 'postgres'],
)
# Capture changes from source
capture_changes = PostgresCDCOperator(
task_id='capture_customer_changes',
source_conn_id='source_postgres',
slot_name='airflow_cdc_slot',
publication_name='cdc_publication',
target_conn_id='staging_postgres',
tables=['customers', 'orders', 'order_items'],
dag=dag,
)
# Process and transform
def transform_changes(**context):
staging_hook = PostgresHook(postgres_conn_id='staging_postgres')
warehouse_hook = PostgresHook(postgres_conn_id='warehouse_postgres')
# Read from staging
staging_conn = staging_hook.get_conn()
staging_cur = staging_conn.cursor()
staging_cur.execute("""
SELECT table_name, operation, data, commit_timestamp
FROM cdc_staging
WHERE processed = false
ORDER BY commit_timestamp
""")
changes = staging_cur.fetchall()
# Apply business logic transformations
for change in changes:
# Enrich, validate, transform
transformed = apply_business_rules(change)
apply_to_warehouse(warehouse_hook, transformed)
# Mark as processed
staging_cur.execute("""
UPDATE cdc_staging
SET processed = true, processed_at = NOW()
WHERE processed = false
""")
staging_conn.commit()
transform_task = PythonOperator(
task_id='transform_and_load',
python_callable=transform_changes,
dag=dag,
)
# Define dependencies
capture_changes >> transform_task
The max_active_runs=1 setting is crucial—it prevents multiple DAG runs from processing the same replication slot simultaneously, which would cause conflicts. The catchup=False setting prevents Airflow from backfilling missed runs when the scheduler restarts. Note that the transform task reads from a cdc_staging table, which assumes the capture step lands raw change events in staging rather than applying them directly to final tables; if you follow the operator example above, either adapt _apply_changes to write to staging or drop the intermediate hop.
[Figure: CDC Pipeline Patterns]
Managing Replication Slots and WAL Growth
Replication slots are powerful but require careful management. A slot prevents PostgreSQL from discarding WAL segments, which means an inactive slot can cause WAL to grow indefinitely until disk space is exhausted. This is one of the most common operational issues with logical replication-based CDC.
Monitor replication lag using PostgreSQL’s system views:
SELECT
slot_name,
slot_type,
database,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_size,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as lag_bytes
FROM pg_replication_slots
WHERE slot_type = 'logical';
If lag grows beyond acceptable thresholds, investigate why your CDC pipeline isn’t consuming changes. Common causes include:
- Airflow scheduler or worker failures preventing DAG execution
- Target database performance issues slowing change application
- Network connectivity problems between Airflow and source database
- Insufficient resources on Airflow workers for CDC processing
Implement automated alerts when lag exceeds thresholds:
def check_replication_lag(**context):
hook = PostgresHook(postgres_conn_id='source_postgres')
result = hook.get_first("""
SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'airflow_cdc_slot'
""")
lag_bytes = result[0]
lag_mb = lag_bytes / (1024 * 1024)
# Alert if lag exceeds 1GB
if lag_mb > 1024:
send_alert(f"CDC replication lag is {lag_mb:.2f}MB")
# Push metric to monitoring system
push_metric('cdc.replication.lag_mb', lag_mb)
lag_check = PythonOperator(
task_id='check_replication_lag',
python_callable=check_replication_lag,
dag=dag,
)
In extreme cases where a slot has fallen too far behind, you may need to rebuild it. This involves dropping the old slot, creating a new one, and performing a fresh full-load of affected tables. Your DAG should detect this condition and automatically trigger full-load tasks when necessary.
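What that recovery path might look like as an Airflow task is sketched below; the 100 GB threshold and the downstream task ids are placeholders, and the callable is meant for a BranchPythonOperator that routes to your existing full-reload tasks:
# Sketch: rebuild the slot when lag is beyond recovery (threshold and task ids are placeholders)
def rebuild_slot_if_needed(**context):
    hook = PostgresHook(postgres_conn_id='source_postgres')
    row = hook.get_first("""
        SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
        FROM pg_replication_slots
        WHERE slot_name = 'airflow_cdc_slot'
    """)
    lag_bytes = row[0] if row else 0

    if lag_bytes > 100 * 1024 ** 3:  # ~100 GB of retained WAL
        # The slot must be inactive before it can be dropped; a new slot starts
        # from the current WAL position, so every covered table needs a full reload
        hook.run("SELECT pg_drop_replication_slot('airflow_cdc_slot')")
        hook.run("SELECT pg_create_logical_replication_slot('airflow_cdc_slot', 'pgoutput')")
        return 'trigger_full_reload'
    return 'skip_full_reload'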
Handling Schema Evolution and DDL Changes
Schema changes in the source database present challenges for CDC pipelines. When someone adds a column, changes a data type, or renames a table, your pipeline needs to adapt without breaking. PostgreSQL logical replication handles many schema changes gracefully, but DDL events themselves aren’t captured through replication slots.
One approach is to periodically compare source and target schemas:
import logging

def sync_schema(**context):
    log = logging.getLogger(__name__)
    source_hook = PostgresHook(postgres_conn_id='source_postgres')
target_hook = PostgresHook(postgres_conn_id='target_postgres')
# Get source schema
source_schema = source_hook.get_records("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'customers'
ORDER BY ordinal_position
""")
# Get target schema
target_schema = target_hook.get_records("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'customers'
ORDER BY ordinal_position
""")
# Detect differences
source_cols = {col[0]: col[1] for col in source_schema}
target_cols = {col[0]: col[1] for col in target_schema}
# Add missing columns to target
for col_name, col_type in source_cols.items():
if col_name not in target_cols:
target_hook.run(f"""
ALTER TABLE customers
ADD COLUMN {col_name} {col_type}
""")
log.info(f"Added column {col_name} to target")
schema_sync = PythonOperator(
task_id='sync_schema',
python_callable=sync_schema,
dag=dag,
)
This task runs before CDC capture, ensuring the target schema is up-to-date. For more sophisticated schema evolution, consider maintaining a schema version table that tracks applied migrations, similar to database migration tools like Flyway or Liquibase.
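One lightweight way to track those applied migrations is a small bookkeeping table on the target; the table and column names below are illustrative rather than any standard:
# Sketch: minimal schema-version bookkeeping on the target (names are illustrative)
def apply_schema_migration(target_hook, version, ddl):
    conn = target_hook.get_conn()
    cur = conn.cursor()
    try:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS cdc_schema_version (
                version     integer PRIMARY KEY,
                applied_at  timestamptz NOT NULL DEFAULT now()
            )
        """)
        cur.execute("SELECT 1 FROM cdc_schema_version WHERE version = %s", (version,))
        if cur.fetchone() is None:
            cur.execute(ddl)  # e.g. "ALTER TABLE customers ADD COLUMN loyalty_tier text"
            cur.execute("INSERT INTO cdc_schema_version (version) VALUES (%s)", (version,))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()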
Breaking schema changes—like dropping columns or changing column types—require more careful handling. You might need to version your pipeline code to handle both old and new schemas during a transition period, or implement a blue-green deployment strategy where you build a new target schema alongside the old one before cutting over.
Optimizing Performance and Resource Usage
CDC pipelines can strain system resources if not properly optimized. The source database must maintain WAL segments and decode them. Airflow workers must fetch, parse, and apply changes. The target database must handle continuous write load. Each component requires tuning for optimal performance.
On the PostgreSQL source, monitor WAL generation rate and replication slot resource usage. The pg_stat_replication view shows active replication connections and their lag. If decoding becomes a bottleneck, consider batching change consumption—read larger chunks of changes less frequently rather than tiny amounts very frequently.
In Airflow, properly configure worker concurrency and queue management. CDC tasks should run on dedicated workers if possible, preventing them from competing with batch ETL jobs for resources. Use Airflow pools to limit concurrent CDC task execution and prevent overwhelming your target database.
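Pools are assigned per task, so assuming a pool named cdc_pool has been created (via the UI or airflow pools set), capping the CDC operator might look like this:
# Sketch: cap concurrent CDC work with an Airflow pool ('cdc_pool' is assumed to exist)
capture_changes = PostgresCDCOperator(
    task_id='capture_customer_changes',
    source_conn_id='source_postgres',
    slot_name='airflow_cdc_slot',
    publication_name='cdc_publication',
    target_conn_id='staging_postgres',
    tables=['customers', 'orders', 'order_items'],
    pool='cdc_pool',         # limits how many tasks from this pool run at once
    priority_weight=10,      # favour CDC tasks over ad-hoc jobs sharing the pool
    dag=dag,
)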
Target database optimization focuses on write throughput. Batch changes into transactions to reduce commit overhead. Use appropriate indexes on join keys and lookup columns. Consider partitioning large target tables by date or another suitable key to improve write performance and enable efficient archival of historical data.
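For the write path specifically, psycopg2’s execute_values is one way to fold many captured inserts into a handful of statements per commit. A sketch, assuming every change in the batch targets the same table and shares the same column list; bulk_apply_inserts is an illustrative helper, not part of the operator above:
# Sketch: batch INSERT-type changes for one table into a few large statements
from psycopg2.extras import execute_values

def bulk_apply_inserts(target_hook, table, changes):
    # Assumes all changes share the same column list and target the same table
    columns = changes[0]['columns']
    rows = [tuple(c['values']) for c in changes]
    conn = target_hook.get_conn()
    with conn.cursor() as cur:
        execute_values(
            cur,
            f"INSERT INTO {table} ({', '.join(columns)}) VALUES %s ON CONFLICT DO NOTHING",
            rows,
            page_size=500,   # rows folded into each generated statement
        )
    conn.commit()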
Conclusion
Building a real-time CDC pipeline with Airflow and PostgreSQL provides a robust, maintainable solution for data integration that balances near real-time requirements with operational practicality. PostgreSQL’s logical replication delivers reliable change capture with minimal overhead, while Airflow’s flexible orchestration handles the complexity of continuous processing, error handling, and monitoring within a familiar workflow framework.
The key to success lies in understanding the tradeoffs between latency, resource usage, and operational complexity. By implementing idempotent change application, managing replication slots carefully, and monitoring pipeline health comprehensively, you can build CDC pipelines that reliably synchronize data across systems while remaining maintainable and cost-effective as your data landscape evolves.