Introduction
Building production machine learning systems goes far beyond training models. The real engineering challenge lies in the data infrastructure: how you ingest, transform, store, serve, and monitor features at scale. This guide provides a comprehensive overview of a complete ML system design, from raw data sources to real-time serving and drift monitoring.
Whether you’re preparing for system design interviews or building production systems, this guide covers the architecture, components, tradeoffs, and implementation patterns you need to know.
System Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ RAW DATA SOURCES │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Event Logs │ │ Databases │ │ 3rd Party │ │ Files/Logs │ │
│ │ (Clickstream│ │ (Postgres, │ │ APIs │ │ (CSV, JSON) │ │
│ │ Actions) │ │ MySQL) │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ DATA INGESTION LAYER │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Ingestion Framework (Kafka, Kinesis, Airflow, dbt) │ │
│ │ • Batch ingestion (daily/hourly dumps) │ │
│ │ • Stream ingestion (real-time events) │ │
│ │ • CDC (Change Data Capture) from databases │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ FEATURE COMPUTATION ENGINE │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Batch Processor│ │ Stream Processor│ │ On-Demand │ │
│ │ (Spark, dbt, │ │ (Flink, Kafka │ │ Computation │ │
│ │ SQL, Python) │ │ Streams) │ │ (Python, SQL) │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └────────────────────┴────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────▼────────────────────────────────────┐ │
│ │ Transformation Logic │ │
│ │ • Aggregations (sum, avg, count, min, max) │ │
│ │ • Rolling windows (7d, 30d, 90d) │ │
│ │ • Cross-entity joins │ │
│ │ • UDFs (Python, SQL functions) │ │
│ │ • ML transformations (embeddings, encodings) │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬────────────────────────────────────────────┘
│
┌──────────────────┴──────────────────┐
│ │
▼ ▼
┌─────────────────────────────┐ ┌─────────────────────────────┐
│ OFFLINE FEATURE STORE │ │ ONLINE FEATURE STORE │
│ │ │ │
│ ┌───────────────────────┐ │ │ ┌───────────────────────┐ │
│ │ Storage Layer │ │ │ │ Storage Layer │ │
│ │ • Data Lake │ │ │ │ • Key-Value Store │ │
│ │ (Parquet/Iceberg) │ │ │ │ (Redis, DynamoDB) │ │
│ │ • Data Warehouse │ │ │ │ • Specialized │ │
│ │ (BigQuery, │ │ │ │ (Feast, Tecton) │ │
│ │ Snowflake) │ │ │ │ │ │
│ └───────────────────────┘ │ │ └───────────────────────┘ │
│ │ │ │
│ ┌───────────────────────┐ │ │ ┌───────────────────────┐ │
│ │ Indexing & Partition │ │ │ │ Caching Layer │ │
│ │ • Date partitioning │ │ │ │ • L1: In-memory │ │
│ │ • Entity partitioning│ │ │ │ • L2: Distributed │ │
│ │ • Columnar storage │ │ │ │ cache │ │
│ └───────────────────────┘ │ │ └───────────────────────┘ │
│ │ │ │
│ ┌───────────────────────┐ │ │ ┌───────────────────────┐ │
│ │ Query Engine │ │ │ │ Lookup Engine │ │
│ │ • ASOF joins │ │ │ │ • Point lookups │ │
│ │ • Range queries │ │ │ │ • Batch lookups │ │
│ │ • Historical scans │ │ │ │ • TTL management │ │
│ └───────────────────────┘ │ │ └───────────────────────┘ │
│ │ │ │
│ ┌───────────────────────┐ │ │ ┌───────────────────────┐ │
│ │ Materialization │ │ │ │ Consistency │ │
│ │ • Latest snapshot │ │ │ │ • Eventual │ │
│ │ • Point-in-time │ │ │ │ • Strong (optional) │ │
│ │ export │ │ │ │ • Versioning │ │
│ └───────────────────────┘ │ │ └───────────────────────┘ │
└──────────────┬──────────────┘ └──────────────┬──────────────┘
│ │
│ ┌──────────────────┐ │
└─────────┤ MATERIALIZATION ├────────┘
│ JOB │
│ • Incremental │
│ • Full refresh │
│ • Conflict resol│
└──────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ FEATURE SERVING LAYER │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Serving API / SDK │ │
│ │ • REST/gRPC endpoints │ │
│ │ • Python/Java/Go SDKs │ │
│ │ • Feature retrieval modes: │ │
│ │ - Online: Real-time point lookups │ │
│ │ - Batch: Export for batch prediction │ │
│ │ - Training: Point-in-time joins │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ FEATURE REGISTRY / METADATA STORE │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Feature │ │ Lineage │ │ Ownership & │ │
│ │ Catalog │ │ Tracking │ │ Governance │ │
│ │ • Name, type │ │ • Source → │ │ • Owner team │ │
│ │ • Schema │ │ Transform → │ │ • Access │ │
│ │ • Description │ │ Feature │ │ control │ │
│ │ • Tags │ │ • Impact │ │ • SLAs │ │
│ │ • Owner │ │ analysis │ │ • Deprecation │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ MODEL TRAINING & EVALUATION PIPELINE │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Training Data │ │ Feature │ │ Model │ │
│ │ Assembly │ │ Preprocessing │ │ Training │ │
│ │ • ASOF join │ │ • Imputation │ │ • Spark MLlib │ │
│ │ • Label eng. │ │ • Scaling │ │ • XGBoost │ │
│ │ • Time split │ │ • Encoding │ │ • PyTorch/TF │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Model │ │ Model │ │
│ │ Evaluation │ │ Registry │ │
│ │ • AUC, F1, MAE │ │ • Versioning │ │
│ │ • Fairness │ │ • Staging │ │
│ │ • Baseline cmp │ │ • Rollback │ │
│ └─────────────────┘ └─────────────────┘ │
└────────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ REAL-TIME MODEL SERVING PIPELINE │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Request │ │ Feature │ │ Model │ │
│ │ Router │ │ Retrieval │ │ Inference │ │
│ │ • Load balance │ │ • Redis lookup │ │ • TF Serving │ │
│ │ • Circuit brk │ │ • Cache mgmt │ │ • TorchServe │ │
│ │ • Validation │ │ • Fallback │ │ • Triton │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Prediction │ │ Serving │ │
│ │ Response │ │ Log │ │
│ │ • JSON format │ │ • Kafka stream │ │
│ │ • Confidence │ │ • Monitoring │ │
│ │ • Explanation │ │ • Retraining │ │
│ └─────────────────┘ └─────────────────┘ │
└────────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ METRIC AGGREGATION PIPELINE (DRIFT) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Distribution │ │ Metric │ │ Aggregation │ │
│ │ Snapshot │ │ Computation │ │ Pipeline │ │
│ │ • Training │ │ • PSI │ │ • Batch (Spark)│ │
│ │ • Serving │ │ • KS test │ │ • Stream │ │
│ │ • Histograms │ │ • Mean shift │ │ (Flink) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Alerting & │ │ Root Cause │ │ Retraining │ │
│ │ Thresholds │ │ Analysis │ │ Trigger │ │
│ │ • PSI > 0.25 │ │ • Feature │ │ • Auto/manual │ │
│ │ • Sustained │ │ drill-down │ │ • Validation │ │
│ │ • Segmented │ │ • Time-series │ │ • Deployment │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└────────────────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ DATA QUALITY & MONITORING │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Schema │ │ Drift │ │ Freshness & │ │
│ │ Validation │ │ Detection │ │ Latency │ │
│ │ • Type checks │ │ • PSI │ │ • Age checks │ │
│ │ • Null rates │ │ • KS test │ │ • Lag alerts │ │
│ │ • Cardinality │ │ • Distribution │ │ • SLA tracking │ │
│ │ • Range checks │ │ shifts │ │ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Alerting │ │ Dashboard & │ │ Automated │ │
│ │ System │ │ Observability │ │ Remediation │ │
│ │ • PagerDuty │ │ • Grafana │ │ • Auto-retry │ │
│ │ • Slack │ │ • Datadog │ │ • Fallback │ │
│ │ • Email │ │ • Custom UI │ │ features │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
1. Data Ingestion Layer
Purpose: Move raw data from sources to the computation engine reliably and efficiently.
1.1 Batch Ingestion
What: Periodic bulk data loads (daily, hourly)
Tools: Airflow, dbt, custom ETL scripts
Pattern: Extract → Transform → Load (ETL) or Extract → Load → Transform (ELT)
Key concerns:
- Idempotency: Re-running a job for the same period produces the same result
- Incremental loads: Only new/changed data
- Schema evolution: Handle column additions/removals
Example workflow:
Daily job at 2 AM:
1. Extract: Pull yesterday's data from source database
2. Transform: Clean, validate, aggregate
3. Load: Write to data lake (Parquet)
4. Validate: Check row counts, null rates
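The sketch below shows one way to make the load step idempotent. It assumes a local filesystem and writes JSON Lines in place of Parquet, so only the standard library is needed. `load_partition` and the `part-0000.jsonl` file name are hypothetical. Re-running it for the same date replaces the partition instead of appending to it.

```python
import json
import os
import shutil
import tempfile
from pathlib import Path


def load_partition(root: str, run_date: str, rows: list[dict]) -> Path:
    """Write one day's rows so that re-running the job yields the same output.

    Data is written to a temporary directory first and then renamed into
    place, so readers never see a half-written partition, and a re-run
    replaces (rather than appends to) the previous output for that date.
    """
    final_dir = Path(root) / f"date={run_date}"
    final_dir.parent.mkdir(parents=True, exist_ok=True)
    tmp_dir = Path(tempfile.mkdtemp(dir=final_dir.parent, prefix=".tmp-"))
    with open(tmp_dir / "part-0000.jsonl", "w") as f:
        for row in rows:
            f.write(json.dumps(row, sort_keys=True) + "\n")
    # Remove output from any previous run of the same date, then swap in the new data
    if final_dir.exists():
        shutil.rmtree(final_dir)
    os.replace(tmp_dir, final_dir)
    return final_dir
```

The delete-then-rename sequence leaves a brief window in which the partition is missing. Table formats with atomic commits, such as Iceberg, avoid that gap.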
Tradeoffs:
- ✅ Simpler, cheaper, easier to debug
- ❌ Higher latency (hours/days)
- ❌ Not suitable for real-time features
1.2 Stream Ingestion
What: Real-time event processing
Tools: Kafka, Kinesis, Pulsar
Pattern: Event-driven, continuous processing
Key concerns:
- Exactly-once semantics: Each event affects the output exactly once, even across failures and retries
- Ordering guarantees: Maintain event order within partitions
- Backpressure handling: Handle traffic spikes
- Late-arriving events: Handle events that arrive after their window has closed
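Stream processors usually handle late events with a watermark. The watermark tracks how far event time has progressed, minus an allowed delay, and a window closes only once the watermark passes its end. The class below is a simplified single-process sketch of that idea, not the API of Flink or Kafka Streams. `TumblingWindowCounter`, `max_delay`, and the `late_events` side list are assumptions for illustration, with times in integer seconds.

```python
from collections import defaultdict


class TumblingWindowCounter:
    """Count events per (key, window) in event time, tolerating late events.

    The watermark trails the largest event time seen by `max_delay`. A window
    is emitted once the watermark passes its end; events that arrive for an
    already-closed window are recorded as late instead of being silently merged.
    """

    def __init__(self, window_size: int, max_delay: int):
        self.window_size = window_size
        self.max_delay = max_delay
        self.max_event_time = float("-inf")
        self.open_windows = defaultdict(int)  # (key, window_start) -> count
        self.late_events = []

    def watermark(self) -> float:
        return self.max_event_time - self.max_delay

    def add(self, key: str, event_time: int) -> list:
        window_start = event_time - event_time % self.window_size
        if window_start + self.window_size <= self.watermark():
            # Window already closed: route to a side output
            self.late_events.append((key, event_time))
        else:
            self.open_windows[(key, window_start)] += 1
        self.max_event_time = max(self.max_event_time, event_time)
        return self._emit_closed()

    def _emit_closed(self) -> list:
        closed = [w for w in self.open_windows if w[1] + self.window_size <= self.watermark()]
        return [(key, start, self.open_windows.pop((key, start))) for key, start in sorted(closed)]
```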
Example workflow:
User clicks "Add to Cart"
→ Event published to Kafka topic
→ Flink consumer processes event
→ Feature computed (e.g., cart_value)
→ Written to online store (Redis)
Tradeoffs:
- ✅ Real-time, low latency
- ❌ Complex, expensive, harder to debug
- ❌ Requires more infrastructure
1.3 CDC (Change Data Capture)
What: Capture database changes in real-time
Tools: Debezium, Maxwell, AWS DMS
Pattern: Read database logs (binlog, WAL) → stream changes
Key concerns:
- Schema changes: Handle DDL operations
- Data type mapping: Map database types to feature types
- Transaction boundaries: Maintain transactional consistency
Example workflow:
PostgreSQL database
→ Debezium reads WAL (Write-Ahead Log)
→ Publishes changes to Kafka
→ Downstream consumers process changes
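On the consuming side, applying the change stream rebuilds the current state of the table. This is a minimal sketch. It assumes each Debezium change event has already been deserialized into a dict with the standard envelope fields: `op`, `before`, `after`, and `source.lsn` for PostgreSQL. `apply_change` and the `_lsn` / `_deleted` bookkeeping fields are hypothetical names.

```python
def apply_change(table: dict, event: dict, key_field: str = "id") -> None:
    """Apply one Debezium-style change event to an in-memory replica.

    op: "c" create, "u" update, "d" delete, "r" snapshot read.
    Events at or below the last applied LSN for a key are ignored, so
    redelivered events (at-least-once delivery) are harmless.
    """
    op = event["op"]
    # Deletes carry the row (at least its key) in "before"; the others in "after"
    row = event["after"] if op in ("c", "u", "r") else event["before"]
    key = row[key_field]
    lsn = event["source"]["lsn"]
    current = table.get(key)
    if current is not None and current["_lsn"] >= lsn:
        return  # duplicate or out-of-date event
    if op == "d":
        # Keep a tombstone so a replayed older insert cannot resurrect the row
        table[key] = {"_lsn": lsn, "_deleted": True}
    else:
        table[key] = {**row, "_lsn": lsn, "_deleted": False}
```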
Tradeoffs:
- ✅ Captures all changes, low latency
- ❌ Adds operational load on the source database (replication slots, log retention)
- ❌ Database-specific implementations
2. Feature Computation Engine
Purpose: Transform raw data into features using batch, stream, or on-demand computation.
2.1 Batch Processor
What: Compute features on historical data at regular intervals
Tools: Spark (Scala/Python), dbt, SQL, Python (pandas)
Use cases:
- Daily aggregations (e.g., user_avg_purchase_30d)
- Rolling window features (e.g., review_count_7d)
- Cross-entity joins (e.g., brand_avg_rating)
Scala/Spark example:
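// Compute trailing 30-day average rating per brand, as of the run date
import org.apache.spark.sql.functions._

val features = reviewsDF
  .where(col("timestamp") >= date_sub(current_date(), 30))
  .groupBy("brand")
  .agg(
    avg("rating").as("avg_rating_30d"),
    count("*").as("review_count_30d")
  )
// Overwrite the latest snapshot so a re-run does not fail or duplicate data
features.write.mode("overwrite").parquet("s3://features/brand_features/")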
Key concerns:
- Partitioning: Partition by date/entity for efficient queries
- Incremental computation: Only recompute changed data
- Idempotency: Re-running a job for the same period produces the same result
2.2 Stream Processor
What: Compute features in real-time from event streams
Tools: Flink, Kafka Streams, Spark Structured Streaming
Use cases:
- Real-time aggregations (e.g., session_duration)
- Sliding windows (e.g., clicks_last_5min)
- Event-driven features (e.g., last_action_timestamp)
Flink example:
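// Compute clicks in the last 5 minutes per user, refreshed every minute
// (event-time windows assume timestamps/watermarks are assigned upstream)
DataStream<ClickEvent> clicks = ...;
clicks
    .keyBy(ClickEvent::getUserId)
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .aggregate(new ClickCountAggregator())
    .addSink(new RedisSink()); // placeholder for a Redis sink (connector or custom SinkFunction)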
Key concerns:
- State management: Maintain window state efficiently
- Checkpointing: Recover from failures
- Late events: Handle out-of-order events
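A "clicks in the last 5 minutes" feature is a sliding window. Its value changes with every event, not only at fixed 5-minute boundaries. The in-memory sketch below shows that behavior, assuming events arrive in timestamp order per user. `SlidingWindowCounter` is a hypothetical class. In a real stream job, this state would live in the processor's checkpointed state.

```python
from collections import defaultdict, deque


class SlidingWindowCounter:
    """Per-user count of events in the trailing `window_seconds`, e.g. clicks_last_5min.

    Each user's event timestamps are kept in a deque, and timestamps that
    fall out of the window (now - window_seconds, now] are evicted.
    """

    def __init__(self, window_seconds: float = 300):
        self.window_seconds = window_seconds
        self.events = defaultdict(deque)  # user_id -> event timestamps

    def _evict(self, user_id: str, now: float) -> None:
        q = self.events[user_id]
        while q and q[0] <= now - self.window_seconds:
            q.popleft()

    def record(self, user_id: str, ts: float) -> int:
        """Add an event and return the updated feature value."""
        self.events[user_id].append(ts)
        self._evict(user_id, ts)
        return len(self.events[user_id])

    def count(self, user_id: str, now: float) -> int:
        """Read the feature at time `now` without adding an event."""
        self._evict(user_id, now)
        return len(self.events[user_id])
```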
2.3 On-Demand Computation
What: Compute features at request time
Tools: Python, SQL, custom functions
Use cases:
- Features that depend on request context
- Rarely-used features (not worth precomputing)
- Complex transformations
Example:
def compute_feature(user_id, request_context):
    # Fetch user data
    user = get_user(user_id)
    # Compute feature based on request
    return user.age * request_context.multiplier
Key concerns:
- Latency: Must be fast (<10ms)
- Caching: Cache expensive computations
- Fallback: Handle computation failures
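Caching and fallback can be combined in a thin wrapper around the feature function. This sketch assumes the cache key captures every input the value depends on. For request-context features, that means including the relevant context in the key. `OnDemandFeature`, `ttl_seconds`, and `default` are hypothetical names. The injectable `clock` exists only to make the behavior easy to test.

```python
import time
from typing import Callable


class OnDemandFeature:
    """Wrap an on-demand feature function with a TTL cache and a fallback value.

    If the computation raises, the last cached value (even if expired) is
    served, or `default` when nothing has been cached yet.
    """

    def __init__(self, fn: Callable[[str], float], ttl_seconds: float, default: float,
                 clock: Callable[[], float] = time.monotonic):
        self.fn = fn
        self.ttl = ttl_seconds
        self.default = default
        self.clock = clock
        self.cache = {}  # key -> (value, computed_at); unbounded in this sketch

    def get(self, key: str) -> float:
        cached = self.cache.get(key)
        now = self.clock()
        if cached is not None and now - cached[1] < self.ttl:
            return cached[0]
        try:
            value = self.fn(key)
        except Exception:
            # Fallback: stale value if there is one, otherwise the default
            return cached[0] if cached is not None else self.default
        self.cache[key] = (value, now)
        return value
```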
2.4 Transformation Logic
Common transformations:
Aggregations:
-- Rolling window aggregations
SELECT
user_id,
AVG(purchase_amount) OVER (
PARTITION BY user_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '30' DAY PRECEDING AND CURRENT ROW
) AS avg_purchase_30d
FROM transactions
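The `RANGE ... INTERVAL '30' DAY PRECEDING` frame is defined by time, not by a fixed number of rows. As a result, users with bursty activity get different row counts per frame. The pure-Python equivalent below makes the semantics concrete, assuming rows arrive sorted by timestamp within each user. `trailing_avg` is a hypothetical helper. Unlike SQL, it does not treat rows with identical timestamps as peers.

```python
from collections import deque
from datetime import datetime, timedelta


def trailing_avg(rows, window=timedelta(days=30)):
    """Per-row average of amount over [ts - window, ts] for the same user.

    rows: iterable of (user_id, timestamp, amount), sorted by timestamp per user.
    Returns a list of (user_id, timestamp, trailing_average).
    """
    frames = {}  # user_id -> deque of (timestamp, amount) inside the current frame
    sums = {}    # user_id -> running sum of amounts in the frame
    out = []
    for user_id, ts, amount in rows:
        frame = frames.setdefault(user_id, deque())
        frame.append((ts, amount))
        sums[user_id] = sums.get(user_id, 0.0) + amount
        # Evict rows older than the frame start
        while frame[0][0] < ts - window:
            _, old_amount = frame.popleft()
            sums[user_id] -= old_amount
        out.append((user_id, ts, sums[user_id] / len(frame)))
    return out


start = datetime(2024, 1, 1)
purchases = [
    ("u1", start, 10.0),
    ("u1", start + timedelta(days=10), 20.0),
    ("u1", start + timedelta(days=31), 30.0),  # the day-0 purchase is now outside the frame
]
result = trailing_avg(purchases)
```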
Cross-entity joins:
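-- Join user features with product features through the interactions that link them
-- (interactions is a hypothetical table of (user_id, product_id) pairs)
SELECT
  i.user_id,
  i.product_id,
  u.avg_purchase_30d,
  p.product_category,
  p.avg_rating
FROM interactions i
JOIN user_features u ON i.user_id = u.user_id
JOIN product_features p ON i.product_id = p.product_id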
ML transformations:
# Text embeddings
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(reviews['text'].tolist())  # pass a plain list of strings
# One-hot encoding
import pandas as pd
df = pd.get_dummies(df, columns=['category'])
3. Offline Feature Store
Purpose: Store historical feature values for training, backfills, and analytics.
3.1 Storage Layer
Options:
Data Lake (Parquet/Iceberg):
- ✅ Cost-effective, flexible schema
- ✅ Columnar storage (efficient queries)
- ❌ Requires query engine (Spark, Presto)
Data Warehouse (BigQuery, Snowflake):
- ✅ Managed, SQL-native
- ✅ Automatic scaling
- ❌ Expensive at scale
Key decisions:
- Partitioning: By date, entity, or both
- Compression: Snappy, Zstd for storage efficiency
- Format: Parquet (columnar), ORC, or Iceberg (ACID)
3.2 Indexing & Partitioning
Partitioning strategies:
Date partitioning:
s3://features/
  date=2024-01-15/
    user_id=123/
      features.parquet
Entity partitioning:
s3://features/
  user_id=123/
    date=2024-01-15/
      features.parquet
Benefits:
- Faster queries (partition pruning)
- Easier data lifecycle management (delete old partitions)
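Partitioning directly on a raw high-cardinality key such as `user_id` creates one directory per user and a very large number of small files. Entity partitioning is therefore commonly done on a hash bucket of the ID instead. The sketch below assumes 64 buckets and a `date=.../bucket=.../` layout. Both are illustrative choices, not a convention of any particular tool, and `partition_path` is a hypothetical helper.

```python
import hashlib


def partition_path(root: str, date: str, entity_id: str, num_buckets: int = 64) -> str:
    """Build a date + hash-bucket partition path for one entity's features.

    Uses a stable hash (MD5) rather than Python's built-in hash(), which is
    randomized per process for strings, so every writer and reader computes
    the same bucket for the same entity.
    """
    digest = hashlib.md5(entity_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % num_buckets
    return f"{root.rstrip('/')}/date={date}/bucket={bucket:03d}/"
```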
3.3 Query Engine
Key operations:
ASOF join (point-in-time join):
-- For each event, get the latest features at or before the event timestamp
-- (DuckDB ASOF JOIN syntax; Snowflake expresses the inequality with MATCH_CONDITION)
SELECT
  e.event_id,
  e.timestamp AS event_timestamp,
  f.feature_timestamp,
  f.feature_1,
  f.feature_2
FROM events e
ASOF JOIN features f
  ON e.user_id = f.user_id
  AND e.timestamp >= f.feature_timestamp
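An ASOF join picks, for each event, the most recent feature row at or before the event time. The same logic can be written in plain Python with a binary search, assuming feature rows are already grouped by entity and sorted by timestamp. `asof_join` is a hypothetical helper for illustration.

```python
from bisect import bisect_right


def asof_join(events, features):
    """Attach to each event the latest feature value with feature_ts <= event_ts.

    events:   list of (event_id, entity_id, event_ts)
    features: dict entity_id -> list of (feature_ts, value), sorted by feature_ts
    Returns a list of (event_id, value or None).
    """
    # Precompute the sorted timestamp list per entity for binary search
    ts_index = {k: [ts for ts, _ in rows] for k, rows in features.items()}
    out = []
    for event_id, entity_id, event_ts in events:
        history = features.get(entity_id, [])
        # Index of the first row strictly after event_ts; the row before it is the match
        i = bisect_right(ts_index.get(entity_id, []), event_ts)
        out.append((event_id, history[i - 1][1] if i > 0 else None))
    return out
```

Returning `None` for an event with no earlier feature row is intentional. Falling back to a later row would leak future information into the training data.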
Range queries:
-- Get all features for user in last 90 days
SELECT *
FROM features
WHERE user_id = '123'
AND timestamp >= CURRENT_DATE - INTERVAL '90' DAY
Historical scans:
-- Scan all features for training
SELECT *
FROM features
WHERE date >= '2023-01-01' AND date < '2024-01-01'  -- half-open: one full year
3.4 Materialization
Purpose: Export latest feature snapshot for online store
Pattern:
-- Get latest feature per entity
SELECT
user_id,
feature_1,
feature_2,
timestamp
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp DESC) as rn
FROM features
) t
WHERE rn = 1
Materialization job:
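# Daily job to sync offline → online
import redis
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def write_partition(rows):
    # One Redis connection and pipeline per Spark partition (host/port are placeholders)
    client = redis.Redis(host="localhost", port=6379)
    pipe = client.pipeline(transaction=False)
    for row in rows:
        pipe.hset(
            f"features:{row.user_id}",
            mapping={
                "feature_1": row.feature_1,
                "feature_2": row.feature_2,
                "timestamp": row.timestamp.isoformat(),  # Redis stores strings/numbers, not datetimes
            },
        )
    pipe.execute()

def materialize_features():
    # Read latest snapshot from offline store
    df = spark.read.parquet("s3://features/latest/")
    # Write to Redis from the executors instead of collect()-ing every row to the driver
    df.foreachPartition(write_partition)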
Key concerns:
- Incremental: Only materialize changed features
- Conflict resolution: Handle concurrent updates
- Backfill: Support historical materialization
4. Online Feature Store
Purpose: Serve latest feature values with low latency (<10ms) for real-time inference.
4.1 Storage Layer
Options:
Key-Value Store (Redis, DynamoDB):
- ✅ Low-latency reads (sub-millisecond for Redis, single-digit milliseconds for DynamoDB)
- ✅ TTL support (auto-expire stale data)
- ❌ Redis is memory-bound (expensive at scale)
Feature Store Platforms (Feast, Tecton):
- ✅ Purpose-built for features; run on top of an online store such as Redis or DynamoDB
- ✅ Built-in consistency, versioning
- ❌ Additional infrastructure
Schema:
Key: "features:{entity_id}"
Value: {
"feature_1": 0.87,
"feature_2": 42,
"timestamp": "2024-01-15T10:30:00Z"
}
4.2 Caching Layer
Multi-tier caching:
L1 Cache (In-memory):
- Local to application server
- <1ms latency
- Limited size (per-instance)
L2 Cache (Distributed):
- Redis Cluster, Memcached
- 1-5ms latency
- Shared across instances
Pattern:
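import redis
from cachetools import TTLCache

redis_client = redis.Redis(decode_responses=True)  # return str instead of bytes
# L1: bounded per-process cache whose entries expire, so it cannot serve stale data forever
local_cache = TTLCache(maxsize=100_000, ttl=60)

def get_features(entity_id):
    key = f"features:{entity_id}"
    # Try L1 cache first (get() avoids a race with expiry between check and read)
    cached = local_cache.get(entity_id)
    if cached is not None:
        return cached
    # Try L2 cache
    features = redis_client.hgetall(key)
    if features:
        local_cache[entity_id] = features  # Populate L1
        return features
    # Fallback: compute on-demand, and give the written key a TTL
    features = compute_features(entity_id)
    redis_client.hset(key, mapping=features)
    redis_client.expire(key, 24 * 3600)
    local_cache[entity_id] = features
    return features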
4.3 Lookup Engine
Point lookup:
# Single entity
features = redis.hgetall("features:user_123")
# Latency: 1-5ms
Batch lookup:
# Multiple entities
entity_ids = ["user_123", "user_456", "user_789"]
pipe = redis.pipeline()
for eid in entity_ids:
pipe.hgetall(f"features:{eid}")
results = pipe.execute()
# Latency: 5-20ms (vs 15-60ms for sequential)
4.4 Consistency
Eventual consistency (most common):
- Online store may lag behind offline by minutes/hours
- Acceptable for most ML use cases
- Simpler, cheaper
Strong consistency (optional):
- Synchronous writes to online store
- Often required where decisions depend on the very latest state (e.g., fraud detection, real-time bidding)
- More complex, expensive
Versioning:
Key: "features:user_123:v2"
Value: {
"feature_1": 0.87,
"model_version": "v2.3"
}
5. Materialization Job
Purpose: Sync features from offline store to online store.
5.1 Incremental Materialization
Pattern:
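from datetime import datetime, timezone

def incremental_materialize():
    # Get features updated since last run, up to an upper bound captured *before*
    # the read, so rows written during this run are picked up by the next run
    last_run = get_last_run_timestamp()  # timezone-aware datetime
    run_start = datetime.now(timezone.utc)
    new_features = spark.sql(f"""
        SELECT * FROM features
        WHERE updated_at > '{last_run.isoformat()}'
          AND updated_at <= '{run_start.isoformat()}'
    """)
    # Write to online store (Redis stores strings/numbers, so stringify values)
    pipe = redis.pipeline(transaction=False)
    for row in new_features.toLocalIterator():
        values = {k: str(v) for k, v in row.asDict().items() if v is not None}
        pipe.hset(f"features:{row.entity_id}", mapping=values)
    pipe.execute()
    # Advance the watermark only after the write succeeded
    save_last_run_timestamp(run_start)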
Benefits:
- Only process changed data
- Faster, cheaper than full refresh
5.2 Full Refresh
Pattern:
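def full_refresh():
    # Read latest snapshot
    features = spark.read.parquet("s3://features/latest/")
    # Write to online store, streaming rows to the driver instead of collect()-ing them all
    pipe = redis.pipeline(transaction=False)
    for i, row in enumerate(features.toLocalIterator(), start=1):
        values = {k: str(v) for k, v in row.asDict().items() if v is not None}
        pipe.hset(f"features:{row.entity_id}", mapping=values)
        if i % 10_000 == 0:
            pipe.execute()  # flush in batches to bound client memory
    pipe.execute()
    # Keys for entities absent from the snapshot are not deleted here;
    # rely on TTLs or write to a new key prefix and switch readers over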
Use cases:
- Schema changes
- Backfill historical data
- Fix data quality issues
5.3 Conflict Resolution
Problem: Concurrent updates to same entity
Solutions:
Last-write-wins:
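# Simple: latest timestamp wins.
# The read-compare-write must run atomically (e.g. a Redis Lua script or
# WATCH/MULTI); otherwise two concurrent writers can both pass the check.
if new_timestamp > existing_timestamp:
    redis.hset(key, mapping=new_features)
Monotonic versions (a single counter per entity; true version vectors keep one counter per writer):
# Track update history
version = increment_version(entity_id)
redis.hset(key, mapping={**features, "version": version})
Merge strategies:
# Merge features from multiple sources
def merge_features(existing, new):
    return {
        # Prefer the newer source's value when it has one
        "feature_1": new.get("feature_1", existing.get("feature_1")),
        # Keep the larger value, ignoring sources that lack the field
        "feature_2": max(
            (v for v in (existing.get("feature_2"), new.get("feature_2")) if v is not None),
            default=None,
        ),
    }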
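Record-level last-write-wins can discard a field that only one writer updated. The sketch below is a field-level variant. It assumes every field carries its own `updated_at` and that writers' clocks are reasonably synchronized. Under those assumptions, it gives the same result regardless of the order in which updates arrive or how often they are replayed. `lww_merge` is a hypothetical helper.

```python
def lww_merge(a: dict, b: dict) -> dict:
    """Merge two feature records field by field, keeping the newer value.

    Each record maps field -> (value, updated_at). Ties on updated_at are
    broken by the value's repr, so the merge is commutative, associative,
    and idempotent: replicas that see the same updates converge.
    """
    merged = dict(a)
    for field, (value, ts) in b.items():
        if field not in merged or (ts, repr(value)) > (merged[field][1], repr(merged[field][0])):
            merged[field] = (value, ts)
    return merged
```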
6. Model Training & Evaluation Pipeline
Purpose: Transform features into trained, validated models ready for deployment.
6.1 Training Data Assembly
Point-in-time join:
-- Join events with features as of event time
SELECT
e.event_id,
e.user_id,
e.timestamp AS event_time,
f.feature_1,
f.feature_2,
e.label
FROM events e
ASOF JOIN features f
ON e.user_id = f.user_id
AND f.timestamp <= e.timestamp
WHERE f.timestamp IS NOT NULL
Key concerns:
- Temporal correctness: Feature timestamp ≤ event timestamp
- Label leakage: Labels must come from outcomes observed after the feature timestamp, and no feature may be derived from the label itself
- Train/validation/test split: Time-based, not random
Time-based split:
# Split by time, not random
train = df[df.timestamp < "2023-01-01"]
val = df[(df.timestamp >= "2023-01-01") & (df.timestamp < "2023-06-01")]
test = df[df.timestamp >= "2023-06-01"]
6.2 Feature Preprocessing
Transformations:
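// Spark MLlib pipeline
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{Imputer, StandardScaler, VectorAssembler}

val imputer = new Imputer()
  .setInputCols(Array("feature_1", "feature_2"))
  .setOutputCols(Array("feature_1_imp", "feature_2_imp"))
  .setStrategy("median")
// StandardScaler operates on a Vector column, so assemble the numeric columns first
val assembler = new VectorAssembler()
  .setInputCols(Array("feature_1_imp", "feature_2_imp"))
  .setOutputCol("raw_features")
val scaler = new StandardScaler()
  .setInputCol("raw_features")
  .setOutputCol("features")
  .setWithMean(true)
  .setWithStd(true)
val classifier = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
val pipeline = new Pipeline()
  .setStages(Array(imputer, assembler, scaler, classifier))
val model = pipeline.fit(trainDF)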
Key concerns:
- Training-serving skew: Preprocessing must be identical in training and serving
- Fit on train only: Scalers, encoders fit on training data only
- Persist artifacts: Save preprocessing pipeline with model
6.3 Model Training
Distributed training (Spark MLlib):
val classifier = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(100)
val model = classifier.fit(trainDF)
model.write.overwrite().save("s3://models/random_forest_v1")
Hyperparameter tuning:
val paramGrid = new ParamGridBuilder()
.addGrid(classifier.numTrees, Array(50, 100, 200))
.addGrid(classifier.maxDepth, Array(5, 10, 15))
.build()
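// Note: CrossValidator assigns rows to folds randomly, ignoring time order; for
// temporal data, prefer tuning on the time-based validation split from 6.1
val crossval = new CrossValidator()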
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
val cvModel = crossval.fit(trainDF)
6.4 Model Evaluation
Metrics:
val predictions = model.transform(testDF)
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction") // raw scores; the thresholded 0/1 "prediction" column distorts AUC
.setMetricName("areaUnderROC")
val auc = evaluator.evaluate(predictions)
println(s"AUC: $auc")
Additional checks:
- Baseline comparison: New model vs. current production model
- Fairness: Performance across demographic segments
- Latency profiling: Inference time on production hardware
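AUC measures how well scores rank positives above negatives, so it must be computed from continuous scores rather than thresholded 0/1 predictions. The sketch below computes AUC directly from that definition and adds a simple promotion gate for the baseline comparison. `should_promote`, its zero minimum gain, and the 10 ms p99 budget are illustrative assumptions. The budget is taken from the inference slice of the latency budget in section 7.2.

```python
def auc(labels, scores):
    """ROC AUC as the probability that a random positive outscores a random negative.

    O(P*N) pairwise version for clarity; ties count as half a win.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def should_promote(candidate_auc, production_auc, candidate_p99_ms,
                   min_gain=0.0, latency_budget_ms=10.0):
    """Promote only if the candidate matches or beats production and fits the latency budget."""
    return candidate_auc >= production_auc + min_gain and candidate_p99_ms <= latency_budget_ms
```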
6.5 Model Registry
Components:
- Artifact storage: Model files, preprocessing pipeline
- Versioning: Track model versions, features, data
- Stage management: Development → Staging → Production
- Approval workflows: Human review before deployment
Example:
import mlflow

with mlflow.start_run() as run:
    mlflow.log_param("num_trees", 100)
    mlflow.log_metric("auc", 0.87)
    mlflow.spark.log_model(model, "model")

# Register the logged model using this run's actual ID
mlflow.register_model(
    model_uri=f"runs:/{run.info.run_id}/model",
    name="fraud_detection_model",
)
7. Real-time Model Serving Pipeline
Purpose: Serve predictions with low latency (<50ms) for real-time applications.
7.1 Request Router
Pattern:
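from fastapi import FastAPI, HTTPException
app = FastAPI()

@app.post("/predict")
async def predict(request: PredictionRequest):
    # Route to appropriate model (models loaded at startup)
    if request.model_type == "fraud":
        return await fraud_model.predict(request)
    elif request.model_type == "recommendation":
        return await rec_model.predict(request)
    # Reject unknown model types instead of silently returning null
    raise HTTPException(status_code=400, detail="unknown model_type")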
Key concerns:
- Load balancing: Distribute requests across instances
- Circuit breakers: Fail fast if downstream is down
- Request validation: Validate input schema
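A circuit breaker wraps calls to a downstream dependency. After repeated failures, it opens and rejects calls immediately, then lets a trial call through after a cooldown. This is a minimal single-threaded sketch, assuming a synchronous callable and an injectable clock for testing. `CircuitBreaker`, `failure_threshold`, and `reset_timeout` are hypothetical names rather than any library's API.

```python
import time


class CircuitBreaker:
    """Fail fast after repeated downstream failures, then probe again after a cooldown.

    States: closed (calls pass through), open (calls rejected immediately),
    and half-open (one trial call allowed once `reset_timeout` has elapsed).
    """

    def __init__(self, failure_threshold=5, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            # Cooldown elapsed: half-open, let this call through as a probe
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # (re)open the circuit
            raise
        self.failures = 0
        self.opened_at = None  # success closes the circuit
        return result
```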
7.2 Feature Retrieval
Pattern:
async def get_features(entity_id: str) -> dict:
    # Fetch from online store
    features = await redis.hgetall(f"features:{entity_id}")
    if not features:
        # Fallback: compute on-demand
        features = await compute_features(entity_id)
    return features
Latency budget:
- Feature retrieval: 5ms
- Preprocessing: 3ms
- Model inference: 10ms
- Total: ~18ms
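The 5 ms slice for feature retrieval only holds if it is enforced. This minimal asyncio sketch caps retrieval time and degrades to defaults. It assumes an async `fetch` coroutine and per-feature default values, both hypothetical. The returned flag lets the caller log how often the fallback fires. That matters because serving defaults the model rarely saw in training is itself a source of skew.

```python
import asyncio


async def get_features_with_budget(fetch, entity_id, defaults, timeout_s=0.005):
    """Fetch features within a latency budget, falling back to defaults.

    Returns (features, used_fallback). Missing features are filled from defaults.
    """
    try:
        features = await asyncio.wait_for(fetch(entity_id), timeout=timeout_s)
    except (asyncio.TimeoutError, ConnectionError):
        return dict(defaults), True
    return {**defaults, **(features or {})}, False
```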
7.3 Model Inference Engine
Options:
TensorFlow Serving:
tensorflow_model_server \
--rest_api_port=8501 \
--model_name=fraud_model \
--model_base_path=s3://models/fraud_model
TorchServe:
torchserve --start --model-store model_store --models fraud_model=fraud_model.mar
Triton Inference Server:
tritonserver --model-repository=/models
Pattern:
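# gRPC call to TensorFlow Serving (default gRPC port 8500)
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel("localhost:8500")
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
request = predict_pb2.PredictRequest()
request.model_spec.name = "fraud_model"
# "features" must match the input name in the model's serving signature
request.inputs["features"].CopyFrom(tf.make_tensor_proto(feature_vector, dtype=tf.float32))
response = stub.Predict(request, timeout=0.01)  # 10ms timeout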
7.4 Prediction Response
Format:
{
"prediction": 0.87,
"model_version": "v2.3",
"feature_timestamp": "2024-01-15T10:30:00Z",
"latency_ms": 23,
"confidence_interval": [0.82, 0.92]
}
Optional: Include explanation (SHAP, LIME)
{
"prediction": 0.87,
"explanation": {
"feature_1": 0.35,
"feature_2": -0.12
}
}
7.5 Serving Log
Pattern:
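import json
from datetime import datetime, timezone

async def log_prediction(request, response):
    log_entry = {
        "entity_id": request.entity_id,
        "features": request.features,  # the exact values the model saw, for drift checks
        "prediction": response.prediction,
        "model_version": response.model_version,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Publish to Kafka (kafka_producer: an aiokafka AIOKafkaProducer started at app startup);
    # keying by entity keeps each entity's records in one partition, in order
    await kafka_producer.send(
        "predictions",
        value=json.dumps(log_entry).encode("utf-8"),
        key=str(request.entity_id).encode("utf-8"),
    )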
Use cases:
- Drift monitoring
- Model retraining
- A/B testing analysis
8. Metric Aggregation Pipeline (Drift Detection)
Purpose: Continuously monitor feature distributions and detect drift between training and serving.
8.1 Distribution Snapshot Collection
Training snapshot:
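# Compute once during training
import mlflow
import numpy as np

values = train_df["feature_1"].dropna()  # np.histogram fails on NaN
counts, bin_edges = np.histogram(values, bins=50)
training_stats = {
    "feature_1": {
        # Plain Python lists/floats so the dict is JSON-serializable
        "histogram": {"counts": counts.tolist(), "bin_edges": bin_edges.tolist()},
        "mean": float(values.mean()),
        "std": float(values.std()),
        "null_rate": float(train_df["feature_1"].isnull().mean()),
    }
}
# Save with model
mlflow.log_dict(training_stats, "training_stats.json")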
Serving snapshot:
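# Compute continuously from serving logs; logs_df holds one window (e.g. 1h).
# Reusing the training bin edges keeps the two histograms comparable.
import numpy as np

def compute_serving_stats(logs_df, bin_edges):
    values = logs_df["feature_1"].dropna()
    counts, _ = np.histogram(values, bins=bin_edges)  # values outside the edges are not counted
    return {
        "feature_1": {
            "histogram": {"counts": counts.tolist(), "bin_edges": list(bin_edges)},
            "mean": float(values.mean()),
            "std": float(values.std()),
            "null_rate": float(logs_df["feature_1"].isnull().mean()),
        }
    }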
8.2 Metric Computation
PSI (Population Stability Index):
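import numpy as np

def compute_psi(expected, actual, bins=10):
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    # Bin by quantiles of the training (expected) data. Only the interior cut points
    # are kept, so the outer bins are open-ended and serving values outside the
    # training range are counted instead of silently dropped
    cuts = np.unique(np.quantile(expected, np.linspace(0, 1, bins + 1))[1:-1])
    expected_counts = np.bincount(np.searchsorted(cuts, expected, side="right"), minlength=len(cuts) + 1)
    actual_counts = np.bincount(np.searchsorted(cuts, actual, side="right"), minlength=len(cuts) + 1)
    expected_pct = expected_counts / len(expected)
    actual_pct = actual_counts / len(actual)
    # Avoid log(0) and division by zero for empty bins
    epsilon = 1e-4
    psi = np.sum((actual_pct - expected_pct) * np.log((actual_pct + epsilon) / (expected_pct + epsilon)))
    return float(psi)
# Interpretation (common rules of thumb, not formal significance tests)
# PSI < 0.1: No significant drift
# 0.1 ≤ PSI < 0.25: Moderate drift
# PSI ≥ 0.25: Significant drift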
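The quantity computed above can be written out explicitly. Let there be $B$ bins, and let $e_i$ and $a_i$ be the fractions of training (expected) and serving (actual) values in bin $i$:

```latex
\mathrm{PSI} = \sum_{i=1}^{B} (a_i - e_i)\,\ln\frac{a_i}{e_i}
```

Each term is non-negative because $a_i - e_i$ and $\ln(a_i/e_i)$ always share a sign. PSI is therefore zero only when the two distributions match bin for bin. The code's $\varepsilon$ keeps empty bins from producing $\ln 0$.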
KS Statistic:
from scipy.stats import ks_2samp
def compute_ks(expected, actual):
stat, p_value = ks_2samp(expected, actual)
return stat, p_value
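# Rule of thumb: KS stat > 0.2 and p-value < 0.01 indicates drift. With large samples
# even tiny differences yield small p-values, so the statistic threshold does most of the work
Mean/Std Shift:
def compute_mean_shift(expected_mean, expected_std, actual_mean):
    # Standardized mean difference (shift in units of the training std): an effect
    # size, not a z-test of the mean, which would divide by std / sqrt(n)
    if expected_std == 0:
        return 0.0 if actual_mean == expected_mean else float("inf")  # any change to a constant feature
    return (actual_mean - expected_mean) / expected_std
# |shift| > 2 indicates a large shift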
8.3 Aggregation Pipeline
Batch mode (daily):
# Daily Spark job
def daily_drift_check(run_date, feature_names):
    # Load serving logs for one day
    logs_df = spark.read.parquet(f"s3://logs/predictions/date={run_date}/")
    # Training reference: raw (sampled) training values per feature, because
    # compute_psi re-bins the data and the KS test needs samples, not a histogram
    # (load_training_sample is a hypothetical helper)
    training_sample = load_training_sample("s3://models/v2.3/")
    # Compute drift metrics
    for feature in feature_names:
        # Pull one column to the driver (sample first if a day of logs is large)
        actual = [r[0] for r in logs_df.select(feature).dropna().collect()]
        expected = training_sample[feature]
        psi = compute_psi(expected, actual)
        ks_stat, ks_pvalue = compute_ks(expected, actual)
        # Store metrics
        save_drift_metric(feature, psi, ks_stat, ks_pvalue)
Streaming mode (real-time):
// Flink job
DataStream<PredictionLog> logs = ...;
logs
.keyBy(log -> log.getModelVersion())
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(new DriftDetectionFunction())
.addSink(new AlertSink());
8.4 Alerting & Thresholds
Thresholds:
from dataclasses import dataclass

@dataclass
class Alert:
    severity: str
    message: str

def check_alerts(feature, psi, ks_stat, ks_pvalue, mean_shift):
    alerts = []
    if psi > 0.25:
        alerts.append(Alert(
            severity="critical",
            message=f"Significant PSI drift in {feature}: {psi:.3f}"
        ))
    elif psi > 0.1:
        alerts.append(Alert(
            severity="warning",
            message=f"Moderate PSI drift in {feature}: {psi:.3f}"
        ))
    if ks_stat > 0.2 and ks_pvalue < 0.01:
        alerts.append(Alert(
            severity="warning",
            message=f"KS drift in {feature}: stat={ks_stat:.3f}, p={ks_pvalue:.3g}"
        ))
    if abs(mean_shift) > 2:
        alerts.append(Alert(
            severity="warning",
            message=f"Mean shift in {feature}: z-score={mean_shift:.2f}"
        ))
    return alerts
Alert fatigue prevention:
- Require sustained drift (e.g., PSI > 0.25 for 3 consecutive hours)
- Segment alerts by feature priority
- Deduplicate similar alerts
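The first and third points can be combined in a small stateful filter. The sketch below assumes one PSI value per feature per window (for example, hourly). It uses the 0.25 threshold and three consecutive windows from the list above. `DriftAlerter` is a hypothetical class.

```python
from collections import deque


class DriftAlerter:
    """Alert only on sustained drift, and only once per drift episode.

    A feature must exceed `threshold` for `required` consecutive windows
    before an alert fires. Further breaching windows in the same episode are
    suppressed, and the episode resets once a window drops back below.
    """

    def __init__(self, threshold=0.25, required=3):
        self.threshold = threshold
        self.required = required
        self.recent = {}       # feature -> deque of the last `required` PSI values
        self.alerting = set()  # features with an already-open alert

    def observe(self, feature, psi):
        window = self.recent.setdefault(feature, deque(maxlen=self.required))
        window.append(psi)
        sustained = len(window) == self.required and all(v > self.threshold for v in window)
        if not sustained:
            if psi <= self.threshold:
                self.alerting.discard(feature)  # drift episode is over
            return None
        if feature in self.alerting:
            return None  # duplicate of an open alert
        self.alerting.add(feature)
        return f"Sustained PSI drift in {feature}: last {self.required} windows > {self.threshold}"
```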
8.5 Root Cause Analysis
Feature-level drill-down:
# Which features drifted?
drifted_features = [f for f in features if psi[f] > 0.25]
# Visualize distributions
for feature in drifted_features:
    plot_distribution(training_stats[feature], serving_stats[feature])
Entity-level drill-down:
# Which entities changed? (pandas)
entity_drift = (
    logs_df.groupby("entity_id")["feature_1"].mean()
    .rename("avg_feature_1").reset_index()
)
# Compare to training (training_entity_stats has entity_id, training_mean)
entity_drift = entity_drift.merge(training_entity_stats, on="entity_id")
entity_drift["shift"] = entity_drift["avg_feature_1"] - entity_drift["training_mean"]
Time-series visualization:
# When did drift start? PSI per hour for one feature, computed against
# raw training values (training_sample: feature -> array of training values)
hourly_psi = logs_df.groupby("hour")["feature_1"].apply(
    lambda values: compute_psi(training_sample["feature_1"], values.dropna())
)
hourly_psi.plot()
8.6 Retraining Trigger
Pattern:
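def should_retrain(drift_history, current_auc, baseline_auc, sustained_windows=3):
    # Require sustained drift: the last N windows must all exceed the threshold,
    # and there must be at least N windows (all() of an empty list is True)
    recent = drift_history[-sustained_windows:]
    if len(recent) == sustained_windows and all(m["psi"] > 0.25 for m in recent):
        return True
    # Or significant performance degradation (once labels are available)
    if current_auc < baseline_auc - 0.05:
        return True
    return False

if should_retrain(drift_metrics, current_auc, baseline_auc):
    trigger_retraining_job()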
Key concerns:
- Don’t auto-retrain on every spike
- Require human approval for production models
- Validate retrained model before deployment
9. Data Quality & Monitoring
Purpose: Ensure data and features are correct, fresh, and reliable.
9.1 Schema Validation
Checks:
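import great_expectations as gx  # GX Core 1.x API

context = gx.get_context()
suite = context.suites.add(gx.ExpectationSuite(name="feature_validation"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(column="feature_1", min_value=0, max_value=100)
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="feature_1", mostly=0.95)  # ≤ 5% nulls
)
suite.add_expectation(
    gx.expectations.ExpectColumnUniqueValueCountToBeBetween(column="feature_2", min_value=10, max_value=1000)
)
# Validate an in-memory pandas DataFrame
batch = (
    context.data_sources.add_pandas(name="pandas")
    .add_dataframe_asset(name="features")
    .add_batch_definition_whole_dataframe(name="batch")
    .get_batch(batch_parameters={"dataframe": features_df})
)
results = batch.validate(suite)
if not results.success:
    send_alert("Schema validation failed")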
9.2 Freshness & Latency Monitoring
Metrics:
from datetime import datetime, timezone

def check_freshness(feature_name):
    last_update = get_last_update_timestamp(feature_name)  # timezone-aware UTC datetime
    age_hours = (datetime.now(timezone.utc) - last_update).total_seconds() / 3600
    if age_hours > 6:
        send_alert(f"Feature {feature_name} is {age_hours:.1f} hours old")
    elif age_hours > 2:
        send_warning(f"Feature {feature_name} is {age_hours:.1f} hours old")
Pipeline latency:
import time

def measure_latency():
    start = time.monotonic()  # monotonic clock: unaffected by wall-clock adjustments
    run_feature_pipeline()
    latency = time.monotonic() - start
    log_metric("pipeline_latency_seconds", latency)
    if latency > 3600:  # > 1 hour
        send_alert("Pipeline latency exceeded SLA")
9.3 Observability Dashboard
Components:
Feature distribution plots:
-- Grafana panel (Trino syntax: numeric_histogram(buckets, value))
SELECT
  feature_name,
  numeric_histogram(50, feature_value) AS distribution
FROM features
WHERE timestamp > now() - interval '1' hour
GROUP BY feature_name
Drift time-series:
-- Grafana panel
SELECT
timestamp,
feature_name,
psi_value
FROM drift_metrics
WHERE timestamp > now() - interval '7' day
ORDER BY timestamp
Pipeline run status:
# Datadog dashboard
- Pipeline runs (success/failure)
- Run duration
- Row counts
- Error rates
10. Key Design Decisions & Tradeoffs
10.1 Batch vs. Streaming
| Aspect | Batch | Streaming |
|---|---|---|
| Latency | Hours/days | Seconds/minutes |
| Complexity | Low | High |
| Cost | Low | High |
| Debugging | Easy | Hard |
| Use case | Daily aggregations | Real-time features |
Recommendation: Start with batch, add streaming only for time-critical features.
10.2 Centralized vs. Decentralized
| Aspect | Centralized | Decentralized |
|---|---|---|
| Consistency | High | Low |
| Autonomy | Low | High |
| Duplication | Low | High |
| Bottleneck | Yes | No |
Recommendation: Centralized platform, decentralized ownership.
10.3 Push vs. Pull
| Aspect | Push | Pull |
|---|---|---|
| Serving latency | Low | High |
| Freshness | Can lag by up to one update interval | Current as of the request |
| Complexity | Medium | Low |
Recommendation: Push for most features, pull for rarely-used features.
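To make the freshness row concrete, here is a minimal sketch contrasting the two modes. It assumes a toy in-memory event store and uses a plain dict in place of the online store. `compute_avg_purchase`, `push_job`, `serve_push`, and `serve_pull` are hypothetical names, not part of any feature-store API.

```python
# Hypothetical raw event store: user_id -> purchase amounts seen so far.
raw_events = {"u1": [10.0, 30.0], "u2": [5.0]}

def compute_avg_purchase(user_id):
    """Feature logic shared by both modes, so push and pull use the same formula."""
    amounts = raw_events.get(user_id, [])
    return sum(amounts) / len(amounts) if amounts else 0.0

online_store = {}  # stands in for Redis/DynamoDB in this sketch

def push_job():
    """Push: a scheduled job precomputes every entity's value ahead of requests."""
    for user_id in raw_events:
        online_store[user_id] = compute_avg_purchase(user_id)

def serve_push(user_id):
    # Cheap key lookup, but only as fresh as the last push_job run.
    return online_store[user_id]

def serve_pull(user_id):
    # Computed on read: always current, but pays the compute cost on every request.
    return compute_avg_purchase(user_id)

push_job()
raw_events["u1"].append(50.0)  # a new event arrives after the last push
print(serve_push("u1"), serve_pull("u1"))  # 20.0 30.0
```

Push pays the compute cost once per update interval instead of once per request, which is why it suits frequently read features.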
11. Common Pitfalls
- Training-serving skew: Features differ between training and inference
  - Solution: Share preprocessing code between training and serving
- Point-in-time leakage: Future data leaks into training features
  - Solution: Use ASOF joins with strict temporal filtering
- Feature duplication: Multiple teams compute the same feature differently
  - Solution: Centralized feature registry with ownership
- Stale features: Online store not updated, serving old data
  - Solution: Freshness monitoring, TTL on cached features
- No monitoring: Data quality issues silently corrupt models
  - Solution: Automated drift detection, schema validation, alerting
- Over-engineering: Building a complex streaming pipeline when batch suffices
  - Solution: Start simple, add complexity only when needed
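Point-in-time correctness is the pitfall that benefits most from code. The sketch below uses pandas `merge_asof` to attach, for each labeled event, the latest feature value recorded strictly before that event. The table and column names (`labels`, `features`, `event_time`, `feature_time`, `avg_purchase_30d`) are illustrative assumptions. In a warehouse, the same logic is usually an ASOF join or a window-function query.

```python
import pandas as pd

# Hypothetical label events: one row per (user, prediction time).
labels = pd.DataFrame({
    "user_id": [1, 1, 2],
    "event_time": pd.to_datetime(["2024-01-05", "2024-01-10", "2024-01-07"]),
    "label": [0, 1, 0],
})

# Hypothetical feature snapshots: value of avg_purchase_30d as of each computation time.
features = pd.DataFrame({
    "user_id": [1, 1, 1, 2],
    "feature_time": pd.to_datetime(["2024-01-01", "2024-01-08", "2024-01-12", "2024-01-07"]),
    "avg_purchase_30d": [10.0, 20.0, 99.0, 5.0],
})

def point_in_time_join(labels, features):
    """For each label, attach the latest feature row strictly before its event time."""
    # merge_asof requires both frames to be sorted by their time keys.
    labels = labels.sort_values("event_time")
    features = features.sort_values("feature_time")
    return pd.merge_asof(
        labels,
        features,
        left_on="event_time",
        right_on="feature_time",
        by="user_id",               # match within the same entity only
        direction="backward",       # never look at rows after the event
        allow_exact_matches=False,  # exclude rows stamped at the exact event time
    )

training = point_in_time_join(labels, features)
print(training[["user_id", "event_time", "avg_purchase_30d"]])
```

The 99.0 snapshot never appears because it is dated after every user-1 event. User 2 gets NaN because its only snapshot shares the event timestamp. Whether exact matches should count depends on what your feature timestamps mean. `allow_exact_matches=True` is the pandas default.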
12. Implementation Roadmap
Phase 1: Foundation (1-2 months)
- Set up batch ingestion (Airflow + Spark)
- Build offline feature store (Parquet on S3)
- Implement basic feature computation (daily aggregations)
- Create feature registry (simple metadata DB)
Phase 2: Serving (2-3 months)
- Build online feature store (Redis)
- Implement materialization job (offline → online)
- Create serving API (REST/gRPC)
- Add basic monitoring (freshness, null rates)
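The Phase 2 materialization job reduces to two steps: take the most recent row per entity from the offline store, then upsert it into the online store. This minimal sketch assumes offline rows arrive as Python dicts and uses a dict as a stand-in for Redis. The `features:<id>` key scheme and the `materialize` helper are hypothetical. With redis-py, the write step would typically become `hset(key, mapping=values)` calls batched in a pipeline.

```python
from datetime import datetime

# Hypothetical offline rows: one per (entity, feature_time).
offline_rows = [
    {"user_id": "u1", "feature_time": datetime(2024, 1, 1), "avg_purchase_30d": 10.0},
    {"user_id": "u1", "feature_time": datetime(2024, 1, 2), "avg_purchase_30d": 12.5},
    {"user_id": "u2", "feature_time": datetime(2024, 1, 1), "avg_purchase_30d": 7.0},
]

def materialize(rows, online_store, entity_col="user_id", ts_col="feature_time"):
    """Write the most recent feature row per entity into a key-value online store."""
    latest = {}
    for row in rows:
        entity = row[entity_col]
        if entity not in latest or row[ts_col] > latest[entity][ts_col]:
            latest[entity] = row
    for entity, row in latest.items():
        values = {k: v for k, v in row.items() if k != entity_col}
        values[ts_col] = values[ts_col].isoformat()  # keep the as-of time for freshness checks
        online_store[f"features:{entity}"] = values
    return len(latest)

online_store = {}  # stands in for Redis in this sketch
written = materialize(offline_rows, online_store)
print(written, online_store["features:u1"])
```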
Phase 3: Training Pipeline (2-3 months)
- Build training data assembly (ASOF joins, point-in-time correctness)
- Implement feature preprocessing pipeline (imputation, scaling, encoding)
- Set up model training jobs (Spark MLlib or PyTorch/TF)
- Create model evaluation framework (AUC, F1, fairness checks)
- Deploy model registry (MLflow or custom)
Phase 4: Real-time Serving (2-3 months)
- Deploy model serving infrastructure (Triton, TorchServe, or managed)
- Build request router and API gateway
- Implement feature retrieval from online store
- Add prediction logging for monitoring
- Load test for latency and throughput
Phase 5: Monitoring & Drift Detection (1-2 months)
- Implement drift detection pipeline (PSI, KS, mean shift)
- Set up alerting (PagerDuty, Slack)
- Build observability dashboard (Grafana, Datadog)
- Create automated retraining triggers
- Add schema validation and freshness monitoring
Phase 6: Optimization & Scale (ongoing)
- Optimize query performance (partitioning, indexing)
- Add caching layers (L1/L2 cache)
- Implement feature versioning and A/B testing
- Scale infrastructure (horizontal scaling, sharding)
- Add advanced monitoring (root cause analysis, impact tracking)
13. Technology Stack Recommendations
For Small Teams (< 10 engineers, < 100 features)
Ingestion: Airflow + custom Python scripts
Computation: Pandas, dbt, or simple Spark jobs
Offline Store: Parquet files on S3, queried with DuckDB or Athena
Online Store: Redis or DynamoDB
Serving: FastAPI or Flask REST API
Monitoring: Prometheus + Grafana, custom drift scripts
Model Registry: MLflow (open source)
Estimated cost: $500-2000/month (cloud infrastructure)
For Medium Teams (10-50 engineers, 100-1000 features)
Ingestion: Airflow + Kafka for streaming
Computation: Spark (Scala or PySpark), Flink for real-time
Offline Store: Data lake (Iceberg/Delta Lake) or data warehouse (BigQuery, Snowflake)
Online Store: Redis Cluster, optionally behind an open-source feature store framework such as Feast
Serving: gRPC with TensorFlow Serving or TorchServe
Monitoring: Datadog or custom Grafana dashboards
Model Registry: MLflow or SageMaker Model Registry
Estimated cost: $5000-20000/month
For Large Teams (50+ engineers, 1000+ features)
Ingestion: Kafka + Flink + Airflow orchestration
Computation: Distributed Spark clusters, Flink for streaming
Offline Store: Data lakehouse (Iceberg + Spark) with partitioning and indexing
Online Store: Managed feature store (Tecton) or self-hosted Feast, with caching layers
Serving: Triton Inference Server or custom gRPC services
Monitoring: Custom observability platform with automated drift detection
Model Registry: Custom registry with approval workflows
Estimated cost: $50000+/month
14. Real-world Examples
Example 1: E-commerce Recommendation System
Features:
- User features: avg_purchase_30d, browsing_history, cart_value
- Product features: avg_rating, view_count, category
- Cross features: user_product_affinity, purchase_frequency
Architecture:
Clickstream events → Kafka → Flink (real-time features)
↓
Database (products) → Spark (batch features) → Offline Store (Iceberg)
↓
Materialization → Redis
↓
Serving API → Model → Recommendation
Scale: 10M users, 1M products, 500 features, 1000 req/sec
Latency: <50ms end-to-end
Example 2: Fraud Detection System
Features:
- Transaction features: amount, merchant_category, time_of_day
- User features: avg_transaction_7d, velocity_1h, device_fingerprint
- Graph features: connected_fraudulent_accounts, community_score
Architecture:
Transaction events → Kafka → Flink (real-time features)
↓
Historical data → Spark (batch features) → Offline Store (Parquet)
↓
Materialization → Redis
↓
Serving API → Model → Fraud Score (<20ms)
Scale: 50M transactions/day, 10M users, 300 features, 5000 req/sec
Latency: <20ms (critical for real-time decisions)
Drift monitoring: Hourly PSI checks; automatic retraining if PSI > 0.25 for 3 consecutive hourly checks
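The retraining rule in Example 2 needs a little state, because a single noisy hour should not trigger retraining. The sketch below assumes PSI is already computed once per hour elsewhere. The `RetrainTrigger` class is hypothetical; its defaults mirror the 0.25 threshold and the three-check window described above.

```python
from collections import deque

class RetrainTrigger:
    """Fire when hourly PSI exceeds `threshold` for `consecutive` checks in a row."""

    def __init__(self, threshold=0.25, consecutive=3):
        self.threshold = threshold
        self.recent = deque(maxlen=consecutive)  # rolling window of breach flags

    def observe(self, psi):
        """Record one hourly PSI value; return True if retraining should start."""
        self.recent.append(psi > self.threshold)
        fired = len(self.recent) == self.recent.maxlen and all(self.recent)
        if fired:
            self.recent.clear()  # do not re-fire every hour while retraining runs
        return fired

trigger = RetrainTrigger()
hourly_psi = [0.10, 0.30, 0.28, 0.20, 0.27, 0.31, 0.26]
decisions = [trigger.observe(p) for p in hourly_psi]
print(decisions)  # fires only on the last check, after three breaches in a row
```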
Example 3: Content Ranking System
Features:
- User features: engagement_rate, content_preferences, session_duration
- Content features: freshness, popularity, category, author_reputation
- Context features: time_of_day, device_type, location
Architecture:
User interactions → Kafka → Flink (real-time engagement features)
↓
Content metadata → Spark (batch content features) → Offline Store (BigQuery)
↓
Materialization → Redis
↓
Serving API → Model → Ranking Score
Scale: 100M users, 10M content items, 800 features, 10000 req/sec
Latency: <100ms
15. Performance Benchmarks
Offline Store Query Performance
| Storage | Query Type | Latency | Throughput |
|---|---|---|---|
| Parquet (S3) | Full scan (1TB) | 5-10 min | 100 GB/min |
| Parquet (S3) | Partitioned scan (10GB) | 30-60 sec | ~170-330 MB/sec |
| Iceberg | ASOF join (100M rows) | 2-5 min | 20M rows/min |
| BigQuery | Point-in-time join (1B rows) | 1-3 min | 300M rows/min |
| Snowflake | Historical scan (1TB) | 30-60 sec | 20 GB/sec |
Online Store Lookup Performance
| Storage | Point Lookup | Batch Lookup (100 entities) |
|---|---|---|
| Redis | 0.5-2ms | 5-20ms |
| DynamoDB | 2-5ms | 10-30ms |
| Feast (Redis) | 1-3ms | 8-25ms |
| Tecton | 2-5ms | 10-35ms |
Model Serving Performance
| Framework | Latency (p50) | Latency (p99) | Throughput |
|---|---|---|---|
| TensorFlow Serving | 5-10ms | 15-30ms | 1000 req/sec/GPU |
| TorchServe | 3-8ms | 10-25ms | 1200 req/sec/GPU |
| Triton (GPU) | 2-5ms | 8-20ms | 2000 req/sec/GPU |
| Triton (CPU) | 10-20ms | 30-50ms | 200 req/sec/core |
16. Cost Optimization Strategies
1. Tiered Storage
Hot data (last 7 days): Keep in fast storage (SSD, Redis)
Warm data (7-90 days): Move to cheaper storage (HDD, S3 Standard)
Cold data (90+ days): Archive to cheapest storage (S3 Glacier)
Savings: 50-70% storage cost reduction
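The tiering policy is a function of partition age. The sketch below encodes the 7-day and 90-day boundaries above; the tier names are labels only. In practice, the S3 side is usually expressed as S3 Lifecycle transition rules rather than custom code, so read this as an illustration of the policy, not a deployment mechanism.

```python
from datetime import datetime, timedelta, timezone

HOT_DAYS = 7    # hot: the last 7 days (SSD, Redis)
WARM_DAYS = 90  # warm: 7-90 days (HDD, S3 Standard); older partitions go cold (S3 Glacier)

def storage_tier(partition_time, now=None):
    """Classify a partition as 'hot', 'warm', or 'cold' by its age."""
    if now is None:
        now = datetime.now(timezone.utc)
    age = now - partition_time
    if age < timedelta(days=HOT_DAYS):
        return "hot"
    if age < timedelta(days=WARM_DAYS):
        return "warm"
    return "cold"

now = datetime(2024, 6, 30, tzinfo=timezone.utc)
for days_old in (1, 30, 120):
    print(days_old, storage_tier(now - timedelta(days=days_old), now))
```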
2. Incremental Computation
Instead of: Recompute all features daily (expensive)
Do: Only recompute features for entities with new data
Savings: 60-80% compute cost reduction
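Here is a minimal sketch of the incremental pattern. It assumes events carry a timestamp and the job records when it last ran. The job finds the entities touched since `last_run`, recomputes only their features, and leaves every other entity's value alone. The function name, event schema, and `avg_amount` feature are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

def incremental_recompute(events, features, last_run):
    """Recompute avg_amount only for users with events newer than last_run.

    events:   list of dicts with user_id, ts, amount (the feature's full history)
    features: dict user_id -> current avg_amount (updated in place)
    Returns the sorted list of users that were recomputed.
    """
    # 1. Find entities touched since the previous run.
    changed = {e["user_id"] for e in events if e["ts"] > last_run}
    # 2. Recompute only those entities from their history.
    amounts = defaultdict(list)
    for e in events:
        if e["user_id"] in changed:
            amounts[e["user_id"]].append(e["amount"])
    # 3. Upsert; untouched entities keep their previous values.
    for user_id, values in amounts.items():
        features[user_id] = sum(values) / len(values)
    return sorted(changed)

events = [
    {"user_id": "a", "ts": datetime(2024, 1, 1), "amount": 10.0},
    {"user_id": "a", "ts": datetime(2024, 1, 3), "amount": 20.0},
    {"user_id": "b", "ts": datetime(2024, 1, 1), "amount": 5.0},
    {"user_id": "c", "ts": datetime(2024, 1, 3), "amount": 8.0},
]
features = {"a": 10.0, "b": 5.0}  # state after the Jan 1 run
recomputed = incremental_recompute(events, features, last_run=datetime(2024, 1, 2))
print(recomputed, features)  # ['a', 'c'] {'a': 15.0, 'b': 5.0, 'c': 8.0}
```

Windowed features (for example, a 30-day average) have an extra case. An entity whose window slides past old events changes even without new data, so it must also be recomputed. This sketch ignores that case.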
3. Feature Caching
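Pattern: Cache frequently accessed features in a two-level cache (L1: in-process memory on each serving host; L2: a shared cache in front of the online store)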
Benefit: Reduce online store reads by 80-90%
Savings: 40-60% Redis/DynamoDB cost reduction
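The sketch below shows the two-level pattern. It assumes L1 is a per-process dict with a short TTL and L2 is a shared cache; plain dicts stand in for both L2 and the online store. The `TwoLevelCache` class, its TTL default, and the injectable clock (used here to make expiry deterministic) are hypothetical.

```python
import time

class TwoLevelCache:
    """L1: per-process dict with a short TTL. L2: shared cache (a dict here)."""

    def __init__(self, l2, loader, l1_ttl_s=5.0, clock=time.monotonic):
        self.l1 = {}              # key -> (expires_at, value)
        self.l2 = l2              # shared cache; no TTL in this sketch
        self.loader = loader      # reads from the online store on a full miss
        self.l1_ttl_s = l1_ttl_s
        self.clock = clock
        self.store_reads = 0      # counts online-store reads to show the savings

    def get(self, key):
        now = self.clock()
        entry = self.l1.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]                   # L1 hit: no network call
        value = self.l2.get(key)
        if value is None:                     # L2 miss: fall through to the store
            value = self.loader(key)
            self.store_reads += 1
            self.l2[key] = value
        self.l1[key] = (now + self.l1_ttl_s, value)
        return value

online_store = {"user:1": {"avg_purchase_30d": 12.5}}
fake_now = [0.0]
cache = TwoLevelCache(l2={}, loader=online_store.__getitem__, clock=lambda: fake_now[0])
for _ in range(100):
    cache.get("user:1")
fake_now[0] = 10.0        # L1 entry expires; L2 still holds the value
cache.get("user:1")
print(cache.store_reads)  # 1
```

Both levels trade freshness for fewer reads, so their TTLs should stay well under the feature's freshness SLA. This sketch omits an L2 TTL for brevity.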
4. Spot Instances for Batch Jobs
Use case: Training data assembly, batch feature computation
Savings: 60-90% compute cost (vs. on-demand)
Risk: Interruptions (mitigate with checkpointing)
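Checkpointing makes spot interruptions cheap: the job records each finished unit of work, and a restarted job skips it. The sketch below assumes work is split into date partitions and that `process` is a hypothetical per-partition function. The checkpoint is a local JSON file written atomically. On spot capacity it must live in durable storage such as S3, because the instance's local disk disappears with the instance.

```python
import json
import os
import tempfile

def process_partitions(partitions, process, checkpoint_path):
    """Run `process` on each partition, checkpointing after each one so a restart
    after a spot interruption skips work that already finished."""
    done = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            done = set(json.load(f))
    for partition in partitions:
        if partition in done:
            continue  # completed before the interruption
        process(partition)
        done.add(partition)
        # Write to a temp file, then rename, so a kill mid-write cannot corrupt the checkpoint.
        tmp_path = checkpoint_path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump(sorted(done), f)
        os.replace(tmp_path, checkpoint_path)
    return done

# Simulate an interruption on the third partition, then a restart.
ckpt = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
parts = ["2024-01-01", "2024-01-02", "2024-01-03"]
calls = []

def reclaimed_on_third(partition):
    if partition == "2024-01-03":
        raise RuntimeError("spot instance reclaimed")  # simulated interruption
    calls.append(partition)

try:
    process_partitions(parts, reclaimed_on_third, ckpt)
except RuntimeError:
    pass
process_partitions(parts, calls.append, ckpt)  # restarted job
print(calls)  # ['2024-01-01', '2024-01-02', '2024-01-03']
```

A partition interrupted mid-`process` is simply rerun. For that to be safe, `process` should be idempotent, for example by overwriting its output partition rather than appending to it.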
5. Right-sizing Infrastructure
Monitor: CPU, memory, network utilization
Action: Downsize over-provisioned instances
Savings: 20-40% infrastructure cost
17. Security & Compliance
Data Privacy
PII handling:
- Encrypt PII at rest and in transit
- Mask PII in logs and monitoring
- Implement data retention policies (auto-delete after N days)
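One way to mask PII in logs is a `logging.Filter` that rewrites messages before any handler sees them. The sketch below masks only email addresses, using a simple regex, and replaces each with a keyed hash (HMAC-SHA256). The same user then maps to the same token across log lines without exposing the address. The regex, `SECRET_KEY`, and the filter class are illustrative assumptions; real deployments need broader PII detection and a key from a secrets manager.

```python
import hashlib
import hmac
import io
import logging
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
SECRET_KEY = b"replace-with-a-managed-secret"  # hypothetical; load from a secrets manager

def pseudonymize(value: str) -> str:
    """Stable keyed hash: joinable across log lines, not reversible without the key."""
    return hmac.new(SECRET_KEY, value.encode(), hashlib.sha256).hexdigest()[:12]

class PiiMaskingFilter(logging.Filter):
    """Rewrite each record so email addresses never reach a log sink."""

    def filter(self, record):
        message = record.getMessage()  # apply %-style args first
        record.msg = EMAIL_RE.sub(lambda m: f"<email:{pseudonymize(m.group())}>", message)
        record.args = ()
        return True

stream = io.StringIO()
logger = logging.getLogger("feature_pipeline")
logger.addHandler(logging.StreamHandler(stream))
logger.addFilter(PiiMaskingFilter())
logger.setLevel(logging.INFO)
logger.info("computed features for %s", "jane.doe@example.com")
print(stream.getvalue())
```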
Access control:
- Role-based access control (RBAC)
- Feature-level permissions (who can read/write which features)
- Audit logs for all data access
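Feature-level permissions and audit logging can share one choke point: every read or write passes through an authorization check that also appends an audit record. The sketch below uses hypothetical in-memory grants with glob patterns (via `fnmatch`), so a role can be granted a whole feature namespace such as `transaction.*`.

```python
import fnmatch
import time

# Hypothetical grants: role -> glob patterns over feature names.
READ_GRANTS = {
    "fraud_model": {"transaction.*", "user.velocity_1h"},
    "analyst": {"product.*"},
}
WRITE_GRANTS = {
    "feature_pipeline": {"*"},
}
audit_log = []  # in production: an append-only, tamper-evident sink

def authorize(role, action, feature):
    """Return True if `role` may perform `action` on `feature`; record every decision."""
    if action not in ("read", "write"):
        raise ValueError(f"unknown action: {action}")
    grants = (READ_GRANTS if action == "read" else WRITE_GRANTS).get(role, set())
    allowed = any(fnmatch.fnmatchcase(feature, pattern) for pattern in grants)
    audit_log.append({"ts": time.time(), "role": role, "action": action,
                      "feature": feature, "allowed": allowed})
    return allowed

print(authorize("fraud_model", "read", "transaction.amount"))       # True
print(authorize("fraud_model", "read", "user.device_fingerprint"))  # False
```

Denied requests are logged as well as granted ones, so the audit trail also shows attempted access.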
Compliance
GDPR:
- Right to deletion: Implement feature deletion pipeline
- Data portability: Export features in standard formats
- Consent management: Track user consent for feature computation
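A right-to-deletion pipeline has to remove an entity from every store that holds its features. The sketch below covers the online store (a dict standing in for Redis) and the offline store (a list of rows standing in for a table); the key scheme and function name are hypothetical. With Iceberg or Delta Lake, the offline step would be a row-level `DELETE`. That must be followed by snapshot expiry (Iceberg) or `VACUUM` (Delta) so the old data files are physically removed.

```python
def delete_user_features(user_id, online_store, offline_rows):
    """Erase one user's features from both stores; return kept rows and a deletion report.

    online_store: dict keyed "features:<user_id>" (stands in for Redis)
    offline_rows: list of row dicts with a "user_id" field (stands in for an offline table)
    """
    removed_online = online_store.pop(f"features:{user_id}", None) is not None
    kept = [row for row in offline_rows if row["user_id"] != user_id]
    report = {
        "user_id": user_id,
        "online_keys_deleted": int(removed_online),
        "offline_rows_deleted": len(offline_rows) - len(kept),
    }
    return kept, report

online = {"features:u1": {"avg_purchase_30d": 12.5}, "features:u2": {"avg_purchase_30d": 7.0}}
offline = [
    {"user_id": "u1", "avg_purchase_30d": 10.0},
    {"user_id": "u1", "avg_purchase_30d": 12.5},
    {"user_id": "u2", "avg_purchase_30d": 7.0},
]
offline, report = delete_user_features("u1", online, offline)
print(report)  # {'user_id': 'u1', 'online_keys_deleted': 1, 'offline_rows_deleted': 2}
```

The returned report can be written to the audit log as evidence that the deletion request was fulfilled.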
SOC 2:
- Encryption at rest and in transit
- Audit logging
- Incident response procedures
HIPAA (if applicable):
- PHI handling: Separate infrastructure for health data
- Business Associate Agreements (BAAs) with cloud providers
18. Future Trends
1. Unified Batch and Streaming
Trend: Frameworks that handle both batch and streaming with same API
Examples: Apache Beam, Flink unified API
Benefit: Simpler codebase, easier maintenance
2. Feature Stores as a Service
Trend: Managed feature stores (Tecton, SageMaker Feature Store)
Benefit: Less infrastructure management, faster time-to-market
Tradeoff: Vendor lock-in, less customization
3. Real-time Feature Engineering
Trend: Compute features at request time with sub-millisecond latency
Examples: On-demand feature computation, edge computing
Benefit: Fresher features, lower storage costs
4. Automated ML Pipelines
Trend: End-to-end automation from data to deployment
Examples: Auto-retraining, auto-scaling, auto-monitoring
Benefit: Less manual intervention, faster iteration
5. Graph-based Feature Stores
Trend: Store features as graphs for relationship-based ML
Examples: User-product graphs, social networks, knowledge graphs
Benefit: Capture complex relationships, better recommendations
19. Conclusion
Building a production ML system is a complex engineering challenge that goes far beyond model training. The data infrastructure (ingestion, computation, storage, serving, and monitoring) often takes 80% of the effort.
Key takeaways:
- Start simple: Begin with batch processing, add streaming only when needed
- Separate offline and online: Different requirements, different storage
- Ensure consistency: Training-serving skew is one of the most common causes of production failures
- Monitor everything: Drift detection, freshness, schema validation
- Plan for scale: Partitioning, caching, incremental computation
- Govern features: Registry, ownership, lineage tracking
Common mistakes to avoid:
- Over-engineering (building streaming when batch suffices)
- Ignoring monitoring (data quality issues silently corrupt models)
- Training-serving skew (different preprocessing in training vs. serving)
- Feature duplication (multiple teams computing same feature)
- No point-in-time correctness (future data leaks into training)
Next steps:
- Assess your current state (what exists, what’s missing)
- Define requirements (latency, scale, features)
- Choose technology stack (based on team size, budget, expertise)
- Build incrementally (Phase 1 → Phase 6)
- Monitor and iterate (continuous improvement)
The best ML system is one that delivers value quickly and scales gracefully. Don’t try to build the perfect system on day one: start with the basics, learn from production, and evolve.
20. Additional Resources
Books
- “Designing Data-Intensive Applications” by Martin Kleppmann
- “Machine Learning Systems Design” by Chip Huyen
- “Designing Machine Learning Systems” by Chip Huyen
Tools & Frameworks
- Feature Stores: Feast, Tecton, Hopsworks, SageMaker Feature Store
- Orchestration: Airflow, Prefect, Dagster
- Stream Processing: Flink, Kafka Streams, Spark Structured Streaming
- Model Serving: TensorFlow Serving, TorchServe, Triton, BentoML
- Monitoring: Evidently AI, WhyLabs, Arize, Fiddler
Courses
- “Machine Learning Systems Design” on Educative
- “Data Engineering” on DataCamp
- “Apache Flink” on Udemy
Communities
- r/MachineLearning on Reddit
- MLOps Community Slack
- Feature Store Community Slack
About the Author: This guide is a comprehensive overview of ML system design, synthesized from real-world experience building production systems at scale. The patterns and tradeoffs discussed here are battle-tested across e-commerce, fintech, and content platforms.
Last Updated: June 2026
If you found this guide useful, share it with your team. Building great ML systems is a team effort, and everyone should understand the architecture.