MLOps Part 4: Scaling ML Systems
Learn how to scale machine learning systems for production workloads, including horizontal/vertical scaling, distributed inference, GPU optimization, and cost management
Introduction to Scaling ML Systems
As your ML application grows, you'll need to handle more requests, larger datasets, and complex models. Scaling ensures your system remains responsive and cost-effective.
What You'll Learn
In this module, you'll understand:
- Horizontal scaling: Adding more servers to distribute load
- Vertical scaling: Increasing resources on existing servers
- Distributed inference: Running models across multiple machines
- GPU optimization: Efficient use of expensive GPU resources
- Caching strategies: Reducing redundant computations
- Cost optimization: Maximizing performance per dollar
Why Scaling Matters
Production ML systems face unique scaling challenges:
- High computational cost: ML inference is CPU/GPU intensive
- Variable load: Traffic patterns can be unpredictable
- Latency requirements: Users expect sub-second responses
- Cost constraints: GPUs and compute resources are expensive
Without proper scaling, your ML service will be slow, expensive, or both!
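Before reaching for any of these techniques, it helps to estimate how much capacity you actually need. A rough sketch using Little's Law (in-flight requests = arrival rate × latency); the function name, headroom default, and concurrency parameter are illustrative, not from any library:

```python
import math

def required_replicas(target_rps: float, p50_latency_s: float,
                      per_replica_concurrency: int = 1,
                      headroom: float = 0.3) -> int:
    """Estimate replicas needed for a target request rate.

    Little's Law: concurrent in-flight requests = rate x latency.
    Headroom adds spare capacity for traffic spikes."""
    concurrent_requests = target_rps * p50_latency_s
    raw = concurrent_requests / per_replica_concurrency  # replicas at 100% use
    return max(1, math.ceil(raw * (1 + headroom)))

# 200 req/s at 80 ms median latency, 4 workers per replica, 30% headroom
print(required_replicas(200, 0.08, per_replica_concurrency=4))  # → 6
```

Back-of-envelope numbers like this keep the later sections grounded: they tell you whether you are one replica short or an order of magnitude off.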
---PAGE---
Horizontal Scaling
Distribute load across multiple servers running your ML service.
Load Balancing Basics
# nginx.conf
upstream ml_service {
    # Round-robin load balancing (the default)
    server ml-server-1:5000 weight=1;
    server ml-server-2:5000 weight=1;
    server ml-server-3:5000 weight=1;

    # Keep idle upstream connections open for reuse
    keepalive 32;
}

server {
    listen 80;

    location /predict {
        proxy_pass http://ml_service;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_read_timeout 30s;
    }

    location /health {
        proxy_pass http://ml_service/health;
    }
}
Kubernetes Deployment for Auto-scaling
# ml-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-service
spec:
  replicas: 3  # Start with 3 replicas
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: myregistry/ml-model:latest
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
      - type: Percent
        value: 100
        periodSeconds: 30
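The HPA controller above converges toward a replica count using the documented formula desiredReplicas = ceil(currentReplicas × currentMetricValue / targetMetricValue), clamped to the min/max bounds. A small sketch of that arithmetic (the helper name is ours, for illustration):

```python
import math

def hpa_desired_replicas(current_replicas: int,
                         current_utilization: float,
                         target_utilization: float,
                         min_replicas: int = 2,
                         max_replicas: int = 10) -> int:
    """Replica count the HPA controller converges toward, clamped to bounds."""
    desired = math.ceil(current_replicas * current_utilization / target_utilization)
    return min(max_replicas, max(min_replicas, desired))

# 3 replicas at 90% CPU against a 70% target -> scale out to 4
print(hpa_desired_replicas(3, 90, 70))  # → 4
```

This is why the 70% CPU target matters: the further actual utilization exceeds the target, the more replicas the controller adds in one step.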
Python Client with Load Balancing
import requests
import random
from typing import List

class LoadBalancedMLClient:
    """Client with client-side load balancing"""

    def __init__(self, servers: List[str]):
        self.servers = servers
        self.server_health = {server: True for server in servers}

    def predict(self, features):
        """Make prediction with automatic failover"""
        # Get healthy servers
        healthy_servers = [s for s, h in self.server_health.items() if h]
        if not healthy_servers:
            raise Exception("No healthy servers available")

        # Try servers until one succeeds or none are left
        while healthy_servers:
            server = random.choice(healthy_servers)
            try:
                response = requests.post(
                    f"{server}/predict",
                    json={'features': features},
                    timeout=5
                )
                if response.status_code == 200:
                    return response.json()
            except requests.exceptions.RequestException as e:
                print(f"⚠️ Server {server} failed: {e}")
                self.server_health[server] = False
            # Don't retry a server that just failed or returned an error
            healthy_servers.remove(server)

        raise Exception("All servers failed")

    def check_health(self):
        """Check health of all servers"""
        for server in self.servers:
            try:
                response = requests.get(f"{server}/health", timeout=2)
                self.server_health[server] = response.status_code == 200
            except requests.exceptions.RequestException:
                self.server_health[server] = False
        healthy_count = sum(self.server_health.values())
        print(f"Health check: {healthy_count}/{len(self.servers)} servers healthy")

# Usage
client = LoadBalancedMLClient([
    'http://ml-server-1:5000',
    'http://ml-server-2:5000',
    'http://ml-server-3:5000'
])
client.check_health()
prediction = client.predict([1.0, 2.0, 3.0])
---PAGE---
Vertical Scaling
Optimize resource usage on individual servers.
CPU Optimization
Multi-threading for Batch Predictions
from concurrent.futures import ThreadPoolExecutor
import numpy as np

class OptimizedPredictor:
    """Predictor with multi-threading optimization"""

    def __init__(self, model, n_threads=4):
        self.model = model
        self.n_threads = n_threads

    def predict_batch(self, batch_features):
        """Predict on batch with multi-threading"""
        # Split batch into chunks (at least 1 item per chunk)
        chunk_size = max(1, len(batch_features) // self.n_threads)
        chunks = [
            batch_features[i:i + chunk_size]
            for i in range(0, len(batch_features), chunk_size)
        ]
        # Process chunks in parallel; threads help here because NumPy-backed
        # models release the GIL during prediction
        with ThreadPoolExecutor(max_workers=self.n_threads) as executor:
            results = list(executor.map(self._predict_chunk, chunks))
        # Combine results
        return np.concatenate(results)

    def _predict_chunk(self, chunk):
        """Predict on a single chunk"""
        return self.model.predict(chunk)

# Benchmark
import time

# Single-threaded
start = time.time()
predictions_single = model.predict(large_batch)
time_single = time.time() - start

# Multi-threaded
predictor = OptimizedPredictor(model, n_threads=4)
start = time.time()
predictions_multi = predictor.predict_batch(large_batch)
time_multi = time.time() - start

print(f"Single-threaded: {time_single:.2f}s")
print(f"Multi-threaded: {time_multi:.2f}s")
print(f"Speedup: {time_single/time_multi:.2f}x")
ONNX Runtime for Faster Inference
import time
import numpy as np
import onnx
import onnxruntime as ort
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

def convert_to_onnx(sklearn_model, n_features):
    """Convert scikit-learn model to ONNX format"""
    # Define input type
    initial_type = [('float_input', FloatTensorType([None, n_features]))]
    # Convert
    onnx_model = convert_sklearn(
        sklearn_model,
        initial_types=initial_type,
        target_opset=12
    )
    # Save
    onnx.save_model(onnx_model, 'model.onnx')
    return onnx_model

class ONNXPredictor:
    """Fast inference using ONNX Runtime"""

    def __init__(self, onnx_model_path):
        self.session = ort.InferenceSession(onnx_model_path)
        self.input_name = self.session.get_inputs()[0].name

    def predict(self, features):
        """Predict using ONNX Runtime"""
        features = np.array(features, dtype=np.float32)
        if features.ndim == 1:
            features = features.reshape(1, -1)
        result = self.session.run(None, {self.input_name: features})
        return result[0]

# Benchmark ONNX vs scikit-learn
# Convert model
convert_to_onnx(sklearn_model, n_features=10)

# Original scikit-learn
start = time.time()
for _ in range(1000):
    pred_sklearn = sklearn_model.predict(test_sample)
time_sklearn = time.time() - start

# ONNX Runtime
onnx_predictor = ONNXPredictor('model.onnx')
start = time.time()
for _ in range(1000):
    pred_onnx = onnx_predictor.predict(test_sample)
time_onnx = time.time() - start

print(f"Scikit-learn: {time_sklearn:.3f}s")
print(f"ONNX Runtime: {time_onnx:.3f}s")
print(f"Speedup: {time_sklearn/time_onnx:.2f}x")
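A speedup is only useful if the converted model still predicts the same thing. ONNX Runtime computes in float32 while scikit-learn defaults to float64, so exact equality is the wrong test; compare within a tolerance instead. A minimal parity check (the helper name is ours):

```python
import numpy as np

def check_backend_parity(pred_a, pred_b, rtol=1e-4, atol=1e-5):
    """Fail loudly if two inference backends disagree beyond tolerance."""
    a = np.asarray(pred_a, dtype=np.float64)
    b = np.asarray(pred_b, dtype=np.float64)
    if not np.allclose(a, b, rtol=rtol, atol=atol):
        worst = float(np.max(np.abs(a - b)))
        raise ValueError(f"Backends disagree: max abs diff = {worst:.3g}")
    return True

# Run both backends on the same held-out data before switching over, e.g.:
# check_backend_parity(sklearn_model.predict(X), onnx_predictor.predict(X))
```

Run this on a representative validation set whenever you swap inference backends; a silent numeric drift is much harder to debug in production than a failed parity check in CI.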
Memory Optimization
import io
import numpy as np

def optimize_memory_usage(df):
    """Reduce memory usage of DataFrame"""
    start_mem = df.memory_usage().sum() / 1024**2
    print(f"Memory usage before: {start_mem:.2f} MB")

    # Downcast integer columns to the smallest type that fits
    for col in df.select_dtypes(include=['int']).columns:
        c_min = df[col].min()
        c_max = df[col].max()
        if c_min >= np.iinfo(np.int8).min and c_max <= np.iinfo(np.int8).max:
            df[col] = df[col].astype(np.int8)
        elif c_min >= np.iinfo(np.int16).min and c_max <= np.iinfo(np.int16).max:
            df[col] = df[col].astype(np.int16)
        elif c_min >= np.iinfo(np.int32).min and c_max <= np.iinfo(np.int32).max:
            df[col] = df[col].astype(np.int32)

    # Halve float columns (check your model tolerates float32 precision)
    for col in df.select_dtypes(include=['float']).columns:
        df[col] = df[col].astype(np.float32)

    end_mem = df.memory_usage().sum() / 1024**2
    print(f"Memory usage after: {end_mem:.2f} MB")
    print(f"Decreased by {100 * (start_mem - end_mem) / start_mem:.1f}%")
    return df

# Model quantization for memory reduction
import torch

def quantize_pytorch_model(model):
    """Quantize PyTorch model to int8"""
    model.eval()
    quantized_model = torch.quantization.quantize_dynamic(
        model,
        {torch.nn.Linear},
        dtype=torch.qint8
    )
    return quantized_model

def model_size_mb(m):
    """Serialized weight size. Note: sys.getsizeof only measures the shallow
    Python object, not the tensors, so serialize the state dict instead."""
    buffer = io.BytesIO()
    torch.save(m.state_dict(), buffer)
    return buffer.getbuffer().nbytes / 1024**2

# Compare sizes
print(f"Original model size: {model_size_mb(model):.2f} MB")
quantized = quantize_pytorch_model(model)
print(f"Quantized model size: {model_size_mb(quantized):.2f} MB")
---PAGE---
GPU Optimization
Efficiently utilize GPU resources for deep learning inference.
Batch Processing for GPU
import torch
import time

class GPUBatchPredictor:
    """Optimize GPU usage with batching"""

    def __init__(self, model, device='cuda', max_batch_size=32):
        self.model = model.to(device)
        self.device = device
        self.max_batch_size = max_batch_size
        self.model.eval()

    def predict_single(self, input_tensor):
        """Predict single sample (inefficient)"""
        with torch.no_grad():
            input_tensor = input_tensor.to(self.device)
            output = self.model(input_tensor.unsqueeze(0))
            return output.cpu()

    def predict_batch(self, input_tensors):
        """Predict batch of samples (efficient)"""
        with torch.no_grad():
            # Move entire batch to GPU
            batch_tensor = torch.stack(input_tensors).to(self.device)
            # Single forward pass
            outputs = self.model(batch_tensor)
            return outputs.cpu()

    def predict_stream(self, input_stream):
        """Process stream of inputs with dynamic batching"""
        batch = []
        results = []
        for input_tensor in input_stream:
            batch.append(input_tensor)
            # Process when batch is full
            if len(batch) >= self.max_batch_size:
                batch_results = self.predict_batch(batch)
                results.extend(batch_results)
                batch = []
        # Process remaining items
        if batch:
            batch_results = self.predict_batch(batch)
            results.extend(batch_results)
        return results

# Benchmark: Single vs Batched
predictor = GPUBatchPredictor(model)
test_inputs = [torch.randn(3, 224, 224) for _ in range(100)]

# Single predictions
start = time.time()
for input_tensor in test_inputs:
    pred = predictor.predict_single(input_tensor)
time_single = time.time() - start

# Batched predictions
start = time.time()
preds = predictor.predict_batch(test_inputs)
time_batch = time.time() - start

print(f"Single predictions: {time_single:.2f}s")
print(f"Batched predictions: {time_batch:.2f}s")
print(f"Speedup: {time_single/time_batch:.2f}x")
TensorRT Optimization
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit  # creates a CUDA context on import

class TensorRTPredictor:
    """Ultra-fast inference with TensorRT (API shown for TensorRT 7.x/8.x;
    newer releases rename some builder methods)"""

    def __init__(self, onnx_model_path):
        self.logger = trt.Logger(trt.Logger.WARNING)
        # The ONNX parser requires an explicit-batch network
        network_flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)

        # Build TensorRT engine
        with trt.Builder(self.logger) as builder, \
             builder.create_network(network_flags) as network, \
             trt.OnnxParser(network, self.logger) as parser:

            # Parse ONNX model
            with open(onnx_model_path, 'rb') as model_file:
                if not parser.parse(model_file.read()):
                    raise RuntimeError("Failed to parse ONNX model")

            # Build engine with optimization
            config = builder.create_builder_config()
            config.max_workspace_size = 1 << 30  # 1 GB

            # Enable FP16 mode for speed
            if builder.platform_has_fast_fp16:
                config.set_flag(trt.BuilderFlag.FP16)

            self.engine = builder.build_engine(network, config)

        # Create execution context
        self.context = self.engine.create_execution_context()

        # Allocate buffers
        self.input_shape = (1, 3, 224, 224)
        self.output_shape = (1, 1000)
        self.allocate_buffers()

    def allocate_buffers(self):
        """Allocate pinned host memory and device memory for inputs/outputs"""
        self.h_input = cuda.pagelocked_empty(
            trt.volume(self.input_shape), dtype=np.float32
        )
        self.h_output = cuda.pagelocked_empty(
            trt.volume(self.output_shape), dtype=np.float32
        )
        self.d_input = cuda.mem_alloc(self.h_input.nbytes)
        self.d_output = cuda.mem_alloc(self.h_output.nbytes)
        self.stream = cuda.Stream()

    def predict(self, input_array):
        """Predict with TensorRT"""
        # Copy input to GPU
        np.copyto(self.h_input, input_array.ravel())
        cuda.memcpy_htod_async(self.d_input, self.h_input, self.stream)
        # Run inference
        self.context.execute_async_v2(
            bindings=[int(self.d_input), int(self.d_output)],
            stream_handle=self.stream.handle
        )
        # Copy output to CPU
        cuda.memcpy_dtoh_async(self.h_output, self.d_output, self.stream)
        self.stream.synchronize()
        return self.h_output.reshape(self.output_shape)

# Benchmark TensorRT vs PyTorch
# PyTorch (GPU)
start = time.time()
for _ in range(1000):
    pred_torch = pytorch_model(test_input)
time_torch = time.time() - start

# TensorRT
trt_predictor = TensorRTPredictor('model.onnx')
start = time.time()
for _ in range(1000):
    pred_trt = trt_predictor.predict(test_input)
time_trt = time.time() - start

print(f"PyTorch GPU: {time_torch:.3f}s")
print(f"TensorRT: {time_trt:.3f}s")
print(f"Speedup: {time_torch/time_trt:.2f}x")
Multi-GPU Inference
import copy
import math
import torch

class MultiGPUPredictor:
    """Distribute inference across multiple GPUs"""

    def __init__(self, model, gpu_ids=(0, 1, 2, 3)):
        self.gpu_ids = list(gpu_ids)
        self.num_gpus = len(self.gpu_ids)
        # Replicate model to all GPUs; deepcopy is needed because
        # model.to() moves the module in place rather than copying it
        self.models = []
        for gpu_id in self.gpu_ids:
            model_copy = copy.deepcopy(model).to(f'cuda:{gpu_id}')
            model_copy.eval()
            self.models.append(model_copy)

    def predict_batch(self, batch_inputs):
        """Split batch across GPUs"""
        # Ceiling division so at most num_gpus chunks are produced
        # (floor division can create an extra chunk that zip() would drop)
        batch_size = len(batch_inputs)
        chunk_size = math.ceil(batch_size / self.num_gpus)
        chunks = [
            batch_inputs[i:i + chunk_size]
            for i in range(0, batch_size, chunk_size)
        ]
        # Process on each GPU
        results = []
        for gpu_id, model, chunk in zip(self.gpu_ids, self.models, chunks):
            with torch.no_grad():
                chunk_tensor = torch.stack(chunk).to(f'cuda:{gpu_id}')
                output = model(chunk_tensor)
                results.append(output.cpu())
        # Combine results
        return torch.cat(results)

# Usage
predictor = MultiGPUPredictor(model, gpu_ids=[0, 1])
large_batch = [torch.randn(3, 224, 224) for _ in range(128)]
predictions = predictor.predict_batch(large_batch)
---PAGE---
Caching Strategies
Reduce redundant computations with intelligent caching.
Prediction Caching
import redis
import hashlib
import json
import pickle

class PredictionCache:
    """Cache predictions with Redis"""

    def __init__(self, redis_host='localhost', ttl=3600):
        self.redis_client = redis.Redis(host=redis_host, decode_responses=False)
        self.ttl = ttl  # Time to live in seconds

    def get_cache_key(self, features):
        """Generate cache key from features"""
        # Convert features to a canonical string
        features_str = json.dumps(features, sort_keys=True)
        # Hash for a consistent, fixed-length key
        return hashlib.md5(features_str.encode()).hexdigest()

    def get(self, features):
        """Get cached prediction"""
        key = self.get_cache_key(features)
        cached = self.redis_client.get(key)
        if cached:
            return pickle.loads(cached)
        return None

    def set(self, features, prediction):
        """Cache prediction"""
        key = self.get_cache_key(features)
        self.redis_client.setex(
            key,
            self.ttl,
            pickle.dumps(prediction)
        )

    def predict_with_cache(self, model, features):
        """Predict with caching"""
        # Check cache first
        cached_pred = self.get(features)
        if cached_pred is not None:
            print("✓ Cache hit")
            return cached_pred
        # Cache miss - compute prediction
        print("⚠️ Cache miss")
        prediction = model.predict([features])[0]
        # Cache result
        self.set(features, prediction)
        return prediction

# Usage
cache = PredictionCache(ttl=1800)  # 30 minutes

# First call - cache miss
pred1 = cache.predict_with_cache(model, [1.0, 2.0, 3.0])

# Second call - cache hit (instant)
pred2 = cache.predict_with_cache(model, [1.0, 2.0, 3.0])
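Caching only pays off when inputs actually repeat, so measure the hit rate before rolling it out broadly. A minimal sketch of hit-rate tracking (the class name is ours); if the measured rate stays low, the serialization and Redis round-trips may cost more than they save:

```python
class CacheStats:
    """Running hit/miss counters for a prediction cache."""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    def record(self, hit: bool):
        # Call once per lookup, with the outcome of the cache check
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0
```

Wire `record()` into `predict_with_cache` on both branches and export the ratio to your monitoring system; a rule of thumb is to revisit the cache if the hit rate stays in the low double digits.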
Feature Cache
import time
from datetime import datetime

class FeatureCache:
    """Cache expensive feature computations"""

    def __init__(self):
        self.cache = {}

    def compute_expensive_features(self, raw_data, user_id):
        """Compute features with caching"""
        cache_key = f"features_{user_id}"
        # Check cache
        if cache_key in self.cache:
            cached_features, timestamp = self.cache[cache_key]
            # Check if cache is fresh (< 1 hour old); use total_seconds(),
            # since timedelta.seconds wraps around at day boundaries
            if (datetime.now() - timestamp).total_seconds() < 3600:
                print("✓ Using cached features")
                return cached_features
        # Compute features
        print("⚠️ Computing features...")
        features = self._expensive_computation(raw_data)
        # Cache result
        self.cache[cache_key] = (features, datetime.now())
        return features

    def _expensive_computation(self, raw_data):
        """Simulated expensive feature engineering"""
        time.sleep(0.5)  # Simulate expensive operation
        return [x * 2 for x in raw_data]

# Benchmark
cache = FeatureCache()

# First call - expensive
start = time.time()
features1 = cache.compute_expensive_features([1, 2, 3], user_id=123)
time1 = time.time() - start

# Second call - cached
start = time.time()
features2 = cache.compute_expensive_features([1, 2, 3], user_id=123)
time2 = time.time() - start

print(f"First call: {time1:.3f}s")
print(f"Second call: {time2:.3f}s")
print(f"Speedup: {time1/time2:.0f}x")
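When per-entry expiry is not required, the standard library's `functools.lru_cache` provides the same memoization in a few lines. Note two differences from the hand-rolled FeatureCache above: arguments must be hashable (tuples, not lists), and there is no TTL, only size-based LRU eviction. A sketch with an illustrative feature function:

```python
from functools import lru_cache

@lru_cache(maxsize=4096)
def expensive_features(raw: tuple) -> tuple:
    # Stand-in for real feature engineering; lru_cache requires
    # hashable arguments, hence tuples instead of lists
    return tuple(x * 2 for x in raw)

features = expensive_features((1, 2, 3))   # computed
features = expensive_features((1, 2, 3))   # served from cache
print(expensive_features.cache_info())     # hits=1, misses=1
```

`cache_info()` gives you hit/miss counters for free, which makes it easy to confirm the cache is actually earning its memory footprint.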
Model Cache (Warm Start)
import joblib

class ModelCache:
    """Keep frequently used models in memory"""

    def __init__(self, max_models=3):
        self.max_models = max_models
        self.cache = {}
        self.access_count = {}

    def load_model(self, model_id):
        """Load model with caching"""
        # Check if already loaded
        if model_id in self.cache:
            self.access_count[model_id] += 1
            print(f"✓ Model {model_id} already loaded")
            return self.cache[model_id]
        # Load model
        print(f"⚠️ Loading model {model_id} from disk...")
        model = joblib.load(f'models/{model_id}.pkl')
        # Evict least-used model if cache is full
        if len(self.cache) >= self.max_models:
            least_used = min(self.access_count, key=self.access_count.get)
            print(f"Evicting model {least_used}")
            del self.cache[least_used]
            del self.access_count[least_used]
        # Add to cache
        self.cache[model_id] = model
        self.access_count[model_id] = 1
        return model

# Usage
model_cache = ModelCache(max_models=2)

# Load models
model_a = model_cache.load_model('model_v1')  # Load from disk
model_a = model_cache.load_model('model_v1')  # From cache
model_b = model_cache.load_model('model_v2')  # Load from disk
model_c = model_cache.load_model('model_v3')  # Load from disk, evicts model_v2 (fewest accesses)
---PAGE---
Distributed Inference
Scale beyond a single machine with distributed systems.
Ray for Distributed Inference
import ray
from ray import serve

# Initialize Ray
ray.init()

@serve.deployment(num_replicas=3, ray_actor_options={"num_cpus": 1})
class MLModelServe:
    """Distributed ML model serving with Ray Serve"""

    def __init__(self):
        import joblib
        self.model = joblib.load('model.pkl')

    async def __call__(self, request):
        """Handle prediction request"""
        data = await request.json()
        features = data['features']
        prediction = self.model.predict([features])[0]
        return {
            'prediction': float(prediction),
            'model_version': 'v1.0'
        }

# Deploy (serve.run mounts the application at '/' by default)
serve.run(MLModelServe.bind())

# Client code
import requests

response = requests.post(
    'http://localhost:8000/',
    json={'features': [1.0, 2.0, 3.0]}
)
print(response.json())
Celery for Async Processing
from celery import Celery
import joblib

# Configure Celery
app = Celery(
    'ml_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Load model once per worker
model = None

def load_model():
    """Load model in worker process"""
    global model
    if model is None:
        model = joblib.load('model.pkl')
    return model

@app.task(name='predict')
def predict_task(features):
    """Async prediction task"""
    model = load_model()
    prediction = model.predict([features])[0]
    return {
        'prediction': float(prediction),
        'status': 'completed'
    }

@app.task(name='batch_predict')
def batch_predict_task(batch_features):
    """Batch prediction task"""
    model = load_model()
    predictions = model.predict(batch_features)
    return {
        'predictions': predictions.tolist(),
        'count': len(predictions),
        'status': 'completed'
    }

# Client usage
from celery.result import AsyncResult

# Submit async task
result = predict_task.delay([1.0, 2.0, 3.0])

# Check status
print(f"Task ID: {result.id}")
print(f"Status: {result.status}")

# Get result (blocks until ready)
prediction = result.get(timeout=10)
print(f"Prediction: {prediction}")

# Batch processing
batch = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
batch_result = batch_predict_task.delay(batch)
batch_predictions = batch_result.get()
---PAGE---
Cost Optimization
Reduce infrastructure costs while maintaining performance.
Spot Instances for Training
# AWS Spot Instance configuration
import base64
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

def launch_spot_training(instance_type='p3.2xlarge'):
    """Launch spot instance for training"""
    user_data = '''#!/bin/bash
cd /home/ubuntu
git clone https://github.com/my-org/ml-training.git
cd ml-training
pip install -r requirements.txt
python train.py --config production.yaml
aws s3 cp models/ s3://my-bucket/models/ --recursive
shutdown -h now
'''
    # Spot instance specification; unlike run_instances, UserData here
    # must be base64-encoded by the caller
    launch_specification = {
        'ImageId': 'ami-xxxxx',  # Deep Learning AMI
        'InstanceType': instance_type,
        'KeyName': 'my-key-pair',
        'UserData': base64.b64encode(user_data.encode()).decode(),
        'IamInstanceProfile': {
            'Name': 'ML-Training-Role'
        }
    }
    # Request spot instance
    response = ec2.request_spot_instances(
        SpotPrice='1.00',  # Max price per hour
        InstanceCount=1,
        Type='one-time',
        LaunchSpecification=launch_specification
    )
    request_id = response['SpotInstanceRequests'][0]['SpotInstanceRequestId']
    print(f"✓ Spot instance requested: {request_id}")
    return response
Auto-scaling Based on Queue Length
class QueueBasedScaler:
    """Scale workers based on queue length"""

    def __init__(self, redis_client, min_workers=1, max_workers=10):
        self.redis = redis_client
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.queue_name = 'ml_predictions'

    def get_queue_length(self):
        """Get current queue length"""
        return self.redis.llen(self.queue_name)

    def calculate_desired_workers(self):
        """Calculate how many workers we need"""
        queue_length = self.get_queue_length()
        # Scale: 1 worker per 10 items in queue
        desired = max(
            self.min_workers,
            min(queue_length // 10, self.max_workers)
        )
        return desired

    def scale_workers(self, current_workers):
        """Decide to scale up or down"""
        desired = self.calculate_desired_workers()
        if desired > current_workers:
            # Scale up
            return desired - current_workers, 'up'
        elif desired < current_workers:
            # Scale down
            return current_workers - desired, 'down'
        else:
            # No change
            return 0, 'stable'

# Monitor and scale
import time

scaler = QueueBasedScaler(redis_client)
current_workers = 2

while True:
    change, direction = scaler.scale_workers(current_workers)
    if change > 0:
        if direction == 'up':
            print(f"📈 Scaling UP by {change} workers (queue: {scaler.get_queue_length()})")
            # Trigger scale up
            current_workers += change
        else:
            print(f"📉 Scaling DOWN by {change} workers (queue: {scaler.get_queue_length()})")
            # Trigger scale down
            current_workers -= change
    time.sleep(30)  # Check every 30 seconds
Cost Monitoring
import boto3
from datetime import datetime, timedelta

class CostMonitor:
    """Monitor AWS costs for ML infrastructure"""

    def __init__(self):
        self.ce_client = boto3.client('ce', region_name='us-east-1')

    def get_daily_cost(self, service='Amazon SageMaker'):
        """Get cost for specific service"""
        end = datetime.now().date()
        start = end - timedelta(days=30)

        response = self.ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': start.strftime('%Y-%m-%d'),
                'End': end.strftime('%Y-%m-%d')
            },
            Granularity='DAILY',
            Metrics=['UnblendedCost'],
            Filter={
                'Dimensions': {
                    'Key': 'SERVICE',
                    'Values': [service]
                }
            }
        )

        costs = []
        for result in response['ResultsByTime']:
            date = result['TimePeriod']['Start']
            cost = float(result['Total']['UnblendedCost']['Amount'])
            costs.append({'date': date, 'cost': cost})
        return costs

    def get_cost_forecast(self, days=7):
        """Get cost forecast"""
        end = (datetime.now() + timedelta(days=days)).date()
        start = datetime.now().date()

        response = self.ce_client.get_cost_forecast(
            TimePeriod={
                'Start': start.strftime('%Y-%m-%d'),
                'End': end.strftime('%Y-%m-%d')
            },
            Metric='UNBLENDED_COST',
            Granularity='DAILY'
        )

        forecast = float(response['Total']['Amount'])
        print(f"💰 Forecast for next {days} days: ${forecast:.2f}")
        return forecast

# Usage
monitor = CostMonitor()
costs = monitor.get_daily_cost('Amazon SageMaker')
total = sum(c['cost'] for c in costs)
print(f"Total SageMaker cost (30 days): ${total:.2f}")
forecast = monitor.get_cost_forecast(days=7)
---PAGE---
Performance Benchmarking
Measure and optimize system performance.
Load Testing
import concurrent.futures
import requests
import time
import statistics

class LoadTester:
    """Load test ML service"""

    def __init__(self, endpoint_url):
        self.endpoint_url = endpoint_url

    def single_request(self, request_id):
        """Make single prediction request"""
        start = time.time()
        try:
            response = requests.post(
                self.endpoint_url,
                json={'features': [1.0, 2.0, 3.0]},
                timeout=10
            )
            latency = time.time() - start
            return {
                'request_id': request_id,
                'status_code': response.status_code,
                'latency': latency,
                'success': response.status_code == 200
            }
        except Exception as e:
            return {
                'request_id': request_id,
                'status_code': 0,
                'latency': time.time() - start,
                'success': False,
                'error': str(e)
            }

    def run_load_test(self, num_requests=1000, concurrency=10):
        """Run load test with multiple concurrent users"""
        print(f"Starting load test:")
        print(f"  Requests: {num_requests}")
        print(f"  Concurrency: {concurrency}")

        start_time = time.time()
        results = []

        # Execute requests concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
            futures = [
                executor.submit(self.single_request, i)
                for i in range(num_requests)
            ]
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())

        total_time = time.time() - start_time

        # Analyze results
        self.analyze_results(results, total_time)

    def analyze_results(self, results, total_time):
        """Analyze load test results"""
        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]
        latencies = [r['latency'] for r in successful]

        print("\n" + "=" * 50)
        print("LOAD TEST RESULTS")
        print("=" * 50)
        print(f"Total requests: {len(results)}")
        print(f"Successful: {len(successful)} ({len(successful)/len(results)*100:.1f}%)")
        print(f"Failed: {len(failed)} ({len(failed)/len(results)*100:.1f}%)")
        print(f"Total time: {total_time:.2f}s")
        print(f"Requests/second: {len(results)/total_time:.2f}")

        if not latencies:
            print("No successful requests - skipping latency statistics")
            return

        print()
        print("Latency statistics (successful requests):")
        print(f"  Min: {min(latencies)*1000:.0f}ms")
        print(f"  Max: {max(latencies)*1000:.0f}ms")
        print(f"  Mean: {statistics.mean(latencies)*1000:.0f}ms")
        print(f"  Median: {statistics.median(latencies)*1000:.0f}ms")
        print(f"  P95: {statistics.quantiles(latencies, n=20)[18]*1000:.0f}ms")
        print(f"  P99: {statistics.quantiles(latencies, n=100)[98]*1000:.0f}ms")
        print("=" * 50)

# Usage
tester = LoadTester('http://localhost:5000/predict')
tester.run_load_test(num_requests=1000, concurrency=20)
---PAGE---
---QUIZ--- TITLE: Scaling ML Systems Knowledge Check INTRO: Test your understanding of ML system scaling concepts!
Q: What is the main difference between horizontal and vertical scaling? A: Horizontal adds more servers; vertical increases resources on existing servers A: Horizontal is cheaper; vertical is more expensive A: Horizontal uses GPUs; vertical uses CPUs A: There is no difference CORRECT: 0 EXPLAIN: Horizontal scaling (scaling out) adds more machines to distribute load, while vertical scaling (scaling up) increases CPU, RAM, or GPU on existing machines. Both have different cost and complexity trade-offs.
Q: Why is batching important for GPU inference? A: It reduces memory usage A: It maximizes GPU utilization and throughput by processing multiple samples together A: It makes predictions more accurate A: It reduces model size CORRECT: 1 EXPLAIN: GPUs excel at parallel processing. Batching allows the GPU to process multiple samples simultaneously, fully utilizing its parallel processing capabilities. Single predictions leave most of the GPU idle.
Q: What is the purpose of prediction caching? A: To train models faster A: To avoid redundant computations by storing and reusing previous predictions A: To increase model accuracy A: To save model files CORRECT: 1 EXPLAIN: Prediction caching stores results of previous predictions so identical inputs return cached results instantly, avoiding expensive model inference. This is especially valuable when many users request predictions for similar or identical inputs.
Q: When should you use spot instances for ML workloads? A: For production inference serving A: For model training where interruptions are acceptable A: For real-time critical predictions A: Never, they are unreliable CORRECT: 1 EXPLAIN: Spot instances are much cheaper but can be terminated at any time. They're ideal for training (which can checkpoint and resume) but NOT for production serving where uptime is critical.
Q: What is TensorRT used for? A: Training models faster A: Optimizing inference performance on NVIDIA GPUs A: Data preprocessing A: Model versioning CORRECT: 1 EXPLAIN: TensorRT is NVIDIA's inference optimization library that dramatically speeds up model inference on GPUs through techniques like layer fusion, precision calibration, and kernel auto-tuning. It can provide 2-10x speedup over standard frameworks.
Q: What metric indicates when to scale up your ML service? A: Model accuracy drops A: High CPU/GPU utilization, increased latency, or growing request queue A: Training loss increases A: Number of features changes CORRECT: 1 EXPLAIN: Scaling indicators include high resource utilization (>70-80% CPU/GPU), increasing response latency, or growing request queues. These show your current capacity can't handle the load.
Q: What is ONNX Runtime used for? A: Training models A: Cross-platform optimized model inference A: Data validation A: Model monitoring CORRECT: 1 EXPLAIN: ONNX Runtime is a cross-platform inference engine that runs ONNX format models with optimizations for different hardware (CPU, GPU, edge devices). It often provides 2-5x speedup over native framework inference.
Q: How does multi-GPU inference work? A: One model on one GPU processes all requests A: Batch is split across GPUs, each processes a portion in parallel A: Multiple models train simultaneously A: GPUs take turns processing requests CORRECT: 1 EXPLAIN: Multi-GPU inference splits a large batch across multiple GPUs, with each GPU processing its portion in parallel. The results are then combined, providing near-linear speedup with the number of GPUs. ---END-QUIZ---
---PAGE---
Key Takeaways
Congratulations! You've completed the entire MLOps series!
MLOps Series Summary
You've learned the complete MLOps pipeline:
Part 1: Deployment → Part 2: Monitoring → Part 3: CI/CD → Part 4: Scaling
Part 4 Core Concepts
✓ Horizontal Scaling: Load balancing, Kubernetes auto-scaling
✓ Vertical Scaling: CPU/GPU optimization, memory management
✓ GPU Optimization: Batching, TensorRT, multi-GPU inference
✓ Caching: Prediction cache, feature cache, model cache
✓ Distributed Systems: Ray, Celery for async processing
✓ Cost Optimization: Spot instances, auto-scaling, monitoring
Complete MLOps Skills
✓ Deployment: REST APIs, containers, deployment strategies
✓ Monitoring: Drift detection, performance tracking, alerting
✓ CI/CD: Automated testing, ML pipelines, experiment tracking
✓ Scaling: Load balancing, GPU optimization, cost management
Production ML Checklist
- Model deployed with proper serving infrastructure
- Monitoring for drift, performance, and data quality
- Automated testing for code, data, and models
- CI/CD pipeline for continuous training/deployment
- Horizontal scaling with load balancing
- GPU optimization for deep learning workloads
- Caching for common predictions
- Cost monitoring and optimization
- Rollback strategy for failed deployments
- Documentation and runbooks
Resources for Continued Learning
Deployment & Serving:
- TensorFlow Serving: https://www.tensorflow.org/tfx/guide/serving
- NVIDIA Triton: https://github.com/triton-inference-server
Monitoring:
- Evidently AI: https://evidentlyai.com
- WhyLabs: https://whylabs.ai
CI/CD:
- MLflow: https://mlflow.org
- DVC: https://dvc.org
- Kubeflow: https://kubeflow.org
Scaling:
- Ray Serve: https://docs.ray.io/en/latest/serve/
- TensorRT: https://developer.nvidia.com/tensorrt
- ONNX Runtime: https://onnxruntime.ai
Cloud Platforms:
- AWS SageMaker: https://aws.amazon.com/sagemaker/
- Google Vertex AI: https://cloud.google.com/vertex-ai
- Azure ML: https://azure.microsoft.com/en-us/services/machine-learning/
Next Steps
Now that you've completed the MLOps series:
- Apply these concepts to your own ML projects
- Start small: Implement one improvement at a time
- Measure impact: Track metrics before and after changes
- Iterate: Continuously improve your ML systems
- Share knowledge: Help others learn MLOps
Final Thoughts
MLOps is a journey, not a destination. The field evolves rapidly with new tools and best practices. Stay curious, keep learning, and focus on delivering reliable, scalable ML systems that create real value.
Thank you for completing the MLOps series! 🎉
Happy building!