Building the Modern Data Stack: How Spark, Kafka, and dbt Transformed Data Engineering

The data engineering landscape has undergone a fundamental transformation over the past decade. What once required massive Hadoop clusters has evolved into a sophisticated ecosystem of specialized tools: Kafka for ingestion, Spark for processing, and dbt for transformation.

Modern Data Stack Architecture

The Paradigm Shift: Monolithic → Modular

The old approach centered around monolithic platforms that tried to do everything. Today’s modern stack embraces modularity: each tool excels at a specific function while integrating cleanly with others.

Aspect        | Old Stack (Hadoop)      | Modern Stack
Ingestion     | Sqoop, Flume            | Kafka, Airbyte, Fivetran
Processing    | MapReduce, Hive         | Spark, Flink, dbt
Storage       | HDFS                    | S3, Delta Lake, Iceberg
Orchestration | Oozie                   | Airflow, Prefect, Dagster
Complexity    | High (one tool for all) | Moderate (best-of-breed)

Apache Kafka: The Nervous System

Kafka has become the de facto standard for real-time data ingestion. It is more than a message queue: it is a distributed streaming platform with durable, replayable event logs, ordering guarantees within each partition, and an ecosystem of connectors.
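
To make replay and per-key ordering concrete, here is a minimal in-memory sketch of a partitioned, append-only log. It is an illustration only, not Kafka itself: the `ToyLog` class and its methods are hypothetical, and `crc32` stands in for the murmur2 hash that Kafka's default partitioner uses.

```python
import zlib
from collections import defaultdict


class ToyLog:
    """In-memory stand-in for a partitioned topic (illustration only)."""

    def __init__(self, num_partitions=3):
        self.num_partitions = num_partitions
        self.partitions = defaultdict(list)  # partition id -> ordered records

    def partition_for(self, key):
        # Assumption: crc32 replaces Kafka's murmur2-based partitioner.
        return zlib.crc32(key.encode("utf-8")) % self.num_partitions

    def append(self, key, value):
        p = self.partition_for(key)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1  # (partition, offset)

    def read_from(self, partition, offset):
        # Reading never deletes records, so any offset can be read again.
        return self.partitions[partition][offset:]


log = ToyLog()
for action in ["view", "add_to_cart", "purchase"]:
    partition, _ = log.append("user-12345", action)

# Same key -> same partition, so this user's events keep their order.
replayed = [value for _, value in log.read_from(partition, 0)]
print(replayed)  # ['view', 'add_to_cart', 'purchase']
```

Because records are only appended, a new consumer can start from offset 0 and see the full history, while an existing consumer can rewind to an earlier offset to reprocess.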

Kafka Producer Example

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all in-sync replicas
    retries=3,
    max_in_flight_requests_per_connection=1  # Preserve ordering across retries
)

# Send user event
event = {
    'user_id': 12345,
    'action': 'purchase',
    'amount': 99.99,
    'timestamp': '2025-01-04T10:30:00Z'
}

# Keying by user_id routes each user's events to the same partition
producer.send('user_events', key=str(event['user_id']).encode(), value=event)
producer.flush()  # Block until buffered messages have been sent

Kafka Consumer with Avro Schema

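from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Assumes the topic carries Avro-encoded messages registered in Schema Registry
# (the JSON producer above would need an Avro serializer to feed this consumer).
consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'user-analytics-group',
    'schema.registry.url': 'http://localhost:8081',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # Commit manually after processing (at-least-once)
})

consumer.subscribe(['user_events'])

try:
    while True:
        try:
            msg = consumer.poll(1.0)
        except SerializerError as e:
            # The message could not be decoded against its schema
            print(f"Deserialization failed: {e}")
            continue
        if msg is None:
            continue  # No message arrived within the timeout
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        event = msg.value()  # Automatically deserialized from Avro
        process_event(event)  # Application-defined handler (not shown)
        consumer.commit(message=msg)  # Commit after successful processing
finally:
    consumer.close()  # Leave the consumer group cleanly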
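
Committing offsets only after processing means a crash between the two steps causes the same message to be delivered again, so the handler should tolerate duplicates. The following is a minimal sketch of idempotent processing. The `event_id` field, the `process_once` helper, and the in-memory `seen` set are assumptions for illustration; a real pipeline would keep the seen IDs in a durable store.

```python
def process_once(events, seen_ids, handler):
    """Apply handler to each event whose id has not been handled yet."""
    handled = 0
    for event in events:
        event_id = event["event_id"]  # hypothetical unique id per event
        if event_id in seen_ids:
            continue  # redelivered after a crash or rebalance; skip it
        handler(event)
        seen_ids.add(event_id)
        handled += 1
    return handled


totals = {"revenue": 0.0}


def add_revenue(event):
    totals["revenue"] += event["amount"]


seen = set()
batch = [
    {"event_id": "e1", "amount": 99.99},
    {"event_id": "e2", "amount": 10.0},
]

first = process_once(batch, seen, add_revenue)
second = process_once(batch, seen, add_revenue)  # simulated redelivery
print(first, second)  # 2 0
```

The second pass handles nothing, so revenue is counted once even though every message arrived twice.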

Apache Spark: Processing Powerhouse

Spark revolutionized distributed data processing with its unified engine for batch, streaming, ML, and graph processing through a consistent API.

Spark Structured Streaming

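from pyspark.sql import SparkSession
# Import names explicitly; a wildcard import would shadow Python's built-in sum()
from pyspark.sql.functions import col, count, from_json, sum as sum_, window
from pyspark.sql.types import (
    DoubleType, LongType, StringType, StructField, StructType, TimestampType
)

spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Schema of the JSON payload (matches the fields sent by the producer)
event_schema = StructType([
    StructField("user_id", LongType()),
    StructField("action", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON value into typed columns
events = df.select(
    from_json(col("value").cast("string"), event_schema).alias("event")
).select("event.*")

# Windowed aggregation (5-minute windows, tolerating events up to 10 minutes late)
windowed = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("action")
    ) \
    .agg(
        count("*").alias("count"),
        sum_("amount").alias("total_amount")
    )

# Write to Delta Lake; append mode emits each window once the watermark passes it
query = windowed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user_events") \
    .option("path", "/delta/silver/user_events_windowed") \
    .start()

query.awaitTermination()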
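
The interaction between tumbling windows and the watermark can be hard to picture, so here is a plain-Python sketch of the idea. It is a simplification, not Spark's implementation: it updates the watermark after every event rather than once per micro-batch, and the function names are hypothetical.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)     # tumbling window size
LATENESS = timedelta(minutes=10)  # watermark delay


def window_start(ts):
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    epoch = datetime(1970, 1, 1)
    seconds = int((ts - epoch).total_seconds())
    size = int(WINDOW.total_seconds())
    return epoch + timedelta(seconds=seconds - seconds % size)


def aggregate(events):
    """Count events per (window, action), dropping events behind the watermark."""
    counts = {}
    dropped = []
    max_seen = None
    for ts, action in events:
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - LATENESS
        start = window_start(ts)
        if start + WINDOW <= watermark:
            dropped.append((ts, action))  # window already closed: too late
            continue
        key = (start, action)
        counts[key] = counts.get(key, 0) + 1
    return counts, dropped


events = [
    (datetime(2025, 1, 4, 10, 1), "purchase"),
    (datetime(2025, 1, 4, 10, 3), "purchase"),
    (datetime(2025, 1, 4, 10, 20), "view"),
    (datetime(2025, 1, 4, 10, 2), "purchase"),  # arrives after the 10:20 event
]
counts, dropped = aggregate(events)
```

After the 10:20 event the watermark sits at 10:10, so the straggler from 10:02 belongs to a window that closed at 10:05 and is discarded. A longer watermark delay keeps more late data at the cost of holding window state longer.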

Medallion Architecture: Bronze → Silver → Gold

Bronze → Silver Transformation (Spark)

from pyspark.sql.functions import col, current_timestamp, to_date

# Read raw data from Bronze
bronze_df = spark.read.format("delta").load("/delta/bronze/user_events")

# Clean and validate
silver_df = bronze_df \
    .dropDuplicates(["user_id", "timestamp"]) \
    .filter(col("amount") > 0) \
    .filter(col("timestamp").isNotNull()) \
    .withColumn("processed_at", current_timestamp()) \
    .withColumn("date", to_date(col("timestamp")))

# Write to Silver with partitioning
silver_df.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("date") \
    .save("/delta/silver/user_events")

dbt: SQL-Based Transformations

dbt represents a philosophical shift: embrace SQL as the transformation language and bring software engineering best practices to analytics.

dbt Model: Silver → Gold

-- models/gold/user_metrics_daily.sql
{{
    config(
        materialized='incremental',
        unique_key='date_user_id',
        partition_by={
            "field": "date",
            "data_type": "date"
        }
    )
}}

WITH source AS (
    SELECT * FROM {{ ref('silver_user_events') }}
    {% if is_incremental() %}
    -- Use >= so the latest day is recomputed when late events arrive;
    -- unique_key replaces the existing rows instead of duplicating them
    WHERE date >= (SELECT MAX(date) FROM {{ this }})
    {% endif %}
)

SELECT
    date,
    user_id,
    CONCAT(CAST(date AS STRING), '_', CAST(user_id AS STRING)) AS date_user_id,
    COUNT(*) AS event_count,
    SUM(CASE WHEN action = 'purchase' THEN amount ELSE 0 END) AS total_revenue,
    COUNT(DISTINCT session_id) AS session_count
FROM source
GROUP BY 1, 2
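
What `unique_key` does on an incremental run can be sketched in a few lines of Python. This is a conceptual model only: the `incremental_merge` function is hypothetical, and the statements dbt actually issues depend on the adapter and the incremental strategy in use.

```python
def incremental_merge(existing, new_rows, unique_key):
    """Upsert new_rows into existing, matching rows on unique_key."""
    merged = {row[unique_key]: row for row in existing}
    for row in new_rows:
        merged[row[unique_key]] = row  # replace a matching row or add a new one
    return list(merged.values())


existing = [
    {"date_user_id": "2025-01-03_1", "event_count": 4},
    {"date_user_id": "2025-01-04_1", "event_count": 2},
]
new_rows = [
    {"date_user_id": "2025-01-04_1", "event_count": 5},  # recomputed day
    {"date_user_id": "2025-01-04_2", "event_count": 1},  # new user
]

result = incremental_merge(existing, new_rows, "date_user_id")
```

Rows for older dates are left alone, the recomputed row replaces its earlier version, and the new key is inserted, which is why the key must be unique for the model to stay correct.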

dbt Tests

# models/gold/schema.yml
version: 2

models:
  - name: user_metrics_daily
    description: Daily aggregated user metrics
    columns:
      - name: date_user_id
        description: Composite key
        tests:
          - unique
          - not_null
      - name: total_revenue
        description: Total purchase revenue
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
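
A dbt test is a query that selects failing rows, and the test passes when that query returns nothing. The sketch below mimics the `unique` and `not_null` checks on a list of dictionaries. The function names are hypothetical, and it is meant to show the logic rather than how dbt runs the tests.

```python
from collections import Counter


def failing_not_null(rows, column):
    """Rows where the column is missing or NULL."""
    return [r for r in rows if r.get(column) is None]


def failing_unique(rows, column):
    """Non-null values that appear more than once in the column."""
    counts = Counter(r[column] for r in rows if r.get(column) is not None)
    return [value for value, n in counts.items() if n > 1]


rows = [
    {"date_user_id": "2025-01-04_1", "total_revenue": 99.99},
    {"date_user_id": "2025-01-04_1", "total_revenue": 10.0},  # duplicate key
    {"date_user_id": "2025-01-04_2", "total_revenue": None},  # missing revenue
]

duplicate_keys = failing_unique(rows, "date_user_id")
null_revenue = failing_not_null(rows, "total_revenue")
print(duplicate_keys)  # ['2025-01-04_1']
```

Both checks return at least one row here, so both tests would fail and block the bad data from reaching downstream models.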

Delta Lake: ACID for the Lakehouse

Delta Lake brings ACID transactions, schema enforcement, and time travel to data lakes, bridging the gap between data warehouses and data lakes.

from delta.tables import DeltaTable

# Time travel - query historical data
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2025-01-03") \
    .load("/delta/gold/user_metrics")

# Or by version
df_v5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/delta/gold/user_metrics")

# MERGE for upserts (SCD Type 1)
# updates_df is a DataFrame of new or changed rows, prepared elsewhere
target = DeltaTable.forPath(spark, "/delta/gold/user_metrics")

target.alias("target").merge(
    updates_df.alias("updates"),
    "target.user_id = updates.user_id AND target.date = updates.date"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Optimize and Z-order
spark.sql("OPTIMIZE delta.`/delta/gold/user_metrics` ZORDER BY (user_id)")
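
Time travel works because Delta Lake keeps an ordered transaction log of which data files were added and removed by each commit. The sketch below is a heavily simplified model of that idea. The `snapshot` function and the file names are hypothetical, and the real log carries much more metadata.

```python
def snapshot(commits, version):
    """Return the set of data files visible at the given table version."""
    files = set()
    for actions in commits[: version + 1]:
        for op, path in actions:
            if op == "add":
                files.add(path)
            elif op == "remove":
                files.discard(path)
    return files


# Each list entry is one commit; its index is the table version.
commits = [
    [("add", "part-0.parquet")],                      # version 0: first write
    [("add", "part-1.parquet")],                      # version 1: append
    [("remove", "part-0.parquet"),                    # version 2: compaction
     ("remove", "part-1.parquet"),
     ("add", "part-2.parquet")],
]

v1_files = snapshot(commits, 1)
v2_files = snapshot(commits, 2)
```

Reading version 1 still resolves to the two original files even after compaction replaced them, provided those files have not yet been physically cleaned up.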

Orchestration with Airflow

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'daily_data_pipeline',
    default_args=default_args,
    schedule_interval='@daily',  # Airflow 2.4+ prefers the `schedule` argument
    start_date=datetime(2025, 1, 1),
    catchup=False  # Do not backfill runs between start_date and today
) as dag:

    # Spark job that cleans Bronze data into Silver
    bronze_to_silver = SparkSubmitOperator(
        task_id='bronze_to_silver',
        application='/jobs/bronze_to_silver.py',
        conf={'spark.executor.memory': '4g'}
    )

    # dbt Cloud job that builds Gold models from Silver
    dbt_run = DbtCloudRunJobOperator(
        task_id='dbt_transformation',
        job_id=12345,
        check_interval=30,
        timeout=3600
    )

    # Run dbt only after the Spark job succeeds
    bronze_to_silver >> dbt_run
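
The `>>` operator declares that the task on the right depends on the task on the left, and the scheduler then runs tasks in dependency order. The same ordering can be sketched with Python's standard library. This illustrates the concept only and is not how Airflow schedules tasks internally.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Map each task to the set of tasks it depends on.
dependencies = {
    "bronze_to_silver": set(),
    "dbt_transformation": {"bronze_to_silver"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['bronze_to_silver', 'dbt_transformation']
```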

Best Practices

  • Use Kafka for event streaming, not batch file transfers
  • Implement medallion architecture for clear data quality layers
  • Partition Delta tables by date for performance
  • Write dbt tests for every model (uniqueness, nulls, relationships)
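  • Enable checkpointing in Spark Structured Streaming; combined with an idempotent or transactional sink such as Delta Lake, it gives end-to-end exactly-once results
  • Use Schema Registry with Kafka for schema evolution
  • Run OPTIMIZE on Delta tables regularly, with ZORDER on frequently filtered columns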
  • Monitor lag in Kafka consumers
  • Version control all dbt models and Spark jobs
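
As an example of the lag monitoring mentioned above, consumer lag per partition is the gap between the latest offset in the log and the offset the consumer group has committed. Here is a minimal sketch, assuming both sets of offsets have already been fetched (for instance from Kafka's admin tooling). The function name and the sample numbers are hypothetical.

```python
def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag: latest log offset minus committed offset."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    }


end_offsets = {0: 120, 1: 80}  # latest offset per partition
committed = {0: 100, 1: 80}    # the group's committed offsets

lag = consumer_lag(end_offsets, committed)
total_lag = sum(lag.values())
print(lag, total_lag)  # {0: 20, 1: 0} 20
```

A lag that keeps growing means consumers are falling behind producers, which is usually the signal to scale out the consumer group or investigate slow processing.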

Looking Forward

The modern data stack continues to evolve. Iceberg and Hudi compete with Delta Lake, and Flink challenges Spark for stream processing. But the core principles remain: modularity, best-of-breed tools, and clear separation of concerns. The direction is toward lakehouse architectures, which combine the flexibility of data lakes with the performance of data warehouses, with tools such as Kafka, Spark, and dbt at the core.
