project · 2022-2024
ETL template for API analytics pipelines
Refactored the inconsistent set of ETL pipelines feeding TomTom's API analytics into a single OOP template, distributed as an internal Python package. Engineers subclass the base, get live Azure Data Explorer connections for free, and only write the business logic in extract / transform / load. Result: consistent, version-controlled pipelines across volume, response-time, and error-rate use cases.
The ingestion infrastructure (Azure EventHub feeding Azure Data Explorer) was already in place when I joined the API analytics work. So were the ETL pipelines that landed volume, response-time, and error-rate tables in ADX. The problem was that every pipeline had been written by a different person, against a different table, with a different style. Hard to read, harder to maintain, and impossible for a new engineer to extend without spending a week tracing what every pipeline was doing.
I refactored the lot of them into a single OOP template, packaged it as an internal Python library, and got every API analytics pipeline onto the same shape.
Sister project to the Developer-portal analytics APIs. The APIs sit on top of the ADX tables that this template populates. The template is what made the analytics APIs feasible: consistent table shapes, predictable upstream behaviour, no per-pipeline surprises.
The problem
Every pipeline in the API analytics surface had grown organically:
- One read its connection string from an environment variable, another from Key Vault, and a third had it hard-coded.
- One had retry logic, another did not.
- Transformation lived inline in some, in helper modules in others, and in notebooks in the rest.
- Failure-handling was inconsistent: some swallowed errors, some raised, some logged.
The result was that the behaviour of any one pipeline was unguessable from reading another. New use cases meant copy-paste from whichever pipeline you happened to find first.
The fix, an OOP ETL template
I wrote a small BaseETL class as the contract every pipeline must implement, packaged it as a private wheel, and migrated every existing pipeline onto it.
The base class owns the plumbing every pipeline needed but had been writing inconsistently:
- ADX connection management: live cluster + database + auth wired up automatically when the subclass instantiates. The subclass never sees a connection string; it just calls self.adx.execute(...).
- Retry + back-off: standard policy for transient ADX failures, applied uniformly.
- Logging: structured logs with run id, pipeline name, stage. Same shape everywhere so dashboards work cross-pipeline.
- Schema validation: outputs validated against the target table’s schema before the load step writes.
- Run lifecycle: pre_run, run, post_run hooks so common housekeeping (lock acquisition, run-id generation, completion marker) lives in one place.
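As an illustration of how those responsibilities can sit in one base class, here is a minimal sketch of the template-method shape. It is not the internal package: the hook bodies, the injected client, the FakeADX stub, and the TotalCallsETL subclass are all assumptions made so the example runs without Azure access, and retry and schema validation are left out for brevity.

```python
import logging
import uuid
from abc import ABC, abstractmethod

logger = logging.getLogger("etl")


class BaseETL(ABC):
    """Hypothetical base class; names follow the prose, not the real package."""

    name = "unnamed"

    def __init__(self, adx_client):
        # Assumption: the real template builds the ADX connection itself.
        # Here a client is injected so the sketch needs no Azure access.
        self.adx = adx_client
        self.run_id = None

    def pre_run(self):
        # Shared housekeeping: run-id generation (lock acquisition omitted).
        self.run_id = uuid.uuid4().hex
        self._log("pre_run")

    def post_run(self):
        # Shared housekeeping: stands in for writing a completion marker.
        self._log("post_run")

    def run(self):
        # Template method: the order is fixed, subclasses supply the stages.
        self.pre_run()
        data = self.extract()
        self._log("extract")
        data = self.transform(data)
        self._log("transform")
        self.load(data)
        self._log("load")
        self.post_run()

    def _log(self, stage):
        # Same structured fields for every pipeline and every stage.
        logger.info(
            "stage finished",
            extra={"run_id": self.run_id, "pipeline": self.name, "stage": stage},
        )

    @abstractmethod
    def extract(self):
        """Return raw data from the source."""

    @abstractmethod
    def transform(self, data):
        """Return data shaped for the target table."""

    @abstractmethod
    def load(self, data):
        """Write the shaped data to the target."""


class FakeADX:
    """Stub client standing in for a live ADX connection."""

    def execute(self, query):
        return [{"endpoint": "/search", "calls": 3}, {"endpoint": "/route", "calls": 5}]


class TotalCallsETL(BaseETL):
    name = "total_calls"

    def __init__(self, adx_client):
        super().__init__(adx_client)
        self.loaded = None

    def extract(self):
        return self.adx.execute("hypothetical KQL query")

    def transform(self, rows):
        return {"total_calls": sum(row["calls"] for row in rows)}

    def load(self, result):
        # Kept in memory here; a real pipeline would write to a table.
        self.loaded = result


etl = TotalCallsETL(FakeADX())
etl.run()
```

Because run() owns the ordering and the logging, a subclass cannot skip a hook or log in a different shape by accident.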
A new pipeline becomes three methods:
    class VolumeReportETL(BaseETL):
        def extract(self):
            # ADX connection is already wired up by the base class.
            return self.adx.execute(KQL_VOLUME_QUERY)

        def transform(self, df):
            # Custom shaping for this use case.
            return (
                df
                .pipe(_normalize_endpoint_paths)
                .pipe(_add_product_dim)
                .pipe(_aggregate_by_day_endpoint)
            )

        def load(self, df):
            # Target table picked by the subclass; the base class handles the
            # actual write, schema check, and idempotent upsert.
            self.write_table('analytics_volume_daily', df)

That is the entire pipeline. A new data engineer joining the team can read this in 30 seconds and understand what is happening, because every pipeline they will ever look at on this team has the same three methods.
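The schema check that the base class runs before a write could look roughly like the sketch below. To stay dependency-free it validates plain dict rows rather than a DataFrame, and EXPECTED_SCHEMA, SchemaError, and validate_rows are hypothetical names, not the package's API.

```python
# Hypothetical target-table schema: column name -> expected Python type.
EXPECTED_SCHEMA = {"day": str, "endpoint": str, "calls": int}


class SchemaError(ValueError):
    """Raised when a row does not match the target table's schema."""


def validate_rows(rows, schema):
    """Return rows unchanged, or raise SchemaError on the first mismatch."""
    for index, row in enumerate(rows):
        missing = schema.keys() - row.keys()
        extra = row.keys() - schema.keys()
        if missing or extra:
            raise SchemaError(
                f"row {index}: missing={sorted(missing)} extra={sorted(extra)}"
            )
        for column, expected in schema.items():
            if not isinstance(row[column], expected):
                raise SchemaError(
                    f"row {index}: column {column!r} expected "
                    f"{expected.__name__}, got {type(row[column]).__name__}"
                )
    return rows


good_rows = [{"day": "2024-01-01", "endpoint": "/search", "calls": 3}]
validated = validate_rows(good_rows, EXPECTED_SCHEMA)
```

Failing before the write keeps a malformed batch out of the target table, which is what lets downstream consumers trust its shape.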
Why this matters in practice
- Consistency: every API analytics pipeline now produces the same log shape, same failure semantics, same retry behaviour.
- Onboarding: a new engineer ships their first pipeline on day three, not week three. The path is pip install, subclass, write three methods.
- Version control: the template is a versioned package. A breaking change is a major-version bump and a deliberate migration, not a silent break.
- Tested at the base, not at the leaf: retry / connection / logging logic has unit tests once. Subclasses only test their three methods, which are the only things specific to that pipeline.
- Cross-pipeline observability: because every pipeline emits the same structured logs, one dashboard tracks run latency / failure rate / row counts across the whole API analytics surface.
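To make the "tested at the base" point concrete, here is a sketch of a retry helper with exponential back-off, exercised once against a deliberately flaky callable. The names with_retry and TransientError, the three-attempt limit, and the 0.5-second base delay are assumptions; the actual policy in the package is not described here.

```python
import time


class TransientError(Exception):
    """Stand-in for whatever transient failure the ADX client raises."""


def with_retry(fn, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call fn, retrying on TransientError with exponential back-off."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == attempts:
                raise  # Out of attempts: surface the failure to the caller.
            # Delay doubles each time: base_delay, 2 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))


# Exercise the policy once, with a fake that fails twice and then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("throttled")
    return "ok"


delays = []
# Inject a recording sleep so the check runs instantly.
result = with_retry(flaky, sleep=delays.append)
```

Injecting the sleep function is a design choice for testability: the back-off schedule can be asserted without the test actually waiting.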
What this unlocked
After every pipeline was on the template, the analytics APIs that power the developer-portal dashboards became feasible to build. Before the refactor, each API endpoint had to know the quirks of whichever underlying pipeline produced its table. After the refactor, the API layer could trust that every analytics table behaved the same way, was schema-validated, and was up-to-date on the same cadence. The refactor lifted the floor; the APIs were the next floor up.
Stack
- Python, packaged as a versioned wheel and distributed via an internal package index.
- Azure Data Explorer (Kusto) for both source and target tables.
- Pandas for the transform stage; KQL for everything that can be done server-side.
- GitHub Actions for CI: tests on the base class, schema-validation on subclasses, automated wheel publishing.
Why this earns a spot in projects
This was not glamorous infrastructure. It was a refactor of code that already worked, into code that was easier to keep working. Six months in, the team had stopped writing one-off pipelines because the template was the path of least resistance. New use cases shipped faster. Outages got debugged faster because logs looked the same everywhere. The refactor paid for itself within the quarter and kept paying afterwards. That is the bar for this kind of work: make the right thing the easy thing.