post data-engineering · 2025-08-14 · 5 min read
Delta Lake MERGE patterns I wish I'd learned a year earlier
I spent my first year with Delta Lake using INSERT INTO and OVERWRITE for everything. They worked. They also produced duplicate rows on retries, lost late-arriving updates, and forced full-table rewrites for changes that could have been targeted. The fix was learning when MERGE is the right hammer.
This post covers the five MERGE patterns I now reach for, along with the gotchas I learned the painful way.
What MERGE actually does
MERGE INTO target USING source ON condition is an upsert with three branches:
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
WHEN NOT MATCHED BY SOURCE THEN DELETE
You can have any combination of the three WHEN clauses. The unmatched-by-source branch is the one most people forget about; it’s how you handle deletions when the source is the authoritative state.
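If the three branches feel abstract, a toy model may help. The sketch below is plain Python over dicts, not the Delta API; `toy_merge` and its `delete_unmatched` flag are hypothetical names made up for illustration. It only shows which branch fires for which key.

```python
def toy_merge(target, source, delete_unmatched=False):
    """Toy MERGE over dicts keyed by id; returns a new dict, inputs untouched."""
    result = dict(target)
    for key, row in source.items():
        # Key already in target: WHEN MATCHED THEN UPDATE.
        # Key missing from target: WHEN NOT MATCHED THEN INSERT.
        # In this model both are a plain assignment.
        result[key] = row
    if delete_unmatched:
        # WHEN NOT MATCHED BY SOURCE THEN DELETE
        for key in target:
            if key not in source:
                del result[key]
    return result


target = {1: "alice", 2: "bob", 3: "carol"}
source = {2: "bobby", 4: "dave"}

upserted = toy_merge(target, source)
synced = toy_merge(target, source, delete_unmatched=True)
print(upserted)  # {1: 'alice', 2: 'bobby', 3: 'carol', 4: 'dave'}
print(synced)    # {2: 'bobby', 4: 'dave'}
```

With the delete branch on, the target ends up mirroring the source exactly, which is what "the source is the authoritative state" means in practice.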
PySpark equivalent (Delta API):
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "/data/silver/users")
target.alias("t").merge(
    source.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
Pattern 1: Idempotent upsert (the bread and butter)
You have a daily batch that produces user updates. Re-running yesterday’s batch should not create duplicates.
target.alias("t").merge(
    daily_batch.alias("s"),
    "t.user_id = s.user_id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
Re-running with the same source produces the same target state. Idempotency for free.
The gotcha: this works only if user_id is unique in the source (and, for a clean upsert, in the target too). If your source has two rows for the same user_id (it happens), both try to modify the same target row and MERGE fails with an error saying that multiple source rows matched the same target row. Deduplicate the source first:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number
# Rank each user's rows newest-first, then keep only the top one.
# Window.orderBy takes no ascending= keyword, so sort descending on the column itself.
w = Window.partitionBy("user_id").orderBy(col("event_time").desc())
deduped_source = daily_batch.withColumn("rn", row_number().over(w)) \
    .filter("rn = 1") \
    .drop("rn")
The “keep the latest by event_time” pattern is the workhorse. Memorise it.
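For intuition, or for building expected outputs in a unit test without a Spark session, the same rule can be sketched in plain Python. `keep_latest` is a hypothetical helper, and the sample rows are made up; it assumes event_time values compare correctly with `>` (ISO-8601 strings do).

```python
def keep_latest(rows, key="user_id", order_by="event_time"):
    """Return one row per key: the row with the greatest order_by value."""
    latest = {}
    for row in rows:
        k = row[key]
        # First row seen for this key, or strictly newer than the one we hold.
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


daily_batch = [
    {"user_id": 1, "email": "old@example.com", "event_time": "2025-08-13T09:00:00"},
    {"user_id": 2, "email": "b@example.com", "event_time": "2025-08-13T10:00:00"},
    {"user_id": 1, "email": "new@example.com", "event_time": "2025-08-13T11:00:00"},
]
deduped = keep_latest(daily_batch)
print(deduped)
```

One difference worth knowing: on an exact timestamp tie this sketch keeps the first row it saw, while row_number() in Spark picks among tied rows arbitrarily unless you add a tie-breaker column to the ordering.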
Pattern 2: Soft delete
Some rows in the source represent deletions. Rather than dropping them from the target, mark them deleted (lets you audit and recover):
target.alias("t").merge(
    source.alias("s"),
    "t.user_id = s.user_id") \
  .whenMatchedUpdate(
    condition="s.is_deleted = true",
    set={"deleted_at": "s.event_time", "is_active": "false"}
  ) \
  .whenMatchedUpdateAll(condition="s.is_deleted = false") \
  .whenNotMatchedInsertAll(condition="s.is_deleted = false") \
  .execute()
Three branches now: a deletion update, a normal update, an insert that ignores deletions. Conditional WHEN clauses let you express this without a second pass.
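To see which rows land in which branch, here is a plain-Python model of the same decision table. It is a sketch, not the Delta API: `soft_delete_merge` is a hypothetical name, the sample data is made up, and the defaults it fills in on insert (is_active, deleted_at) are my assumption about the target schema.

```python
def soft_delete_merge(target, source_rows):
    """target: {user_id: row dict}; source_rows: one row per user_id."""
    result = {uid: dict(row) for uid, row in target.items()}
    for s in source_rows:
        uid = s["user_id"]
        data = {k: v for k, v in s.items() if k != "is_deleted"}
        if uid in result and s["is_deleted"]:
            # Matched + deletion: flag the row, keep its data for auditing.
            result[uid]["deleted_at"] = s["event_time"]
            result[uid]["is_active"] = False
        elif uid in result:
            # Matched + normal update.
            result[uid].update(data)
        elif not s["is_deleted"]:
            # Not matched + not a deletion: insert (defaults are an assumption).
            result[uid] = {**data, "is_active": True, "deleted_at": None}
        # Not matched + deletion: no clause applies, so the row is skipped.
    return result


target = {
    1: {"user_id": 1, "name": "Ann", "event_time": 1, "is_active": True, "deleted_at": None},
    2: {"user_id": 2, "name": "Bob", "event_time": 1, "is_active": True, "deleted_at": None},
}
source = [
    {"user_id": 1, "name": "Ann", "event_time": 5, "is_deleted": True},
    {"user_id": 2, "name": "Robert", "event_time": 6, "is_deleted": False},
    {"user_id": 3, "name": "Cy", "event_time": 7, "is_deleted": False},
    {"user_id": 4, "name": "Di", "event_time": 8, "is_deleted": True},
]
result = soft_delete_merge(target, source)
print(sorted(result))  # [1, 2, 3]
```

User 4 is the case that is easy to miss: a deletion for a key the target has never seen matches no clause and silently disappears, which is usually what you want.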
Pattern 3: Late-arriving data (out-of-order events)
You’re running an hourly batch. An event from 3 hours ago shows up in this hour’s batch (network blip, retry queue, whatever). You want the target to reflect the latest state per key, not whatever shows up last.
target.alias("t").merge(
    source.alias("s"),
    "t.user_id = s.user_id") \
  .whenMatchedUpdate(
    condition="s.event_time > t.event_time",
    set={"name": "s.name", "email": "s.email", "event_time": "s.event_time"}
  ) \
  .whenNotMatchedInsertAll() \
  .execute()
The condition s.event_time > t.event_time means “only overwrite if the source row is newer than what we already have”. An out-of-order, older event hits the WHEN MATCHED branch but does nothing because the condition fails. The target keeps the newer state.
This pattern alone has saved me from an entire class of “user’s email got reverted to last week’s value” bugs.
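The property that makes this work is that the final state no longer depends on arrival order. A small plain-Python sketch of the rule shows it; `merge_if_newer` is a hypothetical helper, the rows are made up, and event_time is an integer hour for brevity.

```python
def merge_if_newer(target, batch):
    """target: {user_id: row}; batch: rows with one row per user_id."""
    result = dict(target)
    for s in batch:
        t = result.get(s["user_id"])
        # Insert when unmatched; overwrite only when the source is strictly newer.
        if t is None or s["event_time"] > t["event_time"]:
            result[s["user_id"]] = s
    return result


fresh = [{"user_id": 7, "email": "new@example.com", "event_time": 12}]
late = [{"user_id": 7, "email": "old@example.com", "event_time": 9}]

in_order = merge_if_newer(merge_if_newer({}, late), fresh)
out_of_order = merge_if_newer(merge_if_newer({}, fresh), late)
print(in_order == out_of_order)   # True
print(out_of_order[7]["email"])   # new@example.com
```

Because the comparison is strict, an event with the same event_time as the stored row is ignored; if ties matter in your data, add a tie-breaker such as a sequence number to the condition.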
Pattern 4: Deduplication on ingest
The source has the same row appearing N times (replay, faulty producer, whatever). Target should hold one row per logical key.
# Deduplicate first using row_number
w = Window.partitionBy("event_id").orderBy("ingest_time")
deduped = source.withColumn("rn", row_number().over(w)) \
    .filter("rn = 1") \
    .drop("rn")
target.alias("t").merge(
    deduped.alias("s"),
    "t.event_id = s.event_id") \
  .whenNotMatchedInsertAll() \
  .execute()
Because the source is deduped and we only INSERT (no UPDATE), repeated runs are effectively idempotent. Existing target rows just stay; new ones get added.
For high-throughput streaming, the same pattern works with .foreachBatch():
def upsert_batch(microbatch_df, batch_id):
    deduped = microbatch_df.dropDuplicates(["event_id"])
    target.alias("t").merge(deduped.alias("s"), "t.event_id = s.event_id") \
        .whenNotMatchedInsertAll() \
        .execute()
stream.writeStream \
    .foreachBatch(upsert_batch) \
    .option("checkpointLocation", "/chk/events") \
    .start()
Pattern 5: Slowly-changing-dimension type 2 (history-preserving updates)
You have a user dimension table. Address changes happen but you want to keep history: the previous row stays in the table, marked as no-longer-current; a new row is inserted as the current state.
This is the classic SCD-2 pattern. It takes two steps: a MERGE that closes the old row, then an append that adds the new one:
# Step 1: close the currently-active row for any user whose data changed
target.alias("t").merge(
    changes.alias("s"),
    "t.user_id = s.user_id AND t.is_current = true") \
  .whenMatchedUpdate(
    condition="t.address != s.address",
    set={"is_current": "false", "valid_until": "s.event_time"}
  ) \
  .execute()
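from pyspark.sql.functions import col, lit
# Step 2: insert new current rows for the changes.
# Skip users who still have a current row after step 1 (their address did not change);
# appending them anyway would leave two current rows for the same user.
still_current = spark.read.format("delta").load("/data/silver/users") \
    .filter("is_current = true").select("user_id")
new_rows = changes.join(still_current, "user_id", "left_anti") \
    .withColumn("is_current", lit(True)) \
    .withColumn("valid_from", col("event_time")) \
    .withColumn("valid_until", lit(None).cast("timestamp"))
new_rows.write.format("delta").mode("append").save("/data/silver/users")
Two passes (close + insert) is conceptually cleaner than trying to do it in one MERGE. Performance is similar in practice; readability wins.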
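It can help to see what the table should look like after both passes. The following is a plain-Python model of the close-then-insert logic, not the Delta API; `scd2_apply` is a hypothetical name, the rows are made up, timestamps are integers for brevity, and it assumes that a change record with an unchanged address should create no new version.

```python
def scd2_apply(history, changes):
    """history: list of row dicts; changes: one {user_id, address, event_time} per user."""
    rows = [dict(r) for r in history]
    current = {r["user_id"]: r for r in rows if r["is_current"]}
    for c in changes:
        cur = current.get(c["user_id"])
        if cur is not None and cur["address"] == c["address"]:
            continue  # assumption: an unchanged address creates no new version
        if cur is not None:
            # Step 1: close the active row.
            cur["is_current"] = False
            cur["valid_until"] = c["event_time"]
        # Step 2: append the new current row.
        rows.append({
            "user_id": c["user_id"],
            "address": c["address"],
            "is_current": True,
            "valid_from": c["event_time"],
            "valid_until": None,
        })
    return rows


history = [
    {"user_id": 1, "address": "1 Old Rd", "is_current": True, "valid_from": 100, "valid_until": None},
    {"user_id": 2, "address": "9 Same St", "is_current": True, "valid_from": 100, "valid_until": None},
]
changes = [
    {"user_id": 1, "address": "2 New Ave", "event_time": 200},
    {"user_id": 2, "address": "9 Same St", "event_time": 200},
    {"user_id": 3, "address": "5 First Ln", "event_time": 200},
]
result = scd2_apply(history, changes)
for row in result:
    print(row)
```

The invariant to check in any SCD-2 table is that each user has exactly one row with is_current set. Here user 1 gets a closed row plus a new current one, user 2 is untouched, and user 3 arrives as a single current row.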
Performance gotchas
A few things that bit me:
Z-order on the merge key, or pay for it. MERGE has to scan the target to find matches. If your target is big and the merge key isn’t co-located, every MERGE is a full table scan. Run OPTIMIZE table ZORDER BY (user_id) once a day and MERGE stays fast.
.whenMatchedUpdateAll() is wasteful for narrow updates. If you’re only updating two columns, name them in the set= argument of .whenMatchedUpdate() instead. Update-all reassigns every column from the source even when 90% of the fields are unchanged.
Watch the explain plan for BroadcastHashJoin. MERGE chooses its join strategy from the estimated size of the source. If the source is small but Spark thinks it’s big (stale stats), you’ll get a sort-merge join with a giant shuffle. Either run ANALYZE TABLE for fresh stats, or hint broadcast(source) explicitly.
MERGE doesn’t compact files. Many small MERGEs leave many small files. Run OPTIMIZE periodically. delta.autoOptimize.optimizeWrite = true and delta.autoOptimize.autoCompact = true help, but they’re not free; benchmark on your workload.
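The maintenance side of these notes can be sketched as a small helper that builds the SQL for a scheduled job. `maintenance_statements` and the table name `silver.users` are hypothetical, and the sketch assumes your runtime supports the two autoOptimize table properties; it only builds strings, so in a real job you would pass each one to spark.sql().

```python
def maintenance_statements(table, merge_key, auto_optimize=False):
    """Build the SQL statements for periodic Delta table maintenance."""
    # Compact small files and co-locate rows by the merge key.
    stmts = [f"OPTIMIZE {table} ZORDER BY ({merge_key})"]
    if auto_optimize:
        # Opt in to write-time optimisation; benchmark before enabling.
        stmts.append(
            f"ALTER TABLE {table} SET TBLPROPERTIES ("
            "'delta.autoOptimize.optimizeWrite' = 'true', "
            "'delta.autoOptimize.autoCompact' = 'true')"
        )
    return stmts


for stmt in maintenance_statements("silver.users", "user_id", auto_optimize=True):
    print(stmt)  # in a scheduled job: spark.sql(stmt)
```

Keeping the statements as data makes the job easy to dry-run and review before it touches a large table.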
What I no longer do
- Full overwrite on every batch. Once the table is bigger than ~10 GB, the cost of rewriting it daily eclipses the cost of a MERGE.
- INSERT INTO ... SELECT ... WHERE NOT EXISTS. That pattern works but is slower than MERGE because Spark can’t optimise the existence check the same way.
- Custom dedup-then-overwrite scripts. MERGE handles the dedup case directly with dropDuplicates + whenNotMatchedInsertAll.
When NOT to use MERGE
Three cases:
- Append-only event streams. If you never update or delete, a plain INSERT is faster.
- Full table rebuilds. If today’s source is the entire authoritative state and the table is small, OVERWRITE is simpler and the same speed.
- Highly write-skewed targets. If 99% of your MERGEs hit the same 1% of rows, you’ll fight contention. Consider partitioning by the hot key, or batching changes.
Closing
Five patterns, one mental model: MERGE is the conditional upsert with optional delete. Pattern-match on the WHEN clauses to express any of the five common shapes. Z-order the merge key, dedupe the source first, watch out for stale statistics. After a few weeks of using these in production, the old OVERWRITE-everything pipeline starts feeling primitive.