post python series · Python idioms · 2025-04-30 · 6 min read

Python idioms I reach for daily, part 2: context managers beyond `with open()`

#python #context-managers #idioms #series-python-idioms

Part 2 of a 3-part series on Python idioms I reach for daily in AI / data engineering work. Part 1: decorators that earn their keep · Part 2 (this post): context managers beyond `with open()` · Part 3: generators for streaming and composition

Most Python developers learn `with open(...) as f:` and stop there. That covers maybe 5% of what context managers can do. The other 95% are situations where you have a resource (a database connection, a span in a tracing system, a temporary file, a lock) that must be set up and torn down even when the code in between raises.

This post covers the five patterns I reach for in production AI/data work, plus one bonus, with code.

A 60-second refresher

A context manager is anything with `__enter__` and `__exit__` methods. The `with` statement calls `__enter__` on entry and guarantees `__exit__` is called on exit, including when an exception propagates through.

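import time

class Timer:
    def __enter__(self):
        self.start = time.perf_counter()
        return self  # bound to `t` in `with Timer() as t`

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs on normal exit and when the body raises.
        self.elapsed = time.perf_counter() - self.start

with Timer() as t:
    do_work()  # placeholder for the code being timed
print(f"took {t.elapsed} sec")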

Most of the time, `contextlib.contextmanager` saves you the boilerplate:

import time
from contextlib import contextmanager

@contextmanager
def timer():
    start = time.perf_counter()
    try:
        yield # control returns to the `with` body here
    finally:
        elapsed = time.perf_counter() - start
        print(f"took {elapsed} sec")

with timer():
    do_work()

The `try` / `yield` / `finally` shape is the entire pattern. Everything below is variations.
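To see why the `finally` matters, here is a minimal sketch that records what happens when the body raises. The `tracked` helper and the `events` list are made up for illustration.

```python
from contextlib import contextmanager

events: list[str] = []  # hypothetical log, only for this demonstration


@contextmanager
def tracked(name: str):
    """Record setup and teardown so the ordering is visible."""
    events.append(f"enter {name}")
    try:
        yield name
    finally:
        # Runs on normal exit and when the body raises.
        events.append(f"exit {name}")


def run_failing_block() -> None:
    try:
        with tracked("job"):
            events.append("body")
            raise ValueError("boom")
    except ValueError:
        # The exception still propagates out of the `with` block.
        events.append("caught")


run_failing_block()
print(events)  # ['enter job', 'body', 'exit job', 'caught']
```

Teardown runs before the caller's `except` sees the exception, which is the ordering you want for resources.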

Pattern 1: scoped configuration changes

You want to temporarily change a setting (a logging level, a numpy print precision, a feature flag) for a block of code, then restore the original.

from contextlib import contextmanager
import logging

@contextmanager
def log_level(logger: logging.Logger, level: int):
    """Temporarily set the logger to `level` for the duration of the block."""
    original = logger.level
    logger.setLevel(level)
    try:
        yield logger
    finally:
        logger.setLevel(original)

Use:

with log_level(my_logger, logging.DEBUG):
    something_chatty()
# logger goes back to whatever it was before, even if `something_chatty` raised

The pattern: capture original, set new, restore on exit. Works for any kind of “set/unset” pair: environment variables, working directory (`os.chdir`), CWD-sensitive libraries, monkey-patches.
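The same capture/set/restore shape applied to an environment variable might look like the sketch below. The `env_var` helper and the `DEMO_MODE` name are assumptions for illustration; the one extra wrinkle is distinguishing "was unset" from "was set to some value".

```python
import os
from contextlib import contextmanager


@contextmanager
def env_var(name: str, value: str):
    """Temporarily set an environment variable, then restore the old state."""
    missing = object()                       # sentinel: "was not set at all"
    original = os.environ.get(name, missing)
    os.environ[name] = value
    try:
        yield
    finally:
        if original is missing:
            os.environ.pop(name, None)       # it did not exist before: remove it
        else:
            os.environ[name] = original      # it existed: put the old value back


# Hypothetical variable name, used only for this example.
with env_var("DEMO_MODE", "1"):
    inside = os.environ["DEMO_MODE"]
```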

Pattern 2: database transactions

The single most-used context manager in any code that touches a database:

from contextlib import contextmanager
from sqlalchemy.orm import Session

@contextmanager
def transaction(session: Session):
    """Commit on success, rollback on any exception."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise

Use:

with transaction(session) as s:
    user = s.get(User, uid)  # Session.get replaces the legacy Query.get
    user.email = new_email
    s.add(AuditLog(user_id=uid, action="email_changed"))
# auto-commit here. If anything raised inside the block, auto-rollback.

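Three guarantees the pattern delivers: the commit happens only if the block finishes without raising; any `Exception` triggers a rollback; and the exception is re-raised, so the caller still sees the failure.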

This same shape works for any “must commit / rollback” semantic: message queue acknowledgements, distributed locks, two-phase resource grabs.
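As a rough sketch of the message-queue case: `FakeMessage` below is a hypothetical stand-in, since real queue clients expose their own ack/nack calls with their own names and semantics. Only the shape carries over.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical message type; real clients expose their own ack/nack calls."""
    body: str
    log: list[str] = field(default_factory=list)

    def ack(self) -> None:
        self.log.append("ack")

    def nack(self) -> None:
        self.log.append("nack")


@contextmanager
def acknowledging(message: FakeMessage):
    """Ack if the block succeeds, nack (and re-raise) if it fails."""
    try:
        yield message
        message.ack()
    except Exception:
        message.nack()
        raise


ok = FakeMessage("hello")
with acknowledging(ok) as msg:
    processed = msg.body.upper()

failed = FakeMessage("bad")
try:
    with acknowledging(failed):
        raise RuntimeError("handler crashed")
except RuntimeError:
    pass  # the failure still reaches the caller after the nack
```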

Pattern 3: ExitStack for dynamic numbers of resources

You need to open N files, M connections, or both, and N and M are determined at runtime. Nesting `with` statements doesn’t help when N is unknown. `contextlib.ExitStack` does:

from contextlib import ExitStack

def merge_files(paths: list[str], output: str):
    with ExitStack() as stack:
        # Open all input files; ExitStack closes them all on exit
        files = [stack.enter_context(open(p, "r")) for p in paths]
        with open(output, "w") as out:
            for f in files:
                out.write(f.read())

`ExitStack` is essentially a deferred-cleanup stack. Each `stack.enter_context(...)` registers a context manager; when the `with ExitStack()` block exits, every registered manager is exited in reverse order (LIFO).
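The LIFO unwinding is easy to observe with a small sketch. The `resource` helper and the `order` list are made up for this demonstration; `stack.callback` is the standard way to register a plain callable alongside real context managers.

```python
from contextlib import ExitStack, contextmanager

order: list[str] = []  # hypothetical log, only for this demonstration


@contextmanager
def resource(name: str):
    order.append(f"open {name}")
    try:
        yield name
    finally:
        order.append(f"close {name}")


with ExitStack() as stack:
    for name in ["a", "b", "c"]:
        stack.enter_context(resource(name))
    # Plain callables can be registered too; they run during unwinding.
    stack.callback(order.append, "callback")

print(order)
# ['open a', 'open b', 'open c', 'callback', 'close c', 'close b', 'close a']
```

The callback was registered last, so it runs first; the resources then close in the reverse of the order they were opened.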

Use cases I’ve reached for it:

Pattern 4: async context managers

For anything that needs `await` during setup or teardown, use `__aenter__`/`__aexit__` and the `async with` syntax. `contextlib.asynccontextmanager` generates both methods from an async generator:

from contextlib import asynccontextmanager
import httpx

@asynccontextmanager
async def http_client(timeout: float = 5.0):
    """An httpx client that's awaited-closed on exit."""
    async with httpx.AsyncClient(timeout=timeout) as client:
        yield client

Use:

async def fetch_user(uid: int) -> dict:
    async with http_client() as client:
        # A relative path like this needs a client created with `base_url=...`.
        response = await client.get(f"/users/{uid}")
        response.raise_for_status()
        return response.json()

`httpx.AsyncClient` is itself an async context manager that handles connection pooling. Wrapping it lets you build a project-specific factory (with shared timeouts, retries, headers) that callers use uniformly.
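For comparison, here is what the class-based form looks like when written by hand. This is a minimal sketch using only `asyncio`; `FakeConnection` is a hypothetical resource, and the `asyncio.sleep(0)` calls stand in for real awaited connect/close operations.

```python
import asyncio


class FakeConnection:
    """Hypothetical resource whose setup and teardown both need `await`."""

    def __init__(self) -> None:
        self.state = "new"

    async def __aenter__(self) -> "FakeConnection":
        await asyncio.sleep(0)   # stand-in for an awaited connect()
        self.state = "open"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        await asyncio.sleep(0)   # stand-in for an awaited close()
        self.state = "closed"
        return False             # do not swallow exceptions from the body


async def main() -> tuple[str, str]:
    conn = FakeConnection()
    async with conn as c:
        inside = c.state
    return inside, conn.state


result = asyncio.run(main())
print(result)  # ('open', 'closed')
```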

For database transactions in async code, the pattern transfers exactly:

@asynccontextmanager
async def async_transaction(session):
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise

Pattern 5: suppress (when an exception is expected)

`contextlib.suppress` is for the case where you genuinely want to ignore a specific exception:

from contextlib import suppress
import os

def safe_remove(path: str):
    with suppress(FileNotFoundError):
        os.remove(path)

Equivalent to `try: ... except FileNotFoundError: pass` but more declarative: the intent (“this exception is expected and should be silenced”) shows up in the `with` statement, not buried in an `except: pass` that often looks like an oversight.

Use sparingly. If you find yourself suppressing exceptions in production code, the question to ask is “is the exception type really part of the contract here?” Sometimes yes (file already gone, key already deleted from a cache). Often no, and the right fix is to not raise in the first place.
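Two behaviours of `suppress` are worth seeing once: the block ends at the first suppressed exception, and anything not listed still propagates. A small sketch with a made-up `cache` dict:

```python
from contextlib import suppress

cache = {"a": 1}  # hypothetical cache, only for this demonstration

with suppress(KeyError):
    del cache["missing"]      # raises KeyError, which is silenced
    cache["never"] = True     # skipped: the block ends at the first exception

# Execution resumes here, after the `with` block.
leaked = None
try:
    with suppress(KeyError):
        int("not a number")   # ValueError is not listed, so it propagates
except ValueError as exc:
    leaked = type(exc).__name__

print(cache, leaked)  # {'a': 1} ValueError
```

That first behaviour is a reason to keep the suppressed block as small as possible, ideally a single call.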

Pattern 6 (bonus): the redirect family

`contextlib.redirect_stdout` and `redirect_stderr` are handy for the “I want to capture print output without modifying the function I’m calling” case:

from contextlib import redirect_stdout
from io import StringIO

def capture_legacy_output(legacy_fn, *args, **kwargs):
    buf = StringIO()
    with redirect_stdout(buf):
        legacy_fn(*args, **kwargs)
    return buf.getvalue()

Useful for testing legacy code that prints rather than returns, or for capturing a CLI tool’s output for downstream parsing. Niche but cleaner than monkey-patching `sys.stdout`.
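Both redirects can be combined in one `with` to keep the two streams separate. In this sketch `noisy` is a hypothetical legacy function. One caveat to keep in mind: these helpers swap `sys.stdout`/`sys.stderr` inside the current process, so they do not capture output written by subprocesses.

```python
import sys
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO


def noisy() -> int:
    """Hypothetical legacy function that prints instead of returning text."""
    print("result: 42")
    print("warning: slow path", file=sys.stderr)
    return 42


out, err = StringIO(), StringIO()
with redirect_stdout(out), redirect_stderr(err):
    value = noisy()

captured_out = out.getvalue()   # "result: 42\n"
captured_err = err.getvalue()   # "warning: slow path\n"
```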

Combining patterns: a real example

A realistic production snippet combining several of the above: an async function that opens an HTTP client, holds a database transaction, times the whole thing, and turns on debug logging for the duration:

async def process_user_payment(uid: int, amount: float):
    # session, payments_logger, User and PaymentLog come from the surrounding application.
    # timer() and log_level() are plain (sync) context managers, so they go in a
    # regular `with`; `async with` only accepts async context managers.
    with timer(), log_level(payments_logger, logging.DEBUG): # refresher, pattern 1
        async with (
            http_client() as client, # pattern 4
            async_transaction(session) as s, # pattern 2 (async variant)
        ):
            user = await s.get(User, uid)
            response = await client.post("/charge", json={"uid": uid, "amount": amount})
            s.add(PaymentLog(uid=uid, amount=amount, status=response.status_code))
    # if any of the above raised: transaction rolled back, client closed, log level restored

The parenthesised `async with (a, b, c):` syntax (Python 3.10+) lets you stack context managers cleanly without `async with a:\n async with b:\n ...` indentation hell. Sync and async managers cannot share one statement, though: `async with` needs `__aenter__`/`__aexit__`, so the plain `timer()` and `log_level()` go in a regular `with`.
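When a block needs both sync and async managers, `contextlib.AsyncExitStack` is one option for keeping them in a single flat block: it accepts sync managers through `enter_context` and async ones through `enter_async_context`. A minimal sketch, with made-up `sync_scope` and `async_scope` helpers standing in for real resources:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager, contextmanager

trace: list[str] = []  # hypothetical log, only for this demonstration


@contextmanager
def sync_scope(name: str):
    trace.append(f"enter {name}")
    try:
        yield
    finally:
        trace.append(f"exit {name}")


@asynccontextmanager
async def async_scope(name: str):
    trace.append(f"enter {name}")
    try:
        yield
    finally:
        await asyncio.sleep(0)  # stand-in for awaited cleanup
        trace.append(f"exit {name}")


async def main() -> None:
    async with AsyncExitStack() as stack:
        stack.enter_context(sync_scope("timer"))                 # sync manager
        await stack.enter_async_context(async_scope("client"))   # async manager
        trace.append("body")


asyncio.run(main())
print(trace)
# ['enter timer', 'enter client', 'body', 'exit client', 'exit timer']
```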

When NOT to write a context manager

What I no longer do

Closing

Five patterns: scoped config, transactions, `ExitStack`, async, `suppress`. Each one removes a specific class of “did you remember to clean up?” bug from your codebase. The mental model is the `try` / `yield` / `finally` shape; once internalised, the answer for every new resource you introduce (“how do I make sure this Kafka consumer always closes its connection?”) becomes obvious.

Next post in the series: generators for streaming and composition, and how `yield` lets you build pipelines that don’t load everything into memory and read like a sequence of operations rather than a loop.