Building an ETL Pipeline Example with Databricks

Building an ETL pipeline in Databricks transforms raw data into actionable insights through a structured approach that leverages distributed computing, Delta Lake storage, and Python or SQL transformations. This guide walks through a complete ETL pipeline example, demonstrating practical implementation patterns that data engineers can adapt for their own projects. We’ll build a pipeline that processes e-commerce transaction data from raw CSV files through to analytics-ready aggregated tables.

Understanding the Pipeline Architecture

Before diving into code, understanding the architectural pattern guides implementation decisions. Our example pipeline follows the medallion architecture—a data design pattern that organizes data into bronze (raw), silver (cleansed), and gold (aggregated) layers. This layered approach provides clear data lineage, enables incremental processing, and separates concerns between ingestion, transformation, and business logic.

The pipeline processes e-commerce transaction data containing customer purchases, product information, and order details. Raw CSV files land in cloud storage daily, containing transactions from the previous day. Our ETL pipeline ingests these files, cleanses and validates the data, enriches it with dimension information, and produces aggregated metrics for business intelligence dashboards.

The complete data flow follows this pattern:

Bronze Layer: Ingest raw CSV files from cloud storage, preserving original structure and adding ingestion metadata. This layer maintains a complete historical record of all data received, enabling reprocessing if transformation logic changes.

Silver Layer: Apply data cleansing, type conversions, validation rules, and join with dimension tables. This layer produces clean, conformed data suitable for analytical queries but maintains detailed granularity.

Gold Layer: Aggregate data into business metrics and KPIs. These tables directly power dashboards and reports, optimized for query performance through pre-aggregation and appropriate partitioning.

This architecture balances several concerns: raw data preservation for auditability, incremental processing for efficiency, clear transformation stages for debugging, and optimized aggregations for query performance.

Setting Up the Environment and Data Sources

Every ETL pipeline begins with environment setup and understanding source data characteristics. In Databricks, this means configuring storage access, defining schemas, and establishing file paths that the pipeline will use.

First, configure access to cloud storage where raw files arrive. For AWS S3, this involves IAM roles or access keys; for Azure Blob Storage, account keys or managed identities:

# Configure storage access (AWS S3 example)
spark.conf.set("fs.s3a.access.key", dbutils.secrets.get("aws-secrets", "access-key"))
spark.conf.set("fs.s3a.secret.key", dbutils.secrets.get("aws-secrets", "secret-key"))

# Define storage paths
LANDING_ZONE = "s3a://company-data-lake/landing/transactions/"
BRONZE_PATH = "s3a://company-data-lake/bronze/transactions/"
SILVER_PATH = "s3a://company-data-lake/silver/transactions/"
GOLD_PATH = "s3a://company-data-lake/gold/transaction_metrics/"

Using dbutils.secrets keeps credentials out of notebook source instead of hardcoding sensitive values. The path structure separates the layers clearly, making data organization intuitive.
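
Before running the pipeline, it is worth confirming that the secret scope and keys are actually visible to the workspace. A small sketch using the dbutils secrets utilities (the scope name aws-secrets matches the example above; secret values themselves are redacted when printed):

# List available secret scopes and the keys inside the aws-secrets scope
print([scope.name for scope in dbutils.secrets.listScopes()])
print([secret.key for secret in dbutils.secrets.list("aws-secrets")])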

Define the expected schema for incoming transaction data. Explicit schemas prevent incorrect type inference and catch structural changes in source files:

from pyspark.sql.types import *

transaction_schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", LongType(), False),
    StructField("product_id", StringType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("unit_price", DecimalType(10, 2), False),
    StructField("transaction_date", StringType(), False),
    StructField("payment_method", StringType(), True),
    StructField("discount_amount", DecimalType(10, 2), True),
    StructField("shipping_country", StringType(), True)
])

This schema defines not just types but also nullability constraints. Non-nullable fields (False) represent critical data that must exist in every record; note that Spark treats the nullability flag as a hint rather than enforcing it when reading CSV files, which is why the bronze layer validates these fields explicitly. The schema becomes a contract between source systems and the ETL pipeline.
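
If structural drift should fail the pipeline loudly rather than silently producing nulls, the CSV reader's mode option can enforce the schema at read time. A minimal sketch reusing the schema and landing path defined above; the default PERMISSIVE mode would instead set unparseable fields to null:

# Fail the read immediately if any row does not conform to the declared schema
strict_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("mode", "FAILFAST")
    .schema(transaction_schema)
    .load(LANDING_ZONE)
)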

Create dimension tables that will enrich transaction data. These slowly changing dimensions provide context for analytical queries:

# Create customer dimension table
customer_dim_data = [
    (1001, "Alice Johnson", "Premium", "2022-03-15"),
    (1002, "Bob Smith", "Standard", "2023-01-10"),
    (1003, "Carol White", "Premium", "2021-11-20"),
    (1004, "David Brown", "Standard", "2023-06-05")
]

customer_dim_df = spark.createDataFrame(
    customer_dim_data,
    ["customer_id", "customer_name", "customer_tier", "registration_date"]
)

customer_dim_df.write.format("delta").mode("overwrite").save(
    "s3a://company-data-lake/dimensions/customers"
)

# Create product dimension table
product_dim_data = [
    ("PROD-001", "Laptop", "Electronics", 1299.99),
    ("PROD-002", "Wireless Mouse", "Electronics", 29.99),
    ("PROD-003", "Office Chair", "Furniture", 399.99),
    ("PROD-004", "Desk Lamp", "Furniture", 79.99)
]

product_dim_df = spark.createDataFrame(
    product_dim_data,
    ["product_id", "product_name", "category", "list_price"]
)

product_dim_df.write.format("delta").mode("overwrite").save(
    "s3a://company-data-lake/dimensions/products"
)

These dimension tables establish reference data that enriches raw transactions with meaningful business context.
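
When dimension records change over time, a Delta MERGE keeps the table current without rewriting it wholesale. A minimal type 1 upsert sketch, assuming a hypothetical customer_updates_df DataFrame that carries changed customer rows with the same columns:

from delta.tables import DeltaTable

customer_dim = DeltaTable.forPath(spark, "s3a://company-data-lake/dimensions/customers")

(
    customer_dim.alias("target")
    .merge(
        customer_updates_df.alias("source"),  # hypothetical DataFrame of changed customer rows
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdateAll()     # overwrite changed attributes (SCD type 1)
    .whenNotMatchedInsertAll()  # insert customers seen for the first time
    .execute()
)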

Medallion Architecture Data Flow

Bronze: Raw ingestion • Preserve original structure • Add metadata • Full history
Silver: Cleansing • Validation • Type conversion • Join dimensions • Quality rules
Gold: Aggregation • Business metrics • KPIs • Optimized for queries

Implementing the Bronze Layer: Raw Data Ingestion

The bronze layer focuses on reliable data ingestion with minimal transformation. The goal is capturing everything exactly as received, providing a foundation for downstream processing and enabling reprocessing if transformation logic evolves.

Read raw CSV files from the landing zone, applying the defined schema to ensure structural consistency:

from pyspark.sql.functions import current_timestamp, input_file_name, col

# Read raw transaction files
raw_transactions_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(transaction_schema)
    .load(LANDING_ZONE)
)

# Add ingestion metadata
bronze_transactions_df = (
    raw_transactions_df
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", input_file_name())
)

# Write to bronze layer as Delta table
(
    bronze_transactions_df
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(BRONZE_PATH)
)

print(f"Bronze layer ingestion complete: {bronze_transactions_df.count()} records processed")

The mergeSchema option handles schema evolution gracefully. If source files add new columns, Delta Lake incorporates them automatically rather than failing. The append mode preserves historical data—every ingestion creates new records rather than overwriting previous loads.

Adding ingestion_timestamp and source_file columns provides crucial debugging information. When data quality issues emerge, these fields help trace problems back to specific source files and ingestion runs.
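
Because every bronze record carries this metadata, auditing a suspect load reduces to a simple aggregation. A quick sketch that summarizes how many records each source file and ingestion run delivered:

# Summarize record counts per source file and ingestion run using the bronze metadata columns
(
    spark.read.format("delta").load(BRONZE_PATH)
    .groupBy("source_file", "ingestion_timestamp")
    .count()
    .orderBy("ingestion_timestamp", ascending=False)
    .show(truncate=False)
)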

Implement basic validation to catch corrupted files early:

# Validate critical fields exist and contain data
validation_checks = {
    "has_transaction_id": bronze_transactions_df.filter(col("transaction_id").isNull()).count() == 0,
    "has_customer_id": bronze_transactions_df.filter(col("customer_id").isNull()).count() == 0,
    "has_positive_quantity": bronze_transactions_df.filter(col("quantity") <= 0).count() == 0
}

for check_name, passed in validation_checks.items():
    status = "PASSED" if passed else "FAILED"
    print(f"Validation {check_name}: {status}")

if not all(validation_checks.values()):
    raise Exception("Bronze layer validation failed - critical data quality issues detected")

These validation checks catch structural problems early. If critical fields are missing or contain impossible values, stopping the pipeline prevents propagating bad data downstream.

Building the Silver Layer: Data Cleansing and Enrichment

The silver layer transforms raw data into clean, validated, and enriched records suitable for analytical queries. This stage applies business rules, handles data quality issues, and joins with dimension tables to add context.

Read from the bronze layer and apply transformations:

from pyspark.sql.functions import to_timestamp, to_date, trim, upper, when, round, coalesce, lit

# Read bronze data
bronze_df = spark.read.format("delta").load(BRONZE_PATH)

# Apply cleansing transformations
cleansed_df = (
    bronze_df
    .withColumn("transaction_date", to_timestamp(col("transaction_date"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("transaction_day", to_date(col("transaction_date")))
    .withColumn("payment_method", upper(trim(col("payment_method"))))
    .withColumn("shipping_country", upper(trim(col("shipping_country"))))
    .withColumn("total_amount",
                round((col("quantity") * col("unit_price")) - coalesce(col("discount_amount"), lit(0)), 2))
)

# Apply data quality rules - filter out invalid records
validated_df = (
    cleansed_df
    .filter(col("transaction_date").isNotNull())
    .filter(col("total_amount") > 0)
    .filter(col("quantity") > 0)
    .filter(col("payment_method").isin("CREDIT_CARD", "DEBIT_CARD", "PAYPAL", "BANK_TRANSFER"))
)

print(f"Records filtered out: {cleansed_df.count() - validated_df.count()}")

The cleansing logic standardizes data formats: uppercase country codes and payment methods ensure consistency across the dataset. Coalescing a missing discount_amount to zero keeps total_amount computable for records without a discount, and the derived transaction_day date supports partitioning later. Calculating total_amount derives a commonly needed field, avoiding repeated calculation in downstream queries.

Validation filtering removes records that violate business rules. Invalid payment methods or negative amounts indicate data corruption that shouldn’t propagate to analytical tables.
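
Rather than discarding rejected records silently, many teams quarantine them for later review. A hedged sketch that writes the rows failing validation to a separate Delta path (the quarantine path is illustrative):

from pyspark.sql.functions import current_timestamp

# Records present after cleansing but removed by the validation filters
rejected_df = cleansed_df.join(
    validated_df.select("transaction_id"), "transaction_id", "left_anti"
)

(
    rejected_df
    .withColumn("rejected_at", current_timestamp())
    .write
    .format("delta")
    .mode("append")
    .save("s3a://company-data-lake/quarantine/transactions/")  # illustrative path
)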

Enrich transactions with dimension data:

# Load dimension tables
customer_dim_df = spark.read.format("delta").load("s3a://company-data-lake/dimensions/customers")
product_dim_df = spark.read.format("delta").load("s3a://company-data-lake/dimensions/products")

# Join with customer dimension
enriched_with_customer = (
    validated_df
    .join(customer_dim_df, "customer_id", "left")
    .select(
        validated_df["*"],
        customer_dim_df["customer_name"],
        customer_dim_df["customer_tier"]
    )
)

# Join with product dimension
silver_transactions_df = (
    enriched_with_customer
    .join(product_dim_df, "product_id", "left")
    .select(
        enriched_with_customer["*"],
        product_dim_df["product_name"],
        product_dim_df["category"]
    )
)

# Write to silver layer
(
    silver_transactions_df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("transaction_day")
    .save(SILVER_PATH)
)

print(f"Silver layer created: {silver_transactions_df.count()} enriched records")

The left joins preserve transactions even when dimension records are missing, preventing data loss from incomplete reference data. The partitionBy clause organizes data by the derived transaction_day date rather than the raw timestamp, keeping the partition count manageable while still optimizing queries that filter by date ranges.
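
On Databricks, compacting the silver table and clustering it on a frequently filtered column can further improve read performance. A sketch using OPTIMIZE with Z-ordering; the choice of customer_id as the clustering column is illustrative:

# Compact small files and co-locate rows by customer_id for faster selective reads
spark.sql(f"OPTIMIZE delta.`{SILVER_PATH}` ZORDER BY (customer_id)")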

Implement additional data quality checks on the enriched data:

# Check for orphaned records (transactions without matching dimensions)
orphaned_customers = silver_transactions_df.filter(col("customer_name").isNull()).count()
orphaned_products = silver_transactions_df.filter(col("product_name").isNull()).count()

print(f"Transactions without customer match: {orphaned_customers}")
print(f"Transactions without product match: {orphaned_products}")

if orphaned_customers > silver_transactions_df.count() * 0.05:
    print("WARNING: More than 5% of transactions lack customer information")

if orphaned_products > silver_transactions_df.count() * 0.05:
    print("WARNING: More than 5% of transactions lack product information")

These checks identify referential integrity issues. High orphan rates suggest problems with dimension table completeness or mismatched identifiers between systems.

Creating the Gold Layer: Business Aggregations

The gold layer produces aggregated metrics and KPIs optimized for analytical consumption. These tables power dashboards and reports, pre-computing common aggregations to ensure fast query performance.

Create daily sales metrics aggregated by product category:

from pyspark.sql.functions import sum, count, avg, max, countDistinct, when, round, date_trunc, current_timestamp

# Read silver layer
silver_df = spark.read.format("delta").load(SILVER_PATH)

# Aggregate daily metrics by category
daily_category_metrics = (
    silver_df
    .withColumn("transaction_date_day", date_trunc("day", col("transaction_date")))
    .groupBy("transaction_date_day", "category")
    .agg(
        sum("total_amount").alias("total_revenue"),
        count("transaction_id").alias("transaction_count"),
        sum("quantity").alias("total_units_sold"),
        avg("total_amount").alias("avg_transaction_value"),
        countDistinct("customer_id").alias("unique_customers")
    )
    .orderBy("transaction_date_day", "category")
)

# Write to gold layer
(
    daily_category_metrics
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("transaction_date_day")
    .save(GOLD_PATH + "daily_category_metrics/")
)

This aggregation provides category-level insights into sales performance. The metrics support questions like “Which category generated the most revenue yesterday?” or “How many unique customers purchased electronics last week?”
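
Answering such questions then becomes a small query against the gold table. As a usage sketch, yesterday's revenue by category (with this example's static demo data the result may simply be empty):

from pyspark.sql.functions import col, current_date, date_sub

(
    spark.read.format("delta")
    .load(GOLD_PATH + "daily_category_metrics/")
    .filter(col("transaction_date_day") == date_sub(current_date(), 1))
    .orderBy(col("total_revenue").desc())
    .show()
)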

Create customer-level metrics for segmentation analysis:

# Aggregate customer lifetime metrics
customer_lifetime_metrics = (
    silver_df
    .groupBy("customer_id", "customer_name", "customer_tier")
    .agg(
        sum("total_amount").alias("lifetime_revenue"),
        count("transaction_id").alias("total_transactions"),
        avg("total_amount").alias("avg_transaction_value"),
        max("transaction_date").alias("last_transaction_date"),
        count(when(col("transaction_date") >= date_trunc("month", current_timestamp()), 1)).alias("transactions_this_month")
    )
)

# Add calculated fields for business intelligence
customer_analytics = (
    customer_lifetime_metrics
    .withColumn("revenue_per_transaction", round(col("lifetime_revenue") / col("total_transactions"), 2))
    .withColumn("is_active_customer", col("transactions_this_month") > 0)
)

# Write customer analytics table
(
    customer_analytics
    .write
    .format("delta")
    .mode("overwrite")
    .save(GOLD_PATH + "customer_analytics/")
)

print(f"Customer analytics created for {customer_analytics.count()} customers")

Customer-level aggregations enable segmentation and retention analysis. Marketing teams use these metrics to identify high-value customers, target re-engagement campaigns, and measure customer health.
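
As a usage sketch, identifying lapsed high-value customers for a re-engagement campaign is a simple filter over the analytics table (the revenue threshold is illustrative):

from pyspark.sql.functions import col

# Premium customers with meaningful lifetime spend but no purchases this month
lapsed_premium_df = (
    spark.read.format("delta")
    .load(GOLD_PATH + "customer_analytics/")
    .filter(
        (col("customer_tier") == "Premium")
        & (col("lifetime_revenue") > 1000)
        & (~col("is_active_customer"))
    )
    .orderBy(col("lifetime_revenue").desc())
)

lapsed_premium_df.show()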

Gold Layer Table Specifications

Daily Category Metrics

Purpose: Track sales performance by product category and date

Grain: One row per category per day

Key Metrics: Total revenue, transaction count, units sold, average transaction value, unique customers

Partitioning: By transaction date for efficient date range queries

Customer Analytics

Purpose: Provide customer-level insights for segmentation and retention

Grain: One row per customer with lifetime metrics

Key Metrics: Lifetime revenue, transaction count, average order value, recency, monthly activity

Use Cases: Customer segmentation, churn prediction, marketing campaign targeting

Orchestrating the Complete Pipeline

The individual transformation steps come together through orchestration that executes them in proper sequence with error handling and logging. Databricks Workflows provides native orchestration, but you can also use external tools like Apache Airflow.

Create a main orchestration notebook that calls each transformation step:

# ETL Pipeline Orchestrator

import time
from datetime import datetime

def log_step(step_name, status, duration=None, record_count=None):
    """Log pipeline execution steps"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    message = f"[{timestamp}] {step_name}: {status}"
    if duration is not None:
        message += f" (Duration: {duration:.2f}s)"
    if record_count is not None:
        message += f" (Records: {record_count})"
    print(message)

# Pipeline execution
pipeline_start = time.time()
print("=" * 80)
print("Starting ETL Pipeline Execution")
print("=" * 80)

try:
    # Step 1: Bronze Layer Ingestion
    step_start = time.time()
    bronze_count = bronze_transactions_df.count()
    log_step("Bronze Layer Ingestion", "COMPLETED", time.time() - step_start, bronze_count)
    
    # Step 2: Silver Layer Transformation
    step_start = time.time()
    silver_count = silver_transactions_df.count()
    log_step("Silver Layer Transformation", "COMPLETED", time.time() - step_start, silver_count)
    
    # Step 3: Gold Layer Aggregation
    step_start = time.time()
    gold_count = daily_category_metrics.count()
    log_step("Gold Layer Aggregation", "COMPLETED", time.time() - step_start, gold_count)
    
    # Pipeline summary
    total_duration = time.time() - pipeline_start
    print("=" * 80)
    print(f"Pipeline execution completed successfully in {total_duration:.2f} seconds")
    print(f"Bronze records: {bronze_count}")
    print(f"Silver records: {silver_count}")
    print(f"Gold aggregations: {gold_count}")
    print("=" * 80)
    
except Exception as e:
    log_step("Pipeline Execution", f"FAILED: {str(e)}")
    raise

This orchestrator provides visibility into pipeline execution through structured logging. Each step reports completion status, duration, and record counts, helping identify performance bottlenecks or data volume anomalies.
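
In a production setup, each layer typically lives in its own notebook (or Workflows task) and the orchestrator invokes them in sequence. A hedged sketch using dbutils.notebook.run, assuming hypothetical child notebooks named bronze_ingestion, silver_transformation, and gold_aggregation sit alongside the orchestrator:

# Run each layer as a child notebook; notebook names and the run_date parameter are illustrative
for notebook_name in ["bronze_ingestion", "silver_transformation", "gold_aggregation"]:
    step_start = time.time()
    dbutils.notebook.run(notebook_name, 3600, {"run_date": "2024-01-15"})
    log_step(notebook_name, "COMPLETED", time.time() - step_start)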

Implement incremental processing for efficiency. Rather than reprocessing all historical data on every run, track what’s been processed:

# Incremental processing using watermarks
from delta.tables import DeltaTable

# Check if silver table exists and has data
if DeltaTable.isDeltaTable(spark, SILVER_PATH):
    # Get the latest ingestion timestamp already processed into the silver layer
    max_processed_ts = (
        spark.read.format("delta")
        .load(SILVER_PATH)
        .agg({"ingestion_timestamp": "max"})
        .collect()[0][0]
    )
    
    # Only process bronze records ingested after the watermark
    incremental_bronze_df = (
        spark.read.format("delta")
        .load(BRONZE_PATH)
        .filter(col("ingestion_timestamp") > max_processed_ts)
    )
    
    print(f"Processing incremental data ingested after {max_processed_ts}")
else:
    # First run - process all data
    incremental_bronze_df = spark.read.format("delta").load(BRONZE_PATH)
    print("Processing full historical data (first run)")

Incremental processing dramatically reduces execution time for pipelines running on schedules. Only new data gets transformed, making the pipeline scalable as historical data volume grows.
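
For the bronze ingestion itself, Databricks Auto Loader offers an alternative incremental pattern that tracks already-ingested files automatically. A hedged sketch for recent runtimes; the checkpoint path is illustrative:

from pyspark.sql.functions import current_timestamp, input_file_name

# Incrementally ingest only new files from the landing zone with Auto Loader
bronze_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(transaction_schema)
    .load(LANDING_ZONE)
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", input_file_name())
)

query = (
    bronze_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "s3a://company-data-lake/checkpoints/bronze_transactions/")  # illustrative
    .trigger(availableNow=True)  # process all pending files, then stop
    .start(BRONZE_PATH)
)

query.awaitTermination()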

Testing and Validation Strategies

Production pipelines require comprehensive testing to ensure data quality and transformation correctness. Implement multiple validation layers that catch issues before they impact downstream consumers.

Create data quality tests that validate expected properties:

def validate_silver_layer(silver_df):
    """Validate silver layer data quality"""
    tests = []
    
    # Test 1: No null transaction IDs
    null_ids = silver_df.filter(col("transaction_id").isNull()).count()
    tests.append(("No null transaction IDs", null_ids == 0))
    
    # Test 2: All amounts are positive
    negative_amounts = silver_df.filter(col("total_amount") <= 0).count()
    tests.append(("All positive amounts", negative_amounts == 0))
    
    # Test 3: Dates are within reasonable range
    future_dates = silver_df.filter(col("transaction_date") > current_timestamp()).count()
    old_dates = silver_df.filter(col("transaction_date") < "2020-01-01").count()
    tests.append(("Valid date range", future_dates == 0 and old_dates == 0))
    
    # Test 4: All payment methods are valid
    invalid_payments = silver_df.filter(
        ~col("payment_method").isin("CREDIT_CARD", "DEBIT_CARD", "PAYPAL", "BANK_TRANSFER")
    ).count()
    tests.append(("Valid payment methods", invalid_payments == 0))
    
    # Test 5: Customer tier exists for all records
    missing_tier = silver_df.filter(col("customer_tier").isNull()).count()
    tests.append(("Customer tier populated", missing_tier == 0))
    
    # Report results
    print("\nData Quality Test Results:")
    print("-" * 50)
    for test_name, passed in tests:
        status = "✓ PASSED" if passed else "✗ FAILED"
        print(f"{test_name}: {status}")
    
    all_passed = all(result for _, result in tests)
    return all_passed

# Run validation
if not validate_silver_layer(silver_transactions_df):
    raise Exception("Silver layer validation failed - review data quality issues")

These automated tests catch common data quality issues that might otherwise go unnoticed until causing problems in downstream reports or analytics.

Implement row count reconciliation to ensure no data loss during transformations:

# Count reconciliation across layers
bronze_count = spark.read.format("delta").load(BRONZE_PATH).count()
silver_count = spark.read.format("delta").load(SILVER_PATH).count()

records_filtered = bronze_count - silver_count
filter_rate = (records_filtered / bronze_count) * 100 if bronze_count else 0.0

print(f"\nReconciliation Report:")
print(f"Bronze records: {bronze_count}")
print(f"Silver records: {silver_count}")
print(f"Records filtered: {records_filtered} ({filter_rate:.2f}%)")

if filter_rate > 10:
    print("WARNING: More than 10% of records were filtered - investigate data quality issues")

Monitoring filter rates helps detect upstream data quality degradation. A sudden increase in filtered records signals problems requiring investigation.
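
Persisting these run metrics makes the trend visible over time instead of leaving it buried in job logs. A small sketch that appends each run's counts to a monitoring Delta table (the path is illustrative):

from datetime import datetime

# Append this run's reconciliation metrics for trend analysis
metrics_df = spark.createDataFrame(
    [(datetime.now(), bronze_count, silver_count, records_filtered, float(filter_rate))],
    ["run_timestamp", "bronze_count", "silver_count", "records_filtered", "filter_rate_pct"]
)

(
    metrics_df.write
    .format("delta")
    .mode("append")
    .save("s3a://company-data-lake/monitoring/pipeline_reconciliation/")  # illustrative path
)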

Conclusion

Building an ETL pipeline in Databricks combines distributed computing power with Delta Lake’s reliability to create robust data transformation workflows. The medallion architecture provides a proven pattern that balances raw data preservation, incremental processing efficiency, and analytical optimization. By implementing bronze, silver, and gold layers with proper validation, testing, and orchestration, data engineers create pipelines that reliably transform raw data into business value.

The example pipeline demonstrates practical patterns applicable to diverse use cases—ingestion with schema enforcement, incremental processing for efficiency, dimension enrichment for context, and pre-aggregation for performance. These techniques scale from proof-of-concept projects to enterprise data platforms processing terabytes daily. Start with this foundation and adapt transformation logic, aggregation patterns, and orchestration complexity to match your specific requirements and organizational maturity.
