MLOps Part 3: CI/CD for Machine Learning

Learn how to implement continuous integration and continuous deployment for ML systems, including automated testing, ML pipelines, experiment tracking, and deployment automation.

Introduction to CI/CD for ML

Continuous Integration and Continuous Deployment (CI/CD) for machine learning extends traditional software engineering practices to handle the unique challenges of ML systems.

What You'll Learn

In this module, you'll understand:

  • Automated testing: Unit tests, integration tests, model tests
  • ML pipelines: Building reproducible training workflows
  • Continuous training: Automated model retraining
  • Version control: Managing models, data, and code
  • Experiment tracking: Tracking experiments with MLflow

Why CI/CD for ML is Different

Traditional software CI/CD focuses on code. ML CI/CD must handle:

  • Code + Data + Models
  • Non-deterministic outputs (model training)
  • Long-running processes (training can take hours/days)
  • Multiple experiment versions
  • Data validation and versioning

Without proper CI/CD, ML projects become chaotic!
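
Non-determinism in particular bites CI pipelines: the same training code can produce different models on each run. Any test that trains a model should pin every random seed. A minimal sketch using scikit-learn (the dataset and model choices are illustrative):

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

def test_training_is_reproducible():
    """Two training runs with the same seeds must yield identical predictions."""
    X, y = make_classification(n_samples=200, random_state=0)

    # Pin every source of randomness: data generation and model initialization
    model_a = RandomForestClassifier(n_estimators=10, random_state=42).fit(X, y)
    model_b = RandomForestClassifier(n_estimators=10, random_state=42).fit(X, y)

    assert np.array_equal(model_a.predict(X), model_b.predict(X))
```

Without seeding, this test would fail intermittently in CI, which is exactly the kind of flakiness that erodes trust in an ML test suite.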

---PAGE---

Automated Testing for ML

Testing ML systems requires testing code, data, and model behavior.

1. Unit Tests for ML Code

Test data preprocessing, feature engineering, and utility functions:

import pytest
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

def preprocess_features(df):
    """Preprocess features for model training"""
    # Remove duplicates
    df = df.drop_duplicates()

    # Fill missing values in numeric columns with the column median
    df = df.fillna(df.median(numeric_only=True))

    # Scale features
    scaler = StandardScaler()
    numerical_cols = df.select_dtypes(include=[np.number]).columns
    df[numerical_cols] = scaler.fit_transform(df[numerical_cols])

    return df

# Unit tests
def test_preprocess_removes_duplicates():
    """Test that preprocessing removes duplicate rows"""
    df = pd.DataFrame({
        'age': [25, 30, 25],
        'income': [50000, 60000, 50000]
    })

    result = preprocess_features(df)
    assert len(result) == 2  # One duplicate removed

def test_preprocess_fills_missing_values():
    """Test that missing values are filled"""
    df = pd.DataFrame({
        'age': [25, np.nan, 35],
        'income': [50000, 60000, 70000]
    })

    result = preprocess_features(df)
    assert result.isnull().sum().sum() == 0  # No missing values

def test_preprocess_scales_features():
    """Test that features are standardized"""
    df = pd.DataFrame({
        'age': [25, 30, 35],
        'income': [50000, 60000, 70000]
    })

    result = preprocess_features(df)
    # StandardScaler divides by the population std (ddof=0); pandas .std() defaults to ddof=1
    assert abs(result['age'].mean()) < 0.01
    assert abs(result['age'].std(ddof=0) - 1.0) < 0.01

2. Data Validation Tests

Validate data schema, types, and distributions:

import great_expectations as ge  # legacy pandas API (Great Expectations < 1.0)

def test_data_schema(df):
    """Validate data schema and types"""

    # Convert to Great Expectations dataset
    ge_df = ge.from_pandas(df)

    # Define expectations
    ge_df.expect_table_columns_to_match_ordered_list([
        'age', 'income', 'credit_score', 'label'
    ])

    ge_df.expect_column_values_to_be_of_type('age', 'int64')
    ge_df.expect_column_values_to_be_of_type('income', 'float64')

    # Validate
    results = ge_df.validate()
    assert results['success'], f"Data validation failed: {results}"

def test_data_ranges(df):
    """Test that data values are in expected ranges"""

    assert df['age'].min() >= 18, "Age should be >= 18"
    assert df['age'].max() <= 100, "Age should be <= 100"

    assert df['income'].min() >= 0, "Income should be non-negative"
    assert df['credit_score'].between(300, 850).all(), "Credit score out of range"

def test_data_completeness(df):
    """Test for missing values in critical columns"""

    critical_columns = ['age', 'income', 'label']
    for col in critical_columns:
        missing_pct = df[col].isnull().mean()
        assert missing_pct < 0.05, f"{col} has {missing_pct:.1%} missing values"

def test_class_distribution(df):
    """Test that classes are balanced enough"""

    class_dist = df['label'].value_counts(normalize=True)
    min_class_pct = class_dist.min()

    assert min_class_pct >= 0.1, f"Class imbalance too severe: {min_class_pct:.1%}"

3. Model Quality Tests

Test model performance and behavior:

import numpy as np
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import cross_val_score

def test_model_accuracy_threshold(model, X_test, y_test):
    """Test that model meets minimum accuracy threshold"""

    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)

    MINIMUM_ACCURACY = 0.75
    assert accuracy >= MINIMUM_ACCURACY, \
        f"Model accuracy {accuracy:.2%} below threshold {MINIMUM_ACCURACY:.2%}"

def test_model_cross_validation(model, X, y):
    """Test model performance with cross-validation"""

    scores = cross_val_score(model, X, y, cv=5, scoring='f1_weighted')
    mean_score = scores.mean()

    MINIMUM_F1 = 0.70
    assert mean_score >= MINIMUM_F1, \
        f"Cross-validation F1 {mean_score:.2%} below threshold"

def test_model_predictions_validity(model, X_test):
    """Test that predictions are valid"""

    predictions = model.predict(X_test)

    # No NaN predictions
    assert not np.isnan(predictions).any(), "Model produced NaN predictions"

    # Predictions in valid range
    assert predictions.min() >= 0, "Negative predictions found"
    assert predictions.max() <= 1, "Predictions > 1 found"

def test_model_inference_time(model, X_test):
    """Test that inference is fast enough"""

    import time

    MAX_LATENCY = 0.1  # 100ms

    start = time.time()
    model.predict(X_test[:100])  # batch of 100
    latency = (time.time() - start) / 100  # average per-prediction latency

    assert latency < MAX_LATENCY, \
        f"Inference too slow: {latency*1000:.1f}ms per prediction"

def test_model_determinism(model, X_test):
    """Test that model produces consistent predictions"""

    pred1 = model.predict(X_test)
    pred2 = model.predict(X_test)

    assert np.array_equal(pred1, pred2), \
        "Model produced different predictions for same input"

4. Integration Tests

Test the entire ML pipeline:

import os

from sklearn.model_selection import train_test_split

def test_full_training_pipeline():
    """Test complete training pipeline end-to-end"""

    # 1. Load data
    df = load_training_data()
    assert len(df) > 0, "No training data loaded"

    # 2. Preprocess
    df_processed = preprocess_features(df)
    assert df_processed.shape[1] == df.shape[1], "Features lost during preprocessing"

    # 3. Split data
    X_train, X_test, y_train, y_test = train_test_split(
        df_processed.drop('label', axis=1),
        df_processed['label'],
        test_size=0.2
    )

    # 4. Train model
    model = train_model(X_train, y_train)
    assert model is not None, "Model training failed"

    # 5. Evaluate
    accuracy = evaluate_model(model, X_test, y_test)
    assert accuracy > 0.7, f"End-to-end accuracy too low: {accuracy:.2%}"

    # 6. Save model
    model_path = save_model(model, "test_model.pkl")
    assert os.path.exists(model_path), "Model not saved"

    # 7. Load and predict
    loaded_model = load_model(model_path)
    predictions = loaded_model.predict(X_test)
    assert len(predictions) == len(X_test), "Prediction count mismatch"

---PAGE---

ML Pipeline Orchestration

Build reproducible, automated training pipelines.

Pipeline with Scikit-learn Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

def create_ml_pipeline():
    """Create a complete ML pipeline"""

    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42))
    ])

    # Define hyperparameter grid
    param_grid = {
        'classifier__n_estimators': [50, 100, 200],
        'classifier__max_depth': [10, 20, None],
        'classifier__min_samples_split': [2, 5, 10]
    }

    # Create grid search
    grid_search = GridSearchCV(
        pipeline,
        param_grid,
        cv=5,
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1
    )

    return grid_search

# Usage
pipeline = create_ml_pipeline()
pipeline.fit(X_train, y_train)

print(f"Best parameters: {pipeline.best_params_}")
print(f"Best F1 score: {pipeline.best_score_:.3f}")

# Save entire pipeline
import joblib
joblib.dump(pipeline.best_estimator_, 'pipeline.pkl')

Airflow DAG for ML Pipeline

import glob

import joblib
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from sklearn.metrics import accuracy_score

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email': ['ml-alerts@company.com'],
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='Daily ML model training pipeline',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False
)

def extract_data(**context):
    """Extract data from database"""
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@localhost/db')
    df = pd.read_sql("SELECT * FROM training_data WHERE date > NOW() - INTERVAL '30 days'", engine)

    # Save to shared location
    df.to_parquet('/data/raw_data.parquet')
    print(f"Extracted {len(df)} rows")

def validate_data(**context):
    """Validate data quality"""
    df = pd.read_parquet('/data/raw_data.parquet')

    # Run validations
    assert len(df) > 1000, "Insufficient data"
    assert df.isnull().sum().sum() / df.size < 0.05, "Too many missing values"

    print("✓ Data validation passed")

def train_model(**context):
    """Train ML model"""
    df = pd.read_parquet('/data/raw_data.parquet')

    X = df.drop('label', axis=1)
    y = df['label']

    pipeline = create_ml_pipeline()
    pipeline.fit(X, y)

    # Save model
    joblib.dump(pipeline, f'/models/model_{datetime.now():%Y%m%d}.pkl')
    print(f"✓ Model trained - F1: {pipeline.best_score_:.3f}")

def evaluate_model(**context):
    """Evaluate model on test set"""
    # Load latest model
    model_path = sorted(glob.glob('/models/model_*.pkl'))[-1]
    model = joblib.load(model_path)

    # Load test data
    test_df = pd.read_parquet('/data/test_data.parquet')
    X_test = test_df.drop('label', axis=1)
    y_test = test_df['label']

    # Evaluate
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)

    # Check threshold
    assert accuracy > 0.75, f"Model accuracy {accuracy:.2%} below threshold"

    print(f"✓ Model evaluation - Accuracy: {accuracy:.2%}")

def deploy_model(**context):
    """Deploy model to production"""
    model_path = sorted(glob.glob('/models/model_*.pkl'))[-1]

    # Copy to production location
    import shutil
    shutil.copy(model_path, '/production/models/current_model.pkl')

    print(f"✓ Model deployed: {model_path}")

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

evaluate_task = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag
)

deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)

# Define dependencies
extract_task >> validate_task >> train_task >> evaluate_task >> deploy_task

---PAGE---

Experiment Tracking with MLflow

Track experiments, parameters, metrics, and models systematically.

Basic MLflow Tracking

from datetime import datetime

import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Set experiment
mlflow.set_experiment("fraud_detection")

def train_with_mlflow(X_train, y_train, X_test, y_test, params):
    """Train model with MLflow tracking"""

    with mlflow.start_run(run_name=f"rf_experiment_{datetime.now():%Y%m%d_%H%M}"):

        # Log parameters
        mlflow.log_params(params)

        # Log dataset info
        mlflow.log_param("train_size", len(X_train))
        mlflow.log_param("test_size", len(X_test))
        mlflow.log_param("n_features", X_train.shape[1])

        # Train model
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # Make predictions
        y_pred = model.predict(X_test)

        # Calculate metrics
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'f1_score': f1_score(y_test, y_pred, average='weighted'),
            'precision': precision_score(y_test, y_pred, average='weighted'),
            'recall': recall_score(y_test, y_pred, average='weighted')
        }

        # Log metrics
        mlflow.log_metrics(metrics)

        # Log model
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="fraud_detector"
        )

        # Log feature importance plot
        import matplotlib.pyplot as plt

        plt.figure(figsize=(10, 6))
        importances = model.feature_importances_
        indices = np.argsort(importances)[::-1][:10]

        plt.bar(range(10), importances[indices])
        plt.title('Top 10 Feature Importances')
        plt.savefig('feature_importance.png')
        mlflow.log_artifact('feature_importance.png')
        plt.close()

        print(f"✓ Experiment logged - F1: {metrics['f1_score']:.3f}")

        return model, metrics

# Run multiple experiments
param_sets = [
    {'n_estimators': 50, 'max_depth': 10},
    {'n_estimators': 100, 'max_depth': 20},
    {'n_estimators': 200, 'max_depth': None}
]

for params in param_sets:
    train_with_mlflow(X_train, y_train, X_test, y_test, params)

MLflow Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

def promote_model_to_production(model_name, run_id):
    """Promote model from staging to production"""

    # Get model version
    model_version = client.search_model_versions(
        f"run_id='{run_id}'"
    )[0].version

    # Transition to staging first
    client.transition_model_version_stage(
        name=model_name,
        version=model_version,
        stage="Staging"
    )

    print(f"✓ Model version {model_version} moved to Staging")

    # After testing in staging, promote to production
    client.transition_model_version_stage(
        name=model_name,
        version=model_version,
        stage="Production"
    )

    print(f"✓ Model version {model_version} promoted to Production")

def load_production_model(model_name):
    """Load latest production model"""

    model_uri = f"models:/{model_name}/Production"
    model = mlflow.sklearn.load_model(model_uri)

    return model

# Usage
production_model = load_production_model("fraud_detector")
predictions = production_model.predict(new_data)

Comparing Experiments

def compare_experiments(experiment_name):
    """Compare all runs in an experiment"""

    experiment = mlflow.get_experiment_by_name(experiment_name)
    runs = mlflow.search_runs(experiment.experiment_id)

    # Sort by F1 score
    runs_sorted = runs.sort_values('metrics.f1_score', ascending=False)

    print("Top 5 Experiments:")
    print(runs_sorted[['run_id', 'params.n_estimators', 'params.max_depth',
                       'metrics.accuracy', 'metrics.f1_score']].head())

    # Get best run (search_runs flattens params into 'params.<name>' columns)
    best_run = runs_sorted.iloc[0]
    print(f"\n✓ Best model - F1: {best_run['metrics.f1_score']:.3f}")
    print(f"  Run ID: {best_run['run_id']}")
    print(f"  Parameters: n_estimators={best_run['params.n_estimators']}, "
          f"max_depth={best_run['params.max_depth']}")

    return best_run

best_run = compare_experiments("fraud_detection")

---PAGE---

Version Control for ML

Version control for ML involves code, data, and models.

Git for Code

# Standard Git workflow
git checkout -b feature/new-model-architecture
git add src/models/new_model.py
git add tests/test_new_model.py
git commit -m "feat: Add new transformer-based model architecture"
git push origin feature/new-model-architecture

DVC for Data and Models

DVC (Data Version Control) tracks large files and datasets:

# Initialize DVC
dvc init

# Add data to DVC
dvc add data/train_data.parquet
git add data/train_data.parquet.dvc data/.gitignore
git commit -m "Add training data"

# Add model to DVC
dvc add models/model.pkl
git add models/model.pkl.dvc models/.gitignore
git commit -m "Add trained model"

# Configure remote storage (S3, GCS, Azure, etc.)
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push

# Pull data/models on another machine
dvc pull
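
What Git actually tracks is the small `.dvc` pointer file, not the data itself. Its contents look roughly like this (the hash and size values are illustrative):

```yaml
# data/train_data.parquet.dvc — committed to Git in place of the real file
outs:
  - md5: 3863d0e317dee0a55c4e59d2ec0eef33
    size: 14445097
    path: train_data.parquet
```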

DVC Pipeline

Define reproducible ML pipelines:

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare_data.py
    deps:
      - data/raw_data.csv
    params:
      - prepare.test_split
    outs:
      - data/train.parquet
      - data/test.parquet

  train:
    cmd: python src/train.py
    deps:
      - data/train.parquet
      - src/train.py
    params:
      - train.n_estimators
      - train.max_depth
    outs:
      - models/model.pkl
    metrics:
      - metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - data/test.parquet
      - models/model.pkl
    metrics:
      - evaluation.json:
          cache: false
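
The `params` entries above refer to keys in a `params.yaml` file at the project root; a matching sketch (the values are illustrative):

```yaml
# params.yaml — keys must match the dotted names in dvc.yaml
prepare:
  test_split: 0.2

train:
  n_estimators: 100
  max_depth: 20
```

Because DVC hashes both dependencies and parameters, changing a value here is enough to make `dvc repro` rerun the affected stages.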

Run the pipeline:

# Run entire pipeline
dvc repro

# Run specific stage
dvc repro train

# Compare metrics across experiments
dvc metrics show
dvc metrics diff main workspace

Model Versioning Best Practices

import hashlib
import json
import os
import pickle
from datetime import datetime

import joblib

class ModelVersionManager:
    """Manage model versions with metadata"""

    def __init__(self, models_dir='models'):
        self.models_dir = models_dir
        os.makedirs(models_dir, exist_ok=True)

    def save_model(self, model, metadata):
        """Save model with version metadata"""

        # Generate version ID
        version_id = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Calculate model hash (joblib has no dumps(); pickle serializes in memory)
        model_bytes = pickle.dumps(model)
        model_hash = hashlib.sha256(model_bytes).hexdigest()[:8]

        # Full version name
        version_name = f"v{version_id}_{model_hash}"

        # Save model
        model_path = os.path.join(self.models_dir, f"{version_name}.pkl")
        joblib.dump(model, model_path)

        # Save metadata
        full_metadata = {
            'version': version_name,
            'timestamp': datetime.now().isoformat(),
            'model_hash': model_hash,
            **metadata
        }

        metadata_path = os.path.join(self.models_dir, f"{version_name}_metadata.json")
        with open(metadata_path, 'w') as f:
            json.dump(full_metadata, f, indent=2)

        print(f"✓ Model saved: {version_name}")
        print(f"  Accuracy: {metadata.get('accuracy', 'N/A')}")

        return version_name

    def load_model(self, version_name):
        """Load model by version name"""

        model_path = os.path.join(self.models_dir, f"{version_name}.pkl")
        metadata_path = os.path.join(self.models_dir, f"{version_name}_metadata.json")

        model = joblib.load(model_path)

        with open(metadata_path, 'r') as f:
            metadata = json.load(f)

        return model, metadata

    def list_versions(self):
        """List all model versions"""

        import glob

        metadata_files = glob.glob(os.path.join(self.models_dir, '*_metadata.json'))
        versions = []

        for metadata_file in metadata_files:
            with open(metadata_file, 'r') as f:
                versions.append(json.load(f))

        # Sort by timestamp
        versions.sort(key=lambda x: x['timestamp'], reverse=True)

        return versions

# Usage
manager = ModelVersionManager()

# Save model
version = manager.save_model(model, {
    'accuracy': 0.92,
    'f1_score': 0.89,
    'training_samples': 10000,
    'algorithm': 'RandomForest',
    'hyperparameters': {'n_estimators': 100, 'max_depth': 20}
})

# List versions
versions = manager.list_versions()
for v in versions:
    print(f"{v['version']}: Accuracy={v['accuracy']:.2%}")

# Load specific version
model, metadata = manager.load_model(version)

---PAGE---

Continuous Training

Automate model retraining based on triggers.

Trigger-based Retraining

from datetime import datetime, timedelta

class RetrainingScheduler:
    """Schedule and trigger model retraining"""

    def __init__(self):
        self.last_training = None
        self.performance_history = []

    def should_retrain(self, current_metrics, data_drift_score):
        """Decide if retraining is needed"""

        triggers = []

        # 1. Time-based trigger
        if self._time_trigger():
            triggers.append("time_elapsed")

        # 2. Performance degradation trigger
        if self._performance_trigger(current_metrics):
            triggers.append("performance_drop")

        # 3. Data drift trigger
        if self._drift_trigger(data_drift_score):
            triggers.append("data_drift")

        # 4. Data volume trigger
        if self._data_volume_trigger():
            triggers.append("new_data_available")

        if triggers:
            print(f"🔄 Retraining triggered by: {', '.join(triggers)}")
            return True

        return False

    def _time_trigger(self):
        """Retrain every 7 days"""
        if self.last_training is None:
            return True

        days_since_training = (datetime.now() - self.last_training).days
        return days_since_training >= 7

    def _performance_trigger(self, current_metrics):
        """Retrain if performance drops > 5%"""
        if not self.performance_history:
            return False

        baseline = max(self.performance_history[-30:])  # best of the last 30 recorded scores
        current = current_metrics.get('f1_score', 0)

        drop = (baseline - current) / baseline
        return drop > 0.05

    def _drift_trigger(self, drift_score):
        """Retrain if significant drift detected"""
        DRIFT_THRESHOLD = 0.2
        return drift_score > DRIFT_THRESHOLD

    def _data_volume_trigger(self):
        """Retrain if 20% more data available"""
        # Check if new data volume exceeds threshold
        # Implementation depends on your data storage
        return False

    def record_training(self, metrics):
        """Record that training occurred"""
        self.last_training = datetime.now()
        self.performance_history.append(metrics.get('f1_score', 0))

# Usage
scheduler = RetrainingScheduler()

# In production monitoring loop
current_performance = {'f1_score': 0.85}
drift_score = 0.25

if scheduler.should_retrain(current_performance, drift_score):
    # Trigger retraining pipeline
    trigger_training_pipeline()
    scheduler.record_training(new_metrics)
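
The `drift_score` passed to `should_retrain` has to be computed somewhere. One lightweight option is a per-feature two-sample Kolmogorov-Smirnov statistic between training data and recent production data, averaged across features (a sketch; the function name is ours, and the 0-to-1 scale lines up with the 0.2 `DRIFT_THRESHOLD` above):

```python
import numpy as np
from scipy.stats import ks_2samp

def compute_drift_score(reference, current):
    """Mean KS statistic across feature columns: 0 = identical, 1 = disjoint."""
    stats = [
        ks_2samp(reference[:, i], current[:, i]).statistic
        for i in range(reference.shape[1])
    ]
    return float(np.mean(stats))
```

More elaborate options (population stability index, classifier-based drift detection) exist, but a KS-based score is easy to compute inside a monitoring job and has an interpretable scale.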

Automated Retraining Pipeline

def automated_retraining_pipeline():
    """Complete automated retraining pipeline"""

    print("=" * 50)
    print("AUTOMATED RETRAINING PIPELINE")
    print("=" * 50)

    # 1. Extract new training data
    print("\n[1/7] Extracting data...")
    df = extract_recent_data(days=30)
    print(f"✓ Extracted {len(df)} samples")

    # 2. Validate data quality
    print("\n[2/7] Validating data...")
    validate_data_quality(df)
    print("✓ Data validation passed")

    # 3. Prepare features
    print("\n[3/7] Preparing features...")
    X, y = prepare_features(df)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    print(f"✓ Train: {len(X_train)}, Test: {len(X_test)}")

    # 4. Train new model
    print("\n[4/7] Training model...")
    new_model = train_model_with_mlflow(X_train, y_train)
    print("✓ Model trained")

    # 5. Evaluate new model
    print("\n[5/7] Evaluating model...")
    new_metrics = evaluate_model(new_model, X_test, y_test)
    print(f"✓ Accuracy: {new_metrics['accuracy']:.2%}, F1: {new_metrics['f1_score']:.2%}")

    # 6. Compare with production model
    print("\n[6/7] Comparing with production...")
    production_model = load_production_model()
    prod_metrics = evaluate_model(production_model, X_test, y_test)

    improvement = new_metrics['f1_score'] - prod_metrics['f1_score']
    print(f"  Production F1: {prod_metrics['f1_score']:.2%}")
    print(f"  New model F1: {new_metrics['f1_score']:.2%}")
    print(f"  Improvement: {improvement:+.2%}")

    # 7. Deploy if better
    print("\n[7/7] Deployment decision...")
    if improvement > 0.01:  # At least 1% improvement
        deploy_model(new_model)
        print("✓ NEW MODEL DEPLOYED")
    else:
        print("⚠️ New model not better - keeping production model")

    print("\n" + "=" * 50)
    print("PIPELINE COMPLETE")
    print("=" * 50)

---PAGE---

GitHub Actions for ML CI/CD

Automate testing and deployment with GitHub Actions.

Testing Workflow

# .github/workflows/ml-tests.yml
name: ML Tests

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-cov

    - name: Run unit tests
      run: |
        pytest tests/unit -v --cov=src

    - name: Run data validation tests
      run: |
        pytest tests/data -v

    - name: Run model tests
      run: |
        pytest tests/models -v

    - name: Upload coverage
      uses: codecov/codecov-action@v3

Training Workflow

# .github/workflows/train-model.yml
name: Train Model

on:
  schedule:
    - cron: '0 2 * * 0'  # Weekly on Sunday at 2 AM
  workflow_dispatch:  # Manual trigger

jobs:
  train:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Download training data
      run: |
        python scripts/download_data.py
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

    - name: Train model
      run: |
        python src/train.py --config configs/production.yaml
      env:
        MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

    - name: Evaluate model
      run: |
        python src/evaluate.py

    - name: Upload model artifact
      uses: actions/upload-artifact@v4
      with:
        name: trained-model
        path: models/model.pkl

    - name: Send notification
      if: success()
      uses: 8398a7/action-slack@v3
      with:
        status: ${{ job.status }}
        text: 'Model training completed successfully'
      env:
        SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

Deployment Workflow

# .github/workflows/deploy-model.yml
name: Deploy Model

on:
  workflow_run:
    workflows: ["Train Model"]
    types:
      - completed

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}

    steps:
    - uses: actions/checkout@v4

    - name: Download model
      uses: actions/download-artifact@v4
      with:
        name: trained-model
        path: models/
        # v4 needs the source run when downloading across workflows
        run-id: ${{ github.event.workflow_run.id }}
        github-token: ${{ secrets.GITHUB_TOKEN }}

    - name: Test model
      run: |
        python tests/deployment/test_model_ready.py

    - name: Deploy to staging
      run: |
        python scripts/deploy.py --env staging
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

    - name: Run smoke tests
      run: |
        python tests/deployment/smoke_tests.py --env staging

    - name: Deploy to production
      if: success()
      run: |
        python scripts/deploy.py --env production
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

    - name: Notify deployment
      if: always()
      uses: 8398a7/action-slack@v3
      with:
        status: ${{ job.status }}
        text: 'Model deployment ${{ job.status }}'
      env:
        SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

---PAGE---

Best Practices for ML CI/CD

1. Separate Test Environments

# config/environments.py

class Config:
    """Base configuration"""
    TESTING = False
    MODEL_PATH = 'models/production'

class DevelopmentConfig(Config):
    """Development configuration"""
    DEBUG = True
    DATABASE_URI = 'postgresql://localhost/ml_dev'
    MODEL_PATH = 'models/development'

class StagingConfig(Config):
    """Staging configuration"""
    DATABASE_URI = 'postgresql://staging-db/ml_staging'
    MODEL_PATH = 's3://staging-bucket/models'

class ProductionConfig(Config):
    """Production configuration"""
    DATABASE_URI = 'postgresql://prod-db/ml_prod'
    MODEL_PATH = 's3://production-bucket/models'

class TestConfig(Config):
    """Test configuration"""
    TESTING = True
    DATABASE_URI = 'postgresql://localhost/ml_test'
    MODEL_PATH = 'models/test'

# Load config based on environment
import os
env = os.getenv('ENVIRONMENT', 'development')
config = {
    'development': DevelopmentConfig,
    'staging': StagingConfig,
    'production': ProductionConfig,
    'test': TestConfig
}[env]

2. Feature Flags for Gradual Rollouts

class FeatureFlags:
    """Control feature rollout"""

    def __init__(self):
        self.flags = {
            'new_model_v2': {
                'enabled': False,
                'rollout_percentage': 0
            },
            'advanced_preprocessing': {
                'enabled': True,
                'rollout_percentage': 50
            }
        }

    def is_enabled(self, feature_name, user_id=None):
        """Check if feature is enabled for user"""

        flag = self.flags.get(feature_name, {})

        if not flag.get('enabled', False):
            return False

        rollout_pct = flag.get('rollout_percentage', 0)

        if rollout_pct == 100:
            return True

        if user_id:
            # Deterministic rollout based on user ID
            import hashlib
            hash_val = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
            return (hash_val % 100) < rollout_pct

        return False

# Usage in prediction service
flags = FeatureFlags()

def predict(features, user_id):
    """Make prediction with feature flags"""

    if flags.is_enabled('new_model_v2', user_id):
        model = load_model('v2')
    else:
        model = load_model('v1')

    if flags.is_enabled('advanced_preprocessing', user_id):
        features = advanced_preprocess(features)
    else:
        features = standard_preprocess(features)

    return model.predict([features])

3. Rollback Strategy

class ModelDeployment:
    """Manage model deployments with rollback"""

    def __init__(self):
        self.deployments = []

    def deploy(self, model_version, environment='production'):
        """Deploy a model version"""

        deployment = {
            'version': model_version,
            'environment': environment,
            'timestamp': datetime.now(),
            'status': 'active'
        }

        # Mark previous deployment as inactive
        for d in self.deployments:
            if d['environment'] == environment and d['status'] == 'active':
                d['status'] = 'inactive'

        self.deployments.append(deployment)

        # Copy model to production path
        self._deploy_model_file(model_version, environment)

        print(f"✓ Deployed {model_version} to {environment}")

    def rollback(self, environment='production'):
        """Rollback to previous version"""

        # Get current and previous deployments
        env_deployments = [d for d in self.deployments if d['environment'] == environment]
        env_deployments.sort(key=lambda x: x['timestamp'], reverse=True)

        if len(env_deployments) < 2:
            print("❌ No previous version to rollback to")
            return False

        current = env_deployments[0]
        previous = env_deployments[1]

        # Mark current as rolled back
        current['status'] = 'rolled_back'

        # Reactivate previous
        previous['status'] = 'active'

        # Deploy previous model
        self._deploy_model_file(previous['version'], environment)

        print(f"✓ Rolled back from {current['version']} to {previous['version']}")
        return True

    def _deploy_model_file(self, version, environment):
        """Copy model file to the deployment location"""
        import os
        import shutil
        src = f"models/{version}.pkl"
        dst_dir = f"deployments/{environment}"
        os.makedirs(dst_dir, exist_ok=True)  # ensure the target directory exists
        shutil.copy(src, f"{dst_dir}/current_model.pkl")

# Usage
deployment = ModelDeployment()

# Deploy new version
deployment.deploy('v20251020_001')

# If issues detected, rollback
deployment.rollback()
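The "if issues detected" step above can itself be automated: watch a health metric for the new deployment and trigger `rollback()` when it degrades. Here is a minimal sketch of such a guard; the error-rate threshold and window size are illustrative placeholders, not universal defaults:

```python
def should_rollback(error_rates, threshold=0.05, window=5):
    """Decide whether to roll back based on recent per-interval error rates.

    `error_rates` is a list with the newest value last. We only judge the
    deployment once a full window of observations is available, to avoid
    reacting to a single noisy interval.
    """
    if len(error_rates) < window:
        return False  # not enough data to judge the new deployment yet
    recent = error_rates[-window:]
    return sum(recent) / window > threshold

# Healthy deployment: low, stable error rate -> keep it
assert not should_rollback([0.01, 0.02, 0.01, 0.02, 0.01])

# Degraded deployment: sustained elevated error rate -> roll back
assert should_rollback([0.01, 0.08, 0.09, 0.10, 0.12])
```

In practice you would call `should_rollback` from your monitoring loop and invoke `deployment.rollback()` when it returns `True`, keeping a human in the loop for anything beyond a single automatic rollback.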

---PAGE---

---QUIZ---
TITLE: CI/CD for ML Knowledge Check
INTRO: Test your understanding of CI/CD practices for machine learning!

Q: What are the three main components that need version control in ML systems?
A: Code, Data, and Models
A: Code, Tests, and Documentation
A: Models, APIs, and Databases
A: Data, Metrics, and Logs
CORRECT: 0
EXPLAIN: ML systems require versioning of Code (algorithms, preprocessing), Data (training datasets), and Models (trained artifacts). This is different from traditional software, which primarily versions code.

Q: What is the purpose of DVC (Data Version Control)?
A: To version control large data files and ML models
A: To replace Git for code versioning
A: To deploy models to production
A: To monitor model performance
CORRECT: 0
EXPLAIN: DVC is specifically designed to version control large files like datasets and ML models, which are too large for Git. It works alongside Git to provide complete version control for ML projects.

Q: In MLflow, what is the Model Registry used for?
A: Storing training data
A: Managing model lifecycle and promoting models between stages
A: Running experiments
A: Tracking metrics only
CORRECT: 1
EXPLAIN: The MLflow Model Registry manages the full lifecycle of ML models, including versioning, stage transitions (Staging → Production), and model lineage. It helps teams collaborate on model deployment.

Q: What triggers should be considered for automated model retraining?
A: Only time-based triggers (e.g., weekly)
A: Time, performance degradation, data drift, and new data availability
A: Only when manually triggered by data scientists
A: Only when the model crashes
CORRECT: 1
EXPLAIN: Effective retraining automation uses multiple triggers: time-based (regular intervals), performance drops, data drift detection, and sufficient new data availability. Relying on a single trigger can miss important retraining opportunities.

Q: What is the main difference between unit tests and integration tests for ML?
A: Unit tests are faster than integration tests
A: Unit tests check individual components; integration tests verify the entire pipeline
A: Integration tests are optional
A: Unit tests only test the model
CORRECT: 1
EXPLAIN: Unit tests validate individual components (preprocessing functions, feature engineering) in isolation. Integration tests verify that the entire ML pipeline works end-to-end, from data loading to prediction.

Q: What should you test when validating ML model quality?
A: Only accuracy on test set
A: Accuracy, inference speed, prediction validity, and determinism
A: Only the training loss
A: Only the code syntax
CORRECT: 1
EXPLAIN: Comprehensive model testing includes accuracy/performance metrics, inference latency (a production requirement), prediction validity (no NaN, correct ranges), and determinism (reproducible predictions).

Q: What is a feature flag in ML deployment?
A: A way to gradually roll out new models to a percentage of users
A: A type of model parameter
A: A data quality check
A: A testing framework
CORRECT: 0
EXPLAIN: Feature flags enable gradual rollout of new models or features by controlling what percentage of users see the new version. This allows safe deployment and easy rollback if issues arise.

Q: Why is rollback capability important in ML deployments?
A: To save storage space
A: To quickly revert to a previous working model if the new one performs poorly
A: To delete old models
A: To reduce costs
CORRECT: 1
EXPLAIN: Rollback capability is critical because new models might perform poorly in production despite good test metrics. Quick rollback to a known-good previous version minimizes business impact while investigating issues.
---END-QUIZ---

---PAGE---

Key Takeaways

Congratulations! You've completed Part 3 of the MLOps series on CI/CD for Machine Learning.

Core Concepts

Automated Testing: Unit, integration, and model quality tests

ML Pipelines: Reproducible training workflows with Airflow

Experiment Tracking: MLflow for tracking experiments and models

Version Control: Git for code, DVC for data and models

Continuous Training: Automated retraining based on triggers

Technical Skills

Testing Frameworks: pytest for ML code, Great Expectations for data

Pipeline Tools: Scikit-learn pipelines, Airflow DAGs

MLflow: Experiment tracking, model registry, model deployment

GitHub Actions: CI/CD workflows for testing and deployment

DVC: Data and model versioning with remote storage

Best Practices

Environment Separation: Dev, staging, production configs

Feature Flags: Gradual rollout and A/B testing

Rollback Strategy: Quick recovery from failed deployments

Automated Triggers: Smart retraining based on multiple signals
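The "automated triggers" practice can be condensed into a single decision function that combines the signals discussed earlier (schedule, performance, drift, new data). The thresholds below are illustrative placeholders to be tuned per project:

```python
from datetime import datetime, timedelta

def needs_retraining(last_trained, accuracy, drift_score, new_samples,
                     max_age_days=7, min_accuracy=0.90,
                     drift_threshold=0.3, min_new_samples=10_000):
    """Combine multiple signals into one retraining decision.

    Returns the list of triggered reasons; an empty list means
    no retraining is needed right now.
    """
    reasons = []
    if datetime.now() - last_trained > timedelta(days=max_age_days):
        reasons.append("schedule: model older than max_age_days")
    if accuracy < min_accuracy:
        reasons.append("performance: accuracy below threshold")
    if drift_score > drift_threshold:
        reasons.append("drift: input distribution shifted")
    if new_samples >= min_new_samples:
        reasons.append("data: enough new labeled samples")
    return reasons

# Example: a 10-day-old model that is otherwise healthy
reasons = needs_retraining(
    last_trained=datetime.now() - timedelta(days=10),
    accuracy=0.93, drift_score=0.1, new_samples=2_000,
)
# Only the schedule trigger fires here
```

Returning the list of reasons (rather than a bare boolean) makes the decision auditable: your pipeline can log exactly why each retraining run was kicked off.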

What's Next

In Part 4: Scaling ML Systems, you'll learn:

  • Horizontal and vertical scaling strategies
  • Load balancing for ML services
  • Distributed inference across multiple machines
  • GPU optimization for deep learning models
  • Batch processing at scale with Spark
  • Caching strategies for predictions
  • Cost optimization techniques

Resources for Further Learning

Practice Exercises

To reinforce your learning:

  1. Set up MLflow tracking for your ML project
  2. Create a DVC pipeline for your training workflow
  3. Write unit tests for your preprocessing functions
  4. Build an Airflow DAG for automated training
  5. Implement automated retraining with triggers
  6. Create a GitHub Actions workflow for model testing

Continue building robust ML systems, and see you in Part 4!
