2025-09-08 · 7 min read

Python idioms I reach for daily, part 3: generators for streaming and composition

#python #generators #streaming #idioms #series-python-idioms

Part 3 (final) of a 3-part series on Python idioms I reach for daily in AI / data engineering work.

  Part 1: decorators that earn their keep
  Part 2: context managers beyond with open()
  Part 3 (this post): generators for streaming and composition

Generators are one of those features that look like syntax sugar (just yield instead of return) but actually unlock a different style of code. Once it clicks, you stop writing “load everything into a list, then loop over it” and start writing “stream items through a pipeline.” The difference matters a lot when “everything” doesn’t fit in memory.

This post is the five generator patterns I use weekly, with the specific places they pay off in production AI / data work.

A 60-second refresher

A function with yield is a generator. Calling it returns a generator object; iterating that object runs the function up to each yield and pauses there.

def count_to(n):
    i = 0
    while i < n:
        yield i
        i += 1

g = count_to(5)
print(list(g))  # [0, 1, 2, 3, 4]

The thing that makes this useful at scale: memory is constant. count_to(1_000_000_000) does not allocate a billion-element list; it produces values one at a time, each consumed and discarded.
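A quick way to convince yourself (the exact sys.getsizeof numbers vary by Python version, but the shape holds):

import sys

g = count_to(1_000_000_000)   # nothing is computed yet
print(sys.getsizeof(g))       # a couple hundred bytes, regardless of n

big = list(range(1_000_000))  # a real list pays per element
print(sys.getsizeof(big))     # megabytes, and it grows with the length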

Two equivalent shapes:

# generator function
def squares(xs):
    for x in xs:
        yield x * x

# generator expression (one-liner)
squares = (x * x for x in xs)

Both produce the same iterator. Use the expression for trivial transforms; use the function form when you have multiple statements.

Pattern 1: streaming a large file

The most basic but most-impactful use case. Reading a 50 GB file into memory crashes. Reading line-by-line through a generator does not:

def read_lines(path: str):
    with open(path, "r") as f:
        for line in f:  # file objects are lazy iterators over lines
            yield line.rstrip()

# process a 50 GB file in constant memory
for line in read_lines("/data/giant.log"):
    if "error" in line:
        process(line)

The trick is that for line in f doesn’t load the file; it reads chunks lazily. Wrapping in your own generator function lets you preprocess each line before yielding (strip whitespace, parse JSON, filter).

JSON-lines variant:

import json

def read_jsonl(path: str):
    with open(path, "r") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

for record in read_jsonl("/data/events.jsonl"):
    handle(record)

The function is six lines. It works on a 1 GB file or a 1 TB file. That’s the point.

Pattern 2: paginated APIs as a single iterator

You’re calling an API that returns 100 items per page, with a cursor. You want to treat all results as one stream without writing pagination logic at every call site:

import httpx

def paginated_results(endpoint: str, page_size: int = 100):
    """Yield every item from a cursor-paginated API as if it were one list."""
    cursor = None
    while True:
        params = {"limit": page_size}
        if cursor:
            params["cursor"] = cursor
        response = httpx.get(endpoint, params=params).json()
        for item in response["items"]:
            yield item
        cursor = response.get("next_cursor")
        if not cursor:
            break

Use:

for user in paginated_results("https://api/users"):
    print(user["email"])

The caller doesn’t know or care about pagination. They iterate; the generator handles the cursor logic. Add it as a method on a client class and now your entire codebase treats paginated APIs as plain iterables.
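A minimal sketch of that idea (APIClient and iter_users are hypothetical names, reusing the paginated_results generator above):

class APIClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

    def iter_users(self, page_size: int = 100):
        # delegate to the generator; callers just see an iterable of users
        yield from paginated_results(f"{self.base_url}/users", page_size)

client = APIClient("https://api")
for user in client.iter_users():
    print(user["email"])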

This pattern composes with everything else. Want only the first 1000? itertools.islice(paginated_results(...), 1000). Want them in batches of 50? itertools.batched(paginated_results(...), 50) (Python 3.12+).
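Spelled out (sync_to_crm is a stand-in for whatever consumes the batches):

import itertools as it

users = paginated_results("https://api/users")

first_thousand = it.islice(users, 1000)       # stops fetching pages after 1000 items
for batch in it.batched(first_thousand, 50):  # 3.12+; yields 20 batches of 50
    sync_to_crm(batch)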

Pattern 3: itertools chains for pipeline composition

itertools is the standard library’s most underrated module. Once you know what’s in it, you stop writing nested for-loops:

import itertools as it

events = read_jsonl("/data/events.jsonl")   # generator
clean = (e for e in events if e["valid"])   # generator
flattened = it.chain.from_iterable(e["items"] for e in clean)
batched = it.batched(flattened, 100)        # 3.12+

for batch in batched:
    process_batch(batch)

What this does:

  1. Stream events from disk
  2. Filter out invalid ones (still streaming)
  3. Flatten the nested items array (still streaming)
  4. Batch into chunks of 100 (still streaming)
  5. Process each batch

No intermediate list ever exists. Memory stays constant regardless of file size. The pipeline reads top-to-bottom; each line is one transformation.

The itertools cheat sheet for pipelines:

| Function | What it does |
| --- | --- |
| chain(a, b, c) | Concatenates iterables |
| chain.from_iterable(iterable_of_iterables) | Flattens by one level |
| islice(it, start, stop, step) | Slice an iterable without materialising |
| takewhile(pred, it) | Yield until pred is False, then stop |
| dropwhile(pred, it) | Skip while pred is True, then yield the rest |
| groupby(it, key) | Group consecutive equal-keyed items |
| tee(it, n) | Split into n independent iterators |
| batched(it, n) | Yield batches of size n (3.12+) |
I use chain.from_iterable, islice, and batched weekly. The others maybe once a month each.
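One gotcha worth a concrete example: groupby only merges consecutive items, so sort first if you want one group per key:

import itertools as it

events = [("a", 1), ("a", 2), ("b", 3), ("a", 4)]

for key, group in it.groupby(events, key=lambda e: e[0]):
    print(key, [n for _, n in group])
# a [1, 2]
# b [3]
# a [4]   <- "a" appears twice because its items were not adjacent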

Pattern 4: batching for downstream that prefers chunks

Many APIs (and most database insert endpoints) want batches, not individual rows. Without itertools.batched, you write the same five lines forever:

def batched(iterable, n):
    """Yield successive n-sized batches from iterable. Pre-3.12 fallback."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch

Use:

for batch in batched(read_jsonl("/data/events.jsonl"), n=500):
    db.insert_many(batch)  # one DB round-trip per 500 events

The same pattern works for upstream rate-limiting too: “send no more than 50 requests per minute” becomes “yield 50, sleep, yield 50, sleep, …”
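A minimal sketch of that idea, reusing the batched fallback above (the function name and the 50-per-minute policy are illustrative):

import time

def rate_limited(iterable, per_minute: int):
    for batch in batched(iterable, per_minute):
        window_start = time.monotonic()
        yield from batch
        # naive: also sleeps after the final batch
        elapsed = time.monotonic() - window_start
        if elapsed < 60:
            time.sleep(60 - elapsed)

# for request in rate_limited(requests_to_send, per_minute=50):
#     send(request)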

Pattern 5: async generators

For streaming over an async source — a websocket, a paginated async API, a Kafka consumer — async generators let you await inside a generator and consume it with async for:

import httpx

async def paginated_async(endpoint: str, page_size: int = 100):
    cursor = None
    async with httpx.AsyncClient() as client:
        while True:
            params = {"limit": page_size}
            if cursor:
                params["cursor"] = cursor
            response = (await client.get(endpoint, params=params)).json()
            for item in response["items"]:
                yield item
            cursor = response.get("next_cursor")
            if not cursor:
                break

Use:

async def main():
    async for user in paginated_async("https://api/users"):
        print(user["email"])

async for consumes the generator; each iteration awaits the next page only when needed. Memory stays constant, network stays cooperative (each await yields control back to the event loop).

This is the workhorse pattern for any async data-fetch with backpressure. Combined with asyncio.Semaphore from the asyncio post, you get a streaming, rate-limited, async pipeline in 20 lines.
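A sketch of that combination (fetch_all is a hypothetical example, not from the earlier post; it bounds in-flight requests with a semaphore and streams responses as they finish):

import asyncio
import httpx

async def fetch_all(urls, max_concurrent: int = 10):
    sem = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:

        async def fetch(url):
            async with sem:       # at most max_concurrent in flight
                return await client.get(url)

        tasks = [asyncio.create_task(fetch(u)) for u in urls]
        for task in asyncio.as_completed(tasks):
            yield await task      # stream responses as they complete

# async for response in fetch_all(url_list):
#     handle(response)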

A real example: streaming token-by-token from an LLM

Modern LLM APIs return tokens as a stream. The whole point is “show the user the first token in 100 ms” rather than “wait 4 seconds for the complete response.” Async generators are exactly the shape you need:

import anthropic

async def chat_stream(prompt: str):
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text

async def main():
    async for chunk in chat_stream("Why is the sky blue?"):
        print(chunk, end="", flush=True)

The user sees text appear word-by-word. Memory holds at most one chunk at a time. The pattern composes: wrap the chat stream in another async generator that translates each chunk, or filters profanity, or counts tokens — pipelines compose top-to-bottom just like the sync itertools chain above.
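For instance, a pass-through wrapper that counts chunks as they stream (with_chunk_count is illustrative, and it counts text chunks rather than true model tokens):

async def with_chunk_count(stream):
    count = 0
    async for chunk in stream:
        count += 1
        yield chunk  # pass each chunk straight through
    print(f"\n[{count} chunks]")

async def main():
    async for chunk in with_chunk_count(chat_stream("Why is the sky blue?")):
        print(chunk, end="", flush=True)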

When NOT to use a generator

A few cases where eager evaluation wins:

  1. You need the data more than once. Generators are single-pass; a second loop sees nothing (demonstrated just below).
  2. You need len(), indexing, or slicing. Generators support none of them.
  3. You need to sort. sorted() has to materialise everything anyway.
  4. The data is small. A list is easier to debug, and its repr actually shows the contents.
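The single-pass point in particular bites people; a generator, once consumed, is empty:

g = (x * x for x in range(5))
print(sum(g))  # 30
print(sum(g))  # 0 -- already exhausted; a list would survive a second pass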

What I no longer do

  Read a whole file into a list just to loop over it once (Pattern 1 replaces this).
  Copy-paste cursor-handling pagination loops at every call site (Pattern 2).
  Hand-roll the same five-line batching loop in every ingestion script (Pattern 4).

Closing the series

Three idioms, three posts:

  Part 1: decorators that earn their keep
  Part 2: context managers beyond with open()
  Part 3 (this post): generators for streaming and composition

Each one removes a different class of repetitive code. None of them are advanced. All three are weekly tools in any codebase that has matured past the prototype phase. If you find yourself writing imperative for-loops with explicit accumulator lists, or wrapping every database call in try/except/commit, or copy-pasting timing code into every function: that’s where the next idiom is hiding.