What to know before writing an extension that enters Spark’s pipeline
Foreword
In the previous article I described Catalyst as an embedded DSL: rules that pattern-match on tree nodes, batches executed by a RuleExecutor, an extension surface exposed via SparkSessionExtensions. The final example — a rule that intercepts cartesian joins, Python UDFs, and collect_list — was intentionally a sketch.
When the sketch becomes real code running inside a cluster’s pipeline, things that looked like details turn into concrete problems. This article is the operational counterpart to that first one: a collection of anti-patterns I’ve encountered (some of which I’ve made myself) building PlanGuard in its operational version, with the goal of making the why behind each one explicit.
The temptation to put the throw inside an optimizer rule
In the previous article PlanGuard is registered with injectCheckRule. The choice isn’t obvious — the most immediate reflex, reading the SparkSessionExtensions documentation, is to use injectOptimizerRule with a Rule[LogicalPlan] that throws an exception:
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object BlockCartesian extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Join(_, _, Inner, None, _) =>
      throw new SparkException("cartesian product detected")
  }
}
…and register it with injectOptimizerRule. The rule enters the optimizer batch, pattern-matches on the Inner join without condition, throws. It works — at least the day you write it.
What you don’t see, and what justifies the different choice in the main article, is where the throw ends up. An optimizer rule is invoked by RuleExecutor.execute inside a Batch(FixedPoint, ...), which is itself called by QueryExecution.optimizedPlan. Above sits assertAnalyzed, and above that the caller where the user wrote df.collect() or df.explain(). The throw crosses all these levels — and any operation that materializes executedPlan: df.collect(), df.show(), df.write..., even df.explain(), which is supposed to be there precisely to see the plan and diagnose it. Operations that stop at analysis (df.queryExecution.analyzed, df.printSchema which uses analyzed.schema) don’t trigger it — the optimizer is strictly lazy.
The right hook is injectCheckRule. A check rule is a function LogicalPlan => Unit that Spark invokes inside checkAnalysis, at the end of Analyzer.execute — so after analysis, before optimization. It can throw an exception to reject the plan. It’s documented by Spark for exactly this purpose: “this query must not be executed”.
A useful mechanical detail: analysis is memoized via QueryExecution.analyzed (lazy val), so on a DataFrame that has already been analyzed explain() does not re-fire the check rule. But if the plan hasn’t been analyzed yet, explain() does activate it — the end of analysis is the single point at which the check rule runs.
There’s also a less obvious temporal consequence. With injectOptimizerRule, the throw reaches the caller of df.collect() (the optimizer is lazy: rewriting only happens when an action requires the physical plan). With injectCheckRule, analysis is run inside Dataset.ofRows, which is called by every method that returns a new DataFrame — df.filter, df.join, df.select. The query “blows up” at the DataFrame’s construction, not at collect.
val joined = left.join(right) // ← with CheckRule, SparkException is thrown here
joined.collect() // ← with OptimizerRule, it would have been here
From the user’s perspective the check rule is better: the failure is attached to analysis, not to optimization or execution. In Spark classic this usually means the exception is raised while constructing the next DataFrame, close to the offending transformation, rather than being deferred to collect/show/write. But it’s a detail that changes the shape of the tests: the try/catch must wrap the DataFrame construction, not just the action. It’s exactly the shift that separates an “article-sketch rule” from a “pipeline rule”.
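The temporal difference can be made concrete with a toy model in plain Python (a simulation, not Spark's API; `Frame`, `check_rule`, and `optimizer_rule` are illustrative names): the check-rule path raises while the object is being constructed, the optimizer-rule path only when an action materializes the plan.

```python
# Toy model (not Spark): where a check rule vs. an optimizer rule surfaces
# its exception. All names here are illustrative, not Spark's API.

class PlanError(Exception):
    pass

def check_rule(plan):
    # analysis-time hook: runs when the frame is constructed
    if plan == ("join", None):
        raise PlanError("cartesian product detected")

def optimizer_rule(plan):
    # optimizer hook: runs lazily, only when an action needs the plan
    if plan == ("join", None):
        raise PlanError("cartesian product detected")
    return plan

class Frame:
    def __init__(self, plan, use_check_rule):
        self.plan = plan
        if use_check_rule:
            check_rule(plan)       # fails here, at construction

    def collect(self):
        optimizer_rule(self.plan)  # fails here, at the action
        return []

# With the check rule, construction itself raises:
try:
    Frame(("join", None), use_check_rule=True)
except PlanError as e:
    print("at construction:", e)

# With only the optimizer rule, construction succeeds, collect() raises:
f = Frame(("join", None), use_check_rule=False)
try:
    f.collect()
except PlanError as e:
    print("at collect:", e)
```

The try/except placement in the two branches is exactly the shift in test shape described above.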
In short, choose the hook based on intent:
- semantic invariant of the plan (rejecting queries that must not run) → injectCheckRule
- plan rewriting (transformation that produces a new LogicalPlan) → injectOptimizerRule
- per-query metrics/observability (observing without modifying) → QueryExecutionListener
A note: the check rule fires once per analyzed plan. If the same query goes through CacheManager/Dataset.persist and is re-executed on a cache hit, the check rule is not re-applied — the plan has already “passed” analysis.
Side-effects and FixedPoint multiplication
The other reason injectCheckRule is preferred isn’t the throw: it’s the number of times the rule is called. Even when you write a legitimate optimizer rule — one that actually rewrites the plan, because that’s what optimizer rules are supposed to do — and you add “innocent” side-effects like logging or metric emission, there’s a less obvious consequence. Spark’s optimizer organizes its rules into batches; many batches have a FixedPoint strategy, which means: “execute all the batch’s rules repeatedly until the plan stops changing”.
Batch "Operator Optimization" (FixedPoint)
├── PredicatePushdown
├── ConstantFolding
├── ColumnPruning
├── DetectDangerousPatterns ← ours
└── SimplifyConditionals
If our rule is a no-op (returns the plan unchanged), it doesn’t contribute to termination. But if other rules in the batch keep rewriting — ConstantFolding simplifies an expression, PredicatePushdown moves a filter, ColumnPruning trims a projection — every batch iteration re-applies ours too (in the absence of pruning declared via nodePatterns/containsAnyPattern). Typically this is a few iterations for a simple plan; with subqueries, CTEs, or complex predicates the number can rise significantly.
The result is concretely observable: a Prometheus counter (catalyst_calls_total) that reports 8 for four actual detections, because each emission was duplicated in proportion to the batch’s iterations. The same happens to log warnings: three identical lines for the same Python UDF, because another rule in the batch changed the plan, and the next FixedPoint iteration revisited the same node.
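The multiplication is easy to reproduce with a toy FixedPoint loop in Python (a simulation of the batch mechanics, not Spark's RuleExecutor; rule names are illustrative): one rule keeps rewriting for a few iterations, and the side-effecting no-op rule fires once per iteration.

```python
# Toy FixedPoint executor (not Spark): a side-effecting no-op rule fires
# once per batch iteration, multiplying its emissions.

emissions = 0

def detect_dangerous(plan):
    global emissions
    if "udf" in plan:
        emissions += 1          # side-effect: metric/log emission
    return plan                 # no-op: returns the plan unchanged

def constant_folding(plan):
    # keeps rewriting for a few iterations, forcing the batch to loop
    return plan.replace("1+1", "2", 1)

def run_batch(plan, rules, max_iterations=100):
    for _ in range(max_iterations):
        new_plan = plan
        for rule in rules:
            new_plan = rule(new_plan)
        if new_plan == plan:    # fixed point reached: plan stopped changing
            break
        plan = new_plan
    return plan

plan = "udf(1+1) + (1+1) + (1+1)"
run_batch(plan, [constant_folding, detect_dangerous])
print(emissions)  # → 4: one UDF in the plan, one emission per iteration
```

One detection, four emissions: three iterations in which constant_folding still had work to do, plus the final iteration that confirms the fixed point.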
Faced with this problem, the practicable routes are three, in order of preference:

1. Move the rule into a check rule (as in the previous point). Check rules run once after analysis — no FixedPoint, no multiplication.

2. Memoize per plan-id (treat as anti-pattern, not solution). The natural temptation is to track the System.identityHashCode(plan) of the input and skip if already seen. It doesn’t work: Catalyst is immutable, every transform/copy produces a new instance, so at iteration N+1 the input plan has a different identityHashCode from iteration N even if structurally identical. The cache never hits between iterations. To deduplicate across iterations the key must be structural (plan.canonicalized or a hash of treeString), not referential — but at that point the cost of structural comparison exceeds the cost of the duplication you were trying to avoid. The truly correct pattern, when the need is “one metric per query”, is to observe outside the optimizer: QueryExecutionListener.onSuccess is the documented point for per-query emissions. Caveat: it only sees queries that reach execution. Queries rejected in analysis don’t trigger it — so it isn’t a substitute for a check rule when enforcement is required.

3. Use transformDownWithPruning with nodePatterns. It doesn’t resolve the multiplication of emissions across iterations — the rule still gets revisited — but it reduces the cost of a single traversal by skipping entire subtrees that don’t contain the patterns of interest. It’s a cost mitigation, not an architectural fix.
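Why identity-based memoization never hits can be shown in a few lines of Python (an analogy, not Spark: a frozen dataclass stands in for an immutable plan node, id() for identityHashCode, value equality for a structural key):

```python
# Toy illustration (not Spark): a referential cache key misses across the
# copies an immutable rewrite produces, while a structural key collapses them.

from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Filter:
    condition: str
    child: str

plans = [Filter("x > 1", "scan")]
for _ in range(3):
    # an immutable "transform": a new, structurally identical instance;
    # keep every copy alive so object identities stay distinct
    plans.append(replace(plans[-1]))

# referential keys: every copy is a distinct object, the cache never hits
ids = {id(p) for p in plans}
print(len(ids))          # 4: four distinct identities

# structural keys: frozen dataclasses hash by value, all copies collapse
structural = set(plans)
print(len(structural))   # 1: structurally identical
```

The structural set hits, but only because equality here is a cheap field comparison; on a real LogicalPlan the analogous canonicalized comparison is exactly the cost the text warns about.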
The rule many people think they’ve written (“one emission per detection”) corresponds to the check rule. The one written as an optimizer rule is “one emission per detection per batch iteration” — a different thing.
Unprotected mutable state
The architectural observation comes before the code: if a Rule needs mutable state to be correct, it’s probably in the wrong place. A check rule that runs once doesn’t have this problem; neither does a QueryExecutionListener for per-query metrics. If the temptation comes anyway, it’s worth seeing why the “obvious” mitigations — var, ThreadLocal, ConcurrentHashMap — are traps.
The memoization from the previous point — “track the plans already processed” — looks innocuous. It’s written like this:
private var lastSeen: LogicalPlan = _

def apply(plan: LogicalPlan): LogicalPlan = {
  if (plan eq lastSeen) return plan
  lastSeen = plan
  // ... emit, log, etc.
  plan
}
A var inside a Rule. It works as long as there’s one query at a time on a single rule instance. Spark by default doesn’t parallelize the optimizer across different queries, but several scenarios break this quiet assumption: SparkSession.newSession() allows multiple sessions on the same JVM; notebooks execute cells sequentially — but not always; Spark Connect (SparkConnectService) keeps the same JVM alive with N clients; AQE (AdaptiveSparkPlanExec) re-applies the optimizer on subplans during execution, on threads different from the main one. How the rule is instantiated depends on how you register it. injectOptimizerRule(_ => MyRule) with MyRule as a singleton object means state shared globally across every session in the JVM; injectOptimizerRule(session => new MyRule(session)) gives at least one instance per session — but that same instance can still be used by multiple concurrent queries within the same session.
ThreadLocal isn’t a reliable way out: AQE crosses threads (sub-plans are optimized in scala.concurrent.Future with a dedicated executor), and reused thread pools can leak values across queries. ConcurrentHashMap keyed on identityHashCode is the FixedPoint multiplication problem amplified: a LogicalPlan’s referential identity changes with every transform/copy, so the cache doesn’t hit either across FixedPoint iterations or across queries. To really deduplicate across plans the key must be structural (canonicalized), and at that point you’ve written a mini-optimizer inside a rule.
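The cross-thread failure mode is easy to see outside the JVM too. Here is a Python analogy (threading.local standing in for a JVM ThreadLocal; the function names are illustrative): state set on the main thread is invisible to work submitted to a pool thread, which is roughly what happens when AQE optimizes a sub-plan on its own executor.

```python
# Toy demonstration (not Spark): thread-local state doesn't follow work
# that hops threads, mirroring AQE optimizing sub-plans on another executor.

import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()
local.current_query = "q1"       # set on the "main" thread

def optimize_subplan():
    # runs on a pool thread: the main thread's value is invisible here
    return getattr(local, "current_query", None)

with ThreadPoolExecutor(max_workers=1) as pool:
    seen_on_pool_thread = pool.submit(optimize_subplan).result()

print(local.current_query)       # 'q1' on the main thread
print(seen_on_pool_thread)       # None on the pool thread
```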
Conclusion: mutable state in a Rule is a time bomb, and the mitigations one would find writing the fix alone are almost all wrong. If the need is “don’t emit the same warning more than once in 5 minutes”, that rate-limit belongs to the observability system — not to the Spark rule.
foreach when transformDownWithPruning would have done
A typical inspection rule does something like:
def apply(plan: LogicalPlan): LogicalPlan = {
  plan.foreach {
    case Filter(condition, _) if isDangerous(condition) =>
      logWarning("...")
    case _ =>
  }
  plan
}
plan.foreach visits every node of the tree, whatever the pattern of interest. On a large plan (nested subqueries, views, CTEs, broadcast hints) that means many visits per rule, per iteration. It isn’t a correctness problem — foreach doesn’t rewrite the plan, so it doesn’t violate idempotency — it’s a cost problem. Catalyst has an API that does the same thing with structural pruning: transformDownWithPruning, paired with a containsAnyPattern(...) test that skips entire subtrees if they don’t contain any of the indicated patterns.
import org.apache.spark.sql.catalyst.trees.TreePattern._

def apply(plan: LogicalPlan): LogicalPlan = {
  plan.transformDownWithPruning(_.containsAnyPattern(JOIN, FILTER, AGGREGATE)) {
    case f @ Filter(condition, _) if isDangerous(condition) =>
      logWarning("...")
      f
    case other => other
  }
}
The mechanism is the same one Spark’s native rules use internally — nodePatterns and TreePatternBits were introduced in Spark 3.2 precisely to reduce the cost of iteration in the FixedPoint. It’s an accelerator for the rewriting engine: subtrees without the indicated patterns are skipped entirely, so on plans with few relevant nodes one visits a fraction of the total nodes. It must be kept distinct from idempotency and termination — containsAnyPattern doesn’t help convergence, it only accelerates it.
Two notes. One on readability: transformDownWithPruning has the same signature as transform, and for an inspection rule that returns the node unchanged the visual pattern is identical. The difference is only in the pruning predicate, which is one line. One on compatibility: TreePattern is Spark’s internal API (org.apache.spark.sql.catalyst.trees), and its members can change between minor versions (3.2 → 3.3 → 3.5 → 4.0). For extensions distributed cross-version it’s worth verifying against the target version before relying on a specific enum name.
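The mechanics behind the pruning predicate can be sketched in Python (a toy tree, not Catalyst; node kinds as strings stand in for TreePattern bits): each node precomputes the set of kinds present in its subtree, and the visitor bails out of any subtree whose set doesn't intersect the patterns of interest.

```python
# Toy tree traversal (not Spark): precomputed "pattern bits" on each node
# let the visitor skip whole subtrees, the idea behind containsAnyPattern.

class Node:
    def __init__(self, kind, children=()):
        self.kind = kind
        self.children = children
        # the set of node kinds present in this subtree, computed once
        self.patterns = {kind}.union(*(c.patterns for c in children))

visited = []

def visit_with_pruning(node, interesting):
    if not (node.patterns & interesting):
        return                       # prune: subtree can't contain a match
    visited.append(node.kind)
    for c in node.children:
        visit_with_pruning(c, interesting)

# a plan where only the left subtree contains a FILTER
plan = Node("PROJECT", (
    Node("FILTER", (Node("SCAN"),)),
    Node("SCAN"),
))
visit_with_pruning(plan, {"FILTER"})
print(visited)   # ['PROJECT', 'FILTER']; both SCAN subtrees were skipped
```

The precomputation is the point: the bits are built once at node construction, so the per-traversal test is a cheap set intersection rather than a subtree walk.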
Idempotency
An optimizer rule should be idempotent, or at least participate in a batch whose composed transformation reaches a fixed point. Catalyst does not check idempotency rule by rule: it compares the plan produced by one batch iteration with that of the previous iteration, and stops when they are structurally equal (fastEquals). A useful theoretical note: in term rewriting the formal termination condition is the existence of a well-founded ordering (Knuth–Bendix) — idempotency is sufficient at the single-rule level but not necessary. In practice it holds as operational discipline: without it, the batch may diverge or saturate the iteration limit. A related consequence: Catalyst is not a confluent system (Church–Rosser). Semantically equivalent plans can have different normal forms, and the order of rules in the batch determines which one is obtained — that’s why order matters.
// Anti-pattern: the rule keeps "improving" the plan and never terminates.
case Filter(c, child) =>
Filter(And(c, Literal(true)), child)
This example is obvious. There are less obvious ones, and they tend to appear when the rule introduces syntactically fresh nodes on each application. For example:
// Anti-pattern: each application creates new Aliases with new ExprIds.
case Project(projectList, child) =>
Project(projectList.map(e => Alias(e, e.name)()), child)
The plan stays semantically identical, but it changes structurally on every iteration: every Alias() without an explicit ExprId generates a new one, fastEquals fails, and the FixedPoint doesn’t terminate.
Spark has a quiet way to protect itself from non-idempotent rules: FixedPoint has a maximum iteration limit (spark.sql.optimizer.maxIterations, default 100), beyond which it stops and logs a warning. It works — but the warning is easily ignored in production, and a rule that converges at “100 iterations” is a rule doing useless work hundreds of times per query.
The minimum test, when writing a rule, is: apply it, take the output, apply it again, verify output1.fastEquals(output2). It’s a syntactic idempotency test: two semantically equivalent plans with different forms — reordering of conjuncts in an And, reassociation of filters — fail fastEquals even though they’re logically the same plan. To go further, Spark exposes official test scaffolding: PlanChangeLogger logs every rule application (spark.sql.planChangeLog.level=WARN), and since Spark 3.4 RuleExecutor.checkRules / OptimizerRuleChecker verify plan integrity (isPlanIntegral) automatically in test mode.
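The apply-twice test fits in a few lines of Python (tuples as stand-in plan nodes, == as the analogue of fastEquals; the rule names are illustrative): a rule that mints a fresh id on every application fails it, a rule that recognizes its own output passes.

```python
# Toy apply-twice test (not Spark): a rule that creates a fresh id on each
# application fails output1 == output2, the analogue of fastEquals.

import itertools

_ids = itertools.count()

def fresh_alias_rule(plan):
    # anti-pattern: wraps the plan in an alias with a brand-new id each time
    return ("alias", next(_ids), plan)

def stable_rule(plan):
    # idempotent: applying it to its own output changes nothing
    return plan if plan[0] == "alias" else ("alias", 0, plan)

def is_idempotent(rule, plan):
    out1 = rule(plan)
    out2 = rule(out1)
    return out1 == out2    # structural equality, like fastEquals

print(is_idempotent(fresh_alias_rule, ("scan",)))   # False
print(is_idempotent(stable_rule, ("scan",)))        # True
```

As the text notes, this only checks syntactic idempotency: two semantically equivalent outputs with different shapes would still fail the comparison.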
JVM bootstrap in PySpark
This last one isn’t strictly a rule-writing anti-pattern, but it’s the trap one falls into deploying an extension: the JAR doesn’t load and Spark logs a seemingly inexplicable ClassNotFoundException. The main article touches on it as an operational detail; here what matters is the diagnosis and the scenarios the basic pattern doesn’t cover.
spark = (
    SparkSession.builder
    .config("spark.jars", "/path/to/extension.jar")  # ← often too late
    .config("spark.sql.extensions", "com.example.MyExtension")
    .getOrCreate()
)
The anti-pattern isn’t spark.jars itself. The Spark docs are clear: spark.jars is a comma-separated list of JARs included on the driver and executor classpaths. The anti-pattern is setting it too late, once the driver JVM/SparkContext is already up: the builder option cannot retroactively change the already-fixed classpath.
The order of events is this: PySpark, when getOrCreate() is invoked for the first time, launches a new JVM via python/pyspark/java_gateway.py (the function is called launch_gateway). The JVM receives the driver classpath at launch, built by spark-submit from PYSPARK_SUBMIT_ARGS (in particular --jars). Once up, the JVM reads spark.sql.extensions and looks up the class on the driver classpath — which is already fixed. In notebooks and managed environments where the driver is already running before user code starts, .config("spark.jars", ...) in the builder is simply too late. Hence the ClassNotFoundException.
The correct forms depend on the environment:
# 1. via spark-submit, --jars is read at JVM launch
spark-submit --jars /path/to/extension.jar your_script.py
# 2. from "pure" Python, set PYSPARK_SUBMIT_ARGS before import
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = (
"--jars /path/to/extension.jar pyspark-shell"
)
from pyspark.sql import SparkSession # ← now the JVM starts with the JAR
spark = SparkSession.builder \
.config("spark.sql.extensions", "com.example.MyExtension") \
.getOrCreate()
For dependencies resolved as Maven coordinates, the equivalent is spark.jars.packages (also passed via PYSPARK_SUBMIT_ARGS or --packages to spark-submit): same temporal constraint, must be set before the JVM launches.
In managed environments the os.environ pattern doesn’t work at all: the driver JVM is launched by the infrastructure, before user code runs. Quick references:
| Environment | Mechanism |
|---|---|
| spark-submit / local PySpark | --jars or PYSPARK_SUBMIT_ARGS as above |
| Databricks (attached cluster) | cluster-level init script that installs the JAR in /databricks/jars/, or library install from the UI/API |
| EMR Serverless | application-configuration with sparkSubmitParameters that include --jars |
| Dataproc Serverless | gcloud dataproc batches submit with --properties spark.jars=... |
| AWS Glue | --extra-jars in job parameters; sealed classpath, no runtime override |
The same constraint applies to Delta Lake and Iceberg, and to anything based on spark.sql.extensions. It’s not a property of the extension — it’s a property of the JVM bootstrap in PySpark. If you don’t know it, you only learn it after losing an afternoon.
Conclusions
Catalyst makes it easy to write a rule — literally, a LogicalPlan => LogicalPlan function with a pattern match inside. It also makes you believe, at first, that this is enough to solve real problems. The distance between “the rule matches the right pattern” and “the rule behaves well inside the pipeline” is short to write but long to walk: choosing between optimizer rule and check rule, understanding FixedPoint’s effect on emissions, avoiding mutable state, leveraging pruning, ensuring idempotency, and handling JAR bootstrap.
None of the above is esoteric — it’s in Spark’s sources, in the project’s issues, in various Data+AI Summit talks. But it’s not collected in one place, and for someone writing their first extension it’s almost all invisible until it manifests as duplicates in spans, failed classloading in production, or a FixedPoint that doesn’t terminate. The DSL is simple; the framework around the DSL is where the real complexity hides.
References
- The Hidden DSL in Catalyst — the companion article to this one, on Catalyst as an embedded DSL and its extension surface.
- github.com/cdelmonte-zg/planguard — the PlanGuard extension referenced throughout, in its complete version.
- SparkSessionExtensions — the JavaDoc for Spark’s extension point.
- Source code: org.apache.spark.sql.catalyst.{trees, rules, plans} — RuleExecutor, Rule, TreePattern, and the analysis check phase (checkAnalysis).
- spark.sql.optimizer.maxIterations, spark.sql.planChangeLog.level — the Spark properties cited for FixedPoint safety and rule-application logging.