End-to-End Streaming Architecture with Kinesis and Glue

Modern applications generate continuous streams of data—clickstream events from websites, IoT sensor readings, transaction logs, application metrics, and real-time user interactions—that demand immediate processing and analysis to extract timely insights. Building robust streaming architectures that ingest, transform, and analyze this data at scale while maintaining reliability and cost-efficiency presents significant engineering challenges that Amazon Web Services addresses through the combination of Kinesis and Glue. Amazon Kinesis provides a fully managed platform for collecting, processing, and analyzing streaming data in real-time, while AWS Glue offers serverless ETL capabilities that transform raw streaming data into analysis-ready formats. Together, these services enable end-to-end streaming pipelines that capture data as it’s generated, apply transformations and enrichments, and deliver results to analytics platforms or data lakes within seconds. This comprehensive guide explores building production-grade streaming architectures using Kinesis and Glue, covering ingestion patterns, stream processing, data transformation, schema evolution, and operational best practices that ensure your streaming pipeline scales reliably while delivering actionable insights from real-time data.

Understanding the Streaming Data Pipeline Components

Before architecting the complete solution, understanding how Kinesis and Glue components fit together establishes the foundation for effective design decisions.

Amazon Kinesis Services Overview

Amazon Kinesis comprises several services, each addressing specific streaming use cases:

Kinesis Data Streams provides the foundation for custom real-time data pipelines. It captures streaming data and makes it available to consumer applications for processing. Data Streams acts as a highly durable, scalable message queue where producers write records and consumers read them in order within each shard.

Key characteristics include:

  • Retention: Data persists for 24 hours by default, extendable to 365 days
  • Ordering: Records within a shard maintain strict ordering based on arrival sequence
  • Shards: The throughput unit in Kinesis—each shard supports 1MB/sec input and 2MB/sec output
  • Partition keys: Determine which shard receives each record, enabling controlled data distribution

Kinesis Data Firehose simplifies delivery of streaming data to destinations like S3, Redshift, Elasticsearch, or third-party services. Firehose handles batching, compression, and transformation automatically, making it ideal when you need simple, reliable delivery without complex processing logic.

Firehose benefits (see the sketch after this list):

  • Fully managed: No infrastructure to manage or scale
  • Automatic batching: Buffers data and delivers in optimal batch sizes
  • Built-in transformation: Can invoke Lambda functions to transform data in-flight
  • Direct integrations: Native connectors to common AWS services
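
If plain delivery to S3 is all a source needs, a Firehose delivery stream can drain the Kinesis stream directly. The sketch below is a minimal example; the IAM role ARNs, prefix, and buffering values are assumptions to adjust for your account and latency requirements:

import boto3

firehose = boto3.client('firehose')

# Hypothetical delivery stream that reads from the Kinesis stream and lands raw data in S3
firehose.create_delivery_stream(
    DeliveryStreamName='application-events-to-s3',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/application-events',
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-read-kinesis'  # Assumed role
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-write-s3',  # Assumed role
        'BucketARN': 'arn:aws:s3:::your-bucket',
        'Prefix': 'raw-events/',
        'BufferingHints': {'SizeInMBs': 64, 'IntervalInSeconds': 300},
        'CompressionFormat': 'GZIP'
    }
)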

Kinesis Data Analytics enables SQL-based analysis of streaming data without managing servers. While powerful for certain use cases, this guide focuses on Glue for transformation, which offers more flexibility for complex ETL scenarios.

AWS Glue in Streaming Context

AWS Glue traditionally served batch ETL workloads but now supports streaming through Glue Streaming ETL jobs:

Glue Streaming Jobs consume data from Kinesis Data Streams or Kafka, apply transformations using Apache Spark Structured Streaming, and write results to various destinations. These jobs run continuously, processing micro-batches of streaming data.

Streaming capabilities include:

  • Exactly-once processing: Ensures each record processes exactly once despite failures
  • Stateful transformations: Maintains state across micro-batches for aggregations or windowing
  • Schema evolution: Adapts to changing data schemas automatically
  • Built-in bookmarking: Tracks processing progress for recovery after failures

Glue Data Catalog provides centralized metadata management, defining schemas for streaming data that multiple services reference. This catalog becomes the single source of truth for data structure, ensuring consistency across ingestion, processing, and analysis.
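
If the catalog database does not exist yet, it can be created up front so that streaming jobs, crawlers, and query engines all reference the same namespace. A minimal sketch, assuming the streaming_data database name used later in this guide:

import boto3

glue = boto3.client('glue')

# Create the catalog database that the streaming job and Athena will reference
glue.create_database(
    DatabaseInput={
        'Name': 'streaming_data',
        'Description': 'Schemas for processed streaming events'
    }
)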

🔄 Kinesis and Glue Architecture Components

  • 📥 Ingestion Layer – Kinesis Data Streams: captures real-time data from producers, maintains ordering, and provides a durable buffer with configurable retention
  • ⚙️ Processing Layer – Glue Streaming ETL: transforms data using Spark, handles schema evolution, and maintains state for aggregations
  • 💾 Storage Layer – S3 + Glue Data Catalog: stores processed data in optimized formats (Parquet) and catalogs schemas for querying
  • 📊 Analytics Layer – Athena / Redshift: queries processed data for insights, dashboards, and reporting

Setting Up the Ingestion Layer with Kinesis Data Streams

The ingestion layer must reliably capture high-volume streaming data from diverse sources while providing the durability and ordering guarantees downstream processing requires.

Creating and Configuring Kinesis Data Streams

Provisioning the stream requires careful capacity planning based on expected throughput:

import boto3

kinesis = boto3.client('kinesis')

# Create stream with appropriate shard count
response = kinesis.create_stream(
    StreamName='application-events',
    ShardCount=10,  # Based on expected throughput
    StreamModeDetails={
        'StreamMode': 'PROVISIONED'  # or 'ON_DEMAND' for automatic scaling
    }
)

# Enable enhanced monitoring for detailed metrics
kinesis.enable_enhanced_monitoring(
    StreamName='application-events',
    ShardLevelMetrics=[
        'IncomingBytes',
        'IncomingRecords',
        'OutgoingBytes',
        'OutgoingRecords',
        'WriteProvisionedThroughputExceeded',
        'ReadProvisionedThroughputExceeded'
    ]
)

# Configure retention period for replay capability
kinesis.increase_stream_retention_period(
    StreamName='application-events',
    RetentionPeriodHours=168  # 7 days for debugging and recovery
)

Shard count calculation determines stream capacity:

  • Each shard handles 1MB/sec or 1,000 records/sec input (whichever is reached first)
  • Each shard provides 2MB/sec output across all consumers
  • For 50MB/sec of expected input, 50 shards is the bare minimum; provision headroom for spikes (for example, 60–65 shards)

On-demand mode simplifies capacity management by automatically scaling shards based on workload. This mode costs more per GB but eliminates capacity planning and shard splitting/merging operations. Use on-demand for unpredictable workloads or rapid prototyping.
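
Switching an existing stream between capacity modes is a single API call. A minimal sketch, reusing the stream ARN from the earlier examples:

import boto3

kinesis = boto3.client('kinesis')

# Move the stream to on-demand capacity; switching back to PROVISIONED works the same way
kinesis.update_stream_mode(
    StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/application-events',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)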

Implementing Efficient Producers

Producer patterns significantly impact throughput, latency, and costs:

import json

import boto3
from aws_kinesis_agg.aggregator import RecordAggregator

def efficient_kinesis_producer(records):
    """Batch and aggregate records for efficient ingestion"""
    kinesis = boto3.client('kinesis')
    aggregator = RecordAggregator()
    
    for record in records:
        partition_key = record['user_id']  # Ensure same user goes to same shard
        data = json.dumps(record).encode('utf-8')
        
        # Aggregate multiple small records; add_user_record returns a full
        # aggregated record once the ~1MB Kinesis record limit is reached
        full_record = aggregator.add_user_record(partition_key, data)
        if full_record:
            send_aggregated_record(kinesis, full_record)
    
    # Flush remaining buffered records
    if aggregator.get_num_user_records() > 0:
        send_aggregated_record(kinesis, aggregator.clear_and_get())

def send_aggregated_record(kinesis, agg_record):
    """Send aggregated record to Kinesis"""
    partition_key, explicit_hash_key, data = agg_record.get_contents()
    
    kinesis.put_record(
        StreamName='application-events',
        Data=data,
        PartitionKey=partition_key
    )

Batching with PutRecords reduces API calls and improves throughput:

  • Batch up to 500 records per PutRecords request
  • Reduce per-request overhead and improve cost-efficiency
  • Implement retry logic for failed records in the batch, as sketched below
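
A sketch of that batching pattern, reusing the stream name and user_id partition key from earlier; the backoff values are illustrative:

import json
import time

import boto3

def put_records_with_retry(records, stream_name='application-events', max_retries=3):
    """Send up to 500 records per PutRecords call, retrying only the failed entries"""
    kinesis = boto3.client('kinesis')
    
    entries = [
        {'Data': json.dumps(r).encode('utf-8'), 'PartitionKey': r['user_id']}
        for r in records
    ]
    
    for start in range(0, len(entries), 500):
        batch = entries[start:start + 500]
        
        for attempt in range(max_retries + 1):
            response = kinesis.put_records(StreamName=stream_name, Records=batch)
            if response['FailedRecordCount'] == 0:
                break
            
            # Keep only the entries whose result carries an ErrorCode and retry them
            batch = [
                entry for entry, result in zip(batch, response['Records'])
                if 'ErrorCode' in result
            ]
            time.sleep(0.1 * (2 ** attempt))  # Exponential backoff before retrying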

Partition key strategy determines data distribution:

  • Use high-cardinality keys (user IDs, session IDs) for even distribution
  • Avoid low-cardinality keys (fixed categories) that cause hot shards
  • Consistent hashing ensures related records route to the same shard for ordering

Monitoring and Scaling the Stream

CloudWatch metrics provide visibility into stream health:

Critical metrics to monitor:

  • IncomingBytes/IncomingRecords: Track input rate against shard capacity
  • WriteProvisionedThroughputExceeded: Indicates insufficient capacity
  • IteratorAge: Measures processing lag—high values signal consumer problems
  • PutRecords.Success: Track percentage of successful writes

Automatic scaling adjusts capacity based on metrics:

from datetime import datetime, timedelta

import boto3

# Lambda function for automatic shard scaling
def scale_kinesis_stream(event, context):
    """Scale stream based on throughput metrics"""
    cloudwatch = boto3.client('cloudwatch')
    kinesis = boto3.client('kinesis')
    
    # Get total bytes ingested over the last five minutes
    metrics = cloudwatch.get_metric_statistics(
        Namespace='AWS/Kinesis',
        MetricName='IncomingBytes',
        Dimensions=[{'Name': 'StreamName', 'Value': 'application-events'}],
        StartTime=datetime.utcnow() - timedelta(minutes=5),
        EndTime=datetime.utcnow(),
        Period=300,
        Statistics=['Sum']
    )
    
    total_bytes = metrics['Datapoints'][0]['Sum'] if metrics['Datapoints'] else 0
    avg_throughput = total_bytes / 300  # Bytes per second
    
    summary = kinesis.describe_stream_summary(StreamName='application-events')
    current_shards = summary['StreamDescriptionSummary']['OpenShardCount']
    capacity_utilization = avg_throughput / (current_shards * 1024 * 1024)  # Each shard accepts 1MB/sec
    
    # Scale out by 50% if utilization exceeds 70%
    if capacity_utilization > 0.7:
        kinesis.update_shard_count(
            StreamName='application-events',
            TargetShardCount=int(current_shards * 1.5),
            ScalingType='UNIFORM_SCALING'
        )

Building the Processing Layer with Glue Streaming ETL

The processing layer transforms raw streaming data into structured, analysis-ready formats while handling schema evolution and data quality issues.

Creating Glue Streaming Jobs

Glue Streaming jobs consume from Kinesis and apply transformations:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, functions as F

# Initialize Glue context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read from Kinesis Data Stream
kinesis_options = {
    "streamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/application-events",
    "startingPosition": "TRIM_HORIZON",  # or "LATEST" for new data only
    "inferSchema": "true",
    "classification": "json"
}

streaming_df = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options=kinesis_options,
    transformation_ctx="streaming_df"
)

# Apply transformations
def process_micro_batch(data_frame, batch_id):
    """Transform each micro-batch of streaming data"""
    if data_frame.count() > 0:
        # Parse the JSON payload delivered by Kinesis
        parsed_df = data_frame.selectExpr("CAST(data as STRING) as json_data")
        
        # Extract fields and cast the event timestamp
        events_df = parsed_df.select(
            F.get_json_object('json_data', '$.user_id').alias('user_id'),
            F.get_json_object('json_data', '$.event_type').alias('event_type'),
            F.to_timestamp(
                F.get_json_object('json_data', '$.timestamp')
            ).alias('event_timestamp'),
            F.get_json_object('json_data', '$.properties').alias('properties')
        )
        
        # Add processing metadata and the date columns used for partitioning
        enriched_df = events_df \
            .withColumn('processing_time', F.current_timestamp()) \
            .withColumn('batch_id', F.lit(batch_id)) \
            .withColumn('year', F.year('event_timestamp')) \
            .withColumn('month', F.month('event_timestamp')) \
            .withColumn('day', F.dayofmonth('event_timestamp'))
        
        # Write to S3 in Parquet format partitioned by event type and date
        enriched_df.write \
            .format("parquet") \
            .partitionBy("event_type", "year", "month", "day") \
            .mode("append") \
            .save("s3://your-bucket/processed-events/")

# Process streaming data
glueContext.forEachBatch(
    frame=streaming_df,
    batch_function=process_micro_batch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": "s3://your-bucket/checkpoints/"
    }
)

job.commit()

Implementing Data Quality and Validation

Schema enforcement ensures data conforms to expected structure:

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Define expected schema
expected_schema = StructType([
    StructField("user_id", StringType(), nullable=False),
    StructField("event_type", StringType(), nullable=False),
    StructField("timestamp", TimestampType(), nullable=False),
    StructField("properties", StringType(), nullable=True)
])

def validate_and_clean(df):
    """Validate schema and handle data quality issues"""
    
    # Filter out records with null required fields
    valid_df = df.filter(
        (F.col('user_id').isNotNull()) & 
        (F.col('event_type').isNotNull()) & 
        (F.col('timestamp').isNotNull())
    )
    
    # Standardize event types
    valid_df = valid_df.withColumn(
        'event_type',
        F.lower(F.trim(F.col('event_type')))
    )
    
    # Validate timestamp ranges
    valid_df = valid_df.filter(
        (F.col('timestamp') >= F.lit('2020-01-01')) &
        (F.col('timestamp') <= F.current_timestamp())
    )
    
    # Log invalid records for investigation
    invalid_df = df.subtract(valid_df)
    if invalid_df.count() > 0:
        invalid_df.write.mode("append").json("s3://your-bucket/invalid-records/")
    
    return valid_df

Handling Late-Arriving Data

Watermarking manages late data in windowed aggregations:

# Aggregate events with watermarking for late data
windowed_aggregates = streaming_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "5 minutes"),
        "event_type"
    ) \
    .agg(
        F.count("*").alias("event_count"),
        # Exact distinct counts are not supported on streaming DataFrames
        F.approx_count_distinct("user_id").alias("unique_users")
    )

Watermarking allows data up to 10 minutes late while preventing unbounded state growth. Records arriving more than 10 minutes late are dropped.
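
A windowed aggregate like this is written with a Structured Streaming sink rather than forEachBatch. The sketch below uses placeholder S3 paths; with a watermark in place, append output mode emits each window only after it can no longer receive late records:

# Write finalized windows to S3
query = windowed_aggregates.writeStream \
    .format("parquet") \
    .option("path", "s3://your-bucket/windowed-aggregates/") \
    .option("checkpointLocation", "s3://your-bucket/checkpoints/aggregates/") \
    .outputMode("append") \
    .trigger(processingTime="1 minute") \
    .start()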

Optimizing Storage and Making Data Queryable

Processed streaming data must be stored efficiently and made available for analytics with minimal latency.

Partitioning Strategy for S3

Effective partitioning dramatically improves query performance:

# Partition by date and event type for efficient filtering
enriched_df.write \
    .format("parquet") \
    .partitionBy("year", "month", "day", "event_type") \
    .mode("append") \
    .option("compression", "snappy") \
    .save("s3://your-bucket/events/")

# Results in structure:
# s3://your-bucket/events/
#   year=2024/
#     month=11/
#       day=16/
#         event_type=page_view/
#           part-00000.snappy.parquet
#         event_type=button_click/
#           part-00000.snappy.parquet

Partitioning best practices (see the Athena query sketch after this list):

  • Limit partition depth to 3-4 levels to avoid excessive directories
  • Choose partition keys based on common query filters
  • Avoid high-cardinality partitions (like user_id) that create millions of partitions
  • Use Hive-style partitioning (key=value format) for compatibility
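
A partition-pruned query, submitted here through boto3 and assuming the streaming_data.events table from this guide plus a placeholder results bucket, scans only a single day of a single event type:

import boto3

athena = boto3.client('athena')

# Filters on partition columns mean Athena reads one day of one event type, not the whole table
query = """
    SELECT user_id, COUNT(*) AS events
    FROM streaming_data.events
    WHERE year = 2024 AND month = 11 AND day = 16
      AND event_type = 'page_view'
    GROUP BY user_id
"""

athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'streaming_data'},
    ResultConfiguration={'OutputLocation': 's3://your-bucket/athena-results/'}
)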

File Size Optimization

Small file problem plagues streaming architectures writing frequently:

# Configure target file sizes through Spark settings before starting the stream
spark.conf.set("spark.sql.files.maxRecordsPerFile", 100000)  # Cap records per output file
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728)  # 128 MB read splits

glueContext.forEachBatch(
    frame=streaming_df,
    batch_function=process_micro_batch,
    options={
        "windowSize": "5 minutes",  # Larger windows = fewer, larger files
        "checkpointLocation": "s3://your-bucket/checkpoints/"
    }
)

# Periodic compaction job to merge small files
def compact_partitions():
    """Compact small files in each partition"""
    df = spark.read.parquet("s3://your-bucket/events/")
    
    # Consolidate into fewer, larger files, then rewrite the partitions
    df.repartition(10) \
      .write \
      .format("parquet") \
      .partitionBy("year", "month", "day", "event_type") \
      .mode("overwrite") \
      .save("s3://your-bucket/events-compacted/")

Cataloging Data with Glue Data Catalog

Automated catalog updates make streaming data queryable immediately:

# Enable automatic catalog updates in the Glue job
from awsglue.dynamicframe import DynamicFrame

# Convert the processed DataFrame into a DynamicFrame for the Glue sink
dynamic_frame = DynamicFrame.fromDF(enriched_df, glueContext, "dynamic_frame")

# Write to S3 and register new partitions in the Glue Data Catalog
sink = glueContext.getSink(
    connection_type="s3",
    path="s3://your-bucket/events/",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["year", "month", "day", "event_type"],
    transformation_ctx="write_events"
)
sink.setFormat("glueparquet")  # Parquet writer that supports catalog updates
sink.setCatalogInfo(
    catalogDatabase="streaming_data",
    catalogTableName="events"
)
sink.writeFrame(dynamic_frame)

Enabling enableUpdateCatalog automatically adds new partitions to the Glue Data Catalog as the streaming job writes data, making them immediately queryable through Athena or other services.

🔧 Architecture Best Practices

  ✓ Implement exactly-once processing – Use Glue bookmarking and idempotent writes to ensure each record processes exactly once despite failures or retries
  ✓ Monitor iterator age – Track how far behind consumers lag; high iterator age indicates processing can't keep up with the ingestion rate
  ✓ Partition strategically – Partition by dimensions commonly used in queries (date, event type) to minimize data scanned and reduce costs
  ✓ Handle schema evolution – Design for schema changes: add columns without breaking existing queries, version schemas, test compatibility
  ✓ Implement dead letter queues – Capture records that fail processing repeatedly for investigation without blocking the pipeline (see the sketch after this list)
  ✓ Test failure recovery – Regularly test checkpoint recovery, simulate failures, verify data integrity after restarts
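
One way to add that dead letter path (the error prefix here is a placeholder, and the wrapper assumes the process_micro_batch function defined earlier) is to wrap the per-batch transformation and divert failing batches to S3 for later replay:

def process_with_dead_letter(data_frame, batch_id):
    """Wrap the normal transformation and capture batches that fail"""
    try:
        process_micro_batch(data_frame, batch_id)
    except Exception:
        # Persist the raw failed batch so the pipeline keeps moving;
        # operators can replay these records after fixing the root cause
        data_frame.selectExpr("CAST(data AS STRING) AS raw_record") \
            .write.mode("append") \
            .json(f"s3://your-bucket/dead-letter/batch_id={batch_id}/")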

Operationalizing the Streaming Pipeline

Production streaming architectures require robust monitoring, alerting, and operational procedures to maintain reliability and performance.

Comprehensive Monitoring Strategy

Multi-layer monitoring provides visibility across the pipeline:

Kinesis monitoring:

  • IncomingBytes/Records: Track ingestion rate
  • IteratorAgeMilliseconds: Consumer lag indicator
  • WriteProvisionedThroughputExceeded: Capacity alerts
  • GetRecords.Success: Consumer health

Glue job monitoring:

  • Job duration and success rate
  • Bytes read from Kinesis and written to S3
  • Data freshness (time between event and processing), as sketched below
  • DPU (Data Processing Unit) utilization
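
Data freshness can be published as a custom CloudWatch metric from inside the micro-batch function. This sketch assumes the enriched_df DataFrame from the processing job and a hypothetical StreamingPipeline namespace:

from datetime import datetime, timezone

import boto3
from pyspark.sql import functions as F

def publish_freshness_metric(enriched_df):
    """Publish the lag between event time and processing time for one micro-batch"""
    oldest_event = enriched_df.agg(F.min('event_timestamp')).collect()[0][0]
    if oldest_event is None:
        return
    
    # Assumes event timestamps are recorded in UTC
    lag_seconds = (datetime.now(timezone.utc) - oldest_event.replace(tzinfo=timezone.utc)).total_seconds()
    
    boto3.client('cloudwatch').put_metric_data(
        Namespace='StreamingPipeline',  # Hypothetical custom namespace
        MetricData=[{
            'MetricName': 'DataFreshnessSeconds',
            'Value': lag_seconds,
            'Unit': 'Seconds'
        }]
    )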

S3 and Athena monitoring:

  • Query performance and costs
  • Data volume growth rate
  • Partition count and file count per partition

Alerting and Incident Response

Critical alerts require immediate attention:

import boto3

cloudwatch = boto3.client('cloudwatch')

# CloudWatch alarm for high iterator age on the application-events stream
cloudwatch.put_metric_alarm(
    AlarmName='kinesis-consumer-lag',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=2,
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Namespace='AWS/Kinesis',
    Dimensions=[{'Name': 'StreamName', 'Value': 'application-events'}],
    Period=300,
    Statistic='Maximum',
    Threshold=300000,  # 5 minutes
    ActionsEnabled=True,
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:streaming-alerts'],
    AlarmDescription='Consumer falling behind in processing Kinesis stream'
)

Runbook procedures for common issues:

  • High iterator age: Scale Glue job DPUs or investigate slow transformations
  • Write throughput exceeded: Add Kinesis shards or reduce producer rate
  • Job failures: Check logs, verify checkpoint integrity, test with recent data sample
  • Query performance degradation: Run compaction, update table statistics

Cost Optimization

Streaming costs accumulate across multiple services:

Kinesis costs (a rough monthly estimate follows this list):

  • Shard hour pricing: $0.015 per shard-hour
  • PUT payload units: $0.014 per million units (25KB per unit)
  • Extended retention: Additional $0.023 per shard-hour beyond 24 hours
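
As a back-of-the-envelope check using the rates above, with a hypothetical 5,000 records/sec of 2KB events flowing into the 10-shard example stream:

import math

# Rough monthly Kinesis cost for the example stream, using the listed rates
shards = 10
record_size_kb = 2        # Hypothetical average record size
records_per_sec = 5_000   # Hypothetical ingest rate

shard_cost = shards * 24 * 30 * 0.015  # 7,200 shard-hours, about $108/month

units_per_record = math.ceil(record_size_kb / 25)  # Each PUT payload unit covers 25KB
put_units = records_per_sec * 60 * 60 * 24 * 30 * units_per_record
put_cost = put_units / 1_000_000 * 0.014  # About 13 billion units, roughly $181/month

print(f"Shard-hours: ${shard_cost:,.0f}  PUT units: ${put_cost:,.0f}")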

Optimization strategies:

  • Use on-demand mode for variable workloads to avoid over-provisioning
  • Aggregate records before writing to reduce PUT costs
  • Right-size shard count based on actual throughput
  • Enable compression in Firehose for S3 delivery

Glue costs:

  • DPU-hour pricing: $0.44 per DPU-hour
  • Glue Data Catalog: first million objects stored free, then $1.00 per 100,000 objects per month

Optimization approaches:

  • Start with minimal DPUs (2) and scale based on processing lag
  • Use Glue Flex execution for non-urgent batch jobs (roughly 35% cheaper per DPU-hour than standard)
  • Consolidate small files to reduce catalog object count
  • Archive old partitions to cheaper storage classes, as sketched below
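
An S3 lifecycle rule handles that archival automatically. In this sketch the bucket, prefix, and transition ages are placeholders to adjust to your retention requirements:

import boto3

s3 = boto3.client('s3')

# Transition older event partitions to cheaper storage tiers
s3.put_bucket_lifecycle_configuration(
    Bucket='your-bucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-old-event-partitions',
            'Filter': {'Prefix': 'events/'},
            'Status': 'Enabled',
            'Transitions': [
                {'Days': 90, 'StorageClass': 'STANDARD_IA'},
                {'Days': 365, 'StorageClass': 'GLACIER'}
            ]
        }]
    }
)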

Conclusion

Building an end-to-end streaming architecture with Kinesis and Glue enables organizations to process real-time data at scale while maintaining the reliability, performance, and cost-efficiency that production systems demand. The combination of Kinesis’s robust ingestion and buffering capabilities with Glue’s flexible serverless ETL transforms raw streaming data into analysis-ready formats stored in S3 and cataloged for immediate querying through Athena, Redshift, or other analytics tools. Success requires careful attention to capacity planning, efficient data serialization, strategic partitioning, comprehensive monitoring, and operational procedures that handle failures gracefully while maintaining data integrity.

The architecture patterns and best practices covered here—from efficient producer implementations and schema evolution handling to partition optimization and cost management—provide a foundation for building streaming pipelines that scale from thousands to millions of events per second while delivering insights with minimal latency. As your streaming requirements evolve, this architecture flexibly adapts through Kinesis’s elastic scaling, Glue’s serverless execution model, and S3’s unlimited storage capacity, ensuring your data infrastructure grows alongside your business needs without requiring fundamental redesigns or migrations.
