In large-scale data pipelines, skew can occur when one key carries a disproportionately large payload, for example a nested array containing thousands of records. Even if that key appears only once, the task responsible for processing it must handle significantly more data than the others, leading to execution imbalance. This article introduces asymmetric salting: a practical optimization technique for addressing data skew caused not by how frequently a key appears, but by how much data is associated with it.
Unlike traditional salting, where both sides of a join are treated symmetrically because the skew comes from repeated keys, asymmetric salting targets a fundamentally different root cause: oversized nested payloads attached to otherwise unique keys. The “asymmetry” lies in the fact that only the heavy side needs to be decomposed (via explode), while the light side is merely replicated with matching salt values. The two sides undergo structurally different transformations to achieve balanced parallelism.
The primary target is skewed joins and enrichment pipelines where a nested payload must be matched against a reference dataset; the nested re-aggregation that often follows is a common but not always necessary step, and, as discussed later, can itself become a bottleneck.
An Example
Let’s start with an analogy.
Imagine organizing package delivery across a city. For logistical reasons, each building’s deliveries are routed through a dedicated sorting depot. This ensures that all packages destined for the same address are grouped and handled together, maintaining consistency and avoiding mix-ups.
Most buildings receive a handful of small packages each day. But one day, a perfectly ordinary-looking box shows up. It’s addressed to just one building, like all the others, but when opened it reveals a seemingly endless stream of items: a bottomless container from which thousands of smaller packages emerge.
Even though this building appears only once on the delivery manifest, the sheer volume of content associated with it overwhelms its assigned depot. The problem isn’t that the building is frequently listed: its single entry carries a massive payload.
This is more or less what happens in Spark. During operations like groupBy, join, or collect_list ¹, Spark routes all records sharing the same key to the same partition ², just like the depot. If that key includes a deeply nested or large array, even in a single row, the entire partition becomes skewed, and the task assigned to it becomes a bottleneck.
It is important to note that this form of skew manifests specifically when the nested array remains materialized as a single row until a shuffle boundary, for instance when a join or aggregation forces Spark to redistribute data by key before the nested structure has been expanded. If the array were exploded before the shuffle, the individual elements would already be separate rows and could be distributed across partitions normally. The problem, in other words, is not the nested structure per se, but the fact that it crosses a shuffle boundary intact, concentrating all of its weight into a single task. This is precisely why the first step of asymmetric salting is to explode the payload before the first shuffle that repartitions by the join key (i.e., before the join’s exchange).
However, exploding before the join is necessary but not sufficient. Even after the nested array has been flattened into individual rows, all those rows still share the same join key. When the join’s exchange repartitions by that key, every exploded element for the heavy key lands in the same shuffle partition: the partition is now large not because a single row was heavy, but because one key dominates the row count. If the reducer for that partition must also perform expensive per-row work (UDF evaluation, enrichment lookups, or complex predicate filtering), it remains the straggler. Salting is the explicit lever that forces parallelism for that key, regardless of Spark’s partitioning heuristics.
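To make the routing behavior concrete, here is a plain-Python toy model of hash partitioning (a simple stand-in hash, not Spark’s MurmurHash3; the key names and bucket counts are illustrative). It shows that exploding alone does not redistribute the heavy key, while a salted composite key does:

```python
# Toy model of hash partitioning: hash(key) % numPartitions.
# NOT Spark's MurmurHash3 -- any deterministic hash routes the same way.
NUM_PARTITIONS = 8

def partition_for(key) -> int:
    # Stable stand-in hash (Python's built-in hash() is salted for strings).
    return sum(ord(c) for c in str(key)) % NUM_PARTITIONS

# One heavy key "A" whose nested array was exploded into 1000 rows.
exploded = [("A", i) for i in range(1000)]

# Without salting: every exploded row still carries the same key,
# so all 1000 rows land in a single shuffle partition.
plain = {partition_for(k) for k, _ in exploded}
assert len(plain) == 1  # one straggler partition

# With salting: the composite key (key, element_index % s) spreads
# the same rows across s sub-keys, hence multiple partitions.
s = 5
salted = {partition_for((k, i % s)) for k, i in exploded}
assert len(salted) == s
```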
Why Adaptive Query Execution (AQE) Isn’t Enough
Since Spark 3.0, Adaptive Query Execution (AQE) has introduced several optimizations that help mitigate data skew automatically. For example, Spark can detect partitions with a disproportionate size in bytes ³ (exceeding both an absolute threshold and a relative factor compared to median partition size) and apply dynamic join strategies (like broadcasting small tables) or split oversized partitions into smaller subpartitions.
AQE’s skew detection is effective when the imbalance is visible in serialized shuffle data size, and if an explode happens before the shuffle, AQE can in principle detect and split the resulting oversized partition. But it offers no guarantees when the skew is driven by in-memory amplification or per-row computational complexity rather than raw byte volume ⁴. A single key associated with a nested structure that expands into hundreds of internal records during processing can appear normal in serialized size while requiring gigabytes of memory to process. AQE sees an acceptable partition and assumes balanced workload. Asymmetric salting exists precisely to give the developer explicit control over how the load is distributed, rather than relying on a heuristic that may or may not trigger.
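For reference, the AQE skew-join knobs discussed above can be set explicitly at submit time. The values shown are Spark’s documented defaults (see Note 3), and your_job.py is a placeholder:

```shell
# AQE skew-join handling: BOTH conditions must hold for a partition
# to be considered skewed and split. Values shown are the defaults.
spark-submit \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.skewJoin.enabled=true \
  --conf spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB \
  --conf spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 \
  your_job.py
```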
Asymmetric Salting: Algorithm
This is where asymmetric salting comes in. Let’s describe its algorithm.
Input:
- A left-side dataset containing lightweight entities uniquely identified by a key.
- A right-side dataset with a foreign key pointing to the left-side key, and a nested array of structs, which may be very large for some keys.
Problem: Some right-side records contain massive nested structures for specific keys, causing a single task to become overloaded in Spark during the join, despite a uniform key distribution.
Objective: Redistribute the processing of these large payloads across multiple tasks, without altering the logical meaning of the data.
Algorithm Steps
1. Detect the skew source: Identify that the skew originates not from repeated keys but from a single key associated with a large nested array on the right-hand side.
2. Expand the payload: Convert the nested structure into a flat representation ⁵, so that individual nested elements become distinct rows. This enables fine-grained distribution of the load.
3. Generate a virtual partitioning key: For each expanded row, compute a salt value ⁶, for instance by assigning a sequential index to each row within the same key and taking its modulo over a fixed number of buckets. This salt value is used to artificially partition the data across multiple tasks. Note that the salt is derived from the row’s position within its group, not from the join key itself.
4. Replicate the salt space on the left side: On the left-side dataset, generate one copy of each row for every possible salt value ⁷. This ensures that every sub-partition of the exploded right-side data finds a corresponding match in the join. This replication has a cost: the left side grows by a factor equal to the number of salt buckets, which should be taken into account when sizing the salt space (see Performance Considerations).
5. Construct a salted join key: Combine the original join key and the salt value to form a new composite key ⁸ used for joining. This ensures that multiple logical partitions of the heavy key can be processed in parallel.
6. Execute the join: Perform the join using the salted composite key. This enables Spark to distribute the join across multiple tasks even for originally skewed keys.
7. (Optional) Reaggregate the payload: If needed, group the resulting data by the original key and reconstruct the nested structure ⁹. When using a left outer join, take care to filter out NULL entries introduced by the replication of the left side: keys with no match on the right side will produce one NULL row per salt bucket instead of a single NULL row ¹⁰.
Semantic Correctness
Spark’s execution model guarantees that all records sharing the same key are routed to the same shuffle partition under the default hash-partitioning strategy; this is the semantic contract that makes groupBy and join correct. Asymmetric salting temporarily breaks this contract by splitting the records of a single logical key across multiple partitions via a synthetic composite key. Correctness is restored by the re-aggregation step (step 7), which collapses the salted sub-partitions back into the original key space. If re-aggregation is omitted or groups by the salted key instead of the original key, the output is semantically wrong: a single logical entity appears as multiple fragments. This reversibility is the theoretical invariant that makes the entire approach valid.
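The round-trip invariant can be modeled in a few lines of plain Python, independent of Spark (the record layout and counts are illustrative):

```python
from collections import defaultdict

# Records for one logical key "A" with 1000 exploded payload elements.
records = [("A", f"pkg-{i}") for i in range(1000)]
s = 5

# Salt: split the heavy key's records across s synthetic sub-keys.
salted = [((key, i % s), payload) for i, (key, payload) in enumerate(records)]
sub_keys = {k for k, _ in salted}
assert len(sub_keys) == s  # one logical key now occupies s sub-partitions

# Re-aggregate by the ORIGINAL key: collapses the sub-partitions back,
# restoring the contract that one key maps to one group.
regrouped = defaultdict(list)
for (key, _salt), payload in salted:
    regrouped[key].append(payload)
assert sorted(regrouped["A"]) == sorted(p for _, p in records)

# Grouping by the SALTED key instead is semantically wrong:
# the single logical entity appears as s fragments.
wrong = defaultdict(list)
for composite, payload in salted:
    wrong[composite].append(payload)
assert len(wrong) == s
```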
The Re-aggregation Bottleneck
The re-aggregation step deserves special attention. Although asymmetric salting distributes the join workload across multiple tasks, the final groupBy(original_key).collect_list(...) gathers all records for each key back into a single task. For the very keys that caused the original skew, this means the entire payload, now joined and potentially enriched, must be materialized in memory on one executor. If the array was large enough to cause problems in the first place, reconstructing it can reintroduce the same bottleneck.
This means asymmetric salting is fully effective only in one of two scenarios: either the final per-key payload, after joining, is small enough to fit comfortably in executor memory (because the join filtered, projected, or reduced the data), or the pipeline does not need to reconstruct the nested structure at all. Many production pipelines can remain in exploded (flat) form and write directly to a child table or a partitioned dataset, avoiding the re-aggregation entirely. As a practical decision rule: if the final representation must remain nested and the maximum per-key cardinality remains on the order of 10⁶ rows or more, asymmetric salting only moves the bottleneck from the join stage to the re-aggregation stage; in that case, a different output shape (child table, exploded sink, or incremental chunked writes) is the correct solution.
One additional caveat: collect_list does not guarantee element order after shuffles. If your downstream consumers expect a stable array ordering (e.g. chronological, by ID), you must explicitly sort the collected array, for instance by applying F.sort_array() on the result if the element type supports lexicographic ordering, or by collecting a struct that includes the sort key and then extracting the payload with transform(array_sort(collect_list(named_struct('ord', sort_col, 'val', payload))), x -> x.val). Sorting the DataFrame before the groupBy is not sufficient, because the shuffle redistributes rows and does not preserve the pre-shuffle order. This is a common footgun that is unrelated to salting but becomes more visible when salting changes the physical layout of rows.
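As a plain-Python mental model of that collect-then-sort pattern (no Spark involved; the element values are illustrative), the key point is that sorting happens after collection, on structs that carry their own sort key:

```python
import random

# Models: transform(array_sort(collect_list(struct('ord', 'val'))), x -> x.val)
# collect_list gathers elements in whatever order the shuffle delivers them;
# sorting the collected (ord, val) structs restores a stable order.
rows = [(i, f"event-{i}") for i in range(10)]
random.shuffle(rows)                 # simulate nondeterministic shuffle order

collected = list(rows)               # collect_list(struct('ord', 'val'))
collected.sort(key=lambda x: x[0])   # array_sort: structs compare field by field
payload = [v for _, v in collected]  # transform(..., x -> x.val)

assert payload == [f"event-{i}" for i in range(10)]
```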
Performance Considerations: Is Asymmetric Salting Expensive?
Asymmetric salting is a computationally expensive strategy, and it is important to understand when its cost is justified.
The core operation that drives this cost is the explode applied to a nested array. When each row in the dataset contains a nested array with multiple elements, exploding these structures causes the dataset to grow significantly, potentially multiplying the number of rows by the average array size. For a dataset with n rows and an average of m nested elements per row, the resulting exploded dataset has a size on the order of n × m. This growth increases not only storage and memory pressure but also the volume of data shuffled ¹¹ during the join.
An equally important cost factor is the replication of the left side. To ensure that every salted sub-partition on the right side has a matching key on the left, each row in the left dataset must be duplicated once per salt bucket. If the left side has L rows and there are s salt buckets, the replicated left side contains L × s rows. In the toy example of this article this is negligible (4 × 5 = 20 rows), but in production pipelines with millions of entities on the left side and a high number of salt buckets, this multiplication can become a significant source of memory and shuffle overhead.
Total Shuffle Cost
It is important to recognize that the total shuffle cost depends on the salt strategy chosen. The row_number-based variant can introduce up to three full shuffles: one for the window function (which requires repartitioning by the original key and sorting within each partition), one for the salted join itself, and one for the final re-aggregation by the original key. The hash-based variant (the recommended default) eliminates the window shuffle entirely, reducing the total to two shuffles: one for the salted join and one for the re-aggregation. In contrast, a non-salted join would incur only one shuffle. This means the technique replaces one skewed shuffle with one or two balanced ones, a trade-off that is worthwhile only when the skewed task’s duration dominates the total job runtime by a wide margin. Monitoring the Spark UI’s stage breakdown before and after applying salting is essential to verify that the overhead of additional shuffles is justified by the reduction in skew.
Choosing the Number of Salt Buckets
Choosing the number of salt buckets is a trade-off between parallelism and replication overhead ¹².
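One hedged heuristic for this trade-off (not a Spark API; the function name, target size, and cap are illustrative): split the heaviest key’s exploded rows into chunks of a target per-task size, and cap the bucket count so the left-side replication factor stays bounded.

```python
import math

def choose_num_salts(max_key_rows: int, target_rows_per_task: int,
                     max_buckets: int = 64) -> int:
    """Size the salt space so the heaviest key splits into chunks of
    roughly target_rows_per_task, capping s to bound the L x s
    replication of the left side."""
    s = math.ceil(max_key_rows / target_rows_per_task)
    return max(1, min(s, max_buckets))

print(choose_num_salts(1_000_000, 50_000))  # heaviest key: 1M exploded rows
print(choose_num_salts(1_000, 50_000))      # small key: salting unnecessary
```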
Selective Salting
In production pipelines, it is common for only a handful of keys to be responsible for the skew, while the vast majority of keys carry small payloads. Applying asymmetric salting to the entire dataset, replicating the full left side and exploding the full right side, is wasteful in such cases. A more efficient strategy is to split the pipeline into two paths: identify the skewed keys (e.g. by computing array sizes and filtering above a threshold), apply salting only to those keys, execute a standard join for the rest, and then union the two results.
Implementing this correctly requires care to avoid data duplication or loss. The typical pattern is: compute the set of skewed keys as a small DataFrame (e.g. df_right.select("KEY", F.size("PAYLOAD").alias("sz")).where("sz > threshold").select("KEY").distinct() when there is one row per key, or df_right.groupBy("KEY").agg(F.max(F.size("PAYLOAD")).alias("sz")).where("sz > threshold").select("KEY") when multiple rows per key are possible); broadcast this set explicitly via F.broadcast(skewed_keys) to keep the semi/anti joins cheap; split both the left and right datasets using a semi-join (skewed path) and an anti-join (normal path) against that key set; apply asymmetric salting only to the skewed splits; perform a standard join on the normal splits; and finally combine the two results with unionByName. The left-side replication (crossJoin with salt indices) should only be applied to the skewed left split, not the full left dataset. The skewed-keys DataFrame must remain small and deduplicated: if it grows to thousands of keys or more, the semi/anti joins themselves become expensive, and the split strategy loses its advantage. The threshold used to identify skewed keys should reflect an application-specific notion of “heaviness”: F.size("PAYLOAD") works when the payload is a single array, but pipelines with multiple nested fields or deeply nested structs may need a different metric (e.g. sum of nested array lengths, or a byte-size proxy). Getting these splits wrong, for instance by filtering only one side, is a common source of either missing rows or duplicates in the output.
A compact skeleton of the split-path pattern:
# 1. Identify skewed keys (small, deduplicated, broadcast)
skewed_keys = (
    df_right.select("KEY", F.size("PAYLOAD").alias("sz"))
    .where("sz > threshold")
    .select("KEY").distinct()
)
skewed_keys = F.broadcast(skewed_keys)
# 2. Split both sides
left_skewed = df_left.join(skewed_keys, on="KEY", how="semi")
left_normal = df_left.join(skewed_keys, on="KEY", how="anti")
right_skewed = df_right.join(skewed_keys, on="KEY", how="semi")
right_normal = df_right.join(skewed_keys, on="KEY", how="anti")
# 3. Salted join on skewed path only
result_skewed = salted_join(left_skewed, right_skewed, num_salts)
# 4. Standard join on normal path
result_normal = left_normal.join(right_normal, on="KEY", how="left")
# 5. Combine
result = result_skewed.unionByName(result_normal)
Code Example
This example demonstrates asymmetric salting to mitigate data skew in a left outer join in Apache Spark. The left side (df_buildings) contains a list of building IDs, while the right side (df_deliveries) includes a nested array of packages per building. One building ("A") has significantly more packages than the others, causing skew.
To address this, the nested array on the right side is first exploded, turning each package into its own row. A salt_index is then assigned to each row and combined with the building ID into a struct-based JOIN_KEY. Two salt strategies are shown: a hash-based approach (recommended for production, no extra shuffle or sort) and a row_number-based approach (perfect round-robin distribution, but requires a window shuffle and sort; see Note 6).
On the left side, each building is replicated for every possible salt index, generating the same JOIN_KEY values. This enables a join that spreads the heavy building’s records across multiple partitions, ensuring better parallelism.
After the join, the results are re-aggregated to restore the original nested format. Tests are included to confirm both that all 1000 packages for building "A" are preserved, and that no spurious NULL entries appear for buildings with few or no packages.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
spark = SparkSession.builder.appName("AsymmetricSaltingExample").getOrCreate()
# Sample data: Buildings (left side)
buildings_data = [("A",), ("B",), ("C",), ("D",)]
df_buildings = spark.createDataFrame(buildings_data, ["BUILDING_ID"])
# Sample data: Deliveries (right side), skewed on BUILDING_ID = 'A'
# Includes an empty-array case ("C") and a null-array case ("D") to
# exercise the null-payload guard.
# NOTE: both null and empty arrays become empty arrays in the final output.
# explode_outer emits a single null-element row for both cases, then
# array_compact(collect_list(...)) removes the null, producing [].
# This is a deliberate simplification; see Note 14 if your pipeline must
# distinguish null from empty.
deliveries_data = [
    ("A", [{"package_id": f"A-{i}"} for i in range(1000)]),
    ("B", [{"package_id": "B-1"}]),
    ("C", []),
    ("D", None),
]
deliveries_schema = StructType(
    [
        StructField("BUILDING_ID", StringType(), True),
        StructField("PACKAGES", ArrayType(StructType([StructField("package_id", StringType(), True)])), True),
    ]
)
df_deliveries = spark.createDataFrame(deliveries_data, deliveries_schema)
# Explode nested array on the skewed side.
# explode_outer preserves rows with null or empty arrays by emitting a
# single null-element row (see Note 5).
df_exploded = df_deliveries.withColumn("PACKAGE", F.explode_outer("PACKAGES"))
# --- Salt strategy --------------------------------------------------------
# Choose ONE of the two strategies below.
#
# RECOMMENDED (production): hash-based salt.
# No extra shuffle, no sort. Deterministic.
# Uses pmod to guarantee a non-negative result (see Note 6).
# Null payloads (from explode_outer on null or empty arrays) are routed to
# bucket 0 so they match exactly one left-side replica. Without this guard,
# hash(struct(null, null)) returns 42 (identical to hash(null)), landing
# in whichever bucket pmod(42, s) maps to. The guard makes the routing
# explicit and independent of the MurmurHash3 seed value.
# The hash input is a struct(isNull, id) to avoid sentinel collisions
# when package_id can be null — see Note 6 for alternatives.
# Two distinct null scenarios are handled:
# 1. PACKAGE is null (from explode_outer on null or empty arrays) → bucket 0
# (explicit routing, independent of hash implementation details)
# 2. PACKAGE exists but package_id is null → struct(true, null) hashes
# differently from struct(false, "real_id"), preventing collisions
num_salts = 5
df_salted_right = df_exploded.withColumn(
    "salt_index",
    F.when(F.col("PACKAGE").isNull(), F.lit(0))
    .otherwise(F.pmod(
        F.hash(F.struct(
            F.col("PACKAGE.package_id").isNull(),
            F.col("PACKAGE.package_id"),
        )),
        F.lit(num_salts),
    )),
)
# ALTERNATIVE (didactic): row_number-based salt.
# Perfect round-robin distribution, but introduces an extra shuffle + sort
# via the window function — can become a bottleneck on very large arrays.
# Null payloads from explode_outer (null or empty arrays) receive
# row_number = 1, so (1 - 1) % num_salts = 0 — they land in bucket 0
# naturally.
# Uncomment the lines below (and comment out the hash-based block above)
# to use this variant.
#
# window_spec = Window.partitionBy("BUILDING_ID").orderBy(F.col("PACKAGE.package_id"))
# df_salted_right = df_exploded.withColumn(
# "salt_index", (F.row_number().over(window_spec) - 1) % num_salts
# )
# --------------------------------------------------------------------------
df_salted_right = (
    df_salted_right
    .withColumn("JOIN_KEY", F.struct(F.col("BUILDING_ID"), F.col("salt_index")))
    .drop("BUILDING_ID", "salt_index", "PACKAGES")
)
# Left side: replicate rows for each salt_index (0 to num_salts - 1).
# This multiplies the left side by num_salts — acceptable here (4 × 5 = 20 rows),
# but in production with millions of left-side rows, this cost must be considered.
# The explicit broadcast() ensures the cross join is executed locally
# per partition (no shuffle for the replication step), with the salt
# table broadcasted. See Note 7 for why the explicit hint matters.
df_salts = spark.createDataFrame([(i,) for i in range(num_salts)], ["salt_index"])
df_left_expanded = (
    df_buildings.crossJoin(F.broadcast(df_salts))
    .withColumn("JOIN_KEY", F.struct(F.col("BUILDING_ID"), F.col("salt_index")))
    .drop("salt_index")
)
# Join on salted key
df_joined = df_left_expanded.join(df_salted_right, on="JOIN_KEY", how="left")
# Re-aggregation: restore nested format.
# array_compact is a defensive guard against NULLs from unmatched salt
# replicas in the left outer join. collect_list already skips nulls, but
# the explicit removal makes the intent clear (see Note 9).
# array_compact requires Spark >= 3.4. For Spark 3.1–3.3, replace with:
# F.filter(F.collect_list("PACKAGE"), lambda x: x.isNotNull())
df_result = df_joined.groupBy("BUILDING_ID").agg(
    F.array_compact(F.collect_list("PACKAGE")).alias("PACKAGES")
)
# Materialize once to avoid re-executing the full lineage per test.
# count() forces a full pass over the output partitions, populating the
# cache; show() alone may only evaluate a subset.
df_result = df_result.cache()
try:
    df_result.count()
    df_result.show(truncate=False)

    # ------------------------
    # Tests for correctness
    # ------------------------
    def test_asymmetric_salting():
        rows = {row["BUILDING_ID"]: row["PACKAGES"] for row in df_result.collect()}
        # Test 1: All 1000 packages for building 'A' are preserved
        assert len(rows["A"]) == 1000, f"Expected 1000 packages for 'A', got {len(rows['A'])}"
        # Test 2: Buildings with few packages have the correct count (no duplicates from salt replicas)
        assert len(rows["B"]) == 1, f"Expected 1 package for 'B', got {len(rows['B'])}"
        # Test 3: Empty-array building produces an empty PACKAGES array (not null, not inflated)
        assert len(rows["C"]) == 0, f"Expected 0 packages for 'C', got {len(rows['C'])}"
        # Test 4: Null-array building also normalizes to an empty PACKAGES array
        assert len(rows["D"]) == 0, f"Expected 0 packages for 'D', got {len(rows['D'])}"
        # Test 5: No NULL entries leak into any PACKAGES array
        for bid, packages in rows.items():
            null_count = sum(1 for p in packages if p is None)
            assert null_count == 0, f"Found {null_count} NULL(s) in PACKAGES for building '{bid}'"

    test_asymmetric_salting()
finally:
    df_result.unpersist()
    spark.stop()
spark.stop()
Conclusions
Asymmetric salting offers a targeted solution to data skew in Spark when the issue lies not in key frequency but in oversized payloads per key. By exploding nested data and introducing synthetic keys, it enables Spark to parallelize workloads that would otherwise overwhelm a single task.
While more expensive than built-in optimizations, due to both the explosion of the right side and the replication of the left side, it is effective when AQE falls short, especially for complex aggregations involving nested arrays. It is worth noting that this is one of several strategies for handling nested skew ¹³; alternatives such as map-side pre-aggregation, bucketing, or custom partitioners may be more appropriate depending on the specific pipeline topology and data characteristics. Used judiciously, and with careful calibration of the number of salt buckets, asymmetric salting turns skewed joins into scalable, balanced computations.
Notes
¹ collect_list is particularly sensitive to skew because it materializes the entire array for a given key in the memory of a single executor. Unlike aggregations such as count or sum, which produce a scalar, collect_list accumulates every row into a growing in-memory buffer. While Spark 3.x has improved memory management through Tungsten aggregation buffers and sort-based aggregation fallback, the fundamental constraint remains: collect_list requires the full group to be materialized before producing a result, and the per-group memory pressure dominates when a single key concentrates a disproportionate number of rows. This makes collect_list one of the most common triggers for skew-related OutOfMemoryError failures in pipelines that work with nested data.
² Spark determines partition assignment during a shuffle using hash partitioning by default: it computes hash(key) % numPartitions and routes the record to the corresponding partition. This guarantees that all records with the same key land on the same partition, a prerequisite for operations like groupBy, join, and reduceByKey. The number of shuffle partitions is controlled by spark.sql.shuffle.partitions (default: 200). Increasing this value creates more, smaller partitions, which can help with general load distribution but does nothing against single-key skew: no matter how many partitions exist, all records sharing the same key will still hash to exactly one of them.
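A toy illustration of this last point (a stable stand-in hash, not Spark’s MurmurHash3): raising the partition count never splits a single key, because hash(key) is a constant per key.

```python
# Increasing numPartitions cannot split a single key: hash(key) is
# fixed, so hash(key) % numPartitions selects exactly one partition.
def route(key: str, num_partitions: int) -> int:
    h = sum(ord(c) for c in key)  # deterministic stand-in for hash(key)
    return h % num_partitions

records = ["A"] * 1000  # 1000 records sharing one key
for n in (200, 2000, 20000):
    partitions = {route(k, n) for k in records}
    assert len(partitions) == 1  # all 1000 records in one partition
```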
³ AQE’s skew detection for joins is governed by two parameters (documented in Spark’s configuration reference): spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default: 256 MB) sets the absolute minimum byte size for a partition to be considered skewed, while spark.sql.adaptive.skewJoin.skewedPartitionFactor (default: 5) defines the relative factor, where a partition is skewed if its size exceeds this factor multiplied by the median partition size. Both conditions must be met simultaneously. When triggered, AQE splits the oversized partition into subpartitions and replicates the corresponding data from the other side of the join. Crucially, these thresholds operate on serialized shuffle data size, which is measured after the shuffle write and after optional compression (spark.shuffle.compress, enabled by default), but before deserialization and any in-memory expansion caused by processing nested structures.
⁴ It is worth noting that AQE also provides spark.sql.adaptive.coalescePartitions.enabled, which merges small post-shuffle partitions to reduce task overhead. However, like skew join handling, this mechanism is based on partition byte sizes and cannot detect or respond to per-row computational asymmetry. There is currently no built-in Spark mechanism that profiles per-row processing cost at runtime.
⁵ In PySpark, the standard function for flattening arrays is F.explode(), which takes an array column and produces one output row for each element. An important caveat: explode silently drops rows where the array is null or empty. F.explode_outer() preserves these rows by emitting a single row with a null value for the exploded column; this applies to both null arrays and empty arrays ([]), which is important for left-join semantics where every key must appear in the output. The code example uses explode_outer() as the safer default. When combining explode_outer with a hash-based salt strategy, the resulting null elements require explicit handling (see Note 6 for details).
⁶ The code example uses pmod(hash(...)) as the default salt strategy because it avoids the extra shuffle and sort that a window function would introduce. The row_number() alternative, shown commented out, has a hidden cost: the window function Window.partitionBy("BUILDING_ID") forces a shuffle that repartitions the data by the original key, and the orderBy clause then requires a sort within each resulting partition. It is this shuffle, not the preceding explode_outer, that concentrates all rows of the skewed key into a single task. The explode_outer itself merely transforms one row into many within the same partition; it is the window’s partitionBy that triggers the redistribution. After the shuffle, the sort operates on the full exploded array (e.g. 1000 rows for building “A”) inside that one task. In extreme cases, with arrays containing millions of elements, this sort can become a significant bottleneck before the salted join even begins.
The three native Catalyst alternatives shown or discussed avoid the extra shuffle and sort introduced by the window:
- (F.rand() * num_salts).cast("int"): generates a pseudo-random salt per row with no shuffle, no sort, and no window specification. The distribution is statistically uniform but not perfectly uniform (one bucket might receive 210 rows and another 190 instead of exactly 200 each). For salting purposes, this variance is negligible, making rand() the lightest-weight option available. However, rand() produces different values on every action (and every re-computation due to lineage replay) unless a seed is fixed via F.rand(42). Seeded rand is deterministic but still pseudo-random; this is fine because correctness does not depend on which bucket each row lands in, only on the final re-aggregation. In test and QA environments, always use a seeded variant to ensure reproducible results; in production, unseeded rand() is acceptable but complicates troubleshooting and makes stage metrics less stable across retries.
- F.monotonically_increasing_id() % num_salts: assigns a globally unique, monotonically increasing 64-bit integer to each row, also without requiring a shuffle or sort. However, the IDs encode the partition ID in the upper bits, so the modulo distribution across buckets may be less uniform than rand(). The IDs also depend on physical partitioning and execution order, so they can change between runs and even between stage retries, acceptable for load balancing but not for reproducibility.
- F.pmod(F.hash(F.struct(F.col("PACKAGE.package_id").isNull(), F.col("PACKAGE.package_id"))), F.lit(num_salts)): computes a deterministic hash of an element-level attribute (with a null-safe struct wrapper) and takes the positive modulo. This avoids the shuffle and sort, produces a reproducible salt (unlike rand()), and distributes well if the hashed attribute has high cardinality. It is the most robust option when the ordering implied by row_number() is correlated with the physical partition layout, which can cause micro-skew within salt buckets.
Note the use of F.pmod instead of the % operator: F.hash() returns a signed 32-bit integer, and Spark’s % can produce negative results, which would create salt indices outside the expected 0..s-1 range and break the join with the left-side replicas. A common misconception is that F.hash(null) returns null; in fact, Spark’s hash() (MurmurHash3) never returns null. For a null column value it currently returns a deterministic non-null integer (observably 42, the MurmurHash3 seed value, though this is an implementation detail, not a documented contract). A non-null struct where all fields are null, such as struct(null, null), also returns the same value. However, a struct with at least one non-null field produces a distinct hash: for example, struct(true, null) returns −559580957. This is why the struct wrapper struct(isNull(package_id), package_id) works: when package_id is null the struct becomes (true, null), which hashes differently from any (false, real_id) pair. The key takeaway is not the specific value but the principle: do not base correctness on the hash of null inputs; use an explicit guard instead. Without the F.when guard, null-payload rows (where the entire PACKAGE struct is null) would silently land in whichever bucket the hash implementation happens to map them to. The guard makes the routing explicit (bucket 0) and independent of any implementation detail. An alternative is F.coalesce with a domain-impossible sentinel, but the struct approach is cleaner when the set of possible values is not fully known. One practical caveat: if the element attribute you hash has low cardinality (e.g. many elements share the same package_id), the salt distribution collapses and most rows land in the same bucket. Always hash a stable high-cardinality surrogate (such as a unique element ID); if none exists, fall back to rand(seed), which guarantees statistical uniformity regardless of cardinality.
All three expressions run entirely in the JVM and are fully optimizable by Catalyst. A Python UDF generating random keys would also avoid the sort, but it introduces serde overhead (JVM/Python serialization for every row) and is opaque to the optimizer, making it the worst choice among the available alternatives. The code example uses pmod(hash(...)) as the production-recommended default; the row_number() variant is included as a commented-out alternative for cases where perfect round-robin distribution is required and the sort cost is acceptable.
A separate note on bucket distribution: with (row_number() - 1) % s, the assignment is perfectly round-robin and zero-based. If the total count is not a multiple of s, the last bucket receives one fewer row, a negligible imbalance in practice. With pmod(hash(...), s), the distribution is statistically uniform but depends on the hash function’s behavior for the specific data; in practice, Spark’s built-in hash distributes well for columns with reasonable cardinality.
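The round-robin arithmetic is easy to sanity-check outside Spark. A pure-Python sketch of (row_number() - 1) % s over a hypothetical 1001 rows and 5 buckets shows the one-row imbalance described above:

```python
from collections import Counter

num_rows, num_salts = 1001, 5    # hypothetical totals; 1001 is not a multiple of 5

# row_number() is 1-based, so subtracting 1 makes the assignment
# zero-based round-robin across the s buckets.
buckets = Counter((rn - 1) % num_salts for rn in range(1, num_rows + 1))

print(sorted(buckets.values(), reverse=True))  # [201, 200, 200, 200, 200]
```

Bucket 0 receives the one extra row; the spread between the fullest and emptiest bucket is exactly one, which is the "negligible imbalance" the text refers to.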
⁷ In the code example, the replication is performed via crossJoin with a small DataFrame of salt indices, wrapped in an explicit F.broadcast() hint. The broadcast ensures the cross join is executed locally per partition (no shuffle for the replication step): each executor receives the tiny salt DataFrame in memory and replicates its local data without network redistribution. Without the explicit hint, Spark’s optimizer will typically broadcast the salt DataFrame automatically if its size falls below spark.sql.autoBroadcastJoinThreshold (default: 10 MB), which is almost always the case for a DataFrame of s rows. However, this threshold is sometimes set to -1 in large-scale jobs to disable automatic broadcasting globally, and even with the threshold enabled, the broadcast decision can depend on cost-based optimizer statistics that may not always be available. The explicit broadcast() removes this ambiguity. The real cost of the replication is not the cross join itself but the resulting L × s rows that must be shuffled and processed in downstream stages.
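What the broadcast cross join does to the left side can be sketched in plain Python: every left row is paired with every salt index, producing L × s rows. The row values below are hypothetical:

```python
# Pure-Python sketch of the left-side replication performed by the
# broadcast cross join: each left row is paired with every salt index.
left_rows = [{"BUILDING_ID": b} for b in ["A", "B", "C"]]   # L = 3
num_salts = 4                                                # s = 4

replicated = [
    {**row, "salt_index": salt}
    for row in left_rows
    for salt in range(num_salts)
]

print(len(replicated))   # L * s = 12 rows
```

The replication itself is cheap per partition; as the footnote notes, the real cost is that these 12 (in general, L × s) rows flow into every downstream shuffle and join.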
⁸ The composite key in the code example is built using F.struct(F.col("BUILDING_ID"), F.col("salt_index")), which produces a struct column that preserves type information and avoids any risk of key collision. A simpler alternative is F.concat_ws("-", key, salt), which produces a human-readable string like "A-3", useful when inspecting intermediate DataFrames during debugging. However, string concatenation carries a subtle risk: if the separator is omitted (plain F.concat), distinct pairs can collide outright; for instance, key "A1" with salt 3 and key "A" with salt 13 both produce "A13". And even with a separator, a key that can itself contain the separator character (here, "-") makes the composite string ambiguous to parse back into its key and salt components. The struct-based approach eliminates this class of bugs entirely and is the recommended default for production code.
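The collision hazard is easy to demonstrate with plain strings and tuples; the two helpers below are pure-Python analogues (not Spark APIs) of F.concat and F.struct:

```python
def concat_key(key: str, salt: int) -> str:
    return f"{key}{salt}"          # like F.concat(key, salt): no separator

def struct_key(key: str, salt: int) -> tuple:
    return (key, salt)             # like F.struct(key, salt): typed pair

# Distinct (key, salt) pairs collide when concatenated without a separator ...
print(concat_key("A1", 3), concat_key("A", 13))   # A13 A13  -> collision
# ... but remain distinct as tuples:
print(struct_key("A1", 3) == struct_key("A", 13))  # False
```

A collision silently merges two unrelated keys into one join group, which is exactly the class of bug the struct key rules out by construction.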
⁹ The re-aggregation step in the code example wraps collect_list with array_compact as a defensive guard. In practice, collect_list already skips null values (the Collect base class checks for null before appending to the buffer). The array_compact wrapper makes the null-exclusion intent explicit and protects against potential future changes to collect_list’s null-handling behavior. array_compact is available since Spark 3.4. For earlier versions, the same result can be achieved with the higher-order function F.filter() (available since Spark 3.1): df_result.withColumn("PACKAGES", F.filter(F.col("PACKAGES"), lambda x: x.isNotNull())), or by wrapping the collect_list argument in F.when(F.col("PACKAGE").isNotNull(), F.col("PACKAGE")) to avoid collecting nulls in the first place. If your pipeline must support Spark versions before 3.4, use the F.filter approach.
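The null-exclusion semantics can be illustrated over plain lists; the collected values below are hypothetical placeholders for what collect_list would gather for one key, with None standing in for the NULL payloads contributed by unmatched salt replicas:

```python
# Hypothetical per-key output of collect_list, if nulls were retained:
collected = ["pkg_1", None, "pkg_2", None]

# What array_compact (or F.filter with x.isNotNull()) leaves behind:
compacted = [x for x in collected if x is not None]

print(compacted)   # ['pkg_1', 'pkg_2']
```

Whichever mechanism performs the filtering (collect_list's own null-skipping, array_compact, or F.filter), the invariant is the same: no None/NULL entries survive into the re-aggregated array.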
¹⁰ The NULL multiplication caused by left outer join with salting has a subtle implication for pipeline stages that inspect intermediate results. Between the salted join and the re-aggregation, the DataFrame’s cardinality is temporarily inflated. Keys with no right-side match appear s times (once per salt bucket), each producing a NULL payload. But keys with partial matches are also affected: if a key’s exploded elements land in only k of the s buckets, the remaining s − k replicas produce NULL rows. For example, building “B” with 1 package occupies one bucket; the other s − 1 replicas all yield NULLs. If downstream monitoring, audit, or data-quality stages count rows or compute metrics on this intermediate DataFrame, as is common in regulated pipelines (finance, healthcare), the inflated cardinality can trigger false alerts or produce incorrect statistics. The re-aggregation corrects this inflation, but any logic that runs between the join and the re-aggregation must account for it.
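The inflation is mechanical enough to sketch as arithmetic. The helper below is an illustrative model (not Spark code) of how many rows a single left key contributes to the intermediate DataFrame, assuming its exploded elements spread across as many buckets as possible:

```python
num_salts = 5   # hypothetical s

def intermediate_rows(matched_elements: int) -> tuple:
    """(data rows, NULL-payload rows) one left key contributes between
    the salted join and the re-aggregation."""
    # Assumes elements fill buckets round-robin, so min(m, s) buckets
    # are occupied; the remaining replicas find no match and yield NULL.
    occupied = min(matched_elements, num_salts)
    return matched_elements, num_salts - occupied

print(intermediate_rows(0))      # unmatched key: (0, 5)  -> 5 NULL rows
print(intermediate_rows(1))      # building "B":  (1, 4)  -> 5 rows, 4 NULL
print(intermediate_rows(5000))   # skewed key:    (5000, 0) -> no NULL rows
```

A row-count check placed between the join and the re-aggregation would therefore see 5 rows for building "B" instead of 1, which is the kind of drift that trips audit logic.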
¹¹ A shuffle in Spark is the mechanism by which data is redistributed across partitions, and it is one of the most expensive operations in the execution engine. During a shuffle, each mapper task serializes its output records, writes them to local disk grouped by target partition, and then each reducer task fetches the relevant segments over the network from every mapper. This involves serialization, disk I/O, network transfer, and deserialization: four distinct cost layers. Shuffle volume is reported in Spark’s UI under “Shuffle Write” and “Shuffle Read” and is one of the most reliable indicators of a job’s I/O cost. In the context of asymmetric salting, the explode step can dramatically increase shuffle write volume because the number of rows, and thus the total serialized payload, grows by a factor of m (the average array size).
¹² The number of salt buckets s should ideally be calibrated against the physical characteristics of the cluster, not just the data. The effective parallelism achievable for a single skewed key is bounded by min(s, spark.sql.shuffle.partitions, cluster_parallelism), where cluster parallelism is the total number of executor cores available. Setting s significantly above either spark.sql.shuffle.partitions or the available cores does not yield additional parallelism; it only increases the overhead of left-side replication and join cardinality. Conversely, s should be large enough that max_array_size / s produces chunks comparable to the average partition size, preventing any single salt bucket from dominating. In practice, values between 5 and 50 cover most production scenarios. As a rule of thumb, s should satisfy two constraints simultaneously: max_array_size / s ≈ average_partition_size (to balance the skewed key) and L × s should remain manageable relative to the original left-side size (to limit replication overhead).
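The two constraints can be folded into a small sizing helper. This is an assumed heuristic distilled from the rule of thumb above, not a Spark API or a definitive formula; the inputs are row counts and cluster figures you would measure for your own job:

```python
import math

def choose_num_salts(max_array_size: int,
                     avg_partition_rows: int,
                     executor_cores: int,
                     shuffle_partitions: int) -> int:
    """Heuristic sketch: enough buckets to bring the skewed key's chunks
    down to roughly the average partition size, but never more than the
    parallelism the cluster can actually exploit."""
    needed = math.ceil(max_array_size / avg_partition_rows)
    return max(1, min(needed, executor_cores, shuffle_partitions))

print(choose_num_salts(100_000, 10_000, 64, 200))   # 10: data-driven choice
print(choose_num_salts(100_000, 1_000, 64, 200))    # 64: capped by cores
```

The second call shows the capping behavior: even though the data would justify 100 buckets, anything beyond the 64 available cores only adds replication overhead without extra parallelism.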
¹³ Asymmetric salting is one of several strategies for mitigating nested-data skew in Spark. Depending on the pipeline’s topology and constraints, other approaches may be more appropriate: map-side pre-aggregation can reduce the payload size before the shuffle if the nested data supports partial aggregation; repartitionByRange instead of hash partitioning can produce more balanced partitions when key values have a known distribution; the RDD API’s custom partitioner offers full control over partition assignment at the cost of leaving the DataFrame/SQL optimizer behind; persistent bucketing (bucketBy) eliminates shuffle entirely for joins on bucketed columns, at the cost of a one-time write with controlled partitioning; AQE skew hints (Spark 3.2+) allow developers to flag specific tables as skewed, enabling AQE to apply split-and-replicate even when its automatic detection thresholds are not met; and broadcast side switching can be effective when the non-skewed side is small enough to broadcast. Each strategy involves different trade-offs in terms of implementation complexity, runtime overhead, and compatibility with the existing execution plan.
¹⁴ The code example deliberately normalizes both null arrays and empty arrays ([]) to empty arrays in the output. explode_outer emits a single row with PACKAGE = null for both cases; the F.when guard routes these null-payload rows to bucket 0, where they match exactly one left-side replica. After array_compact(collect_list(...)) removes the null entry, the result is an empty array for both. Many pipelines prefer canonicalizing “missing” and “empty” to “empty” at the boundary to simplify downstream logic. If your domain treats the distinction as semantic, for instance in auditing, SLA enforcement, or contractual data delivery, preserve it by adding a flag column before exploding: df_deliveries.withColumn("was_null", F.col("PACKAGES").isNull()). Since the flag is a per-row column, it survives explode_outer (which preserves the row for both null and empty arrays), but it must also survive the groupBy re-aggregation: include it as F.max("was_null").alias("was_null") (or F.first("was_null", ignorenulls=True)) in the aggregation expression. After re-aggregation, restore the original null with F.when(F.col("was_null"), F.lit(None)).otherwise(F.col("PACKAGES")) and drop the flag.
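The flag-based round trip can be traced end to end in plain Python. The rows below are a hypothetical post-explode intermediate (buildings and packages invented for illustration), and the loop mimics the groupBy with collect_list plus F.max("was_null"):

```python
# Hypothetical exploded rows: "B" had a null PACKAGES array, "C" an empty one.
rows = [
    {"BUILDING_ID": "A", "PACKAGE": "pkg_1", "was_null": False},
    {"BUILDING_ID": "A", "PACKAGE": "pkg_2", "was_null": False},
    {"BUILDING_ID": "B", "PACKAGE": None,    "was_null": True},
    {"BUILDING_ID": "C", "PACKAGE": None,    "was_null": False},
]

result = {}
for row in rows:                                  # groupBy("BUILDING_ID")
    agg = result.setdefault(row["BUILDING_ID"],
                            {"PACKAGES": [], "was_null": False})
    if row["PACKAGE"] is not None:                # collect_list skips nulls
        agg["PACKAGES"].append(row["PACKAGE"])
    agg["was_null"] = agg["was_null"] or row["was_null"]   # F.max("was_null")

for agg in result.values():                       # restore the original null
    if agg["was_null"]:
        agg["PACKAGES"] = None

print(result["A"]["PACKAGES"])   # ['pkg_1', 'pkg_2']
print(result["B"]["PACKAGES"])   # None: the null array survives
print(result["C"]["PACKAGES"])   # []: the empty array survives
```

Without the flag, "B" and "C" would both come out as [], which is exactly the canonicalization the footnote describes; the flag is what lets the pipeline opt out of it.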
Bibliography
Apache Spark. Configuration — Runtime SQL Configuration. In Spark 3.5.8 Documentation. Retrieved from https://spark.apache.org/docs/3.5.8/configuration.html#runtime-sql-configuration
Apache Spark. Performance Tuning — Adaptive Query Execution. In Spark 3.5.8 Documentation. Retrieved from https://spark.apache.org/docs/3.5.8/sql-performance-tuning.html
Canadian Data Guy. A deep dive into skewed joins, groupby bottlenecks, and smart strategies to keep your Spark jobs flying. Canadian Data Guy’s No Fluff Newsletter. Retrieved June 2025 from https://www.canadiandataguy.com/p/a-deep-dive-into-skewed-joins-groupby
Cox Automotive Data Solutions. The Taming of the Skew — Part Two. Data Driven Blog. Retrieved from https://coxautomotivedatasolutions.github.io/datadriven/spark/data%20skew/joins/data-skew-2/
Databricks. Adaptive query execution. Databricks Documentation. Retrieved from https://docs.databricks.com/aws/en/optimizations/aqe