MLOps Part 2: Model Monitoring and Observability

Learn how to monitor machine learning models in production, detect drift, track performance metrics, and build observability into your ML systems

Introduction to ML Monitoring

Once your model is deployed, the real work begins: ensuring it continues to perform well over time. Production ML models face unique challenges that traditional software doesn't encounter.

What You'll Learn

In this module, you'll understand:

  • Drift detection: Identifying when data or model behavior changes
  • Performance monitoring: Tracking model accuracy and quality
  • Observability: Building visibility into ML systems
  • Alerting: Setting up notifications for issues

Why Monitoring Matters

Models degrade over time due to:

  • Data drift: Input data distribution changes
  • Concept drift: Relationships between features and targets change
  • Model staleness: Model doesn't adapt to new patterns
  • Data quality issues: Missing values, outliers, corrupt data

Without monitoring, you won't know when your model fails!
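
The degradation is easy to demonstrate. A minimal sketch (using scikit-learn on synthetic data) trains a classifier on one labeling rule, then scores it after the rule has shifted — concept drift in miniature:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(42)

# Train where the label is simply "is x greater than 0"
X_train = rng.normal(0, 1, size=(1000, 1))
y_train = (X_train[:, 0] > 0).astype(int)

model = LogisticRegression().fit(X_train, y_train)

# In-distribution test data: performance looks fine
X_test = rng.normal(0, 1, size=(500, 1))
y_test = (X_test[:, 0] > 0).astype(int)
print(f"Before drift: {model.score(X_test, y_test):.2f}")

# Concept drift: the true decision boundary moves to x > 1,
# but the input distribution is unchanged
y_drifted = (X_test[:, 0] > 1).astype(int)
print(f"After drift:  {model.score(X_test, y_drifted):.2f}")
```

Accuracy collapses even though the inputs look identical — exactly the failure mode that silent, unmonitored models hit.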

---PAGE---

Understanding Data Drift

Data drift occurs when the statistical properties of input features change over time, causing model performance to degrade.

Types of Data Drift

1. Covariate Shift

The distribution of the input features (X) changes, but the relationship between features and the target, P(Y|X), remains the same.

Example:

Training data: Ages 25-45 (e-commerce shoppers)
Production data: Ages 18-25 (new demographic)
Result: Model sees different age distribution

2. Prior Probability Shift

The distribution of the target variable (Y) changes, but P(X|Y) stays the same.

Example:

Training: 90% approved loans, 10% rejected
Production: 70% approved, 30% rejected (economic downturn)
Result: Class imbalance shifted
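
One way to catch prior probability shift without waiting for ground truth is to compare the model's predicted class proportions against the training-time class balance. A minimal sketch (the 90% baseline and the tolerance value are illustrative, mirroring the loan example above):

```python
import numpy as np

def check_prior_shift(predictions, training_positive_rate, tolerance=0.1):
    """Compare the predicted positive rate against the training prior.

    Returns (shifted, production_rate). `tolerance` is the absolute
    difference in class proportion that triggers a flag.
    """
    production_rate = np.mean(predictions)
    shifted = abs(production_rate - training_positive_rate) > tolerance
    return shifted, production_rate

# Training data was 90% approved (label 1), 10% rejected (label 0)
training_rate = 0.90

# Recent production predictions: approval rate has fallen to 70%
recent_predictions = np.array([1] * 70 + [0] * 30)

shifted, rate = check_prior_shift(recent_predictions, training_rate)
if shifted:
    print(f"⚠️ Prior shift: positive rate {rate:.0%} vs {training_rate:.0%} at training")
```

This only sees shifts the model itself reflects in its outputs, but it works immediately, with no labels required.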

3. Concept Drift

The relationship between the features and the target, P(Y|X), changes.

Example:

Training: "luxury car" = expensive
Production: Electric vehicles changed what "luxury" means
Result: Feature meanings evolved
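
Because concept drift changes P(Y|X), it usually can't be detected from the inputs alone — it surfaces as declining labeled performance. A common approach, sketched here with an assumed window size and threshold, is rolling accuracy over recent (prediction, ground truth) pairs:

```python
from collections import deque

class RollingAccuracyMonitor:
    """Track accuracy over the most recent labeled predictions."""

    def __init__(self, window_size=100, alert_threshold=0.80):
        self.outcomes = deque(maxlen=window_size)
        self.alert_threshold = alert_threshold

    def record(self, prediction, ground_truth):
        self.outcomes.append(prediction == ground_truth)

    def rolling_accuracy(self):
        if not self.outcomes:
            return None
        return sum(self.outcomes) / len(self.outcomes)

    def is_degraded(self):
        acc = self.rolling_accuracy()
        return acc is not None and acc < self.alert_threshold

# 90 correct then 30 wrong: the window keeps the most recent 100
monitor = RollingAccuracyMonitor(window_size=100)
for _ in range(90):
    monitor.record(1, 1)
for _ in range(30):
    monitor.record(1, 0)

print(f"Rolling accuracy: {monitor.rolling_accuracy():.2f}")
print(f"Degraded: {monitor.is_degraded()}")
```

The deque drops the oldest outcomes automatically, so a recent streak of errors pulls the rolling accuracy down even after a long healthy history.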

Detecting Data Drift

Statistical Tests

Kolmogorov-Smirnov Test (continuous features):

from scipy.stats import ks_2samp
import numpy as np

# Training data distribution
training_ages = np.array([25, 30, 35, 40, 45, 50])

# Production data distribution
production_ages = np.array([18, 20, 22, 25, 28, 30])

# Perform KS test
statistic, p_value = ks_2samp(training_ages, production_ages)

if p_value < 0.05:
    print(f"⚠️ Data drift detected! p-value: {p_value:.4f}")
else:
    print(f"✓ No significant drift. p-value: {p_value:.4f}")

Chi-Square Test (categorical features):

from scipy.stats import chisquare
import numpy as np

# Training data: category frequencies
training_categories = np.array([100, 200, 150, 50])  # 4 categories

# Production data: category frequencies
production_categories = np.array([80, 180, 200, 40])

# chisquare requires observed and expected counts to sum to the same
# total, so scale the training frequencies to the production total
expected = training_categories * (production_categories.sum() / training_categories.sum())

# Perform chi-square test
statistic, p_value = chisquare(production_categories, expected)

if p_value < 0.05:
    print(f"⚠️ Category distribution changed! p-value: {p_value:.4f}")
else:
    print(f"✓ Category distribution stable. p-value: {p_value:.4f}")

Population Stability Index (PSI)

PSI measures drift in feature distributions:

import numpy as np

def calculate_psi(expected, actual, bins=10):
    """
    Calculate Population Stability Index
    PSI < 0.1: No significant change
    0.1 <= PSI < 0.2: Moderate change
    PSI >= 0.2: Significant change
    """
    # Create bins
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))

    # Calculate frequencies
    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)

    # Avoid division by zero
    expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
    actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)

    # Calculate PSI
    psi = np.sum((actual_percents - expected_percents) *
                 np.log(actual_percents / expected_percents))

    return psi

# Example usage
training_feature = np.random.normal(100, 15, 1000)
production_feature = np.random.normal(110, 20, 1000)  # Drifted

psi_score = calculate_psi(training_feature, production_feature)
print(f"PSI Score: {psi_score:.4f}")

if psi_score < 0.1:
    print("✓ No significant drift")
elif psi_score < 0.2:
    print("⚠️ Moderate drift detected")
else:
    print("🚨 Significant drift! Investigate immediately")
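
In practice you would score every feature, not just one. A sketch of a per-column drift report (the condensed psi helper repeats the PSI logic above; the column names and data are illustrative):

```python
import numpy as np
import pandas as pd

def psi(expected, actual, bins=10):
    """Population Stability Index between two samples
    (a condensed version of calculate_psi above)."""
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
    e = np.histogram(expected, breakpoints)[0] / len(expected)
    a = np.histogram(actual, breakpoints)[0] / len(actual)
    e = np.where(e == 0, 1e-4, e)
    a = np.where(a == 0, 1e-4, a)
    return np.sum((a - e) * np.log(a / e))

def drift_report(training_df, production_df, bins=10):
    """PSI per numeric column shared by both frames, worst first."""
    scores = {}
    for column in training_df.columns:
        if (column in production_df.columns and
                pd.api.types.is_numeric_dtype(training_df[column])):
            scores[column] = psi(
                training_df[column].values,
                production_df[column].values,
                bins=bins,
            )
    return dict(sorted(scores.items(), key=lambda kv: -kv[1]))

# Two features, one deliberately drifted
rng = np.random.default_rng(0)
train = pd.DataFrame({'age': rng.normal(35, 8, 1000),
                      'income': rng.normal(50000, 10000, 1000)})
prod = pd.DataFrame({'age': rng.normal(35, 8, 1000),             # stable
                     'income': rng.normal(65000, 10000, 1000)})  # drifted

for feature, score in drift_report(train, prod).items():
    print(f"{feature}: PSI={score:.3f}")
```

Sorting worst-first means the report doubles as a triage list when a drift alert fires.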

---PAGE---

Model Performance Monitoring

Tracking model performance in production requires collecting ground truth labels and comparing predictions.

Key Metrics to Monitor

Classification Metrics

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np

def log_classification_metrics(y_true, y_pred, timestamp):
    """Log classification metrics to monitoring system"""

    metrics = {
        'timestamp': timestamp,
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred, average='weighted'),
        'recall': recall_score(y_true, y_pred, average='weighted'),
        'f1_score': f1_score(y_true, y_pred, average='weighted')
    }

    # Log to monitoring system (Prometheus, CloudWatch, etc.)
    for metric_name, value in metrics.items():
        print(f"{metric_name}: {value:.4f}")

    return metrics

# Example usage
y_true = np.array([1, 0, 1, 1, 0, 1, 0, 0])
y_pred = np.array([1, 0, 1, 0, 0, 1, 1, 0])

metrics = log_classification_metrics(y_true, y_pred, '2025-10-20')

Regression Metrics

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

def log_regression_metrics(y_true, y_pred, timestamp):
    """Log regression metrics"""

    metrics = {
        'timestamp': timestamp,
        'mse': mean_squared_error(y_true, y_pred),
        'rmse': np.sqrt(mean_squared_error(y_true, y_pred)),
        'mae': mean_absolute_error(y_true, y_pred),
        'r2': r2_score(y_true, y_pred)
    }

    for metric_name, value in metrics.items():
        print(f"{metric_name}: {value:.4f}")

    return metrics

Delayed Ground Truth

Many ML systems have delayed feedback:

import time
from datetime import datetime, timedelta

class PredictionStore:
    """Store predictions and match with ground truth later"""

    def __init__(self):
        self.predictions = {}

    def log_prediction(self, prediction_id, features, prediction):
        """Store prediction for later evaluation"""
        self.predictions[prediction_id] = {
            'features': features,
            'prediction': prediction,
            'timestamp': datetime.now(),
            'ground_truth': None
        }

    def update_ground_truth(self, prediction_id, ground_truth):
        """Update with actual outcome when available"""
        if prediction_id in self.predictions:
            self.predictions[prediction_id]['ground_truth'] = ground_truth

            # Calculate metrics when we have both prediction and truth
            self._evaluate_prediction(prediction_id)

    def _evaluate_prediction(self, prediction_id):
        """Evaluate individual prediction"""
        record = self.predictions[prediction_id]

        if record['ground_truth'] is not None:
            correct = record['prediction'] == record['ground_truth']
            latency = (datetime.now() - record['timestamp']).total_seconds()

            print(f"Prediction {prediction_id}: {'✓' if correct else '✗'}")
            print(f"Feedback latency: {latency:.0f} seconds")

# Example usage
store = PredictionStore()

# Log prediction
store.log_prediction('pred_001', [25, 50000, 'A'], prediction=1)

# Later, when ground truth arrives
time.sleep(2)  # Simulate delay
store.update_ground_truth('pred_001', ground_truth=1)

---PAGE---

System Performance Metrics

Beyond model accuracy, monitor system health and performance.

Latency Monitoring

import time
from prometheus_client import Histogram
import numpy as np

# Define Prometheus metric
prediction_latency = Histogram(
    'model_prediction_latency_seconds',
    'Time spent processing prediction',
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0]
)

def predict_with_monitoring(model, features):
    """Make prediction with latency monitoring"""

    start_time = time.time()

    try:
        prediction = model.predict([features])
        latency = time.time() - start_time

        # Record latency
        prediction_latency.observe(latency)

        # Log if slow
        if latency > 0.5:
            print(f"⚠️ Slow prediction: {latency:.3f}s")

        return prediction

    except Exception as e:
        latency = time.time() - start_time
        print(f"❌ Prediction failed after {latency:.3f}s: {e}")
        raise

Throughput Monitoring

from prometheus_client import Counter
import time
from collections import deque

# Request counter
prediction_counter = Counter(
    'model_predictions_total',
    'Total number of predictions',
    ['model_version', 'status']
)

class ThroughputMonitor:
    """Monitor requests per second"""

    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self.timestamps = deque()

    def record_request(self):
        """Record a new request"""
        now = time.time()
        self.timestamps.append(now)

        # Remove old timestamps outside window
        cutoff = now - self.window_seconds
        while self.timestamps and self.timestamps[0] < cutoff:
            self.timestamps.popleft()

    def get_requests_per_second(self):
        """Calculate current RPS over the observed window"""
        if len(self.timestamps) < 2:
            return 0.0

        # Use the actual elapsed span (capped at the window) so the rate
        # isn't underestimated before a full window has passed
        elapsed = min(self.window_seconds, time.time() - self.timestamps[0])
        return len(self.timestamps) / max(elapsed, 1e-9)

# Usage
monitor = ThroughputMonitor()

# Simulate requests
for _ in range(100):
    monitor.record_request()
    time.sleep(0.01)

rps = monitor.get_requests_per_second()
print(f"Current throughput: {rps:.2f} requests/second")

Error Rate Monitoring

from prometheus_client import Counter
import numpy as np

# Error counters
prediction_errors = Counter(
    'model_prediction_errors_total',
    'Total prediction errors',
    ['error_type']
)

def predict_with_error_tracking(model, features):
    """Track different types of errors"""

    try:
        # Validate input
        if features is None or len(features) == 0:
            prediction_errors.labels(error_type='invalid_input').inc()
            raise ValueError("Empty features")

        # Make prediction
        prediction = model.predict([features])

        # Check for NaN
        if np.isnan(prediction).any():
            prediction_errors.labels(error_type='nan_prediction').inc()
            raise ValueError("Model returned NaN")

        return prediction

    except ValueError:
        # Already counted above with a specific error_type
        raise
    except Exception:
        prediction_errors.labels(error_type='unknown_error').inc()
        raise
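
The counters above only ever grow; an error rate needs a window. When you aren't relying on Prometheus's server-side rate() function, a sliding-window monitor is one way to compute it locally — a sketch:

```python
import time
from collections import deque

class ErrorRateMonitor:
    """Track the fraction of failed requests over a sliding time window."""

    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self.events = deque()  # (timestamp, was_error) pairs

    def record(self, was_error):
        now = time.time()
        self.events.append((now, was_error))
        # Drop events that have fallen out of the window
        cutoff = now - self.window_seconds
        while self.events and self.events[0][0] < cutoff:
            self.events.popleft()

    def error_rate(self):
        if not self.events:
            return 0.0
        errors = sum(1 for _, was_error in self.events if was_error)
        return errors / len(self.events)

# 98 successes and 2 failures inside the window -> 2% error rate
monitor = ErrorRateMonitor()
for i in range(100):
    monitor.record(was_error=(i % 50 == 0))

print(f"Error rate: {monitor.error_rate():.1%}")
```

The resulting rate can be checked against warning/critical thresholds the same way latency and accuracy are later in this module.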

---PAGE---

Data Quality Monitoring

Monitor input data quality to detect issues before they affect predictions.

Missing Values Detection

import pandas as pd
import numpy as np

class DataQualityMonitor:
    """Monitor data quality metrics"""

    def __init__(self, expected_columns):
        self.expected_columns = expected_columns
        self.missing_value_threshold = 0.1  # 10%

    def check_missing_values(self, df):
        """Check for excessive missing values"""

        missing_pct = df.isnull().sum() / len(df)
        issues = []

        for column, pct in missing_pct.items():
            if pct > self.missing_value_threshold:
                issues.append({
                    'column': column,
                    'missing_pct': pct,
                    'severity': 'high' if pct > 0.3 else 'medium'
                })
                print(f"⚠️ {column}: {pct*100:.1f}% missing values")

        return issues

    def check_column_presence(self, df):
        """Verify all expected columns are present"""

        missing_cols = set(self.expected_columns) - set(df.columns)
        extra_cols = set(df.columns) - set(self.expected_columns)

        if missing_cols:
            print(f"❌ Missing columns: {missing_cols}")

        if extra_cols:
            print(f"⚠️ Extra columns: {extra_cols}")

        return len(missing_cols) == 0

    def check_data_types(self, df, expected_types):
        """Verify column data types"""

        type_mismatches = []

        for column, expected_type in expected_types.items():
            if column in df.columns:
                actual_type = df[column].dtype
                if actual_type != expected_type:
                    type_mismatches.append({
                        'column': column,
                        'expected': expected_type,
                        'actual': actual_type
                    })
                    print(f"⚠️ {column}: expected {expected_type}, got {actual_type}")

        return type_mismatches

# Example usage
monitor = DataQualityMonitor(['age', 'income', 'score'])

# Simulate production data
data = pd.DataFrame({
    'age': [25, np.nan, 35, 40],
    'income': [50000, 60000, np.nan, 80000],
    'score': [0.8, 0.9, 0.7, np.nan]
})

issues = monitor.check_missing_values(data)

Outlier Detection

from scipy import stats
import numpy as np

def detect_outliers_zscore(data, threshold=3):
    """Detect outliers using Z-score method"""

    z_scores = np.abs(stats.zscore(data))
    outliers = np.where(z_scores > threshold)[0]

    outlier_pct = len(outliers) / len(data) * 100

    if outlier_pct > 5:  # More than 5% outliers
        print(f"⚠️ {outlier_pct:.1f}% outliers detected")

    return outliers

def detect_outliers_iqr(data):
    """Detect outliers using Interquartile Range"""

    Q1 = np.percentile(data, 25)
    Q3 = np.percentile(data, 75)
    IQR = Q3 - Q1

    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    outliers = np.where((data < lower_bound) | (data > upper_bound))[0]

    print(f"Valid range: [{lower_bound:.2f}, {upper_bound:.2f}]")
    print(f"Outliers found: {len(outliers)}")

    return outliers

# Example
ages = np.array([25, 30, 35, 40, 45, 200, 28, 33])  # 200 is outlier
outliers = detect_outliers_iqr(ages)

---PAGE---

Building Monitoring Dashboards

Visualize metrics and create alerts using popular monitoring tools.

Prometheus + Grafana Setup

Step 1: Instrument your code

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import Flask, Response
import time

app = Flask(__name__)

# Define metrics
predictions_total = Counter(
    'ml_predictions_total',
    'Total predictions made',
    ['model', 'version']
)

prediction_latency = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model']
)

model_accuracy = Gauge(
    'ml_model_accuracy',
    'Current model accuracy',
    ['model', 'version']
)

data_drift_score = Gauge(
    'ml_data_drift_psi',
    'PSI score for data drift',
    ['feature']
)

@app.route('/predict', methods=['POST'])
def predict():
    start = time.time()

    # Make prediction
    prediction = make_prediction()

    # Record metrics
    predictions_total.labels(model='fraud_detector', version='v2').inc()
    prediction_latency.labels(model='fraud_detector').observe(time.time() - start)

    return {'prediction': prediction}

@app.route('/metrics')
def metrics():
    """Expose metrics for Prometheus to scrape"""
    return Response(generate_latest(), mimetype='text/plain')

def make_prediction():
    """Dummy prediction function"""
    time.sleep(0.1)  # Simulate processing
    return 1

Step 2: Prometheus configuration

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ml_service'
    static_configs:
      - targets: ['localhost:5000']

Step 3: Key PromQL queries for Grafana panels

# Prediction rate (requests per second)
rate(ml_predictions_total[5m])

# Average latency
rate(ml_prediction_latency_seconds_sum[5m])
/
rate(ml_prediction_latency_seconds_count[5m])

# 95th percentile latency
histogram_quantile(0.95,
  rate(ml_prediction_latency_seconds_bucket[5m])
)

# Model accuracy over time
ml_model_accuracy

# Data drift alert
ml_data_drift_psi > 0.2

CloudWatch Metrics (AWS)

import boto3
from datetime import datetime, timezone

cloudwatch = boto3.client('cloudwatch')

def log_metric_to_cloudwatch(metric_name, value, unit='None'):
    """Send custom metric to CloudWatch"""

    cloudwatch.put_metric_data(
        Namespace='MLOps/Models',
        MetricData=[
            {
                'MetricName': metric_name,
                'Value': value,
                'Unit': unit,
                # datetime.utcnow() is deprecated; use an aware UTC timestamp
                'Timestamp': datetime.now(timezone.utc),
                'Dimensions': [
                    {
                        'Name': 'Model',
                        'Value': 'fraud_detector'
                    },
                    {
                        'Name': 'Version',
                        'Value': 'v2'
                    }
                ]
            }
        ]
    )

# Usage
log_metric_to_cloudwatch('PredictionLatency', 0.145, 'Seconds')
log_metric_to_cloudwatch('ModelAccuracy', 0.92, 'Percent')
log_metric_to_cloudwatch('DataDriftPSI', 0.08, 'None')

---PAGE---

Alerting Strategies

Set up alerts to notify you when issues occur.

Alert Rules

Define clear thresholds for different severity levels:

class AlertConfig:
    """Alert thresholds configuration"""

    # Latency alerts
    LATENCY_WARNING = 0.5  # 500ms
    LATENCY_CRITICAL = 2.0  # 2 seconds

    # Accuracy alerts
    ACCURACY_WARNING = 0.85  # Drop below 85%
    ACCURACY_CRITICAL = 0.75  # Drop below 75%

    # Data drift alerts
    PSI_WARNING = 0.1  # Moderate drift
    PSI_CRITICAL = 0.2  # Significant drift

    # Error rate alerts
    ERROR_RATE_WARNING = 0.01  # 1%
    ERROR_RATE_CRITICAL = 0.05  # 5%

    # Throughput alerts
    MIN_THROUGHPUT = 10  # requests/second
    MAX_THROUGHPUT = 1000  # requests/second

class AlertManager:
    """Manage and send alerts"""

    def __init__(self):
        self.active_alerts = set()

    def check_latency(self, latency):
        """Check if latency exceeds thresholds"""

        if latency > AlertConfig.LATENCY_CRITICAL:
            self.send_alert(
                severity='critical',
                message=f'Prediction latency critical: {latency:.2f}s',
                metric='latency',
                value=latency
            )
        elif latency > AlertConfig.LATENCY_WARNING:
            self.send_alert(
                severity='warning',
                message=f'Prediction latency high: {latency:.2f}s',
                metric='latency',
                value=latency
            )

    def check_accuracy(self, accuracy):
        """Check if accuracy drops below thresholds"""

        if accuracy < AlertConfig.ACCURACY_CRITICAL:
            self.send_alert(
                severity='critical',
                message=f'Model accuracy critical: {accuracy:.2%}',
                metric='accuracy',
                value=accuracy
            )
        elif accuracy < AlertConfig.ACCURACY_WARNING:
            self.send_alert(
                severity='warning',
                message=f'Model accuracy degraded: {accuracy:.2%}',
                metric='accuracy',
                value=accuracy
            )

    def check_data_drift(self, psi_score, feature_name):
        """Check for data drift"""

        if psi_score > AlertConfig.PSI_CRITICAL:
            self.send_alert(
                severity='critical',
                message=f'Significant drift in {feature_name}: PSI={psi_score:.3f}',
                metric='data_drift',
                value=psi_score,
                feature=feature_name
            )
        elif psi_score > AlertConfig.PSI_WARNING:
            self.send_alert(
                severity='warning',
                message=f'Moderate drift in {feature_name}: PSI={psi_score:.3f}',
                metric='data_drift',
                value=psi_score,
                feature=feature_name
            )

    def send_alert(self, severity, message, metric, value, **kwargs):
        """Send alert via multiple channels"""

        alert_key = f"{metric}_{severity}"

        # Avoid duplicate alerts (in production, expire these entries
        # after a cooldown so a recovered issue can re-alert later)
        if alert_key in self.active_alerts:
            return

        self.active_alerts.add(alert_key)

        print(f"🚨 [{severity.upper()}] {message}")

        # Send to different channels based on severity
        if severity == 'critical':
            self.send_pagerduty(message, **kwargs)
            self.send_slack(message, channel='#ml-alerts-critical')
            self.send_email(message, recipients=['ml-team@company.com'])
        elif severity == 'warning':
            self.send_slack(message, channel='#ml-alerts')

    def send_slack(self, message, channel='#ml-alerts'):
        """Send Slack notification"""
        # Implementation would use Slack API
        print(f"📱 Slack -> {channel}: {message}")

    def send_email(self, message, recipients):
        """Send email notification"""
        # Implementation would use email service
        print(f"📧 Email -> {recipients}: {message}")

    def send_pagerduty(self, message, **kwargs):
        """Create PagerDuty incident"""
        # Implementation would use PagerDuty API
        print(f"📟 PagerDuty: {message}")

# Example usage
alert_mgr = AlertManager()

# Simulate monitoring
alert_mgr.check_latency(1.5)  # Warning
alert_mgr.check_accuracy(0.72)  # Critical
alert_mgr.check_data_drift(0.25, 'age')  # Critical

Slack Integration

import requests
import json

def send_slack_alert(webhook_url, alert_data):
    """Send formatted alert to Slack"""

    color = {
        'critical': '#FF0000',
        'warning': '#FFA500',
        'info': '#0000FF'
    }.get(alert_data['severity'], '#808080')

    payload = {
        "attachments": [
            {
                "color": color,
                "title": f"ML Model Alert: {alert_data['severity'].upper()}",
                "fields": [
                    {
                        "title": "Metric",
                        "value": alert_data['metric'],
                        "short": True
                    },
                    {
                        "title": "Value",
                        "value": str(alert_data['value']),
                        "short": True
                    },
                    {
                        "title": "Message",
                        "value": alert_data['message'],
                        "short": False
                    },
                    {
                        "title": "Timestamp",
                        "value": alert_data['timestamp'],
                        "short": True
                    }
                ]
            }
        ]
    }

    response = requests.post(
        webhook_url,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
        timeout=10  # Don't hang the service on a slow webhook
    )

    return response.status_code == 200

# Usage
webhook = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
alert = {
    'severity': 'critical',
    'metric': 'model_accuracy',
    'value': 0.72,
    'message': 'Model accuracy dropped below critical threshold',
    'timestamp': '2025-10-20 14:30:00'
}

send_slack_alert(webhook, alert)

---PAGE---

Complete Monitoring Solution

Putting it all together: a production-ready monitoring system.

Full Monitoring Pipeline

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import time

class MLMonitoringSystem:
    """Complete ML monitoring solution"""

    def __init__(self, model, training_data):
        self.model = model
        self.training_data = training_data
        self.alert_manager = AlertManager()
        self.prediction_store = PredictionStore()

        # Calculate baseline statistics
        self.baseline_stats = self._calculate_baseline_stats()

    def _calculate_baseline_stats(self):
        """Calculate statistics from training data"""

        stats = {}
        for column in self.training_data.columns:
            if pd.api.types.is_numeric_dtype(self.training_data[column]):
                stats[column] = {
                    'mean': self.training_data[column].mean(),
                    'std': self.training_data[column].std(),
                    'min': self.training_data[column].min(),
                    'max': self.training_data[column].max(),
                    'distribution': self.training_data[column].values
                }

        return stats

    def predict_with_monitoring(self, features, prediction_id=None):
        """Make prediction with full monitoring"""

        start_time = time.time()

        try:
            # 1. Data quality checks
            self._check_data_quality(features)

            # 2. Data drift detection
            self._check_data_drift(features)

            # 3. Make prediction
            prediction = self.model.predict([features])[0]

            # 4. Log prediction
            if prediction_id:
                self.prediction_store.log_prediction(
                    prediction_id, features, prediction
                )

            # 5. Monitor latency
            latency = time.time() - start_time
            self.alert_manager.check_latency(latency)

            # 6. Log metrics
            self._log_metrics({
                'latency': latency,
                'prediction': prediction,
                'timestamp': datetime.now()
            })

            return prediction

        except Exception as e:
            print(f"❌ Prediction failed: {e}")
            raise

    def _check_data_quality(self, features):
        """Check input data quality"""

        # Check for NaN values
        if any(pd.isna(features)):
            raise ValueError("Input contains NaN values")

        # Check for infinite values
        if any(np.isinf(features)):
            raise ValueError("Input contains infinite values")

    def _check_data_drift(self, features):
        """Check for data drift over a rolling window of recent values"""
        from collections import deque

        # PSI compares distributions, so buffer recent production values
        # per feature rather than scoring a single observation
        if not hasattr(self, '_drift_buffers'):
            self._drift_buffers = {}

        for feature_name, value in zip(self.training_data.columns, features):
            if feature_name not in self.baseline_stats:
                continue

            buffer = self._drift_buffers.setdefault(
                feature_name, deque(maxlen=500)
            )
            buffer.append(value)

            # Wait for enough samples for a meaningful comparison
            if len(buffer) >= 100:
                psi = calculate_psi(
                    self.baseline_stats[feature_name]['distribution'],
                    np.array(buffer)
                )
                self.alert_manager.check_data_drift(psi, feature_name)

    def _log_metrics(self, metrics):
        """Log metrics to monitoring system"""

        # Log to Prometheus, CloudWatch, etc.
        print(f"📊 Metrics logged: {metrics}")

    def evaluate_batch(self, predictions, ground_truth):
        """Evaluate model performance on batch"""

        from sklearn.metrics import accuracy_score, f1_score

        accuracy = accuracy_score(ground_truth, predictions)
        f1 = f1_score(ground_truth, predictions, average='weighted')

        # Check against thresholds
        self.alert_manager.check_accuracy(accuracy)

        print(f"✓ Batch evaluation - Accuracy: {accuracy:.2%}, F1: {f1:.2%}")

        return {
            'accuracy': accuracy,
            'f1_score': f1,
            'timestamp': datetime.now()
        }

# Example usage
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# Create sample data and model
X_train, y_train = make_classification(n_samples=1000, n_features=5, random_state=42)
training_df = pd.DataFrame(X_train, columns=[f'feature_{i}' for i in range(5)])

model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

# Initialize monitoring system
monitoring = MLMonitoringSystem(model, training_df)

# Make monitored prediction
test_features = [0.5, -0.3, 1.2, 0.8, -0.5]
prediction = monitoring.predict_with_monitoring(test_features, prediction_id='pred_001')
print(f"Prediction: {prediction}")

---PAGE---

Best Practices

1. Monitor the Right Metrics

Essential metrics:

  • Model performance (accuracy, precision, recall, etc.)
  • System performance (latency, throughput, errors)
  • Data quality (missing values, outliers, types)
  • Data drift (PSI, KS test, chi-square)

Don't over-monitor:

  • Focus on actionable metrics
  • Avoid vanity metrics
  • Balance coverage vs. noise

2. Set Meaningful Thresholds

# Bad: Arbitrary thresholds
if accuracy < 0.8:  # Why 0.8?
    alert()

# Good: Data-driven thresholds
baseline_accuracy = 0.92
threshold = baseline_accuracy * 0.9  # 10% drop from baseline

if accuracy < threshold:
    alert(f"Accuracy dropped {(baseline_accuracy - accuracy):.1%} from baseline")

3. Implement Graceful Degradation

import pandas as pd

class RobustPredictor:
    """Predictor with fallback strategies"""

    def __init__(self, primary_model, fallback_model):
        self.primary_model = primary_model
        self.fallback_model = fallback_model
        self.primary_failures = 0

    def predict(self, features):
        """Try primary model, fallback if needed"""

        try:
            # Check data quality
            if self._is_data_quality_acceptable(features):
                prediction = self.primary_model.predict([features])
                return prediction
            else:
                # Use simpler fallback model for poor quality data
                print("⚠️ Using fallback model due to data quality")
                return self.fallback_model.predict([features])

        except Exception as e:
            self.primary_failures += 1
            print(f"❌ Primary model failed, using fallback: {e}")
            return self.fallback_model.predict([features])

    def _is_data_quality_acceptable(self, features):
        """Check if data quality is good enough for primary model"""
        # Implement quality checks
        return not any(pd.isna(features))

4. Automate Retraining Triggers

import numpy as np

class AutoRetrainingMonitor:
    """Monitor metrics and trigger retraining"""

    def __init__(self, accuracy_threshold=0.85, drift_threshold=0.2):
        self.accuracy_threshold = accuracy_threshold
        self.drift_threshold = drift_threshold
        self.recent_accuracy = []
        self.recent_drift_scores = []

    def should_retrain(self):
        """Decide if model should be retrained"""

        reasons = []

        # Check accuracy trend
        if len(self.recent_accuracy) >= 10:
            avg_accuracy = np.mean(self.recent_accuracy[-10:])
            if avg_accuracy < self.accuracy_threshold:
                reasons.append(f"Low accuracy: {avg_accuracy:.2%}")

        # Check drift
        if len(self.recent_drift_scores) >= 5:
            max_drift = max(self.recent_drift_scores[-5:])
            if max_drift > self.drift_threshold:
                reasons.append(f"High drift: PSI={max_drift:.3f}")

        if reasons:
            print("🔄 Retraining triggered:")
            for reason in reasons:
                print(f"  - {reason}")
            return True

        return False

    def log_metrics(self, accuracy, drift_score):
        """Log metrics for retraining decision"""
        self.recent_accuracy.append(accuracy)
        self.recent_drift_scores.append(drift_score)

5. Document Your Monitoring

Create a monitoring runbook:

# Model Monitoring Runbook

## Alerts and Response

### Critical: Model Accuracy < 75%
**Response Time:** Immediate
**Actions:**
1. Check recent data for quality issues
2. Verify ground truth labels are correct
3. Investigate data drift in top features
4. Consider rolling back to previous model version
5. Trigger emergency retraining if needed

### Warning: Data Drift PSI > 0.1
**Response Time:** Within 24 hours
**Actions:**
1. Identify which features are drifting
2. Analyze production data distribution
3. Determine if drift is temporary or permanent
4. Plan retraining with recent data
5. Update feature engineering if needed

### Critical: Prediction Latency > 2s
**Response Time:** Within 1 hour
**Actions:**
1. Check system resources (CPU, memory, GPU)
2. Review recent deployments or changes
3. Analyze batch sizes and concurrency
4. Consider scaling infrastructure
5. Implement caching if applicable
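The runbook thresholds above can also be encoded so alerts are classified programmatically before being routed. A minimal sketch, assuming a simple metrics dictionary (the `classify_alerts` helper and the metric key names are illustrative, not part of any library):

```python
def classify_alerts(metrics):
    """Map a metrics snapshot to (severity, message) pairs using
    the runbook thresholds above."""
    alerts = []
    if metrics.get("accuracy", 1.0) < 0.75:
        alerts.append(("critical", "Model accuracy < 75%"))
    if metrics.get("psi", 0.0) > 0.1:
        alerts.append(("warning", "Data drift PSI > 0.1"))
    if metrics.get("latency_s", 0.0) > 2.0:
        alerts.append(("critical", "Prediction latency > 2s"))
    return alerts

# Example snapshot: accuracy and drift both breach their thresholds
snapshot = {"accuracy": 0.72, "psi": 0.15, "latency_s": 0.3}
for severity, message in classify_alerts(snapshot):
    print(f"[{severity.upper()}] {message}")
```

Keeping the thresholds in code (or better, in versioned config) ensures the runbook and the alerting system cannot silently diverge.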

---PAGE---

---QUIZ--- TITLE: Monitoring and Observability Knowledge Check INTRO: Test your understanding of ML monitoring concepts!

Q: What is data drift? A: When input data distribution changes over time A: When the model performance improves A: When the server runs out of memory A: When predictions are cached CORRECT: 0 EXPLAIN: Data drift occurs when the statistical properties of input features change over time. This can cause model performance to degrade because the model was trained on different data distributions.

Q: What does a PSI (Population Stability Index) score of 0.25 indicate? A: No significant change A: Moderate change, monitor closely A: Significant change, investigate immediately A: Perfect model performance CORRECT: 2 EXPLAIN: PSI >= 0.2 indicates significant change in data distribution. PSI < 0.1 is no significant change, 0.1-0.2 is moderate change, and >= 0.2 requires immediate investigation.

Q: Why is monitoring latency important for ML systems? A: It affects user experience and SLA compliance A: It indicates model accuracy A: It measures data quality A: It prevents overfitting CORRECT: 0 EXPLAIN: Latency (response time) directly impacts user experience. High latency can violate service level agreements (SLAs) and make your ML service unusable in real-time applications, even if the model is accurate.

Q: What is the main challenge with monitoring ML model accuracy in production? A: Models are too complex to evaluate A: Ground truth labels are often delayed or unavailable A: Accuracy metrics don't exist A: Production data is always perfect CORRECT: 1 EXPLAIN: In production, you often don't have immediate access to ground truth labels. For example, in fraud detection, you might not know if a transaction was fraudulent until weeks later. This makes real-time accuracy monitoring challenging.

Q: Which metric is NOT typically monitored for ML systems? A: Prediction latency A: Data drift score A: Number of code comments A: Error rate CORRECT: 2 EXPLAIN: Number of code comments is a code quality metric, not an ML system metric. Important ML metrics include latency (response time), data drift (distribution changes), and error rate (prediction failures).

Q: What does the Kolmogorov-Smirnov (KS) test measure? A: Model accuracy A: Difference between two probability distributions A: Server CPU usage A: Code quality CORRECT: 1 EXPLAIN: The KS test measures whether two samples come from the same distribution. In ML monitoring, it's used to detect data drift by comparing training data distribution with production data distribution.

Q: What should you do if your model's accuracy suddenly drops from 92% to 75%? A: Ignore it, accuracy fluctuates normally A: Immediately retrain on all historical data A: Investigate data quality, drift, and consider rolling back A: Delete the monitoring system CORRECT: 2 EXPLAIN: A sudden significant drop in accuracy requires immediate investigation. Check for data quality issues, data drift, or bugs. Consider rolling back to the previous model version while investigating. Don't immediately retrain without understanding the root cause.

Q: What is the purpose of implementing graceful degradation in ML systems? A: To make the model slower A: To provide fallback strategies when primary model fails A: To reduce monitoring costs A: To increase model complexity CORRECT: 1 EXPLAIN: Graceful degradation provides fallback strategies (like simpler models or rule-based systems) when the primary model fails or encounters poor quality data. This ensures your system continues to function even when issues occur. ---END-QUIZ---

---PAGE---

Key Takeaways

Congratulations! You've completed Part 2 of the MLOps series on monitoring and observability.

Core Concepts

Data Drift: Detecting when input distributions change (KS test, PSI, chi-square)

Model Performance: Tracking accuracy metrics with delayed ground truth

System Metrics: Monitoring latency, throughput, and error rates

Data Quality: Validating inputs for missing values and outliers
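As a quick refresher on the drift tests listed above, a minimal KS-test check with SciPy might look like this (the synthetic "age" samples are illustrative):

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
train_ages = rng.normal(loc=35, scale=5, size=1000)  # training distribution
prod_ages = rng.normal(loc=22, scale=3, size=1000)   # new, younger demographic

# ks_2samp tests whether two samples come from the same distribution;
# a tiny p-value means the distributions differ, i.e. drift.
statistic, p_value = stats.ks_2samp(train_ages, prod_ages)
if p_value < 0.05:
    print(f"Drift detected (KS statistic={statistic:.3f}, p={p_value:.2e})")
```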

Technical Skills

Statistical Tests: Implementing drift detection algorithms

Metrics Collection: Using Prometheus and CloudWatch for monitoring

Dashboards: Building Grafana dashboards for visualization

Alerting: Setting up Slack, email, and PagerDuty notifications

Best Practices

Meaningful Thresholds: Data-driven alert thresholds

Graceful Degradation: Fallback strategies for failures

Automated Retraining: Triggering retraining based on metrics

Documentation: Creating runbooks for alert response

What's Next

In Part 3: CI/CD for Machine Learning, you'll learn:

  • Automated testing for ML models and pipelines
  • Building continuous training workflows
  • Version control for models, data, and code
  • Pipeline orchestration with Airflow and Kubeflow
  • Experiment tracking with MLflow
  • Continuous deployment strategies

In Part 4: Scaling ML Systems, you'll explore:

  • Horizontal and vertical scaling strategies
  • Distributed inference across multiple machines
  • GPU optimization for deep learning
  • Caching and performance tuning
  • Cost optimization techniques

Resources for Further Learning

Practice Exercises

To reinforce your learning:

  1. Implement PSI calculation for your own dataset
  2. Set up Prometheus metrics for a simple ML service
  3. Create a Grafana dashboard with key ML metrics
  4. Build a data drift detector using statistical tests
  5. Design an alerting strategy for a production ML system

Keep monitoring, and see you in Part 3!
