
Transformation at scale — Spark, Ray, Daft

Once the data is in a columnar store (lesson 04), you have to process it: filter, map, join, dedup, score. At a few GB you do it on one box. At a few TB you need a cluster — and the moment you do, one operation dominates everything: the shuffle.

Where we are
Lesson 04 gave us partitioned Parquet that's cheap to scan. This lesson covers the engine that reads those partitions in parallel and turns bronze into silver. The next three lessons (dedup, tokenization, quality) are all specific transformations that run on the model built here.

The model: data is partitions, work is a DAG over them

Every distributed data tool — Spark, Ray Data, Daft — shares one mental model. Your dataset is split into partitions (independent chunks, ideally ~100–250 MB each). Your job is a DAG of operations over those partitions. Operations come in two kinds, and the difference is the whole game:

| | Narrow transformation | Wide transformation (shuffle) |
|---|---|---|
| Examples | map, filter, select, tokenize | groupBy, join, distinct, repartition, dedup |
| Data movement | None: each partition processed in place | All-to-all: every worker sends rows to every other |
| Cost | Embarrassingly parallel; scales linearly with workers | Network + disk spill; the bottleneck |
| Failure blast radius | One partition | The whole stage re-runs |

A narrow op is free in the sense that throughput scales with the number of workers W: double the cluster, halve the time. A shuffle is different. To groupBy(prompt_hash) for dedup, rows with the same key must end up on the same worker, so every worker partitions its rows by key and sends each bucket across the network to the worker that owns it. That all-to-all exchange writes to disk, crosses the network, and reads back. Adding workers helps far less than for narrow ops: the byte volume per worker drops, but the number of worker-to-worker connections grows as O(W²) and per-partition coordination overhead eats into the gain. Past a point you are buying more shuffle, not less.
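To make the all-to-all exchange concrete, here is a minimal pure-Python simulation of a hash shuffle. It is not any framework's API: `owner` and `shuffle_by_key` are hypothetical names, "sending" a bucket is just appending to a list, and CRC32 stands in for whatever partitioning hash a real engine uses. It reports how many rows crossed the network and how many distinct worker-to-worker links were used; the link count is bounded by W·(W−1), the fan-out term above.

```python
import zlib
from collections import defaultdict

Row = tuple[str, str]  # (key, payload)

def owner(key: str, num_workers: int) -> int:
    # Stable hash: every worker must agree on which worker owns a key.
    # Python's built-in hash() of str is randomized per process, so use CRC32.
    return zlib.crc32(key.encode()) % num_workers

def shuffle_by_key(worker_rows: list[list[Row]]) -> tuple[list[list[Row]], int, int]:
    """Simulate a hash shuffle: each worker buckets its rows by key owner and
    'sends' each bucket to that owner. Returns the post-shuffle partitions,
    the number of rows that crossed the network, and the number of distinct
    worker-to-worker links used."""
    w = len(worker_rows)
    received: list[list[Row]] = [[] for _ in range(w)]
    links: set[tuple[int, int]] = set()
    rows_moved = 0
    for src, rows in enumerate(worker_rows):
        buckets: dict[int, list[Row]] = defaultdict(list)
        for key, payload in rows:
            buckets[owner(key, w)].append((key, payload))
        for dst, bucket in buckets.items():
            received[dst].extend(bucket)
            if dst != src:  # a bucket the worker owns itself stays local
                links.add((src, dst))
                rows_moved += len(bucket)
    return received, rows_moved, len(links)

# Four workers, each holding rows for the same ten prompt hashes.
workers = [[(f"h{i % 10}", f"w{src}-r{i}") for i in range(50)] for src in range(4)]
partitions, moved, links = shuffle_by_key(workers)
print(f"rows moved: {moved}, links used: {links} (max W*(W-1) = {4 * 3})")
```

After the exchange every key lives on exactly one worker, which is what lets a groupBy or dedup finish locally.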

The one rule
Minimize shuffles, and shuffle as little data as possible. Filter before you join. Project away columns you don't need before the shuffle. A pipeline that does one shuffle over 200 GB will beat one that does three shuffles over 2 TB, even on the same cluster.
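A rough way to see the rule in numbers: measure the serialized size of what would be shuffled, with and without filtering and projecting first. This sketch uses JSON length as a stand-in for network/spill bytes (an assumption; real engines use compact binary formats), and the records and column names are hypothetical.

```python
import json

def shuffled_bytes(rows: list[dict]) -> int:
    # Rough proxy for network/spill volume: size of the rows serialized as JSON.
    return sum(len(json.dumps(r).encode()) for r in rows)

# Hypothetical bronze records: a large response column that the groupBy never reads.
rows = [
    {"prompt_hash": f"h{i % 100}", "lang": "en" if i % 4 else "de",
     "response": "x" * 2000, "score": i % 7}
    for i in range(1000)
]

# Naive: shuffle every row and every column, filter and project afterwards.
naive = shuffled_bytes(rows)

# Better: filter first, keep only the columns the groupBy needs, then shuffle.
kept = [{"prompt_hash": r["prompt_hash"], "score": r["score"]}
        for r in rows if r["lang"] == "en"]
lean = shuffled_bytes(kept)

print(f"naive: {naive:,} bytes  lean: {lean:,} bytes  ({naive / lean:.0f}x less)")
```

The same reasoning is why the optimizer's column pruning and filter pushdown matter most right before a wide operation.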

Skew: the straggler that eats your cluster

Partitions are processed in parallel, so a stage finishes only when its slowest partition finishes. If your data is skewed — one prompt hash has 50× the rows, one language dominates — that one fat partition becomes a straggler, and 99 idle workers wait for it. This is the single most common reason a "scaled-out" job is somehow not faster. The fixes: salt the hot key, increase partition count so the hot key spreads, or handle the heavy key separately.
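Salting can be sketched for a count aggregation in a few lines. This is a minimal pure-Python illustration, not a framework API: the hot key is assumed to be known in advance (for example from a cheap sampled count), `SALTS` is an assumed tuning knob, and keys are assumed not to contain the `#` separator. Stage 1 shuffles on the salted key so the hot key's rows spread over many partitions; stage 2 strips the salt and combines the small partial results.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 100
SALTS = 32  # how many ways to split each hot key (assumed value)

def partition_of(key: str) -> int:
    # Stable hash partitioning on the group key, as a shuffle would do.
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

def max_partition_rows(keys: list[str]) -> int:
    # Size of the fattest partition: the straggler that gates the stage.
    return max(Counter(partition_of(k) for k in keys).values())

# Skewed input: one hot prompt hash with 5,000 rows, 1,000 cold ones with 5 each.
keys = ["hot"] * 5000 + [f"cold{i}" for i in range(1000) for _ in range(5)]
hot_keys = {"hot"}  # assumed to be found beforehand by a cheap sampled count

# Stage 1: salt only the hot key (round-robin over SALTS), shuffle on the
# salted key, and pre-aggregate per salted key.
salted = [f"{k}#{i % SALTS}" if k in hot_keys else k for i, k in enumerate(keys)]
partial = Counter(salted)

# Stage 2: strip the salt and combine the partial counts per original key.
final = Counter()
for salted_key, n in partial.items():
    final[salted_key.split("#")[0]] += n

print("max rows per partition, unsalted:", max_partition_rows(keys))
print("max rows per partition, salted:  ", max_partition_rows(salted))
```

Two-stage salting works directly for aggregations that combine cleanly (counts, sums, min/max); for a join, the other side has to be replicated once per salt value so every salted key still finds its match.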

The shuffle and the straggler: a cost model

A simplified but honest model of one transformation stage: narrow work parallelizes across workers; shuffle work pays an all-to-all network/spill tax that grows with partition count; skew creates a straggler that gates the stage. Wall-clock is then map time + shuffle time + straggler tax. The three inputs that matter are cluster size, how much of the work is a shuffle, and how skewed the data is; together they decide where the wall-clock actually goes, and whether adding workers helps or just adds shuffle overhead.
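The cost model can be written as a small function. Every constant here is an assumption chosen for illustration, not a measurement: one shuffle partition per worker, a fixed per-peer cost for each worker's all-to-all exchange, and `skew` defined as the ratio of the largest shuffle partition to the average one. `stage_wall_clock` is a hypothetical name.

```python
def stage_wall_clock(total_work_s: float, workers: int, shuffle_frac: float,
                     skew: float, per_peer_overhead_s: float) -> dict[str, float]:
    """Toy cost model for one stage (all constants are assumptions):
    - narrow (map) work divides evenly across workers;
    - shuffle work divides too, but each worker also pays a fixed cost per
      peer it exchanges data with (all-to-all, so workers - 1 peers);
    - the stage waits for the largest shuffle partition, `skew` times the
      average, so the extra time is the straggler tax."""
    map_time = (1 - shuffle_frac) * total_work_s / workers
    shuffle_per_worker = shuffle_frac * total_work_s / workers
    shuffle_time = shuffle_per_worker + per_peer_overhead_s * (workers - 1)
    straggler_tax = shuffle_per_worker * (skew - 1)
    return {
        "map": map_time,
        "shuffle": shuffle_time,
        "straggler": straggler_tax,
        "wall_clock": map_time + shuffle_time + straggler_tax,
    }

# 10,000 worker-seconds of work, half of it in a shuffle, no skew.
for w in (10, 100, 316, 1000, 3000):
    r = stage_wall_clock(10_000, w, shuffle_frac=0.5, skew=1.0, per_peer_overhead_s=0.1)
    print(f"W={w:>5}  map={r['map']:7.1f}s  shuffle={r['shuffle']:7.1f}s  "
          f"wall-clock={r['wall_clock']:7.1f}s")
```

In this toy model, with no skew, wall-clock is roughly total_work/W + overhead·W, which bottoms out near W = sqrt(total_work / per_peer_overhead) (about 316 workers for the numbers above); beyond that, adding workers makes the stage slower. Raising `skew` inflates the straggler tax at every cluster size.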

Three tools, three sweet spots

The model above is tool-agnostic, but the tools are not interchangeable. Pick by data size, what runs inside the transform, and your team's language.

| Tool | Engine | Sweet spot | Watch out |
|---|---|---|---|
| Single-node (Polars, DuckDB) | Rust / C++, in-process | < ~100 GB, no cluster to babysit. Often the right answer: most post-training datasets fit. | Bound by one machine's RAM and cores. |
| Spark | JVM, lazy DAG | TB+ batch ETL, mature SQL, joins/aggregations at scale, well-understood shuffle. | JVM ↔ Python (UDF) serialization tax; awkward for GPU model calls. |
| Ray Data | Python-native, streaming | ML in the pipeline: running a reward model or LLM as a transform on GPUs, streaming so CPU and GPU stages overlap. | Younger; you manage more of the execution. |
| Daft | Rust + Python, lazy | Multimodal (images/audio) and Python-first columnar work with a Spark-like API. | Newest of the four; smaller ecosystem. |
Post-training–specific reason Ray shows up
Much post-training data engineering involves running models inside the pipeline: scoring responses with a reward model, generating synthetic data with an LLM, filtering with a classifier. That's a GPU transform sitting in the middle of a CPU pipeline (read → tokenize → model → write). Ray Data's streaming executor keeps the GPU fed while CPU stages run ahead — which is exactly the producer/consumer overlap you'll see again in the RL online dataplane (lesson 10).
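The producer/consumer overlap can be sketched with nothing but threads and a bounded queue. This is not Ray Data's API: threads stand in for the separate workers a real engine would use, whitespace splitting stands in for tokenization, and `fake_reward_model` is a hypothetical stand-in for a GPU model call. The bounded queue is the important part: it lets the CPU stage run ahead, but only by `max_in_flight` batches (backpressure).

```python
import queue
import threading

BATCH_SIZE = 4
_DONE = object()  # end-of-stream marker

def tokenize_stage(texts: list[str], out_q: queue.Queue) -> None:
    """CPU producer: 'tokenize' (here, a whitespace split) and emit batches.
    put() blocks while the queue is full: that is the backpressure keeping the
    producer at most `maxsize` batches ahead of the consumer."""
    batch: list[list[str]] = []
    for text in texts:
        batch.append(text.split())
        if len(batch) == BATCH_SIZE:
            out_q.put(batch)
            batch = []
    if batch:
        out_q.put(batch)
    out_q.put(_DONE)

def fake_reward_model(batch: list[list[str]]) -> list[float]:
    # Hypothetical stand-in for a GPU model call; the scoring rule is made up.
    return [float(len(tokens)) for tokens in batch]

def score_stage(in_q: queue.Queue, results: list[float]) -> None:
    """GPU consumer: take each batch as soon as it is ready and score it."""
    while (batch := in_q.get()) is not _DONE:
        results.extend(fake_reward_model(batch))

def run_pipeline(texts: list[str], max_in_flight: int = 2) -> list[float]:
    q: queue.Queue = queue.Queue(maxsize=max_in_flight)
    results: list[float] = []
    producer = threading.Thread(target=tokenize_stage, args=(texts, q))
    consumer = threading.Thread(target=score_stage, args=(q, results))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    return results

print(run_pipeline(["a short response", "another one", "x"] * 3))
```

A larger `max_in_flight` absorbs more jitter between the stages at the cost of more buffered memory; with a single producer and a single consumer over a FIFO queue, output order matches input order.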

Lazy execution: why nothing runs until you ask

All three cluster frameworks (Spark, Ray Data, Daft) are lazy: df.filter(...).select(...).groupBy(...) builds a plan, not a result. Execution happens only at an action (write, count, collect). This lets the optimizer rewrite your plan (push the filter below the join, prune unused columns, fuse adjacent maps) before a single byte moves. It's the same predicate/column pushdown from lesson 04, now applied to the whole DAG. The practical consequence: write transforms declaratively and let the planner reorder them; don't hand-optimize by calling collect() in the middle, which forces materialization and defeats the optimizer.
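A toy lazy frame shows both halves of the idea: building the plan runs nothing, and the planner can reorder it before execution. This is a minimal sketch, not any real library's API; `LazyFrame`, `with_column`, and `optimized_ops` are hypothetical names, and the only optimization implemented is predicate pushdown (a filter on a column that no earlier map produced is moved to the front, so the expensive map runs on fewer rows).

```python
from dataclasses import dataclass
from typing import Any, Callable

Row = dict[str, Any]

@dataclass(frozen=True)
class LazyFrame:
    """Toy lazy frame: methods only record operations; collect() plans and runs them."""
    source: list[Row]
    ops: tuple = ()

    def filter(self, col: str, pred: Callable[[Any], bool]) -> "LazyFrame":
        return LazyFrame(self.source, self.ops + (("filter", col, pred),))

    def with_column(self, name: str, fn: Callable[[Row], Any]) -> "LazyFrame":
        return LazyFrame(self.source, self.ops + (("map", name, fn),))

    def select(self, *cols: str) -> "LazyFrame":
        return LazyFrame(self.source, self.ops + (("select", cols),))

    def optimized_ops(self) -> list[tuple]:
        # Predicate pushdown: a filter on a column no earlier map produced
        # depends only on source data, so it can safely run first.
        produced: set[str] = set()
        pushed, rest = [], []
        for op in self.ops:
            if op[0] == "map":
                produced.add(op[1])
            if op[0] == "filter" and op[1] not in produced:
                pushed.append(op)
            else:
                rest.append(op)
        return pushed + rest

    def collect(self) -> list[Row]:
        # The "action": only now does any row get touched.
        rows = [dict(r) for r in self.source]
        for op in self.optimized_ops():
            if op[0] == "filter":
                _, col, pred = op
                rows = [r for r in rows if pred(r[col])]
            elif op[0] == "map":
                _, name, fn = op
                rows = [{**r, name: fn(r)} for r in rows]
            else:  # select
                rows = [{c: r[c] for c in op[1]} for r in rows]
        return rows

calls = {"n": 0}

def expensive_score(row: Row) -> int:
    calls["n"] += 1  # count how often the "expensive" map actually runs
    return len(row["text"])

data = [{"text": "hi", "lang": "en"}, {"text": "hola", "lang": "es"},
        {"text": "hello", "lang": "en"}]
plan = (LazyFrame(data)
        .with_column("score", expensive_score)
        .filter("lang", lambda v: v == "en")
        .select("text", "score"))
runs_before_collect = calls["n"]  # building the plan ran nothing
out = plan.collect()
print(out, "map calls:", calls["n"])
```

The plan was written map-then-filter, yet the map runs only on the two English rows, because the planner saw the whole DAG before executing it. Calling collect() between the map and the filter would have forced the map over every row.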

Classic mistake
Calling .collect() or .toPandas() mid-pipeline to "check" the data pulls the entire distributed dataset onto the driver node, which promptly runs out of memory (in Spark, the job may instead abort once the collected results exceed spark.driver.maxResultSize). Inspect with .limit(n).show(), which the planner can satisfy without materializing everything.
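Why a limit is cheap can be seen with a pure-Python analogy, where generators stand in for a lazy plan (this is an illustration, not how Spark implements it). `scan` is a hypothetical source that counts how many rows it has produced; taking the first five rows that pass a filter reads only as far as needed.

```python
from itertools import islice

counter = {"rows_read": 0}

def scan(num_rows: int):
    """Lazy source: yields one row at a time and counts how many were read."""
    for i in range(num_rows):
        counter["rows_read"] += 1
        yield {"id": i, "text": f"example {i}"}

# Build a lazy "pipeline" over a large source: nothing is read yet.
pipeline = (row for row in scan(10_000_000) if row["id"] % 2 == 0)

# Analogue of .limit(5).show(): pull rows only until five survive the filter.
preview = list(islice(pipeline, 5))
print(counter["rows_read"], [r["id"] for r in preview])

# Analogue of .collect(): list(pipeline) would read every remaining row into
# memory. Deliberately not run here.
```

Only nine source rows are read to produce the five-row preview; materializing the whole pipeline would read all ten million.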

Takeaway

What to carry to lesson 06
Distributed transformation is partitions + a DAG of narrow (free-scaling) and wide (shuffle, expensive) ops. The shuffle and the straggler — not the map — decide wall-clock. Choose single-node until you can't, then Spark for batch SQL/joins and Ray when a model runs inside the transform. Dedup, the subject of lesson 06, is the canonical shuffle — it groups the whole dataset by a similarity key — so everything here applies directly.