python · 2024-08-12 · 5 min read
asyncio in production: what works, what surprises, and what to skip
asyncio confuses people because Python explains it backwards. The docs lead with async def and await syntax, when the thing you actually need to understand is the event loop. This post is the mental model that finally made it click for me, plus the four production patterns I reach for and the four mistakes I’ve stopped making.
The mental model
asyncio runs on a single thread, with cooperative scheduling. There is one event loop. It picks a task, runs it until that task hits an await, suspends it, and runs another task. No preemption: a task only yields when it explicitly does.
```python
import asyncio

async def task(name, delay):
    print(f"{name}: starting")
    await asyncio.sleep(delay)  # ← yields here
    print(f"{name}: done")

async def main():
    await asyncio.gather(
        task("A", 1),
        task("B", 2),
        task("C", 1),
    )

asyncio.run(main())

# Output (in some order):
# A: starting
# B: starting
# C: starting
# A: done
# C: done
# B: done
```

Three tasks "ran concurrently" not because there's threading, but because each one yielded at await asyncio.sleep(...), letting the loop interleave them.
This is the load-bearing fact: asyncio gives you concurrency, not parallelism. No work is being split across CPU cores. It is one thread doing many waits at once.
When to actually use it
asyncio shines when your bottleneck is I/O wait:
- HTTP requests to slow APIs
- Database queries
- File reads on slow disks
- Subprocess calls
- LLM tool-call workflows (this is huge: agentic systems are 90% I/O wait)
asyncio does not help with:
- CPU-bound work (number crunching, image processing, data transformation)
- Pure-Python compute (held back by the GIL anyway)
For CPU work, use threads (if your hot path releases the GIL — numpy, scipy, most C extensions) or processes (if pure Python).
```
            I/O bound   CPU bound, releases GIL   CPU bound, pure Python
            ─────────   ───────────────────────   ──────────────────────
asyncio     best
threads                 good
processes                                         best
```

Pattern 1: TaskGroup (3.11+) for structured concurrency
asyncio.gather was the old way. It has a tricky failure model: if one task raises, the others keep running unattended unless you cancel them yourself, and return_exceptions=True changes the semantics again. TaskGroup fixes this:
```python
async def fetch_all(urls: list[str]) -> list[bytes]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(u)) for u in urls]
        # All tasks are awaited at the close of the with-block.
        # If any task raised, ALL others were cancelled, and we
        # exit the block with an ExceptionGroup.
    return [t.result() for t in tasks]
```

Key properties:

- All tasks finish or all are cancelled. No half-done state.
- Errors come out as an ExceptionGroup you can pattern-match with except*.
- The with block doesn't exit until every task has settled.
```python
try:
    results = await fetch_all(urls)
except* TimeoutError as eg:
    log.warning(f"{len(eg.exceptions)} timeouts")
except* ConnectionError as eg:
    log.error(f"{len(eg.exceptions)} connection errors")
```

Default to TaskGroup for any gather-shaped use case in 3.11+.
Pattern 2: Bounded concurrency with Semaphore
Naive gather over 10,000 URLs will try to open 10,000 sockets at once. The OS will not be amused. Bound the concurrency:
```python
import asyncio

async def fetch_with_limit(url: str, sem: asyncio.Semaphore) -> bytes:
    async with sem:
        return await fetch(url)

async def fetch_all_bounded(urls: list[str], max_in_flight: int = 50) -> list[bytes]:
    sem = asyncio.Semaphore(max_in_flight)
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_with_limit(u, sem)) for u in urls]
    return [t.result() for t in tasks]
```

The async with sem: blocks until a permit is available. Effectively a "max 50 concurrent" governor. Tune the number to your downstream's tolerance.
Pattern 3: run_in_executor for blocking calls
If you must call a blocking library from an async function (because no async equivalent exists), don’t just call it. That freezes the event loop.
```python
# Wrong: blocks the event loop, freezing every other task
async def get_user_avatar(uid: int) -> bytes:
    img = PIL.Image.open(f"/cache/{uid}.png")  # synchronous file read
    img = img.resize((128, 128))               # synchronous CPU work
    return img.tobytes()
```

```python
# Right: offload to a thread, let the loop continue
async def get_user_avatar(uid: int) -> bytes:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _sync_resize, uid)

def _sync_resize(uid: int) -> bytes:
    img = PIL.Image.open(f"/cache/{uid}.png")
    img = img.resize((128, 128))
    return img.tobytes()
```

The None argument means "use the default thread pool". For CPU-heavy work, pass a ProcessPoolExecutor instead so the GIL doesn't hold you up.
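Since 3.9 there is also asyncio.to_thread, a shorthand for run_in_executor with the default thread pool. A tiny sketch, where blocking_work is a hypothetical stand-in for your synchronous call:

```python
import asyncio

def blocking_work(x: int) -> int:
    # stand-in for a synchronous call (file I/O, a C library, etc.)
    return x * 2

async def main() -> int:
    # asyncio.to_thread (3.9+) runs the callable in the default
    # thread pool and awaits the result without blocking the loop
    return await asyncio.to_thread(blocking_work, 21)

result = asyncio.run(main())
print(result)  # 42
```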
Pattern 4: timeouts on every external call
Without a timeout, one slow API call can hang your whole pipeline indefinitely.
```python
async def fetch_with_timeout(url: str, seconds: float = 5.0) -> bytes:
    async with asyncio.timeout(seconds):  # 3.11+
        return await fetch(url)
```

Pre-3.11, use asyncio.wait_for:

```python
async def fetch_with_timeout(url: str, seconds: float = 5.0) -> bytes:
    return await asyncio.wait_for(fetch(url), timeout=seconds)
```

asyncio.timeout is preferred in modern code because it's an async context manager, which composes more cleanly with TaskGroup and other context-managed resources.
Mistake 1: blocking calls inside async def
```python
async def get_data(id: int):
    return requests.get(f"/data/{id}").json()  # requests is sync, freezes the loop
```

This freezes the event loop for the duration of the HTTP call. Symptom: latency does not improve no matter how many tasks you gather.
Fix: use httpx or aiohttp (async-native).
Mistake 2: forgetting to await
```python
async def main():
    fetch(url)  # returns a coroutine, never runs
```

Coroutines are lazy: they don't execute until awaited. The above creates a coroutine object that gets garbage-collected without running. Type checkers (pyright, mypy) catch this. Run them.
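A quick way to convince yourself (Python will also emit a "coroutine was never awaited" RuntimeWarning for the first call):

```python
import asyncio

results = []

async def record() -> None:
    results.append("ran")

async def main() -> None:
    record()        # coroutine object created, never scheduled
    await record()  # actually runs

asyncio.run(main())
print(results)  # ['ran']: only the awaited call executed
```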
Mistake 3: using asyncio.run() inside an async function
```python
async def outer():
    result = asyncio.run(inner())  # RuntimeError: cannot be called from a running event loop
```

asyncio.run creates a new event loop. You cannot create one when one is already running. Inside an async function, just await:

```python
async def outer():
    result = await inner()  # works, no nested loop
```

asyncio.run is for the top of your program. Once. Never call it from inside async code.
Mistake 4: mismanaging the event loop in long-running services
Some web frameworks (FastAPI, Sanic) own the event loop for you. You don’t call asyncio.run; the framework does. Inside route handlers, just async def and await — the framework already has a loop running.
If you’re hand-rolling a long-running service, prefer asyncio.Runner over asyncio.run:
```python
async def main():
    while True:
        await tick()

with asyncio.Runner() as runner:
    runner.run(main())
```

Runner (3.11+) gives you control over loop lifecycle and enables clean shutdown handlers.
What asyncio is bad at
A few things asyncio doesn’t solve, despite the marketing:
- Mixing sync and async libraries. Every blocking call has to be either replaced with an async equivalent or wrapped in run_in_executor. There's no automatic conversion.
- Debugging. Stack traces in async code are uglier and tools are slower to surface deadlocks. Get comfortable with asyncio.create_task(..., name="...") for traceability.
- CPU work. Single thread, GIL-bound. Don't use it for compute.
Closing
The mental model: cooperative scheduling on a single thread, yields at each await. The patterns: TaskGroup for structure, Semaphore for bounds, executor for blocking, timeout on every external call. The mistakes: blocking the loop, forgetting await, nesting asyncio.run, leaking event loops in long services.
Get those right and asyncio earns its keep for any IO-heavy workload, especially agentic systems where every tool call is a network round-trip.