Real-Time Data Pipeline Architecture: From Sensors to Dashboards
Why Batch Processing Isn't Enough
Traditional data pipelines run in batches: collect data for an hour, process it, load it into a warehouse, query it. This works for reporting. It doesn't work when a sensor reading from a tunnel ventilation system is abnormal and needs to trigger an alert in seconds, not hours.
Real-time data pipelines process events as they arrive. The architecture is fundamentally different from batch ETL — and the failure modes are different too.
This architecture is based on systems we've built for infrastructure monitoring (WestConnex tunnel project), energy grid management, and industrial IoT deployments processing 2–5 million events per hour.
Architecture Overview
A production real-time pipeline has four stages:
┌──────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Ingest │───►│ Process │───►│ Store │───►│ Serve │
│ │ │ │ │ │ │ │
│ MQTT │ │ Stream proc. │ │ Time-series │ │ WebSocket │
│ HTTP │ │ Enrichment │ │ Object store │ │ REST API │
│ Kafka │ │ Alerting │ │ Cache layer │ │ Dashboards │
└──────────┘ └──────────────┘ └──────────────┘ └──────────────┘
Let's walk through each stage with the engineering decisions that matter.
Stage 1: Ingestion
Protocol Selection
Different data sources require different ingestion protocols:
- MQTT — low-bandwidth IoT sensors, battery-powered devices. Lightweight, supports QoS levels, handles intermittent connectivity.
- HTTP/REST — web applications, third-party APIs, webhook-based integrations. Familiar, well-tooled, but higher overhead per message.
- gRPC — high-throughput service-to-service communication. Protobuf serialization is 3–5x more compact than JSON.
- Kafka Producer API — when the data source can write directly to your message broker. Highest throughput, lowest latency.
The Message Broker
Every real-time pipeline needs a message broker between ingestion and processing. It serves three critical purposes:
- Decoupling — producers and consumers run independently
- Buffering — absorbs traffic spikes without data loss
- Replay — if a consumer fails, it can reprocess from a checkpoint
```typescript
// Kafka topic configuration for sensor data
const topicConfig = {
  name: "sensor-readings",
  partitions: 12,        // parallelism for consumers
  replicationFactor: 3,  // durability
  config: {
    "retention.ms": 7 * 24 * 60 * 60 * 1000, // 7-day replay window
    "cleanup.policy": "delete",
    "compression.type": "lz4", // 60-70% size reduction
  },
};
```
Partitioning strategy matters. Partition by sensor ID or device ID so that all readings from the same sensor land in the same partition — this preserves ordering and enables stateful processing.
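To make the ordering guarantee concrete, here is a sketch of keyed partition selection. The `fnv1a` helper and partition count are illustrative; real Kafka clients ship their own default partitioner (kafkajs uses murmur2), but any stable hash works as long as the same key always maps to the same partition:

```typescript
// Sketch: deterministic partition selection by sensor ID.
// NOTE: illustrative only — not the Kafka client's built-in partitioner.

const NUM_PARTITIONS = 12;

// FNV-1a: a simple, stable hash with decent spread for short string keys
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Same sensorId → same partition → per-sensor ordering preserved
function partitionFor(sensorId: string): number {
  return fnv1a(sensorId) % NUM_PARTITIONS;
}
```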
Backpressure Handling
When the processing layer is slower than the ingestion rate, you need backpressure:
```typescript
// Rate limiting at the ingestion layer
const rateLimiter = new TokenBucket({
  capacity: 10000,  // max burst
  refillRate: 5000, // sustained rate per second
});

async function handleIncoming(event: SensorReading) {
  if (!rateLimiter.tryConsume(1)) {
    // Buffer to disk or reject with retry-after header
    await diskBuffer.write(event);
    return { status: 429, retryAfter: 5 };
  }
  await kafka.produce("sensor-readings", event);
}
```
Never drop data silently. If you must shed load, log what was dropped, buffer to disk for later processing, or reject with a clear signal to the producer to retry. Silent data loss in monitoring systems is worse than temporary delays.
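`TokenBucket` is not a built-in class — a minimal sketch of one, matching the `capacity`/`refillRate` constructor options used above:

```typescript
// Minimal token bucket: refills continuously at refillRate tokens/sec,
// capped at capacity (the maximum burst).
class TokenBucket {
  private tokens: number;
  private lastRefill = Date.now();

  constructor(private opts: { capacity: number; refillRate: number }) {
    this.tokens = opts.capacity; // start full
  }

  tryConsume(n: number): boolean {
    // Lazily add tokens earned since the last call
    const now = Date.now();
    const elapsedSec = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.opts.capacity,
      this.tokens + elapsedSec * this.opts.refillRate
    );
    this.lastRefill = now;

    if (this.tokens >= n) {
      this.tokens -= n;
      return true;
    }
    return false; // caller decides: buffer to disk or reject with 429
  }
}
```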
Stage 2: Stream Processing
Processing Patterns
Real-time processing isn't just "read message, write to database." Production pipelines do several things per event:
Validation and Cleaning
```typescript
function validateReading(reading: SensorReading): ValidatedReading | null {
  // Reject impossible values
  if (reading.temperature < -273.15 || reading.temperature > 1000) {
    metrics.increment("readings.invalid", { reason: "out_of_range" });
    return null;
  }

  // Flag stale timestamps (sensor clock drift)
  const staleness = Date.now() - reading.timestamp;
  if (staleness > 5 * 60 * 1000) {
    reading.flags = [...(reading.flags || []), "stale_timestamp"];
  }

  return reading as ValidatedReading;
}
```
Enrichment — join the event with reference data:
```typescript
// Enrich sensor reading with location and threshold data
async function enrichReading(reading: ValidatedReading) {
  const sensor = await sensorCache.get(reading.sensorId);
  return {
    ...reading,
    location: sensor.location,
    zone: sensor.zone,
    thresholds: sensor.alertThresholds,
    unit: sensor.measurementUnit,
  };
}
```
Windowed Aggregation — compute rolling statistics:
```typescript
// 5-minute rolling average per sensor
const windowedAverage = readings
  .groupBy(r => r.sensorId)
  .window({ type: "tumbling", size: "5m" })
  .aggregate({
    avgValue: avg(r => r.value),
    maxValue: max(r => r.value),
    minValue: min(r => r.value),
    count: count(),
  });
```
Anomaly Detection — flag readings that deviate from expected patterns:
```typescript
function detectAnomaly(reading: EnrichedReading, history: RollingStats) {
  const zScore = Math.abs(reading.value - history.mean) / history.stddev;
  if (zScore > 3) {
    return {
      type: "statistical_anomaly",
      severity: zScore > 5 ? "critical" : "warning",
      reading,
      context: { mean: history.mean, stddev: history.stddev, zScore },
    };
  }
  return null;
}
```
Alerting
Anomalies trigger alerts, but not every anomaly deserves an alert. Production alerting needs:
- Deduplication — don't send 100 alerts for 100 readings from the same failing sensor
- Cooldown periods — after alerting, suppress duplicates for N minutes
- Escalation — if the condition persists, escalate from notification to page
- Correlation — if 5 sensors in the same zone alert simultaneously, send one grouped alert
```typescript
async function processAlert(anomaly: Anomaly) {
  const alertKey = `${anomaly.reading.sensorId}:${anomaly.type}`;

  // Check cooldown
  const lastAlert = await alertStore.getLastAlert(alertKey);
  if (lastAlert && Date.now() - lastAlert.timestamp < COOLDOWN_MS) {
    return; // suppress duplicate
  }

  // Check for correlated alerts in the same zone
  const zoneAlerts = await alertStore.getRecentAlerts({
    zone: anomaly.reading.zone,
    since: Date.now() - 60_000, // last minute
  });

  if (zoneAlerts.length >= CORRELATION_THRESHOLD) {
    await sendGroupedAlert(anomaly.reading.zone, zoneAlerts);
  } else {
    await sendAlert(anomaly);
  }

  await alertStore.recordAlert(alertKey, anomaly);
}
```
Stage 3: Storage
Real-time pipelines need multiple storage tiers because no single database handles all access patterns well.
Time-Series Database
For sensor readings and metric data, use a time-series database (TimescaleDB, InfluxDB, or QuestDB):
```sql
-- TimescaleDB hypertable for sensor readings
CREATE TABLE sensor_readings (
  time       TIMESTAMPTZ NOT NULL,
  sensor_id  TEXT NOT NULL,
  value      DOUBLE PRECISION NOT NULL,
  zone       TEXT,
  flags      TEXT[]
);

SELECT create_hypertable('sensor_readings', 'time');

-- Continuous aggregate for dashboard queries
CREATE MATERIALIZED VIEW sensor_hourly
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', time) AS bucket,
  sensor_id,
  avg(value) AS avg_value,
  max(value) AS max_value,
  min(value) AS min_value,
  count(*)   AS reading_count
FROM sensor_readings
GROUP BY bucket, sensor_id;
```
Hot/Warm/Cold Tiering
Not all data needs the same access speed:
- Hot (0–24 hours) — in Redis or memory. Sub-millisecond queries for real-time dashboards.
- Warm (1–90 days) — in TimescaleDB or ClickHouse. Millisecond queries for recent history and alerting.
- Cold (90+ days) — in object storage (S3) in Parquet format. Seconds to query, but pennies per GB to store.
┌──────────────────────────────────────────────────┐
│ Hot: Redis / In-Memory (last 24 hours) │
│ ↓ age-based eviction │
│ Warm: TimescaleDB (last 90 days) │
│ ↓ compression + archival job │
│ Cold: S3 Parquet (90+ days) │
└──────────────────────────────────────────────────┘
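Query routing follows the same boundaries. A sketch of tier selection by the age of the requested range (the tier names and cutoffs mirror the diagram above; the `selectDataSource` helper in the serving layer could wrap logic like this):

```typescript
// Sketch: route a query to hot/warm/cold storage by data age.
type Tier = "hot" | "warm" | "cold";

const DAY_MS = 24 * 60 * 60 * 1000;

function tierFor(fromTimestamp: number, now: number = Date.now()): Tier {
  const ageMs = now - fromTimestamp;
  if (ageMs <= DAY_MS) return "hot";       // Redis / in-memory, last 24h
  if (ageMs <= 90 * DAY_MS) return "warm"; // TimescaleDB, last 90 days
  return "cold";                           // S3 Parquet, everything older
}
```

A range that spans tiers (e.g. "last 100 days") needs to fan out to each tier and merge, which is why pre-aggregated views matter for wide queries.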
Exactly-Once Semantics
Data loss is unacceptable in monitoring systems. In practice, "exactly-once" means at-least-once delivery made safe by idempotent handling:
- Kafka consumer offsets — commit offsets only after successful processing and storage
- Idempotent writes — use upserts keyed on `(sensor_id, timestamp)` so duplicate processing doesn't create duplicate records
- Transaction boundaries — wrap processing + storage in a single transaction where possible
Stage 4: Serving
Real-Time Dashboards
For live dashboards showing current sensor states:
```typescript
// WebSocket server pushing real-time updates
// (handler must be async to await the initial snapshot)
wss.on("connection", async (ws, req) => {
  const zone = req.query.zone;

  // Send current state immediately
  const currentState = await cache.getZoneState(zone);
  ws.send(JSON.stringify({ type: "snapshot", data: currentState }));

  // Subscribe to updates for this zone
  const subscription = eventBus.subscribe(`zone:${zone}`, (update) => {
    ws.send(JSON.stringify({ type: "update", data: update }));
  });

  ws.on("close", () => subscription.unsubscribe());
});
```
Historical Queries
For dashboards showing trends and historical data, use pre-aggregated views:
```typescript
// API endpoint for historical data
app.get("/api/sensors/:id/history", async (req, res) => {
  const { id } = req.params;
  const { from, to, resolution } = req.query;

  // Choose data source based on time range
  const source = selectDataSource(from, to);

  // Use pre-aggregated data for wide time ranges
  if (resolution === "hourly" || daysBetween(from, to) > 7) {
    return res.json(await source.queryAggregate(id, from, to, resolution));
  }

  // Use raw data for short time ranges
  return res.json(await source.queryRaw(id, from, to));
});
```
Performance Benchmarks
From our production deployments:
| Metric | Target | Achieved |
|---|---|---|
| Ingestion throughput | 50K events/sec | 75K events/sec |
| End-to-end latency (p50) | < 500ms | 120ms |
| End-to-end latency (p99) | < 2s | 850ms |
| Data loss rate | 0% | 0% (verified over 6 months) |
| Alert delivery time | < 30s | 8s average |
| Dashboard refresh rate | 1s | 1s (WebSocket push) |
| Storage cost (per TB/month) | — | $23 (hot) / $4 (cold) |
Operational Concerns
Monitoring the Monitor
Your data pipeline monitoring system needs its own monitoring. We use a heartbeat pattern:
- Synthetic sensors send known values every 60 seconds
- End-to-end checks verify these values appear in the dashboard within 30 seconds
- If the heartbeat stops, the alerting system itself is down — alert via a completely independent channel (PagerDuty, SMS)
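The staleness check behind that pattern is small. A sketch, assuming the 60-second synthetic cadence and 30-second end-to-end budget above (constant names are illustrative):

```typescript
// Sketch: the pipeline is healthy if the latest synthetic reading
// arrived within one heartbeat interval plus the end-to-end budget.
const HEARTBEAT_INTERVAL_MS = 60_000; // synthetic sensor cadence
const MAX_LAG_MS = 30_000;            // allowed end-to-end delay

function pipelineHealthy(lastSyntheticSeen: number, now: number = Date.now()): boolean {
  return now - lastSyntheticSeen <= HEARTBEAT_INTERVAL_MS + MAX_LAG_MS;
}
```

The checker itself must run outside the pipeline it watches, and its alert path (PagerDuty, SMS) must share no infrastructure with the pipeline's own alerting.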
Schema Evolution
Sensor firmware updates change the data format. Your pipeline must handle:
- New fields appearing in events (forward compatibility)
- Old fields disappearing (backward compatibility)
- Value range changes (recalibrate anomaly detection)
Use a schema registry (Confluent Schema Registry or AWS Glue) to version and validate event schemas.
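At the consumer, tolerance for both directions can be sketched as a lenient decoder: required fields are validated, missing optional fields get defaults, and unknown fields are carried through rather than discarded. Field names here are illustrative, not a real firmware schema:

```typescript
// Sketch: schema-tolerant event decoding.
type RawEvent = Record<string, unknown>;

interface DecodedReading {
  sensorId: string;
  value: number;
  unit: string;     // defaulted when older firmware omits it
  extras: RawEvent; // unknown fields preserved (forward compatibility)
}

function decode(raw: RawEvent): DecodedReading | null {
  // Required fields: reject (route to a dead-letter topic) if absent
  if (typeof raw.sensorId !== "string" || typeof raw.value !== "number") {
    return null;
  }
  const { sensorId, value, unit, ...extras } = raw as {
    sensorId: string;
    value: number;
    unit?: unknown;
    [k: string]: unknown;
  };
  return {
    sensorId,
    value,
    unit: typeof unit === "string" ? unit : "celsius", // assumed default
    extras,
  };
}
```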
Capacity Planning
Real-time pipelines have predictable capacity needs:
Daily events = sensors × readings_per_sensor_per_day
Storage/day = daily_events × avg_event_size
Throughput = daily_events / 86400 (peak = 3× average)
For a 10,000-sensor deployment at 1 reading/minute:
- 14.4M events/day
- ~7 GB raw data/day (500 bytes/event)
- 167 events/second average, ~500/second peak
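The arithmetic above, as a quick sanity check:

```typescript
// Capacity estimate for the 10,000-sensor deployment described above
const sensors = 10_000;
const readingsPerSensorPerDay = 24 * 60; // one reading per minute
const avgEventBytes = 500;

const dailyEvents = sensors * readingsPerSensorPerDay; // 14.4M events/day
const bytesPerDay = dailyEvents * avgEventBytes;       // ~7.2 GB/day raw
const avgEventsPerSec = dailyEvents / 86_400;          // ~167 events/sec
const peakEventsPerSec = avgEventsPerSec * 3;          // ~500 events/sec
```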
This is well within the capacity of a single Kafka broker and a moderately sized TimescaleDB instance.
When to Build vs. Use Managed Services
Build your own pipeline when:
- You need sub-second end-to-end latency
- Your data contains sensitive information that can't leave your infrastructure
- You need custom processing logic that managed services don't support
- Event volume exceeds 100K/second (managed services become expensive)
Use managed services when:
- You're processing fewer than 10K events/second
- Standard aggregation and alerting rules suffice
- Your team doesn't have stream processing expertise
- Time to market matters more than per-event cost
We've built real-time data pipelines for infrastructure monitoring, energy management, and industrial IoT — systems that process millions of events daily with zero data loss and sub-second alerting. If you're designing a data pipeline and need it to work at scale, this is what we do.