Tips and Tricks – Implement Idempotent ETL with Merge Statements

Traditional INSERT-based ETL pipelines fail when re-run,
creating duplicate records and data quality issues. Pipeline crashes halfway through? Re-running creates chaos. Need
to backfill historical data? Duplicates everywhere. MERGE statements solve this by making ETL idempotent—run it once
or a hundred times, the result is identical.

This guide covers production-ready idempotent ETL patterns
using MERGE/UPSERT across SQL Server, PostgreSQL, MySQL, and Snowflake. We’ll build bulletproof data pipelines that
handle failures gracefully.

Why Idempotent ETL Transforms Reliability

The INSERT-Only Problem

Non-idempotent ETL suffers from:

  • Duplicate records: Re-running creates duplicate data (see the sketch after this list)
  • Can’t recover: Crashes leave partial, corrupted data
  • Manual cleanup: Delete duplicates before retry
  • No backfills: Can’t safely reload historical data
  • Data drift: Source changes not reflected in destination
  • Complex logic: Manual existence checks before insert
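
To make the failure mode concrete, here is a minimal sketch of the classic non-idempotent load, assuming a fact table with no unique constraint on transaction_id (the table and staging names are illustrative):

-- Non-idempotent load: every run appends the same rows again
INSERT INTO fact_sales (transaction_id, customer_id, amount, sale_date)
SELECT transaction_id, customer_id, amount, sale_date
FROM staging.sales
WHERE sale_date = '2024-01-15';

-- After a second run, every transaction for that day exists twice
SELECT transaction_id, COUNT(*) AS copies
FROM fact_sales
GROUP BY transaction_id
HAVING COUNT(*) > 1;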

Idempotent ETL Benefits

  • Safe retries: Run pipeline multiple times safely
  • Automatic deduplication: MERGE handles duplicates
  • Easy backfills: Reload any date range without issues (a backfill sketch follows this list)
  • Incremental updates: Only modify changed records
  • Resilient: Recover from failures by re-running
  • Simplified code: No manual existence checking
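
Because every run converges to the same end state, a backfill is just a loop over dates. Here is a minimal T-SQL sketch, assuming a per-date load procedure like the load_daily_sales example later in this post:

-- Backfill January by re-running the idempotent daily load for each date
DECLARE @d DATE = '2024-01-01';
WHILE @d <= '2024-01-31'
BEGIN
    EXEC load_daily_sales @load_date = @d;  -- safe to repeat for any date
    SET @d = DATEADD(day, 1, @d);
END;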

Pattern 1: SQL Server MERGE

Upsert with MERGE Statement

-- Create target table
CREATE TABLE dim_customers (
    customer_id INT PRIMARY KEY,
    customer_name NVARCHAR(100),
    email NVARCHAR(100),
    city NVARCHAR(50),
    country NVARCHAR(50),
    last_updated DATETIME2,
    CONSTRAINT uq_email UNIQUE (email)
);

-- Source data (staging table or CTE)
WITH source_data AS (
    SELECT 
        customer_id,
        customer_name,
        email,
        city,
        country,
        GETDATE() AS last_updated
    FROM staging.customers
)
-- Idempotent MERGE operation
MERGE INTO dim_customers AS target
USING source_data AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND (
    -- Only update if data changed
    target.customer_name != source.customer_name OR
    target.email != source.email OR
    target.city != source.city OR
    target.country != source.country
) THEN
    UPDATE SET
        customer_name = source.customer_name,
        email = source.email,
        city = source.city,
        country = source.country,
        last_updated = source.last_updated
WHEN NOT MATCHED BY TARGET THEN
    INSERT (customer_id, customer_name, email, city, country, last_updated)
    VALUES (source.customer_id, source.customer_name, source.email, 
            source.city, source.country, source.last_updated)
WHEN NOT MATCHED BY SOURCE THEN
    -- Optional: Delete records not in source
    DELETE
-- Output changes (OUTPUT must come before the terminating semicolon)
OUTPUT 
    $action AS operation,
    inserted.customer_id,
    inserted.customer_name,
    deleted.customer_id AS old_customer_id;

-- Result: Same outcome whether run once or 100 times!

Pattern 2: PostgreSQL UPSERT

INSERT ON CONFLICT

-- Create target table
CREATE TABLE fact_sales (
    sale_id BIGSERIAL,
    transaction_id VARCHAR(50) PRIMARY KEY,
    customer_id INT,
    product_id INT,
    sale_date DATE,
    amount DECIMAL(10, 2),
    quantity INT,
    last_updated TIMESTAMP,
    CONSTRAINT fk_customer FOREIGN KEY (customer_id) REFERENCES dim_customers(customer_id)
);

-- Create index for performance
CREATE INDEX idx_sale_date ON fact_sales(sale_date);

-- Idempotent upsert
INSERT INTO fact_sales (
    transaction_id,
    customer_id,
    product_id,
    sale_date,
    amount,
    quantity,
    last_updated
)
SELECT
    transaction_id,
    customer_id,
    product_id,
    sale_date,
    amount,
    quantity,
    CURRENT_TIMESTAMP
FROM staging.sales
ON CONFLICT (transaction_id) 
DO UPDATE SET
    customer_id = EXCLUDED.customer_id,
    product_id = EXCLUDED.product_id,
    sale_date = EXCLUDED.sale_date,
    amount = EXCLUDED.amount,
    quantity = EXCLUDED.quantity,
    last_updated = EXCLUDED.last_updated
WHERE 
    -- Only update if data actually changed
    fact_sales.customer_id IS DISTINCT FROM EXCLUDED.customer_id OR
    fact_sales.product_id IS DISTINCT FROM EXCLUDED.product_id OR
    fact_sales.amount IS DISTINCT FROM EXCLUDED.amount OR
    fact_sales.quantity IS DISTINCT FROM EXCLUDED.quantity
-- Return affected rows (RETURNING is part of the same statement;
-- rows skipped by the WHERE filter above are not returned)
RETURNING 
    transaction_id,
    CASE 
        WHEN xmax = 0 THEN 'INSERT'
        ELSE 'UPDATE'
    END AS operation;

Pattern 3: Snowflake MERGE

Cloud Data Warehouse Pattern

-- Create target table
CREATE TABLE analytics.user_activity (
    user_id NUMBER,
    activity_date DATE,
    page_views NUMBER,
    sessions NUMBER,
    total_time_seconds NUMBER,
    last_updated TIMESTAMP_NTZ,
    PRIMARY KEY (user_id, activity_date)
);

-- Idempotent MERGE with Snowflake
MERGE INTO analytics.user_activity AS target
USING (
    -- Source query (could be from external stage, stream, etc.)
    SELECT
        user_id,
        activity_date,
        COUNT(*) AS page_views,
        COUNT(DISTINCT session_id) AS sessions,
        SUM(time_on_page) AS total_time_seconds,
        CURRENT_TIMESTAMP() AS last_updated
    FROM staging.raw_activity
    WHERE activity_date >= DATEADD(day, -7, CURRENT_DATE())
    GROUP BY user_id, activity_date
) AS source
ON target.user_id = source.user_id 
   AND target.activity_date = source.activity_date
WHEN MATCHED THEN
    UPDATE SET
        target.page_views = source.page_views,
        target.sessions = source.sessions,
        target.total_time_seconds = source.total_time_seconds,
        target.last_updated = source.last_updated
WHEN NOT MATCHED THEN
    INSERT (
        user_id,
        activity_date,
        page_views,
        sessions,
        total_time_seconds,
        last_updated
    )
    VALUES (
        source.user_id,
        source.activity_date,
        source.page_views,
        source.sessions,
        source.total_time_seconds,
        source.last_updated
    );

-- Check merge results
SELECT 
    COUNT(*) AS total_rows,
    MIN(last_updated) AS oldest_update,
    MAX(last_updated) AS newest_update
FROM analytics.user_activity;

Pattern 4: MySQL REPLACE/ON DUPLICATE KEY

MySQL-Specific Patterns

-- Create target table
CREATE TABLE product_inventory (
    product_id INT,
    warehouse_id INT,
    quantity INT,
    last_count_date DATE,
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (product_id, warehouse_id)  -- one row per product per warehouse
);

-- Option 1: INSERT ... ON DUPLICATE KEY UPDATE
INSERT INTO product_inventory (
    product_id,
    warehouse_id,
    quantity,
    last_count_date
)
SELECT
    product_id,
    warehouse_id,
    quantity,
    count_date
FROM staging.inventory
ON DUPLICATE KEY UPDATE
    quantity = VALUES(quantity),
    last_count_date = VALUES(last_count_date),
    last_updated = CURRENT_TIMESTAMP;

-- Option 2: REPLACE (deletes the row, then inserts a new one)
-- Use with caution: it fires DELETE triggers, can cascade foreign key deletes,
-- and resets any column not listed (last_updated here) to its default
REPLACE INTO product_inventory (
    product_id,
    warehouse_id,
    quantity,
    last_count_date
)
SELECT
    product_id,
    warehouse_id,
    quantity,
    count_date
FROM staging.inventory;

-- Verify idempotency: no duplicate rows per (product, warehouse) after a re-run
SELECT 
    product_id,
    warehouse_id,
    COUNT(*) AS row_count
FROM product_inventory
GROUP BY product_id, warehouse_id
HAVING COUNT(*) > 1;  -- Should return no rows if the load is idempotent

Pattern 5: Incremental Load with Change Detection

Process Only Changed Records

-- Add change tracking columns
CREATE TABLE dim_products (
    product_id INT PRIMARY KEY,
    product_name VARCHAR(200),
    category VARCHAR(50),
    price DECIMAL(10, 2),
    -- Audit columns
    created_at TIMESTAMP,
    updated_at TIMESTAMP,
    source_hash VARCHAR(64),  -- MD5/SHA256 of source data
    is_active BIT DEFAULT 1
);

-- Compute hash of source data for change detection
WITH source_with_hash AS (
    SELECT
        product_id,
        product_name,
        category,
        price,
        -- Hash all relevant fields; the '|' delimiter prevents different
        -- field combinations from producing the same concatenated string
        CONVERT(VARCHAR(64), HASHBYTES('SHA2_256', 
            CONCAT(
                ISNULL(product_name, ''), '|',
                ISNULL(category, ''), '|',
                ISNULL(CAST(price AS VARCHAR(32)), '')
            )
        ), 2) AS source_hash,
        GETDATE() AS load_timestamp
    FROM staging.products
)
MERGE INTO dim_products AS target
USING source_with_hash AS source
ON target.product_id = source.product_id
WHEN MATCHED AND target.source_hash != source.source_hash THEN
    -- Only update if hash changed
    UPDATE SET
        product_name = source.product_name,
        category = source.category,
        price = source.price,
        updated_at = source.load_timestamp,
        source_hash = source.source_hash
WHEN NOT MATCHED BY TARGET THEN
    INSERT (
        product_id, product_name, category, price,
        created_at, updated_at, source_hash
    )
    VALUES (
        source.product_id, source.product_name, source.category, source.price,
        source.load_timestamp, source.load_timestamp, source.source_hash
    );

-- Performance benefit: only rows whose hash changed are written.
-- Without the hash, every matched row is updated on every run;
-- with it, writes drop to just the changed rows, typically a small
-- fraction of the table on incremental loads.

Pattern 6: SCD Type 2 with MERGE

Slowly Changing Dimensions

-- SCD Type 2 table (track history)
CREATE TABLE dim_customer_scd2 (
    customer_key BIGINT IDENTITY(1,1) PRIMARY KEY,
    customer_id INT,
    customer_name NVARCHAR(100),
    email NVARCHAR(100),
    city NVARCHAR(50),
    -- SCD Type 2 columns
    effective_date DATE,
    end_date DATE,
    is_current BIT,
    version INT
);

-- Idempotent SCD Type 2 MERGE
MERGE INTO dim_customer_scd2 AS target
USING (
    SELECT 
        customer_id,
        customer_name,
        email,
        city,
        CAST(GETDATE() AS DATE) AS effective_date
    FROM staging.customers
) AS source
ON target.customer_id = source.customer_id 
   AND target.is_current = 1
WHEN MATCHED AND (
    -- Detect changes
    target.customer_name != source.customer_name OR
    target.email != source.email OR
    target.city != source.city
) THEN
    -- Expire current record
    UPDATE SET
        end_date = DATEADD(day, -1, source.effective_date),
        is_current = 0
WHEN NOT MATCHED BY TARGET THEN
    -- Insert new record
    INSERT (customer_id, customer_name, email, city, 
            effective_date, end_date, is_current, version)
    VALUES (source.customer_id, source.customer_name, source.email, source.city,
            source.effective_date, '9999-12-31', 1, 1);

-- Insert new versions for records expired by the MERGE above
-- (MERGE can apply only one action per source row, so the new version
-- is inserted in a second step). The NOT EXISTS guard keeps this step
-- idempotent: once a current version exists again, a re-run inserts nothing.
INSERT INTO dim_customer_scd2 (
    customer_id, customer_name, email, city,
    effective_date, end_date, is_current, version
)
SELECT 
    s.customer_id,
    s.customer_name,
    s.email,
    s.city,
    CAST(GETDATE() AS DATE) AS effective_date,
    '9999-12-31' AS end_date,
    1 AS is_current,
    ISNULL(MAX(t.version), 0) + 1 AS version
FROM staging.customers s
INNER JOIN dim_customer_scd2 t
    ON s.customer_id = t.customer_id
    AND t.is_current = 0  -- Just expired
    AND t.end_date = DATEADD(day, -1, CAST(GETDATE() AS DATE))
WHERE NOT EXISTS (
    SELECT 1
    FROM dim_customer_scd2 c
    WHERE c.customer_id = s.customer_id
      AND c.is_current = 1
)
GROUP BY s.customer_id, s.customer_name, s.email, s.city;

Real-World Example: Complete ETL Pipeline

Production-Ready Implementation

-- Complete idempotent ETL pipeline
CREATE OR ALTER PROCEDURE load_daily_sales(
    @load_date DATE
)
AS
BEGIN
    SET NOCOUNT ON;
    
    DECLARE @rows_inserted INT = 0;
    DECLARE @rows_updated INT = 0;
    DECLARE @rows_deleted INT = 0;
    
    -- Table variable to capture the MERGE OUTPUT results
    DECLARE @merge_results TABLE (
        action NVARCHAR(10),
        inserted_id VARCHAR(50),
        deleted_id VARCHAR(50)
    );
    
    BEGIN TRANSACTION;
    
    BEGIN TRY
        -- Step 1: Load staging table (truncate/load pattern)
        TRUNCATE TABLE staging.daily_sales;
        
        INSERT INTO staging.daily_sales
        SELECT *
        FROM external_source.sales
        WHERE sale_date = @load_date;
        
        -- Step 2: Idempotent MERGE into fact table
        MERGE INTO fact_sales AS target
        USING (
            SELECT
                transaction_id,
                customer_id,
                product_id,
                sale_date,
                amount,
                quantity,
                GETDATE() AS last_updated
            FROM staging.daily_sales
        ) AS source
        ON target.transaction_id = source.transaction_id
        WHEN MATCHED AND (
            target.amount != source.amount OR
            target.quantity != source.quantity
        ) THEN
            UPDATE SET
                customer_id = source.customer_id,
                product_id = source.product_id,
                amount = source.amount,
                quantity = source.quantity,
                last_updated = source.last_updated
        WHEN NOT MATCHED BY TARGET THEN
            INSERT (transaction_id, customer_id, product_id, 
                    sale_date, amount, quantity, last_updated)
            VALUES (source.transaction_id, source.customer_id, source.product_id,
                    source.sale_date, source.amount, source.quantity, source.last_updated)
        WHEN NOT MATCHED BY SOURCE 
            AND target.sale_date = @load_date THEN
            -- Delete records for this date not in source
            DELETE
        OUTPUT 
            $action,
            inserted.transaction_id,
            deleted.transaction_id
        INTO @merge_results;
        
        -- Get counts
        SELECT 
            @rows_inserted = COUNT(*) 
        FROM @merge_results 
        WHERE action = 'INSERT';
        
        SELECT 
            @rows_updated = COUNT(*) 
        FROM @merge_results 
        WHERE action = 'UPDATE';
        
        SELECT 
            @rows_deleted = COUNT(*) 
        FROM @merge_results 
        WHERE action = 'DELETE';
        
        -- Step 3: Update aggregates
        MERGE INTO daily_sales_summary AS target
        USING (
            SELECT
                sale_date,
                COUNT(*) AS transaction_count,
                SUM(amount) AS total_amount,
                AVG(amount) AS avg_amount
            FROM fact_sales
            WHERE sale_date = @load_date
            GROUP BY sale_date
        ) AS source
        ON target.sale_date = source.sale_date
        WHEN MATCHED THEN
            UPDATE SET
                transaction_count = source.transaction_count,
                total_amount = source.total_amount,
                avg_amount = source.avg_amount
        WHEN NOT MATCHED THEN
            INSERT (sale_date, transaction_count, total_amount, avg_amount)
            VALUES (source.sale_date, source.transaction_count, 
                    source.total_amount, source.avg_amount);
        
        -- Log results
        INSERT INTO etl_log (
            procedure_name,
            load_date,
            rows_inserted,
            rows_updated,
            rows_deleted,
            execution_time,
            status
        )
        VALUES (
            'load_daily_sales',
            @load_date,
            @rows_inserted,
            @rows_updated,
            @rows_deleted,
            GETDATE(),
            'SUCCESS'
        );
        
        COMMIT TRANSACTION;
        
        PRINT 'Success: ' + 
              CAST(@rows_inserted AS VARCHAR) + ' inserted, ' +
              CAST(@rows_updated AS VARCHAR) + ' updated, ' +
              CAST(@rows_deleted AS VARCHAR) + ' deleted';
              
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION;
        
        -- Log error
        INSERT INTO etl_log (
            procedure_name,
            load_date,
            error_message,
            execution_time,
            status
        )
        VALUES (
            'load_daily_sales',
            @load_date,
            ERROR_MESSAGE(),
            GETDATE(),
            'FAILED'
        );
        
        THROW;
    END CATCH;
END;

-- Run pipeline (safe to run multiple times!)
EXEC load_daily_sales '2024-01-15';
EXEC load_daily_sales '2024-01-15';  -- Same result!

Best Practices

  • Use MERGE for upserts: Single atomic operation
  • Define composite keys: Ensure unique identification
  • Add change detection: Only update actually changed records
  • Use hash columns: Fast change detection without comparing all fields
  • Log operations: Track inserts/updates/deletes for monitoring
  • Handle NULL properly: Use IS DISTINCT FROM or ISNULL()
  • Test idempotency: Run the pipeline twice and compare results (see the sketch below)
  • Use transactions: Ensure atomicity
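
A quick way to test idempotency is to snapshot a row count and checksum, re-run the load, and confirm nothing moved. A minimal T-SQL sketch (the checksum is a cheap smoke test rather than a proof, since CHECKSUM can collide; table and procedure names follow the examples above):

-- Snapshot the target, re-run the load, and verify nothing changed
DECLARE @rows_before INT, @rows_after INT;
DECLARE @hash_before INT, @hash_after INT;

SELECT @rows_before = COUNT(*), @hash_before = CHECKSUM_AGG(CHECKSUM(*))
FROM fact_sales;

EXEC load_daily_sales '2024-01-15';  -- the re-run under test

SELECT @rows_after = COUNT(*), @hash_after = CHECKSUM_AGG(CHECKSUM(*))
FROM fact_sales;

IF @rows_before = @rows_after AND @hash_before = @hash_after
    PRINT 'Idempotent: re-run changed nothing'
ELSE
    PRINT 'Not idempotent: re-run modified the target';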

Common Pitfalls

  • Missing unique constraints: Allows duplicates to slip through
  • No change detection: Updates all records even when unchanged
  • Wrong join keys: Creates duplicates instead of updates
  • NULL handling: comparisons involving NULL evaluate to UNKNOWN, so naive change checks miss them (see the sketch below)
  • No error handling: Partial loads leave inconsistent state
  • Performance: MERGE can be slow without proper indexes
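
The NULL pitfall deserves a closer look: target.email != source.email evaluates to UNKNOWN when either side is NULL, so a change from NULL to a real value never triggers the UPDATE branch. Here is a minimal T-SQL sketch of the problem and a NULL-safe alternative (PostgreSQL has IS DISTINCT FROM built in, and SQL Server 2022 added it as well; the sentinel value below is illustrative):

-- Naive inequality misses changes involving NULL
DECLARE @old NVARCHAR(100) = NULL;
DECLARE @new NVARCHAR(100) = N'alice@example.com';

IF @old <> @new
    PRINT 'naive check: change detected'
ELSE
    PRINT 'naive check: change missed'   -- this branch runs: NULL <> value is UNKNOWN

-- NULL-safe comparison via a sentinel value neither column can hold
IF ISNULL(@old, N'~NULL~') <> ISNULL(@new, N'~NULL~')
    PRINT 'null-safe check: change detected';  -- this branch runs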

Performance Comparison

Approach            | Idempotent? | Performance             | Complexity
INSERT only         | No          | Fast                    | Simple
DELETE + INSERT     | Yes         | Slow (2 operations)     | Simple
Manual EXISTS check | Yes         | Slow (multiple queries) | Complex
MERGE/UPSERT        | Yes         | Fast (1 atomic op)      | Moderate

Key Takeaways

  • Idempotent ETL allows safe retries and backfills
  • MERGE/UPSERT provides atomic insert-or-update operations
  • Use hash columns for efficient change detection
  • Always test by running pipeline multiple times
  • Define proper unique constraints to prevent duplicates
  • Handle NULL values explicitly (IS DISTINCT FROM)
  • Log operations for monitoring and debugging
  • Combine with transactions for atomicity

Idempotent ETL is fundamental to reliable data pipelines. By using MERGE/UPSERT statements, you build pipelines
that handle failures gracefully, support backfills effortlessly, and maintain data quality automatically. It’s
the difference between fragile pipelines that require manual intervention and robust systems that just
work.

