What defines a Spark job’s performance is not when execution happens, but what the optimizer can see.
Three Kinds of “Lazy”
The word “lazy” is used to describe at least three different ideas. They are related, but they are not the same thing, and the difference matters.
Lazy evaluation is a semantic property of a language: values are not computed until they are needed. Haskell is the canonical example. Each computation is local, demand-driven, and memoized — a thunk fires when its result is required, not before, and once evaluated, is not recomputed.
Deferred execution is a property of an API. Operations are recorded but not executed immediately. Java’s Stream API works this way: intermediate operations like filter and map are registered, and execution begins only when a terminal operation like collect is invoked.
DAG compilation — a term used here to distinguish this property from the previous two, not a standard term in compiler literature — is a property of the execution model. Operations are collected, analyzed, reordered, rewritten, and optimized before execution begins. The entire computation graph is globally visible and subject to whole-program optimization. SQL query engines follow this model: a query planner rewrites and optimizes a query before running it.
The discriminating criterion is scope. In lazy evaluation, each thunk is local and demand-driven. In DAG compilation, the system sees the entire plan at once and rewrites it globally. Lazy evaluation implies deferral by its very nature; DAG compilation requires deferred execution as a prerequisite. But deferred execution alone says nothing about what happens between declaration and execution.
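The difference between deferral and global rewriting can be sketched in a few lines of plain Python. This is a toy model, not Spark code: the `Filter` and `Sort` operators, the `optimize` rule, and the sample rows are all hypothetical names introduced for illustration. The generator defers work but runs its steps exactly as written; the plan, by contrast, is data that a rewrite rule can inspect and reorder before anything executes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Filter:
    column: str
    value: str


@dataclass(frozen=True)
class Sort:
    column: str


def deferred(rows):
    """Deferred execution only: nothing runs until the generator is consumed,
    but the steps then run exactly in the order they were written."""
    for row in sorted(rows, key=lambda r: r["city"]):
        if row["country"] == "IT":
            yield row


def optimize(plan):
    """Toy global rewrite: move every Filter ahead of every other operator.

    An equality filter commutes with a sort, so the result is unchanged
    while the sort handles fewer rows.
    """
    filters = [op for op in plan if isinstance(op, Filter)]
    others = [op for op in plan if not isinstance(op, Filter)]
    return filters + others


def execute(plan, rows):
    """Run a plan (a list of operators) step by step."""
    for op in plan:
        if isinstance(op, Filter):
            rows = [r for r in rows if r[op.column] == op.value]
        elif isinstance(op, Sort):
            rows = sorted(rows, key=lambda r: r[op.column])
    return list(rows)


rows = [
    {"city": "Rome", "country": "IT"},
    {"city": "Berlin", "country": "DE"},
    {"city": "Milan", "country": "IT"},
]

# The plan as declared: sort, then filter.
declared = [Sort("city"), Filter("country", "IT")]
# The plan after the whole-plan rewrite: filter, then sort.
optimized = optimize(declared)

cities_deferred = [r["city"] for r in deferred(rows)]
cities_declared = [r["city"] for r in execute(declared, rows)]
cities_optimized = [r["city"] for r in execute(optimized, rows)]
```

All three produce the same cities; only the third path had a chance to do less work, because something looked at the whole plan first.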
What Spark Actually Does
Spark combines deferred execution and DAG compilation. At the API level, transformations are recorded. At runtime, Catalyst — Spark’s optimizer framework, which this article treats as a query compilation pipeline — analyzes the logical plan, optimizes it, produces a physical plan, and hands it to the scheduler. The fact that execution is deferred is not the point; it is a consequence of global compilation. Spark delays execution to enable whole-plan optimization.
The pattern is not unique to Spark. No one calls LLVM “lazy” because it transforms source code into IR, optimizes it, and emits machine code. The structural property Spark shares with LLVM is that a complete representation of the computation is available before execution begins, and an optimization phase rewrites that representation globally. The domains differ — Catalyst operates on relational algebra, LLVM on SSA-form IR; one optimizes data access patterns, the other optimizes machine code — but the high-level architecture is the same: declare, then compile, then execute. Without code generation, this would make Catalyst a query optimizer with a compiler-like architecture — a meaningful but incomplete analogy. Tungsten’s Whole-Stage Code Generation closes the gap: the physical plan is compiled to JVM bytecode via Janino, making the compiler analogy not just structural but literal. Spark is closer to LLVM than to Haskell.
The compiler model also explains failures that the “lazy” framing cannot.
Where the Compiler Model Bites
Consider a simple filter-then-aggregate pipeline:
from pyspark.sql.functions import col
df = spark.read.parquet("events/")
result = df.filter(col("country") == "IT").groupBy("city").count()
result.show()
Catalyst pushes the filter down into the Parquet scan. The scan reads only the columns the query needs, and the Parquet reader can use the pushed predicate to skip data that cannot match. This is predicate pushdown, and it happens because Catalyst can see the entire plan before execution begins.
Now consider the same logic, routed through a Python UDF:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
is_italian = udf(lambda c: c == "IT", BooleanType())
result = df.filter(is_italian(col("country"))).groupBy("city").count()
result.show()
Functionally equivalent. But Catalyst cannot see inside the UDF. It is an opaque box. The predicate is not pushed down. The scan reads every row from every partition (though only the needed columns), serializes each to Python, evaluates the function, and sends the result back. You can verify this with explain():
# Native expression
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- *(2) HashAggregate(keys=[city], functions=[partial_count(1)])
   +- *(1) Filter (country = IT)
      +- *(1) ColumnarToRow
         +- FileScan parquet [city,country]
            PushedFilters: [EqualTo(country,IT)]
# UDF
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- *(2) HashAggregate(keys=[city], functions=[partial_count(1)])
   +- *(1) Filter UDF(country)
      +- *(1) ColumnarToRow
         +- FileScan parquet [city,country]
            PushedFilters: []
Notice PushedFilters. In the first plan, the predicate reaches the storage layer. In the second, it does not. The “lazy” model cannot explain this difference — both plans defer execution equally. The compiler model explains it directly: Catalyst optimizes what it can see. A UDF is opaque; a native expression is transparent.
This is not specific to Python. Scala UDFs are equally opaque to Catalyst. The issue is not the language; it is that any UDF is a ScalaUDF or PythonUDF node in the expression tree — not a native Catalyst expression, and therefore not rewritable.
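Why opacity blocks pushdown can be shown with a toy planner in plain Python. This is an illustrative sketch, not Catalyst: `EqualTo`, `plan_scan`, and the sample row are hypothetical names. The planner can push a predicate only when it can read the predicate's structure; a bare callable gives it nothing to work with.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EqualTo:
    """A structured predicate: the planner can read its column and value."""
    column: str
    value: str

    def __call__(self, row):
        return row[self.column] == self.value


def plan_scan(predicate):
    """Return (pushed_filters, post_scan_filter) for a hypothetical scan."""
    if isinstance(predicate, EqualTo):
        # Transparent: the planner knows which column and value are involved,
        # so it can hand the predicate to the storage layer. (A real engine
        # may still re-check it after the scan; this toy does not.)
        return [predicate], None
    # Opaque: all the planner knows is that this is something callable,
    # so it must run it on every row after the scan.
    return [], predicate


native = EqualTo("country", "IT")
opaque = lambda row: row["country"] == "IT"  # plays the role of a UDF

pushed_native, residual_native = plan_scan(native)
pushed_opaque, residual_opaque = plan_scan(opaque)

sample = {"city": "Rome", "country": "IT"}
same_answer = native(sample) == opaque(sample)
```

Both predicates give the same answer on every row. Only one of them tells the planner what it is going to ask.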
If you must use a UDF, prefer pandas_udf. Catalyst still cannot push predicates through it, but Arrow-based batching significantly reduces the per-row serialization overhead that makes scalar Python UDFs expensive at scale. The optimizer limitation remains; the serialization cost changes shape.
Nested structures create a subtler blind spot. Suppose your Parquet schema has a struct column:
root
 |-- event_id: string
 |-- address: struct
 |    |-- country: string
 |    |-- city: string
 |-- timestamp: long
You filter on the nested field:
df = spark.read.parquet("events/")
df.filter(col("address.country") == "IT").select("event_id", "address.city")
This looks entirely native. No UDF, no opaque function — just dot notation on a Column. Catalyst resolves it, places it in the logical plan, and optimizes around it. But predicate pushdown to the storage layer is a different matter.
There are two distinct optimizations at play, and they do not behave the same way on nested fields:
Nested column pruning works. Spark can project only the struct subfields it needs (address.country and address.city) rather than reading the entire address struct. The optimization is controlled by spark.sql.optimizer.nestedSchemaPruning.enabled, which was introduced in Spark 2.4 and has been enabled by default since Spark 3.0, and it works reliably with Parquet.
Nested predicate pushdown has historically been more data-source-specific and less predictable than its flat-column equivalent. Whether the predicate address.country = "IT" reaches the Parquet reader depends on the data source implementation, the Spark version, and the predicate type. In earlier versions of Spark, simple nested predicates often did not reach the storage layer, and the filter would stay above the scan with PushedFilters: []. Since Spark 3.0, Spark attempts nested predicate pushdown for Parquet (the list of eligible sources is controlled by spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources), and simple equality predicates on struct fields generally push down to the Parquet reader. Deeper nesting or complex predicates may still not reach the storage layer, depending on the data source.
The broader point remains: the logical plan is optimized in both cases, but what reaches the storage layer is a boundary between the optimizer and the data source API. Catalyst hands off a set of predicates; the data source decides which ones it can handle.
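The handoff described above can be sketched as a small negotiation. This is a toy in plain Python, not a Spark interface: `ToySource`, `push_filters`, and the `supports_nested` flag are hypothetical. The optimizer offers a list of predicates; the source keeps the ones it can evaluate and returns the rest, which must then be applied above the scan.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EqualTo:
    column: str  # dotted path for nested fields, e.g. "address.country"
    value: str


class ToySource:
    """Hypothetical data source; not a real Spark interface."""

    def __init__(self, supports_nested):
        self.supports_nested = supports_nested
        self.pushed = []

    def push_filters(self, filters):
        """Keep the filters this source can evaluate; return the rest."""
        residual = []
        for f in filters:
            is_nested = "." in f.column
            if is_nested and not self.supports_nested:
                residual.append(f)  # stays above the scan
            else:
                self.pushed.append(f)  # evaluated by the source
        return residual


offered = [EqualTo("address.country", "IT"), EqualTo("event_id", "e-1")]

older = ToySource(supports_nested=False)
residual_older = older.push_filters(offered)

newer = ToySource(supports_nested=True)
residual_newer = newer.push_filters(offered)
```

The logical plan is identical in both cases. What differs is how much of it the source agrees to take on.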
UDFs and nested schemas are not wrong in themselves. What you give up when you use them is the compiler’s optimization scope — wholly or partially. Sometimes that tradeoff is worth it. But you should know you are making it.
Caching as Materialization Boundary
The same perspective reframes caching. If you think in terms of lazy versus eager execution, cache() becomes a way to “force Spark to compute here.” If you think in terms of compilation, caching becomes a materialization boundary in the dataflow graph.
Each action generates a separate QueryExecution, with its own logical plan, optimized plan, and physical plan. Catalyst optimizes within that boundary. It cannot optimize across actions. When two jobs share a common subgraph, Spark plans and recomputes it independently for each. Caching materializes an intermediate result so that subsequent jobs do not recompute it from scratch.
base = spark.read.parquet("events/").filter(col("year") == 2024)
# Without cache: the filter + scan is compiled and executed twice
italian = base.filter(col("country") == "IT").count()
german = base.filter(col("country") == "DE").count()
# With cache: the shared subgraph is materialized once
base.cache()
italian = base.filter(col("country") == "IT").count()
german = base.filter(col("country") == "DE").count()
This is analogous to an intermediate target in a build system. You introduce a materialization point where the plan structure requires it — where multiple downstream jobs depend on the same computed result — not where you feel the urge to force execution.
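The effect of a materialization point can be made concrete by counting how often the shared subgraph runs. The sketch below is plain Python with hypothetical names (`ToyPlan`, `scan_2024`, `count_country`); it models a plan as a recipe that is re-run on every action unless it has been marked for caching.

```python
class ToyPlan:
    """Hypothetical stand-in for a DataFrame: a recipe, not a result."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._cached = None

    def cache(self):
        # Only marks the plan; nothing is computed here.
        self._use_cache = True
        return self

    def collect(self):
        if not self._use_cache:
            return self._compute()
        if self._cached is None:
            self._cached = self._compute()  # first action materializes
        return self._cached


counter = {"scans": 0}


def scan_2024():
    """Pretend scan + filter on year; counts how many times it runs."""
    counter["scans"] += 1
    return [{"country": "IT"}, {"country": "DE"}, {"country": "IT"}]


def count_country(plan, country):
    return sum(1 for row in plan.collect() if row["country"] == country)


base = ToyPlan(scan_2024)

# Without cache: each action re-runs the shared subgraph.
count_country(base, "IT")
count_country(base, "DE")
scans_without_cache = counter["scans"]

# With cache: the shared subgraph runs once, on the first action.
counter["scans"] = 0
base.cache()
scans_after_mark = counter["scans"]
italian = count_country(base, "IT")
german = count_country(base, "DE")
scans_with_cache = counter["scans"]
```

The counts are the whole argument: two scans without the boundary, one with it, and none at the moment the boundary is declared.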
Note that cache() is itself lazy: it marks a DataFrame for caching but does not trigger computation. The first action after cache() materializes the result. This reinforces the point: even the API for materialization follows the compiler model, not an eager one.
One subtlety worth knowing: the cache manager matches by canonicalized plan structure, an intermediate level between syntactic identity and full semantic equivalence. Canonicalization normalizes attribute names and rewrites commutative expressions into a canonical form. So df.filter(col("x") > 1) and df.where(col("x") > 1) share the cache, and so do df.filter(col("x") > 1) and df.filter(lit(1) < col("x")), because canonicalization rewrites both to the same form. But more complex structural differences, such as reordering joins, combining filters differently, or adding an unnecessary withColumn that introduces a new projection node, can break cache sharing despite logical equivalence.
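Matching by canonical form, rather than by text or by full semantic equivalence, can be illustrated with a toy cache key. This is a plain-Python sketch under simplifying assumptions: `Cmp`, `canonicalize`, and `cache_key` are hypothetical, and operands are ordered by string comparison purely for illustration, which is not how Spark orders them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cmp:
    op: str  # "<" or ">"
    left: str
    right: str


_FLIP = {"<": ">", ">": "<"}


def canonicalize(expr):
    """Put operands in a fixed order, flipping the operator if they swap."""
    if expr.left <= expr.right:
        return expr
    return Cmp(_FLIP[expr.op], expr.right, expr.left)


def cache_key(plan):
    """A plan is a tuple of nodes; only comparison nodes are normalized."""
    return tuple(
        canonicalize(node) if isinstance(node, Cmp) else node
        for node in plan
    )


plan_a = (Cmp(">", "x", "1"),)  # x > 1
plan_b = (Cmp("<", "1", "x"),)  # 1 < x, same meaning, written differently
plan_c = ("project_all", Cmp(">", "x", "1"))  # extra node, same rows

shares_cache_ab = cache_key(plan_a) == cache_key(plan_b)
shares_cache_ac = cache_key(plan_a) == cache_key(plan_c)
```

The first pair meets at the same key because the rewrite is local and mechanical. The third plan returns the same rows, but nothing in the key computation knows that, so it misses.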
The corollary is unpersist(). If caching is a structural decision about where to split the plan, removing a cache entry is equally structural. In a long pipeline, forgetting to unpersist can saturate executor memory — a practical failure directly tied to treating caching as an afterthought rather than an architectural choice.
Misplaced caching is a common symptom of the “lazy” mental model. If Spark is a compiler, caching is a structural decision about where the plan should be split — not a way to control when it works.
Filter Placement and Join Semantics
Another area where the compiler model pays off is understanding how filter placement interacts with join semantics. Consider these two logically equivalent pipelines:
# A: filter, then join
orders_it = orders.filter(col("country") == "IT")
result_a = orders_it.join(products, "product_id")
# B: join, then filter
joined = orders.join(products, "product_id")
result_b = joined.filter(col("country") == "IT")
For an inner join with a deterministic filter on a column from orders, Catalyst will produce the same physical plan for both — it pushes the filter before the join regardless of how you wrote it. This is the benefit of global compilation: the optimizer sees the whole plan, not the sequence of API calls.
But this equivalence is not universal. With a left outer join, pushing a filter on the nullable (right) side below the join while keeping the join outer would change semantics: right-side rows would be removed before the join, so left-side rows that the original filter discards would instead survive with null-padded columns. Catalyst knows this and will not make that rewrite. Similarly, if the filter is a UDF or a non-deterministic expression, the optimizer’s hands are tied. The order you wrote matters again, because you have forced the compiler to take your code literally.
Here is a concrete case. Suppose you want all orders, with product details where available, but only for products currently in stock:
# Left join: keep all orders, attach product info where it exists
result = orders.join(products, "product_id", "left_outer") \
    .filter(col("in_stock") == True)
This silently drops every order whose product is not in the products table — or whose product is out of stock. The in_stock column comes from products — the nullable (right) side of the join; rows with no match have in_stock = null, and the filter eliminates them. What was a left outer join now behaves like an inner join with an extra predicate.
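Catalyst will not move this filter onto products while keeping the outer join, because doing so would change the result. If anything, because the predicate rejects nulls, the optimizer is free to treat the join as an inner join, which is exactly what the query as written means. The optimizer is correct. But if you intended to filter before the join, selecting only in-stock products to join against, you need to say so explicitly: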
in_stock_products = products.filter(col("in_stock") == True)
result = orders.join(in_stock_products, "product_id", "left_outer")
Now orders with no matching in-stock product still appear in the output, with null product columns — which is what a left join is supposed to do. Catalyst will push the filter into the scan of products because it is applied before the join, within a scope the optimizer fully controls.
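The two placements can be compared on a handful of rows without a cluster. The sketch below is plain Python, not Spark: `left_outer_join` is a hypothetical helper and the sample orders and products are invented for illustration.

```python
orders = [
    {"order_id": 1, "product_id": "a"},
    {"order_id": 2, "product_id": "b"},
    {"order_id": 3, "product_id": "c"},  # no such product
]
products = [
    {"product_id": "a", "in_stock": True},
    {"product_id": "b", "in_stock": False},
]


def left_outer_join(left, right, key, right_columns):
    """Toy left outer join: unmatched left rows get None for right columns."""
    index = {}
    for r in right:
        index.setdefault(r[key], []).append(r)
    out = []
    for l in left:
        matches = index.get(l[key])
        if matches:
            out.extend({**l, **m} for m in matches)
        else:
            out.append({**l, **{c: None for c in right_columns}})
    return out


# Filter after the join: null-padded rows fail the predicate and vanish.
joined = left_outer_join(orders, products, "product_id", ["in_stock"])
filter_after = [row for row in joined if row["in_stock"] is True]

# Filter before the join: every order survives, some with null product info.
in_stock_products = [p for p in products if p["in_stock"] is True]
filter_before = left_outer_join(
    orders, in_stock_products, "product_id", ["in_stock"]
)

ids_after = [row["order_id"] for row in filter_after]
ids_before = [row["order_id"] for row in filter_before]
stock_before = [row["in_stock"] for row in filter_before]
```

The first version keeps one order; the second keeps all three. Neither is wrong, but they answer different questions, and only the author of the query knows which one was meant.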
The less you constrain the plan, the more room the optimizer has.
What Compilation Cannot Reach
The same framework clarifies where Spark’s optimization ends. Catalyst compiles and optimizes the logical plan of a single query because that plan is fully known before execution begins. You declare the entire dataflow, then trigger it. Within that query, you do not read intermediate results to dynamically decide the next step. Adaptive Query Execution (AQE, enabled by default since Spark 3.2 via spark.sql.adaptive.enabled) may refine the physical plan at runtime — coalescing post-shuffle partitions, switching join strategies based on actual data sizes, rewriting skew joins — but it still operates within the boundary of a single query plan. It is runtime refinement, not interactive replanning.
Where the flow is interactive and sequential — an OLTP transaction, a step-by-step notebook that inspects results and branches — global compilation is not possible. This is not a limitation of Spark’s implementation; it is a structural constraint. A whole-program compiler needs the whole program. If you feed it one statement at a time, each statement is still compiled, but the optimization scope shrinks to a single step — and the opportunities for global optimization are significantly reduced.
This explains why chaining multiple actions in a loop, each depending on the result of the previous one, tends to produce suboptimal plans. It is not because Spark is slow; it is because the compiler scope is too narrow to optimize across iterations.
The compiler model also illuminates fault recovery. Spark’s RDD lineage — the chain of transformations from source to result — is a recomputable intermediate representation that the runtime preserves. When a partition is lost, Spark does not restore from a checkpoint by default; it re-executes the lineage for that partition. Caching is the explicit memoization point; without it, recomputation is the default. You opt in to materialization; everything else is recomputable by design.
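Recovery by recomputation can be sketched in a few lines. This is a plain-Python toy, not Spark's scheduler: the `lineage` list, the `source` partitions, and `compute_partition` are hypothetical. The point it illustrates is that a lost partition is rebuilt by replaying the recorded transformations on that partition alone.

```python
# The lineage: an ordered list of per-element transformations.
lineage = [lambda x: x * 2, lambda x: x + 1]

# Source data, split into partitions by id.
source = {0: [1, 2], 1: [3, 4], 2: [5, 6]}


def compute_partition(pid):
    """Replay the full lineage for a single partition."""
    rows = source[pid]
    for step in lineage:
        rows = [step(x) for x in rows]
    return rows


# Initial run: every partition is computed once.
computed = {pid: compute_partition(pid) for pid in source}

# Simulate losing one partition (for example, an executor going away).
del computed[1]

# Recovery: only the missing partition is recomputed from lineage.
recomputed = []
for pid in source:
    if pid not in computed:
        computed[pid] = compute_partition(pid)
        recomputed.append(pid)
```

Nothing was restored from a saved copy; the recipe was simply run again where the result had gone missing.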
Spark Connect, introduced in Spark 3.4 and central in Spark 4.x, makes the separation between declaration and execution architecturally explicit: the client submits an unresolved logical plan over gRPC, and the server analyzes, optimizes, and executes it. The boundary is no longer just conceptual — it is a network call.
Calling Spark lazy is not wrong. But it is reductive, and the reduction has a cost. The question is never “when does Spark execute?” It is always “what can Catalyst see?”