How to Build End-to-End ML Pipelines with Airflow and DBT

Building production-ready machine learning pipelines requires orchestrating complex workflows that transform raw data into model predictions. Apache Airflow and dbt (data build tool) have emerged as a powerful combination for this task—Airflow handles workflow orchestration and dependency management, while dbt brings software engineering best practices to data transformation. Together, they enable teams to build maintainable, testable, and scalable end-to-end ML pipelines.

Understanding the Airflow and dbt Architecture

Before diving into implementation, it’s important to understand how Airflow and dbt complement each other in ML pipelines. Airflow excels at orchestration—scheduling tasks, managing dependencies, handling retries, and monitoring execution. DBT specializes in data transformation—writing modular SQL, testing data quality, and documenting lineage.

In a typical ML pipeline, raw data arrives in your data warehouse from various sources. DBT transforms this raw data through multiple layers—staging tables that clean and standardize, intermediate tables that join and aggregate, and final feature tables ready for ML consumption. Airflow orchestrates this entire process, triggering dbt runs at appropriate times, handling upstream dependencies like data extraction, and coordinating downstream tasks like model training.

This separation of concerns provides significant advantages. Data analysts can write dbt models using SQL they’re comfortable with, while data engineers configure Airflow DAGs to orchestrate complex workflows. Changes to transformation logic happen in dbt with built-in testing, while pipeline scheduling and dependency logic lives in Airflow’s Python-based DAG definitions.

The architecture also promotes modularity. DBT models are independent, testable units that can be developed and validated separately. Airflow tasks compose these units into complete workflows, adding additional steps like data extraction, model training, and prediction storage that fall outside dbt’s scope.

Setting Up Your Environment

Begin by establishing a proper project structure that separates Airflow and dbt concerns while enabling smooth integration. Create a directory layout that looks like this:

ml-pipeline/
├── airflow/
│   ├── dags/
│   │   └── ml_feature_pipeline.py
│   ├── plugins/
│   └── config/
├── dbt/
│   ├── models/
│   │   ├── staging/
│   │   ├── intermediate/
│   │   └── marts/
│   ├── tests/
│   ├── macros/
│   └── dbt_project.yml
├── models/
│   └── training_scripts/
└── requirements.txt

Install the necessary dependencies:

pip install apache-airflow==2.8.0
pip install dbt-core dbt-snowflake  # or dbt-postgres, dbt-bigquery, etc.
pip install apache-airflow-providers-dbt-cloud  # only needed if you orchestrate dbt Cloud jobs

Initialize your dbt project within the structure:

cd dbt
dbt init ml_features

Configure your dbt profiles.yml to connect to your data warehouse. This file typically lives in ~/.dbt/ and contains connection credentials:

ml_features:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: your_account
      user: your_user
      password: your_password
      role: transformer
      database: ml_data
      warehouse: transform_wh
      schema: ml_features
      threads: 4

For Airflow, configure your connection to the data warehouse in the Airflow UI or through environment variables, enabling Airflow tasks to trigger dbt runs programmatically.
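One common pattern is to define the connection as an environment variable in Airflow's URI format. The sketch below is illustrative: the connection id `warehouse_db` is an assumption, and the exact URI field layout for Snowflake connections should be checked against the provider's documentation.

```python
import os

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment
# variables in URI form. The field mapping here is illustrative; verify it
# against the Snowflake provider docs for your version.
os.environ["AIRFLOW_CONN_WAREHOUSE_DB"] = (
    "snowflake://your_user:your_password@/ml_data/ml_features"
    "?account=your_account&warehouse=transform_wh&role=transformer"
)
```

Tasks can then reference the connection by id (`warehouse_db`) without credentials appearing in DAG code.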

🏗️ Pipeline Architecture Components

- 📥 Data Ingestion: Airflow operators extract raw data into the warehouse landing zone
- 🔄 dbt Transformation: staged SQL models clean, join, and engineer features
- 🎯 Model Training: Python operators train models on transformed features
- 💾 Prediction Storage: results written back to the warehouse for serving

Building dbt Models for Feature Engineering

DBT models form the transformation layer of your ML pipeline. Organize models in layers that progressively refine data from raw inputs to ML-ready features.

Start with staging models that standardize raw data. These models handle basic cleaning, type casting, and column renaming:

-- models/staging/stg_transactions.sql
with source as (
    select * from {{ source('raw', 'transactions') }}
),

cleaned as (
    select
        transaction_id,
        customer_id,
        cast(amount as decimal(10,2)) as amount,
        cast(transaction_date as date) as transaction_date,
        lower(trim(category)) as category,
        -- Handle nulls explicitly
        coalesce(merchant_id, 'unknown') as merchant_id
    from source
    where transaction_date >= current_date - interval '2 years'
        and amount > 0  -- Filter invalid amounts
)

select * from cleaned

Intermediate models perform aggregations and joins that create derived metrics:

-- models/intermediate/int_customer_purchase_metrics.sql
with transactions as (
    select * from {{ ref('stg_transactions') }}
),

customer_metrics as (
    select
        customer_id,
        count(*) as total_transactions,
        sum(amount) as total_spend,
        avg(amount) as avg_transaction_value,
        max(transaction_date) as last_purchase_date,
        min(transaction_date) as first_purchase_date,
        count(distinct category) as distinct_categories,
        stddev(amount) as spend_volatility
    from transactions
    group by customer_id
)

select * from customer_metrics
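For intuition, the group-by above can be mirrored in plain Python over a few fabricated sample rows (these rows are invented for illustration, not from the article's dataset):

```python
from collections import defaultdict
from statistics import fmean

# Toy rows standing in for stg_transactions
rows = [
    {"customer_id": 1, "amount": 50.0, "category": "food"},
    {"customer_id": 1, "amount": 150.0, "category": "travel"},
    {"customer_id": 2, "amount": 20.0, "category": "food"},
]

by_customer = defaultdict(list)
for r in rows:
    by_customer[r["customer_id"]].append(r)

# The same per-customer metrics the SQL computes with "group by customer_id"
metrics = {
    cid: {
        "total_transactions": len(rs),
        "total_spend": sum(r["amount"] for r in rs),
        "avg_transaction_value": fmean(r["amount"] for r in rs),
        "distinct_categories": len({r["category"] for r in rs}),
    }
    for cid, rs in by_customer.items()
}
```

Keeping these aggregations in SQL rather than Python lets the warehouse do the heavy lifting; the snippet is only a mental model of what the CTE produces.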

Mart models create final feature tables optimized for ML consumption:

-- models/marts/ml_customer_features.sql
with customer_metrics as (
    select * from {{ ref('int_customer_purchase_metrics') }}
),

customer_demographics as (
    select * from {{ ref('stg_customers') }}
),

recency_features as (
    select
        customer_id,
        datediff(day, last_purchase_date, current_date) as days_since_last_purchase,
        datediff(day, first_purchase_date, current_date) as customer_tenure_days
    from customer_metrics
),

final as (
    select
        m.customer_id,
        -- Purchase behavior features
        m.total_transactions,
        m.total_spend,
        m.avg_transaction_value,
        m.distinct_categories,
        m.spend_volatility,
        -- Recency features
        r.days_since_last_purchase,
        r.customer_tenure_days,
        -- Frequency feature
        m.total_transactions / nullif(r.customer_tenure_days, 0) as purchase_frequency,
        -- Demographic features
        d.age,
        d.region,
        d.account_type,
        -- Metadata
        current_timestamp as feature_computed_at
    from customer_metrics m
    join recency_features r using (customer_id)
    join customer_demographics d using (customer_id)
)

select * from final

DBT’s ref() function creates dependencies between models. When you run dbt, it automatically determines the correct execution order based on these references.
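Under the hood, dbt turns those ref() calls into a dependency graph and topologically sorts it. A minimal Python sketch of that ordering, using the models defined in this section:

```python
from graphlib import TopologicalSorter

# Edges implied by the ref() calls in the models above:
# each model maps to the set of models it selects from.
deps = {
    "stg_transactions": set(),
    "stg_customers": set(),
    "int_customer_purchase_metrics": {"stg_transactions"},
    "ml_customer_features": {"int_customer_purchase_metrics", "stg_customers"},
}

# static_order() yields a valid build order: every model runs after its refs
order = list(TopologicalSorter(deps).static_order())
```

This is exactly why you never hard-code table names across models: replace a ref() with a literal table name and the model silently drops out of the graph.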

Implement incremental models for large datasets to avoid reprocessing everything on each run:

-- models/marts/ml_customer_features.sql
{{ config(
    materialized='incremental',
    unique_key='customer_id',
    on_schema_change='fail'
) }}

with customer_metrics as (
    select * from {{ ref('int_customer_purchase_metrics') }}
    
    {% if is_incremental() %}
    -- Only process customers with recent transactions
    where customer_id in (
        select distinct customer_id 
        from {{ ref('stg_transactions') }}
        where transaction_date > (select max(feature_computed_at) from {{ this }})
    )
    {% endif %}
),

final as (
    -- feature computation logic (as in the full model above)
    select * from customer_metrics
)

select * from final
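To see what the is_incremental() filter buys you, here is the same "only refresh recently active customers" logic in plain Python, with hypothetical dates:

```python
from datetime import date

# Timestamp of the last successful feature build (hypothetical)
last_build = date(2024, 6, 1)

# Toy transactions; only customer 1 has activity after the last build
transactions = [
    {"customer_id": 1, "transaction_date": date(2024, 6, 5)},
    {"customer_id": 2, "transaction_date": date(2024, 5, 20)},
]

# Customers whose features need recomputing on this incremental run
to_refresh = {
    t["customer_id"] for t in transactions if t["transaction_date"] > last_build
}
```

Everything outside `to_refresh` keeps its previously computed feature row, which is what makes incremental runs cheap on large tables.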

Implementing Data Quality Tests in dbt

Testing is where dbt truly shines for ML pipelines. Data quality issues that slip into training data corrupt models, making systematic testing essential.

DBT provides built-in tests for common checks. Add tests to your schema.yml files:

# models/marts/schema.yml
version: 2

models:
  - name: ml_customer_features
    description: "Feature table for customer churn prediction model"
    columns:
      - name: customer_id
        description: "Unique customer identifier"
        tests:
          - unique
          - not_null
      
      - name: total_spend
        description: "Total customer spending"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      
      - name: days_since_last_purchase
        description: "Days since customer's last purchase"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 730  # Max 2 years
      
      - name: purchase_frequency
        description: "Average purchases per day"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 10
              inclusive: true

Create custom tests for ML-specific validation. Generic tests can be reused across multiple models:

-- tests/generic/test_no_extreme_outliers.sql
{% test no_extreme_outliers(model, column_name, z_threshold=3) %}

with stats as (
    select
        avg({{ column_name }}) as mean_val,
        stddev({{ column_name }}) as stddev_val
    from {{ model }}
),

outliers as (
    select *
    from {{ model }}, stats
    where abs(({{ column_name }} - mean_val) / nullif(stddev_val, 0)) > {{ z_threshold }}
)

select * from outliers

{% endtest %}

Use this custom test in your schema definitions:

- name: avg_transaction_value
  tests:
    - no_extreme_outliers:
        z_threshold: 4
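The same z-score rule is easy to prototype in Python before committing it to a dbt test; this helper mirrors the SQL above (population standard deviation, values more than `z_threshold` deviations from the mean):

```python
from statistics import fmean, pstdev

def extreme_outliers(values, z_threshold=3.0):
    """Return the values the no_extreme_outliers test would flag."""
    mean, sd = fmean(values), pstdev(values)
    if sd == 0:
        # Matches the nullif(stddev_val, 0) guard in the SQL test
        return []
    return [v for v in values if abs((v - mean) / sd) > z_threshold]
```

Running it against a sample of real feature values helps you pick a threshold that flags genuine anomalies without failing on ordinary skew.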

Implement data freshness checks to ensure pipelines run on schedule:

sources:
  - name: raw
    database: production_db
    tables:
      - name: transactions
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: ingested_at
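Those thresholds translate directly into a small helper. This sketch mirrors the 12-hour warn and 24-hour error windows for intuition; it is not dbt's own implementation:

```python
from datetime import datetime, timedelta, timezone

def freshness_status(loaded_at, now=None,
                     warn_after=timedelta(hours=12),
                     error_after=timedelta(hours=24)):
    """Classify source freshness the way the YAML thresholds above do."""
    age = (now or datetime.now(timezone.utc)) - loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"
```

`dbt source freshness` applies the same comparison to `max(ingested_at)` in the source table, so a stalled upstream loader surfaces before it corrupts downstream features.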

Creating Airflow DAGs for ML Pipelines

Airflow DAGs orchestrate the entire ML workflow, coordinating dbt runs with data extraction, model training, and prediction tasks. Create a DAG that represents your complete pipeline:

# airflow/dags/ml_feature_pipeline.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import timedelta
import pendulum

DBT_PROJECT_DIR = '/path/to/dbt'
DBT_PROFILES_DIR = '/path/to/.dbt'

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def extract_raw_data(**context):
    """Extract data from source systems into warehouse."""
    # Implementation would use database connectors or APIs
    print(f"Extracting data for {context['ds']}")
    # Your extraction logic here
    return "extraction_complete"

def train_model(**context):
    """Train ML model using transformed features."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    
    # Read features from data warehouse
    # In production, use proper connectors
    features_query = "SELECT * FROM ml_customer_features WHERE split = 'train'"
    # df = pd.read_sql(features_query, connection)
    
    print(f"Training model for {context['ds']}")
    # Model training logic
    # model = RandomForestClassifier()
    # model.fit(X_train, y_train)
    # joblib.dump(model, f'/models/churn_model_{context["ds"]}.pkl')
    
    return "training_complete"

def generate_predictions(**context):
    """Generate predictions using trained model."""
    print(f"Generating predictions for {context['ds']}")
    # Load model
    # model = joblib.load('/models/churn_model.pkl')
    
    # Read inference features
    # predictions = model.predict(X_inference)
    
    # Write predictions back to warehouse
    return "predictions_complete"

with DAG(
    'ml_feature_pipeline',
    default_args=default_args,
    description='End-to-end ML pipeline with dbt feature engineering',
    schedule='0 2 * * *',  # Run daily at 2 AM
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['ml', 'features', 'production'],
) as dag:
    
    # Extract raw data
    extract_task = PythonOperator(
        task_id='extract_raw_data',
        python_callable=extract_raw_data,
    )
    
    # Run dbt to transform data and engineer features (mart models only)
    dbt_run = BashOperator(
        task_id='dbt_run_features',
        bash_command=(
            f'dbt run --project-dir {DBT_PROJECT_DIR} '
            f'--profiles-dir {DBT_PROFILES_DIR} --select models/marts/'
        ),
    )
    
    # Test dbt models; a non-zero exit code fails the task
    dbt_test = BashOperator(
        task_id='dbt_test_features',
        bash_command=(
            f'dbt test --project-dir {DBT_PROJECT_DIR} '
            f'--profiles-dir {DBT_PROFILES_DIR} --select models/marts/'
        ),
    )
    
    # Train model
    train_task = PythonOperator(
        task_id='train_ml_model',
        python_callable=train_model,
    )
    
    # Generate predictions
    predict_task = PythonOperator(
        task_id='generate_predictions',
        python_callable=generate_predictions,
    )
    
    # Define dependencies
    extract_task >> dbt_run >> dbt_test >> train_task >> predict_task

For more complex workflows, use task groups to organize related tasks:

from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

DBT_CLI = 'dbt run --project-dir /path/to/dbt --profiles-dir /path/to/.dbt'

with TaskGroup('feature_engineering', tooltip='dbt feature pipeline') as feature_group:
    dbt_staging = BashOperator(
        task_id='run_staging',
        bash_command=f'{DBT_CLI} --select models/staging/',
    )
    
    dbt_intermediate = BashOperator(
        task_id='run_intermediate',
        bash_command=f'{DBT_CLI} --select models/intermediate/',
    )
    
    dbt_marts = BashOperator(
        task_id='run_marts',
        bash_command=f'{DBT_CLI} --select models/marts/',
    )
    
    dbt_staging >> dbt_intermediate >> dbt_marts

extract_task >> feature_group >> train_task

💡 Best Practices for Airflow + dbt Pipelines

Separate Concerns Cleanly

Keep transformation logic in dbt models and orchestration logic in Airflow DAGs. Don’t mix SQL transformations in Python operators.

Use Incremental Models Wisely

Implement incremental processing for large tables to reduce compute costs, but ensure logic handles late-arriving data correctly.

Test Rigorously

Run dbt tests in your Airflow DAG and fail the pipeline if tests don’t pass. Never train models on unvalidated data.

Version Everything

Store dbt projects and Airflow DAGs in Git. Tag releases when deploying to production for full reproducibility.

Monitor Pipeline Health

Set up alerts for DAG failures, dbt test failures, and data freshness issues. Track execution times to detect performance degradation.
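As a concrete example of failing the pipeline on test failures, a thin wrapper around the dbt CLI can raise an exception so Airflow marks the task as failed (the directory paths are placeholders):

```python
import subprocess

def run_dbt_tests(project_dir="/path/to/dbt", profiles_dir="/path/to/.dbt"):
    """Run dbt test; raising here makes Airflow mark the task as failed."""
    result = subprocess.run(
        ["dbt", "test", "--project-dir", project_dir,
         "--profiles-dir", profiles_dir],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        # Surface dbt's own output so the failing test is visible in logs
        raise RuntimeError(f"dbt tests failed:\n{result.stdout}")
    return result.stdout
```

A BashOperator running `dbt test` achieves the same thing, since a non-zero exit code fails the task; the wrapper is useful when you want to inspect or log the output first.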

Handling Dynamic Dependencies and Branching

Real ML pipelines often require conditional logic—training only if new data exceeds a threshold, retraining when performance degrades, or running different feature sets for different model versions.

Implement branching in Airflow using BranchPythonOperator:

from airflow.operators.python import BranchPythonOperator

def check_data_volume(**context):
    """Decide whether to train based on data volume."""
    # get_new_record_count() is a placeholder for your own warehouse query
    new_records = get_new_record_count()
    
    if new_records > 1000:
        return 'train_ml_model'
    return 'skip_training'

branch_task = BranchPythonOperator(
    task_id='check_training_criteria',
    python_callable=check_data_volume,
)

skip_task = PythonOperator(
    task_id='skip_training',
    python_callable=lambda: print("Insufficient data for training"),
)

branch_task >> [train_task, skip_task]

Use Airflow’s XCom to pass information between tasks:

import subprocess

def run_dbt_and_capture_metrics(**context):
    """Run dbt and capture row counts."""
    # Run dbt; raise if it exits non-zero
    result = subprocess.run(['dbt', 'run'], capture_output=True, text=True)
    result.check_returncode()
    
    # get_table_row_count() is a placeholder for your own warehouse query
    row_count = get_table_row_count('ml_customer_features')
    
    # Push to XCom
    context['ti'].xcom_push(key='feature_row_count', value=row_count)
    
    return row_count

def train_with_metadata(**context):
    """Train model using metadata from previous task."""
    ti = context['ti']
    row_count = ti.xcom_pull(task_ids='dbt_run_features', key='feature_row_count')
    
    print(f"Training on {row_count} feature records")
    # Training logic

Implementing CI/CD for Your Pipeline

Production ML pipelines require proper CI/CD to ensure changes don’t break existing functionality. Set up automated testing for both dbt and Airflow components.

For dbt, create a CI workflow that runs on pull requests:

# .github/workflows/dbt-ci.yml
name: DBT CI
on: [pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.9"
      
      - name: Install dbt
        run: pip install dbt-snowflake
      
      - name: Run dbt models
        run: |
          cd dbt
          dbt run --profiles-dir .
      
      - name: Run dbt tests
        run: |
          cd dbt
          dbt test --profiles-dir .

For Airflow DAGs, implement unit tests:

# tests/test_dags.py
from airflow.models import DagBag

def test_dag_loaded():
    """Test that DAG loads without errors."""
    dagbag = DagBag(dag_folder='airflow/dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0, "DAG import errors detected"

def test_dag_structure():
    """Test DAG structure and dependencies."""
    dagbag = DagBag(dag_folder='airflow/dags/', include_examples=False)
    dag = dagbag.get_dag('ml_feature_pipeline')
    
    assert len(dag.tasks) == 5, "Expected 5 tasks in DAG"
    
    # Test specific dependencies
    extract_task = dag.get_task('extract_raw_data')
    dbt_task = dag.get_task('dbt_run_features')
    
    assert dbt_task in extract_task.downstream_list

Monitoring and Observability

Production pipelines require comprehensive monitoring. Implement logging and metrics collection at key points:

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def on_failure_callback(context):
    """Send alert on task failure."""
    slack_alert = SlackWebhookOperator(
        task_id='slack_alert',
        http_conn_id='slack_webhook',
        message=f"""
        :red_circle: Task Failed
        *Task*: {context.get('task_instance').task_id}
        *DAG*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        *Log*: {context.get('task_instance').log_url}
        """,
    )
    return slack_alert.execute(context=context)

# Add to default_args
default_args['on_failure_callback'] = on_failure_callback

Track data quality metrics over time:

def log_data_quality_metrics(**context):
    """Log metrics for monitoring."""
    import json
    
    # get_row_count() and its companions are placeholders for warehouse queries
    metrics = {
        'timestamp': context['ts'],
        'dag_id': context['dag'].dag_id,
        'row_count': get_row_count(),
        'null_percentage': calculate_null_percentage(),
        'mean_feature_value': calculate_mean_features(),
    }
    
    # Write to monitoring system (e.g., CloudWatch, Datadog)
    print(json.dumps(metrics))
    
monitoring_task = PythonOperator(
    task_id='log_metrics',
    python_callable=log_data_quality_metrics,
)

Conclusion

Building end-to-end ML pipelines with Airflow and dbt combines the strengths of both tools—dbt’s transformation capabilities and testing framework with Airflow’s orchestration power. This architecture enables teams to build maintainable pipelines where data transformations are version-controlled, tested, and documented, while complex workflows coordinate smoothly across extraction, transformation, training, and prediction.

The key to success lies in proper separation of concerns, comprehensive testing, and treating your data pipeline as production code deserving the same engineering rigor as your ML models. By following these patterns and best practices, you can build reliable ML pipelines that scale from prototype to production, handle failures gracefully, and evolve as your ML systems mature.
