Lightweight Event Bus
A pub-sub system for decoupled component communication with auto-unsubscribe and error isolation.
Measure and log execution time of any code block using Python with statement, with automatic error tracking.
import time
from contextlib import contextmanager
@contextmanager
def timed_block(label="operation"):
start = time.perf_counter()
metrics = {"label": label, "elapsed": 0, "success": False}
try:
yield metrics
metrics["success"] = True
except Exception as e:
metrics["error"] = str(e)
raise
finally:
metrics["elapsed"] = round(time.perf_counter() - start, 4)
status = "OK" if metrics["success"] else "FAILED"
print(f"[{label}] {status} in {metrics['elapsed']}s")
Let us trace what happens when you wrap a database query with the context manager:
with timed_block("db_query") as metrics:
result = db.execute("SELECT * FROM users")
Step 1: Python calls timed_block("db_query")
→ @contextmanager turns the generator into a context manager
→ start = time.perf_counter() → captures high-resolution timestamp
→ metrics dict created: {"label": "db_query", "elapsed": 0, "success": False}
Step 2: yield metrics
→ Python pauses the generator HERE
→ Control passes to the `with` block body
→ `metrics` variable is bound to `as metrics`
Step 3: db.execute("SELECT * FROM users") runs
→ Takes 0.0342 seconds
Step 4: `with` block ends normally
→ Python resumes the generator after yield
→ metrics["success"] = True
Step 5: finally block runs (ALWAYS, even on error)
→ elapsed = perf_counter() - start = 0.0342
→ prints: [db_query] OK in 0.0342s
If Step 3 had raised an exception:
→ metrics["error"] = "connection refused"
→ raise re-throws the exception (caller still sees it)
→ finally: prints [db_query] FAILED in 0.0012s
The yield keyword is what makes this work. Everything before yield runs on entering the with block. Everything after yield runs on exiting — whether the block succeeded or raised an exception. The finally block guarantees the timing log always prints, even during errors. This is the same enter/exit pattern as __enter__ and __exit__ methods, but with cleaner syntax.
Manual timing (scattered, error-prone):
start = time.time()
try:
result = db.execute(query)
print(f"Query took {time.time() - start}s")
except Exception:
print(f"Query failed after {time.time() - start}s")
raise
Repeated in every function → inconsistent, easy to forget
Context manager (clean, reusable):
with timed_block("query"):
result = db.execute(query)
One line wraps any operation → consistent logging everywhere
# Time a database query
with timed_block("user_query") as m:
users = db.execute("SELECT * FROM users WHERE active = true")
# prints: [user_query] OK in 0.0342s
# Time an API call and access metrics after
with timed_block("api_call") as m:
response = requests.get("https://api.example.com/data")
print(f"API responded in {m['elapsed']}s")
# Time an ETL pipeline phase
with timed_block("extract_phase") as m:
records = extract_from_source(config)
with timed_block("transform_phase") as m:
cleaned = transform_records(records)
with timed_block("load_phase") as m:
load_to_warehouse(cleaned)
# Nest them for granular profiling
with timed_block("full_pipeline"):
with timed_block("extract"):
data = extract()
with timed_block("transform"):
data = transform(data)
Use this pattern wherever you need to measure execution time: database queries, API calls, ETL phases, file processing, ML model inference. The yielded metrics dict lets you access timing data after the block — useful for sending metrics to monitoring systems. For production, replace the print with your logging framework or metrics client.