Build a streaming anomaly detection pipeline with Kafka and PyTorch. Train an autoencoder on normal sensor behavior, then deploy it for real-time inference on live data.

Real-Time Anomaly Detection with Kafka and PyTorch

A sensor starts drifting at 2:47 AM. The temperature reading climbs 0.3 degrees every minute—not enough to trigger a static threshold, but unmistakably wrong to anyone watching the trend. By the time an operator checks the dashboard at 9 AM, six hours of product has been ruined.

Static thresholds fail because anomalies aren’t just “values that are too high.” They’re values that are unusual given context. A temperature of 85°F might be normal at 2 PM in July but anomalous at 2 AM in January. A vibration pattern might be fine at 1000 RPM but dangerous at 500 RPM.

This is where deep learning shines. An autoencoder learns the normal patterns in your data—the daily cycles, the correlations between sensors, the expected response to operating conditions. At inference time, it reconstructs what it expects to see. When the reconstruction error spikes, something unexpected is happening.

In this post, we’ll build the full pipeline:

  1. Generate realistic synthetic sensor data with injected anomalies
  2. Train a PyTorch autoencoder on “normal” behavior
  3. Stream live sensor data through Kafka
  4. Detect anomalies in real time with sub-second latency
  5. Alert when reconstruction error exceeds calibrated thresholds

By the end, you’ll have a pattern you can adapt to any time-series monitoring problem—IoT sensors, server metrics, financial transactions, or network traffic.


Why Autoencoders for Anomaly Detection?

The fundamental insight: you don’t need labeled anomalies to detect anomalies. You only need labeled normal data—and you have tons of it.

An autoencoder is trained to compress and reconstruct its input. When trained only on normal data:

  • Normal inputs → low reconstruction error (the model has seen patterns like this)
  • Anomalous inputs → high reconstruction error (the model can’t reconstruct what it hasn’t learned)

This is fundamentally different from supervised approaches (which need labeled anomaly examples) and statistical approaches (which assume simple distributions). The autoencoder learns arbitrarily complex patterns of normality.

Normal data ─────►  [Encoder] ──► latent space ──► [Decoder] ──► Reconstruction
                                                                      │
                         Compare ◄──────────────────────────────────────
                            │
                     Reconstruction Error
                            │
                    Low = Normal    High = Anomaly
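The "Compare" step in that diagram is nothing more than a per-sample mean squared error followed by a threshold. A minimal numpy sketch of the scoring logic (the all-zeros "reconstruction" here is a stand-in for a trained model's output):

```python
import numpy as np

def anomaly_scores(x: np.ndarray, x_hat: np.ndarray) -> np.ndarray:
    """Per-sample mean squared reconstruction error."""
    return np.mean((x - x_hat) ** 2, axis=1)

rng = np.random.default_rng(0)
normal = rng.normal(0.0, 1.0, size=(100, 6))   # 100 readings, 6 sensors
x_hat = np.zeros_like(normal)                  # stand-in reconstruction
scores = anomaly_scores(normal, x_hat)

threshold = np.percentile(scores, 99)          # calibrated on normal scores
flagged = scores > threshold                   # True where "anomalous"
```

Everything that follows is about making `x_hat` good for normal data (training) and choosing `threshold` well (calibration).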

Setup

import json
import time
import threading
from datetime import datetime, timedelta
from dataclasses import dataclass
 
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
 
# Reproducibility
np.random.seed(42)
torch.manual_seed(42)
 
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

Generating Realistic Sensor Data

Real IoT sensor data has structure: daily cycles, correlations between sensors, gradual drift, and noise. We’ll simulate a system with 6 correlated sensors—think of a grain dryer or industrial HVAC unit.

The key is making the normal data realistic enough that the autoencoder learns meaningful patterns, not just noise.

@dataclass
class SensorConfig:
    name: str
    base_value: float
    daily_amplitude: float  # How much it varies over the day
    noise_std: float
    correlation_with: dict[str, float] | None = None  # reserved; correlations are applied explicitly in generate_normal_data
 
 
SENSORS = [
    SensorConfig("temperature", base_value=72.0, daily_amplitude=8.0, noise_std=0.5),
    SensorConfig("humidity", base_value=45.0, daily_amplitude=12.0, noise_std=1.0),
    SensorConfig("pressure", base_value=14.7, daily_amplitude=0.3, noise_std=0.05),
    SensorConfig("vibration", base_value=2.5, daily_amplitude=0.8, noise_std=0.2),
    SensorConfig("power_draw", base_value=450.0, daily_amplitude=100.0, noise_std=15.0),
    SensorConfig("airflow", base_value=1200.0, daily_amplitude=200.0, noise_std=30.0),
]
N_SENSORS = len(SENSORS)
 
 
def generate_normal_data(n_samples: int, sample_interval_minutes: int = 5) -> np.ndarray:
    """Generate realistic correlated sensor data for normal operation."""
    t = np.arange(n_samples)
    hours = (t * sample_interval_minutes / 60) % 24  # Time of day
    days = t * sample_interval_minutes / (60 * 24)  # Day number
 
    data = np.zeros((n_samples, N_SENSORS))
 
    for i, sensor in enumerate(SENSORS):
        # Daily cycle (sinusoidal with phase offset per sensor)
        phase = i * 0.5  # Each sensor peaks at a different time
        daily_pattern = sensor.daily_amplitude * np.sin(2 * np.pi * (hours - phase) / 24)
 
        # Slow weekly drift
        weekly_drift = sensor.daily_amplitude * 0.1 * np.sin(2 * np.pi * days / 7)
 
        # Gaussian noise
        noise = np.random.normal(0, sensor.noise_std, n_samples)
 
        data[:, i] = sensor.base_value + daily_pattern + weekly_drift + noise
 
    # Add correlations: temperature and power_draw are positively correlated
    data[:, 4] += 0.5 * (data[:, 0] - SENSORS[0].base_value)  # power follows temp
    data[:, 5] += 0.3 * (data[:, 4] - SENSORS[4].base_value)  # airflow follows power
    data[:, 1] -= 0.4 * (data[:, 0] - SENSORS[0].base_value)  # humidity inversely tracks temp
 
    return data
 
 
def inject_anomalies(data: np.ndarray, anomaly_fraction: float = 0.05) -> tuple[np.ndarray, np.ndarray]:
    """Inject realistic anomalies into sensor data. Returns (data, labels)."""
    n_samples = len(data)
    labels = np.zeros(n_samples, dtype=int)  # 0 = normal, 1 = anomaly
    anomaly_data = data.copy()
    n_anomalies = int(n_samples * anomaly_fraction)
 
    anomaly_types = ["spike", "drift", "stuck", "correlation_break"]
 
    idx = 0
    while idx < n_anomalies:
        anomaly_type = np.random.choice(anomaly_types)
        sensor_idx = np.random.randint(0, N_SENSORS)
        start = np.random.randint(0, n_samples - 50)
 
        if anomaly_type == "spike":
            # Sudden spike lasting 1-5 samples
            duration = np.random.randint(1, 6)
            magnitude = SENSORS[sensor_idx].daily_amplitude * np.random.uniform(3, 8)
            end = min(start + duration, n_samples)
            anomaly_data[start:end, sensor_idx] += magnitude * np.random.choice([-1, 1])
            labels[start:end] = 1
            idx += duration
 
        elif anomaly_type == "drift":
            # Gradual drift over 20-50 samples
            duration = np.random.randint(20, 51)
            end = min(start + duration, n_samples)
            drift = np.linspace(0, SENSORS[sensor_idx].daily_amplitude * 4, end - start)
            anomaly_data[start:end, sensor_idx] += drift
            labels[start:end] = 1
            idx += duration
 
        elif anomaly_type == "stuck":
            # Sensor stuck at a single value
            duration = np.random.randint(10, 30)
            end = min(start + duration, n_samples)
            anomaly_data[start:end, sensor_idx] = anomaly_data[start, sensor_idx]
            labels[start:end] = 1
            idx += duration
 
        elif anomaly_type == "correlation_break":
            # Break the normal correlation between sensors
            duration = np.random.randint(15, 40)
            end = min(start + duration, n_samples)
            anomaly_data[start:end, sensor_idx] = (
                SENSORS[sensor_idx].base_value
                + np.random.normal(0, SENSORS[sensor_idx].daily_amplitude * 2, end - start)
            )
            labels[start:end] = 1
            idx += duration
 
    return anomaly_data, labels
 
 
# Generate data
print("Generating sensor data...")
 
# Training data: 30 days of normal operation (5-minute intervals)
n_train = 30 * 24 * 12  # 30 days × 24 hours × 12 samples/hour
train_data = generate_normal_data(n_train)
 
# Test data: 7 days with injected anomalies
n_test = 7 * 24 * 12
test_normal = generate_normal_data(n_test)
test_data, test_labels = inject_anomalies(test_normal, anomaly_fraction=0.08)
 
print(f"Training samples: {n_train:,} ({n_train * 5 / 60 / 24:.0f} days of normal data)")
print(f"Test samples: {n_test:,} ({n_test * 5 / 60 / 24:.0f} days with anomalies)")
print(f"Anomaly rate in test: {test_labels.mean():.1%}")

# Visualize a slice of the test data with anomalies highlighted
fig, axes = plt.subplots(N_SENSORS, 1, figsize=(14, 10), sharex=True)
plot_range = slice(0, 2000)  # First ~7 days
 
for i, (ax, sensor) in enumerate(zip(axes, SENSORS)):
    t = np.arange(len(test_data[plot_range]))
    values = test_data[plot_range, i]
    anomaly_mask = test_labels[plot_range].astype(bool)
 
    ax.plot(t[~anomaly_mask], values[~anomaly_mask], '.', markersize=1, alpha=0.5, label='Normal')
    ax.plot(t[anomaly_mask], values[anomaly_mask], '.', markersize=3, color='red', alpha=0.8, label='Anomaly')
    ax.set_ylabel(sensor.name, fontsize=8)
    if i == 0:
        ax.legend(fontsize=8, loc='upper right')
 
axes[-1].set_xlabel("Sample index")
fig.suptitle("Sensor Data with Injected Anomalies", fontsize=12)
plt.tight_layout()
plt.show()

Building the Autoencoder

Our autoencoder takes a window of sensor readings and tries to reconstruct it. The architecture is deliberately simple—anomaly detection doesn’t need a massive model. What matters is that the bottleneck (latent dimension) is small enough to force the model to learn compressed representations of normal behavior.

We use a temporal window approach: instead of looking at one timestamp, the model sees the last N readings across all sensors. This lets it learn temporal patterns like “temperature usually rises gradually” and catch anomalies like sudden spikes.

WINDOW_SIZE = 12  # 12 samples × 5 min = 1 hour of context
INPUT_DIM = WINDOW_SIZE * N_SENSORS  # Flattened input
LATENT_DIM = 8  # Compressed representation
 
 
class SensorAutoencoder(nn.Module):
    """Autoencoder for multi-sensor time series anomaly detection."""
 
    def __init__(self, input_dim: int, latent_dim: int):
        super().__init__()
 
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32),
            nn.Linear(32, latent_dim),
        )
 
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Linear(64, input_dim),
        )
 
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed
 
 
model = SensorAutoencoder(INPUT_DIM, LATENT_DIM).to(DEVICE)
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"Input dimension: {INPUT_DIM} ({WINDOW_SIZE} steps × {N_SENSORS} sensors)")
print(f"Latent dimension: {LATENT_DIM} (compression ratio: {INPUT_DIM / LATENT_DIM:.0f}x)")

Preprocessing: Windowing and Normalization

We normalize each sensor independently (zero mean, unit variance) using statistics from the training set only. Then we create sliding windows of sensor readings.

# Compute normalization stats from training data only
train_mean = train_data.mean(axis=0)
train_std = train_data.std(axis=0)
 
 
def normalize(data: np.ndarray) -> np.ndarray:
    return (data - train_mean) / train_std
 
 
def create_windows(data: np.ndarray, window_size: int) -> np.ndarray:
    """Create sliding windows from time series data."""
    n_windows = len(data) - window_size + 1
    windows = np.zeros((n_windows, window_size * data.shape[1]))
    for i in range(n_windows):
        windows[i] = data[i:i + window_size].flatten()
    return windows
 
 
# Prepare training data
train_normalized = normalize(train_data)
train_windows = create_windows(train_normalized, WINDOW_SIZE)
train_tensor = torch.FloatTensor(train_windows)
train_loader = DataLoader(
    TensorDataset(train_tensor, train_tensor),  # Input = target for autoencoder
    batch_size=256,
    shuffle=True,
)
 
# Prepare test data
test_normalized = normalize(test_data)
test_windows = create_windows(test_normalized, WINDOW_SIZE)
test_tensor = torch.FloatTensor(test_windows)
# Labels for windows: a window is anomalous if ANY sample in it is anomalous
test_window_labels = np.array([
    test_labels[i:i + WINDOW_SIZE].max() for i in range(len(test_windows))
])
 
print(f"Training windows: {len(train_windows):,}")
print(f"Test windows: {len(test_windows):,} ({test_window_labels.mean():.1%} anomalous)")

Training

We train on normal data only. The loss function is MSE between input and reconstruction—the model learns to minimize reconstruction error for normal patterns. Anomalies, which the model has never seen, will naturally have higher reconstruction error.

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
criterion = nn.MSELoss()
 
N_EPOCHS = 50
train_losses = []
 
print("Training autoencoder on normal data...\n")
for epoch in range(N_EPOCHS):
    model.train()
    epoch_loss = 0.0
    n_batches = 0
 
    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(DEVICE)
        batch_y = batch_y.to(DEVICE)
 
        reconstructed = model(batch_x)
        loss = criterion(reconstructed, batch_y)
 
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
 
        epoch_loss += loss.item()
        n_batches += 1
 
    avg_loss = epoch_loss / n_batches
    train_losses.append(avg_loss)
    scheduler.step(avg_loss)
 
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1:3d}/{N_EPOCHS}  Loss: {avg_loss:.6f}  LR: {optimizer.param_groups[0]['lr']:.2e}")
 
print(f"\nFinal loss: {train_losses[-1]:.6f}")

Threshold Calibration

The reconstruction error on normal data gives us a distribution of “how wrong the model usually is.” We set the anomaly threshold based on this distribution—typically at the 95th or 99th percentile.

Choosing the right percentile is a precision-recall tradeoff:

  • Lower threshold (95th percentile) → more alerts, catches more anomalies, more false positives
  • Higher threshold (99th percentile) → fewer alerts, misses subtle anomalies, fewer false positives

In production, tune this based on the cost of false positives vs. missed anomalies.

model.eval()
 
# Compute reconstruction errors on training data (normal only)
with torch.no_grad():
    train_reconstructed = model(train_tensor.to(DEVICE))
    train_errors = torch.mean((train_tensor.to(DEVICE) - train_reconstructed) ** 2, dim=1).cpu().numpy()
 
# Set thresholds at different percentiles
thresholds = {
    "aggressive (95th)": np.percentile(train_errors, 95),
    "balanced (99th)": np.percentile(train_errors, 99),
    "conservative (99.5th)": np.percentile(train_errors, 99.5),
}
 
print("Reconstruction error distribution on NORMAL training data:")
print(f"  Mean:   {train_errors.mean():.6f}")
print(f"  Std:    {train_errors.std():.6f}")
print(f"  Median: {np.median(train_errors):.6f}")
print(f"  Max:    {train_errors.max():.6f}")
print()
for name, thresh in thresholds.items():
    print(f"  {name}: {thresh:.6f}")

# Evaluate on test data
with torch.no_grad():
    test_reconstructed = model(test_tensor.to(DEVICE))
    test_errors = torch.mean((test_tensor.to(DEVICE) - test_reconstructed) ** 2, dim=1).cpu().numpy()
 
print("=== Detection Performance ===\n")
for name, thresh in thresholds.items():
    predicted = (test_errors > thresh).astype(int)
    tp = ((predicted == 1) & (test_window_labels == 1)).sum()
    fp = ((predicted == 1) & (test_window_labels == 0)).sum()
    fn = ((predicted == 0) & (test_window_labels == 1)).sum()
    tn = ((predicted == 0) & (test_window_labels == 0)).sum()
 
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
 
    print(f"{name}:")
    print(f"  Precision: {precision:.3f}  Recall: {recall:.3f}  F1: {f1:.3f}")
    print(f"  TP={tp}  FP={fp}  FN={fn}  TN={tn}")
    print()

# Visualize reconstruction errors vs anomaly labels
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 6), sharex=True)
 
# Reconstruction error over time
ax1.plot(test_errors, linewidth=0.5, alpha=0.7, label='Reconstruction Error')
ax1.axhline(y=thresholds["balanced (99th)"], color='r', linestyle='--', label='Threshold (99th)')
ax1.set_ylabel('Reconstruction Error')
ax1.legend(fontsize=8)
ax1.set_title('Autoencoder Anomaly Detection')
 
# Ground truth labels
ax2.fill_between(range(len(test_window_labels)), test_window_labels, alpha=0.3, color='red', label='True Anomalies')
predicted_99 = (test_errors > thresholds["balanced (99th)"]).astype(int)
ax2.fill_between(range(len(predicted_99)), -predicted_99, alpha=0.3, color='blue', label='Predicted Anomalies')
ax2.set_ylabel('Labels')
ax2.set_xlabel('Window Index')
ax2.legend(fontsize=8)
 
plt.tight_layout()
plt.show()

Streaming Inference with Kafka

Now let’s put this into a real-time pipeline. The architecture:

  1. Producer: Simulates sensors publishing readings to a sensor-readings Kafka topic
  2. Consumer/Detector: Reads from the topic, maintains a sliding window, runs the autoencoder, and publishes alerts to an anomaly-alerts topic

This pattern decouples data ingestion from anomaly detection. You can scale producers and consumers independently, and the Kafka topic provides a durable buffer if the detector falls behind.

from confluent_kafka import Producer, Consumer, KafkaError
 
KAFKA_BOOTSTRAP = "localhost:9092"
READINGS_TOPIC = "sensor-readings"
ALERTS_TOPIC = "anomaly-alerts"
 
 
def create_producer() -> Producer:
    return Producer({
        "bootstrap.servers": KAFKA_BOOTSTRAP,
        "linger.ms": 5,
        "batch.num.messages": 100,
    })
 
 
def simulate_sensor_producer(
    producer: Producer,
    data: np.ndarray,
    interval_seconds: float = 0.1,  # Accelerated for demo
):
    """Publish sensor readings to Kafka, simulating real-time data."""
    for i, reading in enumerate(data):
        message = {
            "timestamp": datetime.now().isoformat(),
            "sample_index": i,
            "sensors": {
                sensor.name: float(reading[j])
                for j, sensor in enumerate(SENSORS)
            },
        }
 
        producer.produce(
            READINGS_TOPIC,
            key=str(i),
            value=json.dumps(message),
        )
 
        if i % 100 == 0:
            producer.flush()
            print(f"Published {i:,}/{len(data):,} readings", end="\r")
 
        time.sleep(interval_seconds)
 
    producer.flush()
    print(f"\nPublished all {len(data):,} readings")


class StreamingAnomalyDetector:
    """Real-time anomaly detection consumer.
    
    Maintains a sliding window of sensor readings, runs the autoencoder
    on each new window, and publishes alerts when the reconstruction
    error exceeds the threshold.
    """
 
    def __init__(
        self,
        model: nn.Module,
        threshold: float,
        window_size: int,
        train_mean: np.ndarray,
        train_std: np.ndarray,
    ):
        self.model = model
        self.model.eval()
        self.threshold = threshold
        self.window_size = window_size
        self.train_mean = train_mean
        self.train_std = train_std
        self.buffer: list[np.ndarray] = []  # Sliding window buffer
        self.alerts: list[dict] = []
 
    def process_reading(self, reading: dict) -> dict | None:
        """Process a single sensor reading. Returns an alert dict if anomaly detected."""
        # Extract sensor values in the correct order
        values = np.array([reading["sensors"][s.name] for s in SENSORS])
        self.buffer.append(values)
 
        # Wait until we have a full window
        if len(self.buffer) < self.window_size:
            return None
 
        # Keep only the latest window
        if len(self.buffer) > self.window_size:
            self.buffer = self.buffer[-self.window_size:]
 
        # Normalize and flatten
        window = np.array(self.buffer)
        normalized = (window - self.train_mean) / self.train_std
        flat = normalized.flatten()
 
        # Run inference
        with torch.no_grad():
            input_tensor = torch.FloatTensor(flat).unsqueeze(0).to(DEVICE)
            reconstructed = self.model(input_tensor)
            error = torch.mean((input_tensor - reconstructed) ** 2).item()
 
        # Check threshold
        if error > self.threshold:
            alert = {
                "timestamp": reading["timestamp"],
                "sample_index": reading["sample_index"],
                "reconstruction_error": error,
                "threshold": self.threshold,
                "severity": "critical" if error > self.threshold * 3 else "warning",
                "sensor_values": reading["sensors"],
            }
            self.alerts.append(alert)
            return alert
 
        return None
 
 
def run_detector(
    detector: StreamingAnomalyDetector,
    consumer: Consumer,
    producer: Producer,
    max_messages: int = 1000,
):
    """Run the streaming detector loop."""
    consumer.subscribe([READINGS_TOPIC])
    processed = 0
    alerts_sent = 0
 
    try:
        while processed < max_messages:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(f"Consumer error: {msg.error()}")
                break
 
            reading = json.loads(msg.value().decode("utf-8"))
            alert = detector.process_reading(reading)
 
            if alert:
                producer.produce(
                    ALERTS_TOPIC,
                    key=str(alert["sample_index"]),
                    value=json.dumps(alert),
                )
                alerts_sent += 1
                print(f"ALERT [{alert['severity'].upper()}] "
                      f"Sample {alert['sample_index']}: "
                      f"error={alert['reconstruction_error']:.4f}")
 
            processed += 1
            if processed % 200 == 0:
                print(f"Processed {processed:,} messages, {alerts_sent} alerts")
 
    finally:
        consumer.close()
        producer.flush()
        print(f"\nDone. Processed {processed:,} messages, sent {alerts_sent} alerts")
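One gap above: run_detector takes a Consumer, but only the producer factory is shown. A minimal consumer configuration would look like the following (the group id is an assumption; the keys are standard librdkafka option names, passed to confluent_kafka.Consumer):

```python
# Consumer settings for the detector. A shared group.id lets multiple
# detector instances split the topic's partitions between them.
CONSUMER_CONFIG = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "anomaly-detector",   # assumption: any stable id works
    "auto.offset.reset": "earliest",  # replay the topic on first start
}

# consumer = Consumer(CONSUMER_CONFIG)  # confluent_kafka.Consumer
```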

Running the Full Pipeline

To run this end-to-end, you need Kafka running locally. The docker-compose.yml in this directory will set that up. Then run the producer and detector in separate terminals—or use the threaded demo below.
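If you don't have that file handy, a minimal single-broker setup looks roughly like this sketch (image versions and settings are assumptions, using the Confluent images; adapt to your environment):

```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```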

# Demo: Run both producer and detector in threads
# In production, these would be separate processes/containers
 
detector = StreamingAnomalyDetector(
    model=model,
    threshold=thresholds["balanced (99th)"],
    window_size=WINDOW_SIZE,
    train_mean=train_mean,
    train_std=train_std,
)
 
# Use a subset of test data for the demo
demo_data = test_data[:500]
demo_labels = test_labels[:500]
 
print(f"Running streaming demo with {len(demo_data)} readings...")
print(f"Anomalous readings in demo: {demo_labels.sum()} ({demo_labels.mean():.1%})")
print(f"Threshold: {thresholds['balanced (99th)']:.6f}")
print()
 
# Simulate the streaming pipeline without Kafka (direct function calls)
alerts = []
for i in range(len(demo_data)):
    reading = {
        "timestamp": datetime.now().isoformat(),
        "sample_index": i,
        "sensors": {
            sensor.name: float(demo_data[i, j])
            for j, sensor in enumerate(SENSORS)
        },
    }
    alert = detector.process_reading(reading)
    if alert:
        alerts.append(alert)
 
print(f"\nTotal alerts: {len(alerts)}")
# A window ending at reading i is anomalous if ANY of its samples is anomalous
n_true_windows = sum(
    demo_labels[i - WINDOW_SIZE + 1:i + 1].max()
    for i in range(WINDOW_SIZE - 1, len(demo_data))
)
print(f"True anomalous windows in range: {n_true_windows}")
 
# Show alert distribution
if alerts:
    critical = sum(1 for a in alerts if a["severity"] == "critical")
    warning = len(alerts) - critical
    print(f"  Critical: {critical}")
    print(f"  Warning:  {warning}")

Production Considerations

The prototype above works. Here’s what you need to add for production:

Model Retraining

Normal behavior drifts over time—seasonal changes, equipment aging, process modifications. Retrain periodically on recent normal data:

  • Schedule weekly or monthly retraining
  • Compare new model’s reconstruction error distribution against the old one
  • Only deploy if the new model performs comparably on a held-out validation set
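That deployment gate can be a one-liner, assuming you've computed reconstruction errors for both models on the same held-out normal window (the 10% tolerance here is an assumption to tune):

```python
import numpy as np

def should_deploy(old_errors: np.ndarray, new_errors: np.ndarray,
                  tolerance: float = 1.10) -> bool:
    """Deploy the retrained model only if its 99th-percentile reconstruction
    error on held-out normal data is within `tolerance`x the old model's."""
    return np.percentile(new_errors, 99) <= tolerance * np.percentile(old_errors, 99)
```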

Alert Deduplication

A single anomaly event often triggers alerts on consecutive windows. In production, deduplicate by grouping alerts within a configurable cooldown window (e.g., suppress duplicate alerts within 5 minutes of the first).
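A minimal version of that cooldown logic follows; in production you would likely key it by sensor or anomaly type rather than keeping one global timer:

```python
from datetime import datetime, timedelta

class AlertDeduplicator:
    """Suppress alerts that arrive within `cooldown` of the last emitted alert."""

    def __init__(self, cooldown: timedelta = timedelta(minutes=5)):
        self.cooldown = cooldown
        self.last_alert_time: datetime | None = None

    def should_emit(self, alert_time: datetime) -> bool:
        # Emit if this is the first alert or the cooldown has elapsed
        if (self.last_alert_time is None
                or alert_time - self.last_alert_time >= self.cooldown):
            self.last_alert_time = alert_time
            return True
        return False
```

Wrap the `producer.produce(ALERTS_TOPIC, ...)` call in run_detector with a `should_emit` check to apply it.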

Multi-Scale Detection

Different anomaly types have different timescales. Use multiple window sizes in parallel:

  • Short windows (5-10 samples) for spikes
  • Medium windows (50-100 samples) for drift
  • Long windows (500+ samples) for slow degradation

Inference Latency

Our autoencoder is tiny (~14K parameters) and runs in microseconds on CPU. For larger models or higher-dimensional inputs, consider:

  • ONNX Runtime for optimized inference
  • Batching multiple windows before inference
  • GPU inference if processing thousands of streams simultaneously

Conclusion

Real-time anomaly detection with autoencoders works because it inverts the usual ML problem. Instead of learning “what does an anomaly look like” (which requires labeled anomaly data you probably don’t have), you learn “what does normal look like” (which you have in abundance). Everything else is, by definition, anomalous.

The pipeline we built:

  1. Data generation with realistic sensor correlations and four anomaly types
  2. Autoencoder training on normal data only—no anomaly labels needed
  3. Threshold calibration using the reconstruction error distribution
  4. Streaming inference with Kafka for real-time detection

This same pattern applies to any domain where you have abundant normal data and rare, diverse anomalies—which is most monitoring problems.

Extending This

  • Variational Autoencoders (VAEs) provide probabilistic anomaly scores instead of hard thresholds
  • Temporal Convolutional Networks capture longer-range dependencies than our MLP
  • Transformer-based models can learn complex multi-sensor attention patterns
  • Combine with a feature store to serve real-time anomaly scores as features for downstream models
  • Deploy on AWS with Kinesis for cloud-native streaming