How Spark’s internal rewriting framework becomes an external programming surface
Starting point
Inside Catalyst there is a language for manipulating logical trees. It has a grammar, a composition mechanism, and execution semantics.
It is, in fact, an embedded DSL — but it isn’t documented as such and doesn’t have a name. Yet it’s the mechanism by which all of Spark’s analysis and optimization rules are written, from name resolution all the way to predicate pushdown. And it’s the same mechanism that Delta Lake, Iceberg, and other external projects use to extend Catalyst’s pipeline.
This article tells its story.
The DSL
Catalyst’s pipeline is a sequence of rule batches, managed by RuleExecutor. Each batch has an execution strategy: Once or FixedPoint(n). A Once batch applies its rules a single time; a FixedPoint batch applies them all repeatedly until the plan stops changing or the maximum number of iterations is reached. The “stops changing” criterion is structural (fastEquals on the tree), not semantic: it’s a syntactic fixed point of the composed transformation function — not a guarantee of normal form, nor of confluence. What must stabilize is the composition of the batch’s rules, not each individual rule: per-rule idempotence is recommended engineering discipline rather than a formal API requirement; it prevents oscillation, where rules keep trading structurally different but equivalent plans and the batch never converges.
To make this concrete, here are two typical batches, reduced to their main rules:
Batch "Operator Optimization before Inferring Filters" (FixedPoint)
├── PredicatePushdown
├── ConstantFolding
├── ColumnPruning
└── SimplifyConditionals
Batch "Finish Analysis" (FixedPoint(1))
├── EliminateSubqueryAliases
└── ReplaceExpressions
A note in the spirit of philological honesty: in the Spark source Finish Analysis is a batch with a single meta-rule (FinishAnalysis in Optimizer.scala) that internally foldLeft-applies a set of sub-rules, including EliminateSubqueryAliases and ReplaceExpressions. The diagram above is didactic.
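In code, a batch declaration looks like this. A minimal sketch in the style of Optimizer.scala; MyOptimizer is a hypothetical executor, while the three rules it references are real Catalyst objects:

import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, SimplifyConditionals}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Hypothetical executor, shown only to make the batch grammar concrete.
// MyOptimizer.execute(plan) applies the batches in declaration order,
// honoring each batch's strategy.
object MyOptimizer extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] = Seq(
    Batch("Normalize", FixedPoint(100),   // rerun until fastEquals-stable
      ConstantFolding,
      SimplifyConditionals),
    Batch("Cleanup", Once,                // a single pass
      EliminateSubqueryAliases)
  )
}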
Each element of a batch is a Rule[LogicalPlan]: a function LogicalPlan => LogicalPlan that receives a tree, rewrites it, and returns it. It’s the unit of composition of the DSL.
Inside the Rule, the actual work is done by transform — a method of TreeNode[T], the abstract class from which all logical plans in Spark derive. In current Spark transform delegates to transformDown (preorder, parent before children), but the API explicitly warns rule authors not to rely on transform for directionality: when traversal order matters, use transformDown or transformUp directly. The rule applies a partial function where the pattern matches, and returns the transformed node. Where it doesn’t match, the node is left unchanged. The Rule[LogicalPlan] signature makes the transformation an endomorphism over LogicalPlan: it preserves Catalyst’s syntactic universe. Semantic invariants — resolution, types, exprId coherence — remain the rule author’s responsibility, not a framework guarantee.
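A toy illustration of why direction matters: a simplified cousin of the real CombineFilters rule (the real one also checks that the predicates are deterministic before merging):

import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object CombineAdjacentFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // Bottom-up: inner Filter pairs are merged first, so a whole chain
    // Filter(Filter(Filter(...))) collapses in a single pass. Top-down,
    // the rewrite at a node creates a new adjacent pair below the current
    // position that this pass never revisits (a FixedPoint batch would
    // only pick it up on the next iteration).
    case Filter(outer, Filter(inner, child)) =>
      Filter(And(inner, outer), child)
  }
}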
In the following example the pattern captures a SubqueryAlias — the node that marks where a named subquery begins — and replaces it with the child subtree. After name resolution the alias is no longer needed, and this rule removes it from the tree. It’s one of the rules in the Finish Analysis batch shown above:
object EliminateSubqueryAliases extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case SubqueryAlias(_, child) => child
  }
}
This is the language: case classes for the nodes, partial functions for the pattern matching, Rule for composition, RuleExecutor for execution. It isn’t complex — but it’s the computational structure on which the entire relational layer of Spark is built.
A note for those familiar with functional compilers: this structure — iterative rewriting over term trees until stabilization — resembles the way GHC organizes its simplifier passes over Core (Santos & Peyton Jones, “A Transformation-Based Optimiser for Haskell”, Science of Computer Programming 32, 1998). Catalyst splits the rewriting across two ADTs — LogicalPlan for relational nodes, Expression for predicates and functions — while GHC operates on a single Core ADT. Same mechanism, different domain.
How Spark uses the DSL internally
Spark uses this DSL uniformly across the entire pipeline. Analyzer and Optimizer do different things — the first resolves names, types, and attribute bindings; the second rewrites the plan for performance — but they rest on the same machinery: both are RuleExecutor instances running batches of Rule[LogicalPlan].
The Analyzer does name resolution, type inference, and attribute binding:
UnresolvedAttribute("age") → AttributeReference("age", IntegerType, exprId=42)
This transformation introduces a surrogate key (exprId) that disambiguates same-named references produced by self-joins and aliasing — α-equivalence of attributes. It’s closer to GHC’s Unique on Var, or to de Bruijn indices, than to a classical name → declaration symbol table.
What matters here isn’t so much the what as the how: the transformation happens through a Rule[LogicalPlan] that uses transform to pattern-match on the unresolved nodes and replace them with resolved ones. The same mechanism as PredicatePushdown or ConstantFolding.
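A deliberately naive sketch of that step. ResolveAttributesSketch is a hypothetical stand-in for the real ResolveReferences, which additionally handles scoping, ambiguity, and missing-attribute errors; the version below only binds a name against the children's output:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object ResolveAttributesSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p => p.transformExpressions {
      case u: UnresolvedAttribute =>
        // Bind the name against the children's output; on a miss, leave
        // the node unresolved so a later rule (or iteration) can bind it.
        p.children.flatMap(_.output)
          .find(_.name.equalsIgnoreCase(u.name))
          .getOrElse(u)
    }
  }
}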
The Optimizer applies the optimization rules — predicate pushdown, column pruning, constant folding, expression simplification. And these too are expressed as Rule[LogicalPlan]:
object PredicatePushdown extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, Join(left, right, joinType, joinCond, hint))
        if canPushToLeft(condition, left) =>
      Join(Filter(condition, left), right, joinType, joinCond, hint)
  }
}
The snippet is didactic: the real rule (PushDownPredicates) also splits conjuncts — Filter(a AND b) is broken apart and each conjunct is pushed independently, not just the relocation shown here.
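A hedged sketch of that splitting, built on the same PredicateHelper utilities the real rule uses; PushDownConjunctsSketch is a hypothetical name, and the push condition is reduced to the left side of an inner join:

import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object PushDownConjunctsSketch extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(cond, join @ Join(left, _, Inner, _, _)) =>
      // a AND b AND c  ->  Seq(a, b, c): each conjunct considered alone
      val (pushable, rest) = splitConjunctivePredicates(cond)
        .partition(_.references.subsetOf(left.outputSet))
      val pushed = pushable.reduceOption(And)
        .map(c => join.copy(left = Filter(c, left)))
        .getOrElse(join)
      rest.reduceOption(And).map(Filter(_, pushed)).getOrElse(pushed)
  }
}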
Rule-based vs cost-based
This choice — rewriting by rules, rather than cost-based search over a space of equivalent plans — isn’t a given. The main alternative family is the cost-based optimizers — the Volcano Optimizer Generator (Graefe & McKenna, ICDE 1993) and its successor Cascades (Graefe, 1995), alongside IBM’s Starburst (Lohman, Pirahesh et al., late ’80s), with evolutions in SQL Server (Cascades-derived) and Postgres (System-R / dynamic-programming lineage) — which build a space of equivalent plans and search it for a minimum-cost plan under a cost function. A rule, in that world, isn’t an “improving transformation” but an “equivalence”, and the optimizer is free to explore combinations no single rule knows how to produce. It’s more powerful in principle; in practice it requires high-quality statistics, a memo data structure to avoid combinatorial explosion, and a significantly more complex architecture. The real structural gap with Catalyst is precisely the memo: Cascades enumerates equivalences in a shared structure and applies cost only to choose; Catalyst has no memo and applies transformations in one prescribed direction.
Catalyst’s rule-based rewriting gives up global exploration in exchange for simplicity: a fixed sequence of local transformations, a single strategy per rule, no cost function governing the sequence. The operational advantage is that every rule is readable and testable in isolation, and the set is extendable from outside without rewriting the optimizer — exactly the property Delta, Iceberg, and similar projects need. The disadvantage is fragility with respect to ordering: adding a rule can change the output plan in unexpected ways if it interacts with those that follow, and plan quality depends on what the active rules can recognize, not on the global optimum.
Spark’s cost-based join reorder is itself implemented as a Rule[LogicalPlan]: the mechanism remains rule-based; the difference is that the rule consumes statistics. The practical consequence is that performance in Spark often depends on what the optimizer can “see” structurally in the tree, not on the exploration of alternative plans. For common cases this matters little: Catalyst has a good set of normalization rules — CombineFilters merges adjacent filters, CollapseProject fuses consecutive projections, BooleanSimplification rewrites logical expressions — that make superficially different forms converge on the same optimized plan.
The difference emerges in complex plans: multi-way joins when cost-based join reordering is disabled or statistics are missing; physical-strategy selection in JoinSelection, which gives broadcast hash join the highest priority and doesn’t generate alternatives if its conditions are met; rule interactions blocked by correlated subqueries, CTEs, or UDFs that prevent local rewriting. The last is probably the most frequent case in practice. In all these cases two logically equivalent DataFrame transformation sequences can produce different physical plans: not because the optimizer is wrong, but because the shape of the input tree activates different rules.
A concrete example on Spark 4.1.1 (AQE and CBO disabled to isolate the rule-based pipeline): three datasets, A large, B very small (broadcastable), C medium (above the broadcast threshold). Two logically equivalent orderings:
val plan1 = A.join(B, "k").join(C, "k") // (A ⋈ B) ⋈ C
val plan2 = A.join(C, "k").join(B, "k") // (A ⋈ C) ⋈ B
Same final result (495 million rows in both cases). But not the same physical plan:
Form 1:

SortMergeJoin
├── BroadcastHashJoin(A, B)
│   ├── A
│   └── BroadcastExchange(B)
└── Exchange + Sort(C)

Form 2:

BroadcastHashJoin
├── SortMergeJoin(A, C)
│   ├── Exchange + Sort(A)
│   └── Exchange + Sort(C)
└── BroadcastExchange(B)
Form 1 broadcasts B first — a local operation, no shuffle of A — and pays the shuffle only for the sort-merge with C. Form 2 pays the full shuffle of A upfront for the sort-merge with C, and only afterwards applies the cheap broadcast of B. Same result, very different runtime cost. Spark doesn’t reorder because — without CBO and statistics — the JoinSelection strategy is applied to the two Join nodes in the order they appear in the tree: the first Join is mapped to the best of broadcast/sort-merge, and the second is evaluated on the output of the first — its broadcast/sort-merge decision depends on the size estimates of the resulting subtree. The alternatives (for instance: moving the broadcast of B to the top of the tree) aren’t even generated.
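For reference, the two flags that isolate the rule-based pipeline in this comparison are ordinary configuration keys; plan1 and plan2 are the DataFrames defined above:

spark.conf.set("spark.sql.adaptive.enabled", "false")  // disable AQE
spark.conf.set("spark.sql.cbo.enabled", "false")       // disable CBO
plan1.explain()  // expect: BroadcastHashJoin(A, B) under a SortMergeJoin
plan2.explain()  // expect: SortMergeJoin(A, C) under a BroadcastHashJoin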
The examples/join_order_demo.scala script reproduces this comparison on Spark 4.1.1; the full output of the two explain calls shows every BroadcastExchange, Exchange, and Sort at the points predicted by the two orderings.
The example shows the rule-based pipeline’s sensitivity to logical tree shape; it doesn’t claim Spark can’t choose good joins in general — AQE can convert SortMergeJoin to BroadcastHashJoin at runtime, and explicit hints plus high-quality statistics with CBO enabled cover other cases. The point is isolated: with AQE and CBO disabled, what remains is the rule-based pipeline, and the rule-based pipeline is structurally blind to reorderings that would require global exploration.
How external projects use the DSL
Spark exposes an API — SparkSessionExtensions — that allows external projects to inject custom rules into the Catalyst pipeline. It is a documented Developer API — marked @Experimental/@Unstable in Spark’s ScalaDoc — but it is the official extension mechanism Spark exposes for injecting parser, analyzer, optimizer, and planner customizations. The extension surface covers most of the planning pipeline, from parsing and analysis to optimization, planning, columnar execution, and AQE-specific phases:
- injectParser — extend or replace the SQL parser
- injectResolutionRule — add rules to the Analyzer
- injectPostHocResolutionRule — post-analysis resolution rules
- injectCheckRule — plan validation after analysis
- injectOptimizerRule — add rules to the Optimizer
- injectPlannerStrategy — physical-planning strategies
- injectColumnar — columnar execution rules
- injectQueryStagePrepRule — AQE query-stage preparation (recent versions)
In current Spark, optimizer extension rules are appended to the operatorOptimizationRuleSet and run inside the operator-optimization batches. That is an implementation detail, not a semantic contract: extensions should not depend on fine-grained neighboring order between external and native rules across Spark versions.
Delta Lake and Iceberg both use this mechanism, with different emphases: Delta intervenes mainly in parsing and analysis, Iceberg mainly in optimization, but there’s overlap — Iceberg also injects resolution rules, and Delta also injects optimizer rules.
Delta Lake injects a parser, analyzer rules, and check rules — it extends the analysis and validation portion of the pipeline:
// From DeltaSparkSessionExtension (simplified)
override def apply(extensions: SparkSessionExtensions): Unit = {
  extensions.injectParser { (session, parser) =>
    new DeltaSqlParser(parser)
  }
  extensions.injectResolutionRule { session =>
    new DeltaAnalysis(session)
  }
  extensions.injectCheckRule { session =>
    new DeltaUnsupportedOperationsCheck(session)
  }
  extensions.injectPostHocResolutionRule { session =>
    new PreprocessTableMerge(session.sessionState.conf)
  }
}
Iceberg injects rules into the optimizer — it extends the optimization portion:
// From IcebergSparkSessionExtensions (simplified)
override def apply(extensions: SparkSessionExtensions): Unit = {
  extensions.injectParser { case (_, parser) =>
    new IcebergSparkSqlExtensionsParser(parser)
  }
  extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
  extensions.injectOptimizerRule { spark => RewriteDelete(spark) }
  extensions.injectOptimizerRule { _ =>
    OptimizeConditionsInRowLevelOperations
  }
}
The snippets above are simplified and version-dependent: the exact set of injected rules changes across Delta and Iceberg releases. The point isn’t which rules they inject — it’s that external rules operate in the same pipeline, on the same types, with the same mechanism as native rules. A Rule[LogicalPlan] injected by Iceberg is structurally indistinguishable from PredicatePushdown.
What you can do with this DSL
If projects like Delta Lake and Iceberg extend Catalyst for their own operations, the same mechanism is available to anyone. A concrete use case: an extension that intercepts dangerous patterns in the plan and signals — or blocks — them before execution.
A caveat. The code that follows is a sketch: it shows the extension point, it isn’t a production-ready library. The complete version is at github.com/cdelmonte-zg/planguard. The operational pitfalls — placement, idempotence, FixedPoint, PySpark bootstrap — are covered in a separate article (see below).
For an extension that inspects the plan and can reject queries — without transforming it — the correct hook is injectCheckRule. Unlike transformational rules, which are endomorphisms LogicalPlan => LogicalPlan, a check rule is a function LogicalPlan => Unit: it inspects, it may throw, but it does not rewrite. It is invoked in the analysis check phase — not in the optimizer’s FixedPoint batches — so it isn’t multiplied by iterations the way an optimizer rule would be. It’s the documented point for analysis-level errors.
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Collect}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
import org.apache.spark.sql.execution.python.PythonUDF  // path as of recent Spark 3.x/4.x

class PlanGuardExtension extends (SparkSessionExtensions => Unit) {
  // Entry point: Spark instantiates this class based on
  // spark.sql.extensions and calls apply once per SparkSession.
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectCheckRule { session =>
      DetectDangerousPatterns(session)
    }
  }
}

case class DetectDangerousPatterns(session: SparkSession)
    extends (LogicalPlan => Unit) with Logging {

  override def apply(plan: LogicalPlan): Unit = {
    plan.foreach {
      // 1. Inner join without a condition — the classic forgotten ON.
      //    df.crossJoin(other) is explicit user intent and is NOT
      //    intercepted (joinType = Cross, not Inner).
      case Join(_, _, Inner, None, _) =>
        throw new SparkException(
          "Join without condition detected: possible cartesian product")

      // 2. Join with a condition but no equi-join key: the planner
      //    will fall back to BroadcastNestedLoopJoin / CartesianProduct.
      //    ExtractEquiJoinKeys is the same helper Catalyst uses to
      //    decide between hash and nested-loop.
      case j @ Join(_, _, _, Some(_), _) if !isEquiJoin(j) =>
        logWarning("Non-equi join detected: will execute as nested-loop join")

      // 3. Python UDF inside a Filter: an opaque node to Catalyst,
      //    blocks predicate pushdown and any rewrite on the predicate.
      case Filter(condition, _) if containsPythonUdf(condition) =>
        logWarning("Python UDF in filter predicate: pushdown and optimization blocked")

      // 4. collect_list / collect_set / collect_top_k in an aggregation:
      //    dangerous memory profile, non-spillable in-memory buffer.
      //    Note: in Spark 4.x Aggregate has 4 fields (groupingExpressions,
      //    aggregateExpressions, child, hint: Option[AggregateHint]);
      //    in 3.x there are 3.
      case Aggregate(_, aggregations, _, _)
          if aggregations.exists(containsCollect) =>
        logWarning("collect_* in aggregation: may create large in-memory buffers")

      case _ =>
    }
    // Inspection-only: no return value, no plan transformation.
  }

  private def isEquiJoin(j: Join): Boolean =
    // (the tuple arity of ExtractEquiJoinKeys varies slightly across versions)
    ExtractEquiJoinKeys.unapply(j).exists { case (_, leftKeys, _, _, _, _, _, _) =>
      leftKeys.nonEmpty
    }

  private def containsPythonUdf(expr: Expression): Boolean =
    expr.exists { case _: PythonUDF => true; case _ => false }

  // Collect[_] is the abstract parent class of CollectList, CollectSet,
  // and CollectTopK (all in org.apache.spark.sql.catalyst.expressions.aggregate).
  // CollectList/CollectSet are unbounded per group; CollectTopK is bounded by k
  // but can still be dangerous with large k or many groups.
  private def containsCollect(expr: Expression): Boolean =
    expr.exists {
      case AggregateExpression(_: Collect[_], _, _, _, _) => true
      case _ => false
    }
}
Registration:
spark.sql.extensions=dev.cdelmonte.planguard.PlanGuardExtension
Deployment doesn’t require modifying Spark: the extension is compiled into a JAR, included in the cluster’s classpath or passed via --jars at submit time, and activated with spark.sql.extensions=.... It’s the same procedure used, among others, by io.delta.sql.DeltaSparkSessionExtension in the delta-spark JAR.
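Concretely, a typical submission looks like this (JAR path and application file hypothetical):

spark-submit \
  --jars /path/to/planguard.jar \
  --conf spark.sql.extensions=dev.cdelmonte.planguard.PlanGuardExtension \
  app.py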
spark.sql.extensions is additive: multiple values are comma-separated — for example io.delta.sql.DeltaSparkSessionExtension,dev.cdelmonte.planguard.PlanGuardExtension. In clusters with multiple extensions the value must be concatenated, not overwritten.
The same hook works “on the fly” from a programmatically constructed SparkSession, for example in a notebook or in PySpark:
spark = (
    SparkSession.builder
        .config("spark.sql.extensions",
                "dev.cdelmonte.planguard.PlanGuardExtension")
        .getOrCreate()
)
An operational detail worth noting: the JAR must be on the classpath when the JVM starts, not when the SparkSession is built. Concretely, it must be passed with --jars to spark-submit, or, from “pure” Python, via PYSPARK_SUBMIT_ARGS set before the pyspark import:
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /abs/path/to/extension.jar pyspark-shell"
)
from pyspark.sql import SparkSession  # only from here on
This holds for any SparkSessionExtensions: it’s a property of the JVM bootstrap, not specific to this extension. In managed environments (Databricks attached cluster, EMR, Glue, Dataproc) the driver JVM is already running by the time getOrCreate is called: the classpath must be injected from the cluster configuration (init scripts, bootstrap actions, properties), not from code.
What the rule intercepts, and why at the logical-plan level
The cartesian join is intercepted at the logical-plan level, before the physical-strategy choice. AQE may demote a SortMergeJoin to a shuffled hash join or broadcast hash join based on runtime shuffle statistics, but it doesn’t convert a cartesian into something non-cartesian: the problem is structural, not resolvable at runtime. The rule distinguishes the intentional case (explicit crossJoin, joinType = Cross) from the accidental one (Inner with empty condition, the forgotten ON): only the latter is blocked.
The non-equi join is the cartesian’s less conspicuous cousin. The condition exists, syntactically it looks like a normal join, but no equality key can become a hash key — Catalyst has no alternative to BroadcastNestedLoopJoin, which is cartesian-like in cost. The rule doesn’t block it (there are legitimate cases: range joins, semi-joins with < to select top-N); it simply makes it visible. For the check it uses the same ExtractEquiJoinKeys the planner consults to decide the physical strategy.
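What the two join checks react to, on hypothetical DataFrames:

// 1. Inner join with no condition: the check rule throws before execution.
big.join(other)

// 2. Range condition only: ExtractEquiJoinKeys finds no equality keys,
//    so the rule logs the nested-loop warning but lets the plan through.
events.join(spans,
  events("ts") >= spans("start") && events("ts") < spans("end"))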
The Python UDF in a filter is an opaque node that blocks predicate pushdown. Catalyst can’t inspect its contents, so no rewrite rule can cross it. The rule doesn’t solve the problem — it makes it visible before the query runs on a cluster with real data.
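The same contrast in DataFrame code, with a hypothetical status column; the rule matches the PySpark variant (PythonUDF), but a Scala UDF is equally opaque to Catalyst:

import org.apache.spark.sql.functions.{col, udf}

val isValid = udf((s: String) => s != null && s.startsWith("ok"))
df.filter(isValid(col("status")))           // opaque: no pushdown past it
df.filter(col("status").startsWith("ok"))   // native: pushdown survives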
collect_list in an aggregation isn’t a logical error — it’s an operational risk. No native Catalyst rule flags it. A traditional optimizer doesn’t worry about it; in a framework for distributed dataflow, the operational risk belongs precisely to the level where the plan is still inspectable.
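And the aggregation shape the fourth check flags, again with hypothetical names:

import org.apache.spark.sql.functions.{col, collect_list}

// One in-memory buffer per group, growing with the group and never spilling.
df.groupBy(col("user")).agg(collect_list(col("event")))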
PlanGuard is a static analyzer over Catalyst’s IR — a linter that enforces local operational invariants before execution — not an optimizer: it doesn’t pick a better plan; it rejects or flags plans that embody dangerous patterns.
Extending observability
The same architectural position — a rule running at plan-time before execution — also works as a signal-emission point, not just a blocking one. Instead of just logging warnings or throwing exceptions, the Rule can emit OpenTelemetry spans or events — with metadata about the detected pattern, the plan involved, the problematic predicate. A cartesian join or a UDF in a filter aren’t only intercepted at plan-time: they’re tracked as events in the cluster’s monitoring system, through any OTel-compatible backend. Catalyst’s pipeline becomes an emission point for diagnostic signals, not just for plan transformation.
The hook is mechanical. You initialize a Tracer once; the plain SDK builders below use their defaults (OTLP gRPC to localhost:4317), while the standard environment variables (OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_SERVICE_NAME) are honored when the SDK is assembled through its autoconfigure module:
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor

// lazy val: the SDK starts on first detection, not at class-load.
private lazy val tracer: Tracer = {
  val provider = SdkTracerProvider.builder()
    .addSpanProcessor(
      BatchSpanProcessor.builder(OtlpGrpcSpanExporter.builder().build()).build())
    .build()
  // Without a shutdown hook BatchSpanProcessor loses queued spans
  // when the JVM terminates — the worst case for blocking events.
  Runtime.getRuntime.addShutdownHook(new Thread(() => provider.close()))
  OpenTelemetrySdk.builder().setTracerProvider(provider).build()
    .getTracer("dev.cdelmonte.planguard")
}
Each detection becomes a span with attributes about the pattern and the node. The span is opened and closed in-place inside the case branch — the plan isn’t touched, only observed. It’s a zero-duration span: used as an event (a counter source via spanmetrics), not as a unit of time. On Tempo or Jaeger it will appear disconnected from the Spark query trace — which is exactly what we want.
case Filter(condition, _) if containsPythonUdf(condition) =>
  tracer.spanBuilder("catalyst.plan_guard.detect")
    .setAllAttributes(Attributes.of(
      AttributeKey.stringKey("planguard.pattern"), "python_udf_in_filter",
      AttributeKey.stringKey("planguard.node"), "Filter"))
    .startSpan().end()
  logWarning("Python UDF in filter predicate: pushdown blocked")
A production note: attributes must stay low-cardinality. Embedding the full plan, raw predicates, or a per-query query_id will blow up the cardinality of spanmetrics-derived metrics; a plan fingerprint (a hash) is safer.
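One cheap fingerprint is the semantic hash Catalyst already computes for plan comparison; a sketch, using the plan in scope of the check rule:

// QueryPlan.semanticHash() hashes the canonicalized plan, so cosmetic
// differences (aliases, exprIds) collapse to the same value.
val planFingerprint = f"${plan.semanticHash()}%08x"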
From here on the pipeline is the standard OTel one: the span travels in OTLP gRPC to the collector, can be exported to any backend, and — through the spanmetrics connector — automatically converted into a Prometheus counter. A single emission produces both the detail for diagnosis (the individual query, the plan, the attributes) and the aggregated metric for alerting. Catalyst’s DSL doesn’t just rewrite the plan: it’s also the point at which the plan becomes visible.
Conclusions
Catalyst isn’t just Spark’s internal optimizer. It’s a framework with an embedded DSL for manipulating logical trees — and that DSL is one of the system’s main programmable surfaces at the query-planning level: the one in which the dataflow is still represented as a transformable structure, before it becomes physical code.
Logical-plan analysis and optimization are built on a single mechanism: rules that pattern-match on tree nodes and rewrite them. The same mechanism is exposed externally through SparkSessionExtensions, and it’s the same mechanism by which Delta Lake extends the analyzer and Iceberg rewrites operations in the optimizer. Anyone can inspect, transform, or reject a dataflow before it is executed.
Most Spark users will never see this DSL. But it’s the level at which the system decides how to analyze, optimize, and plan every query — the actual code generation (Janino → JVM bytecode) happens downstream, in WholeStageCodeGen. Once you’ve seen this level, Spark no longer appears as just an engine that runs distributed transformations, but as a system that exposes its own compiler as a surface for intervention.
The operational pitfalls one encounters writing real rules are the subject of the article Anti-patterns in Catalyst rules, the operational counterpart to this one.
References
- Armbrust et al., Spark SQL: Relational Data Processing in Spark, SIGMOD 2015 — the foundational Catalyst paper, with the architectural motivation for rule-based rewriting.
- SparkSessionExtensions — the JavaDoc for Spark’s extension point.
- Source code: org.apache.spark.sql.catalyst.{trees, rules, plans} — TreeNode, Rule, RuleExecutor, LogicalPlan and the cited rules (EliminateSubqueryAliases, PushDownPredicates — referred to as PredicatePushdown in the text — BooleanSimplification).
- github.com/cdelmonte-zg/planguard — the extension shown on this page, in its complete version.
- Anti-patterns in Catalyst rules — the operational counterpart to this article.