Transformation at scale — Spark, Ray, Daft
Once the data is in a columnar store (lesson 04), you have to process it: filter, map, join, dedup, score. At a few GB you do it on one box. At a few TB you need a cluster — and the moment you do, one operation dominates everything: the shuffle.
The model: data is partitions, work is a DAG over them
Every distributed data tool — Spark, Ray Data, Daft — shares one mental model. Your dataset is split into partitions (independent chunks, ideally ~100–250 MB each). Your job is a DAG of operations over those partitions. Operations come in two kinds, and the difference is the whole game:
| | Narrow transformation | Wide transformation (shuffle) |
|---|---|---|
| Examples | map, filter, select, tokenize | groupBy, join, distinct, repartition, dedup |
| Data movement | None — each partition processed in place | All-to-all — every worker sends rows to every other |
| Cost | Embarrassingly parallel; scales linearly with workers | Network + disk spill; the bottleneck |
| Failure blast radius | One partition | The whole stage re-runs |
A narrow op is free in the sense that throughput scales with the number of workers W: double the cluster, halve the time. A shuffle is different. To groupBy(prompt_hash) for dedup, rows with the same key must end up on the same worker, so every worker partitions its rows by key and sends each bucket across the network to the worker that owns it. That all-to-all exchange writes to disk, crosses the network, and reads back. Adding workers helps far less than for narrow ops: the byte volume per worker drops, but the connection fan-out grows as O(W²) and per-partition coordination overhead eats into the gain. Past a point you are buying more shuffle, not less.
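The map-side bucketing and exchange described above can be sketched in plain Python. This is a single-process simulation under stated assumptions, not how any of the three engines implements it: `stable_hash`, `shuffle_by_key`, and the sample rows are hypothetical names chosen for illustration, and a real engine adds serialization, disk spill, and network transfer at the exchange step.

```python
import hashlib
from collections import defaultdict


def stable_hash(key: str) -> int:
    """Deterministic hash (Python's built-in hash() of a str varies per process)."""
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def shuffle_by_key(partitions, num_workers):
    """Simulate a shuffle: route every (key, value) row to the worker owning its key.

    Returns (inboxes, links): inboxes[w] holds the rows worker w receives, and
    links counts the non-empty source-to-destination buckets that were sent.
    """
    inboxes = [[] for _ in range(num_workers)]
    links = 0
    for rows in partitions:
        # Map side: split this partition's rows into one bucket per destination.
        buckets = defaultdict(list)
        for key, value in rows:
            buckets[stable_hash(key) % num_workers].append((key, value))
        # Exchange: each non-empty bucket is one transfer to the owning worker.
        for dest, bucket in buckets.items():
            inboxes[dest].extend(bucket)
            links += 1
    return inboxes, links


if __name__ == "__main__":
    # Hypothetical rows: (prompt_hash, row_id), spread over three source partitions.
    parts = [
        [("a", 1), ("b", 2)],
        [("a", 3), ("c", 4)],
        [("b", 5), ("a", 6)],
    ]
    inboxes, links = shuffle_by_key(parts, num_workers=3)
    for worker, rows in enumerate(inboxes):
        print(worker, rows)
    print("buckets sent:", links)
```

With P source partitions and W workers, the number of buckets can reach P × W, which is where the quadratic fan-out comes from when the partition count grows with the cluster.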
Skew: the straggler that eats your cluster
Partitions are processed in parallel, so a stage finishes only when its slowest partition finishes. If your data is skewed (one prompt hash has 50× the rows, one language dominates), that one fat partition becomes a straggler, and 99 idle workers wait for it. This is the single most common reason a "scaled-out" job is somehow not faster. The fixes: salt the hot key so its rows spread across several partitions, increase partition count so that several heavy keys stop sharing one partition (a single hot key still lands in one partition however many there are), or handle the heavy key separately.
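To make the effect of salting concrete, here is a minimal simulation that counts rows per partition with and without a salt on the hot key. The helper names (`stable_hash`, `partition_loads`), the `#` suffix format, and the synthetic key distribution are all assumptions made for this sketch; no engine is involved.

```python
import hashlib
import random


def stable_hash(key: str) -> int:
    """Deterministic hash so the partition assignment is reproducible."""
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def partition_loads(keys, num_partitions, hot_keys=frozenset(), salt_buckets=1, seed=0):
    """Return the number of rows landing in each partition.

    Keys listed in hot_keys get a random suffix in [0, salt_buckets), so one
    hot key is treated as salt_buckets distinct keys by the partitioner.
    """
    rng = random.Random(seed)
    loads = [0] * num_partitions
    for key in keys:
        if key in hot_keys and salt_buckets > 1:
            key = f"{key}#{rng.randrange(salt_buckets)}"
        loads[stable_hash(key) % num_partitions] += 1
    return loads


if __name__ == "__main__":
    # Synthetic skew: one key owns half of all rows.
    keys = ["hot"] * 5000 + [f"k{i}" for i in range(5000)]
    plain = partition_loads(keys, 16)
    salted = partition_loads(keys, 16, hot_keys={"hot"}, salt_buckets=16)
    print("max rows per partition, unsalted:", max(plain))
    print("max rows per partition, salted:  ", max(salted))
```

Salting is not free: an aggregation over salted keys yields one partial result per salt value, which has to be merged in a second, much smaller aggregation, and a join needs the other side replicated across the salt values.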
Interactive · the shuffle and the straggler
Model a transformation over a dataset. Set the cluster size, how much of the work is a shuffle, and how skewed the data is. Watch where the wall-clock actually goes — and whether adding workers helps or just adds shuffle overhead.
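The same trade-off can be explored with a toy cost model. Everything in it is an assumption made for illustration: the function name `wall_clock`, the linear per-worker overhead term, and the constants are invented, not measured from Spark, Ray Data, or Daft.

```python
def wall_clock(total_work_s, workers, shuffle_fraction, skew=1.0, link_cost_s=0.5):
    """Toy estimate of job wall-clock time in seconds (all constants are made up).

    total_work_s     : total single-worker compute time for the whole job
    shuffle_fraction : share of that work that sits behind a shuffle (0..1)
    skew             : slowest partition's size divided by the mean (1.0 = even)
    link_cost_s      : assumed per-peer coordination cost paid by each worker
    """
    # Narrow work divides evenly across workers.
    narrow = (1.0 - shuffle_fraction) * total_work_s / workers
    # The shuffled stage ends when its fattest partition ends, hence the skew factor.
    shuffle = skew * shuffle_fraction * total_work_s / workers
    # O(W^2) connections in total means each worker talks to roughly W peers.
    overhead = link_cost_s * workers if shuffle_fraction > 0 else 0.0
    return {
        "narrow": narrow,
        "shuffle": shuffle,
        "overhead": overhead,
        "total": narrow + shuffle + overhead,
    }


if __name__ == "__main__":
    for w in (10, 50, 200):
        print(w, wall_clock(1000, w, shuffle_fraction=0.5, skew=1.0))
```

Under these made-up constants the total falls between 10 and 50 workers and rises again by 200, because the overhead term grows while the divisible work shrinks. Raising `skew` inflates only the shuffle term, which no amount of extra workers removes.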
Three tools, three sweet spots
The model above is tool-agnostic, but the tools are not interchangeable. Pick by data size, what runs inside the transform, and your team's language.
| Tool | Engine | Sweet spot | Watch out |
|---|---|---|---|
| Single-node (Polars, DuckDB) | Rust / C++, in-process | < ~100 GB, no cluster to babysit. Often the right answer — most post-training datasets fit. | Bound by one machine's RAM & cores. |
| Spark | JVM, lazy DAG | TB+ batch ETL, mature SQL, joins/aggregations at scale, well-understood shuffle. | JVM ↔ Python (UDF) serialization tax; awkward for GPU model calls. |
| Ray Data | Python-native, streaming | ML in the pipeline — running a reward model or LLM as a transform on GPUs, streaming so CPU and GPU stages overlap. | Younger; you manage more of the execution. |
| Daft | Rust + Python, lazy | Multimodal (images/audio) and Python-first columnar work with a Spark-like API. | Newest of the four; smaller ecosystem. |
Lazy execution: why nothing runs until you ask
All three frameworks are lazy: df.filter(...).select(...).groupBy(...) builds a plan, not a result. Execution happens only at an action (write, count, collect). This lets the optimizer rewrite your plan — push the filter below the join, prune unused columns, fuse adjacent maps — before a single byte moves. It's the same predicate/column pushdown from lesson 04, now applied to the whole DAG. The practical consequence: write transforms declaratively and let the planner reorder them; don't hand-optimize by calling collect() in the middle, which forces materialization and defeats the optimizer.
Calling .collect() or .toPandas() mid-pipeline to "check" the data pulls the entire distributed dataset onto the driver node, which promptly runs out of memory or silently truncates. Inspect with .limit(n).show() instead, which the planner can satisfy without materializing everything.
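The difference between building a plan and running it, and why a limited peek is cheap, can be shown with a toy lazy dataset in plain Python. `LazyFrame`, `take`, and `expensive` are hypothetical names for this sketch and do not correspond to the API of Spark, Ray Data, or Daft. The toy also does no plan rewriting; it only demonstrates deferred execution and early stopping.

```python
from itertools import islice


class LazyFrame:
    """Toy lazy dataset: transformations extend a plan, only actions execute it."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan

    def filter(self, predicate):
        # Transformation: record the step, touch no data.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def map(self, fn):
        # Transformation: record the step, touch no data.
        return LazyFrame(self._rows, self._plan + (("map", fn),))

    def _execute(self):
        # Stream rows through the recorded steps one at a time.
        for row in self._rows:
            keep = True
            for kind, fn in self._plan:
                if kind == "filter":
                    if not fn(row):
                        keep = False
                        break
                else:
                    row = fn(row)
            if keep:
                yield row

    def collect(self):
        # Action: materialize every surviving row.
        return list(self._execute())

    def take(self, n):
        # Action: stop pulling rows as soon as n results exist.
        return list(islice(self._execute(), n))


if __name__ == "__main__":
    calls = []

    def expensive(x):
        calls.append(x)  # record each invocation to show when work happens
        return x * 2

    lf = LazyFrame(range(1_000_000)).filter(lambda x: x % 2 == 0).map(expensive)
    print("calls after building the plan:", len(calls))
    print("peek:", lf.take(3))
    print("calls after take(3):", len(calls))
```

Building the plan invokes `expensive` zero times, and `take(3)` invokes it only for the rows needed to produce three results, whereas `collect()` would run it for every row that passes the filter.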