MLOps Part 4: Scaling ML Systems

Learn how to scale machine learning systems for production workloads, including horizontal/vertical scaling, distributed inference, GPU optimization, and cost management.

Introduction to Scaling ML Systems

As your ML application grows, you'll need to handle more requests, larger datasets, and complex models. Scaling ensures your system remains responsive and cost-effective.

What You'll Learn

In this module, you'll understand:

  • Horizontal scaling: Adding more servers to distribute load
  • Vertical scaling: Increasing resources on existing servers
  • Distributed inference: Running models across multiple machines
  • GPU optimization: Efficient use of expensive GPU resources
  • Caching strategies: Reducing redundant computations
  • Cost optimization: Maximizing performance per dollar

Why Scaling Matters

Production ML systems face unique scaling challenges:

  • High computational cost: ML inference is CPU/GPU intensive
  • Variable load: Traffic patterns can be unpredictable
  • Latency requirements: Users expect sub-second responses
  • Cost constraints: GPUs and compute resources are expensive

Without proper scaling, your ML service will be slow, expensive, or both!
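
A quick back-of-envelope calculation makes these pressures concrete. Here is a sketch under made-up assumptions (the latency, traffic, and price figures are illustrative, not benchmarks):

```python
import math

def servers_needed(peak_rps: float, latency_s: float, workers_per_server: int) -> int:
    """Each worker sustains roughly 1/latency requests per second."""
    per_server_rps = workers_per_server / latency_s
    # Round up, keeping ~30% headroom for traffic spikes
    return math.ceil(peak_rps / (per_server_rps * 0.7))

# Assume: 200 ms model latency, 500 req/s at peak, 4 workers per server
n = servers_needed(peak_rps=500, latency_s=0.2, workers_per_server=4)
print(f"Servers needed at peak: {n}")  # → 36

# At an assumed $0.50/hour per server, running them 24/7:
print(f"Always-on cost: ${n * 0.50 * 24:.2f}/day")
```

Numbers like these are why the rest of this module covers autoscaling, batching, and caching: each one shrinks either the per-request cost or the number of servers you must keep warm.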

---PAGE---

Horizontal Scaling

Distribute load across multiple servers running your ML service.

Load Balancing Basics

# nginx.conf
upstream ml_service {
    # Round-robin load balancing
    server ml-server-1:5000 weight=1;
    server ml-server-2:5000 weight=1;
    server ml-server-3:5000 weight=1;

    # Reuse connections to upstream servers (note: keepalive is not a
    # health check; open-source nginx does passive health checks via
    # max_fails/fail_timeout on each server line)
    keepalive 32;
}

server {
    listen 80;

    location /predict {
        proxy_pass http://ml_service;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_read_timeout 30s;
    }

    location /health {
        proxy_pass http://ml_service/health;
    }
}

Kubernetes Deployment for Auto-scaling

# ml-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-service
spec:
  replicas: 3  # Start with 3 replicas
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: myregistry/ml-model:latest
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
      - type: Percent
        value: 100
        periodSeconds: 30

Python Client with Load Balancing

import requests
import random
from typing import List

class LoadBalancedMLClient:
    """Client with client-side load balancing"""

    def __init__(self, servers: List[str]):
        self.servers = servers
        self.server_health = {server: True for server in servers}

    def predict(self, features):
        """Make prediction with automatic failover"""

        # Get healthy servers
        healthy_servers = [s for s, h in self.server_health.items() if h]

        if not healthy_servers:
            raise Exception("No healthy servers available")

        # Try healthy servers in random order until one succeeds
        random.shuffle(healthy_servers)
        for server in healthy_servers:

            try:
                response = requests.post(
                    f"{server}/predict",
                    json={'features': features},
                    timeout=5
                )

                if response.status_code == 200:
                    return response.json()

            except requests.exceptions.RequestException as e:
                print(f"⚠️ Server {server} failed: {e}")
                self.server_health[server] = False

        raise Exception("All servers failed")

    def check_health(self):
        """Check health of all servers"""

        for server in self.servers:
            try:
                response = requests.get(f"{server}/health", timeout=2)
                self.server_health[server] = response.status_code == 200
            except requests.exceptions.RequestException:
                self.server_health[server] = False

        healthy_count = sum(self.server_health.values())
        print(f"Health check: {healthy_count}/{len(self.servers)} servers healthy")

# Usage
client = LoadBalancedMLClient([
    'http://ml-server-1:5000',
    'http://ml-server-2:5000',
    'http://ml-server-3:5000'
])

client.check_health()
prediction = client.predict([1.0, 2.0, 3.0])

---PAGE---

Vertical Scaling

Optimize resource usage on individual servers.

CPU Optimization

Multi-threading for Batch Predictions

from concurrent.futures import ThreadPoolExecutor
import numpy as np

class OptimizedPredictor:
    """Predictor with multi-threading optimization"""

    def __init__(self, model, n_threads=4):
        self.model = model
        self.n_threads = n_threads

    def predict_batch(self, batch_features):
        """Predict on batch with multi-threading"""

        # Split batch into chunks (guard against batches smaller than
        # the thread count, which would make chunk_size zero)
        chunk_size = max(1, len(batch_features) // self.n_threads)
        chunks = [
            batch_features[i:i+chunk_size]
            for i in range(0, len(batch_features), chunk_size)
        ]

        # Process chunks in parallel (threads help here because NumPy and
        # scikit-learn release the GIL during heavy numeric work)
        with ThreadPoolExecutor(max_workers=self.n_threads) as executor:
            results = list(executor.map(self._predict_chunk, chunks))

        # Combine results
        return np.concatenate(results)

    def _predict_chunk(self, chunk):
        """Predict on a single chunk"""
        return self.model.predict(chunk)

# Benchmark
import time

# Single-threaded
start = time.time()
predictions_single = model.predict(large_batch)
time_single = time.time() - start

# Multi-threaded
predictor = OptimizedPredictor(model, n_threads=4)
start = time.time()
predictions_multi = predictor.predict_batch(large_batch)
time_multi = time.time() - start

print(f"Single-threaded: {time_single:.2f}s")
print(f"Multi-threaded: {time_multi:.2f}s")
print(f"Speedup: {time_single/time_multi:.2f}x")

ONNX Runtime for Faster Inference

import numpy as np
import onnx
import onnxruntime as ort
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

def convert_to_onnx(sklearn_model, n_features):
    """Convert scikit-learn model to ONNX format"""

    # Define input type
    initial_type = [('float_input', FloatTensorType([None, n_features]))]

    # Convert
    onnx_model = convert_sklearn(
        sklearn_model,
        initial_types=initial_type,
        target_opset=12
    )

    # Save
    onnx.save_model(onnx_model, 'model.onnx')

    return onnx_model

class ONNXPredictor:
    """Fast inference using ONNX Runtime"""

    def __init__(self, onnx_model_path):
        self.session = ort.InferenceSession(onnx_model_path)
        self.input_name = self.session.get_inputs()[0].name

    def predict(self, features):
        """Predict using ONNX Runtime"""

        features = np.array(features, dtype=np.float32)
        if len(features.shape) == 1:
            features = features.reshape(1, -1)

        result = self.session.run(None, {self.input_name: features})
        return result[0]

# Benchmark ONNX vs scikit-learn
# Convert model
convert_to_onnx(sklearn_model, n_features=10)

# Original scikit-learn
start = time.time()
for _ in range(1000):
    pred_sklearn = sklearn_model.predict(test_sample)
time_sklearn = time.time() - start

# ONNX Runtime
onnx_predictor = ONNXPredictor('model.onnx')
start = time.time()
for _ in range(1000):
    pred_onnx = onnx_predictor.predict(test_sample)
time_onnx = time.time() - start

print(f"Scikit-learn: {time_sklearn:.3f}s")
print(f"ONNX Runtime: {time_onnx:.3f}s")
print(f"Speedup: {time_sklearn/time_onnx:.2f}x")

Memory Optimization

import sys
import numpy as np

def optimize_memory_usage(df):
    """Reduce memory usage of DataFrame"""

    start_mem = df.memory_usage().sum() / 1024**2
    print(f"Memory usage before: {start_mem:.2f} MB")

    # Optimize integer columns (downcast to the smallest dtype that fits;
    # the bounds checks must be inclusive so boundary values still fit)
    for col in df.select_dtypes(include=['int']).columns:
        c_min = df[col].min()
        c_max = df[col].max()

        if c_min >= np.iinfo(np.int8).min and c_max <= np.iinfo(np.int8).max:
            df[col] = df[col].astype(np.int8)
        elif c_min >= np.iinfo(np.int16).min and c_max <= np.iinfo(np.int16).max:
            df[col] = df[col].astype(np.int16)
        elif c_min >= np.iinfo(np.int32).min and c_max <= np.iinfo(np.int32).max:
            df[col] = df[col].astype(np.int32)

    # Optimize float columns (float32 halves memory; check that the
    # precision loss is acceptable for your features)
    for col in df.select_dtypes(include=['float']).columns:
        df[col] = df[col].astype(np.float32)

    end_mem = df.memory_usage().sum() / 1024**2
    print(f"Memory usage after: {end_mem:.2f} MB")
    print(f"Decreased by {100 * (start_mem - end_mem) / start_mem:.1f}%")

    return df

# Model quantization for memory reduction
import torch

def quantize_pytorch_model(model):
    """Quantize PyTorch model to int8"""

    model.eval()
    quantized_model = torch.quantization.quantize_dynamic(
        model,
        {torch.nn.Linear},
        dtype=torch.qint8
    )

    return quantized_model

# Compare sizes (sys.getsizeof only measures the shallow Python object,
# so serialize each model to get its real footprint)
import io

def model_size_mb(m):
    buffer = io.BytesIO()
    torch.save(m.state_dict(), buffer)
    return buffer.getbuffer().nbytes / 1024**2

print(f"Original model size: {model_size_mb(model):.2f} MB")
quantized = quantize_pytorch_model(model)
print(f"Quantized model size: {model_size_mb(quantized):.2f} MB")

---PAGE---

GPU Optimization

Efficiently utilize GPU resources for deep learning inference.

Batch Processing for GPU

import torch
import time

class GPUBatchPredictor:
    """Optimize GPU usage with batching"""

    def __init__(self, model, device='cuda', max_batch_size=32):
        self.model = model.to(device)
        self.device = device
        self.max_batch_size = max_batch_size
        self.model.eval()

    def predict_single(self, input_tensor):
        """Predict single sample (inefficient)"""

        with torch.no_grad():
            input_tensor = input_tensor.to(self.device)
            output = self.model(input_tensor.unsqueeze(0))
        return output.cpu()

    def predict_batch(self, input_tensors):
        """Predict batch of samples (efficient)"""

        with torch.no_grad():
            # Move entire batch to GPU
            batch_tensor = torch.stack(input_tensors).to(self.device)

            # Single forward pass
            outputs = self.model(batch_tensor)

        return outputs.cpu()

    def predict_stream(self, input_stream):
        """Process stream of inputs with dynamic batching"""

        batch = []
        results = []

        for input_tensor in input_stream:
            batch.append(input_tensor)

            # Process when batch is full
            if len(batch) >= self.max_batch_size:
                batch_results = self.predict_batch(batch)
                results.extend(batch_results)
                batch = []

        # Process remaining items
        if batch:
            batch_results = self.predict_batch(batch)
            results.extend(batch_results)

        return results

# Benchmark: Single vs Batched
predictor = GPUBatchPredictor(model)
test_inputs = [torch.randn(3, 224, 224) for _ in range(100)]

# Single predictions
start = time.time()
for input_tensor in test_inputs:
    pred = predictor.predict_single(input_tensor)
time_single = time.time() - start

# Batched predictions
start = time.time()
preds = predictor.predict_batch(test_inputs)
time_batch = time.time() - start

print(f"Single predictions: {time_single:.2f}s")
print(f"Batched predictions: {time_batch:.2f}s")
print(f"Speedup: {time_single/time_batch:.2f}x")
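
Deadline-Based Dynamic Batching

predict_stream above flushes only when a batch fills. A live service usually also flushes after a short deadline, so a lone request never waits indefinitely for a full batch. Below is a minimal, framework-agnostic sketch (the predict_batch callable and the parameter values are illustrative assumptions):

```python
import time
from typing import Any, Callable, List

class DeadlineBatcher:
    """Collect items until the batch is full OR a deadline passes."""

    def __init__(self, predict_batch: Callable[[List[Any]], List[Any]],
                 max_batch_size: int = 32, max_wait_s: float = 0.01):
        self.predict_batch = predict_batch
        self.max_batch_size = max_batch_size
        self.max_wait_s = max_wait_s
        self.pending: List[Any] = []
        self.first_arrival = 0.0

    def submit(self, item: Any) -> List[Any]:
        """Add an item; returns flushed results (possibly empty)."""
        if not self.pending:
            self.first_arrival = time.monotonic()
        self.pending.append(item)

        full = len(self.pending) >= self.max_batch_size
        expired = time.monotonic() - self.first_arrival >= self.max_wait_s
        return self.flush() if (full or expired) else []

    def flush(self) -> List[Any]:
        """Run inference on whatever is pending."""
        if not self.pending:
            return []
        batch, self.pending = self.pending, []
        return self.predict_batch(batch)

# Usage with a stand-in "model" that doubles its inputs
batcher = DeadlineBatcher(lambda xs: [x * 2 for x in xs],
                          max_batch_size=4, max_wait_s=0.05)
results = []
for x in range(10):
    results.extend(batcher.submit(x))
results.extend(batcher.flush())  # drain the tail
print(results)
```

In a real server the submit calls would come from concurrent request handlers and flush would also run on a background timer; the point here is only the full-or-expired trigger.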

TensorRT Optimization

import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit

class TensorRTPredictor:
    """Fast inference with TensorRT.

    Note: this uses the TensorRT 7/8 builder API; newer releases replace
    build_engine with build_serialized_network and max_workspace_size
    with set_memory_pool_limit.
    """

    def __init__(self, onnx_model_path):
        self.logger = trt.Logger(trt.Logger.WARNING)

        # The ONNX parser requires an explicit-batch network
        explicit_batch = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)

        # Build TensorRT engine
        with trt.Builder(self.logger) as builder, \
             builder.create_network(explicit_batch) as network, \
             trt.OnnxParser(network, self.logger) as parser:

            # Parse ONNX model
            with open(onnx_model_path, 'rb') as model_file:
                if not parser.parse(model_file.read()):
                    raise RuntimeError("Failed to parse ONNX model")

            # Build engine with optimization
            config = builder.create_builder_config()
            config.max_workspace_size = 1 << 30  # 1GB

            # Enable FP16 mode for speed
            if builder.platform_has_fast_fp16:
                config.set_flag(trt.BuilderFlag.FP16)

            self.engine = builder.build_engine(network, config)

        # Create execution context
        self.context = self.engine.create_execution_context()

        # Allocate buffers
        self.input_shape = (1, 3, 224, 224)
        self.output_shape = (1, 1000)
        self.allocate_buffers()

    def allocate_buffers(self):
        """Allocate GPU memory for inputs/outputs"""

        self.h_input = cuda.pagelocked_empty(
            trt.volume(self.input_shape), dtype=np.float32
        )
        self.h_output = cuda.pagelocked_empty(
            trt.volume(self.output_shape), dtype=np.float32
        )

        self.d_input = cuda.mem_alloc(self.h_input.nbytes)
        self.d_output = cuda.mem_alloc(self.h_output.nbytes)

        self.stream = cuda.Stream()

    def predict(self, input_array):
        """Predict with TensorRT"""

        # Copy input to GPU
        np.copyto(self.h_input, input_array.ravel())
        cuda.memcpy_htod_async(self.d_input, self.h_input, self.stream)

        # Run inference
        self.context.execute_async_v2(
            bindings=[int(self.d_input), int(self.d_output)],
            stream_handle=self.stream.handle
        )

        # Copy output to CPU
        cuda.memcpy_dtoh_async(self.h_output, self.d_output, self.stream)
        self.stream.synchronize()

        return self.h_output.reshape(self.output_shape)

# Benchmark TensorRT vs PyTorch
# PyTorch (GPU)
start = time.time()
with torch.no_grad():
    for _ in range(1000):
        pred_torch = pytorch_model(test_input)
torch.cuda.synchronize()  # wait for queued GPU work before stopping the clock
time_torch = time.time() - start

# TensorRT
trt_predictor = TensorRTPredictor('model.onnx')
start = time.time()
for _ in range(1000):
    pred_trt = trt_predictor.predict(test_input)
time_trt = time.time() - start

print(f"PyTorch GPU: {time_torch:.3f}s")
print(f"TensorRT: {time_trt:.3f}s")
print(f"Speedup: {time_torch/time_trt:.2f}x")

Multi-GPU Inference

import torch
import torch.nn as nn

class MultiGPUPredictor:
    """Distribute inference across multiple GPUs"""

    def __init__(self, model, gpu_ids=[0, 1, 2, 3]):
        self.gpu_ids = gpu_ids
        self.num_gpus = len(gpu_ids)

        # Replicate model to all GPUs
        self.models = []
        for gpu_id in gpu_ids:
            model_copy = model.to(f'cuda:{gpu_id}')
            model_copy.eval()
            self.models.append(model_copy)

    def predict_batch(self, batch_inputs):
        """Split batch across GPUs"""

        # Split batch (ceiling division so no samples are silently dropped
        # when the batch doesn't divide evenly across GPUs)
        batch_size = len(batch_inputs)
        chunk_size = (batch_size + self.num_gpus - 1) // self.num_gpus
        chunks = [
            batch_inputs[i:i+chunk_size]
            for i in range(0, batch_size, chunk_size)
        ]

        # Process on each GPU (sequentially here; overlap with threads or
        # CUDA streams for a true multi-GPU speedup)
        results = []
        for gpu_id, model, chunk in zip(self.gpu_ids, self.models, chunks):
            with torch.no_grad():
                chunk_tensor = torch.stack(chunk).to(f'cuda:{gpu_id}')
                output = model(chunk_tensor)
                results.append(output.cpu())

        # Combine results
        return torch.cat(results)

# Usage
predictor = MultiGPUPredictor(model, gpu_ids=[0, 1])
large_batch = [torch.randn(3, 224, 224) for _ in range(128)]
predictions = predictor.predict_batch(large_batch)

---PAGE---

Caching Strategies

Reduce redundant computations with intelligent caching.

Prediction Caching

import redis
import hashlib
import json
import pickle

class PredictionCache:
    """Cache predictions with Redis"""

    def __init__(self, redis_host='localhost', ttl=3600):
        self.redis_client = redis.Redis(host=redis_host, decode_responses=False)
        self.ttl = ttl  # Time to live in seconds

    def get_cache_key(self, features):
        """Generate cache key from features"""

        # Convert features to string
        features_str = json.dumps(features, sort_keys=True)

        # Hash for a compact, stable key (MD5 is fine here; we need
        # consistency, not cryptographic strength)
        return hashlib.md5(features_str.encode()).hexdigest()

    def get(self, features):
        """Get cached prediction"""

        key = self.get_cache_key(features)
        cached = self.redis_client.get(key)

        if cached:
            return pickle.loads(cached)

        return None

    def set(self, features, prediction):
        """Cache prediction"""

        key = self.get_cache_key(features)
        self.redis_client.setex(
            key,
            self.ttl,
            pickle.dumps(prediction)
        )

    def predict_with_cache(self, model, features):
        """Predict with caching"""

        # Check cache first
        cached_pred = self.get(features)
        if cached_pred is not None:
            print("✓ Cache hit")
            return cached_pred

        # Cache miss - compute prediction
        print("⚠️ Cache miss")
        prediction = model.predict([features])[0]

        # Cache result
        self.set(features, prediction)

        return prediction

# Usage
cache = PredictionCache(ttl=1800)  # 30 minutes

# First call - cache miss
pred1 = cache.predict_with_cache(model, [1.0, 2.0, 3.0])

# Second call - cache hit (instant)
pred2 = cache.predict_with_cache(model, [1.0, 2.0, 3.0])

Feature Cache

import time
from datetime import datetime

class FeatureCache:
    """Cache expensive feature computations"""

    def __init__(self):
        self.cache = {}

    def compute_expensive_features(self, raw_data, user_id):
        """Compute features with caching"""

        cache_key = f"features_{user_id}"

        # Check cache
        if cache_key in self.cache:
            cached_features, timestamp = self.cache[cache_key]

            # Check if cache is fresh (< 1 hour old); total_seconds() is
            # needed because .seconds wraps around at one day
            if (datetime.now() - timestamp).total_seconds() < 3600:
                print("✓ Using cached features")
                return cached_features

        # Compute features
        print("⚠️ Computing features...")
        features = self._expensive_computation(raw_data)

        # Cache result
        self.cache[cache_key] = (features, datetime.now())

        return features

    def _expensive_computation(self, raw_data):
        """Simulated expensive feature engineering"""
        import time
        time.sleep(0.5)  # Simulate expensive operation
        return [x * 2 for x in raw_data]

# Benchmark
cache = FeatureCache()

# First call - expensive
start = time.time()
features1 = cache.compute_expensive_features([1, 2, 3], user_id=123)
time1 = time.time() - start

# Second call - cached
start = time.time()
features2 = cache.compute_expensive_features([1, 2, 3], user_id=123)
time2 = time.time() - start

print(f"First call: {time1:.3f}s")
print(f"Second call: {time2:.3f}s")
print(f"Speedup: {time1/time2:.0f}x")

Model Cache (Warm Start)

import joblib

class ModelCache:
    """Keep frequently used models in memory"""

    def __init__(self, max_models=3):
        self.max_models = max_models
        self.cache = {}
        self.access_count = {}

    def load_model(self, model_id):
        """Load model with caching"""

        # Check if already loaded
        if model_id in self.cache:
            self.access_count[model_id] += 1
            print(f"✓ Model {model_id} already loaded")
            return self.cache[model_id]

        # Load model
        print(f"⚠️ Loading model {model_id} from disk...")
        model = joblib.load(f'models/{model_id}.pkl')

        # Evict least used model if cache full
        if len(self.cache) >= self.max_models:
            least_used = min(self.access_count, key=self.access_count.get)
            print(f"Evicting model {least_used}")
            del self.cache[least_used]
            del self.access_count[least_used]

        # Add to cache
        self.cache[model_id] = model
        self.access_count[model_id] = 1

        return model

# Usage
model_cache = ModelCache(max_models=2)

# Load models
model_a = model_cache.load_model('model_v1')  # Load from disk
model_a = model_cache.load_model('model_v1')  # From cache
model_b = model_cache.load_model('model_v2')  # Load from disk
model_c = model_cache.load_model('model_v3')  # Load from disk, evicts model_v2 (least used)
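
ModelCache above evicts the least-frequently-used model, which can pin a stale model in memory if it was popular long ago. When access patterns shift over time, least-recently-used (LRU) eviction often behaves better, and collections.OrderedDict makes it a few lines. A sketch, with a stub loader standing in for joblib.load:

```python
from collections import OrderedDict

class LRUModelCache:
    """Least-recently-used variant of the model cache above."""

    def __init__(self, load_fn, max_models=3):
        self.load_fn = load_fn  # e.g. lambda mid: joblib.load(f'models/{mid}.pkl')
        self.max_models = max_models
        self.cache = OrderedDict()

    def load_model(self, model_id):
        if model_id in self.cache:
            self.cache.move_to_end(model_id)  # mark as most recently used
            return self.cache[model_id]

        model = self.load_fn(model_id)
        self.cache[model_id] = model
        if len(self.cache) > self.max_models:
            evicted, _ = self.cache.popitem(last=False)  # drop the oldest entry
            print(f"Evicting model {evicted}")
        return model

# Usage with a stub loader instead of joblib, for illustration
lru = LRUModelCache(load_fn=lambda mid: f"<model {mid}>", max_models=2)
lru.load_model('v1')
lru.load_model('v2')
lru.load_model('v1')    # v1 becomes most recently used
lru.load_model('v3')    # evicts v2, the least recently used
print(list(lru.cache))  # → ['v1', 'v3']
```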

---PAGE---

Distributed Inference

Scale beyond a single machine with distributed systems.

Ray for Distributed Inference

import ray
from ray import serve

# Initialize Ray
ray.init()

@serve.deployment(num_replicas=3, ray_actor_options={"num_cpus": 1})
class MLModelServe:
    """Distributed ML model serving with Ray Serve"""

    def __init__(self):
        import joblib
        self.model = joblib.load('model.pkl')

    async def __call__(self, request):
        """Handle prediction request"""

        data = await request.json()
        features = data['features']

        prediction = self.model.predict([features])[0]

        return {
            'prediction': float(prediction),
            'model_version': 'v1.0'
        }

# Deploy (route_prefix makes the endpoint match the client URL below)
serve.run(MLModelServe.bind(), route_prefix="/MLModelServe")

# Client code
import requests

response = requests.post(
    'http://localhost:8000/MLModelServe',
    json={'features': [1.0, 2.0, 3.0]}
)

print(response.json())

Celery for Async Processing

from celery import Celery
import joblib

# Configure Celery
app = Celery(
    'ml_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Load model once per worker
model = None

def load_model():
    """Load model in worker process"""
    global model
    if model is None:
        model = joblib.load('model.pkl')
    return model

@app.task(name='predict')
def predict_task(features):
    """Async prediction task"""

    model = load_model()
    prediction = model.predict([features])[0]

    return {
        'prediction': float(prediction),
        'status': 'completed'
    }

@app.task(name='batch_predict')
def batch_predict_task(batch_features):
    """Batch prediction task"""

    model = load_model()
    predictions = model.predict(batch_features)

    return {
        'predictions': predictions.tolist(),
        'count': len(predictions),
        'status': 'completed'
    }

# Client usage
from celery.result import AsyncResult

# Submit async task
result = predict_task.delay([1.0, 2.0, 3.0])

# Check status
print(f"Task ID: {result.id}")
print(f"Status: {result.status}")

# Get result (blocks until ready)
prediction = result.get(timeout=10)
print(f"Prediction: {prediction}")

# Batch processing
batch = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
batch_result = batch_predict_task.delay(batch)
batch_predictions = batch_result.get()

---PAGE---

Cost Optimization

Reduce infrastructure costs while maintaining performance.

Spot Instances for Training

# AWS Spot Instance configuration
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

def launch_spot_training(instance_type='p3.2xlarge'):
    """Launch spot instance for training"""

    # Spot instance specification
    launch_specification = {
        'ImageId': 'ami-xxxxx',  # Deep Learning AMI
        'InstanceType': instance_type,
        'KeyName': 'my-key-pair',
        'UserData': '''#!/bin/bash
            cd /home/ubuntu
            git clone https://github.com/my-org/ml-training.git
            cd ml-training
            pip install -r requirements.txt
            python train.py --config production.yaml
            aws s3 cp models/ s3://my-bucket/models/ --recursive
            shutdown -h now
        ''',
        'IamInstanceProfile': {
            'Name': 'ML-Training-Role'
        }
    }

    # Request spot instance
    response = ec2.request_spot_instances(
        SpotPrice='1.00',  # Max price per hour
        InstanceCount=1,
        Type='one-time',
        LaunchSpecification=launch_specification
    )

    print(f"✓ Spot instance requested: {response['SpotInstanceRequests'][0]['SpotInstanceRequestId']}")

    return response
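
Checkpointing for Spot Interruptions

Spot instances can be reclaimed with only a short warning, so any training job running on one should checkpoint regularly and resume from the latest checkpoint when a replacement instance starts. A minimal sketch of the pattern using pickle (a real job would save framework state, e.g. a PyTorch state_dict, and sync the file to S3):

```python
import os
import pickle

CHECKPOINT = 'checkpoint.pkl'  # a real job would also copy this to S3

def save_checkpoint(epoch, state):
    """Write atomically so a reclaim mid-write can't corrupt the file."""
    tmp = CHECKPOINT + '.tmp'
    with open(tmp, 'wb') as f:
        pickle.dump({'epoch': epoch, 'state': state}, f)
    os.replace(tmp, CHECKPOINT)

def load_checkpoint():
    """Resume point, or a fresh start if no checkpoint exists."""
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT, 'rb') as f:
            return pickle.load(f)
    return {'epoch': 0, 'state': None}

def train(total_epochs=10):
    ckpt = load_checkpoint()
    state = ckpt['state'] or {'loss': 1.0}
    for epoch in range(ckpt['epoch'], total_epochs):
        state['loss'] *= 0.9               # stand-in for one epoch of training
        save_checkpoint(epoch + 1, state)  # checkpoint after every epoch
    return state

final = train()   # a restarted run resumes where the last one stopped
print(f"Final loss: {final['loss']:.3f}")
```

If the instance is reclaimed mid-run, the user-data script above simply runs again on the next instance and train() picks up from the last saved epoch instead of starting over.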

Auto-scaling Based on Queue Length

class QueueBasedScaler:
    """Scale workers based on queue length"""

    def __init__(self, redis_client, min_workers=1, max_workers=10):
        self.redis = redis_client
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.queue_name = 'ml_predictions'

    def get_queue_length(self):
        """Get current queue length"""
        return self.redis.llen(self.queue_name)

    def calculate_desired_workers(self):
        """Calculate how many workers we need"""

        queue_length = self.get_queue_length()

        # Scale: 1 worker per 10 items in queue
        desired = max(
            self.min_workers,
            min(queue_length // 10, self.max_workers)
        )

        return desired

    def scale_workers(self, current_workers):
        """Decide to scale up or down"""

        desired = self.calculate_desired_workers()

        if desired > current_workers:
            # Scale up
            return desired - current_workers, 'up'
        elif desired < current_workers:
            # Scale down
            return current_workers - desired, 'down'
        else:
            # No change
            return 0, 'stable'

# Monitor and scale
import time

scaler = QueueBasedScaler(redis_client)
current_workers = 2

while True:
    change, direction = scaler.scale_workers(current_workers)

    if change > 0:
        if direction == 'up':
            print(f"📈 Scaling UP by {change} workers (queue: {scaler.get_queue_length()})")
            # Trigger scale up
            current_workers += change
        else:
            print(f"📉 Scaling DOWN by {change} workers (queue: {scaler.get_queue_length()})")
            # Trigger scale down
            current_workers -= change

    time.sleep(30)  # Check every 30 seconds

Cost Monitoring

import boto3
from datetime import datetime, timedelta

class CostMonitor:
    """Monitor AWS costs for ML infrastructure"""

    def __init__(self):
        self.ce_client = boto3.client('ce', region_name='us-east-1')

    def get_daily_cost(self, service='Amazon SageMaker'):
        """Get cost for specific service"""

        end = datetime.now().date()
        start = end - timedelta(days=30)

        response = self.ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': start.strftime('%Y-%m-%d'),
                'End': end.strftime('%Y-%m-%d')
            },
            Granularity='DAILY',
            Metrics=['UnblendedCost'],
            Filter={
                'Dimensions': {
                    'Key': 'SERVICE',
                    'Values': [service]
                }
            }
        )

        costs = []
        for result in response['ResultsByTime']:
            date = result['TimePeriod']['Start']
            cost = float(result['Total']['UnblendedCost']['Amount'])
            costs.append({'date': date, 'cost': cost})

        return costs

    def get_cost_forecast(self, days=7):
        """Get cost forecast"""

        end = (datetime.now() + timedelta(days=days)).date()
        start = datetime.now().date()

        response = self.ce_client.get_cost_forecast(
            TimePeriod={
                'Start': start.strftime('%Y-%m-%d'),
                'End': end.strftime('%Y-%m-%d')
            },
            Metric='UNBLENDED_COST',
            Granularity='DAILY'
        )

        forecast = float(response['Total']['Amount'])
        print(f"💰 Forecast for next {days} days: ${forecast:.2f}")

        return forecast

# Usage
monitor = CostMonitor()
costs = monitor.get_daily_cost('Amazon SageMaker')
total = sum(c['cost'] for c in costs)
print(f"Total SageMaker cost (30 days): ${total:.2f}")

forecast = monitor.get_cost_forecast(days=7)

---PAGE---

Performance Benchmarking

Measure and optimize system performance.

Load Testing

import concurrent.futures
import requests
import time
import statistics

class LoadTester:
    """Load test ML service"""

    def __init__(self, endpoint_url):
        self.endpoint_url = endpoint_url

    def single_request(self, request_id):
        """Make single prediction request"""

        start = time.time()

        try:
            response = requests.post(
                self.endpoint_url,
                json={'features': [1.0, 2.0, 3.0]},
                timeout=10
            )

            latency = time.time() - start

            return {
                'request_id': request_id,
                'status_code': response.status_code,
                'latency': latency,
                'success': response.status_code == 200
            }

        except Exception as e:
            return {
                'request_id': request_id,
                'status_code': 0,
                'latency': time.time() - start,
                'success': False,
                'error': str(e)
            }

    def run_load_test(self, num_requests=1000, concurrency=10):
        """Run load test with multiple concurrent users"""

        print(f"Starting load test:")
        print(f"  Requests: {num_requests}")
        print(f"  Concurrency: {concurrency}")

        start_time = time.time()
        results = []

        # Execute requests concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
            futures = [
                executor.submit(self.single_request, i)
                for i in range(num_requests)
            ]

            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())

        total_time = time.time() - start_time

        # Analyze results
        self.analyze_results(results, total_time)

    def analyze_results(self, results, total_time):
        """Analyze load test results"""

        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]

        latencies = [r['latency'] for r in successful]

        print("\n" + "="*50)
        print("LOAD TEST RESULTS")
        print("="*50)
        print(f"Total requests: {len(results)}")
        print(f"Successful: {len(successful)} ({len(successful)/len(results)*100:.1f}%)")
        print(f"Failed: {len(failed)} ({len(failed)/len(results)*100:.1f}%)")
        print(f"Total time: {total_time:.2f}s")
        print(f"Requests/second: {len(results)/total_time:.2f}")
        print()
        if latencies:
            print("Latency statistics (successful requests):")
            print(f"  Min: {min(latencies)*1000:.0f}ms")
            print(f"  Max: {max(latencies)*1000:.0f}ms")
            print(f"  Mean: {statistics.mean(latencies)*1000:.0f}ms")
            print(f"  Median: {statistics.median(latencies)*1000:.0f}ms")
            # statistics.quantiles needs at least two data points
            if len(latencies) >= 2:
                print(f"  P95: {statistics.quantiles(latencies, n=20)[18]*1000:.0f}ms")
                print(f"  P99: {statistics.quantiles(latencies, n=100)[98]*1000:.0f}ms")
        else:
            print("No successful requests; latency statistics unavailable.")
        print("="*50)

# Usage
if __name__ == '__main__':
    tester = LoadTester('http://localhost:5000/predict')
    tester.run_load_test(num_requests=1000, concurrency=20)

---PAGE---

---QUIZ---
TITLE: Scaling ML Systems Knowledge Check
INTRO: Test your understanding of ML system scaling concepts!

Q: What is the main difference between horizontal and vertical scaling?
A: Horizontal adds more servers; vertical increases resources on existing servers
A: Horizontal is cheaper; vertical is more expensive
A: Horizontal uses GPUs; vertical uses CPUs
A: There is no difference
CORRECT: 0
EXPLAIN: Horizontal scaling (scaling out) adds more machines to distribute load, while vertical scaling (scaling up) increases CPU, RAM, or GPU on existing machines. Both have different cost and complexity trade-offs.

Q: Why is batching important for GPU inference?
A: It reduces memory usage
A: It maximizes GPU utilization and throughput by processing multiple samples together
A: It makes predictions more accurate
A: It reduces model size
CORRECT: 1
EXPLAIN: GPUs excel at parallel processing. Batching allows the GPU to process multiple samples simultaneously, fully utilizing its parallel processing capabilities. Single predictions leave most of the GPU idle.

Q: What is the purpose of prediction caching?
A: To train models faster
A: To avoid redundant computations by storing and reusing previous predictions
A: To increase model accuracy
A: To save model files
CORRECT: 1
EXPLAIN: Prediction caching stores results of previous predictions so identical inputs return cached results instantly, avoiding expensive model inference. This is especially valuable when many users request predictions for similar or identical inputs.

Q: When should you use spot instances for ML workloads?
A: For production inference serving
A: For model training where interruptions are acceptable
A: For real-time critical predictions
A: Never, they are unreliable
CORRECT: 1
EXPLAIN: Spot instances are much cheaper but can be terminated at any time. They're ideal for training (which can checkpoint and resume) but NOT for production serving where uptime is critical.

Q: What is TensorRT used for?
A: Training models faster
A: Optimizing inference performance on NVIDIA GPUs
A: Data preprocessing
A: Model versioning
CORRECT: 1
EXPLAIN: TensorRT is NVIDIA's inference optimization library that dramatically speeds up model inference on GPUs through techniques like layer fusion, precision calibration, and kernel auto-tuning. It can provide 2-10x speedup over standard frameworks.

Q: What metric indicates when to scale up your ML service?
A: Model accuracy drops
A: High CPU/GPU utilization, increased latency, or growing request queue
A: Training loss increases
A: Number of features changes
CORRECT: 1
EXPLAIN: Scaling indicators include high resource utilization (>70-80% CPU/GPU), increasing response latency, or growing request queues. These show your current capacity can't handle the load.

Q: What is ONNX Runtime used for?
A: Training models
A: Cross-platform optimized model inference
A: Data validation
A: Model monitoring
CORRECT: 1
EXPLAIN: ONNX Runtime is a cross-platform inference engine that runs ONNX format models with optimizations for different hardware (CPU, GPU, edge devices). It often provides 2-5x speedup over native framework inference.

Q: How does multi-GPU inference work?
A: One model on one GPU processes all requests
A: Batch is split across GPUs, each processes a portion in parallel
A: Multiple models train simultaneously
A: GPUs take turns processing requests
CORRECT: 1
EXPLAIN: Multi-GPU inference splits a large batch across multiple GPUs, with each GPU processing its portion in parallel. The results are then combined, providing near-linear speedup with the number of GPUs.
---END-QUIZ---

---PAGE---

Key Takeaways

Congratulations! You've completed the entire MLOps series!

MLOps Series Summary

You've learned the complete MLOps pipeline:

Part 1: Deployment → Part 2: Monitoring → Part 3: CI/CD → Part 4: Scaling

Part 4 Core Concepts

Horizontal Scaling: Load balancing, Kubernetes auto-scaling

Vertical Scaling: CPU/GPU optimization, memory management

GPU Optimization: Batching, TensorRT, multi-GPU inference

Caching: Prediction cache, feature cache, model cache

Distributed Systems: Ray, Celery for async processing

Cost Optimization: Spot instances, auto-scaling, monitoring
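As a quick refresher on the caching concept above, here is a minimal in-memory prediction cache sketch. The `PredictionCache` class and its hashing scheme are illustrative, not code from earlier in the series; a production setup would typically use Redis or another shared store with eviction (LRU/TTL):

```python
import hashlib
import json

class PredictionCache:
    """In-memory prediction cache keyed by a hash of the input features."""

    def __init__(self, max_size=10_000):
        self.cache = {}
        self.max_size = max_size
        self.hits = 0
        self.misses = 0

    def _key(self, features):
        # Serialize deterministically so {'a': 1, 'b': 2} and
        # {'b': 2, 'a': 1} produce the same cache key
        payload = json.dumps(features, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def get_or_compute(self, features, predict_fn):
        key = self._key(features)
        if key in self.cache:
            self.hits += 1
            return self.cache[key]
        self.misses += 1
        result = predict_fn(features)
        if len(self.cache) < self.max_size:  # simple bound; real caches evict
            self.cache[key] = result
        return result
```

Identical feature dictionaries hit the cache and skip the expensive model call entirely, which is where the latency and cost savings come from.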

Complete MLOps Skills

Deployment: REST APIs, containers, deployment strategies

Monitoring: Drift detection, performance tracking, alerting

CI/CD: Automated testing, ML pipelines, experiment tracking

Scaling: Load balancing, GPU optimization, cost management
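Of the scaling techniques recapped above, request batching deserves one more look, since it applies to any framework. This is a hypothetical micro-batching sketch: `batch_predict_fn` stands in for a real batched model forward pass, and the class names are assumptions for illustration:

```python
import queue
import threading
import time

class MicroBatcher:
    """Group single requests into batches so one model call serves many
    samples (sketch; batch_predict_fn stands in for a batched forward pass)."""

    def __init__(self, batch_predict_fn, max_batch=32, max_wait_s=0.01):
        self.batch_predict_fn = batch_predict_fn
        self.max_batch = max_batch
        self.max_wait_s = max_wait_s
        self.queue = queue.Queue()
        threading.Thread(target=self._loop, daemon=True).start()

    def predict(self, sample):
        # Called from request handlers; blocks until the batch result is ready
        slot = {'sample': sample, 'event': threading.Event()}
        self.queue.put(slot)
        slot['event'].wait()
        return slot['result']

    def _loop(self):
        while True:
            batch = [self.queue.get()]  # block until at least one request arrives
            deadline = time.time() + self.max_wait_s
            while len(batch) < self.max_batch:
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(self.queue.get(timeout=remaining))
                except queue.Empty:
                    break
            # One batched call instead of len(batch) single calls
            results = self.batch_predict_fn([s['sample'] for s in batch])
            for slot, result in zip(batch, results):
                slot['result'] = result
                slot['event'].set()
```

The trade-off is the `max_wait_s` window: a longer wait collects bigger batches (better GPU utilization) at the cost of added tail latency for the first request in each batch.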

Production ML Checklist

  • Model deployed with proper serving infrastructure
  • Monitoring for drift, performance, and data quality
  • Automated testing for code, data, and models
  • CI/CD pipeline for continuous training/deployment
  • Horizontal scaling with load balancing
  • GPU optimization for deep learning workloads
  • Caching for common predictions
  • Cost monitoring and optimization
  • Rollback strategy for failed deployments
  • Documentation and runbooks


Next Steps

Now that you've completed the MLOps series:

  1. Apply these concepts to your own ML projects
  2. Start small: Implement one improvement at a time
  3. Measure impact: Track metrics before and after changes
  4. Iterate: Continuously improve your ML systems
  5. Share knowledge: Help others learn MLOps

Final Thoughts

MLOps is a journey, not a destination. The field evolves rapidly with new tools and best practices. Stay curious, keep learning, and focus on delivering reliable, scalable ML systems that create real value.

Thank you for completing the MLOps series! 🎉

Happy building!
