The most consequential decisions in an execution engine are not always the ones that appear in the plan. Some live in the shape of the data.

Spark evolved into a row-oriented execution engine deeply shaped by the JVM runtime, and that choice defined what Spark became. The analytical ecosystem is now reorganizing itself around something different, and the trace of that reorganization is visible in the physical plan of a typical Spark query that reads Parquet.

Run df.explain on an ordinary Parquet scan, and you will often find one operator sitting just above the scan that has nothing to do with the query itself. It does not filter, join, aggregate, or sort. It is called ColumnarToRow, and its only job is to reorganize bytes from a columnar in-memory layout into a row stream. Why does an analytical engine that reads columnar data need this operator at all?

The data starts columnar

A Parquet file is not a sequence of records waiting to be materialized as Java objects. It is a physical layout built around a specific bet: analytical workloads benefit when the engine can avoid reading and decoding irrelevant columns.

Values are grouped by column, encoded by type (dictionary, run-length, delta, bit-packing), and compressed more effectively than row-oriented alternatives because adjacent values in a column tend to have low entropy relative to each other. A query that touches three columns out of forty reads three column chunks and skips the rest. Predicate pushdown can eliminate entire row groups before decoding begins. File-level and row-group statistics enable data skipping at the metadata layer.

In the dominant lakehouse path, this assumption is everywhere. The table is logical; the physical data is usually stored in columnar files, most often Parquet.

Spark SQL was not designed as a purely columnar execution engine.

This does not mean that every Spark query starts from Parquet, or even from a columnar source. Spark also reads CSV, JSON, Avro, JDBC, Kafka, and other inputs, and many of these paths do not expose a ColumnarToRow operator after the scan. That absence is not a counterexample. It simply means that the columnar-to-row boundary is not visible there. The deeper point is that Spark’s classic SQL execution path still converges on InternalRow and, in its optimized runtime form, on UnsafeRow. ColumnarToRow is the visible trace of this commitment only when the input representation disagrees with the execution representation.

Spark crosses into rows

Spark’s internal execution model has a different center of gravity. The key abstraction is not the Parquet column chunk. It is the internal row, specifically InternalRow and its dominant runtime implementation, UnsafeRow.

InternalRow is an abstract class that defines a uniform API for accessing fields by ordinal: getInt(i), getUTF8String(i), isNullAt(i). It says nothing about how the data is stored. UnsafeRow is the answer to that question.

UnsafeRow does not use the JVM object model to represent data. A row is encoded as a contiguous region of bytes, a single byte[] on the JVM heap (or a raw memory address off-heap), read and written through low-level unsafe memory access with pointer arithmetic. Inside the row payload: no object headers, no boxed values, no pointer chasing between fields. The engine addresses the row as a compact binary region.

To make this concrete, consider a row from a simple query:

SELECT user_id, name, age, score, city FROM users

Schema: [INT, STRING, INT, DOUBLE, STRING], with city null.

The binary layout of the corresponding UnsafeRow looks like this:

=== UnsafeRow Binary Layout ===
Logical payload: 57 bytes | Aligned UnsafeRow size: 64 bytes | Fields: 5

--- NULL BITSET ---
  [0000-0007] 0x0000000000000010  nulls: field[4]

--- FIXED-LENGTH REGION (8 bytes/field) ---
  field[0] [0008-0015] 0x000000000000002A          ← 42
  field[1] [0016-0023] 0x0000003000000009          ← offset=48, len=9
  field[2] [0024-0031] 0x0000000000000026          ← 38
  field[3] [0032-0039] 0x4058600000000000          ← 97.5 (IEEE 754)
  field[4] [0040-0047] 0x0000000000000000          ← NULL

--- VARIABLE-LENGTH REGION ---
  [0048-0056] 43 68 72 69 73 74 69 61 6E
  ASCII:      Christian
  [0057-0063] 00 00 00 00 00 00 00          ← 8-byte alignment padding

The layout above is simplified for exposition, but it follows the structure Spark uses for UnsafeRow: null bitset, fixed-width slots, and variable-length data.

