python series · Python idioms · 2025-09-08 · 7 min read
Python idioms I reach for daily, part 3: generators for streaming and composition
Part 3 (final) of a 3-part series on Python idioms I reach for daily in AI / data engineering work.

- Part 1: decorators that earn their keep
- Part 2: context managers beyond `with open()`
- Part 3 (this post): generators for streaming and composition
Generators are one of those features that look like syntactic sugar (just `yield` instead of `return`) but actually unlock a different style of code. Once it clicks, you stop writing “load everything into a list, then loop over it” and start writing “stream items through a pipeline.” The difference matters a lot when “everything” doesn’t fit in memory.

This post covers the five generator patterns I use weekly, with the specific places they pay off in production AI / data work.
A 60-second refresher
A function with yield is a generator. Calling it returns a generator object; iterating that object runs the function up to each yield and pauses there.
```python
def count_to(n):
    i = 0
    while i < n:
        yield i
        i += 1

g = count_to(5)
print(list(g))  # [0, 1, 2, 3, 4]
```

The thing that makes this useful at scale: memory is constant. `count_to(1_000_000_000)` does not allocate a billion-element list; it produces values one at a time, each consumed and discarded.
Two equivalent shapes:
```python
# generator function
def squares(xs):
    for x in xs:
        yield x * x

# generator expression (one-liner)
squares = (x * x for x in xs)
```

Both produce the same iterator. Use the expression for trivial transforms; use the function form when you have multiple statements.
Pattern 1: streaming a large file
The most basic but most-impactful use case. Reading a 50 GB file into memory crashes. Reading line-by-line through a generator does not:
```python
def read_lines(path: str):
    with open(path, "r") as f:
        for line in f:  # `f` is itself a generator over lines
            yield line.rstrip()

# process a 50 GB file in constant memory
for line in read_lines("/data/giant.log"):
    if "error" in line:
        process(line)
```

The trick is that `for line in f` doesn’t load the file; it reads chunks lazily. Wrapping it in your own generator function lets you preprocess each line before yielding (strip whitespace, parse JSON, filter).
JSON-lines variant:
```python
import json

def read_jsonl(path: str):
    with open(path, "r") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

for record in read_jsonl("/data/events.jsonl"):
    handle(record)
```

The function is six lines. It works on a 1 GB file or a 1 TB file. That’s the point.
Pattern 2: paginated APIs as a single iterator
You’re calling an API that returns 100 items per page, with a cursor. You want to treat all results as one stream without writing pagination logic at every call site:
```python
import httpx

def paginated_results(endpoint: str, page_size: int = 100):
    """Yield every item from a cursor-paginated API as if it were one list."""
    cursor = None
    while True:
        params = {"limit": page_size}
        if cursor:
            params["cursor"] = cursor
        response = httpx.get(endpoint, params=params).json()
        for item in response["items"]:
            yield item
        cursor = response.get("next_cursor")
        if not cursor:
            break
```

Use:

```python
for user in paginated_results("https://api/users"):
    print(user["email"])
```

The caller doesn’t know or care about pagination. They iterate; the generator handles the cursor logic. Add it as a method on a client class and now your entire codebase treats paginated APIs as plain iterables.
This pattern composes with everything else. Want only the first 1000? itertools.islice(paginated_results(...), 1000). Want them in batches of 50? itertools.batched(paginated_results(...), 50) (Python 3.12+).
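A minimal sketch of that composition, reusing the `paginated_results` function above (the `db.insert_many` call is the same stand-in used later in this post, and `itertools.batched` assumes Python 3.12+):

```python
import itertools

# first 1000 users only, still fetched page by page
for user in itertools.islice(paginated_results("https://api/users"), 1000):
    print(user["email"])

# the same stream, delivered in batches of 50 (Python 3.12+)
for batch in itertools.batched(paginated_results("https://api/users"), 50):
    db.insert_many(batch)
```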
Pattern 3: itertools chains for pipeline composition
itertools is the standard library’s most underrated module. Once you know what’s in it, you stop writing nested for-loops:
```python
import itertools as it

events = read_jsonl("/data/events.jsonl")                      # generator
clean = (e for e in events if e["valid"])                      # generator
flattened = it.chain.from_iterable(e["items"] for e in clean)  # generator
batched = it.batched(flattened, 100)                           # 3.12+

for batch in batched:
    process_batch(batch)
```

What this does:
- Stream events from disk
- Filter out invalid ones (still streaming)
- Flatten the nested `items` array (still streaming)
- Batch into chunks of 100 (still streaming)
- Process each batch
No intermediate list ever exists. Memory stays constant regardless of file size. The pipeline reads top-to-bottom; each line is one transformation.
The itertools cheat sheet for pipelines:
| Function | What it does |
|---|---|
| `chain(a, b, c)` | Concatenates iterables |
| `chain.from_iterable(iterable_of_iterables)` | Flattens by one level |
| `islice(it, start, stop, step)` | Slice an iterable without materialising |
| `takewhile(pred, it)` | Yield until `pred` is False, then stop |
| `dropwhile(pred, it)` | Skip while `pred` is True, then yield the rest |
| `groupby(it, key)` | Group consecutive equal-keyed items |
| `tee(it, n)` | Split into `n` independent iterators |
| `batched(it, n)` | (3.12+) Yield batches of size `n` |
I use chain.from_iterable, islice, and batched weekly. The others maybe once a month each.
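For the rarer ones, a quick sketch on toy log records of my own invention (the data and the `level` key are purely illustrative):

```python
import itertools as it

records = [
    {"level": "ERROR", "msg": "disk full"},
    {"level": "ERROR", "msg": "timeout"},
    {"level": "INFO", "msg": "retrying"},
    {"level": "INFO", "msg": "ok"},
]

# groupby: groups *consecutive* records with the same key (sort first if needed)
for level, group in it.groupby(records, key=lambda r: r["level"]):
    print(level, [r["msg"] for r in group])
# ERROR ['disk full', 'timeout']
# INFO ['retrying', 'ok']

# takewhile: consume the leading run of errors, stop at the first non-error
leading_errors = list(it.takewhile(lambda r: r["level"] == "ERROR", records))
```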
Pattern 4: batching for downstream that prefers chunks
Many APIs (and most database insert endpoints) want batches, not individual rows. Without itertools.batched, you write the same five lines forever:
```python
def batched(iterable, n):
    """Yield successive n-sized batches from iterable. Pre-3.12 fallback."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch
```

Use:

```python
for batch in batched(read_jsonl("/data/events.jsonl"), n=500):
    db.insert_many(batch)  # one DB round-trip per 500 events
```

The same pattern works for upstream rate-limiting too: “send no more than 50 requests per minute” becomes “yield 50, sleep, yield 50, sleep, …”
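A crude sketch of that rate-limiting shape, reusing the `batched` fallback above (the 50-per-minute figure is just the number from the sentence, and the sleep-a-full-minute approach is deliberately simplistic):

```python
import time

def rate_limited(iterable, per_minute: int = 50):
    """Yield items from `iterable`, pausing after every `per_minute` of them."""
    for batch in batched(iterable, n=per_minute):
        yield from batch
        time.sleep(60)  # crude: wait out the rest of the minute between batches

# e.g. at most 50 outbound requests per minute
# for payload in rate_limited(read_jsonl("/data/outbound.jsonl"), per_minute=50):
#     httpx.post("https://api/ingest", json=payload)
```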
Pattern 5: async generators
For streaming over an async source — a websocket, a paginated async API, a Kafka consumer — async generators do for await what regular generators do for yield:
```python
import httpx

async def paginated_async(endpoint: str, page_size: int = 100):
    cursor = None
    async with httpx.AsyncClient() as client:
        while True:
            params = {"limit": page_size}
            if cursor:
                params["cursor"] = cursor
            response = (await client.get(endpoint, params=params)).json()
            for item in response["items"]:
                yield item
            cursor = response.get("next_cursor")
            if not cursor:
                break
```

Use:

```python
async def main():
    async for user in paginated_async("https://api/users"):
        print(user["email"])
```

`async for` consumes the generator; each iteration awaits the next page only when it is needed. Memory stays constant, and the network stays cooperative (each await yields to the event loop).
This is the workhorse pattern for any async data-fetch with backpressure. Combined with asyncio.Semaphore from the asyncio post, you get a streaming, rate-limited, async pipeline in 20 lines.
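Roughly, that combination looks like the sketch below. The names (`fetch_all`, `fetch_one`) and the limit of 10 concurrent requests are my own illustration, not from the series:

```python
import asyncio
import httpx

async def fetch_all(urls, max_concurrent: int = 10):
    """Stream responses as they complete, with at most `max_concurrent` requests in flight."""
    sem = asyncio.Semaphore(max_concurrent)

    async def fetch_one(client, url):
        async with sem:  # the semaphore caps concurrent requests
            return await client.get(url)

    async with httpx.AsyncClient() as client:
        tasks = [asyncio.create_task(fetch_one(client, u)) for u in urls]
        for finished in asyncio.as_completed(tasks):
            yield await finished  # yield each response as soon as it is ready

# async for response in fetch_all(["https://api/a", "https://api/b"]):
#     handle(response.json())
```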
A real example: streaming token-by-token from an LLM
Modern LLM APIs return tokens as a stream. The whole point is “show the user the first token in 100 ms” rather than “wait 4 seconds for the complete response.” Async generators are exactly the shape you need:
```python
import anthropic

async def chat_stream(prompt: str):
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text

async def main():
    async for chunk in chat_stream("Why is the sky blue?"):
        print(chunk, end="", flush=True)
```

The user sees text appear word-by-word. Memory holds at most one chunk at a time. The pattern composes: wrap the chat stream in another async generator that translates each chunk, or filters profanity, or counts tokens; pipelines compose top-to-bottom just like the sync itertools chain above.
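For instance, a wrapper that counts as chunks pass through might look like this (a sketch reusing `chat_stream` above; the word split is a crude stand-in for real token counting):

```python
async def counted(stream):
    """Pass chunks through unchanged while keeping a running word count."""
    total = 0
    async for chunk in stream:
        total += len(chunk.split())  # crude proxy for tokens, for illustration only
        yield chunk
    print(f"\n[{total} words streamed]")

async def main():
    async for chunk in counted(chat_stream("Why is the sky blue?")):
        print(chunk, end="", flush=True)
```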
When NOT to use a generator
A few cases where eager evaluation wins:
- You need to iterate the same data multiple times. A generator is consumed on first iteration; the second loop sees an empty iterator. Either materialise to a list (`list(gen)`) or use `itertools.tee` if memory is tight (see the sketch after this list).
- You need random access. Generators are sequential. If you need `data[42]`, materialise to a list or use a different data structure.
- The data fits comfortably in memory. For 10,000 small dicts, a list is faster and clearer than a generator. Generators win at scale, not at small N.
- Errors mid-stream are catastrophic. If the 500th item raises, the first 499 may have already been processed and committed downstream. Add explicit transaction boundaries (see context managers).
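The first bullet trips people up most often; a minimal demonstration:

```python
import itertools

def gen():
    yield from range(3)

g = gen()
print(list(g))  # [0, 1, 2]
print(list(g))  # [] -- the generator is already exhausted

# itertools.tee gives two independent iterators over one source
a, b = itertools.tee(gen(), 2)
print(list(a), list(b))  # [0, 1, 2] [0, 1, 2]
```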
What I no longer do
- List-comprehend everything. I default to generator expressions now. If a list is needed, the consumer wraps `list(...)` explicitly.
- Hand-write pagination at every call site. One generator function, written once.
- Load CSVs and JSONLs entirely into memory. Stream them. The 5 GB file works the same way as the 5 MB one.
Closing the series
Three idioms, three posts:
- Decorators for cross-cutting concerns: retry, timing, feature-flag, cache, context.
- Context managers for resources: transactions, ExitStack, async, scoped config.
- Generators (this post) for streaming and composition: file streams, paginated APIs, itertools chains, async streams.
Each one removes a different class of repetitive code. None of them are advanced. All three are weekly tools in any codebase that has matured past the prototype phase. If you find yourself writing imperative for-loops with explicit accumulator lists, or wrapping every database call in try/except/commit, or copy-pasting timing code into every function: that’s where the next idiom is hiding.