Real-Time Data Monitoring Pipelines for AI Systems
Learn how to build scalable real-time data pipelines for AI systems using PostgreSQL, Supabase, and WebSockets. A practical guide to streaming analytics and ETL automation.

The Real-Time Data Challenge
You've built an AI system—maybe a computer vision model detecting objects, an LLM processing user queries, or a recommendation engine. It's running in production, handling requests, generating predictions. But here's the question that keeps you up at night: is it actually working?
Batch analytics won't tell you when your model suddenly starts hallucinating. Daily reports don't catch the performance degradation happening right now. You need real-time visibility: streaming telemetry, live dashboards, instant alerts when something goes wrong.
This is where data monitoring pipelines come in. Not the heavy enterprise solutions requiring a dedicated team to operate, but lightweight, cloud-native systems you can build and maintain yourself. Let's explore how.
Anatomy of a Real-Time Pipeline
A monitoring pipeline has four key stages:
1. Ingestion: Getting Data In
Your AI system generates events continuously: API requests, model predictions, error logs, performance metrics. These need to flow into your pipeline with minimal latency and overhead. The ingestion layer handles this collection.
For most applications, a simple REST endpoint works perfectly. Your service makes a POST request with telemetry data. No message queues, no complex streaming infrastructure—just HTTP. It's debuggable, has no special dependencies, and scales well enough for the majority of use cases.
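For example, here's a rough sketch of the client side, assuming the httpx library; the endpoint URL and payload values are illustrative, and the matching ingestion endpoint is defined later in this guide:
import httpx

# Hypothetical telemetry payload matching the ingestion endpoint shown later
event = {
    "model_version": "yolov8-1.2.0",
    "inference_time_ms": 42.7,
    "confidence_score": 0.91,
    "input_size_bytes": 148_223,
    "prediction_class": "person",
}

# Short timeout: telemetry should never hold up the request path
with httpx.Client(timeout=1.0) as client:
    client.post("https://monitoring.example.com/telemetry/inference", json=event)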
2. Storage: PostgreSQL as Your Data Backbone
PostgreSQL is the underrated hero of real-time pipelines. People assume you need specialized time-series databases, but Postgres handles streaming data beautifully—especially with proper indexing and partitioning.
Why Postgres? ACID guarantees, rich querying capabilities, excellent JSON support, built-in full-text search, and native pub/sub with LISTEN/NOTIFY. Plus, if you're using Supabase, you get a managed Postgres instance with real-time subscriptions out of the box.
3. Processing: Transform and Enrich
Raw telemetry needs transformation: aggregating metrics, calculating rolling averages, detecting anomalies, enriching with metadata. This processing can happen in real-time (as data arrives) or on-demand (when queries run).
Database triggers handle simple transformations efficiently. For complex logic, lightweight workers pull events, process them, and write results back. The key is keeping processing close to the data—minimize network hops.
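As a sketch of the worker approach, here's a hypothetical polling loop using asyncpg; the enriched_metrics table and the rolling-average logic are assumptions for illustration, not part of the schema defined later in this guide:
import asyncio
import asyncpg

# Hypothetical worker: pulls recent events, enriches them, writes results back.
async def enrichment_worker(dsn: str):
    conn = await asyncpg.connect(dsn)
    last_id = 0
    while True:
        rows = await conn.fetch(
            "SELECT id, model_version, inference_time_ms "
            "FROM inference_logs WHERE id > $1 ORDER BY id LIMIT 500",
            last_id,
        )
        for row in rows:
            # Example enrichment: rolling average over the last 5 minutes
            rolling_avg = await conn.fetchval(
                "SELECT AVG(inference_time_ms) FROM inference_logs "
                "WHERE model_version = $1 AND timestamp > NOW() - INTERVAL '5 minutes'",
                row["model_version"],
            )
            await conn.execute(
                "INSERT INTO enriched_metrics (log_id, rolling_avg_ms) VALUES ($1, $2)",
                row["id"], rolling_avg,
            )
            last_id = row["id"]
        await asyncio.sleep(1)  # poll roughly once per second

# asyncio.run(enrichment_worker("postgresql://postgres:password@host:5432/postgres"))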
4. Delivery: Getting Insights to Users
Live dashboards require streaming updates—no polling, no refresh buttons. WebSockets deliver database changes instantly to connected clients. When a new metric arrives, all dashboards update in real-time. It's the difference between monitoring and actually seeing what's happening.
Building the Pipeline: A Practical Example
Let's build a real-time monitoring system for a computer vision API. We'll track request volume, inference latency, model confidence scores, and error rates—all updating live on a dashboard.
Setting Up the Database Schema
Start with a clean schema design optimized for time-series queries:
-- Table for inference telemetry
CREATE TABLE inference_logs (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
model_version TEXT NOT NULL,
inference_time_ms NUMERIC(10, 2) NOT NULL,
confidence_score NUMERIC(5, 4),
input_size_bytes INTEGER,
prediction_class TEXT,
is_error BOOLEAN DEFAULT FALSE,
error_message TEXT,
metadata JSONB
);
-- Index for time-range queries (most common operation)
CREATE INDEX idx_inference_logs_timestamp
ON inference_logs (timestamp DESC);
-- Index for model version filtering
CREATE INDEX idx_inference_logs_model
ON inference_logs (model_version, timestamp DESC);
-- Composite index for error queries
CREATE INDEX idx_inference_logs_errors
ON inference_logs (is_error, timestamp DESC)
WHERE is_error = TRUE;
-- Optional: time-based partitioning via create_hypertable
-- (requires the TimescaleDB extension; keeps recent-data queries fast)
SELECT create_hypertable('inference_logs', 'timestamp',
    chunk_time_interval => INTERVAL '1 day');
Ingestion Endpoint in Python
A lightweight FastAPI endpoint handles telemetry ingestion:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncpg
import json
from datetime import datetime
from typing import Optional

app = FastAPI()

# Database connection pool
db_pool = None

class InferenceTelemetry(BaseModel):
    model_version: str
    inference_time_ms: float
    confidence_score: Optional[float] = None
    input_size_bytes: int
    prediction_class: Optional[str] = None
    is_error: bool = False
    error_message: Optional[str] = None
    metadata: Optional[dict] = None

@app.on_event("startup")
async def startup():
    global db_pool
    db_pool = await asyncpg.create_pool(
        host="your-supabase-host.supabase.co",
        database="postgres",
        user="postgres",
        password="your-password",
        min_size=5,
        max_size=20
    )

@app.post("/telemetry/inference")
async def log_inference(telemetry: InferenceTelemetry):
    """Ingest inference telemetry with minimal latency"""
    try:
        async with db_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO inference_logs (
                    model_version, inference_time_ms, confidence_score,
                    input_size_bytes, prediction_class, is_error,
                    error_message, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            """,
                telemetry.model_version,
                telemetry.inference_time_ms,
                telemetry.confidence_score,
                telemetry.input_size_bytes,
                telemetry.prediction_class,
                telemetry.is_error,
                telemetry.error_message,
                # asyncpg expects JSON/JSONB parameters as text unless a codec is registered
                json.dumps(telemetry.metadata) if telemetry.metadata is not None else None
            )
        return {"status": "logged"}
    except Exception as e:
        # Never let telemetry failures crash the main service
        print(f"Telemetry error: {e}")
        raise HTTPException(status_code=500, detail="Logging failed")
Real-Time Aggregation with PostgreSQL Views
Create materialized views for common metrics queries:
-- Real-time metrics for the last hour
CREATE MATERIALIZED VIEW inference_metrics_hourly AS
SELECT
date_trunc('minute', timestamp) AS minute,
model_version,
COUNT(*) AS request_count,
AVG(inference_time_ms) AS avg_latency_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY inference_time_ms) AS p95_latency_ms,
AVG(confidence_score) AS avg_confidence,
SUM(CASE WHEN is_error THEN 1 ELSE 0 END) AS error_count,
ROUND(100.0 * SUM(CASE WHEN is_error THEN 1 ELSE 0 END) / COUNT(*), 2) AS error_rate_pct
FROM inference_logs
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY date_trunc('minute', timestamp), model_version
ORDER BY minute DESC;
-- Unique index required for REFRESH MATERIALIZED VIEW ... CONCURRENTLY
CREATE UNIQUE INDEX ON inference_metrics_hourly (minute, model_version);
-- Auto-refresh function
CREATE OR REPLACE FUNCTION refresh_metrics()
RETURNS void AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY inference_metrics_hourly;
END;
$$ LANGUAGE plpgsql;
-- Schedule the refresh with pg_cron, if available
-- (every 10 minutes here; standard cron syntax is minute-granular, so tune to your load)
SELECT cron.schedule('refresh-metrics', '*/10 * * * *', 'SELECT refresh_metrics()');
PostgreSQL Triggers for Anomaly Detection
Database triggers can detect anomalies as data arrives:
-- Alert table for tracking issues
CREATE TABLE system_alerts (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
severity TEXT NOT NULL, -- 'warning', 'critical'
alert_type TEXT NOT NULL,
message TEXT NOT NULL,
metadata JSONB,
acknowledged BOOLEAN DEFAULT FALSE
);
-- Function to check for high latency
CREATE OR REPLACE FUNCTION check_inference_anomalies()
RETURNS TRIGGER AS $$
DECLARE
    recent_avg NUMERIC;
BEGIN
    -- Check if inference time is unusually high
    IF NEW.inference_time_ms > 5000 THEN
        -- Get recent average
        SELECT AVG(inference_time_ms) INTO recent_avg
        FROM inference_logs
        WHERE timestamp > NOW() - INTERVAL '5 minutes'
          AND model_version = NEW.model_version;
        -- If current inference is 3x the recent average, alert
        IF NEW.inference_time_ms > (recent_avg * 3) THEN
            INSERT INTO system_alerts (
                severity, alert_type, message, metadata
            ) VALUES (
                'warning',
                'high_latency',
                format('Inference latency spike: %sms (avg: %sms)',
                    NEW.inference_time_ms, recent_avg),
                jsonb_build_object(
                    'model_version', NEW.model_version,
                    'inference_time_ms', NEW.inference_time_ms,
                    'recent_avg_ms', recent_avg
                )
            );
        END IF;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Trigger on every insert
CREATE TRIGGER inference_anomaly_check
AFTER INSERT ON inference_logs
FOR EACH ROW
EXECUTE FUNCTION check_inference_anomalies();
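If you also want backend processes (a Slack notifier, say) to react to these alerts without polling, PostgreSQL's LISTEN/NOTIFY fits naturally. Here's a minimal sketch of a listener, assuming you add a PERFORM pg_notify('system_alerts', NEW.message); statement to a trigger on the system_alerts table (not shown above):
import asyncio
import asyncpg

async def listen_for_alerts(dsn: str):
    conn = await asyncpg.connect(dsn)

    def on_alert(connection, pid, channel, payload):
        # Forward to Slack, PagerDuty, etc. here
        print(f"ALERT on {channel}: {payload}")

    await conn.add_listener("system_alerts", on_alert)
    # Keep the connection alive so notifications keep arriving
    while True:
        await asyncio.sleep(60)

# asyncio.run(listen_for_alerts("postgresql://postgres:password@host:5432/postgres"))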
Real-Time Updates with Supabase Subscriptions
Supabase provides WebSocket-based real-time subscriptions built on PostgreSQL's logical replication: its Realtime service reads row changes from the write-ahead log and broadcasts them to connected clients, so dashboards receive database changes instantly, no polling required. (Depending on your project configuration, you may need to enable Realtime for each table, either from the Supabase dashboard or by adding the table to the supabase_realtime publication.)
Frontend Dashboard with Live Updates
Here's a React component that subscribes to metrics updates:
import { createClient } from '@supabase/supabase-js'
import { useEffect, useState } from 'react'
const supabase = createClient(
  'https://your-project.supabase.co',
  'your-anon-key'
)

export default function InferenceMonitor() {
  const [metrics, setMetrics] = useState([])
  const [alerts, setAlerts] = useState([])

  useEffect(() => {
    // Load initial data
    loadMetrics()
    loadAlerts()

    // Subscribe to new inference logs
    const inferenceSubscription = supabase
      .channel('inference-logs')
      .on(
        'postgres_changes',
        {
          event: 'INSERT',
          schema: 'public',
          table: 'inference_logs'
        },
        (payload) => {
          console.log('New inference:', payload.new)
          // Refresh metrics when new data arrives
          loadMetrics()
        }
      )
      .subscribe()

    // Subscribe to new alerts
    const alertSubscription = supabase
      .channel('alerts')
      .on(
        'postgres_changes',
        {
          event: 'INSERT',
          schema: 'public',
          table: 'system_alerts'
        },
        (payload) => {
          setAlerts(prev => [payload.new, ...prev])
          // Show notification
          if (payload.new.severity === 'critical') {
            showCriticalAlert(payload.new.message)
          }
        }
      )
      .subscribe()

    return () => {
      inferenceSubscription.unsubscribe()
      alertSubscription.unsubscribe()
    }
  }, [])

  async function loadMetrics() {
    const { data } = await supabase
      .from('inference_metrics_hourly')
      .select('*')
      .order('minute', { ascending: false })
      .limit(60)
    setMetrics(data || [])
  }

  async function loadAlerts() {
    const { data } = await supabase
      .from('system_alerts')
      .select('*')
      .eq('acknowledged', false)
      .order('timestamp', { ascending: false })
    setAlerts(data || [])
  }

  return (
    <div className="dashboard">
      <MetricsChart data={metrics} />
      <AlertPanel alerts={alerts} />
      <LiveStats metrics={metrics[0]} />
    </div>
  )
}
ETL Automation for Historical Analysis
Real-time monitoring handles the present. But you also need historical analysis: weekly trends, month-over-month comparisons, long-term performance tracking. This requires ETL jobs that aggregate raw data into summary tables.
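One note: the job in the next section upserts into a daily_metrics_summary table that isn't defined elsewhere in this guide. Here's a minimal sketch of what it might look like, with column types inferred from the aggregation query and a unique constraint so ON CONFLICT (date, model_version) works:
import asyncio
import asyncpg

# Hypothetical summary table for the ETL job below; column types are assumptions
CREATE_SUMMARY_TABLE = """
CREATE TABLE IF NOT EXISTS daily_metrics_summary (
    date DATE NOT NULL,
    model_version TEXT NOT NULL,
    total_requests BIGINT NOT NULL,
    avg_latency_ms NUMERIC(10, 2),
    p50_latency_ms NUMERIC(10, 2),
    p95_latency_ms NUMERIC(10, 2),
    p99_latency_ms NUMERIC(10, 2),
    avg_confidence NUMERIC(5, 4),
    total_errors BIGINT,
    error_rate_pct NUMERIC(5, 2),
    UNIQUE (date, model_version)
);
"""

async def create_summary_table(dsn: str):
    conn = await asyncpg.connect(dsn)
    await conn.execute(CREATE_SUMMARY_TABLE)
    await conn.close()

# asyncio.run(create_summary_table("postgresql://postgres:password@host:5432/postgres"))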
Python ETL Job with Schedule
A simple scheduled job handles daily aggregations:
import asyncio
import asyncpg
from datetime import datetime, timedelta
import schedule
import time

async def aggregate_daily_metrics():
    """Run daily to aggregate metrics into summary table"""
    conn = await asyncpg.connect(
        host="your-host.supabase.co",
        database="postgres",
        user="postgres",
        password="your-password"
    )
    yesterday = datetime.now() - timedelta(days=1)
    # Aggregate yesterday's data
    await conn.execute("""
        INSERT INTO daily_metrics_summary (
            date, model_version, total_requests, avg_latency_ms,
            p50_latency_ms, p95_latency_ms, p99_latency_ms,
            avg_confidence, total_errors, error_rate_pct
        )
        SELECT
            DATE($1) AS date,
            model_version,
            COUNT(*) AS total_requests,
            AVG(inference_time_ms) AS avg_latency_ms,
            PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY inference_time_ms) AS p50,
            PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY inference_time_ms) AS p95,
            PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY inference_time_ms) AS p99,
            AVG(confidence_score) AS avg_confidence,
            SUM(CASE WHEN is_error THEN 1 ELSE 0 END) AS total_errors,
            ROUND(100.0 * SUM(CASE WHEN is_error THEN 1 ELSE 0 END) / COUNT(*), 2) AS error_rate
        FROM inference_logs
        WHERE DATE(timestamp) = DATE($1)
        GROUP BY model_version
        ON CONFLICT (date, model_version)
        DO UPDATE SET
            total_requests = EXCLUDED.total_requests,
            avg_latency_ms = EXCLUDED.avg_latency_ms,
            p50_latency_ms = EXCLUDED.p50_latency_ms,
            p95_latency_ms = EXCLUDED.p95_latency_ms,
            p99_latency_ms = EXCLUDED.p99_latency_ms,
            avg_confidence = EXCLUDED.avg_confidence,
            total_errors = EXCLUDED.total_errors,
            error_rate_pct = EXCLUDED.error_rate_pct
    """, yesterday)
    await conn.close()
    print(f"Aggregated metrics for {yesterday.date()}")

# Schedule the aggregation to run daily at 02:00
schedule.every().day.at("02:00").do(
    lambda: asyncio.run(aggregate_daily_metrics())
)

# Run scheduler loop
while True:
    schedule.run_pending()
    time.sleep(60)
Architecture Patterns for Scale
When to Add a Message Queue
For most applications, direct database writes work perfectly. But at high scale (10K+ events/second), consider adding a message queue like Redis or RabbitMQ as a buffer. This prevents telemetry spikes from overwhelming your database.
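Here's a minimal sketch of that buffering pattern with the redis-py client (the queue name and batch size are arbitrary); a worker would drain the buffer and insert each batch with a single multi-row statement:
import json
import redis  # assumes the redis-py client and a Redis instance are available

r = redis.Redis(host="localhost", port=6379)

def enqueue_event(event: dict) -> None:
    """Producer side: push telemetry onto a Redis list instead of hitting Postgres directly."""
    r.rpush("telemetry_buffer", json.dumps(event))

def drain_buffer(batch_size: int = 100) -> list[dict]:
    """Consumer side: pull up to batch_size events for one batched database insert."""
    batch = []
    while len(batch) < batch_size:
        item = r.lpop("telemetry_buffer")
        if item is None:
            break
        batch.append(json.loads(item))
    return batch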
Separation of Hot and Cold Data
Keep recent data (last 7-30 days) in fast tables with extensive indexes. Archive older data to cheaper cold storage or separate tables with minimal indexing. Queries on recent data stay fast, and you save on storage costs.
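A hypothetical archival job along those lines, assuming an inference_logs_archive table with the same columns as inference_logs:
import asyncio
import asyncpg

# Moves rows older than 30 days into a colder, lightly indexed table
async def archive_old_logs(dsn: str):
    conn = await asyncpg.connect(dsn)
    async with conn.transaction():
        status = await conn.execute("""
            WITH moved AS (
                DELETE FROM inference_logs
                WHERE timestamp < NOW() - INTERVAL '30 days'
                RETURNING *
            )
            INSERT INTO inference_logs_archive SELECT * FROM moved
        """)
        print(f"Archive result: {status}")
    await conn.close()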
Sampling for High-Volume Streams
At extreme scale, sample instead of storing every event: keep 100% of errors and anomalies, but only 1-10% of successful requests. You maintain statistical accuracy while cutting infrastructure costs by an order of magnitude.
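A minimal sampling gate, applied client-side before telemetry is sent (the rates are placeholders to tune):
import random

ERROR_KEEP_RATE = 1.0      # always keep errors and anomalies
SUCCESS_KEEP_RATE = 0.05   # keep ~5% of successful requests

def should_log(event: dict) -> bool:
    """Hypothetical sampling gate applied before sending telemetry."""
    if event.get("is_error"):
        return random.random() < ERROR_KEEP_RATE
    return random.random() < SUCCESS_KEEP_RATE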
Monitoring the Monitors: Meta-Observability
Your monitoring pipeline is critical infrastructure. What happens when it fails? Implement health checks:
@app.get("/health")
async def health_check():
"""Endpoint for monitoring the monitoring system"""
try:
# Check database connectivity
async with db_pool.acquire() as conn:
await conn.fetchval("SELECT 1")
# Check recent data freshness
async with db_pool.acquire() as conn:
last_log = await conn.fetchval("""
SELECT timestamp FROM inference_logs
ORDER BY timestamp DESC LIMIT 1
""")
# Alert if no logs in last 5 minutes
age = datetime.now(last_log.tzinfo) - last_log
if age.total_seconds() > 300:
return {
"status": "degraded",
"message": f"No logs received in {age.total_seconds()}s"
}
return {"status": "healthy", "database": "connected"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
Cost Optimization Strategies
Telemetry can get expensive fast. Here's how to keep costs reasonable:
- Batch writes: Buffer events client-side and send them in batches of 10-100 instead of one at a time (see the sketch after this list)
- Compress payloads: gzip compression reduces bandwidth costs by 70-90%
- Smart retention: Auto-delete raw logs after 30 days, keep aggregates for years
- Index pruning: Only index fields you actually query—every index costs insert performance
- Connection pooling: Reuse database connections instead of creating new ones per request
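As an example of the first point, here's a minimal batching sketch using asyncpg and a trimmed-down set of the inference_logs columns from earlier:
import asyncpg

# Hypothetical batching writer: buffer telemetry in memory and flush periodically
# with one batched statement, instead of one INSERT round trip per event.
class TelemetryBuffer:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
        self.events: list[tuple] = []

    def add(self, model_version: str, inference_time_ms: float, is_error: bool) -> None:
        self.events.append((model_version, inference_time_ms, is_error))

    async def flush(self) -> None:
        """Call this every few seconds, or whenever the buffer exceeds ~100 events."""
        if not self.events:
            return
        batch, self.events = self.events, []
        async with self.pool.acquire() as conn:
            # executemany prepares the statement once and sends the whole batch together
            await conn.executemany(
                "INSERT INTO inference_logs (model_version, inference_time_ms, is_error) "
                "VALUES ($1, $2, $3)",
                batch,
            )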
Real-World Lessons from Production
Never Block Your Main Service
Telemetry failures should never crash your application. Always use timeouts, catch exceptions, and fail silently. It's better to lose some logs than to take down your API because the monitoring database is overloaded.
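One way to enforce that, sketched with httpx (the timeout and endpoint are illustrative):
import httpx

async def send_telemetry(event: dict) -> None:
    """Best-effort telemetry: short timeout, swallow every failure."""
    try:
        async with httpx.AsyncClient(timeout=0.5) as client:
            await client.post("https://monitoring.example.com/telemetry/inference", json=event)
    except Exception:
        # Dropping a log line is always cheaper than slowing down or crashing the API
        pass
In a FastAPI service you would typically hand this coroutine to asyncio.create_task or BackgroundTasks so the request handler never waits on it.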
Start Simple, Optimize Later
Don't build Kafka clusters and Spark jobs on day one. A Postgres table and scheduled Python script will handle 95% of use cases. Premature optimization wastes time and adds complexity. Scale your monitoring system only when you have actual scale problems.
Dashboard Fatigue Is Real
You can build beautiful real-time dashboards, but if nobody looks at them, they're worthless. Focus on actionable alerts instead of vanity metrics. A Slack notification when error rates spike is worth more than 50 graphs that nobody monitors.
The Tools That Matter
You don't need a massive stack. Here's what actually gets used in production:
- Supabase: Managed Postgres with real-time subscriptions and great DX
- FastAPI: Lightweight Python framework perfect for telemetry endpoints
- React + Recharts: Build dashboards that look professional in hours, not weeks
- Vercel/Railway: Deploy monitoring services with zero DevOps overhead
- GitHub Actions: Schedule ETL jobs without managing cron servers
Beyond Basic Monitoring
Once you have the foundation, extend it:
- A/B testing infrastructure: Track metrics separately for experimental model versions
- User behavior analytics: Understand how people interact with your AI system
- Cost tracking: Monitor API costs, compute usage, and ROI per user/feature
- Model drift detection: Compare current predictions against historical baselines (a minimal sketch follows this list)
- Performance regression testing: Alert when new model versions degrade metrics
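As a taste of drift detection, here's a hypothetical check that compares recent average confidence against a 30-day baseline (the thresholds and windows are arbitrary):
import asyncio
import asyncpg

async def check_confidence_drift(dsn: str):
    conn = await asyncpg.connect(dsn)
    row = await conn.fetchrow("""
        SELECT
            AVG(confidence_score) FILTER (
                WHERE timestamp > NOW() - INTERVAL '1 day') AS recent,
            AVG(confidence_score) FILTER (
                WHERE timestamp <= NOW() - INTERVAL '1 day') AS baseline
        FROM inference_logs
        WHERE timestamp > NOW() - INTERVAL '30 days'
    """)
    await conn.close()
    # Flag a drop of more than 10% relative to the baseline
    if row["recent"] is not None and row["baseline"] is not None:
        if row["recent"] < row["baseline"] * 0.9:
            print(f"Possible drift: avg confidence {row['recent']:.3f} "
                  f"vs 30-day baseline {row['baseline']:.3f}")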
The Bottom Line
Real-time monitoring isn't optional for production AI systems—it's how you know your system is working. The good news: building lightweight, scalable pipelines is simpler than ever. Modern managed databases like Supabase handle the hard parts (real-time subscriptions, connection pooling, backups), letting you focus on your application logic.
You don't need enterprise tools or dedicated data engineering teams. A well-designed schema, clean ingestion code, and WebSocket subscriptions get you 90% of the way there. The remaining 10% comes from experience: learning which metrics matter, tuning indexes for your query patterns, and optimizing for your specific scale.
Start building your monitoring pipeline today. Your future self—the one responding to production incidents at 3 AM—will thank you for the visibility.