The layout has three regions. A null bitset where each bit maps to a field: reading nullability is a single getLong plus a bit mask. A fixed-length region with one 8-byte slot per field: primitives are stored inline, while variable-length types pack an offset and a byte length into a single 8-byte word, with the upper 4 bytes encoding the offset (measured from the start of the row) where that field’s variable-length bytes begin and the lower 4 bytes encoding the length. A variable-length region where strings live as UTF-8 byte sequences, while arrays, maps, and nested structs are embedded as Spark’s own unsafe binary structures.

Reading user_id means computing the slot address and loading the primitive value directly from the binary region. No allocation. No virtual dispatch. Pointer arithmetic dressed as Java.

That entire row occupies 64 bytes after alignment. On a typical 64-bit JVM with compressed ordinary object pointers, the same logical row represented as an Object[] with boxed primitives and String instances can easily occupy several times more memory across multiple heap objects. At scale, the difference is not only memory footprint. It is the number of objects the JVM has to track, scan, and eventually collect.

The exact byte layout matters less than what the layout enables. By moving execution data out of the ordinary JVM object graph and into a compact binary segment, Spark made it possible to relocate, sort, and ship rows without ever decoding them back into objects. UnsafeRow is still a row, but it behaves more like a memory segment than like a Java object.

The format is not confined to in-memory operator evaluation. UnsafeRow also underlies Spark’s serialized shuffle path, handled by UnsafeShuffleWriter: because rows are already contiguous, length-prefixed byte regions, the shuffle writer can treat them as relocatable byte buffers and sort them by manipulating pointers rather than decoding payloads. UnsafeRow is not just an executor-local row format. It is part of how data moves through the entire execution pipeline.

This is the representation Spark’s classic execution path is built around once the plan crosses the boundary from columnar batches.

The boundary has a name

When Spark reads Parquet through its vectorized reader (ParquetFileFormat with spark.sql.parquet.enableVectorizedReader = true, which is the default), the data arrives as ColumnarBatch, a structure that holds column vectors, one per schema field. This is still columnar. The data has not yet changed shape.

But when the next operator in the physical plan expects rows, which most traditional Spark SQL operators do, the planner inserts ColumnarToRowExec. This operator converts the columnar batch into the row stream expected by the downstream row-based operator.

You can see it in a physical plan:

== Physical Plan ==
*(1) Filter (isnotnull(amount) AND (amount > 100.0))
+- *(1) ColumnarToRow
   +- FileScan parquet [id, name, amount] ...

ColumnarToRow sits between the scan and the first operator that touches the data. It appears only in the physical plan, inserted by ApplyColumnarRulesAndInsertTransitions, the physical planning rule that detects mismatches between operators producing columnar output and operators consuming rows.

From the logical perspective, the query is: scan, filter, project. From the physical perspective: scan columnar, change representation, filter rows, project rows.

   columnar (at rest → in memory)                   row-oriented execution

   ┌─────────┐   ┌───────────────┐   ╔═══════════════╗   ┌─────────────┐
   │ Parquet │──▶│ ColumnarBatch │──▶║ ColumnarToRow ║──▶│ UnsafeRow   │
   │  files  │   │ column vectors│   ║   boundary    ║   │ stream → ops│
   └─────────┘   └───────────────┘   ╚═══════════════╝   └─────────────┘

   physical plan:
     FileScan parquet → *(1) ColumnarToRow → *(1) Filter → *(1) Project

Figure 1: A lakehouse scan starts columnar and stays columnar through ColumnarBatch; classic Spark SQL crosses into row execution at ColumnarToRow, the one operator whose only job is to change the data’s shape.

Conceptually, the values have to be re-expressed from a columnar in-memory layout as a row stream compatible with Spark’s internal row representation. Nullability is mapped to row-level null bits, fixed-width values become row slots, variable-length data has to be made accessible through row-oriented offsets.

