post data-engineering · 2026-01-19 · 6 min read

Spark Structured Streaming: watermarks, late data, and the mistakes I made

#pyspark#spark#streaming#data-engineering#learnings

The first streaming job I shipped in production was a batch pipeline I had translated naively, replacing read.parquet() with readStream and calling it streaming. It worked for two weeks, then quietly started dropping events from a partition that lagged. By the time we noticed, three days of data was gone.

This post is the things I now know about Spark Structured Streaming that the docs explain in pieces: watermarks, late data, stateful aggregations, output modes, and the four operational habits that keep a streaming job healthy.

The mental model

Structured Streaming runs your query incrementally. Conceptually, every micro-batch is “what new data has arrived since the last run, what does the result look like now, write the diff.” That framing is enough to understand 80% of what’s happening.

stream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "events")
.load())
result = (stream
.selectExpr("CAST(value AS STRING)", "timestamp")
.groupBy(window("timestamp", "5 minutes"))
.count())
(result.writeStream
.outputMode("update")
.format("delta")
.option("checkpointLocation", "/chk/event_count")
.trigger(processingTime="30 seconds")
.start())

What this says: read from Kafka, count events per 5-minute window, write the running counts to Delta every 30 seconds, remember progress in a checkpoint.

Watermarks: the load-bearing concept

A watermark is your declaration of “I’m willing to wait this long for late data.” Without one, every aggregation accumulates state forever — every window stays open in memory because the engine doesn’t know if more events for that window will arrive. With one, the engine can close old windows and free their state.

stream.withWatermark("timestamp", "10 minutes") \
.groupBy(window("timestamp", "5 minutes")) \
.count()

This says: I’ll accept events up to 10 minutes after their timestamp. After that, drop them. The engine uses this to decide which windows are “complete enough” to finalise.

Two real failure modes I’ve seen:

Watermark too aggressive. “Why are we missing 3% of events?” Because the watermark was 1 minute and 3% of events arrive 70+ seconds late from a flaky producer. The engine’s silently dropping them. Fix: tune the watermark based on actual late-data distribution. Look at p99 of event_time - ingestion_time over a week, set watermark to ~1.5x that.

Watermark too lax. State table grows forever, executor memory creeps up, job OOMs after 6 days. Fix: profile state size in Spark UI’s streaming page, tighten until it stabilises.

Late-data distribution: measure before you guess

Pull a sample from the bronze layer and look at the gap between event_time and ingestion_time:

sample = (spark.read.parquet("/bronze/events")
.filter("ingestion_date = current_date()")
.selectExpr(
"(unix_timestamp(ingestion_time) - unix_timestamp(event_time)) AS lag_seconds"
)
.filter("lag_seconds >= 0"))
sample.summary("50%", "90%", "99%", "99.9%", "max").show()
+-------+-----------+
|summary|lag_seconds|
+-------+-----------+
| 50%| 12|
| 90%| 45|
| 99%| 287|
| 99.9%| 1804|
| max| 7251|
+-------+-----------+

p99 is 287 seconds (~5 min). p99.9 is ~30 min. Worst case is 2 hours. So a 10-minute watermark catches 99% of late data and drops 1%. A 30-minute watermark catches 99.9%. A 2-hour watermark catches everything but holds way more state.

Tradeoff: latency vs completeness. Pick based on the consumer. Real-time alerting? 10-min watermark, accept dropping 1%. Daily KPIs? 1-hour watermark, lose latency but get accuracy.

Output modes: complete, append, update

Three options, easy to pick wrong:

ModeWhat it writesUse when
appendOnly newly-finalised rowsStateless transforms; aggregations after watermark closes a window
updateEvery changed row since last batchAggregations you want to read continuously, like a running count
completeThe full result table every batchTiny aggregations only (fits in memory); avoid in production

complete mode is a trap. It rewrites the entire output every micro-batch. With a 1-billion-row aggregation, that’s 1 billion rows written every 30 seconds. Don’t.

