Building production-ready machine learning pipelines requires orchestrating complex workflows that transform raw data into model predictions. Apache Airflow and dbt (data build tool) have emerged as a powerful combination for this task—Airflow handles workflow orchestration and dependency management, while dbt brings software engineering best practices to data transformation. Together, they enable teams to build maintainable, testable, and scalable end-to-end ML pipelines.
Understanding the Airflow and dbt Architecture
Before diving into implementation, it’s important to understand how Airflow and dbt complement each other in ML pipelines. Airflow excels at orchestration—scheduling tasks, managing dependencies, handling retries, and monitoring execution. dbt specializes in data transformation—writing modular SQL, testing data quality, and documenting lineage.
In a typical ML pipeline, raw data arrives in your data warehouse from various sources. dbt transforms this raw data through multiple layers—staging tables that clean and standardize, intermediate tables that join and aggregate, and final feature tables ready for ML consumption. Airflow orchestrates this entire process, triggering dbt runs at appropriate times, handling upstream dependencies like data extraction, and coordinating downstream tasks like model training.
This separation of concerns provides significant advantages. Data analysts can write dbt models using SQL they’re comfortable with, while data engineers configure Airflow DAGs to orchestrate complex workflows. Changes to transformation logic happen in dbt with built-in testing, while pipeline scheduling and dependency logic lives in Airflow’s Python-based DAG definitions.
The architecture also promotes modularity. dbt models are independent, testable units that can be developed and validated separately. Airflow tasks compose these units into complete workflows, adding additional steps like data extraction, model training, and prediction storage that fall outside dbt’s scope.
Setting Up Your Environment
Begin by establishing a proper project structure that separates Airflow and dbt concerns while enabling smooth integration. Create a directory layout that looks like this:
```
ml-pipeline/
├── airflow/
│   ├── dags/
│   │   └── ml_feature_pipeline.py
│   ├── plugins/
│   └── config/
├── dbt/
│   ├── models/
│   │   ├── staging/
│   │   ├── intermediate/
│   │   └── marts/
│   ├── tests/
│   ├── macros/
│   └── dbt_project.yml
├── models/
│   └── training_scripts/
└── requirements.txt
```
Install the necessary dependencies:
```shell
pip install apache-airflow==2.8.0
pip install dbt-core dbt-snowflake  # or dbt-postgres, dbt-bigquery, etc.
pip install airflow-dbt             # community package providing DbtRunOperator / DbtTestOperator
```

(If you use dbt Cloud rather than dbt Core, install `apache-airflow-providers-dbt-cloud` and use its `DbtCloudRunJobOperator` instead.)
Initialize your dbt project within the structure:
```shell
cd dbt
dbt init ml_features
```
Configure your dbt profiles.yml to connect to your data warehouse. This file typically lives in ~/.dbt/ and contains connection credentials:
```yaml
ml_features:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: your_account
      user: your_user
      password: your_password
      role: transformer
      database: ml_data
      warehouse: transform_wh
      schema: ml_features
      threads: 4
```
For Airflow, configure the data warehouse connection in the Airflow UI or through environment variables, so that Airflow tasks can query the warehouse and trigger dbt runs programmatically.
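For example, Airflow reads connections from `AIRFLOW_CONN_<CONN_ID>` environment variables. The URI below is purely illustrative—substitute your own connection ID, credentials, and account details:

```shell
# Makes a connection named "snowflake_default" available to Airflow tasks
export AIRFLOW_CONN_SNOWFLAKE_DEFAULT='snowflake://your_user:your_password@/?account=your_account&database=ml_data&warehouse=transform_wh&role=transformer'
```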
🏗️ Pipeline Architecture Components
Airflow operators extract raw data into warehouse landing zone
Staged SQL models clean, join, and engineer features
Python operators train models on transformed features
Results written back to warehouse for serving
Building dbt Models for Feature Engineering
dbt models form the transformation layer of your ML pipeline. Organize models in layers that progressively refine data from raw inputs to ML-ready features.
Start with staging models that standardize raw data. These models handle basic cleaning, type casting, and column renaming:
```sql
-- models/staging/stg_transactions.sql
with source as (
    select * from {{ source('raw', 'transactions') }}
),

cleaned as (
    select
        transaction_id,
        customer_id,
        cast(amount as decimal(10,2)) as amount,
        cast(transaction_date as date) as transaction_date,
        lower(trim(category)) as category,
        -- Handle nulls explicitly
        coalesce(merchant_id, 'unknown') as merchant_id
    from source
    where transaction_date >= current_date - interval '2 years'
        and amount > 0  -- Filter invalid amounts
)

select * from cleaned
```
Intermediate models perform aggregations and joins that create derived metrics:
```sql
-- models/intermediate/int_customer_purchase_metrics.sql
with transactions as (
    select * from {{ ref('stg_transactions') }}
),

customer_metrics as (
    select
        customer_id,
        count(*) as total_transactions,
        sum(amount) as total_spend,
        avg(amount) as avg_transaction_value,
        max(transaction_date) as last_purchase_date,
        min(transaction_date) as first_purchase_date,
        count(distinct category) as distinct_categories,
        stddev(amount) as spend_volatility
    from transactions
    group by customer_id
)

select * from customer_metrics
```
Mart models create final feature tables optimized for ML consumption:
```sql
-- models/marts/ml_customer_features.sql
with customer_metrics as (
    select * from {{ ref('int_customer_purchase_metrics') }}
),

customer_demographics as (
    select * from {{ ref('stg_customers') }}
),

recency_features as (
    select
        customer_id,
        datediff(day, last_purchase_date, current_date) as days_since_last_purchase,
        datediff(day, first_purchase_date, current_date) as customer_tenure_days
    from customer_metrics
),

final as (
    select
        m.customer_id,
        -- Purchase behavior features
        m.total_transactions,
        m.total_spend,
        m.avg_transaction_value,
        m.distinct_categories,
        m.spend_volatility,
        -- Recency features
        r.days_since_last_purchase,
        r.customer_tenure_days,
        -- Frequency feature
        m.total_transactions / nullif(r.customer_tenure_days, 0) as purchase_frequency,
        -- Demographic features
        d.age,
        d.region,
        d.account_type,
        -- Metadata
        current_timestamp as feature_computed_at
    from customer_metrics m
    join recency_features r using (customer_id)
    join customer_demographics d using (customer_id)
)

select * from final
```
dbt’s ref() function creates dependencies between models. When you run dbt, it automatically determines the correct execution order based on these references.
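You can see and exploit this dependency graph from the command line using dbt’s node selection syntax (the model name here comes from the examples above):

```shell
# List ml_customer_features plus everything upstream of it
dbt ls --select +ml_customer_features

# Build the feature table and all of its upstream dependencies,
# in the execution order dbt derives from the ref() calls
dbt run --select +ml_customer_features
```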
Implement incremental models for large datasets to avoid reprocessing everything on each run:
```sql
-- models/marts/ml_customer_features.sql
{{ config(
    materialized='incremental',
    unique_key='customer_id',
    on_schema_change='fail'
) }}

with customer_metrics as (
    select * from {{ ref('int_customer_purchase_metrics') }}
    {% if is_incremental() %}
    -- Only process customers with recent transactions
    where customer_id in (
        select distinct customer_id
        from {{ ref('stg_transactions') }}
        where transaction_date > (select max(feature_computed_at) from {{ this }})
    )
    {% endif %}
),

final as (
    -- feature computation logic
)

select * from final
```
Implementing Data Quality Tests in dbt
Testing is where dbt truly shines for ML pipelines. Data quality issues that slip into training data corrupt models, making systematic testing essential.
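To see the intuition behind the checks this section builds, here is a minimal standalone sketch in plain Python of a z-score outlier screen—the same kind of logic the custom dbt test later in this section pushes down into the warehouse (the function name and sample data are made up for the example):

```python
import pandas as pd

def find_extreme_outliers(values: pd.Series, z_threshold: float = 3.0) -> pd.Series:
    """Return the values whose absolute z-score exceeds the threshold."""
    mean, std = values.mean(), values.std()
    if std == 0:
        return values.iloc[0:0]  # constant column: nothing to flag
    return values[(values - mean).abs() / std > z_threshold]

# One wildly large transaction amount among typical ones
amounts = pd.Series([12.0, 15.0, 11.0, 14.0, 13.0, 500.0])
print(find_extreme_outliers(amounts, z_threshold=2.0).tolist())  # [500.0]
```

Note that with a small sample, a single extreme value inflates the standard deviation, so a loose threshold is needed to flag it—one reason to tune `z_threshold` per column rather than hard-coding 3.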
dbt provides built-in tests for common checks. Add tests to your schema.yml files:
```yaml
# models/marts/schema.yml
version: 2

models:
  - name: ml_customer_features
    description: "Feature table for customer churn prediction model"
    columns:
      - name: customer_id
        description: "Unique customer identifier"
        tests:
          - unique
          - not_null
      - name: total_spend
        description: "Total customer spending"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: days_since_last_purchase
        description: "Days since customer's last purchase"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 730  # Max 2 years
      - name: purchase_frequency
        description: "Average purchases per day"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 10
              inclusive: true
```
Create custom tests for ML-specific validation. Generic tests can be reused across multiple models:
```sql
-- tests/generic/test_no_extreme_outliers.sql
{% test no_extreme_outliers(model, column_name, z_threshold=3) %}

with stats as (
    select
        avg({{ column_name }}) as mean_val,
        stddev({{ column_name }}) as stddev_val
    from {{ model }}
),

outliers as (
    select *
    from {{ model }}, stats
    where abs(({{ column_name }} - mean_val) / nullif(stddev_val, 0)) > {{ z_threshold }}
)

select * from outliers

{% endtest %}
```
Use this custom test in your schema definitions:
```yaml
- name: avg_transaction_value
  tests:
    - no_extreme_outliers:
        z_threshold: 4
```
Implement data freshness checks to ensure pipelines run on schedule:
```yaml
sources:
  - name: raw
    database: production_db
    tables:
      - name: transactions
        loaded_at_field: ingested_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
```
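Freshness checks can run as their own pipeline step—for example, as an early Airflow task—so stale sources stop the pipeline before any transformations run:

```shell
# Compares max(ingested_at) against the thresholds above;
# exits non-zero when a source breaches error_after, failing the wrapping Airflow task
dbt source freshness
```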
Creating Airflow DAGs for ML Pipelines
Airflow DAGs orchestrate the entire ML workflow, coordinating dbt runs with data extraction, model training, and prediction tasks. Create a DAG that represents your complete pipeline:
```python
# airflow/dags/ml_feature_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# DbtRunOperator / DbtTestOperator come from the community airflow-dbt package,
# which shells out to dbt Core. (dbt Cloud users would use DbtCloudRunJobOperator
# from apache-airflow-providers-dbt-cloud instead.)
from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtTestOperator

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


def extract_raw_data(**context):
    """Extract data from source systems into the warehouse."""
    # Implementation would use database connectors or APIs
    print(f"Extracting data for {context['ds']}")
    # Your extraction logic here
    return "extraction_complete"


def train_model(**context):
    """Train ML model using transformed features."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    # Read features from the data warehouse
    # In production, use proper connectors
    features_query = "SELECT * FROM ml_customer_features WHERE split = 'train'"
    # df = pd.read_sql(features_query, connection)
    print(f"Training model for {context['ds']}")
    # Model training logic
    # model = RandomForestClassifier()
    # model.fit(X_train, y_train)
    # joblib.dump(model, f'/models/churn_model_{context["ds"]}.pkl')
    return "training_complete"


def generate_predictions(**context):
    """Generate predictions using the trained model."""
    print(f"Generating predictions for {context['ds']}")
    # Load model
    # model = joblib.load('/models/churn_model.pkl')
    # Read inference features
    # predictions = model.predict(X_inference)
    # Write predictions back to the warehouse
    return "predictions_complete"


with DAG(
    'ml_feature_pipeline',
    default_args=default_args,
    description='End-to-end ML pipeline with dbt feature engineering',
    schedule='0 2 * * *',  # Run daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'features', 'production'],
) as dag:

    # Extract raw data
    extract_task = PythonOperator(
        task_id='extract_raw_data',
        python_callable=extract_raw_data,
    )

    # Run dbt to transform data and engineer features
    dbt_run = DbtRunOperator(
        task_id='dbt_run_features',
        dir='/path/to/dbt',
        profiles_dir='/path/to/.dbt',
        models='models/marts',  # run only mart models; assumes upstream layers are current
    )

    # Test dbt models
    dbt_test = DbtTestOperator(
        task_id='dbt_test_features',
        dir='/path/to/dbt',
        profiles_dir='/path/to/.dbt',
        models='models/marts',
    )

    # Train model
    train_task = PythonOperator(
        task_id='train_ml_model',
        python_callable=train_model,
    )

    # Generate predictions
    predict_task = PythonOperator(
        task_id='generate_predictions',
        python_callable=generate_predictions,
    )

    # Define dependencies
    extract_task >> dbt_run >> dbt_test >> train_task >> predict_task
```
For more complex workflows, use task groups to organize related tasks:
```python
from airflow.utils.task_group import TaskGroup

with TaskGroup('feature_engineering', tooltip='dbt feature pipeline') as feature_group:
    dbt_staging = DbtRunOperator(
        task_id='run_staging',
        dir='/path/to/dbt',
        profiles_dir='/path/to/.dbt',
        models='models/staging',
    )
    dbt_intermediate = DbtRunOperator(
        task_id='run_intermediate',
        dir='/path/to/dbt',
        profiles_dir='/path/to/.dbt',
        models='models/intermediate',
    )
    dbt_marts = DbtRunOperator(
        task_id='run_marts',
        dir='/path/to/dbt',
        profiles_dir='/path/to/.dbt',
        models='models/marts',
    )

    dbt_staging >> dbt_intermediate >> dbt_marts

extract_task >> feature_group >> train_task
```
💡 Best Practices for Airflow + dbt Pipelines
Keep transformation logic in dbt models and orchestration logic in Airflow DAGs. Don’t mix SQL transformations in Python operators.
Implement incremental processing for large tables to reduce compute costs, but ensure logic handles late-arriving data correctly.
Run dbt tests in your Airflow DAG and fail the pipeline if tests don’t pass. Never train models on unvalidated data.
Store dbt projects and Airflow DAGs in Git. Tag releases when deploying to production for full reproducibility.
Set up alerts for DAG failures, dbt test failures, and data freshness issues. Track execution times to detect performance degradation.
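The late-arriving-data caveat above is commonly handled with a fixed lookback window in the incremental filter. A sketch against the earlier incremental feature model, assuming Snowflake's dateadd and that a three-day reprocessing window covers your typical lateness:

```sql
{% if is_incremental() %}
-- Recompute a trailing 3-day window so late-arriving transactions
-- still make it into the feature table
where transaction_date > (
    select dateadd(day, -3, max(feature_computed_at)) from {{ this }}
)
{% endif %}
```

The window trades compute for correctness: widen it if your sources routinely deliver data more than a few days late.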
Handling Dynamic Dependencies and Branching
Real ML pipelines often require conditional logic—training only if new data exceeds a threshold, retraining when performance degrades, or running different feature sets for different model versions.
Implement branching in Airflow using BranchPythonOperator:
```python
from airflow.operators.python import BranchPythonOperator


def check_data_volume(**context):
    """Decide whether to train based on data volume."""
    # Check whether sufficient new data has arrived
    # (get_new_record_count is a placeholder for your own warehouse query)
    new_records = get_new_record_count()
    if new_records > 1000:
        return 'train_ml_model'
    return 'skip_training'


branch_task = BranchPythonOperator(
    task_id='check_training_criteria',
    python_callable=check_data_volume,
)

skip_task = PythonOperator(
    task_id='skip_training',
    python_callable=lambda: print("Insufficient data for training"),
)

branch_task >> [train_task, skip_task]
```
Use Airflow’s XCom to pass information between tasks:
```python
import subprocess


def run_dbt_and_capture_metrics(**context):
    """Run dbt and capture row counts."""
    # Run dbt; check=True fails the task if dbt exits non-zero
    subprocess.run(['dbt', 'run'], capture_output=True, check=True)
    # Get row count from the transformed table
    # (get_table_row_count is a placeholder for your own warehouse query)
    row_count = get_table_row_count('ml_customer_features')
    # Push to XCom
    context['ti'].xcom_push(key='feature_row_count', value=row_count)
    return row_count


def train_with_metadata(**context):
    """Train model using metadata from the previous task."""
    ti = context['ti']
    row_count = ti.xcom_pull(task_ids='dbt_run_features', key='feature_row_count')
    print(f"Training on {row_count} feature records")
    # Training logic
```
Implementing CI/CD for Your Pipeline
Production ML pipelines require proper CI/CD to ensure changes don’t break existing functionality. Set up automated testing for both dbt and Airflow components.
For dbt, create a CI workflow that runs on pull requests:
```yaml
# .github/workflows/dbt-ci.yml
name: dbt CI

on: [pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'
      - name: Install dbt
        run: pip install dbt-snowflake
      - name: Run dbt models
        run: |
          cd dbt
          dbt run --profiles-dir .
      - name: Run dbt tests
        run: |
          cd dbt
          dbt test --profiles-dir .
```
For Airflow DAGs, implement unit tests:
```python
# tests/test_dags.py
from airflow.models import DagBag


def test_dag_loaded():
    """Test that DAGs load without errors."""
    dagbag = DagBag(dag_folder='airflow/dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0, "DAG import errors detected"


def test_dag_structure():
    """Test DAG structure and dependencies."""
    dagbag = DagBag(dag_folder='airflow/dags/', include_examples=False)
    dag = dagbag.get_dag('ml_feature_pipeline')
    assert len(dag.tasks) == 5, "Expected 5 tasks in DAG"
    # Test specific dependencies
    extract_task = dag.get_task('extract_raw_data')
    dbt_task = dag.get_task('dbt_run_features')
    assert dbt_task in extract_task.downstream_list
```
Monitoring and Observability
Production pipelines require comprehensive monitoring. Implement logging and metrics collection at key points:
```python
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def on_failure_callback(context):
    """Send an alert on task failure."""
    ti = context.get('task_instance')
    slack_alert = SlackWebhookOperator(
        task_id='slack_alert',
        # connection id argument name per recent Slack provider versions
        slack_webhook_conn_id='slack_webhook',
        message=f"""
:red_circle: Task Failed
*Task*: {ti.task_id}
*DAG*: {ti.dag_id}
*Execution Time*: {context.get('execution_date')}
*Log*: {ti.log_url}
""",
    )
    return slack_alert.execute(context=context)


# Add to default_args
default_args['on_failure_callback'] = on_failure_callback
```
Track data quality metrics over time:
```python
def log_data_quality_metrics(**context):
    """Log metrics for monitoring."""
    import json

    # get_row_count, calculate_null_percentage, and calculate_mean_features
    # are placeholders for your own warehouse queries
    metrics = {
        'timestamp': context['ts'],
        'dag_id': context['dag'].dag_id,
        'row_count': get_row_count(),
        'null_percentage': calculate_null_percentage(),
        'mean_feature_value': calculate_mean_features(),
    }
    # Write to a monitoring system (e.g., CloudWatch, Datadog)
    print(json.dumps(metrics))


monitoring_task = PythonOperator(
    task_id='log_metrics',
    python_callable=log_data_quality_metrics,
)
```
Conclusion
Building end-to-end ML pipelines with Airflow and dbt combines the strengths of both tools—dbt’s transformation capabilities and testing framework with Airflow’s orchestration power. This architecture enables teams to build maintainable pipelines where data transformations are version-controlled, tested, and documented, while complex workflows coordinate smoothly across extraction, transformation, training, and prediction.
The key to success lies in proper separation of concerns, comprehensive testing, and treating your data pipeline as production code deserving the same engineering rigor as your ML models. By following these patterns and best practices, you can build reliable ML pipelines that scale from prototype to production, handle failures gracefully, and evolve as your ML systems mature.