What to know before writing an extension that enters Spark’s pipeline


Foreword

In the previous article I described Catalyst as an embedded DSL: rules that pattern-match on tree nodes, batches executed by a RuleExecutor, an extension surface exposed via SparkSessionExtensions. The final example (a rule that intercepts cartesian joins, Python UDFs, and collect_list) was intentionally a sketch.

When the sketch becomes real code running inside a cluster’s pipeline, things that looked like details turn into concrete problems. This article is the operational counterpart to that first one: a collection of anti-patterns I’ve run into (some of them my own) while building the operational version of PlanGuard, with the goal of making the reason behind each one explicit.


The temptation to put the throw inside an optimizer rule

In the previous article PlanGuard is registered with injectCheckRule. The choice isn’t obvious. The most immediate reflex, reading the SparkSessionExtensions documentation [1], is to use injectOptimizerRule with a Rule[LogicalPlan] that throws an exception:

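import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Anti-pattern: an optimizer rule used only to reject the plan.
object BlockCartesian extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Join(_, _, Inner, None, _) =>
      throw new SparkException("cartesian product detected")
  }
}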

…and register it with injectOptimizerRule. The rule enters the optimizer batch, pattern-matches on the Inner join without condition, throws. It works, at least the day you write it.

What you don’t see, and what justifies the different choice in the main article, is where the throw ends up. An optimizer rule is invoked by RuleExecutor.execute inside a Batch(FixedPoint, ...), which is itself triggered by QueryExecution.optimizedPlan. Above sits assertAnalyzed, and above that the caller where the user wrote df.collect() or df.explain(). The throw crosses all these levels and breaks any operation that materializes executedPlan: df.collect(), df.show(), df.write..., even df.explain(), which is supposed to be there precisely to see the plan and diagnose it. Operations that stop at analysis (df.queryExecution.analyzed, or df.printSchema, which uses analyzed.schema) don’t trigger it, because the optimizer is strictly lazy.

The right hook is injectCheckRule. A check rule is a function LogicalPlan => Unit that Spark invokes inside checkAnalysis, which runs right after the analyzer’s batches complete (Analyzer.executeAndCheck), so after analysis and before optimization. It can throw an exception to reject the plan. It’s documented by Spark for exactly this purpose: “this query must not be executed”.

A useful mechanical detail: analysis is memoized via QueryExecution.analyzed (lazy val), so on a DataFrame that has already been analyzed explain() does not re-fire the check rule. But if the plan hasn’t been analyzed yet, explain() does activate it: the end of analysis is the single point at which the check rule runs.

There’s also a less obvious temporal consequence. With injectOptimizerRule, the throw reaches the caller of df.collect() (the optimizer is lazy: rewriting only happens when an action requires the physical plan). With injectCheckRule, analysis is run inside Dataset.ofRows, which is called by every method that returns a new DataFrame: df.filter, df.join, df.select. The query “blows up” at the DataFrame’s construction, not at collect.

val joined = left.join(right)   // ← with CheckRule, SparkException is thrown here
joined.collect()                 // ← with OptimizerRule, it would have been here

From the user’s perspective the check rule is better: the failure is attached to analysis, not to optimization or execution. In Spark classic this usually means the exception is raised while constructing the next DataFrame, close to the offending transformation, rather than being deferred to collect/show/write. But it’s a detail that changes the shape of the tests: the try/catch must wrap the DataFrame construction, not just the action. It’s exactly the shift that separates an “article-sketch rule” from a “pipeline rule”.
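
The timing difference can be made concrete with a toy model in plain Python. This is not Spark’s API: ToyFrame, PlanRejected, reject_cartesian and where_it_fails are hypothetical names, plans are plain tuples, and the rule only looks at the root node. The only assumption carried over from the text is that analysis runs eagerly when a frame is built, while optimization runs lazily when an action is called.

```python
class PlanRejected(Exception):
    """Stands in for the exception a rule throws to reject a plan."""


def reject_cartesian(plan):
    """Toy rule: reject a join whose condition is missing (root node only)."""
    if plan[0] == "join" and plan[3] is None:
        raise PlanRejected("cartesian product detected")
    return plan


class ToyFrame:
    """Analysis is eager (constructor); optimization is lazy (collect)."""

    def __init__(self, plan, check_rules=(), optimizer_rules=()):
        self.plan = plan
        self.check_rules = tuple(check_rules)
        self.optimizer_rules = tuple(optimizer_rules)
        for check in self.check_rules:      # runs when the frame is built
            check(plan)

    def join(self, other, condition=None):
        plan = ("join", self.plan, other.plan, condition)
        return ToyFrame(plan, self.check_rules, self.optimizer_rules)

    def collect(self):
        plan = self.plan
        for rule in self.optimizer_rules:   # runs only when an action asks
            plan = rule(plan)
        return plan


def where_it_fails(frame_kwargs):
    """Report which step raises: building the join, or running the action."""
    left = ToyFrame(("scan", "left"), **frame_kwargs)
    right = ToyFrame(("scan", "right"), **frame_kwargs)
    try:
        joined = left.join(right)
    except PlanRejected:
        return "construction"
    try:
        joined.collect()
    except PlanRejected:
        return "action"
    return "never"


as_check_rule = where_it_fails({"check_rules": [reject_cartesian]})
as_optimizer_rule = where_it_fails({"optimizer_rules": [reject_cartesian]})
```

In this model as_check_rule is "construction" and as_optimizer_rule is "action", which is the reason a test for a check rule has to wrap the line that builds the DataFrame rather than only the action.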

In short, choose the hook based on intent:

  • semantic invariant of the plan (rejecting queries that must not run) → injectCheckRule [2]
  • plan rewriting (transformation that produces a new LogicalPlan) → injectOptimizerRule
  • per-query metrics/observability (observing without modifying) → QueryExecutionListener

Side-effects and FixedPoint multiplication

The other reason injectCheckRule is preferred isn’t the throw: it’s the number of times the rule is called. Even when you write a legitimate optimizer rule (one that actually rewrites the plan, because that’s what optimizer rules are supposed to do) and you add “innocent” side-effects like logging or metric emission, there’s a less obvious consequence. Spark’s optimizer organizes its rules into batches; many batches have a FixedPoint strategy, which means: “execute all the batch’s rules repeatedly until the plan stops changing”.

Batch "Operator Optimization" (FixedPoint)
  ├── PredicatePushdown
  ├── ConstantFolding
  ├── ColumnPruning
  ├── DetectDangerousPatterns      ← ours
  └── SimplifyConditionals

If our rule is a no-op (returns the plan unchanged), it never keeps the batch iterating by itself. But as long as other rules in the batch keep rewriting (ConstantFolding simplifies an expression, PredicatePushdown moves a filter, ColumnPruning trims a projection), every batch iteration re-applies ours too (in the absence of pruning declared via nodePatterns/containsAnyPattern). Typically this means a few iterations for a simple plan; with subqueries, CTEs, or complex predicates the number can rise significantly.

The result is concretely observable, and reproducible. The script examples/fixedpoint_multiplication_demo.scala builds a custom RuleExecutor whose single batch is FixedPoint(100) containing a counting probe alongside ConstantFolding, BooleanSimplification, and CombineFilters — three real Catalyst rules whose rewrites keep the batch iterating until convergence. The probe is observational (returns the plan unchanged); the other three are what makes the batch loop. Run on a query with three adjacent Filter nodes (Spark 4.1.1, local[1]):

[demo] Filters in analyzed plan (N = expected count): 3
[demo] CountingProbe invocations under FixedPoint: 6
[demo] CountingProbe invocations as a check rule (one-shot): 3

The probe fires 6 times for 3 actual nodes; the same logic applied as a check rule, outside any FixedPoint, fires exactly 3. The 2× ratio is specific to this demo (it depends on which rules are in the batch and how quickly they converge), but the structural point is unchanged: every FixedPoint iteration re-applies the rule, and any side-effect (Prometheus counter increment, log warning, audit event) is duplicated accordingly. In real Spark optimizer batches with subqueries, CTEs, or complex predicates, the multiplier can be higher.

Faced with this problem, there are three practical routes, in order of preference:

  1. Move the rule into a check rule (as in the previous point). Check rules run once after analysis: no FixedPoint, no multiplication.

  2. Memoize per plan-id (an anti-pattern, not a solution). The natural temptation is to track the System.identityHashCode(plan) of the input and skip if already seen. It doesn’t work: Catalyst trees are immutable, every transform/copy produces a new instance, so at iteration N+1 the input plan has a different identityHashCode from iteration N even if structurally identical. The cache never hits between iterations. To deduplicate across iterations the key must be structural (plan.canonicalized or a hash of treeString), not referential, but at that point the cost of structural comparison can easily exceed the cost of the duplication you were trying to avoid. The correct pattern, when the need is “one metric per query”, is to observe outside the optimizer: QueryExecutionListener.onSuccess is the documented point for per-query emissions. Caveat: it only sees queries that reach execution. Queries rejected in analysis don’t trigger it, so it isn’t a substitute for a check rule when enforcement is required.

  3. Use transformDownWithPruning with nodePatterns. It doesn’t resolve the multiplication of emissions across iterations (the rule still gets revisited), but it reduces the cost of a single traversal by skipping entire subtrees that don’t contain the patterns of interest. It’s a cost mitigation, not an architectural fix.

The rule many people think they’ve written (“one emission per detection”) corresponds to the check rule. The one written as an optimizer rule is “one emission per detection per batch iteration”, a different thing.
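
The multiplication can be reproduced without Spark. What follows is a toy model in plain Python, not Catalyst: Scan, Filter, merge_one_filter_pair, CountingProbe and run_fixed_point are hypothetical stand-ins, and the rewrite rule is deliberately limited to one merge per call so that the batch needs several passes to converge.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Filter:
    conds: tuple      # conjuncts carried by this filter
    child: object


def merge_one_filter_pair(plan):
    """Toy rewrite: merge the top two adjacent Filters, one pair per call."""
    if isinstance(plan, Filter) and isinstance(plan.child, Filter):
        return Filter(plan.conds + plan.child.conds, plan.child.child)
    return plan


class CountingProbe:
    """Observational rule: returns the plan unchanged, counts its invocations."""

    def __init__(self):
        self.calls = 0

    def __call__(self, plan):
        self.calls += 1   # the side effect (metric, log line, audit event)
        return plan


def run_fixed_point(plan, rules, max_iterations=100):
    """Re-apply every rule until one full pass leaves the plan unchanged."""
    for _ in range(max_iterations):
        before = plan
        for rule in rules:
            plan = rule(plan)
        if plan == before:   # structural equality, from the frozen dataclasses
            break
    return plan


three_filters = Filter(("a > 1",), Filter(("b > 2",), Filter(("c > 3",), Scan("t"))))

in_batch = CountingProbe()
optimized = run_fixed_point(three_filters, [in_batch, merge_one_filter_pair])

one_shot = CountingProbe()
one_shot(three_filters)   # what a check rule would do: a single pass
```

Here in_batch.calls ends at 3 (two passes that change the plan, plus the pass that confirms convergence) while one_shot.calls is 1. The probe never changed anything; it was re-run only because its neighbor kept rewriting.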


Unprotected mutable state

The architectural observation comes before the code: if a Rule needs mutable state to be correct, it’s probably in the wrong place. A check rule that runs once doesn’t have this problem; neither does a QueryExecutionListener for per-query metrics. If the temptation comes anyway, it’s worth seeing why the “obvious” mitigations (var, ThreadLocal, ConcurrentHashMap) are traps.

The memoization from the previous point (“track the plans already processed”) looks innocuous. It’s written like this:

private var lastSeen: LogicalPlan = _

def apply(plan: LogicalPlan): LogicalPlan = {
  if (plan eq lastSeen) return plan
  lastSeen = plan
  // ... emit, log, etc.
  plan
}

A var inside a Rule. It works as long as there’s one query at a time on a single rule instance. Spark by default doesn’t parallelize the optimizer across different queries, but several scenarios break that assumption:

  • SparkSession.newSession() allows multiple sessions on the same JVM;
  • notebooks execute cells sequentially, but not always;
  • Spark Connect (SparkConnectService) keeps the same JVM alive with N clients;
  • AQE (AdaptiveSparkPlanExec) re-applies the optimizer on subplans during execution, on threads different from the main one.

How the rule is instantiated depends on how you register it. injectOptimizerRule(_ => MyRule) with MyRule as a singleton object means state shared globally across every session in the JVM; injectOptimizerRule(session => new MyRule(session)) gives at least one instance per session, but that same instance can still be used by multiple concurrent queries within the same session.

ThreadLocal isn’t a reliable way out: AQE crosses threads (sub-plans are optimized in scala.concurrent.Future with a dedicated executor), and reused thread pools can leak values across queries. ConcurrentHashMap keyed on identityHashCode is the FixedPoint multiplication problem amplified: a LogicalPlan’s referential identity changes with every transform/copy, so the cache doesn’t hit either across FixedPoint iterations or across queries. To really deduplicate across plans the key must be structural (canonicalized), and at that point you’ve written a mini-optimizer inside a rule.
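
The referential-versus-structural distinction is easy to check in isolation. The sketch below is plain Python, with id() playing the role of System.identityHashCode and a frozen dataclass playing the role of an immutable plan node; Filter here is a hypothetical toy class, not Catalyst’s.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Filter:
    condition: str
    child: str


first = Filter("x > 1", "scan t")
second = replace(first)   # a copy with no changes: new object, same structure

seen_by_identity = {id(first)}    # cache keyed on referential identity
seen_by_structure = {first}       # cache keyed on structural equality/hash

identity_hit = id(second) in seen_by_identity
structure_hit = second in seen_by_structure
```

identity_hit is False and structure_hit is True. The structural lookup works, but it has to hash and compare the whole node, which on a real plan means walking the tree: that is the cost the paragraph above warns about.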

Conclusion: mutable state in a Rule is a time bomb, and the mitigations that come to mind first when fixing it on your own are almost all wrong. If the need is “don’t emit the same warning more than once in 5 minutes”, that rate-limit belongs to the observability system, not to the Spark rule.


foreach when transformDownWithPruning would have done

A typical inspection rule does something like:

def apply(plan: LogicalPlan): LogicalPlan = {
  plan.foreach {
    case Filter(condition, _) if isDangerous(condition) =>
      logWarning("...")
    case _ =>
  }
  plan
}

plan.foreach visits every node of the tree, whatever the pattern of interest. On a large plan (nested subqueries, views, CTEs, broadcast hints) that means many visits per rule, per iteration. It isn’t a correctness problem (foreach doesn’t rewrite the plan, so it doesn’t violate idempotency); it’s a cost problem. Catalyst has an API that does the same thing with structural pruning: transformDownWithPruning, paired with a containsAnyPattern(...) test that skips entire subtrees if they don’t contain any of the indicated patterns.

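import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.TreePattern._

def apply(plan: LogicalPlan): LogicalPlan = {
  // Subtrees containing none of these patterns are skipped without being visited.
  plan.transformDownWithPruning(_.containsAnyPattern(JOIN, FILTER, AGGREGATE)) {
    case f @ Filter(condition, _) if isDangerous(condition) =>
      logWarning("...")
      f  // inspection only: return the node unchanged
    case other => other
  }
}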

The mechanism is the same one Spark’s native rules use internally: nodePatterns and TreePatternBits were introduced in Spark 3.2 precisely to reduce the cost of iteration in the FixedPoint. It’s an accelerator for the rewriting engine: subtrees without the indicated patterns are skipped entirely, so on plans with few relevant nodes one visits a fraction of the total nodes. It must be kept distinct from idempotency and termination: containsAnyPattern doesn’t make a rule converge, it only makes each traversal cheaper. [3]
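
To see what “skipping entire subtrees” buys, here is a toy version of the idea in plain Python. It is only a sketch: Node, its patterns field and visit are hypothetical, and a frozenset stands in for the pattern bits that Catalyst precomputes per node.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, Tuple


@dataclass(frozen=True)
class Node:
    kind: str
    children: Tuple["Node", ...] = ()
    patterns: FrozenSet[str] = field(init=False)

    def __post_init__(self):
        # Each node caches the set of patterns present anywhere in its subtree.
        bits = {self.kind}
        for child in self.children:
            bits |= child.patterns
        object.__setattr__(self, "patterns", frozenset(bits))


def visit(node, wanted, on_match, use_pruning):
    """Walk the tree top-down; return how many nodes were actually visited."""
    if use_pruning and not (node.patterns & wanted):
        return 0                      # nothing of interest below: skip subtree
    visited = 1
    if node.kind in wanted:
        on_match(node)
    for child in node.children:
        visited += visit(child, wanted, on_match, use_pruning)
    return visited


left = Node("PROJECT", (Node("PROJECT", (Node("SCAN"),)),))   # no FILTER inside
right = Node("FILTER", (Node("SCAN"),))
root = Node("JOIN", (left, right))

found_plain, found_pruned = [], []
visited_plain = visit(root, {"FILTER"}, found_plain.append, use_pruning=False)
visited_pruned = visit(root, {"FILTER"}, found_pruned.append, use_pruning=True)
```

Both traversals find the same single Filter, but the unpruned one touches all 6 nodes and the pruned one touches 2. The number of times the rule is re-applied across FixedPoint iterations is unaffected; only the cost of each pass shrinks.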


Idempotency

An optimizer rule should be idempotent, or at least participate in a batch whose composed transformation reaches a fixed point [4]. Catalyst does not check idempotency rule by rule: it compares the plan produced by one batch iteration with that of the previous iteration, and stops when they are structurally equal (fastEquals). In practice idempotency is an operational discipline rather than something the framework enforces: without it, the batch may diverge or saturate the iteration limit. Semantically equivalent plans can have different normal forms, and the order of rules in the batch determines which one is obtained: that’s why order matters.

// Anti-pattern: the rule keeps "improving" the plan and never terminates.
case Filter(c, child) =>
  Filter(And(c, Literal(true)), child)

This example is obvious. There are less obvious ones, and they tend to appear when the rule introduces syntactically fresh nodes on each application. For example:

// Anti-pattern: each application creates new Aliases with new ExprIds.
case Project(projectList, child) =>
  Project(projectList.map(e => Alias(e, e.name)()), child)

The plan stays semantically identical, but it changes structurally on every iteration: every Alias() without an explicit ExprId generates a new one, fastEquals fails, and the FixedPoint doesn’t terminate.

Spark has a quiet way to protect itself from non-idempotent rules: FixedPoint has a maximum iteration limit (spark.sql.optimizer.maxIterations, default 100), beyond which it stops and logs a warning. It works, but the warning is easily ignored in production, and a rule that only “converges” by hitting the 100-iteration cap is doing useless work on every query.

The minimum viable test, when writing a rule, is: apply it, take the output, apply it again, verify output1.fastEquals(output2). It catches the most frequent class of non-idempotency — the one shown above, where the rule introduces syntactically fresh nodes (Alias() without explicit ExprId, freshly allocated case classes) that defeat structural equality on every application. For subtler equivalences — reordering of conjuncts in an And, reassociation of filters — fastEquals is too strict; the upgrade path is plan.canonicalized.fastEquals(other.canonicalized), which normalizes the tree before comparing.
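
The shape of that test, stripped of Spark, looks like the sketch below. It is a plain-Python model under stated assumptions: Alias and Project are hypothetical toy classes, a global counter stands in for fresh ExprId allocation, and dataclass equality stands in for fastEquals.

```python
import itertools
from dataclasses import dataclass

_fresh_ids = itertools.count(1)   # stands in for fresh ExprId allocation


@dataclass(frozen=True)
class Alias:
    child: str
    name: str
    expr_id: int


@dataclass(frozen=True)
class Project:
    exprs: tuple


def realias_with_fresh_ids(plan):
    """Non-idempotent: every application allocates new ids."""
    return Project(tuple(Alias(e.child, e.name, next(_fresh_ids)) for e in plan.exprs))


def uppercase_names(plan):
    """Idempotent: a second application has nothing left to change."""
    return Project(tuple(Alias(e.child, e.name.upper(), e.expr_id) for e in plan.exprs))


def stable_after_second_application(rule, plan):
    """Apply the rule twice and compare the two outputs structurally."""
    once = rule(plan)
    twice = rule(once)
    return once == twice


plan = Project((Alias("a + 1", "total", 0), Alias("b", "label", 0)))

fresh_ok = stable_after_second_application(realias_with_fresh_ids, plan)
upper_ok = stable_after_second_application(uppercase_names, plan)
```

fresh_ok is False and upper_ok is True. The comparison is between the first and second outputs, not between input and output: a rule is allowed to change the plan once, it just must not keep changing it.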


Testing rules the way Spark tests its own

Idempotency is the minimum bar. Spark also exposes the scaffolding it uses to test its own rules, and an extension that adopts the same scaffolding gets much closer to the testing discipline Spark uses for native rules. PlanChangeLogger (spark.sql.planChangeLog.level=WARN) logs rule applications that actually change the plan, together with batch-level summaries and metrics. It’s the same instrument Spark uses to inspect optimizer rewrites, and it works equally well for user-injected rules.

RuleExecutor exposes two override hooks, validatePlanChanges and validatePlanChangesLightweight, that run plan-integrity checks after every rule application; they’re gated by spark.sql.planChangeValidation and spark.sql.lightweightPlanChangeValidation. The actual checker is LogicalPlanIntegrity (org.apache.spark.sql.catalyst.plans.logical), with validateOptimizedPlan, validateExprIdUniqueness, and validateSchemaOutput covering the most common breakage modes: unresolved attributes after analysis, broken exprId references, schema drift between input and output.

For unit testing, PlanTest (built on the PlanTestBase trait) and derivatives such as AnalysisTest are the fixtures the Catalyst test suite uses: they run rules in isolation, compare logical plans with normalized exprIds, and assert on plan structure rather than on rendered strings.

The practical consequence: a test that fails after a Spark version bump because the plan’s treeString formatting changed is a test you wrote yourself; a test that fails because the structure of the plan changed is the same kind of signal Spark’s own CI catches.


JVM bootstrap in PySpark

This last one isn’t strictly a rule-writing anti-pattern, but it’s the trap one falls into deploying an extension: the JAR doesn’t load and Spark logs a seemingly inexplicable ClassNotFoundException. The main article touches on it as an operational detail; here what matters is the diagnosis and the scenarios the basic pattern doesn’t cover.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
      .config("spark.jars", "/path/to/extension.jar")        # ← often too late
      .config("spark.sql.extensions",
              "com.example.MyExtension")
      .getOrCreate()
)

The anti-pattern isn’t spark.jars itself. The Spark docs are clear: spark.jars is a comma-separated list of JARs included on the driver and executor classpaths. The anti-pattern is setting it too late, once the driver JVM/SparkContext is already up: the builder option cannot retroactively change the already-fixed classpath.

The order of events is this: PySpark, when getOrCreate() is invoked for the first time, launches a new JVM via python/pyspark/java_gateway.py (the function is called launch_gateway). The JVM receives the driver classpath at launch, built by spark-submit from PYSPARK_SUBMIT_ARGS (in particular --jars). Once up, the JVM reads spark.sql.extensions and looks up the class on the driver classpath, which is already fixed. In notebooks and managed environments where the driver is already running before user code starts, .config("spark.jars", ...) in the builder is simply too late. Hence the ClassNotFoundException.

The correct forms depend on the environment:

# 1. via spark-submit (shell): --jars is read at JVM launch
spark-submit --jars /path/to/extension.jar your_script.py

# 2. from "pure" Python: set PYSPARK_SUBMIT_ARGS before the first getOrCreate()
#    (setting it before the import is the safe habit)
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /path/to/extension.jar pyspark-shell"
)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.sql.extensions", "com.example.MyExtension") \
    .getOrCreate()                     # ← the JVM starts here, with the JAR

For dependencies resolved as Maven coordinates, the equivalent is spark.jars.packages (also passed via PYSPARK_SUBMIT_ARGS or --packages to spark-submit): same temporal constraint, must be set before the JVM launches.
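
For local PySpark, the ordering constraint can be wrapped in a small guard. This is a sketch, not a PySpark facility: build_submit_args, jvm_already_started and configure_extension_jars are hypothetical helpers, and the check relies on the private attribute SparkContext._gateway, which is an implementation detail that may differ across versions. The function reports False when pyspark has not been imported at all.

```python
import os
import sys


def build_submit_args(jars=(), packages=()):
    """Compose a PYSPARK_SUBMIT_ARGS value; it must end with 'pyspark-shell'."""
    parts = []
    if jars:
        parts += ["--jars", ",".join(jars)]
    if packages:
        parts += ["--packages", ",".join(packages)]
    parts.append("pyspark-shell")
    return " ".join(parts)


def jvm_already_started():
    """Best-effort check (assumption: relies on the private _gateway attribute)."""
    pyspark = sys.modules.get("pyspark")
    if pyspark is None:
        return False                  # pyspark not imported: no JVM yet
    context_cls = getattr(pyspark, "SparkContext", None)
    return getattr(context_cls, "_gateway", None) is not None


def configure_extension_jars(jars=(), packages=()):
    """Set PYSPARK_SUBMIT_ARGS, refusing to do so once it can no longer matter."""
    if jvm_already_started():
        raise RuntimeError(
            "driver JVM already running; PYSPARK_SUBMIT_ARGS would be ignored"
        )
    os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(jars, packages)


# Call this before the first SparkSession.builder...getOrCreate().
configure_extension_jars(jars=["/path/to/extension.jar"])
```

Failing loudly is the point of the guard: a silently ignored setting is what turns into the ClassNotFoundException later.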

In managed environments the os.environ pattern doesn’t work at all: the driver JVM is launched by the infrastructure, before user code runs. Quick references:

Environment                     Mechanism
-----------                     ---------
spark-submit / local PySpark    --jars or PYSPARK_SUBMIT_ARGS as above
Databricks (attached cluster)   cluster-level init script that installs the JAR in /databricks/jars/, or library install from the UI/API
EMR Serverless                  application-configuration with sparkSubmitParameters that include --jars
Dataproc Serverless             gcloud dataproc batches submit with --properties spark.jars=...
AWS Glue                        --extra-jars in job parameters; sealed classpath, no runtime override

A separate wrinkle for self-managed Spark with --deploy-mode cluster (YARN, Kubernetes, Standalone): spark-submit --jars still works — the cluster manager handles distribution to the remote driver — but the os.environ["PYSPARK_SUBMIT_ARGS"] trick presupposes that the local Python process launches the driver JVM, which isn’t the case when the driver runs remotely. Cluster mode goes through spark-submit proper, not through PySpark’s local gateway.

The same constraint applies to Delta Lake and Iceberg, and to anything based on spark.sql.extensions. It’s not a property of the extension: it’s a property of the JVM bootstrap in PySpark. If you don’t know it, you only learn it after losing an afternoon.


Conclusions

Catalyst makes it easy to write a rule: literally, a LogicalPlan => LogicalPlan function with a pattern match inside. It also makes you believe, at first, that this is enough to solve real problems. The distance between “the rule matches the right pattern” and “the rule behaves well inside the pipeline” is short to write but long to walk: choosing between optimizer rule and check rule, understanding FixedPoint’s effect on emissions, avoiding mutable state, leveraging pruning, ensuring idempotency, testing the rule the way Spark tests its own, and handling JAR bootstrap.

None of the above is esoteric: it’s in Spark’s sources, in the project’s issues, in various Data+AI Summit talks. But it’s not collected in one place, and for someone writing their first extension it’s almost all invisible until it manifests as duplicates in spans, failed classloading in production, or a FixedPoint that doesn’t terminate. The DSL is simple; the framework around the DSL is where the real complexity hides.


Notes


  1. SparkSessionExtensions ScalaDoc: the documented Developer API for the extension point. The corresponding source resides in org.apache.spark.sql.catalyst.{trees, rules, plans}: RuleExecutor, Rule, TreePattern, and the analysis check phase (checkAnalysis).

  2. The check rule fires once per analyzed plan. If the same query goes through CacheManager/Dataset.persist and is re-executed on a cache hit, the check rule is not re-applied: the plan has already “passed” analysis.

  3. Two practical notes. Readability: transformDownWithPruning has the same signature as transform, and for an inspection rule that returns the node unchanged the visual pattern is identical; the difference is only in the pruning predicate, which is one line. Compatibility: TreePattern is Spark’s internal API (org.apache.spark.sql.catalyst.trees), and its members can change between minor versions (3.2 → 3.3 → 3.5 → 4.0); for extensions distributed cross-version it’s worth verifying against the target version before relying on a specific enum name.

  4. In term-rewriting theory the standard termination criterion is the existence of a reduction ordering (well-founded, monotonic, stable on contexts and substitutions); Knuth–Bendix completion presupposes one to orient equations toward confluence. Per-rule idempotency is sufficient but not necessary: mutually recursive rules can reach a structural fixed point without any one being idempotent. The non-uniqueness of normal forms is the related lack of confluence (Church–Rosser): both theoretical conditions are useful discipline for Catalyst rule authors, but neither is enforced by the framework.