For “running count per window” use update. For “finalised counts after the window closes” use append (combined with a watermark so the engine knows when a window is final).

Stateful aggregations: where memory goes

Stateful operations (aggregations, joins between streams, dropDuplicates with watermark) hold state per key. State lives in the executor heap and gets checkpointed to disk.

The 80/20 rule for state size:

state size ≈ (avg state per key) × (num keys still tracked)

num keys still tracked is bounded by the watermark. If your watermark is 10 minutes and you have ~10k events per minute with ~1k unique keys per minute, you’re tracking roughly 10k unique keys at any time. Each key holds maybe a few KB of aggregation state. State size ~10 MB. Fine.

If you have 100M unique keys (one per user_id with a long-tail of inactive users), state grows accordingly. Now you need to either:

  1. Tighten the watermark so old keys age out faster.
  2. Use mapGroupsWithState / flatMapGroupsWithState with custom timeouts to evict keys explicitly.
  3. Move state to RocksDB-backed state store (spark.sql.streaming.stateStore.providerClass) which lives on disk, not heap. Slower per-key but unbounded.

Option 3 became much more practical in Databricks Runtime 13+. Default to it for any aggregation with > 10k state keys.

Idempotent sinks via checkpoints + transactional writes

Streaming jobs WILL retry. Network blip, executor crash, deploy. The job restarts from the checkpoint, re-processes the most-recent uncommitted micro-batch. If your sink isn’t idempotent, you get duplicates.

Delta Lake makes this easy. Use foreachBatch with a deterministic merge:

def upsert_to_delta(microbatch_df, batch_id):
target = DeltaTable.forPath(spark, "/silver/events")
(target.alias("t")
.merge(microbatch_df.alias("s"), "t.event_id = s.event_id")
.whenNotMatchedInsertAll()
.execute())
(stream.writeStream
.foreachBatch(upsert_to_delta)
.option("checkpointLocation", "/chk/events")
.trigger(processingTime="30 seconds")
.start())

Each micro-batch is a MERGE on a stable key. Re-running the same batch produces the same target state. Duplicates impossible.

For non-Delta sinks, use the batch_id argument as an idempotency key. Many warehouses support a MERGE INTO ... ON dedup_key = ? pattern; pass the batch_id so retries no-op.

Four operational habits that keep streaming healthy

After enough 3am pages, these became reflexes:

1. Monitor input rate vs processing rate. The Spark streaming UI shows both. If input > processing for more than a few batches, you’re falling behind. Either scale up the cluster or drop the watermark to free state.

2. Alert on watermark progression. If latestOffset - committedOffset grows without bound, the job’s stuck on a partition. Often a malformed message that throws repeatedly. Set up alerting on the offset-lag metric.

3. Bound the micro-batch. Use maxOffsetsPerTrigger (Kafka) or maxFilesPerTrigger (file source) to cap how much each batch processes. Without it, a backlogged source dumps gigabytes into one batch and the job pauses for 20 minutes.

4. Test recovery before you need it. Once a quarter, kill the streaming job, delete the checkpoint, restart with a backfill. Make sure the backfill catches up cleanly. The first time you do this in a real outage is not the time to find out the recovery path is broken.

What I no longer do

When to NOT use Structured Streaming

Sometimes batch is the right answer:

If your latency requirement is < 5 minutes and data flows continuously, streaming wins. Otherwise reconsider.

Closing

Structured Streaming is a small idea (incremental query) wrapped in a lot of machinery (watermarks, state stores, checkpoints, output modes). Get the watermark right, pick output mode based on the consumer, use foreachBatch with idempotent sinks, monitor input vs processing rate. The four operational habits keep most streaming jobs alive without 3am pages.

The mistake I started with — translating batch to streaming with no watermark — is exactly the failure mode the watermark exists to prevent. Don’t skip it.