Modern applications generate data continuously, and the ability to process this data in real-time has become a competitive necessity rather than a luxury. Whether you’re building fraud detection systems, personalizing user experiences, or maintaining up-to-date analytics dashboards, traditional batch ETL processes that run overnight no longer meet business requirements. AWS Database Migration Service (DMS) combined with Amazon Kinesis provides a powerful architecture for streaming data changes from source databases to downstream systems with sub-second latency.
This combination solves a fundamental challenge: extracting change data from operational databases without impacting application performance, transforming it as needed, and loading it into analytics or operational data stores in real-time. While both DMS and Kinesis are individually powerful, their integration creates a flexible, scalable pipeline architecture that handles millions of database changes per hour while maintaining data consistency and enabling complex transformations.
Understanding the DMS and Kinesis Architecture
Before diving into implementation, understanding how these services work together clarifies design decisions and troubleshooting approaches.
The Role of AWS DMS in Real-Time CDC
AWS Database Migration Service is primarily known for database migrations, but its Change Data Capture (CDC) capabilities make it invaluable for real-time ETL. DMS connects to your source database—whether MySQL, PostgreSQL, Oracle, SQL Server, or others—and monitors the transaction log to capture every insert, update, and delete as it occurs.
Unlike application-level change tracking that requires code modifications, DMS operates at the database layer, capturing changes transparently without touching your application code. It handles the complex task of parsing database-specific transaction logs (MySQL binlogs, PostgreSQL WAL, Oracle redo logs) and converting them into a standardized format that downstream systems can consume.
The CDC process works continuously: DMS establishes a replication instance that maintains a persistent connection to your source database, reading the transaction log in real-time. As changes occur, DMS packages them into change records containing the operation type (INSERT, UPDATE, DELETE), the affected table, the new data values, and for updates, the before-image showing previous values.
Kinesis as the Streaming Buffer
Amazon Kinesis Data Streams serves as the critical buffer between DMS and downstream consumers. Rather than DMS writing directly to target databases or analytics systems, it publishes change records to Kinesis streams. This architecture provides several crucial benefits:
Decoupling: Multiple downstream consumers can process the same change stream independently. Your real-time analytics dashboard, machine learning feature store, and audit logging system all consume from the same Kinesis stream without coordinating with each other.
Durability and replay: Kinesis retains data for 1-365 days (configurable), enabling replay for disaster recovery or reprocessing. If a downstream system fails, you can restart from any point in the stream without requesting changes from the source database.
Backpressure handling: When downstream systems slow down, Kinesis buffers the data rather than creating backpressure on DMS or the source database. This prevents transient slowdowns from cascading into application impact.
Scalability: Kinesis scales to handle millions of records per second through sharding, allowing your pipeline to grow with your data volume without architectural changes.
Data Flow Through the Pipeline
The complete data flow follows this pattern:
- Application writes data to the source database
- Database commits the transaction to its transaction log
- DMS reads the transaction log entry
- DMS transforms the entry into a standardized change record
- DMS publishes the change record to Kinesis Data Stream
- Kinesis stores the record in a shard based on partition key
- Consumer applications read from Kinesis shards
- Consumers process and load data into target systems
This architecture typically achieves end-to-end latency of 1-5 seconds from database commit to data availability in target systems, depending on configuration and data volume.
Real-Time ETL Pipeline Architecture
Setting Up DMS for Change Data Capture
Configuring DMS correctly is critical for reliable, performant real-time ETL. Several configuration decisions significantly impact pipeline behavior.
Preparing Your Source Database
Before DMS can capture changes, the source database requires proper configuration. The requirements vary by database type, but common preparations include:
Enable transaction logging: For MySQL, enable binlog with row-based format. For PostgreSQL, enable logical replication. For SQL Server, enable CDC at the database level. Without proper logging configuration, DMS cannot capture changes.
Create a replication user: DMS needs database credentials with sufficient privileges to read transaction logs and query table schemas. Create a dedicated user with minimal necessary permissions rather than using admin credentials.
Configure retention: Transaction logs must be retained long enough for DMS to process them. If logs are purged before DMS reads them, you lose changes. For MySQL, set binlog retention to at least 24 hours. For PostgreSQL, configure appropriate WAL retention.
Creating the DMS Replication Instance
The replication instance is the compute resource that runs your CDC tasks. Size it based on expected throughput:
import boto3
dms = boto3.client('dms')
# Create replication instance
response = dms.create_replication_instance(
ReplicationInstanceIdentifier='realtime-cdc-instance',
ReplicationInstanceClass='dms.c5.xlarge', # Choose based on throughput needs
AllocatedStorage=100, # GB for staging and caching
VpcSecurityGroupIds=['sg-xxxxx'], # Must access source DB
AvailabilityZone='us-east-1a',
MultiAZ=True, # For production high availability
PubliclyAccessible=False,
EngineVersion='3.5.1'
)
Instance sizing considerations:
- c5.large: Up to 10,000 records/second, suitable for small to medium workloads
- c5.xlarge: Up to 30,000 records/second, typical for production workloads
- c5.4xlarge: 100,000+ records/second for high-volume scenarios
Monitor CPU and network utilization during initial testing to right-size your instance. Under-provisioning causes lag accumulation; over-provisioning wastes money.
Configuring Endpoints and Target
Define source and target endpoints. For Kinesis targets, you specify the stream and IAM role:
# Create source endpoint (MySQL example)
source_endpoint = dms.create_endpoint(
EndpointIdentifier='mysql-source',
EndpointType='source',
EngineName='mysql',
ServerName='prod-db.example.com',
Port=3306,
DatabaseName='production',
Username='dms_user',
Password='secure_password',
SslMode='require',
MySQLSettings={
'ServerTimezone': 'UTC',
'TargetDbType': 'specific-database'
}
)
# Create Kinesis target endpoint
target_endpoint = dms.create_endpoint(
EndpointIdentifier='kinesis-target',
EndpointType='target',
EngineName='kinesis',
KinesisSettings={
'StreamArn': 'arn:aws:kinesis:us-east-1:123456789012:stream/cdc-changes',
'MessageFormat': 'json', # or 'json-unformatted'
'ServiceAccessRoleArn': 'arn:aws:iam::123456789012:role/dms-kinesis-role',
'IncludeTransactionDetails': True,
'IncludePartitionValue': True,
'PartitionIncludeSchemaTable': True,
'IncludeTableAlterOperations': True,
'IncludeControlDetails': True
}
)
The KinesisSettings parameters control change record format and content. IncludeTransactionDetails adds transaction metadata useful for ensuring consistency in downstream processing. PartitionIncludeSchemaTable ensures records from the same table go to the same Kinesis shard, maintaining ordering within tables.
Creating and Configuring the Replication Task
The replication task defines what data to replicate and how:
# Define table mappings - what to replicate
table_mappings = {
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "include-users-table",
"object-locator": {
"schema-name": "production",
"table-name": "users"
},
"rule-action": "include"
},
{
"rule-type": "selection",
"rule-id": "2",
"rule-name": "include-orders-table",
"object-locator": {
"schema-name": "production",
"table-name": "orders"
},
"rule-action": "include"
},
{
"rule-type": "transformation",
"rule-id": "3",
"rule-name": "rename-users-table",
"rule-target": "table",
"object-locator": {
"schema-name": "production",
"table-name": "users"
},
"rule-action": "rename",
"value": "customers"
}
]
}
# Create the replication task
task = dms.create_replication_task(
ReplicationTaskIdentifier='realtime-cdc-task',
SourceEndpointArn=source_endpoint['Endpoint']['EndpointArn'],
TargetEndpointArn=target_endpoint['Endpoint']['EndpointArn'],
ReplicationInstanceArn='arn:aws:dms:us-east-1:123456789012:rep:xxxxx',
MigrationType='cdc', # CDC only, not full-load
TableMappings=json.dumps(table_mappings),
ReplicationTaskSettings=json.dumps({
'TargetMetadata': {
'SupportLobs': True,
'LobMaxSize': 32 # MB
},
'FullLoadSettings': {
'TargetTablePrepMode': 'DO_NOTHING'
},
'Logging': {
'EnableLogging': True,
'LogComponents': [
{'Id': 'SOURCE_CAPTURE', 'Severity': 'LOGGER_SEVERITY_DEFAULT'},
{'Id': 'TARGET_APPLY', 'Severity': 'LOGGER_SEVERITY_INFO'}
]
},
'ControlTablesSettings': {
'ControlSchema': 'dms_control',
'StatusTableEnabled': True,
'SuspendedTablesTableEnabled': True
},
'ChangeProcessingTuning': {
'BatchApplyTimeoutMin': 1,
'BatchApplyTimeoutMax': 30,
'BatchApplyMemoryLimit': 500,
'BatchSplitSize': 0,
'MinTransactionSize': 1000,
'CommitTimeout': 1,
'MemoryLimitTotal': 1024,
'MemoryKeepTime': 60,
'StatementCacheSize': 50
}
})
)
# Start the task
dms.start_replication_task(
ReplicationTaskArn=task['ReplicationTask']['ReplicationTaskArn'],
StartReplicationTaskType='start-replication'
)
Key configuration points:
MigrationType='cdc': Specifies ongoing change capture, not one-time migrationTableMappings: Controls which tables to replicate and any transformations to applyChangeProcessingTuning: Critical for performance—controls batching behavior and memory usageMinTransactionSize: Larger values improve throughput but increase latencyCommitTimeout: Lower values reduce latency but may reduce throughput
The tuning parameters require experimentation based on your specific workload characteristics—transaction sizes, change rates, and latency requirements.
Designing the Kinesis Stream Architecture
How you configure your Kinesis stream significantly impacts both performance and downstream consumer design.
Determining Shard Count
Kinesis shards are the unit of scalability. Each shard handles:
- 1 MB/second write throughput
- 2 MB/second read throughput
- 1,000 records/second writes
Calculate required shards based on peak throughput:
Example calculation: Your database generates 5,000 changes per second during peak hours. Average change record size is 2 KB.
- Data rate: 5,000 records × 2 KB = 10 MB/second
- Required shards: 10 MB/second ÷ 1 MB/second/shard = 10 shards minimum
Add 20-30% overhead for spikes and future growth. This scenario needs 12-13 shards.
Partition Key Strategy
The partition key determines which shard receives each record. DMS uses configurable partition keys:
Schema+Table: All changes to a table go to the same shard, maintaining table-level ordering. Good for downstream systems that need consistent table views.
Primary Key: Changes to the same record always go to the same shard, maintaining per-record ordering. Useful when tracking entity state changes.
Random: Distributes load evenly but sacrifices ordering guarantees. Appropriate when downstream consumers don’t require ordering.
Choose based on downstream requirements. If consumers need to maintain consistent views of entities or tables, use Schema+Table or Primary Key partitioning. If consumers process changes independently without ordering concerns, Random distribution maximizes throughput.
Data Retention Configuration
Kinesis retention determines how long data remains available for consumption:
aws kinesis create-stream \
--stream-name cdc-changes \
--shard-count 12 \
--retention-period-hours 168 # 7 days
Longer retention enables:
- Disaster recovery and replay
- Adding new consumers that process historical changes
- Debugging by examining past change records
The cost trade-off is minimal—extending retention from 24 hours to 7 days adds only marginal cost but dramatically improves operational flexibility.
Building Robust Consumer Applications
The consumer side of your pipeline transforms raw change records into business value. Well-designed consumers handle failures gracefully, process data efficiently, and maintain exactly-once semantics where required.
Consumer Pattern Selection
AWS provides multiple frameworks for consuming from Kinesis:
Kinesis Client Library (KCL): Handles shard assignment, checkpointing, and failover automatically. Best for applications needing at-least-once processing with built-in scalability.
Lambda with Kinesis trigger: Serverless consumption with automatic scaling. Ideal for transformations and routing that don’t require complex state management.
Kinesis Data Analytics: SQL-based real-time analytics directly on the stream. Useful for aggregations, windowing, and pattern detection.
Custom consumers: Using Kinesis API directly provides maximum control but requires implementing checkpointing and error handling.
Most production pipelines use KCL or Lambda depending on whether they need long-running stateful processing (KCL) or event-driven transformation (Lambda).
Processing DMS Change Records
DMS publishes change records in JSON format with a specific structure:
{
"data": {
"user_id": 12345,
"email": "user@example.com",
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T14:22:00Z"
},
"metadata": {
"timestamp": "2024-01-15T14:22:00.123456Z",
"record-type": "data",
"operation": "update",
"partition-key-type": "schema-table",
"schema-name": "production",
"table-name": "users",
"transaction-id": 98765
},
"before-image": {
"user_id": 12345,
"email": "old@example.com",
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:30:00Z"
}
}
Your consumer must:
- Parse the JSON structure
- Extract the operation type (insert/update/delete)
- Handle the before-image for updates (showing previous values)
- Process the data payload appropriately
- Handle errors and implement retry logic
Implementing Idempotent Processing
Stream processing requires handling duplicate records. Network issues, consumer failures, or reprocessing from checkpoints can cause records to be processed multiple times. Implement idempotency to ensure duplicate processing doesn’t corrupt data:
Transaction IDs: Include transaction or sequence numbers in target systems, rejecting duplicates.
Upsert operations: Use INSERT … ON CONFLICT UPDATE or MERGE statements that safely handle duplicates.
Deduplication windows: Track recently processed record IDs in a cache (Redis, DynamoDB) and skip duplicates within a time window.
The specific approach depends on your target system capabilities and consistency requirements.
Pipeline Design Best Practices
Monitoring and Operational Excellence
Production real-time ETL pipelines require comprehensive monitoring to maintain reliability and performance.
Critical Metrics to Track
DMS metrics:
- CDCLatencySource: Time lag between source database commit and DMS reading the change
- CDCLatencyTarget: Time lag between DMS reading and writing to Kinesis
- CDCIncomingChanges: Rate of changes being captured
- FullLoadThroughputRowsTarget: For initial loads
Alert when CDCLatency exceeds acceptable thresholds (typically 30-60 seconds indicates problems).
Kinesis metrics:
- IncomingRecords: Write rate to the stream
- IncomingBytes: Data volume
- GetRecords.IteratorAgeMilliseconds: Consumer lag
- WriteProvisionedThroughputExceeded: Throttling events
Consumer lag is critical—if iteratorAge grows continuously, consumers can’t keep up with incoming data rate.
Consumer metrics:
- Processing rate (records/second)
- Error rate and types
- Processing latency (time from Kinesis arrival to target system commit)
- Checkpoint age (how far behind the consumer is in the stream)
Handling Common Failure Scenarios
DMS task failures: Replication tasks can fail due to network issues, schema changes, or permission problems. Configure CloudWatch alarms on task status and implement automated restart procedures for transient failures.
Kinesis throttling: If write rate exceeds provisioned capacity, DMS experiences backpressure. Implement automatic shard splitting or use on-demand capacity mode for variable workloads.
Consumer errors: Dead letter queues (DLQ) capture records that fail processing after retries. Monitor DLQ depth and implement procedures for examining, fixing, and replaying failed records.
Schema evolution: Database schema changes can break consumers expecting specific structures. Implement schema validation in consumers and alerting when unexpected schemas appear.
Cost Optimization Strategies
Real-time ETL pipelines can incur significant costs. Optimization strategies include:
Right-size DMS instances: Monitor utilization and scale down if running at <40% CPU consistently.
Use Kinesis On-Demand: For unpredictable workloads, on-demand mode eliminates over-provisioning costs.
Consolidate low-volume streams: Multiple low-volume CDC tasks can share Kinesis streams using different partition keys.
Compress data: Enable compression in consumers to reduce Kinesis data storage costs.
Archive to S3: Use Kinesis Data Firehose to automatically archive stream data to S3 for compliance while reducing Kinesis retention periods.
Typical costs for a moderate pipeline (10 shards, 100GB daily data, 2-consumer setup) run $500-1000 monthly—significantly less than comparable commercial CDC solutions.
Conclusion
Building real-time ETL pipelines with AWS DMS and Kinesis provides a powerful, scalable architecture for streaming database changes to downstream systems with sub-second latency. The combination leverages DMS’s robust change data capture capabilities and Kinesis’s distributed streaming infrastructure to create pipelines that handle millions of changes per hour while maintaining data consistency and enabling multiple independent consumers. Success requires careful configuration of DMS replication tasks, appropriate Kinesis stream sizing and partitioning strategies, and well-designed consumer applications that handle failures gracefully.
The architecture’s flexibility makes it suitable for diverse use cases—real-time analytics, machine learning feature stores, audit logging, cache invalidation, and event-driven architectures. By following the patterns and practices outlined here—proper database preparation, thoughtful partition key selection, comprehensive monitoring, and robust error handling—you can build production-grade real-time ETL pipelines that deliver business value while maintaining operational reliability and controlling costs.