At 10 TB, one modified row can fan out to thousands of rewritten files. Here’s why, and what to do about it.
The Assumption
Most teams treat MERGE as a black box: match rows, update or insert, done. At small scale this works. At 10+ TB with skewed keys, nested structures, and high-frequency ingestion, MERGE becomes the bottleneck. Not because of a bug — because of what it actually does under the hood.
Where It Breaks: Three Failure Modes at Scale
A — Skewed join keys. If 30% of the source rows match the same subset of target partitions, the tasks responsible for those partitions become stragglers. The work concentrates on a few tasks while the rest of the cluster idles — P99 task duration 50x the median.
B — Small file problem. Frequent ingestion (every 5–15 minutes) produces thousands of small files. MERGE must scan all of them. Each small file is I/O overhead, metadata overhead, and scheduling overhead.
C — Wide rewrite radius. Without partition pruning, MERGE scans every partition of the target. On a 20 TB table without correct partitioning, that means reading 20 TB to update 500 MB. Even with data skipping, the scan overhead is significant when file-level statistics do not align with the merge key.
These are symptoms. The next section explains the mechanics that cause them.
What MERGE Actually Does
Not the SQL. The physical execution.
Conceptually, MERGE boils down to five phases. First, Spark uses the merge condition and table metadata (partitioning and file statistics) to build a (possibly full-table) candidate target set and correlate source and target rows. Second, Spark scans those candidate files — with Delta reducing scope via partition pruning, dynamic partition pruning (DPP), and data skipping using min/max statistics recorded in the transaction log. But if the merge condition does not constrain partition columns, partition pruning cannot shrink the candidate set — so in the worst case the scan degenerates to all partitions (and effectively all files). Data skipping can reduce the damage, but cannot compensate for missing partition predicates. Operationally, Delta typically runs an initial pass to identify touched files, then a second pass to rewrite them — two distinct jobs with different bottlenecks.
Third, row classification: each row pair is labeled matched-update, matched-delete, or not-matched-insert based on the MERGE clauses. Fourth, file rewrite: Delta does not update rows in place. It rewrites every Parquet file that contains at least one matched row. One modified row in 10,000 files can mean 10,000 files rewritten. Fifth, an atomic commit to the transaction log (_delta_log): the new files are registered, the old files are marked as removed, and the entire operation becomes visible to readers in a single, atomic update. Until the commit succeeds, no reader sees partial results.
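The copy-on-write mechanics of phase four can be sketched in a toy model (plain Python, with a "file" as a list of rows — not the actual Delta implementation): any file containing at least one matched merge key is rewritten in full.

```python
# Toy copy-on-write model: a "file" is a list of (key, value) rows.
# Any file containing at least one matched key must be rewritten whole.
files = {
    "part-000": [(1, "a"), (2, "b"), (3, "c")],
    "part-001": [(4, "d"), (5, "e")],
    "part-002": [(6, "f"), (7, "g"), (8, "h")],
}
updates = {3: "C", 6: "F"}  # source rows keyed by merge key

rewritten = {}
for name, rows in files.items():
    if any(key in updates for key, _ in rows):
        # Rewrite the entire file, applying updates to matched rows.
        rewritten[name] = [(k, updates.get(k, v)) for k, v in rows]

rows_modified = len(updates)
files_touched = len(rewritten)
rows_rewritten = sum(len(r) for r in rewritten.values())
print(files_touched, rows_rewritten, rows_modified)
# 2 modified rows touch 2 files and force 6 rows to be rewritten:
# 3x row-level write amplification in this toy layout.
```

Scale the toy layout up to 160 files per partition and the "one modified row in 10,000 files" scenario follows directly.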
The cost of MERGE has three components: files scanned + files rewritten + shuffle. On large tables with small update sets, the scan phase often dominates. On tables with poor file layout, the rewrite phase dominates. Both are forms of amplification — the first on reads, the second on writes.
Now the failure modes become causal chains. Skewed keys → task concentration on the same partitions → stragglers. Small files → high file count → scan and scheduling overhead. Wide rewrite radius → no partition pruning → full table scan amplification.
There is a subtlety that most teams miss. Delta’s MERGE semantics require a 1:1 match between source and target rows. In Spark/Delta, MERGE fails when multiple source rows match the same target row — the engine raises an ambiguous update error. delta-rs currently does not enforce this constraint, which can yield nondeterministic results if duplicates slip through. At scale, with skewed keys, duplicate source rows are easy to introduce unintentionally. The safe default: always deduplicate the source on the merge key before executing.
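A minimal pre-MERGE dedup sketch (plain Python; in Spark you would typically use a window function or dropDuplicates over the merge key, keeping the latest record — the field names here are illustrative):

```python
# Collapse duplicate source rows to one row per merge key,
# keeping the most recent record (highest ingestion timestamp).
source = [
    {"id": "k1", "ts": 100, "value": "old"},
    {"id": "k1", "ts": 200, "value": "new"},   # duplicate key k1
    {"id": "k2", "ts": 150, "value": "only"},
]

latest = {}
for row in source:
    kept = latest.get(row["id"])
    if kept is None or row["ts"] > kept["ts"]:
        latest[row["id"]] = row

deduped = list(latest.values())
print(len(deduped))  # 2 rows: exactly one per merge key
```

After this step, the 1:1 match invariant holds by construction, regardless of which engine enforces it.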
The Numbers
Measured on a single-node Spark 4.1 standalone cluster with Delta Lake 4.1 OSS. The setup:
- Hardware: AMD Ryzen 9 7950X3D (16 cores / 32 threads), 125 GB RAM, 1.8 TB NVMe
- Runtime: Spark 4.1.1 local[*], 90 GB driver memory, Delta Lake 4.1.0 OSS, Java 17
- Target: 44 GB, 58,400 Parquet files (~750 KB avg, fragmented from 5 batch writes), partitioned by date (365 partitions, ~160 files per partition)
- Source: 354 MB, 200k rows — 92% concentrated on 30 dates (120k updates sampled from hot partitions + 80k inserts, 80% on the same 30 dates)
- Merge key: id (UUID, high-cardinality, uniformly distributed within each partition) — does not coincide with partition key date
- Match rate: 60% updates (120k matches), 40% inserts (80k new rows)
Full setup, scripts, and raw results: delta-merge-bench.
Repro notes:
- Data generation is seeded (SEED=42) for deterministic row distribution across partitions
- maxRecordsPerFile=1000 controls the initial file fragmentation (~58,400 files)
- In this benchmark, file-level stats were present in _delta_log (add.stats JSON) for the relevant columns; since stats are optional in the protocol, data skipping only helps when those stats are actually recorded
Each row in the table adds one optimization on top of the previous.
Scenario Files scanned Files rewritten Scan × Rewrite × Scan time Rewrite time Total
────────────────────────────── ─────────────── ───────────────── ──────── ─────────── ────────── ───────────── ──────
Baseline (no optimization) 58,400 3,092 19× 1× (ref) 16.0 s 8.7 s 25.0 s
+ Partition pruning ~4,800 3,092 1.5× 1× 13.0 s 7.8 s 20.9 s
+ Z-ordering on merge key ~5,300 441 1.7× 0.14× 5.4 s 11.2 s 16.6 s
+ Pre-compaction (OPTIMIZE) ~5,200 427 1.7× 0.14× 5.4 s 11.7 s 17.1 s
Chunk 1 (hot dates) ~4,300 427 1.4× 0.14× 2.8 s 10.9 s 13.8 s
Chunks 2–7 (cold, insert-only) ~150 / chunk 0 / chunk — — 2.9 s avg 0.3 s avg ~3.4 s each
Scan × (file-count proxy) = files scanned / baseline files rewritten (3,092) — how many files are read for each file the reference run actually had to rewrite. Rewrite × = files rewritten / baseline files rewritten (3,092) — the improvement factor; lower is better. File-count is used as a portable proxy; in bytes, the amplification depends on the file-size distribution, which changes after Z-ordering and compaction.
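The two amplification columns can be recomputed from the file counts alone; both ratios use the baseline rewrite set (3,092 files) as the denominator, which reproduces the table values:

```python
# Reproduce the Scan x / Rewrite x columns from the measured file counts.
BASELINE_REWRITTEN = 3_092  # denominator for both ratios

scenarios = {
    "baseline":          (58_400, 3_092),
    "partition_pruning": (4_800, 3_092),
    "z_ordering":        (5_300, 441),
    "pre_compaction":    (5_200, 427),
}

for name, (scanned, rewritten) in scenarios.items():
    scan_x = scanned / BASELINE_REWRITTEN
    rewrite_x = rewritten / BASELINE_REWRITTEN
    print(f"{name}: scan {scan_x:.2f}x, rewrite {rewrite_x:.2f}x")
```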
Reading the table.
The baseline scans all 58,400 files because the merge condition (t.id = s.id) does not constrain the partition column. Spark must consider every file for matching IDs. However, because the source updates are concentrated on 30 dates, only 3,092 files — those in the hot partitions that contain at least one matching row — are actually rewritten. The scan amplification is maximal (58,400 files scanned), but the rewrite radius is naturally bounded by the data distribution.
Adding a partition constraint to the merge condition (t.id = s.id AND t.date = s.date) reduces the scan from 58,400 files to ~4,800 (30/365 × 58,400). Files rewritten stay the same — 3,092 — because the same files contain the same matches. The improvement is entirely on the scan side: 12x fewer files scanned. On NVMe storage this translates to a modest wall-clock improvement (25s → 21s) because local I/O is cheap. On cloud storage with network-attached disks (S3, ADLS, GCS), where each file open carries milliseconds of latency, this 12x reduction translates directly to proportional time savings.
Z-ordering on the merge key does not necessarily reduce the scan further — pruning already limits it to hot partitions (scanned file count can shift slightly after Z-ordering because file boundaries change) — but it co-locates rows with similar id values into fewer files. Matching rows that were previously scattered across 3,092 files now cluster into 441 — a 7x reduction in files rewritten. This is the key result: Z-ordering attacks rewrite amplification, not scan amplification. The scan time drops further (13s → 5.4s) because the Z-ordered table has fewer, larger files (5,341 after Z-ordering vs 58,400 before).
A counterintuitive detail: the rewrite time increases from 7.8s to 11.2s despite rewriting 7x fewer files. Z-ordered files are larger, so the 441 rewritten files contain more total bytes than the 3,092 small ones did. On NVMe, where per-file overhead is negligible and total bytes written dominate, that means a longer rewrite. On cloud storage, where per-file latency dominates, the relationship inverts: fewer files means less total latency.
Pre-compaction (OPTIMIZE + Z-ordering in sequence) produces a marginal additional improvement over Z-ordering alone: 427 files rewritten vs 441. The compaction reduces the total file count from 58,400 to ~5,200, but since Z-ordering already achieved most of the co-location benefit, the incremental gain is small. The OPTIMIZE + Z-order pass costs 258 seconds (4.3 minutes) — an investment that, at roughly 8 seconds saved per MERGE on this NVMe setup, takes about 30 subsequent MERGEs to pay for itself; on cloud storage, where per-file latency dominates, the payback comes far sooner.
Chunking the source into 7 date-range batches changes the profile entirely. Chunk 1 contains 93% of the source rows (186k rows, all concentrated on the hot dates) and accounts for most of the work: 427 files rewritten in 13.8 seconds. Chunks 2–7 are insert-only (the remaining 14k rows distributed across cold dates) — they rewrite zero files and complete in ~3 seconds each. Total duration is higher than the single optimized MERGE (36s vs 17s) due to the overhead of 7 commits and 7 scan phases. The trade-off: lower peak memory, fine-grained retry, and isolation between hot and cold data. If chunk 1 fails, you restart ~14 seconds of work, not the whole batch.
A note on absolute timings: this benchmark runs on NVMe storage with 3+ GB/s sequential throughput. The scan time differences between scenarios (16s → 5s) are compressed because local I/O is not the bottleneck. On cloud storage where file-open latency dominates, the 12x scan reduction from partition pruning and the 7x rewrite reduction from Z-ordering translate to proportionally larger wall-clock improvements. The file counts — not the durations — are the portable metric.
Partition Pruning: The First Line of Defense
MERGE without a partition predicate means scanning every partition. With a predicate, Spark scans only the relevant ones. But the mechanics are more nuanced than most teams assume.
Static partition pruning. If the merge condition contains equality constraints on partition columns, Spark prunes at plan time. ON t.id = s.id AND t.date = s.date where date is a partition column — Spark scans only partitions with dates present in the source. This is deterministic and reliable.
Dynamic partition pruning (DPP). Spark may derive runtime filters from the source (build) side of the join and push them into the target scan. Whether this actually prunes partitions is opportunistic: it depends on the plan shape, the source size, and Spark's cost model.
Data skipping. Even when partitions cannot be pruned, Delta can skip individual files using min/max statistics stored in the transaction log. Every Parquet file in Delta can have column-level statistics — minimum, maximum, and null count — for up to N columns (by default controlled by dataSkippingNumIndexedCols; name and namespace vary by runtime). If the merge key is among those columns, Delta can eliminate files whose min/max range does not overlap with the source values. If the merge key is not indexed, data skipping is blind.
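File-level data skipping reduces to an interval test against each file's recorded statistics. A sketch in plain Python (illustrative; real Delta reads these min/max values from add.stats in the transaction log): a file is scanned only if some source key falls inside its [min, max] range for the merge key.

```python
# Data-skipping sketch: per-file min/max stats on the merge key.
file_stats = {
    "part-000": (10, 19),   # (min_id, max_id) recorded per file
    "part-001": (20, 29),
    "part-002": (30, 39),
}
source_keys = {12, 35}

# A file must be scanned only if at least one source key could be
# inside its range; otherwise its min/max stats prove it irrelevant.
to_scan = [
    name for name, (mn, mx) in file_stats.items()
    if any(mn <= k <= mx for k in source_keys)
]
print(to_scan)  # part-001 is skipped: no source key in [20, 29]
```

Note what happens if the merge key is not among the indexed columns: file_stats is empty, no file can be proven irrelevant, and every file lands in to_scan.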
The interaction between these three mechanisms determines the actual scan scope. Partition pruning is the most reliable — it operates on the directory structure and is always deterministic. DPP is opportunistic — it depends on the optimizer’s cost model and runtime conditions. Data skipping is a safety net, not a strategy — it helps on top of pruning but cannot replace it.
The practical consequence: always include the partition column in the merge condition. If the merge key does not align with the partition key, add the partition column as a secondary constraint. In the benchmark, this single change reduced files scanned from 58,400 to ~4,800 — a 12x improvement.
File Compaction and Z-Ordering: Reducing the Rewrite Radius
The rewrite cost of MERGE is proportional to the number of files touched. Fewer files with better data layout means less rewrite.
OPTIMIZE (compaction). Delta’s OPTIMIZE command merges small files into larger ones. On the benchmark table, setting spark.databricks.delta.optimize.maxFileSize to 8 MB before OPTIMIZE reduced the file count from 58,400 to ~5,200. Availability, behavior, and even the exact config name may vary by runtime — verify in your environment. The 8 MB target was chosen deliberately: in this dataset, a 1 GB target would tend to compact many partitions into a single file, eliminating the granularity that Z-ordering needs to be effective. Keeping multiple files per partition preserves file-level granularity, which is what data skipping and rewrite-radius reduction operate on. The 8 MB setting is specific to this benchmark; in production you typically target larger files; partitioning and clustering are how you regain selectivity. The payoff is on every subsequent operation: fewer files to scan, fewer files to rewrite, less scheduling overhead.
Z-ordering on the merge key. Z-ordering clusters data using a space-filling curve so that rows with similar values of the specified columns end up in fewer files. When applied to the merge key, matching rows concentrate into a fraction of the original file set. In the benchmark, Z-ordering reduced files rewritten from 3,092 to 441 — a 7x improvement — because Z-ordering rewrites the table so rows that are close in the Z-order curve end up co-located within fewer files, even for high-cardinality keys. The Z-ordering pass cost 135 seconds on the benchmark hardware.
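The co-location property comes from bit interleaving along a space-filling curve. A minimal two-column Morton (Z-order) encoding in plain Python (illustrative; Delta's actual implementation computes range-based Z-values over the partition files, not raw bit interleaving):

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x and y into a single Z-order key."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # x bits -> even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # y bits -> odd positions
    return z

# Rows sorted by Z-value: points close in (x, y) land close in the
# sort order, so they end up co-located in the same files after rewrite.
points = [(0, 0), (1, 1), (7, 7), (0, 7), (7, 0)]
for p in sorted(points, key=lambda p: z_value(*p)):
    print(p, format(z_value(*p), "06b"))
```

Sorting multi-dimensional data by this single key is what lets one rewrite pass concentrate similar merge-key values into few files.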
Combined, OPTIMIZE + Z-ordering cost 258 seconds (4.3 minutes) and reduced subsequent MERGE rewrite from 3,092 to 427 files. Each subsequent MERGE saves approximately 4 seconds of rewrite time on NVMe — a modest absolute gain on fast storage, but the 7x file reduction is the metric that matters. On cloud storage, the savings scale with file-open latency.
Liquid clustering (Databricks Runtime 13.3+) is the incremental evolution. Instead of a batch OPTIMIZE that rewrites the entire table, liquid clustering reorganizes data incrementally based on declared clustering keys. It subsumes both compaction and Z-ordering. The trade-off: it is available only on Databricks runtime.
When each strategy pays off. The break-even depends on MERGE frequency and storage characteristics. On NVMe, the absolute time savings per MERGE are small, so the payback period is longer. On cloud storage — where the 12x scan reduction and 7x rewrite reduction translate to proportional wall-clock improvements — OPTIMIZE + Z-ordering pays for itself after the first subsequent MERGE. For daily MERGE workloads, the recommendation is straightforward: run OPTIMIZE with Z-ordering on the merge key before the MERGE batch, or schedule it as a trailing job after the last ingestion window.
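The break-even is simple arithmetic. A sketch under illustrative assumptions (the 50 ms per-file-open latency for cloud object storage is a hypothetical round number, not a measured value; the NVMe saving uses the benchmark's 25.0 s baseline vs 17.1 s pre-compacted totals):

```python
import math

OPTIMIZE_COST_S = 258.0          # measured OPTIMIZE + Z-order pass

# NVMe: measured per-MERGE saving (25.0 s baseline -> 17.1 s optimized).
nvme_saving_s = 25.0 - 17.1
nvme_breakeven = math.ceil(OPTIMIZE_COST_S / nvme_saving_s)

# Cloud (illustrative): per-file open latency dominates, so the saving
# scales with the reduction in files opened per MERGE.
PER_FILE_OPEN_S = 0.050          # hypothetical 50 ms per file open
files_saved = 58_400 - 5_200     # scan reduction after compaction
cloud_saving_s = files_saved * PER_FILE_OPEN_S
cloud_breakeven = math.ceil(OPTIMIZE_COST_S / cloud_saving_s)

print(f"NVMe break-even:  {nvme_breakeven} MERGEs")
print(f"Cloud break-even: {cloud_breakeven} MERGE(s)")
```

Under these assumptions the NVMe payback takes dozens of MERGEs while the cloud payback is immediate — which is exactly why file counts, not local wall-clock times, are the portable metric.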
The real question is how fast file fragmentation returns. With ingestion every 10 minutes, small files accumulate within hours. A single OPTIMIZE does not solve the problem permanently — it must be repeated. Liquid clustering addresses this by design: it compacts incrementally on every write, preventing fragmentation from accumulating.
Chunked MERGE: Breaking the Monolith
When the source is large and skewed, a single MERGE is unstable. The working set is too large, shuffle is excessive, and a single failure means restarting the entire operation. The solution: break the source into temporal or key-based chunks and execute N small MERGEs.
Working set control. Each chunk processes a subset of the source — typically defined by date range or key range. The working set fits comfortably in executor memory, GC pressure drops, and the probability of OOM failures decreases.
Retry granularity. With a monolithic MERGE, any failure — OOM, preemption, transient storage error — means restarting the entire operation. With chunked MERGE, a failure in one chunk means restarting only that chunk. In the benchmark, chunk 1 (the hot chunk with 186k rows) completed in 13.8 seconds. A failure there costs 14 seconds of rework, not the full 36-second batch.
Shuffle reduction. Smaller source means a proportionally smaller join, which means less data shuffled across the network. The reduction is roughly linear with chunk size.
The trade-off is transactional. Each chunk produces a separate commit in the Delta transaction log. Seven chunks means seven commits instead of one. This has two consequences. First, concurrent writers face more conflict opportunities — each commit must be reconciled against the log. If another process writes to the same partitions between chunks, conflict resolution may reject later chunks. Second, the table is in an intermediate state between chunks — readers using snapshot isolation see a consistent view, but time-travel queries between chunks see partial results.
The benchmark revealed a structural insight: with source data concentrated on 30 dates, chunking by date range produces one heavy chunk (93% of the work) and six lightweight chunks (insert-only, zero files rewritten). This is not a failure of the chunking strategy — it reflects the underlying data skew. A more effective chunking strategy for this distribution would be to split the hot dates into multiple chunks (e.g., 5 chunks of 6 dates each) and batch the cold dates together. The general principle: chunk boundaries should split the heavy partitions, not the light ones.
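The chunking principle — split the heavy dates, batch the light ones — can be sketched in plain Python (illustrative: the per-date row counts, the threshold, and the chunk count are made up, not taken from the benchmark):

```python
import math

# Hypothetical skewed source: 30 hot dates, a long cold tail.
rows_per_date = {f"2024-01-{d:02d}": 6_000 for d in range(1, 31)}
rows_per_date.update({f"2024-02-{d:02d}": 50 for d in range(1, 29)})

HOT_THRESHOLD = 1_000   # rows/date above which a date counts as "hot"
HOT_CHUNKS = 5          # split the hot dates into 5 chunks

hot = sorted(d for d, n in rows_per_date.items() if n >= HOT_THRESHOLD)
cold = sorted(d for d, n in rows_per_date.items() if n < HOT_THRESHOLD)

step = math.ceil(len(hot) / HOT_CHUNKS)
chunks = [hot[i:i + step] for i in range(0, len(hot), step)]
chunks.append(cold)  # one batched chunk for the entire cold tail

for i, chunk in enumerate(chunks, 1):
    total = sum(rows_per_date[d] for d in chunk)
    print(f"chunk {i}: {len(chunk)} dates, {total} rows")
```

The result is six chunks of comparable cost instead of one 93% chunk plus six trivial ones: chunk boundaries now split the heavy partitions.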
The concrete pattern: chunk by ingestion date, MERGE per chunk in chronological order, final OPTIMIZE compaction to clean up the small files produced by each individual MERGE.
MERGE Internals: What Catalyst Actually Generates
This section is for engineers who want to understand the physical plan. It is optional, but the details here explain behaviors that the higher-level model cannot.
When Spark receives a MERGE statement, it does not execute it as a sequence of UPDATE and INSERT operations. Catalyst compiles the entire MERGE into a single logical plan, which Delta often executes as two jobs: (1) find touched files, (2) rewrite and commit.
The entry point is DeltaMergeIntoCommand. This command reads the merge condition, the WHEN MATCHED and WHEN NOT MATCHED clauses, and the target table metadata. It constructs a full outer join (or right outer join, depending on the clause combination) between the source and target. The join produces a stream of row pairs — each pair contains the source row and the corresponding target row, or null on either side if there is no match.
After the join, a row-classification step evaluates the WHEN clauses and assigns each pair to one of: update the target row, delete the target row, insert a new row, or pass through unchanged. The output of this step is a new set of rows that represents the desired state of each affected file.
Finally, the rewrite step. Delta identifies which files contain at least one affected row. For each such file, it reads the entire file, applies the classified changes, and writes a new file. Unaffected files are not touched. The new files are committed atomically — the transaction log records which old files are removed and which new files are added.
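The join-then-classify step can be sketched as a pure function over (source, target) row pairs (plain Python; the real implementation evaluates Catalyst expressions, and the MATCHED condition here — a tombstone flag driving the delete — is an illustrative stand-in for arbitrary clause predicates):

```python
from typing import Optional

def classify(source: Optional[dict], target: Optional[dict]) -> str:
    """Label one full-outer-join row pair, mirroring the MERGE clauses:
    WHEN MATCHED [AND cond] THEN UPDATE/DELETE,
    WHEN NOT MATCHED THEN INSERT."""
    if source is not None and target is not None:
        # Simplified MATCHED condition: a tombstone flag means delete.
        return "matched-delete" if source.get("deleted") else "matched-update"
    if source is not None:
        return "not-matched-insert"
    return "unchanged"  # target-only row: copied through if its file is rewritten

pairs = [
    ({"id": 1, "deleted": False}, {"id": 1}),  # update
    ({"id": 2, "deleted": True},  {"id": 2}),  # delete
    ({"id": 3, "deleted": False}, None),       # insert
    (None,                        {"id": 4}),  # untouched target row
]
print([classify(s, t) for s, t in pairs])
```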
Multiple source match detection. In Spark/Delta, MERGE fails when multiple source rows match the same target row — the engine raises an ambiguous update error. Recent Databricks runtimes have improved duplicate match handling, making detection more robust across execution plans. delta-rs currently does not enforce this constraint, so duplicates can produce nondeterministic results. The safe practice remains: deduplicate the source on the merge key before executing, regardless of runtime.
Copy-on-write vs merge-on-read (Deletion Vectors). Everything described above is copy-on-write: to modify one row, Delta rewrites the entire file that contains it. Deletion Vectors change the model. When enabled and applicable, Delta can avoid rewriting base files by recording row-level deletions in sidecar files and writing only the new or updated rows separately. This can reduce rewrite amplification substantially for update-heavy workloads.
The trade-off is deferred: deletion vectors fragment the table over time. Reads must check the deletion vector for each file, and the storage layout degrades. Compaction becomes necessary — not to fix the MERGE, but to clean up after it. The write amplification problem does not disappear. It shifts from eager rewrite to deferred compaction.
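The merge-on-read model and its deferred cost can be captured in a toy sketch (plain Python, deletion vectors as sets of row indices — not the actual DV format, which is a serialized bitmap referenced from the transaction log):

```python
# Merge-on-read toy model: base files are immutable; an update appends
# the new row version and marks the old one in a deletion vector.
base_files = {
    "part-000": [("k1", "v1"), ("k2", "v2"), ("k3", "v3")],
}
deletion_vectors: dict[str, set[int]] = {}   # file -> deleted row indices
appended: list[tuple[str, str]] = []          # new row versions

def merge_update(key: str, value: str) -> None:
    """Update one row without rewriting its base file."""
    for name, rows in base_files.items():
        for idx, (k, _) in enumerate(rows):
            if k == key:
                deletion_vectors.setdefault(name, set()).add(idx)
    appended.append((key, value))

def read_table() -> list[tuple[str, str]]:
    """Read path: every scan must consult the deletion vectors."""
    out = []
    for name, rows in base_files.items():
        dv = deletion_vectors.get(name, set())
        out.extend(r for i, r in enumerate(rows) if i not in dv)
    return out + appended

merge_update("k2", "v2-new")
print(read_table())  # base file untouched; one row masked, one appended
```

Note where the cost went: merge_update touches no base file, but read_table pays a per-file mask check forever — until a compaction folds the vectors back into rewritten files.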
Decision Framework
Given your failure mode, here is where to start.
Problem Solution When to use Cost
──────────────────────── ──────────────────── ───────────────────────────── ────────────────────
Full table scan Partition pruning Always — prerequisite Zero (one-line change)
Wide rewrite radius Z-order on merge key Join key ≠ partition key 135 s / 44 GB (measured)
Too many small files Pre-MERGE OPTIMIZE Frequent ingestion 258 s / 44 GB (measured)
Large skewed source Chunked MERGE Source > 1% of target N commits vs 1
Source duplicates (1:N) Dedup + constraints Skewed keys, dirty data Pre-processing step
File rewrite overhead Deletion Vectors Databricks runtime DV compaction required
Scan amplification Data skipping tuning Key lacks file-level stats Config change
Epilogue: MERGE Is an Amplification Problem
Delta Lake MERGE is not a compute problem. It is an amplification problem — on both reads and writes.
Every modified row amplifies cost in two ways: scan amplification (how much of the table you read to find the rows that need changing) and rewrite amplification (how many files you rewrite for every row modified). In the baseline scenario, merging 354 MB of source data into a 44 GB table scanned 58,400 files to rewrite 3,092 of them. The scan amplification factor was 19x (58,400 files scanned for 3,092 that needed rewriting). Partition pruning reduced it to 1.5x. Z-ordering reduced the rewrite count by 7x.
These two amplification factors are independent. Partition pruning attacks scan amplification — it reduces how much of the table Spark reads. Z-ordering attacks rewrite amplification — it reduces how many files contain at least one matched row. Compaction reduces both by consolidating small files. Chunking reduces the blast radius of each individual operation.
The correct way to reason about MERGE at scale: what is my scan amplification factor, and what is my rewrite amplification factor? When you can answer both, you can predict the cost — and choose the right optimization to reduce it.