Traditional INSERT-based ETL pipelines fail when re-run,
creating duplicate records and data quality issues. Pipeline crashes halfway through? Re-running creates chaos. Need
to backfill historical data? Duplicates everywhere. MERGE statements solve this by making ETL idempotent—run it once
or a hundred times, the result is identical.
This guide covers production-ready idempotent ETL patterns
using MERGE/UPSERT across SQL Server, PostgreSQL, MySQL, and Snowflake. We’ll build bulletproof data pipelines that
handle failures gracefully.
Why Idempotent ETL Transforms Reliability
The INSERT-Only Problem
Non-idempotent ETL suffers from:
- Duplicate records: Re-running creates duplicate data (see the sketch after this list)
- Can’t recover: Crashes leave partial, corrupted data
- Manual cleanup: Delete duplicates before retry
- No backfills: Can’t safely reload historical data
- Data drift: Source changes not reflected in destination
- Complex logic: Manual existence checks before insert
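To make the duplicate problem concrete, here is a minimal sketch of a naive append-only load. The table and staging names mirror later examples in this guide but are illustrative; note there is no unique constraint on transaction_id, so nothing stops a second run from doubling the data.
-- Naive, non-idempotent load: every re-run appends the same rows again
INSERT INTO fact_sales (transaction_id, customer_id, amount, sale_date)
SELECT transaction_id, customer_id, amount, sale_date
FROM staging.sales
WHERE sale_date = '2024-01-15';
-- Duplicate check after a re-run: any count > 1 means the load is not idempotent
SELECT transaction_id, COUNT(*) AS copies
FROM fact_sales
GROUP BY transaction_id
HAVING COUNT(*) > 1;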
Idempotent ETL Benefits
- Safe retries: Run pipeline multiple times safely
- Automatic deduplication: MERGE handles duplicates
- Easy backfills: Reload any date range without issues
- Incremental updates: Only modify changed records
- Resilient: Recover from failures by re-running
- Simplified code: No manual existence checking
Pattern 1: SQL Server MERGE
Upsert with MERGE Statement
-- Create target table
CREATE TABLE dim_customers (
customer_id INT PRIMARY KEY,
customer_name NVARCHAR(100),
email NVARCHAR(100),
city NVARCHAR(50),
country NVARCHAR(50),
last_updated DATETIME2,
CONSTRAINT uq_email UNIQUE (email)
);
-- Source data (staging table or CTE)
WITH source_data AS (
SELECT
customer_id,
customer_name,
email,
city,
country,
GETDATE() AS last_updated
FROM staging.customers
)
-- Idempotent MERGE operation
MERGE INTO dim_customers AS target
USING source_data AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND (
-- Only update if data changed
target.customer_name != source.customer_name OR
target.email != source.email OR
target.city != source.city OR
target.country != source.country
) THEN
UPDATE SET
customer_name = source.customer_name,
email = source.email,
city = source.city,
country = source.country,
last_updated = source.last_updated
WHEN NOT MATCHED BY TARGET THEN
INSERT (customer_id, customer_name, email, city, country, last_updated)
VALUES (source.customer_id, source.customer_name, source.email,
source.city, source.country, source.last_updated)
WHEN NOT MATCHED BY SOURCE THEN
-- Optional: Delete records not in source
DELETE
-- Output changes (OUTPUT must appear before the statement's terminating semicolon)
OUTPUT
$action AS operation,
inserted.customer_id,
inserted.customer_name,
deleted.customer_id AS old_customer_id;
-- Result: Same outcome whether run once or 100 times!
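A quick way to verify that claim is to snapshot the table, run the MERGE a second time, and confirm nothing moved. A minimal sketch using SQL Server's CHECKSUM_AGG for a coarse comparison (good enough for a smoke test, not a cryptographic guarantee):
-- Capture row count and an aggregate checksum, re-run the MERGE, then run this again
SELECT COUNT(*) AS row_count,
CHECKSUM_AGG(CHECKSUM(*)) AS table_checksum
FROM dim_customers;
-- Both values should be identical before and after the second run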
Pattern 2: PostgreSQL UPSERT
INSERT ON CONFLICT
-- Create target table
CREATE TABLE fact_sales (
sale_id BIGSERIAL,
transaction_id VARCHAR(50) PRIMARY KEY,
customer_id INT,
product_id INT,
sale_date DATE,
amount DECIMAL(10, 2),
quantity INT,
last_updated TIMESTAMP,
CONSTRAINT fk_customer FOREIGN KEY (customer_id) REFERENCES dim_customers(customer_id)
);
-- Create index for performance
CREATE INDEX idx_sale_date ON fact_sales(sale_date);
-- Idempotent upsert
INSERT INTO fact_sales (
transaction_id,
customer_id,
product_id,
sale_date,
amount,
quantity,
last_updated
)
SELECT
transaction_id,
customer_id,
product_id,
sale_date,
amount,
quantity,
CURRENT_TIMESTAMP
FROM staging.sales
ON CONFLICT (transaction_id)
DO UPDATE SET
customer_id = EXCLUDED.customer_id,
product_id = EXCLUDED.product_id,
sale_date = EXCLUDED.sale_date,
amount = EXCLUDED.amount,
quantity = EXCLUDED.quantity,
last_updated = EXCLUDED.last_updated
WHERE
-- Only update if data actually changed
fact_sales.customer_id IS DISTINCT FROM EXCLUDED.customer_id OR
fact_sales.product_id IS DISTINCT FROM EXCLUDED.product_id OR
fact_sales.amount IS DISTINCT FROM EXCLUDED.amount OR
fact_sales.quantity IS DISTINCT FROM EXCLUDED.quantity
-- Return affected rows (RETURNING must be part of the same INSERT statement, so no semicolon above)
RETURNING
transaction_id,
CASE
WHEN xmax = 0 THEN 'INSERT'
ELSE 'UPDATE'
END AS operation;
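If you prefer aggregate counts over one row per change, the upsert can be wrapped in a data-modifying CTE. A minimal sketch reusing the xmax trick above; it assumes the same fact_sales and staging.sales tables and only updates the columns needed for the counts:
WITH upserted AS (
INSERT INTO fact_sales (transaction_id, customer_id, product_id, sale_date, amount, quantity, last_updated)
SELECT transaction_id, customer_id, product_id, sale_date, amount, quantity, CURRENT_TIMESTAMP
FROM staging.sales
ON CONFLICT (transaction_id) DO UPDATE SET
amount = EXCLUDED.amount,
quantity = EXCLUDED.quantity,
last_updated = EXCLUDED.last_updated
RETURNING (xmax = 0) AS was_inserted
)
SELECT
COUNT(*) FILTER (WHERE was_inserted) AS rows_inserted,
COUNT(*) FILTER (WHERE NOT was_inserted) AS rows_updated
FROM upserted;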
Pattern 3: Snowflake MERGE
Cloud Data Warehouse Pattern
-- Create target table
CREATE TABLE analytics.user_activity (
user_id NUMBER,
activity_date DATE,
page_views NUMBER,
sessions NUMBER,
total_time_seconds NUMBER,
last_updated TIMESTAMP_NTZ,
PRIMARY KEY (user_id, activity_date)
);
-- Idempotent MERGE with Snowflake
MERGE INTO analytics.user_activity AS target
USING (
-- Source query (could be from external stage, stream, etc.)
SELECT
user_id,
activity_date,
COUNT(*) AS page_views,
COUNT(DISTINCT session_id) AS sessions,
SUM(time_on_page) AS total_time_seconds,
CURRENT_TIMESTAMP() AS last_updated
FROM staging.raw_activity
WHERE activity_date >= DATEADD(day, -7, CURRENT_DATE())
GROUP BY user_id, activity_date
) AS source
ON target.user_id = source.user_id
AND target.activity_date = source.activity_date
WHEN MATCHED THEN
UPDATE SET
target.page_views = source.page_views,
target.sessions = source.sessions,
target.total_time_seconds = source.total_time_seconds,
target.last_updated = source.last_updated
WHEN NOT MATCHED THEN
INSERT (
user_id,
activity_date,
page_views,
sessions,
total_time_seconds,
last_updated
)
VALUES (
source.user_id,
source.activity_date,
source.page_views,
source.sessions,
source.total_time_seconds,
source.last_updated
);
-- Check merge results
SELECT
COUNT(*) AS total_rows,
MIN(last_updated) AS oldest_update,
MAX(last_updated) AS newest_update
FROM analytics.user_activity;
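To monitor what each run actually did, Snowflake can re-read the previous statement's result. A minimal sketch, assuming the MERGE above was the last statement executed in the session; the count columns ("number of rows inserted", "number of rows updated") come from Snowflake's MERGE output:
-- Capture the MERGE counts from the previous statement
SELECT *
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
-- On an immediate second run of the same MERGE, inserted should be 0; because this
-- pattern has no change detection, updated will still equal the source row count.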
Pattern 4: MySQL REPLACE/ON DUPLICATE KEY
MySQL-Specific Patterns
-- Create target table
CREATE TABLE product_inventory (
product_id INT,
warehouse_id INT,
quantity INT,
last_count_date DATE,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-- One row per product per warehouse; this is the key ON DUPLICATE KEY UPDATE fires on
PRIMARY KEY (product_id, warehouse_id)
);
-- Option 1: INSERT ... ON DUPLICATE KEY UPDATE
INSERT INTO product_inventory (
product_id,
warehouse_id,
quantity,
last_count_date
)
SELECT
product_id,
warehouse_id,
quantity,
count_date
FROM staging.inventory
ON DUPLICATE KEY UPDATE
quantity = VALUES(quantity),
last_count_date = VALUES(last_count_date),
last_updated = CURRENT_TIMESTAMP;
-- Option 2: REPLACE (deletes and inserts)
-- Use with caution - triggers cascade deletes!
REPLACE INTO product_inventory (
product_id,
warehouse_id,
quantity,
last_count_date
)
SELECT
product_id,
warehouse_id,
quantity,
count_date
FROM staging.inventory;
-- Verify idempotency: there must never be more than one row per (product, warehouse),
-- and row counts should not change between repeated runs
SELECT
product_id,
warehouse_id,
COUNT(*) AS row_count
FROM product_inventory
GROUP BY product_id, warehouse_id
HAVING COUNT(*) > 1; -- Should be empty if truly idempotent
Pattern 5: Incremental Load with Change Detection
Process Only Changed Records
-- Add change tracking columns
CREATE TABLE dim_products (
product_id INT PRIMARY KEY,
product_name VARCHAR(200),
category VARCHAR(50),
price DECIMAL(10, 2),
-- Audit columns
created_at DATETIME2,
updated_at DATETIME2,
source_hash VARCHAR(64), -- MD5/SHA256 of source data
is_active BIT DEFAULT 1
);
-- Compute hash of source data for change detection
WITH source_with_hash AS (
SELECT
product_id,
product_name,
category,
price,
-- Create hash of all relevant fields
CONVERT(VARCHAR(64), HASHBYTES('SHA2_256',
CONCAT(
ISNULL(product_name, ''), '|',  -- delimiters keep adjacent fields from concatenating to the same string
ISNULL(category, ''), '|',
ISNULL(CAST(price AS VARCHAR(20)), '')
)
), 2) AS source_hash,
GETDATE() AS load_timestamp
FROM staging.products
)
MERGE INTO dim_products AS target
USING source_with_hash AS source
ON target.product_id = source.product_id
WHEN MATCHED AND target.source_hash != source.source_hash THEN
-- Only update if hash changed
UPDATE SET
product_name = source.product_name,
category = source.category,
price = source.price,
updated_at = source.load_timestamp,
source_hash = source.source_hash
WHEN NOT MATCHED BY TARGET THEN
INSERT (
product_id, product_name, category, price,
created_at, updated_at, source_hash
)
VALUES (
source.product_id, source.product_name, source.category, source.price,
source.load_timestamp, source.load_timestamp, source.source_hash
);
-- Performance benefit: only rows whose hash changed are written.
-- Without the hash, every matched row is rewritten on every run;
-- with it, slowly changing data typically sees 10-100x fewer writes.
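The same idea ports to other engines; only the hash function changes. A minimal PostgreSQL sketch of the hash expression, assuming the same staging.products columns (md5 is fine here because the hash is only used for change detection, not security):
-- PostgreSQL equivalent of the SHA-256 hash above
SELECT
product_id,
md5(concat_ws('|',
coalesce(product_name, ''),
coalesce(category, ''),
coalesce(price::text, ''))) AS source_hash
FROM staging.products;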
Pattern 6: SCD Type 2 with MERGE
Slowly Changing Dimensions
-- SCD Type 2 table (track history)
CREATE TABLE dim_customer_scd2 (
customer_key BIGINT IDENTITY(1,1) PRIMARY KEY,
customer_id INT,
customer_name NVARCHAR(100),
email NVARCHAR(100),
city NVARCHAR(50),
-- SCD Type 2 columns
effective_date DATE,
end_date DATE,
is_current BIT,
version INT
);
-- Idempotent SCD Type 2 MERGE
MERGE INTO dim_customer_scd2 AS target
USING (
SELECT
customer_id,
customer_name,
email,
city,
CAST(GETDATE() AS DATE) AS effective_date
FROM staging.customers
) AS source
ON target.customer_id = source.customer_id
AND target.is_current = 1
WHEN MATCHED AND (
-- Detect changes
target.customer_name != source.customer_name OR
target.email != source.email OR
target.city != source.city
) THEN
-- Expire current record
UPDATE SET
end_date = DATEADD(day, -1, source.effective_date),
is_current = 0
WHEN NOT MATCHED BY TARGET THEN
-- Insert new record
INSERT (customer_id, customer_name, email, city,
effective_date, end_date, is_current, version)
VALUES (source.customer_id, source.customer_name, source.email, source.city,
source.effective_date, '9999-12-31', 1, 1);
-- Insert new versions for the records the MERGE just expired
INSERT INTO dim_customer_scd2 (
customer_id, customer_name, email, city,
effective_date, end_date, is_current, version
)
SELECT
s.customer_id,
s.customer_name,
s.email,
s.city,
CAST(GETDATE() AS DATE) AS effective_date,
'9999-12-31' AS end_date,
1 AS is_current,
MAX(t.version) + 1 AS version
FROM staging.customers s
INNER JOIN dim_customer_scd2 t
ON s.customer_id = t.customer_id
AND t.is_current = 0 -- Just expired by the MERGE above
AND t.end_date = DATEADD(day, -1, CAST(GETDATE() AS DATE))
WHERE NOT EXISTS (
-- Keeps this step idempotent: skip customers that already have a current row
SELECT 1 FROM dim_customer_scd2 c
WHERE c.customer_id = s.customer_id
AND c.is_current = 1
)
GROUP BY s.customer_id, s.customer_name, s.email, s.city;
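The payoff of keeping history is point-in-time reporting. Two common queries against this table (the date is an arbitrary example):
-- Current state of every customer
SELECT customer_id, customer_name, email, city
FROM dim_customer_scd2
WHERE is_current = 1;
-- State as of a specific date
SELECT customer_id, customer_name, email, city
FROM dim_customer_scd2
WHERE '2024-01-15' BETWEEN effective_date AND end_date;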
Real-World Example: Complete ETL Pipeline
Production-Ready Implementation
-- Complete idempotent ETL pipeline
CREATE OR ALTER PROCEDURE load_daily_sales
@load_date DATE
AS
BEGIN
SET NOCOUNT ON;
DECLARE @rows_inserted INT = 0;
DECLARE @rows_updated INT = 0;
DECLARE @rows_deleted INT = 0;
-- Table variable that captures the MERGE's OUTPUT rows
DECLARE @merge_results TABLE (
merge_action NVARCHAR(10),
inserted_transaction_id VARCHAR(50),
deleted_transaction_id VARCHAR(50)
);
BEGIN TRANSACTION;
BEGIN TRY
-- Step 1: Load staging table (truncate/load pattern)
TRUNCATE TABLE staging.daily_sales;
INSERT INTO staging.daily_sales
SELECT *
FROM external_source.sales
WHERE sale_date = @load_date;
-- Step 2: Idempotent MERGE into fact table
MERGE INTO fact_sales AS target
USING (
SELECT
transaction_id,
customer_id,
product_id,
sale_date,
amount,
quantity,
GETDATE() AS last_updated
FROM staging.daily_sales
) AS source
ON target.transaction_id = source.transaction_id
WHEN MATCHED AND (
target.customer_id != source.customer_id OR
target.product_id != source.product_id OR
target.amount != source.amount OR
target.quantity != source.quantity
) THEN
UPDATE SET
customer_id = source.customer_id,
product_id = source.product_id,
amount = source.amount,
quantity = source.quantity,
last_updated = source.last_updated
WHEN NOT MATCHED BY TARGET THEN
INSERT (transaction_id, customer_id, product_id,
sale_date, amount, quantity, last_updated)
VALUES (source.transaction_id, source.customer_id, source.product_id,
source.sale_date, source.amount, source.quantity, source.last_updated)
WHEN NOT MATCHED BY SOURCE
AND target.sale_date = @load_date THEN
-- Delete records for this date not in source
DELETE
OUTPUT
$action,
inserted.transaction_id,
deleted.transaction_id
INTO @merge_results (merge_action, inserted_transaction_id, deleted_transaction_id);
-- Get counts
SELECT @rows_inserted = COUNT(*)
FROM @merge_results
WHERE merge_action = 'INSERT';
SELECT @rows_updated = COUNT(*)
FROM @merge_results
WHERE merge_action = 'UPDATE';
SELECT @rows_deleted = COUNT(*)
FROM @merge_results
WHERE merge_action = 'DELETE';
-- Step 3: Update aggregates
MERGE INTO daily_sales_summary AS target
USING (
SELECT
sale_date,
COUNT(*) AS transaction_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount
FROM fact_sales
WHERE sale_date = @load_date
GROUP BY sale_date
) AS source
ON target.sale_date = source.sale_date
WHEN MATCHED THEN
UPDATE SET
transaction_count = source.transaction_count,
total_amount = source.total_amount,
avg_amount = source.avg_amount
WHEN NOT MATCHED THEN
INSERT (sale_date, transaction_count, total_amount, avg_amount)
VALUES (source.sale_date, source.transaction_count,
source.total_amount, source.avg_amount);
-- Log results
INSERT INTO etl_log (
procedure_name,
load_date,
rows_inserted,
rows_updated,
rows_deleted,
execution_time,
status
)
VALUES (
'load_daily_sales',
@load_date,
@rows_inserted,
@rows_updated,
@rows_deleted,
GETDATE(),
'SUCCESS'
);
COMMIT TRANSACTION;
PRINT 'Success: ' +
CAST(@rows_inserted AS VARCHAR) + ' inserted, ' +
CAST(@rows_updated AS VARCHAR) + ' updated, ' +
CAST(@rows_deleted AS VARCHAR) + ' deleted';
END TRY
BEGIN CATCH
IF @@TRANCOUNT > 0 ROLLBACK TRANSACTION;
-- Log error
INSERT INTO etl_log (
procedure_name,
load_date,
error_message,
execution_time,
status
)
VALUES (
'load_daily_sales',
@load_date,
ERROR_MESSAGE(),
GETDATE(),
'FAILED'
);
THROW;
END CATCH;
END;
-- Run pipeline (safe to run multiple times!)
EXEC load_daily_sales '2024-01-15';
EXEC load_daily_sales '2024-01-15'; -- Same result!
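A simple smoke test: compare the fact and summary tables between the two runs. A minimal sketch against the tables defined above; every value should be identical after the first and second execution:
-- Fact-level check for the reloaded date
SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount
FROM fact_sales
WHERE sale_date = '2024-01-15';
-- Aggregate-level check
SELECT transaction_count, total_amount, avg_amount
FROM daily_sales_summary
WHERE sale_date = '2024-01-15';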
Best Practices
- Use MERGE for upserts: Single atomic operation
- Define composite keys: Ensure unique identification
- Add change detection: Only update actually changed records
- Use hash columns: Fast change detection without comparing all fields
- Log operations: Track inserts/updates/deletes for monitoring
- Handle NULL properly: Use IS DISTINCT FROM or ISNULL() (see the sketch after this list)
- Test idempotency: Run pipeline twice, compare results
- Use transactions: Ensure atomicity
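NULL handling deserves the extra emphasis: in SQL, NULL != NULL evaluates to unknown, so a value that changes to or from NULL never trips a plain inequality check. A minimal sketch of NULL-safe change detection in the two dialects used above:
-- PostgreSQL / standard SQL: IS DISTINCT FROM treats NULL as a comparable value
SELECT 1 WHERE NULL IS DISTINCT FROM 1;   -- returns a row: the values differ
SELECT 1 WHERE NULL != 1;                 -- returns nothing: the comparison is unknown
-- SQL Server before 2022 (which added IS [NOT] DISTINCT FROM): coalesce both sides
-- to a sentinel that cannot occur in real data
-- WHEN MATCHED AND ISNULL(target.email, N'') != ISNULL(source.email, N'') THEN ...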
Common Pitfalls
- Missing unique constraints: Allows duplicates to slip through
- No change detection: Updates all records even when unchanged
- Wrong join keys: Creates duplicates instead of updates
- NULL handling: NULL != NULL fails equality checks
- No error handling: Partial loads leave inconsistent state
- Performance: MERGE can be slow without proper indexes (see the index sketch below)
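MERGE performance is dominated by the join between source and target. A minimal sketch, assuming the staging and fact tables from the pipeline above; the index name is illustrative:
-- Index the staging table on the MERGE join key so matching can seek instead of scan
CREATE INDEX idx_staging_sales_txn ON staging.daily_sales (transaction_id);
-- The target's join key should already be a primary key or unique index; without one,
-- MERGE is not only slow but can match multiple source rows and fail at runtime.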
Performance Comparison
| Approach | Idempotent? | Performance | Complexity |
|---|---|---|---|
| INSERT only | No | Fast | Simple |
| DELETE + INSERT | Yes | Slow (2 ops) | Simple |
| Manual EXISTS check | Yes | Slow (multiple queries) | Complex |
| MERGE/UPSERT | Yes | Fast (1 atomic op) | Moderate |
Key Takeaways
- Idempotent ETL allows safe retries and backfills
- MERGE/UPSERT provides atomic insert-or-update operations
- Use hash columns for efficient change detection
- Always test by running pipeline multiple times
- Define proper unique constraints to prevent duplicates
- Handle NULL values explicitly (IS DISTINCT FROM)
- Log operations for monitoring and debugging
- Combine with transactions for atomicity
Idempotent ETL is fundamental to reliable data pipelines. By using MERGE/UPSERT statements, you build pipelines
that handle failures gracefully, support backfills effortlessly, and maintain data quality automatically. It’s
the difference between fragile pipelines that require manual intervention and robust systems that just
work.