What defines a Spark job’s performance is not when execution happens, but what the optimizer can see.


Three Kinds of “Lazy”

The word “lazy” is used to describe at least three different ideas. They are related, but they are not the same thing, and the difference matters.

Lazy evaluation is a semantic property of a language: values are not computed until they are needed. Haskell is the canonical example. Each computation is local and demand-driven — a thunk fires when its result is required, not before.

Deferred execution is a property of an API. Operations are recorded but not executed immediately. Java’s Stream API works this way: intermediate operations like filter and map are registered, and execution begins only when a terminal operation like collect is invoked.

DAG compilation is a property of the execution model. Operations are collected, analyzed, reordered, rewritten, and optimized before execution begins. The entire computation graph is globally visible and subject to whole-program optimization. SQL query engines follow this model: a query planner rewrites and optimizes a query before running it, yet no one calls PostgreSQL “lazy” for doing so.

The discriminating criterion is scope. In lazy evaluation, each thunk is local and demand-driven. In DAG compilation, the system sees the entire plan at once and rewrites it globally. Deferred execution is a prerequisite for both, but says nothing about what happens between declaration and execution.
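
The distinction shows up even in plain Python. A generator pipeline defers execution and is demand-driven, but no component ever sees the whole pipeline, so nothing can be globally rewritten. A minimal sketch:

```python
# Deferred, demand-driven execution with plain Python generators.
# Each stage is recorded; nothing runs until the pipeline is consumed.
events = ({"country": c} for c in ["IT", "DE", "IT"])
italian = (e for e in events if e["country"] == "IT")  # still nothing has run

# Consumption plays the role of a terminal operation: only now does the
# pipeline fire, one element at a time, on demand.
count = sum(1 for _ in italian)
print(count)  # 2
```

This is deferred and demand-driven, like lazy evaluation. But there is no global view of the pipeline, so no equivalent of predicate pushdown is possible.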

What Spark Actually Does

Spark combines deferred execution and DAG compilation. At the API level, transformations are recorded. At runtime, Catalyst — Spark’s built-in query optimizer — analyzes the logical plan, optimizes it, produces a physical plan, and hands it to the scheduler. The fact that execution is deferred is not the point; it is a consequence of global compilation. Spark delays execution to enable whole-plan optimization.

The pattern is not unique to Spark. No one calls LLVM “lazy” because it transforms source code into IR, optimizes it, and emits machine code. The structural property Spark shares with LLVM is that a complete representation of the computation is available before execution begins, and an optimization phase rewrites that representation globally. The domains differ — Catalyst operates on relational algebra, LLVM on SSA-form IR — but the architecture is the same: declare, then compile, then execute. Spark is closer to LLVM than to Haskell.

The compiler model also explains failures that the “lazy” framing cannot.

Where the Compiler Model Bites

Consider a simple filter-then-aggregate pipeline:

from pyspark.sql.functions import col

df = spark.read.parquet("events/")
result = df.filter(col("country") == "IT").groupBy("city").count()
result.show()

Catalyst pushes the filter down into the Parquet scan. The physical plan reads only the rows and columns it needs. This is predicate pushdown, and it happens because Catalyst can see the entire plan before execution begins.

Now consider the same logic, routed through a Python UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

is_italian = udf(lambda c: c == "IT", BooleanType())
result = df.filter(is_italian(col("country"))).groupBy("city").count()
result.show()

Functionally equivalent. But Catalyst cannot see inside the UDF. It is an opaque box. The predicate is not pushed down. The scan reads every row from every partition, serializes the rows to Python, evaluates the function, and sends the results back. You can verify this with explain():

# Native expression
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- Exchange hashpartitioning(city, 200)
   +- *(1) HashAggregate(keys=[city], functions=[partial_count(1)])
      +- *(1) Filter (country = IT)
         +- *(1) ColumnarToRow
            +- FileScan parquet [city,country]
                 PushedFilters: [EqualTo(country,IT)]

# UDF
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- Exchange hashpartitioning(city, 200)
   +- *(1) HashAggregate(keys=[city], functions=[partial_count(1)])
      +- *(1) Filter UDF(country)
         +- *(1) ColumnarToRow
            +- FileScan parquet [city,country]
                 PushedFilters: []

Notice PushedFilters. In the first plan, the predicate reaches the storage layer. In the second, it does not. The “lazy” model cannot explain this difference — both plans defer execution equally. The compiler model explains it directly: Catalyst optimizes what it can see. A UDF is opaque; a native expression is transparent.

This is not specific to Python. Scala UDFs are equally opaque to Catalyst. The issue is not the language; it is that any UDF is a ScalaUDF or PythonUDF node in the expression tree — not a native Catalyst expression, and therefore not rewritable.

If you must use a UDF, prefer pandas_udf. Catalyst still cannot push predicates through it, but Arrow-based batching significantly reduces the per-row serialization overhead that makes scalar Python UDFs expensive at scale. The optimizer limitation remains; the serialization cost changes shape.

Nested structures create a subtler blind spot. Suppose your Parquet schema has a struct column:

root
 |-- event_id: string
 |-- address: struct
 |    |-- country: string
 |    |-- city: string
 |-- timestamp: long

You filter on the nested field:

df = spark.read.parquet("events/")
df.filter(col("address.country") == "IT").select("event_id", "address.city")

This looks entirely native. No UDF, no opaque function — just dot notation on a Column. Catalyst resolves it, places it in the logical plan, and optimizes around it. But predicate pushdown to the storage layer is a different matter.

There are two distinct optimizations at play, and they do not behave the same way on nested fields:

Nested column pruning works. Spark can project only the struct subfields it needs — address.country and address.city — rather than reading the entire address struct. For Parquet this was introduced in Spark 2.4 behind spark.sql.optimizer.nestedSchemaPruning.enabled, has been on by default since 3.0, and works reliably in practice.

Nested predicate pushdown is more data-source-specific and less predictable in practice. Whether the predicate address.country = "IT" reaches the Parquet reader depends on the data source implementation, the Spark version, and the predicate type. Nested column pruning is mature and reliable; nested predicate pushdown is not — simple cases may work, but deeper nesting or complex predicates often do not reach the storage layer. When pushdown does not apply, the filter stays above the scan:

== Physical Plan ==
*(1) Filter (address.country = IT)
+- *(1) ColumnarToRow
   +- FileScan parquet [event_id,address]
        PushedFilters: []

Compare with the equivalent schema flattened to top-level columns:

== Physical Plan ==
*(1) Filter (country = IT)
+- *(1) ColumnarToRow
   +- FileScan parquet [event_id,city,country]
        PushedFilters: [EqualTo(country,IT)]

The logical plan is optimized in both cases. The difference is in what reaches the storage layer. This is not a Catalyst limitation; it is a boundary between the optimizer and the data source API. Catalyst hands off a set of predicates; the data source decides which ones it can handle. For nested fields, support is materially less predictable than for top-level columns.

UDFs and nested schemas are not wrong in themselves. What you give up when you use them is the compiler’s optimization scope — wholly or partially. Sometimes that tradeoff is worth it. But you should know you are making it.

Caching as Materialization Boundary

The same perspective reframes caching. If you think in terms of lazy versus eager execution, cache() becomes a way to “force Spark to compute here.” If you think in terms of compilation, caching becomes a materialization boundary in the dataflow graph.

Each action generates a separate QueryExecution — a distinct logical plan, optimized plan, and physical plan. Catalyst optimizes within that boundary. It cannot optimize across actions. When two jobs share a common subgraph, Catalyst recomputes it independently for each. Caching materializes an intermediate result so that subsequent jobs do not recompute it from scratch.

base = spark.read.parquet("events/").filter(col("year") == 2024)

# Without cache: the filter + scan is compiled and executed twice
italian = base.filter(col("country") == "IT").count()
german  = base.filter(col("country") == "DE").count()

# With cache: the shared subgraph is materialized once
base.cache()
italian = base.filter(col("country") == "IT").count()
german  = base.filter(col("country") == "DE").count()

This is analogous to an intermediate target in a build system. You introduce a materialization point where the plan structure requires it — where multiple downstream jobs depend on the same computed result — not where you feel the urge to force execution.

Note that cache() is itself lazy — it marks a DataFrame for caching but does not trigger computation. The first action after cache() materializes the result. This reinforces the point: even the API for materialization follows the compiler model, not an eager one. One subtlety worth knowing: the cache manager matches by canonicalized plan structure, not by semantic equivalence. Two DataFrames with equivalent logic but different syntactic construction may not share a cache entry.

Misplaced caching is a common symptom of the “lazy” mental model. If Spark is a compiler, caching is a structural decision about where the plan should be split — not a lever for controlling when execution happens.

Shuffle Boundaries and Plan Shape

Another area where the compiler model pays off is shuffle-awareness. Consider these two logically equivalent pipelines:

# A: filter, then join
orders_it = orders.filter(col("country") == "IT")
result_a = orders_it.join(products, "product_id")

# B: join, then filter
joined = orders.join(products, "product_id")
result_b = joined.filter(col("country") == "IT")

For an inner join with a deterministic filter on a column from orders, Catalyst will produce the same physical plan for both — it pushes the filter before the join regardless of how you wrote it. This is the benefit of global compilation: the optimizer sees the whole plan, not the sequence of API calls.

But this equivalence is not universal. With a left outer join, pushing a filter on the nullable (right) side below the join would change semantics — it would eliminate rows before they get a chance to produce null-padded results. Catalyst knows this and will not reorder. Similarly, if the filter is a UDF or a non-deterministic expression, the optimizer’s hands are tied. The order you wrote matters again, because you have forced the compiler to take your code literally.

The less you constrain the plan, the more room the optimizer has.

What Compilation Cannot Reach

The compiler model also clarifies Spark’s limits. Catalyst compiles and optimizes the logical plan of a single job because that plan is fully known before execution begins. You declare the entire dataflow, then trigger it. Within that job, you do not read intermediate results to dynamically decide the next step. Adaptive Query Execution (AQE, enabled by default since Spark 3.2 via spark.sql.adaptive.enabled) may refine the physical plan at runtime — coalescing post-shuffle partitions, switching join strategies based on actual data sizes — but it still operates within the boundary of a single query plan. It is runtime refinement, not interactive replanning.

Where the flow is interactive and sequential — an OLTP transaction, a step-by-step notebook that inspects results and branches — global compilation is not possible. This is not a limitation of Spark’s implementation; it is a structural constraint. A compiler needs the whole program. If you feed it one statement at a time, each statement is still compiled, but the optimization scope shrinks to a single step — and the opportunities for global optimization are significantly reduced.

This explains why chaining multiple actions in a loop, each depending on the result of the previous one, tends to produce suboptimal plans. It is not because Spark is slow; it is because the compiler scope is too narrow to optimize across iterations.

Spark Connect, introduced in Spark 3.4 and made substantially more central in Spark 4.x, makes the compiler metaphor even more literal: the client submits a logical plan over gRPC, and the server compiles and executes it. The boundary between declaration and execution is no longer just conceptual — it is a network call.


Calling Spark lazy is not wrong. But it is reductive, and the reduction has a cost. The question is never “when does Spark execute?” It is always “what can Catalyst see?”