On the write path, the picture is slightly different. The row-to-columnar encoding never appears as an operator in the plan. The write command consumes a stream of InternalRows (WriteFilesExec runs its child as an RDD[InternalRow]), and ParquetWriteSupport writes each row field by field into Parquet’s record consumer; the columnar on-disk encoding is performed downstream by the Parquet column writers, not by an in-memory row-to-columnar conversion.

The round trip is real either way: columnar storage to row execution and back to columnar storage. The transition in the plan is the observable symptom. What produces it is what the rest of this article is about.

Why UnsafeRow made sense

This shape change is sometimes read as a critique of Spark’s design, as if a more modern engine would not need it. It is more accurate to read UnsafeRow and the row-oriented execution model as a coherent answer to a specific set of constraints. Constraints that were binding when Spark’s execution engine was built, and that have shifted since.

The first constraint was the JVM itself. In the early 2010s, building a distributed analytical engine meant building it on the JVM. The ecosystem was JVM: Scala for productivity, Hadoop and Hive for storage and SQL, HDFS clients in Java, Kafka in Scala, YARN as the cluster substrate. Distributed coordination, network IO, fault tolerance, and ecosystem integration were the dominant engineering problems, and the JVM solved them well. Bytecode portability meant the same artifact ran across heterogeneous hardware. A mature concurrency model and rich serialization libraries shortened the path from research code to production system. The JVM was not chosen against alternatives. It was the substrate that made the system possible.

But the JVM imposes costs on data-intensive execution that are easy to underestimate until they dominate. Spark’s early execution engine allocated millions of small objects per query: one per field, one per row, one per intermediate result. Large heaps and object-heavy execution made garbage collection a first-order performance concern. Object headers, pointer indirection, and poor cache locality made the CPU spend more time chasing references than computing results.

Project Tungsten was the response. It did not reject the JVM as the runtime. It pushed against the JVM’s own data model from inside it. UnsafeRow was the materialization of that idea: a format that moved data out of the JVM object model into binary buffers managed by Spark’s own memory manager. The engine took back control of allocation, layout, and access patterns while keeping the data on (or off) the JVM heap. The remarkable property of UnsafeRow is not that it is row-oriented. It is that it is almost not-JVM: manual memory management, pointer arithmetic, off-heap buffers, implemented in a language whose runtime was designed to make exactly these operations unnecessary.

Whole-stage codegen extended the same logic. Instead of interpreting a tree of operator objects at runtime, where each operator called next() on its child, received an InternalRow, processed it, and passed it up, Spark began generating Java source code that fused an entire pipeline stage into a single function. Two distinct gains came from this. The first, often described, is that the generated function processes rows through a tight scalar loop in which the JIT can inline field access, eliminate bounds checks, and keep hot variables in CPU registers. The second, structurally larger, is what disappears: the per-row virtual dispatch through the operator tree, and the boxing of InternalRow at operator boundaries. Codegen turns a tree walk over polymorphic objects into a straight-line scalar function. The JIT is the consumer; eliminating polymorphism at the operator level is what makes the JIT’s job tractable.

The choice extended further than execution. UnsafeRow became central to Spark SQL’s shuffle path. Once a row was already a contiguous, length-prefixed byte region, the network layer could move it between executors without re-serializing the rows. The same representation that operators consumed in memory was the representation that flowed through the system between stages. A decision about how to lay out a row in memory had become a decision about how data moved across the cluster.

Each layer reinforced the next, until after several years of development the JVM was no longer just the runtime under Spark. It had become the shape of the engine’s thinking.

The choice of row-oriented execution was not a limitation Spark failed to overcome. It was a trade-off that extracted a decade of production value from the JVM execution model.

That optimization had a cost, but it was not visible until the alternative became practicable.

The cascade

The choice of representation sets the unit of work.

A row engine pushes each row through a chain of operators; a columnar engine applies one operation across an entire column. The difference is not cosmetic. In a row-oriented engine, a filter that evaluates amount > 100.0 on a million rows executes a loop: load the row, extract the field at the right offset, compare, branch, move on. The JVM JIT compiles this into tight scalar code. It is fast, but it is fundamentally one-value-at-a-time.

