MLOps Part 2: Model Monitoring and Observability
Learn how to monitor machine learning models in production, detect drift, track performance metrics, and build observability into your ML systems
Introduction to ML Monitoring
Once your model is deployed, the real work begins: ensuring it continues to perform well over time. Production ML models face unique challenges that traditional software doesn't encounter.
What You'll Learn
In this module, you'll understand:
- Drift detection: Identifying when data or model behavior changes
- Performance monitoring: Tracking model accuracy and quality
- Observability: Building visibility into ML systems
- Alerting: Setting up notifications for issues
Why Monitoring Matters
Models degrade over time due to:
- Data drift: Input data distribution changes
- Concept drift: Relationships between features and targets change
- Model staleness: Model doesn't adapt to new patterns
- Data quality issues: Missing values, outliers, corrupt data
Without monitoring, you won't know when your model fails!
---PAGE---
Understanding Data Drift
Data drift occurs when the statistical properties of input features change over time, causing model performance to degrade.
Types of Data Drift
1. Covariate Shift
The distribution of input features (X) changes, but the relationship between features and the target, P(Y|X), remains the same.
Example:
Training data: Ages 25-45 (e-commerce shoppers)
Production data: Ages 18-25 (new demographic)
Result: Model sees different age distribution
2. Prior Probability Shift
The distribution of the target variable (Y) changes, while P(X|Y) stays the same.
Example:
Training: 90% approved loans, 10% rejected
Production: 70% approved, 30% rejected (economic downturn)
Result: Class imbalance shifted
3. Concept Drift
The relationship between features and the target, P(Y|X), changes.
Example:
Training: "luxury car" = expensive
Production: Electric vehicles changed what "luxury" means
Result: Feature meanings evolved
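Concept drift can be made concrete with a minimal simulation (the numbers and the single-feature setup are purely illustrative): the feature distribution stays the same, but the feature-to-target rule changes between training and production, so accuracy drops even though no covariate shift occurred.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

rng = np.random.default_rng(42)

# Training era: price above 50 means "luxury" (label 1)
X_train = rng.uniform(0, 100, size=(1000, 1))
y_train = (X_train[:, 0] > 50).astype(int)

# Production era: same feature distribution, but the true threshold
# has moved to 70 -- P(Y|X) changed while P(X) did not
X_prod = rng.uniform(0, 100, size=(1000, 1))
y_prod = (X_prod[:, 0] > 70).astype(int)

model = LogisticRegression().fit(X_train, y_train)
acc_train = accuracy_score(y_train, model.predict(X_train))
acc_prod = accuracy_score(y_prod, model.predict(X_prod))

print(f"Accuracy before drift: {acc_train:.2f}")
print(f"Accuracy after drift:  {acc_prod:.2f}")
```

Note that a KS test on the feature itself would report no drift here, which is why performance monitoring is needed alongside input-distribution checks.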
Detecting Data Drift
Statistical Tests
Kolmogorov-Smirnov Test (continuous features):
from scipy.stats import ks_2samp
import numpy as np
# Training data distribution
training_ages = np.array([25, 30, 35, 40, 45, 50])
# Production data distribution
production_ages = np.array([18, 20, 22, 25, 28, 30])
# Perform KS test
statistic, p_value = ks_2samp(training_ages, production_ages)
if p_value < 0.05:
print(f"⚠️ Data drift detected! p-value: {p_value:.4f}")
else:
print(f"✓ No significant drift. p-value: {p_value:.4f}")
Chi-Square Test (categorical features):
from scipy.stats import chisquare
# Training data: category frequencies
training_categories = [100, 200, 150, 50] # 4 categories
# Production data: category frequencies
production_categories = [80, 180, 200, 40]
# Perform chi-square test
statistic, p_value = chisquare(production_categories, training_categories)
if p_value < 0.05:
print(f"⚠️ Category distribution changed! p-value: {p_value:.4f}")
else:
print(f"✓ Category distribution stable. p-value: {p_value:.4f}")
Population Stability Index (PSI)
PSI measures drift in feature distributions:
import numpy as np
def calculate_psi(expected, actual, bins=10):
"""
Calculate Population Stability Index
PSI < 0.1: No significant change
0.1 <= PSI < 0.2: Moderate change
PSI >= 0.2: Significant change
"""
    # Create bins from training-data percentiles
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
    # Extend the outer edges so production values that fall outside the
    # training range are still counted
    breakpoints[0] = -np.inf
    breakpoints[-1] = np.inf
# Calculate frequencies
expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)
# Avoid division by zero
expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)
# Calculate PSI
psi = np.sum((actual_percents - expected_percents) *
np.log(actual_percents / expected_percents))
return psi
# Example usage
training_feature = np.random.normal(100, 15, 1000)
production_feature = np.random.normal(110, 20, 1000) # Drifted
psi_score = calculate_psi(training_feature, production_feature)
print(f"PSI Score: {psi_score:.4f}")
if psi_score < 0.1:
print("✓ No significant drift")
elif psi_score < 0.2:
print("⚠️ Moderate drift detected")
else:
print("🚨 Significant drift! Investigate immediately")
---PAGE---
Model Performance Monitoring
Tracking model performance in production requires collecting ground truth labels and comparing them with the model's predictions.
Key Metrics to Monitor
Classification Metrics
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np
def log_classification_metrics(y_true, y_pred, timestamp):
"""Log classification metrics to monitoring system"""
metrics = {
'timestamp': timestamp,
'accuracy': accuracy_score(y_true, y_pred),
'precision': precision_score(y_true, y_pred, average='weighted'),
'recall': recall_score(y_true, y_pred, average='weighted'),
'f1_score': f1_score(y_true, y_pred, average='weighted')
}
# Log to monitoring system (Prometheus, CloudWatch, etc.)
for metric_name, value in metrics.items():
print(f"{metric_name}: {value:.4f}")
return metrics
# Example usage
y_true = np.array([1, 0, 1, 1, 0, 1, 0, 0])
y_pred = np.array([1, 0, 1, 0, 0, 1, 1, 0])
metrics = log_classification_metrics(y_true, y_pred, '2025-10-20')
Regression Metrics
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
def log_regression_metrics(y_true, y_pred, timestamp):
"""Log regression metrics"""
metrics = {
'timestamp': timestamp,
'mse': mean_squared_error(y_true, y_pred),
'rmse': np.sqrt(mean_squared_error(y_true, y_pred)),
'mae': mean_absolute_error(y_true, y_pred),
'r2': r2_score(y_true, y_pred)
}
for metric_name, value in metrics.items():
print(f"{metric_name}: {value:.4f}")
return metrics
Delayed Ground Truth
Many ML systems have delayed feedback:
import time
from datetime import datetime, timedelta
class PredictionStore:
"""Store predictions and match with ground truth later"""
def __init__(self):
self.predictions = {}
def log_prediction(self, prediction_id, features, prediction):
"""Store prediction for later evaluation"""
self.predictions[prediction_id] = {
'features': features,
'prediction': prediction,
'timestamp': datetime.now(),
'ground_truth': None
}
def update_ground_truth(self, prediction_id, ground_truth):
"""Update with actual outcome when available"""
if prediction_id in self.predictions:
self.predictions[prediction_id]['ground_truth'] = ground_truth
# Calculate metrics when we have both prediction and truth
self._evaluate_prediction(prediction_id)
def _evaluate_prediction(self, prediction_id):
"""Evaluate individual prediction"""
record = self.predictions[prediction_id]
if record['ground_truth'] is not None:
correct = record['prediction'] == record['ground_truth']
latency = (datetime.now() - record['timestamp']).total_seconds()
print(f"Prediction {prediction_id}: {'✓' if correct else '✗'}")
print(f"Feedback latency: {latency:.0f} seconds")
# Example usage
store = PredictionStore()
# Log prediction
store.log_prediction('pred_001', [25, 50000, 'A'], prediction=1)
# Later, when ground truth arrives
time.sleep(2) # Simulate delay
store.update_ground_truth('pred_001', ground_truth=1)
---PAGE---
System Performance Metrics
Beyond model accuracy, monitor system health and performance.
Latency Monitoring
import time
from prometheus_client import Histogram
import numpy as np
# Define Prometheus metric
prediction_latency = Histogram(
'model_prediction_latency_seconds',
'Time spent processing prediction',
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0]
)
def predict_with_monitoring(model, features):
"""Make prediction with latency monitoring"""
start_time = time.time()
try:
prediction = model.predict([features])
latency = time.time() - start_time
# Record latency
prediction_latency.observe(latency)
# Log if slow
if latency > 0.5:
print(f"⚠️ Slow prediction: {latency:.3f}s")
return prediction
except Exception as e:
latency = time.time() - start_time
print(f"❌ Prediction failed after {latency:.3f}s: {e}")
raise
Throughput Monitoring
from prometheus_client import Counter
import time
from collections import deque
# Request counter
prediction_counter = Counter(
'model_predictions_total',
'Total number of predictions',
['model_version', 'status']
)
class ThroughputMonitor:
"""Monitor requests per second"""
def __init__(self, window_seconds=60):
self.window_seconds = window_seconds
self.timestamps = deque()
def record_request(self):
"""Record a new request"""
now = time.time()
self.timestamps.append(now)
# Remove old timestamps outside window
cutoff = now - self.window_seconds
while self.timestamps and self.timestamps[0] < cutoff:
self.timestamps.popleft()
    def get_requests_per_second(self):
        """Average requests per second over the sliding window"""
if not self.timestamps:
return 0.0
return len(self.timestamps) / self.window_seconds
# Usage
monitor = ThroughputMonitor()
# Simulate requests
for _ in range(100):
monitor.record_request()
time.sleep(0.01)
rps = monitor.get_requests_per_second()
print(f"Current throughput: {rps:.2f} requests/second")
Error Rate Monitoring
from prometheus_client import Counter
import numpy as np
# Error counters
prediction_errors = Counter(
'model_prediction_errors_total',
'Total prediction errors',
['error_type']
)
def predict_with_error_tracking(model, features):
"""Track different types of errors"""
try:
# Validate input
if not features or len(features) == 0:
prediction_errors.labels(error_type='invalid_input').inc()
raise ValueError("Empty features")
# Make prediction
prediction = model.predict([features])
# Check for NaN
if np.isnan(prediction).any():
prediction_errors.labels(error_type='nan_prediction').inc()
raise ValueError("Model returned NaN")
return prediction
    except ValueError:
        # invalid_input / nan_prediction were already counted above,
        # so re-raise without double-counting as a separate error type
        raise
except Exception as e:
prediction_errors.labels(error_type='unknown_error').inc()
raise
---PAGE---
Data Quality Monitoring
Monitor input data quality to detect issues before they affect predictions.
Missing Values Detection
import pandas as pd
import numpy as np
class DataQualityMonitor:
"""Monitor data quality metrics"""
def __init__(self, expected_columns):
self.expected_columns = expected_columns
self.missing_value_threshold = 0.1 # 10%
def check_missing_values(self, df):
"""Check for excessive missing values"""
missing_pct = df.isnull().sum() / len(df)
issues = []
for column, pct in missing_pct.items():
if pct > self.missing_value_threshold:
issues.append({
'column': column,
'missing_pct': pct,
'severity': 'high' if pct > 0.3 else 'medium'
})
print(f"⚠️ {column}: {pct*100:.1f}% missing values")
return issues
def check_column_presence(self, df):
"""Verify all expected columns are present"""
missing_cols = set(self.expected_columns) - set(df.columns)
extra_cols = set(df.columns) - set(self.expected_columns)
if missing_cols:
print(f"❌ Missing columns: {missing_cols}")
if extra_cols:
print(f"⚠️ Extra columns: {extra_cols}")
return len(missing_cols) == 0
def check_data_types(self, df, expected_types):
"""Verify column data types"""
type_mismatches = []
for column, expected_type in expected_types.items():
if column in df.columns:
actual_type = df[column].dtype
if actual_type != expected_type:
type_mismatches.append({
'column': column,
'expected': expected_type,
'actual': actual_type
})
print(f"⚠️ {column}: expected {expected_type}, got {actual_type}")
return type_mismatches
# Example usage
monitor = DataQualityMonitor(['age', 'income', 'score'])
# Simulate production data
data = pd.DataFrame({
'age': [25, np.nan, 35, 40],
'income': [50000, 60000, np.nan, 80000],
'score': [0.8, 0.9, 0.7, np.nan]
})
issues = monitor.check_missing_values(data)
Outlier Detection
from scipy import stats
import numpy as np
def detect_outliers_zscore(data, threshold=3):
"""Detect outliers using Z-score method"""
z_scores = np.abs(stats.zscore(data))
outliers = np.where(z_scores > threshold)[0]
outlier_pct = len(outliers) / len(data) * 100
if outlier_pct > 5: # More than 5% outliers
print(f"⚠️ {outlier_pct:.1f}% outliers detected")
return outliers
def detect_outliers_iqr(data):
"""Detect outliers using Interquartile Range"""
Q1 = np.percentile(data, 25)
Q3 = np.percentile(data, 75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = np.where((data < lower_bound) | (data > upper_bound))[0]
print(f"Valid range: [{lower_bound:.2f}, {upper_bound:.2f}]")
print(f"Outliers found: {len(outliers)}")
return outliers
# Example
ages = np.array([25, 30, 35, 40, 45, 200, 28, 33]) # 200 is outlier
outliers = detect_outliers_iqr(ages)
---PAGE---
Building Monitoring Dashboards
Visualize metrics and create alerts using popular monitoring tools.
Prometheus + Grafana Setup
Step 1: Instrument your code
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import Flask, Response
import time
app = Flask(__name__)
# Define metrics
predictions_total = Counter(
'ml_predictions_total',
'Total predictions made',
['model', 'version']
)
prediction_latency = Histogram(
'ml_prediction_latency_seconds',
'Prediction latency in seconds',
['model']
)
model_accuracy = Gauge(
'ml_model_accuracy',
'Current model accuracy',
['model', 'version']
)
data_drift_score = Gauge(
'ml_data_drift_psi',
'PSI score for data drift',
['feature']
)
@app.route('/predict', methods=['POST'])
def predict():
start = time.time()
# Make prediction
prediction = make_prediction()
# Record metrics
predictions_total.labels(model='fraud_detector', version='v2').inc()
prediction_latency.labels(model='fraud_detector').observe(time.time() - start)
return {'prediction': prediction}
@app.route('/metrics')
def metrics():
"""Expose metrics for Prometheus to scrape"""
return Response(generate_latest(), mimetype='text/plain')
def make_prediction():
"""Dummy prediction function"""
time.sleep(0.1) # Simulate processing
return 1
Step 2: Prometheus configuration
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'ml_service'
static_configs:
- targets: ['localhost:5000']
Step 3: Key Grafana queries
# Prediction rate (requests per second)
rate(ml_predictions_total[5m])
# Average latency
rate(ml_prediction_latency_seconds_sum[5m])
/
rate(ml_prediction_latency_seconds_count[5m])
# 95th percentile latency
histogram_quantile(0.95,
rate(ml_prediction_latency_seconds_bucket[5m])
)
# Model accuracy over time
ml_model_accuracy
# Data drift alert
ml_data_drift_psi > 0.2
CloudWatch Metrics (AWS)
import boto3
from datetime import datetime, timezone
cloudwatch = boto3.client('cloudwatch')
def log_metric_to_cloudwatch(metric_name, value, unit='None'):
    """Send custom metric to CloudWatch"""
    cloudwatch.put_metric_data(
        Namespace='MLOps/Models',
        MetricData=[
            {
                'MetricName': metric_name,
                'Value': value,
                'Unit': unit,
                'Timestamp': datetime.now(timezone.utc),
'Dimensions': [
{
'Name': 'Model',
'Value': 'fraud_detector'
},
{
'Name': 'Version',
'Value': 'v2'
}
]
}
]
)
# Usage
log_metric_to_cloudwatch('PredictionLatency', 0.145, 'Seconds')
log_metric_to_cloudwatch('ModelAccuracy', 92.0, 'Percent')
log_metric_to_cloudwatch('DataDriftPSI', 0.08, 'None')
---PAGE---
Alerting Strategies
Set up alerts to notify you when issues occur.
Alert Rules
Define clear thresholds for different severity levels:
class AlertConfig:
"""Alert thresholds configuration"""
# Latency alerts
LATENCY_WARNING = 0.5 # 500ms
LATENCY_CRITICAL = 2.0 # 2 seconds
# Accuracy alerts
ACCURACY_WARNING = 0.85 # Drop below 85%
ACCURACY_CRITICAL = 0.75 # Drop below 75%
# Data drift alerts
PSI_WARNING = 0.1 # Moderate drift
PSI_CRITICAL = 0.2 # Significant drift
# Error rate alerts
ERROR_RATE_WARNING = 0.01 # 1%
ERROR_RATE_CRITICAL = 0.05 # 5%
# Throughput alerts
MIN_THROUGHPUT = 10 # requests/second
MAX_THROUGHPUT = 1000 # requests/second
class AlertManager:
"""Manage and send alerts"""
def __init__(self):
self.active_alerts = set()
def check_latency(self, latency):
"""Check if latency exceeds thresholds"""
if latency > AlertConfig.LATENCY_CRITICAL:
self.send_alert(
severity='critical',
message=f'Prediction latency critical: {latency:.2f}s',
metric='latency',
value=latency
)
elif latency > AlertConfig.LATENCY_WARNING:
self.send_alert(
severity='warning',
message=f'Prediction latency high: {latency:.2f}s',
metric='latency',
value=latency
)
def check_accuracy(self, accuracy):
"""Check if accuracy drops below thresholds"""
if accuracy < AlertConfig.ACCURACY_CRITICAL:
self.send_alert(
severity='critical',
message=f'Model accuracy critical: {accuracy:.2%}',
metric='accuracy',
value=accuracy
)
elif accuracy < AlertConfig.ACCURACY_WARNING:
self.send_alert(
severity='warning',
message=f'Model accuracy degraded: {accuracy:.2%}',
metric='accuracy',
value=accuracy
)
def check_data_drift(self, psi_score, feature_name):
"""Check for data drift"""
if psi_score > AlertConfig.PSI_CRITICAL:
self.send_alert(
severity='critical',
message=f'Significant drift in {feature_name}: PSI={psi_score:.3f}',
metric='data_drift',
value=psi_score,
feature=feature_name
)
elif psi_score > AlertConfig.PSI_WARNING:
self.send_alert(
severity='warning',
message=f'Moderate drift in {feature_name}: PSI={psi_score:.3f}',
metric='data_drift',
value=psi_score,
feature=feature_name
)
def send_alert(self, severity, message, metric, value, **kwargs):
"""Send alert via multiple channels"""
alert_key = f"{metric}_{severity}"
        # Avoid duplicate alerts; a real system would also clear
        # alert_key from active_alerts once the metric recovers, or the
        # alert can never fire again
        if alert_key in self.active_alerts:
            return
        self.active_alerts.add(alert_key)
print(f"🚨 [{severity.upper()}] {message}")
# Send to different channels based on severity
if severity == 'critical':
self.send_pagerduty(message, **kwargs)
self.send_slack(message, channel='#ml-alerts-critical')
self.send_email(message, recipients=['ml-team@company.com'])
elif severity == 'warning':
self.send_slack(message, channel='#ml-alerts')
def send_slack(self, message, channel='#ml-alerts'):
"""Send Slack notification"""
# Implementation would use Slack API
print(f"📱 Slack -> {channel}: {message}")
def send_email(self, message, recipients):
"""Send email notification"""
# Implementation would use email service
print(f"📧 Email -> {recipients}: {message}")
def send_pagerduty(self, message, **kwargs):
"""Create PagerDuty incident"""
# Implementation would use PagerDuty API
print(f"📟 PagerDuty: {message}")
# Example usage
alert_mgr = AlertManager()
# Simulate monitoring
alert_mgr.check_latency(1.5) # Warning
alert_mgr.check_accuracy(0.72) # Critical
alert_mgr.check_data_drift(0.25, 'age') # Critical
Slack Integration
import requests
import json
def send_slack_alert(webhook_url, alert_data):
"""Send formatted alert to Slack"""
color = {
'critical': '#FF0000',
'warning': '#FFA500',
'info': '#0000FF'
}.get(alert_data['severity'], '#808080')
payload = {
"attachments": [
{
"color": color,
"title": f"ML Model Alert: {alert_data['severity'].upper()}",
"fields": [
{
"title": "Metric",
"value": alert_data['metric'],
"short": True
},
{
"title": "Value",
"value": str(alert_data['value']),
"short": True
},
{
"title": "Message",
"value": alert_data['message'],
"short": False
},
{
"title": "Timestamp",
"value": alert_data['timestamp'],
"short": True
}
]
}
]
}
response = requests.post(
webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.status_code == 200
# Usage
webhook = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
alert = {
'severity': 'critical',
'metric': 'model_accuracy',
'value': 0.72,
'message': 'Model accuracy dropped below critical threshold',
'timestamp': '2025-10-20 14:30:00'
}
send_slack_alert(webhook, alert)
---PAGE---
Complete Monitoring Solution
Putting it all together: a production-ready monitoring system.
Full Monitoring Pipeline
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import time
class MLMonitoringSystem:
"""Complete ML monitoring solution"""
def __init__(self, model, training_data):
self.model = model
self.training_data = training_data
self.alert_manager = AlertManager()
self.prediction_store = PredictionStore()
# Calculate baseline statistics
self.baseline_stats = self._calculate_baseline_stats()
def _calculate_baseline_stats(self):
"""Calculate statistics from training data"""
stats = {}
for column in self.training_data.columns:
if pd.api.types.is_numeric_dtype(self.training_data[column]):
stats[column] = {
'mean': self.training_data[column].mean(),
'std': self.training_data[column].std(),
'min': self.training_data[column].min(),
'max': self.training_data[column].max(),
'distribution': self.training_data[column].values
}
return stats
def predict_with_monitoring(self, features, prediction_id=None):
"""Make prediction with full monitoring"""
start_time = time.time()
try:
# 1. Data quality checks
self._check_data_quality(features)
# 2. Data drift detection
self._check_data_drift(features)
# 3. Make prediction
prediction = self.model.predict([features])[0]
# 4. Log prediction
if prediction_id:
self.prediction_store.log_prediction(
prediction_id, features, prediction
)
# 5. Monitor latency
latency = time.time() - start_time
self.alert_manager.check_latency(latency)
# 6. Log metrics
self._log_metrics({
'latency': latency,
'prediction': prediction,
'timestamp': datetime.now()
})
return prediction
except Exception as e:
print(f"❌ Prediction failed: {e}")
raise
def _check_data_quality(self, features):
"""Check input data quality"""
# Check for NaN values
if any(pd.isna(features)):
raise ValueError("Input contains NaN values")
# Check for infinite values
if any(np.isinf(features)):
raise ValueError("Input contains infinite values")
def _check_data_drift(self, features):
"""Check for data drift in features"""
        for feature_name, value in zip(
            self.training_data.columns, features
        ):
            if feature_name in self.baseline_stats:
                baseline_dist = self.baseline_stats[feature_name]['distribution']
                # NOTE: computing PSI against a single observation is an
                # illustrative simplification; in practice, compare a
                # batch of recent production values against the baseline
                psi = calculate_psi(
                    baseline_dist,
                    np.array([value] * len(baseline_dist))
                )
                self.alert_manager.check_data_drift(psi, feature_name)
def _log_metrics(self, metrics):
"""Log metrics to monitoring system"""
# Log to Prometheus, CloudWatch, etc.
print(f"📊 Metrics logged: {metrics}")
def evaluate_batch(self, predictions, ground_truth):
"""Evaluate model performance on batch"""
from sklearn.metrics import accuracy_score, f1_score
accuracy = accuracy_score(ground_truth, predictions)
f1 = f1_score(ground_truth, predictions, average='weighted')
# Check against thresholds
self.alert_manager.check_accuracy(accuracy)
print(f"✓ Batch evaluation - Accuracy: {accuracy:.2%}, F1: {f1:.2%}")
return {
'accuracy': accuracy,
'f1_score': f1,
'timestamp': datetime.now()
}
# Example usage
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
# Create sample data and model
X_train, y_train = make_classification(n_samples=1000, n_features=5, random_state=42)
training_df = pd.DataFrame(X_train, columns=[f'feature_{i}' for i in range(5)])
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)
# Initialize monitoring system
monitoring = MLMonitoringSystem(model, training_df)
# Make monitored prediction
test_features = [0.5, -0.3, 1.2, 0.8, -0.5]
prediction = monitoring.predict_with_monitoring(test_features, prediction_id='pred_001')
print(f"Prediction: {prediction}")
---PAGE---
Best Practices
1. Monitor the Right Metrics
Essential metrics:
- Model performance (accuracy, precision, recall, etc.)
- System performance (latency, throughput, errors)
- Data quality (missing values, outliers, types)
- Data drift (PSI, KS test, chi-square)
Don't over-monitor:
- Focus on actionable metrics
- Avoid vanity metrics
- Balance coverage vs. noise
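As a concrete example of an actionable metric, a rolling error rate over the last N requests maps directly onto the error-rate alert thresholds used in this module. A minimal sketch (the class name is illustrative):

```python
from collections import deque

class RollingErrorRate:
    """Error rate over a sliding window of recent request outcomes."""
    def __init__(self, window=100):
        self.outcomes = deque(maxlen=window)  # True means the request errored

    def record(self, is_error):
        self.outcomes.append(bool(is_error))

    def rate(self):
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)

tracker = RollingErrorRate(window=100)
for i in range(200):
    tracker.record(i % 50 == 0)  # simulate roughly 2% of requests failing
print(f"Rolling error rate: {tracker.rate():.2%}")
```

Because the window is bounded, the metric reacts quickly to bursts of failures and recovers once they stop, which keeps the resulting alerts actionable rather than noisy.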
2. Set Meaningful Thresholds
# Bad: Arbitrary thresholds
if accuracy < 0.8: # Why 0.8?
alert()
# Good: Data-driven thresholds
baseline_accuracy = 0.92
threshold = baseline_accuracy * 0.9 # 10% drop from baseline
if accuracy < threshold:
alert(f"Accuracy dropped {(baseline_accuracy - accuracy):.1%} from baseline")
3. Implement Graceful Degradation
class RobustPredictor:
"""Predictor with fallback strategies"""
def __init__(self, primary_model, fallback_model):
self.primary_model = primary_model
self.fallback_model = fallback_model
self.primary_failures = 0
def predict(self, features):
"""Try primary model, fallback if needed"""
try:
# Check data quality
if self._is_data_quality_acceptable(features):
prediction = self.primary_model.predict([features])
return prediction
else:
# Use simpler fallback model for poor quality data
print("⚠️ Using fallback model due to data quality")
return self.fallback_model.predict([features])
except Exception as e:
self.primary_failures += 1
print(f"❌ Primary model failed, using fallback: {e}")
return self.fallback_model.predict([features])
def _is_data_quality_acceptable(self, features):
"""Check if data quality is good enough for primary model"""
# Implement quality checks
return not any(pd.isna(features))
4. Automate Retraining Triggers
class AutoRetrainingMonitor:
"""Monitor metrics and trigger retraining"""
def __init__(self, accuracy_threshold=0.85, drift_threshold=0.2):
self.accuracy_threshold = accuracy_threshold
self.drift_threshold = drift_threshold
self.recent_accuracy = []
self.recent_drift_scores = []
def should_retrain(self):
"""Decide if model should be retrained"""
reasons = []
# Check accuracy trend
if len(self.recent_accuracy) >= 10:
avg_accuracy = np.mean(self.recent_accuracy[-10:])
if avg_accuracy < self.accuracy_threshold:
reasons.append(f"Low accuracy: {avg_accuracy:.2%}")
# Check drift
if len(self.recent_drift_scores) >= 5:
max_drift = max(self.recent_drift_scores[-5:])
if max_drift > self.drift_threshold:
reasons.append(f"High drift: PSI={max_drift:.3f}")
if reasons:
print("🔄 Retraining triggered:")
for reason in reasons:
print(f" - {reason}")
return True
return False
def log_metrics(self, accuracy, drift_score):
"""Log metrics for retraining decision"""
self.recent_accuracy.append(accuracy)
self.recent_drift_scores.append(drift_score)
5. Document Your Monitoring
Create a monitoring runbook:
# Model Monitoring Runbook
## Alerts and Response
### Critical: Model Accuracy < 75%
**Response Time:** Immediate
**Actions:**
1. Check recent data for quality issues
2. Verify ground truth labels are correct
3. Investigate data drift in top features
4. Consider rolling back to previous model version
5. Trigger emergency retraining if needed
### Warning: Data Drift PSI > 0.1
**Response Time:** Within 24 hours
**Actions:**
1. Identify which features are drifting
2. Analyze production data distribution
3. Determine if drift is temporary or permanent
4. Plan retraining with recent data
5. Update feature engineering if needed
### Critical: Prediction Latency > 2s
**Response Time:** Within 1 hour
**Actions:**
1. Check system resources (CPU, memory, GPU)
2. Review recent deployments or changes
3. Analyze batch sizes and concurrency
4. Consider scaling infrastructure
5. Implement caching if applicable
---PAGE---
---QUIZ--- TITLE: Monitoring and Observability Knowledge Check INTRO: Test your understanding of ML monitoring concepts!
Q: What is data drift? A: When input data distribution changes over time A: When the model performance improves A: When the server runs out of memory A: When predictions are cached CORRECT: 0 EXPLAIN: Data drift occurs when the statistical properties of input features change over time. This can cause model performance to degrade because the model was trained on different data distributions.
Q: What does a PSI (Population Stability Index) score of 0.25 indicate? A: No significant change A: Moderate change, monitor closely A: Significant change, investigate immediately A: Perfect model performance CORRECT: 2 EXPLAIN: PSI >= 0.2 indicates significant change in data distribution. PSI < 0.1 is no significant change, 0.1-0.2 is moderate change, and >= 0.2 requires immediate investigation.
Q: Why is monitoring latency important for ML systems? A: It affects user experience and SLA compliance A: It indicates model accuracy A: It measures data quality A: It prevents overfitting CORRECT: 0 EXPLAIN: Latency (response time) directly impacts user experience. High latency can violate service level agreements (SLAs) and make your ML service unusable in real-time applications, even if the model is accurate.
Q: What is the main challenge with monitoring ML model accuracy in production? A: Models are too complex to evaluate A: Ground truth labels are often delayed or unavailable A: Accuracy metrics don't exist A: Production data is always perfect CORRECT: 1 EXPLAIN: In production, you often don't have immediate access to ground truth labels. For example, in fraud detection, you might not know if a transaction was fraudulent until weeks later. This makes real-time accuracy monitoring challenging.
Q: Which metric is NOT typically monitored for ML systems? A: Prediction latency A: Data drift score A: Number of code comments A: Error rate CORRECT: 2 EXPLAIN: Number of code comments is a code quality metric, not an ML system metric. Important ML metrics include latency (response time), data drift (distribution changes), and error rate (prediction failures).
Q: What does the Kolmogorov-Smirnov (KS) test measure? A: Model accuracy A: Difference between two probability distributions A: Server CPU usage A: Code quality CORRECT: 1 EXPLAIN: The KS test measures whether two samples come from the same distribution. In ML monitoring, it's used to detect data drift by comparing training data distribution with production data distribution.
Q: What should you do if your model's accuracy suddenly drops from 92% to 75%? A: Ignore it, accuracy fluctuates normally A: Immediately retrain on all historical data A: Investigate data quality, drift, and consider rolling back A: Delete the monitoring system CORRECT: 2 EXPLAIN: A sudden significant drop in accuracy requires immediate investigation. Check for data quality issues, data drift, or bugs. Consider rolling back to the previous model version while investigating. Don't immediately retrain without understanding the root cause.
Q: What is the purpose of implementing graceful degradation in ML systems? A: To make the model slower A: To provide fallback strategies when primary model fails A: To reduce monitoring costs A: To increase model complexity CORRECT: 1 EXPLAIN: Graceful degradation provides fallback strategies (like simpler models or rule-based systems) when the primary model fails or encounters poor quality data. This ensures your system continues to function even when issues occur. ---END-QUIZ---
---PAGE---
Key Takeaways
Congratulations! You've completed Part 2 of the MLOps series on monitoring and observability.
Core Concepts
✓ Data Drift: Detecting when input distributions change (KS test, PSI, chi-square)
✓ Model Performance: Tracking accuracy metrics with delayed ground truth
✓ System Metrics: Monitoring latency, throughput, and error rates
✓ Data Quality: Validating inputs for missing values and outliers
Technical Skills
✓ Statistical Tests: Implementing drift detection algorithms
✓ Metrics Collection: Using Prometheus, CloudWatch for monitoring
✓ Dashboards: Building Grafana dashboards for visualization
✓ Alerting: Setting up Slack, email, PagerDuty notifications
Best Practices
✓ Meaningful Thresholds: Data-driven alert thresholds
✓ Graceful Degradation: Fallback strategies for failures
✓ Automated Retraining: Triggering retraining based on metrics
✓ Documentation: Creating runbooks for alert response
What's Next
In Part 3: CI/CD for Machine Learning, you'll learn:
- Automated testing for ML models and pipelines
- Building continuous training workflows
- Version control for models, data, and code
- Pipeline orchestration with Airflow and Kubeflow
- Experiment tracking with MLflow
- Continuous deployment strategies
In Part 4: Scaling ML Systems, you'll explore:
- Horizontal and vertical scaling strategies
- Distributed inference across multiple machines
- GPU optimization for deep learning
- Caching and performance tuning
- Cost optimization techniques
Resources for Further Learning
- Evidently AI: Open-source ML monitoring - https://evidentlyai.com
- Prometheus Docs: Monitoring and alerting - https://prometheus.io/docs/
- Grafana Tutorials: Dashboard creation - https://grafana.com/tutorials/
- AWS CloudWatch: ML metrics monitoring - https://aws.amazon.com/cloudwatch/
- Seldon Alibi Detect: Drift detection library - https://github.com/SeldonIO/alibi-detect
Practice Exercises
To reinforce your learning:
- Implement PSI calculation for your own dataset
- Set up Prometheus metrics for a simple ML service
- Create a Grafana dashboard with key ML metrics
- Build a data drift detector using statistical tests
- Design an alerting strategy for a production ML system
Keep monitoring, and see you in Part 3!