post python · 2024-08-12 · 5 min read

asyncio in production: what works, what surprises, and what to skip

#python #async #concurrency #performance

asyncio confuses people because Python explains it backwards. The docs lead with async def and await syntax, when the thing you actually need to understand is the event loop. This post is the mental model that finally made it click for me, plus the four production patterns I reach for and the four mistakes I’ve stopped making.

The mental model

asyncio runs on a single thread, with cooperative scheduling. There is one event loop. It picks a task, runs it until that task hits an await, suspends it, and runs another task. No preemption: a task only yields when it explicitly does.

import asyncio

async def task(name, delay):
    print(f"{name}: starting")
    await asyncio.sleep(delay)  # ← yields here
    print(f"{name}: done")

async def main():
    await asyncio.gather(
        task("A", 1),
        task("B", 2),
        task("C", 1),
    )

asyncio.run(main())
# Output (in some order):
# A: starting
# B: starting
# C: starting
# A: done
# C: done
# B: done

Three tasks “ran concurrently” not because there’s threading, but because each one yielded at await asyncio.sleep(...), letting the loop interleave them.

This is the load-bearing fact: asyncio gives you concurrency, not parallelism. No work is being spread across CPU cores; it's one thread doing many waits at once.
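You can see the interleaving directly by timing the gather example: total wall time tracks the longest single wait, not the sum of all waits. A minimal sketch (delays shrunk so it runs quickly):

```python
import asyncio
import time

async def wait(delay: float) -> None:
    await asyncio.sleep(delay)  # yields to the event loop

async def main() -> float:
    start = time.perf_counter()
    # three concurrent waits: total time ≈ max(delays), not sum(delays)
    await asyncio.gather(wait(0.2), wait(0.1), wait(0.2))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"total: {elapsed:.2f}s")  # ~0.2s, not ~0.5s
```

If these were sequential awaits the total would be the sum; because each task yields at its sleep, they overlap.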

When to actually use it

asyncio shines when your bottleneck is I/O wait: HTTP requests, database queries, message queues, subprocess pipes, anywhere the program spends its time waiting on a socket or file descriptor rather than computing.

asyncio does not help with: CPU-bound work (parsing, compression, number crunching), blocking libraries that never yield, or anything that actually needs multiple cores.

For CPU work, use threads (if your hot path releases the GIL — numpy, scipy, most C extensions) or processes (if pure Python).

            I/O bound    CPU bound, releases GIL   CPU bound, pure Python
─────────   ─────────    ───────────────────────   ──────────────────────
asyncio     best
threads                  good
processes                                          best
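For the pure-Python column, a minimal sketch of the process route using the stdlib's ProcessPoolExecutor (`cpu_bound` is a made-up workload for illustration):

```python
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    # pure-Python loop: holds the GIL, so threads can't parallelize it
    return sum(i * i for i in range(n))

def run_in_processes(inputs: list[int]) -> list[int]:
    # one interpreter per worker process: sidesteps the GIL entirely
    with ProcessPoolExecutor() as pool:
        return list(pool.map(cpu_bound, inputs))

if __name__ == "__main__":
    print(run_in_processes([10_000, 20_000]))
```

The worker function must be defined at module top level so it can be pickled across the process boundary.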

Pattern 1: TaskGroup (3.11+) for structured concurrency

asyncio.gather was the old way. Its failure model is tricky: by default, if one task raises, the exception propagates to the caller but the remaining tasks keep running unsupervised; with return_exceptions=True, exceptions come back as results instead of being raised. Either way, nothing gets cancelled for you. TaskGroup fixes this:

async def fetch_all(urls: list[str]) -> list[bytes]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(u)) for u in urls]
        # All tasks are awaited at the close of the with-block.
        # If any task raised, ALL others were cancelled, and we
        # exit the block with an ExceptionGroup.
    return [t.result() for t in tasks]

Key properties: every task is awaited before the with-block exits; the first failure cancels all siblings; and failures surface as an ExceptionGroup, which you handle with except* (3.11+):

try:
    results = await fetch_all(urls)
except* TimeoutError as eg:
    log.warning(f"{len(eg.exceptions)} timeouts")
except* ConnectionError as eg:
    log.error(f"{len(eg.exceptions)} connection errors")

Default to TaskGroup for any gather-shaped use case in 3.11+.

Pattern 2: Bounded concurrency with Semaphore

Naive gather over 10,000 URLs will try to open 10,000 sockets at once. The OS will not be amused. Bound the concurrency:

import asyncio

async def fetch_with_limit(url: str, sem: asyncio.Semaphore) -> bytes:
    async with sem:
        return await fetch(url)

async def fetch_all_bounded(urls: list[str], max_in_flight: int = 50) -> list[bytes]:
    sem = asyncio.Semaphore(max_in_flight)
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_with_limit(u, sem)) for u in urls]
    return [t.result() for t in tasks]

The async with sem: blocks until a permit is available. Effectively a “max 50 concurrent” governor. Tune the number to your downstream’s tolerance.
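You can verify the governor works by instrumenting a fake fetch. A self-contained sketch (`fake_fetch` is an in-memory stand-in for a real network call, and gather is used here so it also runs pre-3.11):

```python
import asyncio

active = 0  # how many "fetches" are in flight right now
peak = 0    # the highest concurrency ever observed

async def fake_fetch(url: str) -> bytes:
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)  # simulate network wait
    active -= 1
    return url.encode()

async def fetch_with_limit(url: str, sem: asyncio.Semaphore) -> bytes:
    async with sem:  # wait for a permit before touching the "network"
        return await fake_fetch(url)

async def main() -> None:
    sem = asyncio.Semaphore(5)
    urls = [f"https://example.com/{i}" for i in range(50)]
    await asyncio.gather(*(fetch_with_limit(u, sem) for u in urls))

asyncio.run(main())
print(f"peak concurrency: {peak}")  # never exceeds the 5 permits
```

Fifty tasks are created, but at most five are ever inside fake_fetch at once.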

Pattern 3: run_in_executor for blocking calls

If you must call a blocking library from an async function (because no async equivalent exists), don’t just call it. That freezes the event loop.

import asyncio
import PIL.Image

# Wrong: blocks the event loop, freezing every other task
async def get_user_avatar(uid: int) -> bytes:
    img = PIL.Image.open(f"/cache/{uid}.png")  # synchronous file read
    img = img.resize((128, 128))               # synchronous CPU work
    return img.tobytes()

# Right: offload to a thread, let the loop continue
async def get_user_avatar(uid: int) -> bytes:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _sync_resize, uid)

def _sync_resize(uid: int) -> bytes:
    img = PIL.Image.open(f"/cache/{uid}.png")
    img = img.resize((128, 128))
    return img.tobytes()

The None argument means “use the default thread pool”. For CPU-heavy work, pass a ProcessPoolExecutor instead so the GIL doesn’t hold you up.
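The process-pool variant looks like this; a sketch with a made-up pure-Python workload (`crunch`) standing in for the CPU-heavy call:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    # pure-Python CPU work: a thread would hold the GIL the whole time
    return sum(i * i for i in range(n))

async def main() -> int:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # the event loop stays responsive while a worker process computes
        return await loop.run_in_executor(pool, crunch, 100_000)

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Same calling convention as the thread version: only the executor argument changes.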

Pattern 4: timeouts on every external call

Without a timeout, one slow API call can hang your whole pipeline indefinitely.

async def fetch_with_timeout(url: str, seconds: float = 5.0) -> bytes:
    async with asyncio.timeout(seconds):  # 3.11+
        return await fetch(url)

Pre-3.11, use asyncio.wait_for:

async def fetch_with_timeout(url: str, seconds: float = 5.0) -> bytes:
    return await asyncio.wait_for(fetch(url), timeout=seconds)

asyncio.timeout is preferred in modern code because it’s an async context manager, which composes more cleanly with TaskGroup and other context-managed resources.
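Either form cancels the slow task when the deadline passes. A runnable sketch using wait_for (so it works pre-3.11 too), with `slow` standing in for a hung external call:

```python
import asyncio

async def slow() -> str:
    await asyncio.sleep(10)  # stands in for a hung external call
    return "never reached"

async def main() -> str:
    try:
        return await asyncio.wait_for(slow(), timeout=0.05)
    except asyncio.TimeoutError:  # an alias of builtin TimeoutError since 3.11
        return "timed out"

result = asyncio.run(main())
print(result)
```

The slow coroutine is cancelled at the deadline, so its remaining nine-plus seconds of sleep never happen.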

Mistake 1: blocking calls inside async def

async def get_data(id: int):
    return requests.get(f"/data/{id}").json()  # requests is sync, freezes the loop

This freezes the event loop for the duration of the HTTP call. Symptom: latency does not improve no matter how many tasks you gather.

Fix: use httpx or aiohttp (async-native).
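The freeze is easy to reproduce with the stdlib alone, using time.sleep as a stand-in for any synchronous call like requests.get:

```python
import asyncio
import time

async def blocking_task() -> None:
    time.sleep(0.1)  # blocks the whole event loop, like requests.get would

async def yielding_task() -> None:
    await asyncio.sleep(0.1)  # yields, letting other tasks run

async def timed(coro_fn) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn() for _ in range(3)))
    return time.perf_counter() - start

blocking = asyncio.run(timed(blocking_task))  # ~0.3s: tasks run one by one
yielding = asyncio.run(timed(yielding_task))  # ~0.1s: tasks overlap
print(f"blocking: {blocking:.2f}s, yielding: {yielding:.2f}s")
```

Three blocking tasks serialize to the sum of their sleeps; three yielding tasks finish in roughly one sleep. That gap is exactly the "latency does not improve" symptom.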

Mistake 2: forgetting to await

async def main():
    fetch(url)  # returns a coroutine, never runs

Coroutines are lazy: they don’t execute until awaited. The above creates a coroutine object that gets garbage-collected without running. Type checkers (pyright, mypy) catch this. Run them.
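The laziness is observable: calling an async function just hands you a coroutine object. A sketch (`fetch_stub` is a hypothetical stand-in for fetch):

```python
import asyncio

async def fetch_stub() -> str:
    # hypothetical stand-in for fetch(url)
    return "data"

coro = fetch_stub()               # creates the coroutine; nothing has run yet
print(asyncio.iscoroutine(coro))  # True: an unstarted coroutine object
result = asyncio.run(coro)        # only now does the body execute
print(result)
```

Drop the final asyncio.run and the body never executes; Python emits a "coroutine was never awaited" RuntimeWarning at garbage collection.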

Mistake 3: using asyncio.run() inside an async function

async def outer():
    result = asyncio.run(inner())  # RuntimeError: this event loop is already running

asyncio.run creates a new event loop. You cannot create one when one is already running. Inside an async function, just await:

async def outer():
    result = await inner()  # works, no nested loop

asyncio.run is for the top of your program. Once. Never call it from inside async code.
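A runnable demonstration of the failure mode, catching the RuntimeError so the script completes:

```python
import asyncio

async def inner() -> int:
    return 42

async def outer() -> str:
    coro = inner()
    try:
        asyncio.run(coro)  # a loop is already running here
    except RuntimeError:
        coro.close()  # tidy up the never-started coroutine
        return "RuntimeError"
    return "no error"

msg = asyncio.run(outer())
print(msg)  # RuntimeError
```

The outer asyncio.run is the one legitimate call, at the top of the program; the inner one fails before inner() ever starts.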

Mistake 4: mismanaging the event loop in long-running services

Some web frameworks (FastAPI, Sanic) own the event loop for you. You don’t call asyncio.run; the framework does. Inside route handlers, just async def and await — the framework already has a loop running.

If you’re hand-rolling a long-running service, prefer asyncio.Runner over asyncio.run:

async def main():
    while True:
        await tick()

with asyncio.Runner() as runner:
    runner.run(main())

Runner (3.11+) gives you control over loop lifecycle and enables clean shutdown handlers.

What asyncio is bad at

A few things asyncio doesn't solve, despite the marketing: it won't make CPU-bound code faster, it can't spread work across cores, and it does nothing for code built on blocking libraries unless you push them into an executor. It also doesn't make slow downstream services fast; it just lets you wait on more of them at once.

Closing

The mental model: cooperative scheduling on a single thread, yields at each await. The patterns: TaskGroup for structure, Semaphore for bounds, executor for blocking, timeout on every external call. The mistakes: blocking the loop, forgetting await, nesting asyncio.run, leaking event loops in long services.

Get those right and asyncio earns its keep for any I/O-heavy workload, especially agentic systems where every tool call is a network round-trip.