SQL transformations quickly become unmaintainable spaghetti
code scattered across scripts, notebooks, and stored procedures. dbt (data build tool) brings software engineering
best practices to analytics—version control, testing, documentation, and modularity—transforming how teams build and
maintain data pipelines.
This guide covers production dbt implementations that scale
from small teams to enterprise data warehouses. We’ll build testable, documented, modular transformations that your
entire team can understand and maintain.
Why dbt Changes Data Engineering
The Problem with Traditional SQL
Without dbt, teams struggle with:
- No version control: SQL in BI tools, notebooks, or databases
- Dependency hell: Which table depends on which?
- No testing: Hope transformations are correct
- Poor documentation: Tribal knowledge required
- Duplicated logic: Same calculations across multiple queries
- Hard to debug: No lineage or impact analysis
dbt Benefits
- Version controlled: Git for analytics code
- Testable: Built-in data testing framework
- Self-documenting: Auto-generated documentation
- Modular: DRY principles for SQL
- Dependency management: Automatic build order
- Environment management: Dev, staging, prod
Pattern 1: Basic dbt Project Structure
Initialize Project
# Install dbt
pip install dbt-core dbt-snowflake # or dbt-bigquery, dbt-redshift, etc.
# Initialize project
dbt init my_analytics
# Project structure
my_analytics/
├── dbt_project.yml        # Project configuration
├── models/                # SQL transformations
│   ├── staging/           # Raw data cleaning
│   ├── intermediate/      # Business logic
│   └── marts/             # Final analytics tables
├── tests/                 # Custom tests
├── macros/                # Reusable SQL functions
├── seeds/                 # CSV reference data
└── snapshots/             # Slowly changing dimensions
Configure Project
# dbt_project.yml
name: 'my_analytics'
version: '1.0.0'
config-version: 2

profile: 'my_analytics'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  my_analytics:
    # Staging models
    staging:
      +materialized: view
      +schema: staging
    # Intermediate models
    intermediate:
      +materialized: view
      +schema: analytics
    # Final mart models
    marts:
      +materialized: table
      +schema: marts
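One caveat: by default dbt renders a custom +schema as <target schema>_<custom schema> (for example dev_staging) rather than using the literal name. If you want the schema names above applied verbatim, the built-in generate_schema_name macro can be overridden; a minimal sketch, to be adapted to your own environment rules:
-- macros/generate_schema_name.sql
-- Use the custom schema verbatim when one is configured; otherwise fall back to the target schema.
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- if custom_schema_name is none -%}
        {{ target.schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}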
Pattern 2: Staging Models
Clean Raw Data
-- models/staging/stg_customers.sql
-- Clean and standardize raw customer data
with source as (
select * from {{ source('raw', 'customers') }}
),
cleaned as (
select
id as customer_id,
lower(trim(email)) as email,
lower(trim(first_name)) as first_name,
lower(trim(last_name)) as last_name,
created_at,
updated_at,
-- Data quality flags
case
when email is null or email = '' then false
when email not like '%@%' then false
else true
end as has_valid_email,
-- Standardize nulls
nullif(trim(phone), '') as phone
from source
)
select * from cleaned
-- Model configuration
{{ config(
materialized='view',
tags=['staging', 'customers']
) }}
Source Configuration
# models/staging/sources.yml
version: 2

sources:
  - name: raw
    database: analytics_db
    schema: raw_data
    tables:
      - name: customers
        description: "Raw customer data from production database"
        columns:
          - name: id
            description: "Primary key"
            tests:
              - unique
              - not_null
          - name: email
            description: "Customer email address"
            tests:
              - not_null
          - name: created_at
            description: "Account creation timestamp"
            tests:
              - not_null
      - name: orders
        description: "Raw order data"
        loaded_at_field: updated_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
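With loaded_at_field and freshness thresholds in place, staleness can be checked from the command line with standard dbt CLI:
# Compare source timestamps against the warn/error thresholds above
dbt source freshness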
Pattern 3: Intermediate Models
Business Logic Layer
-- models/intermediate/int_customer_orders.sql
-- Join customers with their order history
with customers as (
select * from {{ ref('stg_customers') }}
),
orders as (
select * from {{ ref('stg_orders') }}
),
customer_orders as (
select
c.customer_id,
c.email,
c.first_name,
c.last_name,
-- Order metrics
count(distinct o.order_id) as total_orders,
coalesce(sum(o.amount), 0) as lifetime_value,
min(o.created_at) as first_order_date,
max(o.created_at) as last_order_date,
-- Customer segments
case
when count(distinct o.order_id) = 0 then 'never_purchased'
when count(distinct o.order_id) = 1 then 'one_time'
when count(distinct o.order_id) < 5 then 'occasional'
else 'frequent'
end as customer_segment,
-- Recency
datediff(day, max(o.created_at), current_date()) as days_since_last_order
from customers c
left join orders o on c.customer_id = o.customer_id
group by 1, 2, 3, 4
)
select * from customer_orders
{{ config(
materialized='view',
tags=['intermediate', 'customers']
) }}
Pattern 4: Mart Models
Final Analytics Tables
-- models/marts/fct_customer_metrics.sql
-- Customer analytics fact table
with customer_orders as (
select * from {{ ref('int_customer_orders') }}
),
customer_support as (
select * from {{ ref('int_customer_support') }}
),
final as (
select
co.customer_id,
co.email,
co.first_name,
co.last_name,
co.total_orders,
co.lifetime_value,
co.customer_segment,
co.first_order_date,
co.last_order_date,
co.days_since_last_order,
-- Support metrics
coalesce(cs.total_tickets, 0) as total_support_tickets,
coalesce(cs.avg_resolution_time_hours, 0) as avg_support_resolution_hours,
-- Customer health score
case
when co.days_since_last_order < 30 and co.total_orders > 5 then 'healthy'
when co.days_since_last_order < 90 then 'at_risk'
else 'churned'
end as health_status,
current_timestamp() as updated_at
from customer_orders co
left join customer_support cs on co.customer_id = cs.customer_id
)
select * from final
{{ config(
    materialized='table',
    tags=['mart', 'customers']
) }}
Pattern 5: Reusable Macros
DRY SQL with Macros
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/generate_alias_name.sql
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
{%- if custom_alias_name is none -%}
{{ node.name }}
{%- else -%}
{{ custom_alias_name | trim }}
{%- endif -%}
{%- endmacro %}
-- macros/surrogate_key.sql
{% macro surrogate_key(field_list) %}
md5({% for field in field_list %}
coalesce(cast({{ field }} as varchar), '')
{% if not loop.last %}|| '|' ||{% endif %}
{% endfor %})
{% endmacro %}
-- Usage in models
select
{{ surrogate_key(['customer_id', 'order_date']) }} as order_key,
{{ cents_to_dollars('amount_cents') }} as amount_dollars
from orders
Pattern 6: Comprehensive Testing
Schema Tests
# models/marts/schema.yml
version: 2

models:
  - name: fct_customer_metrics
    description: "Customer metrics fact table"
    columns:
      - name: customer_id
        description: "Unique customer identifier"
        tests:
          - unique
          - not_null
      - name: email
        description: "Customer email"
        tests:
          - not_null
          - unique
      - name: lifetime_value
        description: "Total customer spend"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: total_orders
        description: "Total number of orders"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 10000
      - name: health_status
        description: "Customer health classification"
        tests:
          - accepted_values:
              values: ['healthy', 'at_risk', 'churned']
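The dbt_utils tests above come from a package, so dbt needs it declared and installed before dbt test will recognize them. A minimal packages.yml sketch; pin the version that matches your dbt-core release:
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
# Install declared packages
dbt deps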
Custom Tests
-- tests/assert_revenue_positive.sql
-- Custom test: ensure daily revenue is always positive
select
order_date,
sum(amount) as daily_revenue
from {{ ref('fct_orders') }}
group by order_date
having sum(amount) < 0
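Singular tests like this live in tests/ and run once against a hard-coded model. If the same assertion applies to several models, it can be promoted to a generic test defined with a {% test %} block; a sketch, with a hypothetical name:
-- tests/generic/positive_daily_total.sql (hypothetical)
{% test positive_daily_total(model, column_name, date_column='order_date') %}
select
    {{ date_column }},
    sum({{ column_name }}) as daily_total
from {{ model }}
group by 1
having sum({{ column_name }}) < 0
{% endtest %}
Once defined, it can be attached to a column in schema.yml like any built-in test.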
Pattern 7: Incremental Models
Efficient Large Tables
-- models/marts/fct_events.sql
-- Incrementally process event data
{{
config(
materialized='incremental',
unique_key='event_id',
on_schema_change='fail'
)
}}
with events as (
select * from {{ source('raw', 'events') }}
{% if is_incremental() %}
-- Only process new events
where created_at > (select max(created_at) from {{ this }})
{% endif %}
),
transformed as (
select
event_id,
user_id,
event_type,
event_properties,
created_at,
-- Parse JSON properties
json_extract_path_text(event_properties, 'page_url') as page_url,
json_extract_path_text(event_properties, 'referrer') as referrer,
current_timestamp() as processed_at
from events
)
select * from transformed
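One caveat with the strict max(created_at) filter: events that arrive late are silently dropped. A common variation is to reprocess a short lookback window and let the merge on unique_key deduplicate rows that were already loaded; a sketch, where the 3-day window is an assumption to tune to your pipeline's latency:
{% if is_incremental() %}
-- Reprocess a few trailing days so late-arriving events are still captured;
-- previously loaded rows are deduplicated via unique_key on adapters that merge.
where created_at > (select dateadd(day, -3, max(created_at)) from {{ this }})
{% endif %}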
Pattern 8: Documentation
Auto-Generated Docs
# models/marts/schema.yml
version: 2

models:
  - name: fct_customer_metrics
    description: |
      # Customer Metrics Fact Table

      This table contains one row per customer with comprehensive metrics including:
      - Order history and lifetime value
      - Customer segmentation
      - Health status
      - Support interaction history

      ## Refresh Schedule
      - Runs daily at 2 AM UTC
      - Incremental updates throughout the day

      ## Data Quality
      - All customers must have valid email
      - Lifetime value cannot be negative
      - Health status must be one of: healthy, at_risk, churned
    meta:
      owner: "analytics-team@company.com"
      contains_pii: true
    columns:
      - name: customer_id
        description: "Unique identifier for customer"
        meta:
          dimension:
            type: string
# Generate documentation
dbt docs generate
# Serve documentation site
dbt docs serve --port 8080
# Opens at http://localhost:8080
# Interactive lineage graphs, searchable docs
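Descriptions can also be pushed into the warehouse itself as table and column comments via persist_docs (adapter support varies); a small dbt_project.yml addition:
# dbt_project.yml (excerpt)
models:
  my_analytics:
    +persist_docs:
      relation: true
      columns: true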
Pattern 9: Production Workflow
CI/CD Pipeline
# .github/workflows/dbt.yml
name: dbt CI/CD

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]

jobs:
  dbt-run:
    runs-on: ubuntu-latest
    # Connection details are needed by every dbt step, so set them at the job level
    env:
      DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
      DBT_SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
      DBT_SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
    steps:
      - uses: actions/checkout@v2

      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'

      - name: Install dbt
        run: pip install dbt-snowflake==1.5.0

      - name: dbt deps
        run: dbt deps

      - name: dbt debug
        run: dbt debug

      - name: dbt test (on PR)
        if: github.event_name == 'pull_request'
        run: dbt test --select state:modified+

      - name: dbt run (on merge)
        if: github.event_name == 'push'
        run: dbt run --select state:modified+

      - name: Upload artifacts
        uses: actions/upload-artifact@v2
        with:
          name: dbt-artifacts
          path: target/
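Note that state:modified+ only works when dbt can compare against a previous manifest. In CI that usually means fetching the production run's target/ directory and pointing dbt at it; the artifact path below is a placeholder:
# Compare against the manifest produced by the last production run
dbt test --select state:modified+ --state path/to/prod-artifacts
dbt run --select state:modified+ --state path/to/prod-artifacts --defer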
Production Deployment
# Run specific models
dbt run --select fct_customer_metrics
# Run with dependencies
dbt run --select +fct_customer_metrics
# Run modified models only
dbt run --select state:modified+
# Full refresh incremental models
dbt run --select fct_events --full-refresh
# Test before running
dbt test && dbt run
# Run in production
dbt run --target prod --threads 8
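The dev and prod targets referenced by --target are defined in profiles.yml, which lives outside the repository (by default in ~/.dbt/). A hedged Snowflake example; role, warehouse, and database names are placeholders, and the env_var() calls match the variables used in the CI workflow:
# ~/.dbt/profiles.yml
my_analytics:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('DBT_SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('DBT_SNOWFLAKE_USER') }}"
      password: "{{ env_var('DBT_SNOWFLAKE_PASSWORD') }}"
      role: transformer
      warehouse: transforming
      database: analytics_dev
      schema: dbt_dev
      threads: 4
    prod:
      type: snowflake
      account: "{{ env_var('DBT_SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('DBT_SNOWFLAKE_USER') }}"
      password: "{{ env_var('DBT_SNOWFLAKE_PASSWORD') }}"
      role: transformer
      warehouse: transforming
      database: analytics
      schema: analytics
      threads: 8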
Pattern 10: Monitoring & Alerts
Data Quality Monitoring
# monitoring/dbt_monitor.py
import json


class DBTMonitor:
    def __init__(self, manifest_path='target/manifest.json', run_results_path='target/run_results.json'):
        self.manifest = self.load_json(manifest_path)
        self.run_results = self.load_json(run_results_path)

    def load_json(self, path):
        with open(path, 'r') as f:
            return json.load(f)

    def check_test_failures(self):
        """Check for test failures in the latest dbt run."""
        failures = []
        for result in self.run_results.get('results', []):
            if result.get('status') == 'fail':
                failures.append({
                    'name': result['unique_id'],
                    'message': result.get('message', 'No message'),
                    'execution_time': result.get('execution_time', 0)
                })
        return failures

    def check_model_freshness(self):
        """Check source freshness."""
        stale_sources = []
        for source in self.manifest.get('sources', {}).values():
            if source.get('freshness', {}).get('error_after'):
                # Check if source is stale
                # (Implementation depends on your data warehouse)
                pass
        return stale_sources

    def send_alert(self, failures):
        """Send an alert when failures are present."""
        if failures:
            # Send to Slack, PagerDuty, etc.
            print(f"ALERT: {len(failures)} test failures detected!")
            for failure in failures:
                print(f"  - {failure['name']}: {failure['message']}")


# Usage
monitor = DBTMonitor()
failures = monitor.check_test_failures()
monitor.send_alert(failures)
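send_alert above only prints; a minimal sketch of posting the same summary to a Slack incoming webhook instead (the webhook URL environment variable is a placeholder, and requests must be installed):
# monitoring/slack_alert.py
import os
import requests

def send_slack_alert(failures, webhook_url=None):
    """Post a summary of dbt test failures to a Slack incoming webhook."""
    if not failures:
        return
    webhook_url = webhook_url or os.environ["SLACK_WEBHOOK_URL"]  # placeholder env var
    lines = [f"*{len(failures)} dbt test failure(s) detected*"]
    lines += [f"- {f['name']}: {f['message']}" for f in failures]
    requests.post(webhook_url, json={"text": "\n".join(lines)}, timeout=10)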
Best Practices
- Layer your models: staging → intermediate → marts
- One model per business entity: Don’t try to do everything in one query
- Use CTEs liberally: Make logic readable and testable
- Test everything: Unique keys, not nulls, accepted values, relationships
- Document as you go: Future you will thank present you
- Use incremental models: For large event tables (millions of rows and up)
- Materialize strategically: Views for low-use, tables for high-use
- Version lock packages: Pin dbt and package versions
Common Pitfalls
- Over-nesting: Don’t create 10+ layers of dependencies
- No testing: Tests are not optional in production
- Materializing everything as tables: Use views for intermediate models
- Ignoring performance: Monitor query execution times
- Poor naming: Use clear, consistent naming conventions (stg_, int_, fct_, dim_)
- Not using sources: Always reference raw data via sources, not direct table names
Key Takeaways
- dbt brings software engineering to analytics: version control, testing, documentation
- Layer models: staging → intermediate → marts for maintainability
- Test everything: data quality is not optional
- Use incremental models for large tables to improve performance
- Document as you build: auto-generated docs are a game-changer
- Implement CI/CD: test on PRs, deploy on merge
- Monitor in production: track test failures and freshness
dbt transforms data transformation from ad-hoc SQL scripts into maintainable, tested, documented code. Teams that
adopt dbt ship faster, with higher confidence, and spend less time debugging production issues.