MLOps Part 3: CI/CD for Machine Learning
Learn how to implement continuous integration and continuous deployment (CI/CD) for ML systems, including automated testing, ML pipelines, experiment tracking, and deployment automation.
Introduction to CI/CD for ML
Continuous Integration and Continuous Deployment (CI/CD) for machine learning extends traditional software engineering practices to handle the unique challenges of ML systems.
What You'll Learn
In this module, you'll understand:
- Automated testing: Unit tests, integration tests, model tests
- ML pipelines: Building reproducible training workflows
- Continuous training: Automated model retraining
- Version control: Managing models, data, and code
- Experiment tracking: Tracking experiments with MLflow
Why CI/CD for ML is Different
Traditional software CI/CD focuses on code. ML CI/CD must handle:
- Code + Data + Models
- Non-deterministic outputs (model training)
- Long-running processes (training can take hours/days)
- Multiple experiment versions
- Data validation and versioning
Without proper CI/CD, ML projects become chaotic!
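Non-determinism is worth a concrete look before diving into testing: most training randomness can be pinned down by seeding every random number generator the run touches. A minimal sketch, assuming a scikit-learn workflow (the seed value and helper name are illustrative):

```python
# Pin every source of randomness so two training runs produce the same model.
import random

import numpy as np
from sklearn.ensemble import RandomForestClassifier

SEED = 42  # hypothetical project-wide seed

def make_reproducible_model():
    """Seed the RNGs a training run touches, then build the estimator."""
    random.seed(SEED)
    np.random.seed(SEED)
    # Estimators take their own seed so refits are repeatable too
    return RandomForestClassifier(n_estimators=10, random_state=SEED)

# Tiny synthetic dataset for demonstration
X = np.random.RandomState(0).rand(50, 3)
y = (X[:, 0] > 0.5).astype(int)

m1 = make_reproducible_model().fit(X, y)
m2 = make_reproducible_model().fit(X, y)
assert np.array_equal(m1.predict(X), m2.predict(X))  # identical runs
```

Frameworks with their own RNGs (e.g. deep learning libraries) need their own seeding calls; seeding alone does not cover GPU nondeterminism.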
---PAGE---
Automated Testing for ML
Testing ML systems requires testing code, data, and model behavior.
1. Unit Tests for ML Code
Test data preprocessing, feature engineering, and utility functions:
import pytest
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
def preprocess_features(df):
"""Preprocess features for model training"""
# Remove duplicates
df = df.drop_duplicates()
# Fill missing values in numeric columns with the column median
df = df.fillna(df.median(numeric_only=True))
# Scale features
scaler = StandardScaler()
numerical_cols = df.select_dtypes(include=[np.number]).columns
df[numerical_cols] = scaler.fit_transform(df[numerical_cols])
return df
# Unit tests
def test_preprocess_removes_duplicates():
"""Test that preprocessing removes duplicate rows"""
df = pd.DataFrame({
'age': [25, 30, 25],
'income': [50000, 60000, 50000]
})
result = preprocess_features(df)
assert len(result) == 2 # One duplicate removed
def test_preprocess_fills_missing_values():
"""Test that missing values are filled"""
df = pd.DataFrame({
'age': [25, np.nan, 35],
'income': [50000, 60000, 70000]
})
result = preprocess_features(df)
assert result.isnull().sum().sum() == 0 # No missing values
def test_preprocess_scales_features():
"""Test that features are standardized"""
df = pd.DataFrame({
'age': [25, 30, 35],
'income': [50000, 60000, 70000]
})
result = preprocess_features(df)
# Check mean ≈ 0 and population std ≈ 1 (StandardScaler divides by the ddof=0 std)
assert abs(result['age'].mean()) < 0.01
assert abs(result['age'].std(ddof=0) - 1.0) < 0.01
2. Data Validation Tests
Validate data schema, types, and distributions:
import great_expectations as ge
def test_data_schema(df):
"""Validate data schema and types"""
# Convert to Great Expectations dataset
ge_df = ge.from_pandas(df)
# Define expectations
ge_df.expect_table_columns_to_match_ordered_list([
'age', 'income', 'credit_score', 'label'
])
ge_df.expect_column_values_to_be_of_type('age', 'int64')
ge_df.expect_column_values_to_be_of_type('income', 'float64')
# Validate
results = ge_df.validate()
assert results['success'], f"Data validation failed: {results}"
def test_data_ranges(df):
"""Test that data values are in expected ranges"""
assert df['age'].min() >= 18, "Age should be >= 18"
assert df['age'].max() <= 100, "Age should be <= 100"
assert df['income'].min() >= 0, "Income should be non-negative"
assert df['credit_score'].between(300, 850).all(), "Credit score out of range"
def test_data_completeness(df):
"""Test for missing values in critical columns"""
critical_columns = ['age', 'income', 'label']
for col in critical_columns:
missing_pct = df[col].isnull().mean()
assert missing_pct < 0.05, f"{col} has {missing_pct:.1%} missing values"
def test_class_distribution(df):
"""Test that classes are balanced enough"""
class_dist = df['label'].value_counts(normalize=True)
min_class_pct = class_dist.min()
assert min_class_pct >= 0.1, f"Class imbalance too severe: {min_class_pct:.1%}"
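The data tests above all declare a `df` argument; under pytest that name must be supplied by a fixture, or the tests will error at collection time. A minimal sketch of such a fixture, with a synthetic frame matching the column names assumed in the examples above:

```python
# Provide the `df` argument the data tests declare, via a pytest fixture.
import numpy as np
import pandas as pd
import pytest

def make_sample_df(n=200, seed=0):
    """Small synthetic frame matching the schema the data tests expect."""
    rng = np.random.default_rng(seed)
    return pd.DataFrame({
        'age': rng.integers(18, 80, n),
        'income': rng.uniform(20_000, 120_000, n),
        'credit_score': rng.integers(300, 851, n),
        'label': rng.integers(0, 2, n),
    })

@pytest.fixture
def df():
    """pytest injects this frame wherever a test takes a `df` parameter."""
    return make_sample_df()
```

In a real suite the fixture would load a validated sample of production data rather than a synthetic frame.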
3. Model Quality Tests
Test model performance and behavior:
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import cross_val_score
def test_model_accuracy_threshold(model, X_test, y_test):
"""Test that model meets minimum accuracy threshold"""
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
MINIMUM_ACCURACY = 0.75
assert accuracy >= MINIMUM_ACCURACY, \
f"Model accuracy {accuracy:.2%} below threshold {MINIMUM_ACCURACY:.2%}"
def test_model_cross_validation(model, X, y):
"""Test model performance with cross-validation"""
scores = cross_val_score(model, X, y, cv=5, scoring='f1_weighted')
mean_score = scores.mean()
MINIMUM_F1 = 0.70
assert mean_score >= MINIMUM_F1, \
f"Cross-validation F1 {mean_score:.2%} below threshold"
def test_model_predictions_validity(model, X_test):
"""Test that predictions are valid"""
predictions = model.predict(X_test)
# No NaN predictions
assert not np.isnan(predictions).any(), "Model produced NaN predictions"
# Predictions in valid range (assumes binary 0/1 labels)
assert predictions.min() >= 0, "Negative predictions found"
assert predictions.max() <= 1, "Predictions > 1 found"
def test_model_inference_time(model, X_test):
"""Test that inference is fast enough"""
import time
MAX_LATENCY = 0.1 # 100ms
start = time.time()
predictions = model.predict(X_test[:100])
latency = (time.time() - start) / 100
assert latency < MAX_LATENCY, \
f"Inference too slow: {latency*1000:.1f}ms per prediction"
def test_model_determinism(model, X_test):
"""Test that model produces consistent predictions"""
pred1 = model.predict(X_test)
pred2 = model.predict(X_test)
assert np.array_equal(pred1, pred2), \
"Model produced different predictions for same input"
4. Integration Tests
Test the entire ML pipeline:
def test_full_training_pipeline():
"""Test complete training pipeline end-to-end"""
# 1. Load data
df = load_training_data()
assert len(df) > 0, "No training data loaded"
# 2. Preprocess
df_processed = preprocess_features(df)
assert df_processed.shape[1] == df.shape[1], "Features lost during preprocessing"
# 3. Split data
X_train, X_test, y_train, y_test = train_test_split(
df_processed.drop('label', axis=1),
df_processed['label'],
test_size=0.2
)
# 4. Train model
model = train_model(X_train, y_train)
assert model is not None, "Model training failed"
# 5. Evaluate
accuracy = evaluate_model(model, X_test, y_test)
assert accuracy > 0.7, f"End-to-end accuracy too low: {accuracy:.2%}"
# 6. Save model
model_path = save_model(model, "test_model.pkl")
assert os.path.exists(model_path), "Model not saved"
# 7. Load and predict
loaded_model = load_model(model_path)
predictions = loaded_model.predict(X_test)
assert len(predictions) == len(X_test), "Prediction count mismatch"
---PAGE---
ML Pipeline Orchestration
Build reproducible, automated training pipelines.
Pipeline with Scikit-learn Pipeline
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
def create_ml_pipeline():
"""Create a complete ML pipeline"""
pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(random_state=42))
])
# Define hyperparameter grid
param_grid = {
'classifier__n_estimators': [50, 100, 200],
'classifier__max_depth': [10, 20, None],
'classifier__min_samples_split': [2, 5, 10]
}
# Create grid search
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring='f1_weighted',
n_jobs=-1,
verbose=1
)
return grid_search
# Usage
pipeline = create_ml_pipeline()
pipeline.fit(X_train, y_train)
print(f"Best parameters: {pipeline.best_params_}")
print(f"Best F1 score: {pipeline.best_score_:.3f}")
# Save entire pipeline
import joblib
joblib.dump(pipeline.best_estimator_, 'pipeline.pkl')
Airflow DAG for ML Pipeline
import glob

import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'start_date': datetime(2025, 1, 1),
'email': ['ml-alerts@company.com'],
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'ml_training_pipeline',
default_args=default_args,
description='Daily ML model training pipeline',
schedule_interval='0 2 * * *', # Run at 2 AM daily
catchup=False
)
def extract_data(**context):
"""Extract data from database"""
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@localhost/db')
df = pd.read_sql("SELECT * FROM training_data WHERE date > NOW() - INTERVAL '30 days'", engine)
# Save to shared location
df.to_parquet('/data/raw_data.parquet')
print(f"Extracted {len(df)} rows")
def validate_data(**context):
"""Validate data quality"""
df = pd.read_parquet('/data/raw_data.parquet')
# Run validations
assert len(df) > 1000, "Insufficient data"
assert df.isnull().sum().sum() / df.size < 0.05, "Too many missing values"
print("✓ Data validation passed")
def train_model(**context):
"""Train ML model"""
df = pd.read_parquet('/data/raw_data.parquet')
X = df.drop('label', axis=1)
y = df['label']
pipeline = create_ml_pipeline()
pipeline.fit(X, y)
# Save model
joblib.dump(pipeline, f'/models/model_{datetime.now():%Y%m%d}.pkl')
print(f"✓ Model trained - F1: {pipeline.best_score_:.3f}")
def evaluate_model(**context):
"""Evaluate model on test set"""
# Load latest model
model_path = sorted(glob.glob('/models/model_*.pkl'))[-1]
model = joblib.load(model_path)
# Load test data
test_df = pd.read_parquet('/data/test_data.parquet')
X_test = test_df.drop('label', axis=1)
y_test = test_df['label']
# Evaluate
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
# Check threshold
assert accuracy > 0.75, f"Model accuracy {accuracy:.2%} below threshold"
print(f"✓ Model evaluation - Accuracy: {accuracy:.2%}")
def deploy_model(**context):
"""Deploy model to production"""
model_path = sorted(glob.glob('/models/model_*.pkl'))[-1]
# Copy to production location
import shutil
shutil.copy(model_path, '/production/models/current_model.pkl')
print(f"✓ Model deployed: {model_path}")
# Define tasks
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
dag=dag
)
validate_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
dag=dag
)
train_task = PythonOperator(
task_id='train_model',
python_callable=train_model,
dag=dag
)
evaluate_task = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model,
dag=dag
)
deploy_task = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
dag=dag
)
# Define dependencies
extract_task >> validate_task >> train_task >> evaluate_task >> deploy_task
---PAGE---
Experiment Tracking with MLflow
Track experiments, parameters, metrics, and models systematically.
Basic MLflow Tracking
import mlflow
import mlflow.sklearn
import numpy as np
from datetime import datetime
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
# Set experiment
mlflow.set_experiment("fraud_detection")
def train_with_mlflow(X_train, y_train, X_test, y_test, params):
"""Train model with MLflow tracking"""
with mlflow.start_run(run_name=f"rf_experiment_{datetime.now():%Y%m%d_%H%M}"):
# Log parameters
mlflow.log_params(params)
# Log dataset info
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("test_size", len(X_test))
mlflow.log_param("n_features", X_train.shape[1])
# Train model
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Make predictions
y_pred = model.predict(X_test)
# Calculate metrics
metrics = {
'accuracy': accuracy_score(y_test, y_pred),
'f1_score': f1_score(y_test, y_pred, average='weighted'),
'precision': precision_score(y_test, y_pred, average='weighted'),
'recall': recall_score(y_test, y_pred, average='weighted')
}
# Log metrics
mlflow.log_metrics(metrics)
# Log model
mlflow.sklearn.log_model(
model,
"model",
registered_model_name="fraud_detector"
)
# Log feature importance plot
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
importances = model.feature_importances_
indices = np.argsort(importances)[::-1][:10]
plt.bar(range(10), importances[indices])
plt.title('Top 10 Feature Importances')
plt.savefig('feature_importance.png')
mlflow.log_artifact('feature_importance.png')
plt.close()
print(f"✓ Experiment logged - F1: {metrics['f1_score']:.3f}")
return model, metrics
# Run multiple experiments
param_sets = [
{'n_estimators': 50, 'max_depth': 10},
{'n_estimators': 100, 'max_depth': 20},
{'n_estimators': 200, 'max_depth': None}
]
for params in param_sets:
train_with_mlflow(X_train, y_train, X_test, y_test, params)
MLflow Model Registry
from mlflow.tracking import MlflowClient
client = MlflowClient()
def promote_model_to_production(model_name, run_id):
"""Promote model from staging to production"""
# Get model version
model_version = client.search_model_versions(
f"run_id='{run_id}'"
)[0].version
# Transition to staging first
client.transition_model_version_stage(
name=model_name,
version=model_version,
stage="Staging"
)
print(f"✓ Model version {model_version} moved to Staging")
# After validation in staging, promote to production
# (in practice this would be a separate, gated step)
client.transition_model_version_stage(
name=model_name,
version=model_version,
stage="Production"
)
print(f"✓ Model version {model_version} promoted to Production")
def load_production_model(model_name):
"""Load latest production model"""
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
return model
# Usage
production_model = load_production_model("fraud_detector")
predictions = production_model.predict(new_data)
Comparing Experiments
def compare_experiments(experiment_name):
"""Compare all runs in an experiment"""
experiment = mlflow.get_experiment_by_name(experiment_name)
runs = mlflow.search_runs(experiment.experiment_id)
# Sort by F1 score
runs_sorted = runs.sort_values('metrics.f1_score', ascending=False)
print("Top 5 Experiments:")
print(runs_sorted[['run_id', 'params.n_estimators', 'params.max_depth',
'metrics.accuracy', 'metrics.f1_score']].head())
# Get best run
best_run = runs_sorted.iloc[0]
print(f"\n✓ Best model - F1: {best_run['metrics.f1_score']:.3f}")
print(f" Run ID: {best_run['run_id']}")
# search_runs flattens parameters into 'params.<name>' columns
param_cols = [c for c in runs.columns if c.startswith('params.')]
print(f" Parameters: {best_run[param_cols].to_dict()}")
return best_run
best_run = compare_experiments("fraud_detection")
---PAGE---
Version Control for ML
Version control for ML involves code, data, and models.
Git for Code
# Standard Git workflow
git checkout -b feature/new-model-architecture
git add src/models/new_model.py
git add tests/test_new_model.py
git commit -m "feat: Add new transformer-based model architecture"
git push origin feature/new-model-architecture
DVC for Data and Models
DVC (Data Version Control) tracks large files and datasets:
# Initialize DVC
dvc init
# Add data to DVC
dvc add data/train_data.parquet
git add data/train_data.parquet.dvc data/.gitignore
git commit -m "Add training data"
# Add model to DVC
dvc add models/model.pkl
git add models/model.pkl.dvc models/.gitignore
git commit -m "Add trained model"
# Configure remote storage (S3, GCS, Azure, etc.)
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
# Pull data/models on another machine
dvc pull
DVC Pipeline
Define reproducible ML pipelines:
# dvc.yaml
stages:
prepare:
cmd: python src/prepare_data.py
deps:
- data/raw_data.csv
params:
- prepare.test_split
outs:
- data/train.parquet
- data/test.parquet
train:
cmd: python src/train.py
deps:
- data/train.parquet
- src/train.py
params:
- train.n_estimators
- train.max_depth
outs:
- models/model.pkl
metrics:
- metrics.json:
cache: false
evaluate:
cmd: python src/evaluate.py
deps:
- data/test.parquet
- models/model.pkl
metrics:
- evaluation.json:
cache: false
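The `params:` entries in the stages above refer to a `params.yaml` file at the repository root, which DVC reads and diffs across experiments. A minimal example matching the keys used above (the values are placeholders):

```yaml
# params.yaml — values read by src/prepare_data.py and src/train.py
prepare:
  test_split: 0.2
train:
  n_estimators: 100
  max_depth: 20
```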
Run the pipeline:
# Run entire pipeline
dvc repro
# Run specific stage
dvc repro train
# Compare metrics between the main branch and the workspace
dvc metrics show
dvc metrics diff main
Model Versioning Best Practices
import hashlib
import json
import os
from datetime import datetime

import joblib
class ModelVersionManager:
"""Manage model versions with metadata"""
def __init__(self, models_dir='models'):
self.models_dir = models_dir
os.makedirs(models_dir, exist_ok=True)
def save_model(self, model, metadata):
"""Save model with version metadata"""
# Generate version ID
version_id = datetime.now().strftime('%Y%m%d_%H%M%S')
# Calculate model hash (joblib.hash fingerprints in-memory objects)
model_hash = joblib.hash(model)[:8]
# Full version name
version_name = f"v{version_id}_{model_hash}"
# Save model
model_path = os.path.join(self.models_dir, f"{version_name}.pkl")
joblib.dump(model, model_path)
# Save metadata
full_metadata = {
'version': version_name,
'timestamp': datetime.now().isoformat(),
'model_hash': model_hash,
**metadata
}
metadata_path = os.path.join(self.models_dir, f"{version_name}_metadata.json")
with open(metadata_path, 'w') as f:
json.dump(full_metadata, f, indent=2)
print(f"✓ Model saved: {version_name}")
print(f" Accuracy: {metadata.get('accuracy', 'N/A')}")
return version_name
def load_model(self, version_name):
"""Load model by version name"""
model_path = os.path.join(self.models_dir, f"{version_name}.pkl")
metadata_path = os.path.join(self.models_dir, f"{version_name}_metadata.json")
model = joblib.load(model_path)
with open(metadata_path, 'r') as f:
metadata = json.load(f)
return model, metadata
def list_versions(self):
"""List all model versions"""
import glob
metadata_files = glob.glob(os.path.join(self.models_dir, '*_metadata.json'))
versions = []
for metadata_file in metadata_files:
with open(metadata_file, 'r') as f:
versions.append(json.load(f))
# Sort by timestamp
versions.sort(key=lambda x: x['timestamp'], reverse=True)
return versions
# Usage
manager = ModelVersionManager()
# Save model
version = manager.save_model(model, {
'accuracy': 0.92,
'f1_score': 0.89,
'training_samples': 10000,
'algorithm': 'RandomForest',
'hyperparameters': {'n_estimators': 100, 'max_depth': 20}
})
# List versions
versions = manager.list_versions()
for v in versions:
print(f"{v['version']}: Accuracy={v['accuracy']:.2%}")
# Load specific version
model, metadata = manager.load_model(version)
---PAGE---
Continuous Training
Automate model retraining based on triggers.
Trigger-based Retraining
from datetime import datetime, timedelta
class RetrainingScheduler:
"""Schedule and trigger model retraining"""
def __init__(self):
self.last_training = None
self.performance_history = []
def should_retrain(self, current_metrics, data_drift_score):
"""Decide if retraining is needed"""
triggers = []
# 1. Time-based trigger
if self._time_trigger():
triggers.append("time_elapsed")
# 2. Performance degradation trigger
if self._performance_trigger(current_metrics):
triggers.append("performance_drop")
# 3. Data drift trigger
if self._drift_trigger(data_drift_score):
triggers.append("data_drift")
# 4. Data volume trigger
if self._data_volume_trigger():
triggers.append("new_data_available")
if triggers:
print(f"🔄 Retraining triggered by: {', '.join(triggers)}")
return True
return False
def _time_trigger(self):
"""Retrain every 7 days"""
if self.last_training is None:
return True
days_since_training = (datetime.now() - self.last_training).days
return days_since_training >= 7
def _performance_trigger(self, current_metrics):
"""Retrain if performance drops > 5%"""
if not self.performance_history:
return False
baseline = max(self.performance_history[-30:]) # Best of the last 30 recorded scores
current = current_metrics.get('f1_score', 0)
drop = (baseline - current) / baseline
return drop > 0.05
def _drift_trigger(self, drift_score):
"""Retrain if significant drift detected"""
DRIFT_THRESHOLD = 0.2
return drift_score > DRIFT_THRESHOLD
def _data_volume_trigger(self):
"""Retrain if 20% more data available"""
# Check if new data volume exceeds threshold
# Implementation depends on your data storage
return False
def record_training(self, metrics):
"""Record that training occurred"""
self.last_training = datetime.now()
self.performance_history.append(metrics.get('f1_score', 0))
# Usage
scheduler = RetrainingScheduler()
# In production monitoring loop
current_performance = {'f1_score': 0.85}
drift_score = 0.25
if scheduler.should_retrain(current_performance, drift_score):
# Trigger retraining pipeline
trigger_training_pipeline()
scheduler.record_training(new_metrics)
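The data-volume trigger is left as a stub above because it depends on your data storage. One hedged sketch, assuming you can cheaply query the current row count: compare it against the count recorded at the last training run (the 20% threshold is an assumption).

```python
# Sketch of a data-volume trigger: retrain once the dataset has grown
# past a configurable threshold since the last training run.
class DataVolumeTrigger:
    def __init__(self, growth_threshold=0.20):
        self.growth_threshold = growth_threshold
        self.rows_at_last_training = None

    def should_retrain(self, current_rows):
        """True once row count has grown past the threshold."""
        if self.rows_at_last_training is None:
            # First observation becomes the baseline
            self.rows_at_last_training = current_rows
            return False
        grown = (current_rows - self.rows_at_last_training) / self.rows_at_last_training
        return grown >= self.growth_threshold

    def record_training(self, current_rows):
        """Reset the baseline after a training run."""
        self.rows_at_last_training = current_rows

trigger = DataVolumeTrigger()
trigger.record_training(10_000)
assert trigger.should_retrain(10_500) is False  # only 5% more data
assert trigger.should_retrain(12_500) is True   # 25% more data
```

The same shape works for event counts, label arrivals, or partition sizes; what matters is persisting the baseline alongside the model version.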
Automated Retraining Pipeline
def automated_retraining_pipeline():
"""Complete automated retraining pipeline"""
print("=" * 50)
print("AUTOMATED RETRAINING PIPELINE")
print("=" * 50)
# 1. Extract new training data
print("\n[1/7] Extracting data...")
df = extract_recent_data(days=30)
print(f"✓ Extracted {len(df)} samples")
# 2. Validate data quality
print("\n[2/7] Validating data...")
validate_data_quality(df)
print("✓ Data validation passed")
# 3. Prepare features
print("\n[3/7] Preparing features...")
X, y = prepare_features(df)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
print(f"✓ Train: {len(X_train)}, Test: {len(X_test)}")
# 4. Train new model
print("\n[4/7] Training model...")
new_model = train_model_with_mlflow(X_train, y_train)
print("✓ Model trained")
# 5. Evaluate new model
print("\n[5/7] Evaluating model...")
new_metrics = evaluate_model(new_model, X_test, y_test)
print(f"✓ Accuracy: {new_metrics['accuracy']:.2%}, F1: {new_metrics['f1_score']:.2%}")
# 6. Compare with production model
print("\n[6/7] Comparing with production...")
production_model = load_production_model()
prod_metrics = evaluate_model(production_model, X_test, y_test)
improvement = new_metrics['f1_score'] - prod_metrics['f1_score']
print(f" Production F1: {prod_metrics['f1_score']:.2%}")
print(f" New model F1: {new_metrics['f1_score']:.2%}")
print(f" Improvement: {improvement:+.2%}")
# 7. Deploy if better
print("\n[7/7] Deployment decision...")
if improvement > 0.01: # At least 1% improvement
deploy_model(new_model)
print("✓ NEW MODEL DEPLOYED")
else:
print("⚠️ New model not better - keeping production model")
print("\n" + "=" * 50)
print("PIPELINE COMPLETE")
print("=" * 50)
---PAGE---
GitHub Actions for ML CI/CD
Automate testing and deployment with GitHub Actions.
Testing Workflow
# .github/workflows/ml-tests.yml
name: ML Tests
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run unit tests
run: |
pytest tests/unit -v --cov=src
- name: Run data validation tests
run: |
pytest tests/data -v
- name: Run model tests
run: |
pytest tests/models -v
- name: Upload coverage
uses: codecov/codecov-action@v3
Training Workflow
# .github/workflows/train-model.yml
name: Train Model
on:
schedule:
- cron: '0 2 * * 0' # Weekly on Sunday at 2 AM
workflow_dispatch: # Manual trigger
jobs:
train:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install -r requirements.txt
- name: Download training data
run: |
python scripts/download_data.py
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Train model
run: |
python src/train.py --config configs/production.yaml
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
- name: Evaluate model
run: |
python src/evaluate.py
- name: Upload model artifact
uses: actions/upload-artifact@v3
with:
name: trained-model
path: models/model.pkl
- name: Send notification
if: success()
uses: 8398a7/action-slack@v3
with:
status: ${{ job.status }}
text: 'Model training completed successfully'
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
Deployment Workflow
# .github/workflows/deploy-model.yml
name: Deploy Model
on:
workflow_run:
workflows: ["Train Model"]
types:
- completed
jobs:
deploy:
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' }}
steps:
- uses: actions/checkout@v3
- name: Download model
uses: actions/download-artifact@v3
with:
name: trained-model
path: models/
- name: Test model
run: |
python tests/deployment/test_model_ready.py
- name: Deploy to staging
run: |
python scripts/deploy.py --env staging
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Run smoke tests
run: |
python tests/deployment/smoke_tests.py --env staging
- name: Deploy to production
if: success()
run: |
python scripts/deploy.py --env production
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Notify deployment
if: always()
uses: 8398a7/action-slack@v3
with:
status: ${{ job.status }}
text: 'Model deployment ${{ job.status }}'
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
---PAGE---
Best Practices for ML CI/CD
1. Separate Test Environments
# config/environments.py
class Config:
"""Base configuration"""
TESTING = False
MODEL_PATH = 'models/production'
class DevelopmentConfig(Config):
"""Development configuration"""
DEBUG = True
DATABASE_URI = 'postgresql://localhost/ml_dev'
MODEL_PATH = 'models/development'
class StagingConfig(Config):
"""Staging configuration"""
DATABASE_URI = 'postgresql://staging-db/ml_staging'
MODEL_PATH = 's3://staging-bucket/models'
class ProductionConfig(Config):
"""Production configuration"""
DATABASE_URI = 'postgresql://prod-db/ml_prod'
MODEL_PATH = 's3://production-bucket/models'
class TestConfig(Config):
"""Test configuration"""
TESTING = True
DATABASE_URI = 'postgresql://localhost/ml_test'
MODEL_PATH = 'models/test'
# Load config based on environment
import os
env = os.getenv('ENVIRONMENT', 'development')
config = {
'development': DevelopmentConfig,
'staging': StagingConfig,
'production': ProductionConfig,
'test': TestConfig
}[env]
2. Feature Flags for Gradual Rollouts
class FeatureFlags:
"""Control feature rollout"""
def __init__(self):
self.flags = {
'new_model_v2': {
'enabled': False,
'rollout_percentage': 0
},
'advanced_preprocessing': {
'enabled': True,
'rollout_percentage': 50
}
}
def is_enabled(self, feature_name, user_id=None):
"""Check if feature is enabled for user"""
flag = self.flags.get(feature_name, {})
if not flag.get('enabled', False):
return False
rollout_pct = flag.get('rollout_percentage', 0)
if rollout_pct == 100:
return True
if user_id:
# Deterministic rollout based on user ID
import hashlib
hash_val = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
return (hash_val % 100) < rollout_pct
return False
# Usage in prediction service
flags = FeatureFlags()
def predict(features, user_id):
"""Make prediction with feature flags"""
if flags.is_enabled('new_model_v2', user_id):
model = load_model('v2')
else:
model = load_model('v1')
if flags.is_enabled('advanced_preprocessing', user_id):
features = advanced_preprocess(features)
else:
features = standard_preprocess(features)
return model.predict([features])
3. Rollback Strategy
class ModelDeployment:
"""Manage model deployments with rollback"""
def __init__(self):
self.deployments = []
def deploy(self, model_version, environment='production'):
"""Deploy a model version"""
deployment = {
'version': model_version,
'environment': environment,
'timestamp': datetime.now(),
'status': 'active'
}
# Mark previous deployment as inactive
for d in self.deployments:
if d['environment'] == environment and d['status'] == 'active':
d['status'] = 'inactive'
self.deployments.append(deployment)
# Copy model to production path
self._deploy_model_file(model_version, environment)
print(f"✓ Deployed {model_version} to {environment}")
def rollback(self, environment='production'):
"""Rollback to previous version"""
# Get current and previous deployments
env_deployments = [d for d in self.deployments if d['environment'] == environment]
env_deployments.sort(key=lambda x: x['timestamp'], reverse=True)
if len(env_deployments) < 2:
print("❌ No previous version to rollback to")
return False
current = env_deployments[0]
previous = env_deployments[1]
# Mark current as rolled back
current['status'] = 'rolled_back'
# Reactivate previous
previous['status'] = 'active'
# Deploy previous model
self._deploy_model_file(previous['version'], environment)
print(f"✓ Rolled back from {current['version']} to {previous['version']}")
return True
def _deploy_model_file(self, version, environment):
"""Copy model file to deployment location"""
import shutil
src = f"models/{version}.pkl"
dst = f"deployments/{environment}/current_model.pkl"
shutil.copy(src, dst)
# Usage
deployment = ModelDeployment()
# Deploy new version
deployment.deploy('v20251020_001')
# If issues detected, rollback
deployment.rollback()
---PAGE---
---QUIZ--- TITLE: CI/CD for ML Knowledge Check INTRO: Test your understanding of CI/CD practices for machine learning!
Q: What are the three main components that need version control in ML systems? A: Code, Data, and Models A: Code, Tests, and Documentation A: Models, APIs, and Databases A: Data, Metrics, and Logs CORRECT: 0 EXPLAIN: ML systems require versioning of Code (algorithms, preprocessing), Data (training datasets), and Models (trained artifacts). This is different from traditional software which primarily versions code.
Q: What is the purpose of DVC (Data Version Control)? A: To version control large data files and ML models A: To replace Git for code versioning A: To deploy models to production A: To monitor model performance CORRECT: 0 EXPLAIN: DVC is specifically designed to version control large files like datasets and ML models, which are too large for Git. It works alongside Git to provide complete version control for ML projects.
Q: In MLflow, what is the Model Registry used for? A: Storing training data A: Managing model lifecycle and promoting models between stages A: Running experiments A: Tracking metrics only CORRECT: 1 EXPLAIN: The MLflow Model Registry manages the full lifecycle of ML models, including versioning, stage transitions (Staging → Production), and model lineage. It helps teams collaborate on model deployment.
Q: What triggers should be considered for automated model retraining? A: Only time-based triggers (e.g., weekly) A: Time, performance degradation, data drift, and new data availability A: Only when manually triggered by data scientists A: Only when the model crashes CORRECT: 1 EXPLAIN: Effective retraining automation uses multiple triggers: time-based (regular intervals), performance drops, data drift detection, and sufficient new data availability. Relying on a single trigger can miss important retraining opportunities.
Q: What is the main difference between unit tests and integration tests for ML? A: Unit tests are faster than integration tests A: Unit tests check individual components; integration tests verify the entire pipeline A: Integration tests are optional A: Unit tests only test the model CORRECT: 1 EXPLAIN: Unit tests validate individual components (preprocessing functions, feature engineering) in isolation. Integration tests verify that the entire ML pipeline works end-to-end, from data loading to prediction.
Q: What should you test when validating ML model quality? A: Only accuracy on test set A: Accuracy, inference speed, prediction validity, and determinism A: Only the training loss A: Only the code syntax CORRECT: 1 EXPLAIN: Comprehensive model testing includes accuracy/performance metrics, inference latency (production requirement), prediction validity (no NaN, correct ranges), and determinism (reproducible predictions).
Q: What is a feature flag in ML deployment? A: A way to gradually roll out new models to a percentage of users A: A type of model parameter A: A data quality check A: A testing framework CORRECT: 0 EXPLAIN: Feature flags enable gradual rollout of new models or features by controlling what percentage of users see the new version. This allows safe deployment and easy rollback if issues arise.
Q: Why is rollback capability important in ML deployments? A: To save storage space A: To quickly revert to a previous working model if the new one performs poorly A: To delete old models A: To reduce costs CORRECT: 1 EXPLAIN: Rollback capability is critical because new models might perform poorly in production despite good test metrics. Quick rollback to a known-good previous version minimizes business impact while investigating issues. ---END-QUIZ---
---PAGE---
Key Takeaways
Congratulations! You've completed Part 3 of the MLOps series on CI/CD for Machine Learning.
Core Concepts
✓ Automated Testing: Unit, integration, and model quality tests
✓ ML Pipelines: Reproducible training workflows with Airflow
✓ Experiment Tracking: MLflow for tracking experiments and models
✓ Version Control: Git for code, DVC for data and models
✓ Continuous Training: Automated retraining based on triggers
Technical Skills
✓ Testing Frameworks: pytest for ML code, Great Expectations for data
✓ Pipeline Tools: Scikit-learn pipelines, Airflow DAGs
✓ MLflow: Experiment tracking, model registry, model deployment
✓ GitHub Actions: CI/CD workflows for testing and deployment
✓ DVC: Data and model versioning with remote storage
Best Practices
✓ Environment Separation: Dev, staging, production configs
✓ Feature Flags: Gradual rollout and A/B testing
✓ Rollback Strategy: Quick recovery from failed deployments
✓ Automated Triggers: Smart retraining based on multiple signals
What's Next
In Part 4: Scaling ML Systems, you'll learn:
- Horizontal and vertical scaling strategies
- Load balancing for ML services
- Distributed inference across multiple machines
- GPU optimization for deep learning models
- Batch processing at scale with Spark
- Caching strategies for predictions
- Cost optimization techniques
Resources for Further Learning
- MLflow Documentation: https://mlflow.org/docs/latest/
- DVC Documentation: https://dvc.org/doc
- Apache Airflow: https://airflow.apache.org/docs/
- GitHub Actions for ML: https://github.com/features/actions
- Great Expectations: https://greatexpectations.io/
- Kubeflow Pipelines: https://www.kubeflow.org/docs/components/pipelines/
Practice Exercises
To reinforce your learning:
- Set up MLflow tracking for your ML project
- Create a DVC pipeline for your training workflow
- Write unit tests for your preprocessing functions
- Build an Airflow DAG for automated training
- Implement automated retraining with triggers
- Create a GitHub Actions workflow for model testing
Continue building robust ML systems, and see you in Part 4!