Environment Config Loader
Merge JSON config defaults with environment variable overrides and automatic type casting.
A lightweight event system with priority-based listeners and error isolation per handler.
class EventDispatcher
{
private array $listeners = [];
public function listen(string $event, callable $listener, int $priority = 0): void
{
$this->listeners[$event][] = [
'callback' => $listener,
'priority' => $priority,
];
}
public function dispatch(string $event, array $payload = []): void
{
if (!isset($this->listeners[$event])) {
return;
}
// Sort by priority (highest first)
$sorted = $this->listeners[$event];
usort($sorted, fn($a, $b) => $b['priority'] <=> $a['priority']);
foreach ($sorted as $listener) {
try {
$listener['callback']($payload);
} catch (Throwable $e) {
// Log but do not crash — observability must not kill the pipeline
error_log("Listener failed for {$event}: " . $e->getMessage());
}
}
}
}
Let us trace what happens when a pipeline dispatches an event with three listeners registered:
dispatch("record.cleaned", ["id" => 123, "field" => "phone"])
Step 1: Find listeners for "record.cleaned"
→ Found 3 listeners
Step 2: Sort by priority
→ Logger (priority: 100) — runs first
→ Metrics (priority: 50) — runs second
→ Slack (priority: 10) — runs last
Step 3: Execute in order
→ Logger: writes to file ✓
→ Metrics: increments counter ✓
→ Slack: THROWS (Slack is down)
→ Caught, logged, pipeline continues ✓
The try/catch around each listener is the most important detail. Without it, a broken Slack notification would crash your entire data pipeline. Error isolation ensures observability never takes down the system it is observing.
$events = new EventDispatcher();
// Register listeners with priorities
$events->listen('pipeline.started', function ($data) {
echo "[LOG] Pipeline started: {$data['name']}\n";
}, priority: 100);
$events->listen('pipeline.started', function ($data) {
Metrics::increment('pipeline.starts');
}, priority: 50);
// Dispatch events from your pipeline
$events->dispatch('pipeline.started', ['name' => 'customer-sync']);
$events->dispatch('record.processed', ['id' => 123, 'status' => 'clean']);
This is the same pattern used in frameworks like Symfony and Laravel. Start simple with logging, then add metrics, alerting, and distributed tracing as listeners — without changing your pipeline code.