In a columnar engine, the same filter operates on a contiguous array of doubles. The engine can process the array in batches using SIMD instructions, comparing multiple values with a single instruction. For fixed-width primitive columns there is no row object to decode and no per-row field offset to compute; nullability is handled through a separate bitmap. The values are already adjacent in memory, the CPU prefetcher works efficiently, and cache lines are fully utilized.

From this single choice of representation, the rest of the engine follows as a cascade.

Choosing row at the representation layer makes row-at-a-time operators natural. Row-at-a-time operators make scalar codegen natural. Scalar codegen makes the JVM JIT a natural compiler target. The JVM as compiler target makes manual memory management like UnsafeRow defensible as the way out.

Choosing columnar at the representation layer makes vectorized operators natural. Vectorized operators make native execution natural. Native execution gives the developer the SIMD intrinsics, the memory layout control, and the cache-line discipline that columnar processing rewards.

Each layer constrains the next. Representation is not an implementation detail. It is the constraint that is logically prior to every other choice in the execution model. The shift between row-oriented and columnar execution is not, at its core, about which language hosts the kernel. It is about how the engine organizes its data plane, and everything else cascades from there.

                          same logical query
                       ╱                      ╲
               ▼                                        ▼
     ── ROW PATH ──                             ── COLUMNAR / NATIVE PATH ──

     Row representation                         Columnar representation
               │                                        │
               ▼                                        ▼
     Row-at-a-time operators                    Vectorized operators
               │                                        │
               ▼                                        ▼
     Whole-stage codegen                        Native kernels (C++ / Rust)
               │                                        │
               ▼                                        ▼
     JVM JIT as compiler target                 SIMD + cache locality
               │                                        │
               ▼                                        ▼
     Off-heap manual memory                     Arrow / Velox / Photon

     Representation is not an implementation detail.
     It is the constraint that precedes every other choice.

Figure 2: The representation choice cascades through the stack. Rows pull scalar operators, whole-stage codegen, the JVM JIT, and manual memory behind them; columnar pulls vectorized operators, native kernels, and SIMD.

Spark chose one path coherently. The analytical execution ecosystem is now organizing itself around the other.

Moving the boundary

Several projects now attempt to keep data columnar through execution rather than converting it to rows.

Databricks Photon moves supported parts of SQL and DataFrame execution into a native vectorized engine written in C++ that processes data in columnar batches. At the API layer, nothing changes: the same SQL, the same DataFrame interface, the same Delta tables. Operationally, the picture is more nuanced. Photon is proprietary to Databricks, carries a DBU multiplier, supports a set of operators that changes between releases, and falls back to Spark’s JVM execution for operators it does not cover.

Gluten and Velox approach the same problem from outside Databricks. Gluten is a Spark plugin that intercepts the physical plan, translates it into Substrait (a cross-engine intermediate representation for query plans), and delegates execution to a native backend, typically Velox, Meta’s C++ execution library. The data stays in columnar batches through JNI calls to native code and returns as ColumnarBatch through Spark’s columnar execution API. Velox uses its own columnar vector model, similar to Apache Arrow and interoperable with it but not identical to Arrow’s in-memory layout for every type. Arrow itself has emerged as the lingua franca for columnar in-memory data interchange across analytical engines.

Across these systems the same dynamic applies. For operators the native path covers, the row transition disappears from the hot path. For operators it does not, control returns to Spark’s JVM execution and the ColumnarToRow boundary reappears at the fallback edge.

What these projects share is not a preference for C++ over Java. It is a specific architectural choice: align the in-memory representation with the storage representation, so that the boundary between columnar at rest and columnar in execution disappears for the operators where compute density is high.

ColumnarToRow in the physical plan is one of the boundaries these systems try to remove. They do not remove it everywhere; they remove it where it matters most.

The portability has relocated

The response these systems share is larger than any of them individually. Read together, Photon, Gluten, and Velox describe a single architectural movement that goes beyond Spark.

