Horizon Dynamics

Engineering · 12 min read

Real-Time Data Pipeline Architecture: From Sensors to Dashboards

Oleksandr Melnychenko · February 10, 2026
Data Pipeline · Real-Time · IoT · Architecture · Streaming

Why Batch Processing Isn't Enough

Traditional data pipelines run in batches: collect data for an hour, process it, load it into a warehouse, query it. This works for reporting. It doesn't work when a sensor reading from a tunnel ventilation system is abnormal and needs to trigger an alert in seconds, not hours.

Real-time data pipelines process events as they arrive. The architecture is fundamentally different from batch ETL — and the failure modes are different too.

This architecture is based on systems we've built for infrastructure monitoring (WestConnex tunnel project), energy grid management, and industrial IoT deployments processing 2–5 million events per hour.

Architecture Overview

A production real-time pipeline has four stages:

┌──────────┐    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│  Ingest  │───►│   Process    │───►│    Store     │───►│   Serve      │
│          │    │              │    │              │    │              │
│ MQTT     │    │ Stream proc. │    │ Time-series  │    │ WebSocket    │
│ HTTP     │    │ Enrichment   │    │ Object store │    │ REST API     │
│ Kafka    │    │ Alerting     │    │ Cache layer  │    │ Dashboards   │
└──────────┘    └──────────────┘    └──────────────┘    └──────────────┘

Let's walk through each stage with the engineering decisions that matter.

Stage 1: Ingestion

Protocol Selection

Different data sources require different ingestion protocols:

  • MQTT — low-bandwidth IoT sensors, battery-powered devices. Lightweight, supports QoS levels, handles intermittent connectivity.
  • HTTP/REST — web applications, third-party APIs, webhook-based integrations. Familiar, well-tooled, but higher overhead per message.
  • gRPC — high-throughput service-to-service communication. Protobuf serialization is 3–5x more compact than JSON.
  • Kafka Producer API — when the data source can write directly to your message broker. Highest throughput, lowest latency.

The Message Broker

Every real-time pipeline needs a message broker between ingestion and processing. It serves three critical purposes:

  1. Decoupling — producers and consumers run independently
  2. Buffering — absorbs traffic spikes without data loss
  3. Replay — if a consumer fails, it can reprocess from a checkpoint

// Kafka topic configuration for sensor data
const topicConfig = {
  name: "sensor-readings",
  partitions: 12, // parallelism for consumers
  replicationFactor: 3, // durability
  config: {
    "retention.ms": 7 * 24 * 60 * 60 * 1000, // 7 days replay window
    "cleanup.policy": "delete",
    "compression.type": "lz4", // 60-70% size reduction
  },
};

Partitioning strategy matters. Partition by sensor ID or device ID so that all readings from the same sensor land in the same partition — this preserves ordering and enables stateful processing.
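Kafka's default partitioner picks the partition by hashing the record key (murmur2) modulo the partition count, which is what makes key-based ordering work. A minimal sketch of the idea, with FNV-1a standing in for murmur2:

```typescript
// Deterministic partition assignment: hash the key (FNV-1a here, standing in
// for Kafka's murmur2) and take it modulo the partition count. The same
// sensor ID always maps to the same partition, preserving per-sensor ordering.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0x811c9dc5; // FNV-1a offset basis
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // FNV prime, 32-bit multiply
  }
  return (hash >>> 0) % numPartitions; // force unsigned before modulo
}
```

With the 12-partition topic above, `partitionFor("sensor-42", 12)` always yields the same partition, so a stateful consumer sees that sensor's readings in order.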

Backpressure Handling

When the processing layer is slower than the ingestion rate, you need backpressure:

// Rate limiting at the ingestion layer
const rateLimiter = new TokenBucket({
  capacity: 10000, // max burst
  refillRate: 5000, // sustained rate per second
});

async function handleIncoming(event: SensorReading) {
  if (!rateLimiter.tryConsume(1)) {
    // Spill to a disk buffer so nothing is lost, and signal the
    // producer to back off and retry
    await diskBuffer.write(event);
    return { status: 429, retryAfter: 5 };
  }

  await kafka.produce("sensor-readings", event);
  return { status: 202 }; // accepted
}
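The `TokenBucket` above is assumed rather than taken from a library; a minimal sketch matching that interface (the injectable clock parameter is an addition to make the refill logic testable):

```typescript
// Token bucket: capacity bounds the burst, refillRate is tokens added per
// second. tryConsume refills lazily based on elapsed time, then spends.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private opts: { capacity: number; refillRate: number },
    private now: () => number = Date.now,
  ) {
    this.tokens = opts.capacity;
    this.lastRefill = now();
  }

  tryConsume(n: number): boolean {
    const elapsedSec = (this.now() - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.opts.capacity,
      this.tokens + elapsedSec * this.opts.refillRate,
    );
    this.lastRefill = this.now();
    if (this.tokens < n) return false;
    this.tokens -= n;
    return true;
  }
}
```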

Never drop data silently. If you must shed load, log what was dropped, buffer to disk for later processing, or reject with a clear signal to the producer to retry. Silent data loss in monitoring systems is worse than temporary delays.

Stage 2: Stream Processing

Processing Patterns

Real-time processing isn't just "read message, write to database." Production pipelines do several things per event:

Validation and Cleaning

function validateReading(reading: SensorReading): ValidatedReading | null {
  // Reject impossible values
  if (reading.temperature < -273.15 || reading.temperature > 1000) {
    metrics.increment("readings.invalid", { reason: "out_of_range" });
    return null;
  }

  // Flag stale timestamps (sensor clock drift)
  const staleness = Date.now() - reading.timestamp;
  if (staleness > 5 * 60 * 1000) {
    reading.flags = [...(reading.flags || []), "stale_timestamp"];
  }

  return reading as ValidatedReading;
}

Enrichment — join the event with reference data:

// Enrich sensor reading with location and threshold data
async function enrichReading(reading: ValidatedReading) {
  const sensor = await sensorCache.get(reading.sensorId);

  return {
    ...reading,
    location: sensor.location,
    zone: sensor.zone,
    thresholds: sensor.alertThresholds,
    unit: sensor.measurementUnit,
  };
}

Windowed Aggregation — compute per-window statistics:

// 5-minute tumbling-window statistics per sensor (stream-DSL pseudocode)
const windowedStats = readings
  .groupBy(r => r.sensorId)
  .window({ type: "tumbling", size: "5m" })
  .aggregate({
    avgValue: avg(r => r.value),
    maxValue: max(r => r.value),
    minValue: min(r => r.value),
    count: count(),
  });
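The fluent API above is pseudocode for a stream-processing DSL. A self-contained batch sketch of the same tumbling-window aggregation, with assumed `Reading` and `WindowStats` shapes:

```typescript
interface Reading { sensorId: string; timestamp: number; value: number }
interface WindowStats { avg: number; max: number; min: number; count: number }

// Tumbling windows are fixed and non-overlapping: each reading belongs to
// exactly one window, keyed by sensor ID and the window's start time.
function tumblingAggregate(
  readings: Reading[],
  windowMs: number,
): Map<string, WindowStats> {
  const buckets = new Map<string, Reading[]>();
  for (const r of readings) {
    const windowStart = Math.floor(r.timestamp / windowMs) * windowMs;
    const key = `${r.sensorId}:${windowStart}`;
    let bucket = buckets.get(key);
    if (!bucket) { bucket = []; buckets.set(key, bucket); }
    bucket.push(r);
  }

  const out = new Map<string, WindowStats>();
  for (const [key, rs] of buckets) {
    const values = rs.map(r => r.value);
    out.set(key, {
      avg: values.reduce((a, b) => a + b, 0) / values.length,
      max: Math.max(...values),
      min: Math.min(...values),
      count: values.length,
    });
  }
  return out;
}
```

A streaming engine does the same bucketing incrementally and emits each window when its end time passes; the batch version just makes the window math explicit.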

Anomaly Detection — flag readings that deviate from expected patterns:

function detectAnomaly(reading: EnrichedReading, history: RollingStats) {
  const zScore = Math.abs(reading.value - history.mean) / history.stddev;

  if (zScore > 3) {
    return {
      type: "statistical_anomaly",
      severity: zScore > 5 ? "critical" : "warning",
      reading,
      context: { mean: history.mean, stddev: history.stddev, zScore },
    };
  }

  return null;
}
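The `RollingStats` history passed in above has to be maintained incrementally per sensor; a sketch using Welford's online algorithm, which updates mean and standard deviation in O(1) per reading without storing the raw values:

```typescript
// Welford's online algorithm: m2 accumulates the sum of squared deviations
// from the running mean, giving a numerically stable one-pass variance.
class RollingStats {
  private n = 0;
  private _mean = 0;
  private m2 = 0;

  add(value: number): void {
    this.n++;
    const delta = value - this._mean;
    this._mean += delta / this.n;
    this.m2 += delta * (value - this._mean);
  }

  get mean(): number { return this._mean; }
  get stddev(): number {
    return this.n > 1 ? Math.sqrt(this.m2 / (this.n - 1)) : 0; // sample stddev
  }
}
```

In practice you would also cap or decay the window (e.g. reset per aggregation window) so old readings stop influencing the baseline.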

Alerting

Anomalies trigger alerts, but not every anomaly deserves an alert. Production alerting needs:

  • Deduplication — don't send 100 alerts for 100 readings from the same failing sensor
  • Cooldown periods — after alerting, suppress duplicates for N minutes
  • Escalation — if the condition persists, escalate from notification to page
  • Correlation — if 5 sensors in the same zone alert simultaneously, send one grouped alert

async function processAlert(anomaly: Anomaly) {
  const alertKey = `${anomaly.reading.sensorId}:${anomaly.type}`;

  // Check cooldown
  const lastAlert = await alertStore.getLastAlert(alertKey);
  if (lastAlert && Date.now() - lastAlert.timestamp < COOLDOWN_MS) {
    return; // suppress duplicate
  }

  // Check for correlated alerts in the same zone
  const zoneAlerts = await alertStore.getRecentAlerts({
    zone: anomaly.reading.zone,
    since: Date.now() - 60_000, // last minute
  });

  if (zoneAlerts.length >= CORRELATION_THRESHOLD) {
    await sendGroupedAlert(anomaly.reading.zone, zoneAlerts);
  } else {
    await sendAlert(anomaly);
  }

  await alertStore.recordAlert(alertKey, anomaly);
}

Stage 3: Storage

Real-time pipelines need multiple storage tiers because no single database handles all access patterns well.

Time-Series Database

For sensor readings and metric data, use a time-series database (TimescaleDB, InfluxDB, or QuestDB):

-- TimescaleDB hypertable for sensor readings
CREATE TABLE sensor_readings (
  time        TIMESTAMPTZ NOT NULL,
  sensor_id   TEXT NOT NULL,
  value       DOUBLE PRECISION NOT NULL,
  zone        TEXT,
  flags       TEXT[]
);

SELECT create_hypertable('sensor_readings', 'time');

-- Continuous aggregate for dashboard queries
CREATE MATERIALIZED VIEW sensor_hourly
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', time) AS bucket,
  sensor_id,
  avg(value) AS avg_value,
  max(value) AS max_value,
  min(value) AS min_value,
  count(*) AS reading_count
FROM sensor_readings
GROUP BY bucket, sensor_id;

Hot/Warm/Cold Tiering

Not all data needs the same access speed:

  • Hot (0–24 hours) — in Redis or memory. Sub-millisecond queries for real-time dashboards.
  • Warm (1–90 days) — in TimescaleDB or ClickHouse. Millisecond queries for recent history and alerting.
  • Cold (90+ days) — in object storage (S3) in Parquet format. Seconds to query, but pennies per GB to store.

┌──────────────────────────────────────────────────┐
│  Hot:  Redis / In-Memory       (last 24 hours)   │
│  ↓ age-based eviction                            │
│  Warm: TimescaleDB             (last 90 days)    │
│  ↓ compression + archival job                    │
│  Cold: S3 Parquet              (90+ days)        │
└──────────────────────────────────────────────────┘
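A hypothetical router that picks the storage tier from the age of the oldest data a query needs, using the boundaries above:

```typescript
type Tier = "hot" | "warm" | "cold";

// Tier boundaries follow the diagram: 24 hours hot, 90 days warm, then cold.
function tierFor(fromTimestamp: number, now: number = Date.now()): Tier {
  const ageMs = now - fromTimestamp;
  if (ageMs <= 24 * 60 * 60 * 1000) return "hot";
  if (ageMs <= 90 * 24 * 60 * 60 * 1000) return "warm";
  return "cold";
}
```

Queries that straddle a boundary need results merged from two tiers, which is the main complexity a real tier router has to handle.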

Exactly-Once Semantics

Data loss is unacceptable in monitoring systems. True exactly-once delivery is unattainable in general, but you get exactly-once results by combining:

  • Kafka consumer offsets — commit offsets only after successful processing and storage
  • Idempotent writes — use upserts keyed on (sensor_id, timestamp) so duplicate processing doesn't create duplicate records
  • Transaction boundaries — wrap processing + storage in a single transaction where possible
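A toy in-memory illustration of the idempotent-write idea, deduplicating on the (sensor_id, timestamp) key; in production the same guarantee comes from a database upsert on that key:

```typescript
// Replayed Kafka messages hit the same key and are skipped, so reprocessing
// after a consumer crash never produces duplicate rows.
class IdempotentWriter {
  private seen = new Set<string>();
  public written: Array<{ sensorId: string; timestamp: number; value: number }> = [];

  write(r: { sensorId: string; timestamp: number; value: number }): boolean {
    const key = `${r.sensorId}:${r.timestamp}`;
    if (this.seen.has(key)) return false; // duplicate from replay, skip
    this.seen.add(key);
    this.written.push(r);
    return true;
  }
}
```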

Stage 4: Serving

Real-Time Dashboards

For live dashboards showing current sensor states:

// WebSocket server pushing real-time updates
wss.on("connection", async (ws, req) => {
  // ws hands over the raw upgrade request, so parse the zone from its URL
  const zone = new URL(req.url ?? "/", "http://localhost").searchParams.get("zone");

  // Send current state immediately
  const currentState = await cache.getZoneState(zone);
  ws.send(JSON.stringify({ type: "snapshot", data: currentState }));

  // Subscribe to updates for this zone
  const subscription = eventBus.subscribe(`zone:${zone}`, (update) => {
    ws.send(JSON.stringify({ type: "update", data: update }));
  });

  ws.on("close", () => subscription.unsubscribe());
});

Historical Queries

For dashboards showing trends and historical data, use pre-aggregated views:

// API endpoint for historical data
app.get("/api/sensors/:id/history", async (req, res) => {
  const { id } = req.params;
  const { from, to, resolution } = req.query;

  // Choose data source based on time range
  const source = selectDataSource(from, to);

  // Use pre-aggregated data for wide time ranges
  if (resolution === "hourly" || daysBetween(from, to) > 7) {
    return res.json(await source.queryAggregate(id, from, to, resolution));
  }

  // Use raw data for short time ranges
  return res.json(await source.queryRaw(id, from, to));
});

Performance Benchmarks

From our production deployments:

| Metric | Target | Achieved |
|---|---|---|
| Ingestion throughput | 50K events/sec | 75K events/sec |
| End-to-end latency (p50) | < 500ms | 120ms |
| End-to-end latency (p99) | < 2s | 850ms |
| Data loss rate | 0% | 0% (verified over 6 months) |
| Alert delivery time | < 30s | 8s average |
| Dashboard refresh rate | 1s | 1s (WebSocket push) |
| Storage cost (per TB/month) | — | $23 (hot) / $4 (cold) |

Operational Concerns

Monitoring the Monitor

Your data pipeline monitoring system needs its own monitoring. We use a heartbeat pattern:

  • Synthetic sensors send known values every 60 seconds
  • End-to-end checks verify these values appear in the dashboard within 30 seconds
  • If the heartbeat stops, the alerting system itself is down — alert via a completely independent channel (PagerDuty, SMS)
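The heartbeat check itself reduces to a staleness comparison; a sketch with an assumed 90-second tolerance (one 60-second interval plus the 30-second delivery budget):

```typescript
// If the synthetic sensor's last reading is older than the tolerance, the
// pipeline is presumed down and the out-of-band alert channel should fire.
function heartbeatHealthy(
  lastSeenMs: number,
  nowMs: number,
  toleranceMs: number = 90_000,
): boolean {
  return nowMs - lastSeenMs <= toleranceMs;
}
```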

Schema Evolution

Sensor firmware updates change the data format. Your pipeline must handle:

  • New fields appearing in events (forward compatibility)
  • Old fields disappearing (backward compatibility)
  • Value range changes (recalibrate anomaly detection)

Use a schema registry (Confluent Schema Registry or AWS Glue) to version and validate event schemas.
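A sketch of schema-tolerant decoding that covers both compatibility directions: default fields that old firmware omits, and carry unknown fields from newer firmware through untouched (the field names and the `celsius` default are assumptions for illustration):

```typescript
interface ReadingV2 {
  sensorId: string;
  value: number;
  unit: string;
  extra: Record<string, unknown>;
}

// Backward compatibility: missing `unit` gets a default instead of failing.
// Forward compatibility: fields this version doesn't know are kept in `extra`.
function decodeReading(raw: Record<string, unknown>): ReadingV2 {
  const { sensorId, value, unit, ...rest } = raw as any;
  return {
    sensorId: String(sensorId),
    value: Number(value),
    unit: typeof unit === "string" ? unit : "celsius", // assumed default
    extra: rest,
  };
}
```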

Capacity Planning

Real-time pipelines have predictable capacity needs:

Daily events = sensors × readings_per_sensor_per_day
Storage/day  = daily_events × avg_event_size
Throughput   = daily_events / 86400 (peak = 3× average)

For a 10,000-sensor deployment at 1 reading/minute:

  • 14.4M events/day
  • ~7 GB raw data/day (500 bytes/event)
  • 167 events/second average, ~500/second peak

This is well within the capacity of a single Kafka broker and a moderately sized TimescaleDB instance.
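The arithmetic above as a back-of-envelope calculator (the 3× peak factor follows the formula given earlier):

```typescript
// Capacity planning: daily event count, raw storage per day, and the
// average and peak (3x average) throughput in events per second.
function capacity(
  sensors: number,
  readingsPerSensorPerDay: number,
  avgEventBytes: number,
) {
  const dailyEvents = sensors * readingsPerSensorPerDay;
  return {
    dailyEvents,
    storageBytesPerDay: dailyEvents * avgEventBytes,
    avgEventsPerSecond: dailyEvents / 86_400,
    peakEventsPerSecond: (dailyEvents * 3) / 86_400,
  };
}
```

For the 10,000-sensor example (1 reading/minute = 1,440/day, 500 bytes/event), this reproduces the 14.4M events/day, ~7.2 GB/day, and ~500 events/second peak quoted above.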

When to Build vs. Use Managed Services

Build your own pipeline when:

  • You need sub-second end-to-end latency
  • Your data contains sensitive information that can't leave your infrastructure
  • You need custom processing logic that managed services don't support
  • Event volume exceeds 100K/second (managed services become expensive)

Use managed services when:

  • You're processing fewer than 10K events/second
  • Standard aggregation and alerting rules suffice
  • Your team doesn't have stream processing expertise
  • Time to market matters more than per-event cost

We've built real-time data pipelines for infrastructure monitoring, energy management, and industrial IoT — systems that process millions of events daily with zero data loss and sub-second alerting. If you're designing a data pipeline and need it to work at scale, this is what we do.
