In large-scale data pipelines, skew can occur when one key carries a disproportionately large payload, for example a nested array containing thousands of records. Even if that key appears only once, the task responsible for processing it must handle significantly more data than the others, leading to execution imbalance. This article introduces asymmetric salting: a practical optimization technique for addressing data skew caused not by how frequently a key appears, but by how much data is associated with it.
Unlike traditional salting, where both sides of a join are treated symmetrically because the skew comes from repeated keys, asymmetric salting targets a fundamentally different root cause: oversized nested payloads attached to otherwise unique keys. The “asymmetry” lies in the fact that only the heavy side needs to be decomposed (via explode), while the light side is merely replicated with matching salt values. The two sides undergo structurally different transformations to achieve balanced parallelism.
The primary target is skewed joins and enrichment pipelines where a nested payload must be matched against a reference dataset; the nested re-aggregation that often follows is a common but not always necessary step, and, as discussed later, can itself become a bottleneck.
An Example
Think of a delivery depot that routes all packages for a building to a single sorting station. Most buildings receive a handful of packages. One building receives a single box — but inside it are thousands of items. The station is overwhelmed not because the building appears often, but because its single entry carries a massive payload.
Spark works the same way. During groupBy, join, or collect_list, Spark routes all records sharing the same key to the same shuffle partition via pmod(hash(key), numPartitions). If that key carries a deeply nested or large array, even in a single row, the entire partition becomes skewed and the task assigned to it becomes a bottleneck.
This form of skew manifests when the nested array crosses a shuffle boundary intact — when a join or aggregation forces redistribution by key before the nested structure has been expanded. If the array were exploded before the shuffle, the individual elements would be separate rows and could be distributed normally. This is why the first step of asymmetric salting is to explode the payload before the join’s exchange.
But exploding alone is not sufficient. After flattening, all rows still share the same join key. The join’s exchange routes them to the same partition — now large not because a single row was heavy, but because one key dominates the row count. Salting is the explicit lever that forces parallelism for that key, regardless of Spark’s partitioning heuristics.
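The effect described above can be modeled without a cluster. The following is a minimal plain-Python sketch, not Spark code: `partition_for` is a hypothetical stand-in for the partitioner, CRC32 replaces Spark's Murmur3 hash, and the partition and salt counts are arbitrary assumptions. It compares how rows are routed when only the original key is hashed versus when a salted composite key is hashed.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8  # hypothetical shuffle partition count
NUM_SALTS = 5       # hypothetical number of salt buckets


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Model of pmod(hash(key), numPartitions); CRC32 stands in for Murmur3."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Already-exploded rows: key "A" owns 1000 elements, the other keys one each.
exploded = [("A", i) for i in range(1000)] + [("B", 0), ("C", 0), ("D", 0)]

# Explode only: the join key is unchanged, so all rows of "A" share one partition.
explode_only = Counter(partition_for(key) for key, _ in exploded)

# Explode + salt: the composite key (key, position % NUM_SALTS) is what gets hashed.
salted = Counter(
    partition_for(f"{key}#{pos % NUM_SALTS}") for key, pos in exploded
)

print("rows per partition, explode only:", dict(sorted(explode_only.items())))
print("rows per partition, salted key  :", dict(sorted(salted.items())))
```

In this model the heavy key is split into five groups of 200 rows each. Two composite keys can still hash to the same partition, which is one reason to inspect the resulting distribution rather than assume it is balanced.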
Why Adaptive Query Execution (AQE) Isn’t Enough
Since Spark 3.0, AQE’s skew-join optimization can detect oversized shuffle partitions and split them into subpartitions. A partition qualifies when it exceeds both spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB) and spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) times the median partition size. If an explode happens before the shuffle, AQE can in principle detect and split the resulting oversized partition.
But AQE’s thresholds operate on serialized shuffle data size, measured after compression. A row containing a nested array of 10,000 structs may serialize to a few megabytes in a compressed shuffle block — well below AQE’s 256 MB threshold — while expanding to hundreds of megabytes or more in memory when the array is materialized during a collect_list or UDF evaluation. AQE sees an acceptable partition and assumes balanced workload. Asymmetric salting gives the developer explicit control over how the load is distributed, rather than relying on a heuristic that operates on the wrong metric for this class of skew.
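To make the two-condition check concrete, here is a small plain-Python sketch of the predicate as described above. The function name and the sample sizes are illustrative assumptions; the defaults mirror the documented 256 MB threshold and factor of 5.

```python
MB = 1024 * 1024


def aqe_considers_skewed(
    partition_bytes: int,
    median_bytes: int,
    threshold_bytes: int = 256 * MB,
    factor: float = 5.0,
) -> bool:
    """Both conditions must hold, and both are evaluated on shuffle sizes."""
    return (
        partition_bytes > threshold_bytes
        and partition_bytes > factor * median_bytes
    )


# Hypothetical case: a partition holding one heavy nested row is 8 MB on the
# wire, against a 1 MB median. The factor test passes, the size test does not.
print(aqe_considers_skewed(8 * MB, 1 * MB))

# Hypothetical case: a partition that is large in shuffle terms is flagged.
print(aqe_considers_skewed(400 * MB, 50 * MB))
```

The first case is the one this article is concerned with: the partition is eight times the median yet far below the byte threshold, so it would not be split, regardless of how large the array becomes once materialized in memory.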
Asymmetric Salting: Algorithm
This is where asymmetric salting comes in. Let’s describe its algorithm.
Input:
- A left-side dataset containing lightweight entities uniquely identified by a key.
- A right-side dataset with a foreign key pointing to the left-side key, and a nested array of structs, which may be very large for some keys.
Problem: Some right-side records contain massive nested structures for specific keys, causing a single task to become overloaded in Spark during the join, despite a uniform key distribution.
Objective: Redistribute the processing of these large payloads across multiple tasks, without altering the logical meaning of the data.
Algorithm Steps
1. Detect the skew source: Identify that the skew originates not from repeated keys but from a single key associated with a large nested array on the right-hand side.
2. Expand the payload: Convert the nested structure into a flat representation, so that individual nested elements become distinct rows. This enables fine-grained distribution of the load.
3. Generate a virtual partitioning key: For each expanded row, compute a salt value, for instance by assigning a sequential index to each row within the same key and taking its modulo over a fixed number of buckets. This salt value is used to artificially partition the data across multiple tasks. Note that the salt is derived from the row’s position within its group, not from the join key itself.
4. Replicate the salt space on the left side: On the left-side dataset, generate one copy of each row for every possible salt value. This ensures that every sub-partition of the exploded right-side data finds a corresponding match in the join. This replication has a cost: the left side grows by a factor equal to the number of salt buckets, which should be taken into account when sizing the salt space (see Performance Considerations).
5. Construct a salted join key: Combine the original join key and the salt value to form a new composite key used for joining. This ensures that multiple logical partitions of the heavy key can be processed in parallel.
6. Execute the join: Perform the join using the salted composite key. This enables Spark to distribute the join across multiple tasks even for originally skewed keys.
7. (Optional) Reaggregate the payload: If needed, group the resulting data by the original key and reconstruct the nested structure. When using a left outer join, take care to filter out NULL entries introduced by the replication of the left side: keys with no match on the right side will produce one NULL row per salt bucket instead of a single NULL row.
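The steps above can be traced on a tiny in-memory dataset. This is a plain-Python model using lists and dictionaries rather than Spark DataFrames; the sample records, the field names, and the choice of three salt buckets are all assumptions made for illustration.

```python
from collections import defaultdict

NUM_SALTS = 3  # hypothetical bucket count

left = [
    {"key": "A", "name": "Tower"},
    {"key": "B", "name": "Annex"},
    {"key": "C", "name": "Shed"},
]
right = [
    {"key": "A", "payload": [f"A-{i}" for i in range(7)]},  # the heavy key
    {"key": "B", "payload": ["B-0"]},
]

# Steps 2, 3, 5: explode, salt by position within the key, build composite key.
right_salted = [
    ((row["key"], pos % NUM_SALTS), item)
    for row in right
    for pos, item in enumerate(row["payload"])
]

# Steps 4, 5: replicate each light row once per salt value.
left_salted = {
    (row["key"], salt): row for row in left for salt in range(NUM_SALTS)
}

# Step 6: left outer join on the composite key.
matches = defaultdict(list)
for join_key, item in right_salted:
    matches[join_key].append(item)

joined = []
for join_key, left_row in left_salted.items():
    # None models the NULL produced by an unmatched salt replica.
    for item in matches.get(join_key) or [None]:
        joined.append((left_row["key"], left_row["name"], item))

# Step 7: re-aggregate by the ORIGINAL key and drop the NULL rows.
result = {row["key"]: [] for row in left}
for key, _name, item in joined:
    if item is not None:
        result[key].append(item)

null_rows = sum(1 for _key, _name, item in joined if item is None)
print({key: len(items) for key, items in result.items()}, "null rows:", null_rows)
```

The intermediate `joined` list contains more rows than a plain left join would produce, because unmatched replicas each contribute a NULL row. Grouping by the original key and discarding those rows restores one entry per key with the full payload.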
Semantic Correctness
Spark’s execution model requires that all records sharing the same key are co-located for operations like groupBy and join; under the default hash-partitioning strategy, this is achieved by routing all records with the same key to the same shuffle partition. Asymmetric salting temporarily breaks this contract by splitting the records of a single logical key across multiple partitions via a synthetic composite key. Correctness is restored by the re-aggregation step (step 7), which collapses the salted sub-partitions back into the original key space. If re-aggregation is omitted or groups by the salted key instead of the original key, the output is semantically wrong: a single logical entity appears as multiple fragments. This reversibility is the theoretical invariant that makes the entire approach valid.
The Re-aggregation Bottleneck
The re-aggregation step deserves special attention. Although asymmetric salting distributes the join workload across multiple tasks, the final groupBy(original_key).collect_list(...) gathers all records for each key back into a single task. For the very keys that caused the original skew, this means the entire payload, now joined and potentially enriched, must be materialized in memory on one executor. If the array was large enough to cause problems in the first place, reconstructing it can reintroduce the same bottleneck.
Asymmetric salting is therefore fully effective in two scenarios: either the join reduces the per-key payload enough to fit in executor memory, or the pipeline remains in exploded form and writes to a child table or partitioned dataset. As a decision rule: if the output must remain nested and the maximum per-key cardinality reaches 10⁶ rows or more, salting only moves the bottleneck from join to re-aggregation; a different output shape is the correct solution.
One caveat: collect_list does not guarantee element order after shuffles. If downstream consumers expect a stable ordering, sort the collected array explicitly — for instance with F.sort_array() or by collecting a struct with a sort key: transform(array_sort(collect_list(named_struct('ord', sort_col, 'val', payload))), x -> x.val). Sorting the DataFrame before groupBy is not sufficient; the shuffle does not preserve pre-shuffle order.
Performance Considerations: Is Asymmetric Salting Expensive?
Asymmetric salting is expensive. Two factors dominate. First, explode multiplies the right side: for n rows with an average of m nested elements, the exploded dataset has n × m rows. Second, left-side replication: each row must be duplicated once per salt bucket, producing L × s rows. In the toy example this is negligible (4 × 5 = 20), but in production with millions of left-side rows and a high salt count, this multiplication becomes a significant source of memory and shuffle overhead.
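The two multiplications can be estimated before running anything. The helper below is a hypothetical back-of-the-envelope calculator in plain Python; the production-scale numbers are invented for illustration only.

```python
def salting_row_counts(
    n_right: int, avg_elements: int, n_left: int, num_salts: int
) -> dict:
    """Row counts after explode (n x m) and after left replication (L x s)."""
    return {
        "exploded_right_rows": n_right * avg_elements,
        "replicated_left_rows": n_left * num_salts,
    }


# Toy scale: 4 left rows and 5 salt buckets, roughly 250 elements per right row.
toy = salting_row_counts(n_right=4, avg_elements=250, n_left=4, num_salts=5)

# Hypothetical production scale: the left side grows to 100 million rows.
prod = salting_row_counts(
    n_right=1_000_000, avg_elements=20, n_left=2_000_000, num_salts=50
)

print(toy)
print(prod)
```

At toy scale the replication is invisible; at the hypothetical production scale the replicated left side is five times larger than the exploded right side, which is why selective salting is often worth the extra code.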
Total Shuffle Cost
The total shuffle cost depends on the strategy. A non-salted sort-merge join incurs two exchanges (one per join input). The hash-based salting variant adds one exchange for re-aggregation, for three total. The row_number variant adds the window shuffle on top of that, for four total. This means the technique replaces two skewed exchanges with three or four balanced ones — a trade-off worthwhile only when the skewed task’s duration dominates the total job runtime. Monitor the Spark UI’s stage breakdown before and after salting to verify the overhead is justified.
Choosing the Number of Salt Buckets
Choosing s is a trade-off between parallelism and replication overhead. Effective parallelism is bounded by min(s, spark.sql.shuffle.partitions, cluster_parallelism); setting s above either of the last two limits adds replication cost without additional parallelism. Two constraints must hold simultaneously: max_array_size / s ≈ average_partition_size, so that the skewed key is split into chunks comparable to an average partition, and L × s must remain manageable relative to the original left-side size, to limit replication overhead. In practice, values between 5 and 50 cover most production scenarios.
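One way to turn these constraints into a starting value is sketched below in plain Python. The function, its parameter names, and the explicit replication budget are assumptions introduced for illustration; the result is a first guess to validate in the Spark UI, not a tuned recommendation.

```python
import math


def suggest_num_salts(
    max_array_size: int,
    avg_partition_rows: int,
    shuffle_partitions: int,
    cluster_parallelism: int,
    left_rows: int,
    max_left_rows_after_replication: int,
) -> int:
    """Smallest s that balances the heavy key, capped by parallelism and budget."""
    # Balance: max_array_size / s should be close to an average partition.
    wanted = math.ceil(max_array_size / max(avg_partition_rows, 1))
    # More buckets than runnable tasks only adds replication cost.
    parallel_cap = min(shuffle_partitions, cluster_parallelism)
    # Replication: L x s must stay within the (hypothetical) row budget.
    replication_cap = max(1, max_left_rows_after_replication // max(left_rows, 1))
    return max(1, min(wanted, parallel_cap, replication_cap))


# Balance asks for 20 buckets and neither cap is binding.
print(suggest_num_salts(1_000_000, 50_000, 200, 64, 1_000_000, 30_000_000))

# Same data, but the replication budget only allows a 10x larger left side.
print(suggest_num_salts(1_000_000, 50_000, 200, 64, 1_000_000, 10_000_000))
```

When the replication cap wins, as in the second call, the heavy key remains partially skewed; that is usually the signal to switch to selective salting rather than to raise s further.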
Selective Salting
In production pipelines, it is common for only a handful of keys to be responsible for the skew, while the vast majority of keys carry small payloads. Applying asymmetric salting to the entire dataset, replicating the full left side and exploding the full right side, is wasteful in such cases. A more efficient strategy is to split the pipeline into two paths: identify the skewed keys (e.g. by computing array sizes and filtering above a threshold), apply salting only to those keys, execute a standard join for the rest, and then union the two results.
The typical pattern: compute the set of skewed keys as a small, deduplicated, broadcast DataFrame; split both sides using semi-join (skewed path) and anti-join (normal path); apply salting only to the skewed splits; standard join for the rest; combine with unionByName. The left-side replication must only apply to the skewed left split. If the skewed-key set grows to thousands of keys, the semi/anti joins themselves become expensive and the split loses its advantage. Getting the splits wrong — filtering only one side — is a common source of missing rows or duplicates.
A compact skeleton of the split-path pattern:
from pyspark.sql import functions as F

# threshold, num_salts and salted_join are placeholders defined elsewhere.

# 1. Identify skewed keys (small, deduplicated, broadcast)
skewed_keys = (
    df_right.select("KEY", F.size("PAYLOAD").alias("sz"))
    .where(F.col("sz") > threshold)  # compare against the Python variable
    .select("KEY")
    .distinct()
)
skewed_keys = F.broadcast(skewed_keys)

# 2. Split both sides
left_skewed = df_left.join(skewed_keys, on="KEY", how="semi")
left_normal = df_left.join(skewed_keys, on="KEY", how="anti")
right_skewed = df_right.join(skewed_keys, on="KEY", how="semi")
right_normal = df_right.join(skewed_keys, on="KEY", how="anti")

# 3. Salted join on skewed path only
result_skewed = salted_join(left_skewed, right_skewed, num_salts)

# 4. Standard join on normal path
result_normal = left_normal.join(right_normal, on="KEY", how="left")

# 5. Combine (both results must expose the same column names)
result = result_skewed.unionByName(result_normal)
Code Example
This example demonstrates asymmetric salting to mitigate data skew in a left outer join in Apache Spark. The left side (df_buildings) contains a list of building IDs, while the right side (df_deliveries) includes a nested array of packages per building. One building ("A") has significantly more packages than the others, causing skew.
To address this, the nested array on the right side is first exploded, turning each package into its own row. A salt_index is then assigned to each row and combined with the building ID into a struct-based JOIN_KEY. Two salt strategies are shown: a hash-based approach (recommended for production, no extra shuffle or sort) and a row_number-based approach (perfect round-robin distribution, but requires a window shuffle and sort; see the "Salt strategies" note in the Appendix).
On the left side, each building is replicated for every possible salt index, generating the same JOIN_KEY values. This enables a join that spreads the heavy building’s records across multiple partitions, ensuring better parallelism.
After the join, the results are re-aggregated to restore the original nested format. Tests are included to confirm both that all 1000 packages for building "A" are preserved, and that no spurious NULL entries appear for buildings with few or no packages.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

spark = SparkSession.builder.appName("AsymmetricSaltingExample").getOrCreate()

# Sample data: Buildings (left side)
buildings_data = [("A",), ("B",), ("C",), ("D",)]
df_buildings = spark.createDataFrame(buildings_data, ["BUILDING_ID"])

# Sample data: Deliveries (right side), skewed on BUILDING_ID = 'A'
# "C" has an empty array, "D" has null; both normalize to [] in the output.
deliveries_data = [
    ("A", [{"package_id": f"A-{i}"} for i in range(1000)]),
    ("B", [{"package_id": "B-1"}]),
    ("C", []),
    ("D", None),
]
deliveries_schema = StructType(
    [
        StructField("BUILDING_ID", StringType(), True),
        StructField(
            "PACKAGES",
            ArrayType(StructType([StructField("package_id", StringType(), True)])),
            True,
        ),
    ]
)
df_deliveries = spark.createDataFrame(deliveries_data, deliveries_schema)

# Explode nested array on the skewed side.
# explode_outer preserves rows with null or empty arrays by emitting a
# single null-element row (see "explode vs explode_outer" in the Appendix).
df_exploded = df_deliveries.withColumn("PACKAGE", F.explode_outer("PACKAGES"))

# --- Salt strategy --------------------------------------------------------
# RECOMMENDED: hash-based salt. No extra shuffle, no sort, deterministic.
# Null payloads (from explode_outer) are routed to bucket 0 explicitly.
# The struct(isNull, id) wrapper avoids hash collisions on null package_ids.
# F.pmod requires PySpark >= 3.5; on older versions use F.expr("pmod(...)").
num_salts = 5
df_salted_right = df_exploded.withColumn(
    "salt_index",
    F.when(F.col("PACKAGE").isNull(), F.lit(0)).otherwise(
        F.pmod(
            F.hash(
                F.struct(
                    F.col("PACKAGE.package_id").isNull(),
                    F.col("PACKAGE.package_id"),
                )
            ),
            F.lit(num_salts),
        )
    ),
)

# ALTERNATIVE: row_number-based salt. Perfect round-robin but adds a
# shuffle + sort via the window function. Uncomment to use.
#
# window_spec = Window.partitionBy("BUILDING_ID").orderBy(F.col("PACKAGE.package_id"))
# df_salted_right = df_exploded.withColumn(
#     "salt_index", (F.row_number().over(window_spec) - 1) % num_salts
# )
# --------------------------------------------------------------------------

df_salted_right = (
    df_salted_right
    .withColumn("JOIN_KEY", F.struct(F.col("BUILDING_ID"), F.col("salt_index")))
    .drop("BUILDING_ID", "salt_index", "PACKAGES")
)

# Left side: replicate each row for every salt_index (0 to num_salts - 1).
# broadcast() ensures the cross join runs locally per partition (no shuffle).
df_salts = spark.createDataFrame([(i,) for i in range(num_salts)], ["salt_index"])
df_left_expanded = (
    df_buildings.crossJoin(F.broadcast(df_salts))
    .withColumn("JOIN_KEY", F.struct(F.col("BUILDING_ID"), F.col("salt_index")))
    .drop("salt_index")
)

# Join on salted key
df_joined = df_left_expanded.join(df_salted_right, on="JOIN_KEY", how="left")

# Re-aggregation: restore nested format.
# array_compact (Spark >= 3.4) removes NULLs from unmatched salt replicas.
df_result = df_joined.groupBy("BUILDING_ID").agg(
    F.array_compact(F.collect_list("PACKAGE")).alias("PACKAGES")
)

# Materialize once; count() forces full evaluation before tests.
df_result = df_result.cache()
try:
    df_result.count()
    df_result.show(truncate=False)

    # ------------------------
    # Tests for correctness
    # ------------------------
    def test_asymmetric_salting():
        rows = {row["BUILDING_ID"]: row["PACKAGES"] for row in df_result.collect()}

        # Test 1: All 1000 packages for building 'A' are preserved
        assert len(rows["A"]) == 1000, f"Expected 1000 packages for 'A', got {len(rows['A'])}"

        # Test 2: Buildings with few packages have the correct count (no duplicates from salt replicas)
        assert len(rows["B"]) == 1, f"Expected 1 package for 'B', got {len(rows['B'])}"

        # Test 3: Empty-array building produces an empty PACKAGES array (not null, not inflated)
        assert len(rows["C"]) == 0, f"Expected 0 packages for 'C', got {len(rows['C'])}"

        # Test 4: Null-array building also normalizes to an empty PACKAGES array
        assert len(rows["D"]) == 0, f"Expected 0 packages for 'D', got {len(rows['D'])}"

        # Test 5: No NULL entries leak into any PACKAGES array
        for bid, packages in rows.items():
            null_count = sum(1 for p in packages if p is None)
            assert null_count == 0, f"Found {null_count} NULL(s) in PACKAGES for building '{bid}'"

    test_asymmetric_salting()
finally:
    df_result.unpersist()
    spark.stop()
Conclusions
Asymmetric salting offers a targeted solution to data skew in Spark when the issue lies not in key frequency but in oversized payloads per key. By exploding nested data and introducing synthetic keys, it enables Spark to parallelize workloads that would otherwise overwhelm a single task.
While more expensive than built-in optimizations, due to both the explosion of the right side and the replication of the left side, it is effective when AQE falls short, especially for complex aggregations involving nested arrays. It is worth noting that this is one of several strategies for handling nested skew; alternatives such as map-side pre-aggregation, bucketing, or custom partitioners may be more appropriate depending on the specific pipeline topology and data characteristics. Used judiciously, and with careful calibration of the number of salt buckets, asymmetric salting turns skewed joins into scalable, balanced computations.
Appendix: Implementation Notes
collect_list and skew. collect_list materializes the entire group in memory on a single executor. Unlike scalar aggregations (count, sum), it requires the full group before producing a result. This makes it one of the most common triggers for skew-related OutOfMemoryError in nested-data pipelines.
explode vs explode_outer. explode silently drops rows where the array is null or empty. explode_outer preserves them by emitting a single row with a null value for the exploded column — important for left-join semantics where every key must appear in the output.
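The difference can be modeled in a few lines of plain Python. The two generator functions below are simplified stand-ins that only mimic the row-level behavior described in this note; they are not the Spark implementations.

```python
from typing import Iterable, Iterator, Optional, Tuple

Row = Tuple[str, Optional[list]]


def explode(rows: Iterable[Row]) -> Iterator[tuple]:
    """Model of explode: null and empty arrays yield no output rows."""
    for key, items in rows:
        for item in items or []:
            yield key, item


def explode_outer(rows: Iterable[Row]) -> Iterator[tuple]:
    """Model of explode_outer: null and empty arrays yield one (key, None) row."""
    for key, items in rows:
        if not items:
            yield key, None
        else:
            for item in items:
                yield key, item


rows = [("B", ["B-1"]), ("C", []), ("D", None)]
print(list(explode(rows)))        # keys C and D disappear
print(list(explode_outer(rows)))  # keys C and D survive with a None element
```

With the first function, keys "C" and "D" vanish from the right side before the join; with the second, they survive as a single placeholder row that the re-aggregation step later removes.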
AQE skew detection parameters. A partition is treated as skewed only when it exceeds both spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default: 256 MB) and spark.sql.adaptive.skewJoin.skewedPartitionFactor (default: 5) times the median partition size. Both thresholds operate on serialized shuffle data size (after compression, before deserialization). See Spark’s configuration reference.
Salt strategies. The code example uses pmod(hash(...)) as the production default. Three alternatives avoid the window shuffle:
- (F.rand(seed) * s).cast("int"): lightest option, statistically uniform but not perfectly so. Use a seed for reproducibility in test environments.
- F.monotonically_increasing_id() % s: deterministic per-run but distribution is uneven because the partition ID is encoded in the upper bits.
- pmod(hash(struct(isNull(col), col)), s): deterministic, reproducible, distributes well for high-cardinality attributes. Use pmod, not %, because hash() returns a signed 32-bit integer.
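The reason for preferring pmod over % with a signed hash can be shown with a short plain-Python model. Python's own % already returns a non-negative result for a positive modulus, so the sketch reproduces the JVM-style remainder (sign of the dividend) explicitly; both function names are hypothetical.

```python
def jvm_remainder(a: int, n: int) -> int:
    """Remainder that keeps the sign of the dividend, as Java's % does."""
    r = abs(a) % abs(n)
    return -r if a < 0 else r


def pmod(a: int, n: int) -> int:
    """Positive modulo for n > 0: always lands in the range [0, n)."""
    return (jvm_remainder(a, n) + n) % n


signed_hash = -7  # stands in for a negative 32-bit hash value
num_salts = 5

print(jvm_remainder(signed_hash, num_salts))  # negative: not a valid bucket
print(pmod(signed_hash, num_salts))           # within 0 .. num_salts - 1
```

A negative salt produced on the right side would never match any of the replicated left-side salts, which run from 0 to s − 1, so the affected rows would silently drop out of the join.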
The row_number() variant forces a shuffle + sort via Window.partitionBy, concentrating the skewed key’s rows into a single task before salting — the very problem salting is meant to solve. Use only when perfect round-robin is required and the sort cost is acceptable.
A note on hash(null): Spark’s MurmurHash3 never returns null. For a null column value it returns the seed value (currently 42) — an implementation detail, not a contract. The struct wrapper struct(isNull(id), id) ensures that (true, null) hashes differently from (false, real_id). Always use an explicit guard for null payloads rather than relying on hash behavior.
Broadcast hint for replication. The explicit F.broadcast() on the salt DataFrame ensures the cross join runs locally per partition. Without it, Spark usually broadcasts automatically (if size < autoBroadcastJoinThreshold), but this threshold is sometimes set to -1 in large-scale jobs. The explicit hint removes the ambiguity.
Struct vs string composite key. F.struct(key, salt) preserves types and avoids ambiguity. F.concat_ws("-", key, salt) is readable but if the key itself contains the separator, parsing the composite key back into its components becomes unreliable. The struct-based approach eliminates this class of bugs entirely.
array_compact and null handling. collect_list already skips nulls, but array_compact (Spark >= 3.4) makes the intent explicit. For Spark 3.1–3.3, use F.filter(collect_list(...), lambda x: x.isNotNull()).
NULL multiplication in left outer joins. Between the salted join and re-aggregation, cardinality is temporarily inflated: keys with no right-side match appear s times (one NULL per salt bucket), and keys with partial matches produce s − k NULL rows, where k is the number of salt buckets that received at least one right-side row for that key. If monitoring or audit stages run on the intermediate DataFrame, the inflated counts can trigger false alerts. Re-aggregation corrects this.
Alternative strategies. Asymmetric salting is one of several approaches: map-side pre-aggregation, repartitionByRange, custom RDD partitioners, persistent bucketBy, AQE automatic skew join optimization (Spark 3.0+, with improvements in 3.2+), and broadcast side switching. Each involves different trade-offs in complexity, overhead, and optimizer compatibility.
Null vs empty array normalization. The code normalizes both null and [] to []. If your domain requires the distinction (auditing, SLA enforcement), add a flag column before exploding: withColumn("was_null", col("PACKAGES").isNull()). Carry the flag through the join and keep it as an additional aggregate in the groupBy, since columns that are not aggregated do not survive re-aggregation, then restore the null afterwards via F.when(col("was_null"), lit(None)).otherwise(col("PACKAGES")).
Bibliography
Apache Spark. Configuration — Runtime SQL Configuration. In Spark 3.5.8 Documentation. Retrieved from https://spark.apache.org/docs/3.5.8/configuration.html#runtime-sql-configuration
Apache Spark. Performance Tuning — Adaptive Query Execution. In Spark 3.5.8 Documentation. Retrieved from https://spark.apache.org/docs/3.5.8/sql-performance-tuning.html
Canadian Data Guy. A deep dive into skewed joins, groupby bottlenecks, and smart strategies to keep your Spark jobs flying. Canadian Data Guy’s No Fluff Newsletter. Retrieved June 2025 from https://www.canadiandataguy.com/p/a-deep-dive-into-skewed-joins-groupby
Cox Automotive Data Solutions. The Taming of the Skew — Part Two. Data Driven Blog. Retrieved from https://coxautomotivedatasolutions.github.io/datadriven/spark/data%20skew/joins/data-skew-2/
Databricks. Adaptive query execution. Databricks Documentation. Retrieved from https://docs.databricks.com/aws/en/optimizations/aqe