The JVM was the right portability layer for the problem Spark was originally solving. It provided a common runtime across heterogeneous machines, integrated naturally with Hadoop, Hive, Kafka, and the broader Scala/Java ecosystem, and made distributed analytics operationally portable before the lakehouse stack existed.

The bottleneck mix has shifted. Distributing tasks across nodes is still hard, shuffle still dominates many workloads, skew has not been eliminated. What has changed is that, for an increasing share of analytical compute, the inner loop matters as much as the coordination layer. SIMD width, cache line discipline, vectorized kernel specialization, zero-copy interchange with GPU memory and with other analytical engines: these are the layers where the modern native ecosystems have accumulated the deepest tooling and library coverage. The Vector API, introduced by JEP 338 and refined through many incubation rounds, gives the JVM a credible path into the same space, but it remains an incubating API, and the practical center of gravity for new analytical engine development has settled in C++ and Rust.

This is why DuckDB is C++, Velox is C++, ClickHouse is C++, DataFusion is Rust, Polars is Rust, Photon is C++. The convergence reflects both a technical fit and a set of historical and ecosystem choices: mature columnar libraries, embeddable deployment models, and direct integration with Python and the broader scientific computing stack are easier to access in those ecosystems today. The shift is not nostalgia for compiled languages. It is a practical reorganization toward the stack where new analytical workloads are most directly served.

But portability does not disappear. It relocates.

The old portability layer was the runtime. The new portability layer is the data format and the query plan.

The portability that used to live in JVM bytecode now lives, in stratified form, across distinct layers of the stack. Apache Arrow provides portability of the in-memory columnar representation, enabling zero-copy interchange between engines that agree on how vectors are laid out in RAM. Apache Parquet (and ORC) provide portability of the on-disk format, so the same files are readable by Spark, Trino, DuckDB, ClickHouse, and Polars. Substrait provides portability of the query plan, a cross-engine representation of the operators and functions a backend supports, allowing a planner in one engine to delegate execution to a backend in another. Delta Lake, Iceberg, and Hudi provide portability of the table across engines (snapshots, schema evolution, transactional metadata exposed independently of the execution engine that touches them), each within its own format.

   BEFORE: one center of portability

         ┌────────────────────┐
         │    JVM bytecode    │
         │ the common runtime │
         │   across machines  │
         └────────────────────┘
         Spark execution · Scala/Java ecosystem · Hadoop/Hive/Kafka

                    ║  relocation

   AFTER: portability stratified across layers

     Table semantics    →  Delta · Iceberg · Hudi
     Storage format     →  Parquet · ORC
     In-memory format   →  Apache Arrow
     Query plan         →  Substrait (supported fragments)
     ───────────────────────────────────────────────────────
     Execution          →  Photon · Velox · DataFusion  (specializes)

Figure 3: Portability relocates. The four layers above the line carry the portability the JVM once held alone; the planner, catalog, API, and governance stay stable, and the execution engine underneath is free to specialize.

Each of these layers carries part of the portability burden that JVM bytecode once carried more centrally. The result is a system in which the planner, the catalog, the API surface, and the governance layer can remain stable, while the execution engine can be specialized: compiled for a specific hardware target, written in a language that exposes SIMD and memory layout, optimized for the workload’s actual bottleneck.

This is the division Spark is increasingly being adapted to, from inside through Photon and from outside through Gluten and Velox. The strongest evidence that the portability layer has moved is that even Spark, the system most associated with the old layer, is increasingly surrounded by mechanisms that move execution elsewhere.

Conclusions

ColumnarToRow in a Spark physical plan is the visible trace of a system that lives on the boundary between the two layers. It is the point where a query designed inside the new portability model (columnar at rest, with a logical plan that in principle any engine could execute) meets an execution engine built coherently for the old one. The transition is not a flaw. It is the seam where two architectural eras meet.

The next time ColumnarToRow appears in a physical plan, read it as a record of where Spark stands in the longer movement of analytical systems toward a different locus of portability.