CI/CD for Machine Learning
1. ML Pipeline Architecture
Before getting into individual concepts, you need a mental model of what an ML pipeline actually is and why it needs to be a pipeline at all rather than a collection of scripts someone runs manually.
In the early stages of any ML project, the workflow is informal. A data scientist downloads some data, writes preprocessing code in a Jupyter notebook, trains a model, evaluates it, and if it looks good, somehow gets it to production. This works for prototypes. It completely falls apart when you need to retrain the model regularly, when multiple people are working on different parts of the process, when you need to reproduce results from six months ago, when production model quality degrades and you need to debug why, or when the business demands a new model version every week instead of every quarter.
An ML pipeline is a structured, automated sequence of steps that takes raw data and produces a deployed, monitored model. It’s not a script — it’s an orchestrated workflow where each step has defined inputs, defined outputs, defined failure behavior, and defined dependencies on other steps. The pipeline can be triggered, monitored, versioned, and reproduced. It produces artifacts at each stage that are tracked and auditable.
The architecture of a complete ML pipeline has several distinct layers that each solve different problems.
The data layer is where raw data comes from and how it flows into the pipeline. This includes data ingestion from source systems, data validation to catch quality problems before they corrupt downstream steps, and data versioning so you always know exactly which data trained each model. The data layer is often the most complex and least glamorous part of ML infrastructure. Getting it right is what makes everything else reliable.
The feature layer transforms raw data into model-ready features. Feature engineering code lives here — the logic that creates meaningful representations of your data. This layer must be executed consistently across training and serving to avoid training-serving skew. It also handles feature storage so that expensive-to-compute features can be reused across experiments and by multiple models.
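One common way to reduce training-serving skew is to keep the feature logic in a single function that both the batch (training) path and the online (serving) path call. The sketch below is illustrative only: `compute_features`, `build_training_rows`, `build_serving_row`, and the field names are assumptions made for this example, not part of any particular library.

```python
import math
from datetime import datetime


def compute_features(record: dict) -> dict:
    """Single source of truth for feature logic (hypothetical field names)."""
    ts = datetime.fromisoformat(record["timestamp"])
    return {
        "hour_of_day": ts.hour,
        "day_of_week": ts.weekday(),
        "amount_log": math.log1p(record["transaction_amount"]),
    }


def build_training_rows(records: list) -> list:
    # Batch path: called by the training pipeline over historical records.
    return [compute_features(r) for r in records]


def build_serving_row(request: dict) -> dict:
    # Online path: called by the prediction service for a single request.
    return compute_features(request)


sample = {"timestamp": "2024-01-01T13:30:00", "transaction_amount": 100.0}
# Both paths produce identical features for the same input.
assert build_training_rows([sample])[0] == build_serving_row(sample)
```

Because both paths share one implementation, a change to the feature logic cannot reach training without also reaching serving.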
The training layer is where model fitting actually happens. It receives features, trains model weights, tracks the training run with all hyperparameters and metrics, and produces a model artifact. This layer needs to be parameterized — the same code runs differently based on configuration, enabling hyperparameter search and architecture experiments without code changes.
The evaluation layer validates the trained model before it’s allowed to proceed toward production. It runs evaluation on held-out test data, computes relevant metrics, compares against the current production model, checks fairness and robustness properties, and makes a pass/fail decision based on defined thresholds. This layer is the automated quality gate that prevents bad models from reaching users.
The deployment layer takes a validated model and makes it available to serve predictions. It handles model registration, environment-specific deployment (staging, canary, production), traffic routing, and integration with the serving infrastructure.
The monitoring layer watches the deployed model continuously, detecting when its performance degrades due to data drift, concept drift, or operational issues, and triggering retraining when necessary.
These layers are connected by an orchestration system — Airflow, Kubeflow Pipelines, Prefect, Metaflow, or Vertex AI Pipelines — that manages execution order, handles failures, passes artifacts between steps, and provides visibility into pipeline runs. The orchestration system is the nervous system that makes the pipeline a coherent whole rather than a collection of independent scripts.
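To make the orchestrator's role concrete, here is a minimal sketch of what "manages execution order, handles failures, passes artifacts between steps" can mean. It is a toy stand-in, not how Airflow or Kubeflow Pipelines are implemented; `run_pipeline` and the four step names are assumptions chosen for illustration.

```python
from graphlib import TopologicalSorter


def run_pipeline(steps, dependencies):
    """Run steps in dependency order, handing each step its upstream outputs.

    steps: step name -> callable taking a dict of upstream artifacts
    dependencies: step name -> set of step names it depends on
    """
    artifacts = {}
    for name in TopologicalSorter(dependencies).static_order():
        upstream = {dep: artifacts[dep] for dep in dependencies.get(name, ())}
        try:
            artifacts[name] = steps[name](upstream)
        except Exception as exc:
            # Stop the run and report which step failed.
            raise RuntimeError(f"step '{name}' failed") from exc
    return artifacts


# Hypothetical four-step pipeline on toy data.
steps = {
    "ingest": lambda up: [3.0, 1.0, 2.0],
    "validate": lambda up: [x for x in up["ingest"] if x >= 0],
    "features": lambda up: sorted(up["validate"]),
    "train": lambda up: {"mean": sum(up["features"]) / len(up["features"])},
}
dependencies = {
    "ingest": set(),
    "validate": {"ingest"},
    "features": {"validate"},
    "train": {"features"},
}
result = run_pipeline(steps, dependencies)
```

A real orchestrator adds retries, scheduling, containerized execution, and persistent artifact storage on top of this basic ordering logic.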
For ML systems at any meaningful scale, the pipeline architecture is not optional. It’s the foundation that makes everything else — reproducibility, automation, governance, monitoring, and team collaboration — possible.
2. Training Pipeline Automation
A training pipeline is automated end-to-end so that training a new model version is as simple as pushing a button or having it triggered automatically, rather than requiring manual execution of steps in the right order with the right parameters.
The case for automation is compelling. Manual training processes are slow because they require human intervention at each step. They’re error-prone because humans make mistakes — running an old version of preprocessing code, forgetting to set a random seed, evaluating on the wrong test split. They’re non-reproducible because the exact steps and parameters aren’t recorded systematically. And they don’t scale — a team trying to run experiments frequently while also retraining production models on a schedule quickly runs out of capacity to do both manually.
An automated training pipeline addresses all of these. Given a trigger (a schedule, a code change, a data threshold being crossed, a manual trigger), the pipeline runs the complete training workflow without human intervention, records everything needed to reproduce the run, and produces a registered model ready for evaluation and deployment.
A concrete automated training pipeline with Kubeflow Pipelines:
from kfp import dsl, compiler
from kfp.dsl import Dataset, Model, Metrics, Input, Output


@dsl.component(
    base_image="python:3.11-slim",
    # pyarrow provides parquet I/O; s3fs lets pandas read s3:// paths
    packages_to_install=["pandas", "pyarrow", "s3fs"]
)
def fetch_training_data(
    start_date: str,
    end_date: str,
    data_version: str,
    output_dataset: Output[Dataset]
):
    import pandas as pd

    # Fetch data for the specified date range
    df = pd.read_parquet(
        f"s3://ml-data/transactions/{start_date}_{end_date}.parquet"
    )
    # Save with version metadata
    df.to_parquet(output_dataset.path)
    output_dataset.metadata["row_count"] = len(df)
    output_dataset.metadata["date_range"] = f"{start_date}_{end_date}"
    output_dataset.metadata["data_version"] = data_version

@dsl.component(
    base_image="python:3.11-slim",
    # ge.from_pandas is the legacy Great Expectations API and is not
    # available in 1.x releases, so the version is pinned here
    packages_to_install=["pandas", "pyarrow", "scikit-learn", "great-expectations<1.0"]
)
def validate_data(
    input_dataset: Input[Dataset],
    validation_passed: Output[Dataset]
):
    import pandas as pd
    import great_expectations as ge

    df = pd.read_parquet(input_dataset.path)
    # Run data quality checks; any failure stops the pipeline here
    ge_df = ge.from_pandas(df)
    results = ge_df.expect_column_values_to_not_be_null("transaction_amount")
    if not results.success:
        raise ValueError(f"Data validation failed: {results}")
    results = ge_df.expect_column_values_to_be_between(
        "transaction_amount", min_value=0, max_value=1_000_000
    )
    if not results.success:
        raise ValueError(f"Data validation failed: {results}")
    # Pass through validated data
    df.to_parquet(validation_passed.path)
    validation_passed.metadata.update(input_dataset.metadata)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "pyarrow", "scikit-learn"]
)
def engineer_features(
    input_dataset: Input[Dataset],
    feature_dataset: Output[Dataset]
):
    import pandas as pd
    import joblib
    from sklearn.preprocessing import StandardScaler, LabelEncoder
    import numpy as np

    df = pd.read_parquet(input_dataset.path)
    # Feature engineering
    df['hour_of_day'] = pd.to_datetime(df['timestamp']).dt.hour
    df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek
    df['amount_log'] = np.log1p(df['transaction_amount'])
    # Encode categoricals
    le = LabelEncoder()
    df['merchant_category_encoded'] = le.fit_transform(df['merchant_category'])
    # Scale numerical features
    scaler = StandardScaler()
    numerical_cols = ['amount_log', 'account_age_days']
    df[numerical_cols] = scaler.fit_transform(df[numerical_cols])
    feature_columns = ['hour_of_day', 'day_of_week', 'amount_log',
                       'merchant_category_encoded', 'account_age_days']
    # Save preprocessors for serving.
    # Note: this sibling file is not a declared output artifact, so depending
    # on the backend it may not be persisted; a dedicated output artifact is safer.
    joblib.dump({
        'label_encoder': le,
        'scaler': scaler,
        'feature_columns': feature_columns
    }, feature_dataset.path + '_preprocessors.pkl')
    df.to_parquet(feature_dataset.path)
    # Carry upstream metadata (e.g. data_version) forward and record the
    # feature list, which train_model reads from the artifact metadata
    feature_dataset.metadata.update(input_dataset.metadata)
    feature_dataset.metadata["feature_columns"] = feature_columns

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "pyarrow", "scikit-learn", "mlflow", "xgboost"]
)
def train_model(
    feature_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    learning_rate: float,
    random_seed: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    import pandas as pd
    import xgboost as xgb
    import mlflow
    import joblib
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import roc_auc_score, average_precision_score

    df = pd.read_parquet(feature_dataset.path)
    feature_cols = feature_dataset.metadata.get('feature_columns', [])
    if not feature_cols:
        # Fail loudly instead of training on an empty feature matrix
        raise ValueError("feature_columns missing from feature_dataset metadata")
    X = df[feature_cols]
    y = df['is_fraud']
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=random_seed, stratify=y
    )
    with mlflow.start_run() as run:
        # Log parameters
        mlflow.log_params({
            "n_estimators": n_estimators,
            "max_depth": max_depth,
            "learning_rate": learning_rate,
            "random_seed": random_seed,
            "training_samples": len(X_train),
            "validation_samples": len(X_val),
            "data_version": feature_dataset.metadata.get("data_version"),
        })
        # Train. early_stopping_rounds is set on the estimator because
        # recent XGBoost releases no longer accept it as a fit() argument.
        model = xgb.XGBClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate,
            random_state=random_seed,
            eval_metric='auc',
            early_stopping_rounds=20
        )
        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            verbose=False
        )
        # Evaluate
        val_proba = model.predict_proba(X_val)[:, 1]
        auc_roc = roc_auc_score(y_val, val_proba)
        auc_pr = average_precision_score(y_val, val_proba)
        mlflow.log_metrics({"val_auc_roc": auc_roc, "val_auc_pr": auc_pr})
        # Save
        joblib.dump(model, trained_model.path)
        trained_model.metadata["mlflow_run_id"] = run.info.run_id
        trained_model.metadata["val_auc_roc"] = auc_roc
        training_metrics.log_metric("val_auc_roc", auc_roc)
        training_metrics.log_metric("val_auc_pr", auc_pr)

@dsl.pipeline(
    name="fraud-detection-training",
    description="Complete training pipeline for fraud detection model"
)
def fraud_detection_training_pipeline(
    start_date: str = "2024-01-01",
    end_date: str = "2024-03-31",
    data_version: str = "v2024q1",
    n_estimators: int = 500,
    max_depth: int = 6,
    learning_rate: float = 0.05,
    random_seed: int = 42
):
    # Each task consumes the previous task's output artifact
    fetch_task = fetch_training_data(
        start_date=start_date,
        end_date=end_date,
        data_version=data_version
    )
    validate_task = validate_data(
        input_dataset=fetch_task.outputs["output_dataset"]
    )
    feature_task = engineer_features(
        input_dataset=validate_task.outputs["validation_passed"]
    )
    train_task = train_model(
        feature_dataset=feature_task.outputs["feature_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate,
        random_seed=random_seed
    )


# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=fraud_detection_training_pipeline,
    package_path="fraud_detection_pipeline.yaml"
)

This pipeline runs each step in a separate container, tracks all artifacts between steps, logs parameters and metrics to MLflow, and produces a trained model artifact that is ready to be registered. The entire workflow can be triggered with a single API call or scheduled to run automatically.
3. Continuous Training (CT)
Continuous Training is the practice of automatically retraining models on a regular cadence or in response to specific triggers, rather than training once and leaving the model in production indefinitely. It’s the ML analog of continuous integration — just as CI automatically builds and tests code whenever changes are pushed, CT automatically trains and evaluates models whenever the conditions for retraining are met.
The motivation for Continuous Training is the fundamental characteristic of ML models that distinguishes them from traditional software: they degrade over time without code changes. A fraud detection model trained in January performs worse in September because fraudster behavior evolves. A recommendation model trained before a major product change performs poorly after the change because user behavior patterns are different. A demand forecasting model trained before an economic disruption fails afterward because the economic context has changed.
Traditional software doesn’t have this problem — a function that correctly sorts a list in January still correctly sorts a list in September. ML models are different because their performance is tied to the statistical distribution of the world at the time they were trained, and the world changes.
Scheduled retraining is the simplest form of CT. On a fixed schedule (weekly, monthly, quarterly depending on how fast the domain changes), a training pipeline runs, produces a new model version, evaluates it against the current production model, and promotes it if it’s better or at least not significantly worse.
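The "better or at least not significantly worse" rule can be written as a one-line comparison with a tolerance. This is a minimal sketch: `should_promote`, the choice of AUC as the metric, and the 0.005 tolerance are assumptions for illustration, and the right tolerance depends on how noisy your evaluation is.

```python
def should_promote(candidate_auc: float, production_auc: float,
                   tolerance: float = 0.005) -> bool:
    """Promote if the candidate is better, or worse by no more than `tolerance`."""
    return candidate_auc >= production_auc - tolerance


# Hypothetical metric values, for illustration only.
decisions = {
    "better": should_promote(0.931, 0.928),
    "slightly_worse": should_promote(0.925, 0.928),
    "clearly_worse": should_promote(0.910, 0.928),
}
```

Allowing a small tolerance avoids rejecting a fresher model over differences that fall within evaluation noise.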
The schedule should be informed by how fast model quality degrades. A news recommendation model trained on rapidly changing topic distributions might need daily retraining. A credit risk model based on slower-moving behavioral signals might be fine with monthly retraining. Monitoring tells you whether your current schedule is appropriate — if you’re seeing significant metric degradation between retraining cycles, retrain more frequently.
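One rough way to check a retraining schedule against monitoring data is to measure how many days it takes for a metric to fall a fixed amount below its value right after retraining. The sketch below assumes you already log a daily metric; `days_until_degraded` and the AUC readings are made up for illustration.

```python
from typing import Optional


def days_until_degraded(daily_metric: list, max_drop: float) -> Optional[int]:
    """Return the first day (counted from retraining) on which the metric is
    more than `max_drop` below its day-0 value, or None if it never is."""
    baseline = daily_metric[0]
    for day, value in enumerate(daily_metric):
        if baseline - value > max_drop:
            return day
    return None


# Hypothetical daily AUC readings after a retrain (made-up numbers).
auc_by_day = [0.930, 0.929, 0.927, 0.924, 0.918, 0.910]
breach_day = days_until_degraded(auc_by_day, max_drop=0.01)
```

If the breach day is consistently shorter than the retraining interval, that suggests retraining more often; if no breach ever occurs, the schedule may be more frequent than it needs to be.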
Event-triggered retraining trains when specific conditions are met rather than on a fixed schedule. We’ll cover this in detail in the trigger-based retraining section, but the key idea is that retraining runs when it’s needed, not when the calendar says it should.
Continuous Training infrastructure requirements — CT is not free. Running training pipelines regularly requires compute resources (potentially expensive GPUs), storage for training data and model artifacts, an orchestration system to schedule and monitor pipeline runs, and human oversight to catch cases where automated retraining produces worse models.
The compute cost of CT is why not all models should be continuously trained. The right question is: does the business value of a fresher model outweigh the compute cost of retraining frequently? For a model that makes high-value real-time decisions (fraud detection, pricing), yes. For a model that generates weekly reports, perhaps not.
CT vs traditional ML development — in traditional ML, there’s a clear beginning (start training) and end (model is deployed). In CT, there’s no end — training is a continuous process that runs indefinitely. This requires infrastructure that’s designed to run robustly for months and years, not just for a one-time project. It requires monitoring of the training process itself (are pipelines succeeding? are models improving?), not just monitoring of the deployed model. And it requires governance — automated systems that prevent bad models from being deployed without introducing human bottlenecks for every training run.
4. Continuous Integration for ML
Continuous Integration for ML extends the standard software CI concept to handle the unique challenges of machine learning code. Standard CI builds and tests code. ML CI does that plus validates data, tests model behavior, and ensures the training pipeline itself works correctly.
The distinction matters because ML code has multiple failure modes that standard code testing doesn’t cover. Your Python code might be syntactically correct and pass all unit tests, but the model it trains might be catastrophically wrong because the feature engineering has a subtle bug, because the train/test split has data leakage, or because the evaluation metric is being computed incorrectly. Standard CI catches syntax errors and unit test failures. ML CI must catch model quality problems too.
What ML CI validates:
Code correctness — the same linting, type checking, and unit testing as standard CI. This is table stakes, not optional.
Data pipeline correctness — unit tests for feature engineering functions (given this input, does the feature computation produce the expected output?), schema validation tests (does the feature output have the expected columns and types?), and data leakage tests (do features inadvertently include target-correlated information that wouldn’t be available at serving time?).
# Unit test for feature engineering
import pytest
import pandas as pd
import numpy as np
from src.features import engineer_features


def test_amount_log_transformation():
    """Log transformation should handle zero values without error"""
    input_df = pd.DataFrame({
        'transaction_amount': [0, 1, 100, 10000],
        'timestamp': pd.to_datetime(['2024-01-01'] * 4),
        'account_age_days': [30, 365, 730, 1000],
        'merchant_category': ['grocery'] * 4
    })
    result = engineer_features(input_df)
    assert 'amount_log' in result.columns
    assert result['amount_log'].isna().sum() == 0  # No NaN from log(0)
    assert (result['amount_log'] >= 0).all()  # log1p gives non-negative values


def test_no_future_data_leakage():
    """Features should not use any future timestamp relative to the transaction"""
    # Create transactions with known timestamps
    input_df = pd.DataFrame({
        'transaction_id': [1, 2, 3],
        'timestamp': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
        'transaction_amount': [100, 200, 300],
        'account_age_days': [30, 31, 32],
        'merchant_category': ['grocery'] * 3
    })
    result = engineer_features(input_df)
    # hour_of_day should be derived only from the transaction timestamp
    expected_hours = pd.to_datetime(input_df['timestamp']).dt.hour
    assert (result['hour_of_day'] == expected_hours).all()


def test_feature_schema():
    """Output features must have exactly the expected columns"""
    # create_sample_transactions is assumed to be a test helper defined elsewhere
    input_df = create_sample_transactions(n=100)
    result = engineer_features(input_df)
    expected_columns = {
        'hour_of_day', 'day_of_week', 'amount_log',
        'merchant_category_encoded', 'account_age_days'
    }
    assert set(result.columns) == expected_columns
    assert result.dtypes['merchant_category_encoded'] == np.int64
    assert result.dtypes['amount_log'] == np.float64

Training pipeline validation: run the full training pipeline on a small sample of data to verify it completes without errors. This “smoke test” catches bugs in the training code that unit tests might miss (wrong column names passed to the model, incompatible library versions, incorrect file paths):
# In GitHub Actions CI
- name: Run training pipeline smoke test
  run: |
    python -m scripts.run_pipeline \
      --config configs/ci_smoke_test.yaml \
      --max-samples 1000 \
      --fast-dev-run true
  env:
    MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

The ci_smoke_test.yaml config uses a tiny dataset (1000 samples instead of millions), a minimal model (10 trees instead of 500), and skips expensive validation steps. It runs in 2-3 minutes in CI and catches integration bugs before they reach the full training pipeline.
Model quality gates: even on the small CI dataset, you can validate that the trained model clearly beats random (a random classifier scores AUC 0.5; the test below uses a stricter 0.65 floor), that outputs are in the expected range, and that the model can be serialized and loaded:
import numpy as np

# train_on_sample, evaluate_model, save_model, load_model, validation_set,
# and sample_features are assumed to be project helpers and fixtures


def test_trained_model_quality():
    """Model should beat random baseline on validation set"""
    # Train on small sample
    model = train_on_sample(n_samples=1000, seed=42)
    # Must beat the random baseline (AUC 0.5) by a clear margin
    val_auc = evaluate_model(model, validation_set)
    assert val_auc > 0.65, f"Model AUC {val_auc} is below the 0.65 floor"
    # Must produce valid probabilities
    predictions = model.predict_proba(sample_features)
    assert predictions.min() >= 0
    assert predictions.max() <= 1
    assert not np.isnan(predictions).any()


def test_model_serialization():
    """Model must be serializable and produce identical predictions after loading"""
    model = train_on_sample(n_samples=500)
    predictions_before = model.predict_proba(sample_features)
    # Save and reload
    save_model(model, "/tmp/test_model.onnx")
    loaded_model = load_model("/tmp/test_model.onnx")
    predictions_after = loaded_model.predict_proba(sample_features)
    np.testing.assert_array_almost_equal(
        predictions_before, predictions_after, decimal=5
    )

CI for ML is expensive: running even a smoke test training run costs more compute than running unit tests. This forces prioritization: which checks must run on every PR, which run only on merges to main, which run nightly. The principle is the same as standard CI: fast feedback on common failures, slower comprehensive checks less frequently.
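As a rough illustration of that prioritization, the checks can be grouped into tiers keyed by the event that triggers them. The tier names and the assignment of checks to tiers below are assumptions for this sketch; the right split depends on how long each check takes in your environment.

```python
# Hypothetical mapping from CI trigger to the checks added at that tier.
CHECK_TIERS = {
    "pull_request": ["lint", "unit_tests", "schema_tests"],
    "merge_to_main": ["pipeline_smoke_test", "model_quality_gates"],
    "nightly": ["full_training_run", "fairness_tests", "latency_tests"],
}
TIER_ORDER = ["pull_request", "merge_to_main", "nightly"]


def checks_for(event: str) -> list:
    """Slower tiers also run everything from the faster tiers before them."""
    selected = []
    for tier in TIER_ORDER[: TIER_ORDER.index(event) + 1]:
        selected.extend(CHECK_TIERS[tier])
    return selected


pr_checks = checks_for("pull_request")
nightly_checks = checks_for("nightly")
```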
5. Continuous Deployment for ML Models
Continuous Deployment for ML means that a model which passes all automated quality gates is automatically deployed to production — or at least to a staging environment — without requiring manual intervention. This is the ML analog of software CD, and it has the same fundamental purpose: reducing the time between a validated change and its impact on users.
The case for CD in ML is the same as in software: manual deployment processes are slow, error-prone, and create a bottleneck. If deploying a new model version requires a data scientist to run a script, wait for it to finish, check that it completed correctly, then file a ticket for an engineer to update the serving infrastructure, then wait for that to be reviewed and executed — the process might take days. With CD, a model that completes training, passes evaluation, and meets promotion criteria is deployed automatically, potentially within hours of training completing.
But CD for ML has unique complexities that don’t exist in standard software CD. A new software version that passes tests is almost certainly an improvement or at least not a regression. A new model version that passes all automated tests might still behave worse than the production model in subtle ways that tests didn’t catch. Model predictions can be confidently wrong — the model produces a high-confidence prediction that happens to be incorrect — in ways that only manifest at scale on real user behavior.
This is why ML CD almost always includes graduated deployment: the new model version doesn’t immediately serve 100% of traffic. Instead, it’s promoted through stages.
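One way to picture the traffic split behind a graduated rollout is deterministic, hash-based routing: each request ID maps to a stable bucket, and only buckets below the canary percentage go to the new model. This is a sketch of the general idea, not the mechanism of any specific serving platform; `route_to_canary` and the request ID format are assumptions.

```python
import hashlib


def route_to_canary(request_id: str, canary_percent: float) -> bool:
    """Deterministically send roughly `canary_percent` of requests to the canary."""
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000  # stable bucket in 0..9999
    return bucket < canary_percent * 100


# Measure the observed share on synthetic request IDs.
ids = [f"req-{i}" for i in range(20_000)]
share = sum(route_to_canary(i, 5) for i in ids) / len(ids)
```

Because the decision depends only on the ID, the same request (or user, if you hash a user ID instead) always lands on the same model version, which keeps the canary comparison clean.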
A complete ML CD pipeline:
# GitHub Actions workflow, dispatched (manually or via the API) after the training pipeline completes
name: ML Model CD
on:
  workflow_dispatch:
    inputs:
      model_version:
        description: 'MLflow model version to deploy'
        required: true
      environment:
        description: 'Target environment'
        required: true
        default: 'staging'
jobs:
  validate-model:
    runs-on: ubuntu-latest
    outputs:
      deployment-approved: ${{ steps.gate.outputs.approved }}
    steps:
      # Check out the repository so the scripts/ directory is available
      - uses: actions/checkout@v4
      - name: Download model artifacts
        run: |
          python scripts/download_model.py \
            --model-name fraud-detection \
            --version ${{ github.event.inputs.model_version }} \
            --output-dir /tmp/model
      - name: Run comprehensive model tests
        run: |
          python scripts/test_model.py \
            --model-path /tmp/model \
            --test-data data/held-out-test-set.parquet \
            --min-auc-roc 0.92 \
            --min-auc-pr 0.78 \
            --max-latency-p99-ms 200 \
            --output results/model_test_results.json
      - name: Compare against production baseline
        id: gate
        run: |
          # --minimum-improvement -0.005 tolerates a drop of at most 0.005
          # (0.5 percentage points). The comparison runs inside `if` because
          # the default shell exits on the first failing command, which would
          # otherwise skip the approved=false branch.
          if python scripts/compare_models.py \
              --candidate-version ${{ github.event.inputs.model_version }} \
              --baseline-stage Production \
              --minimum-improvement -0.005; then
            echo "approved=true" >> "$GITHUB_OUTPUT"
          else
            echo "approved=false" >> "$GITHUB_OUTPUT"
          fi
  deploy-staging:
    needs: validate-model
    if: needs.validate-model.outputs.deployment-approved == 'true'
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: |
          python scripts/deploy_model.py \
            --model-version ${{ github.event.inputs.model_version }} \
            --environment staging \
            --traffic-percentage 100
      - name: Wait for staging deployment health
        run: |
          python scripts/wait_for_health.py \
            --environment staging \
            --timeout-minutes 10
      - name: Run staging smoke tests
        run: |
          python scripts/smoke_test.py \
            --environment staging \
            --test-requests 100 \
            --max-error-rate 0.01
  deploy-production-canary:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Deploy canary (5% traffic)
        run: |
          python scripts/deploy_model.py \
            --model-version ${{ github.event.inputs.model_version }} \
            --environment production \
            --traffic-percentage 5 \
            --deployment-type canary
      - name: Monitor canary for 30 minutes
        run: |
          python scripts/monitor_canary.py \
            --environment production \
            --model-version ${{ github.event.inputs.model_version }} \
            --duration-minutes 30 \
            --max-error-rate 0.01 \
            --max-latency-p99-ms 300 \
            --min-prediction-confidence 0.6
      - name: Promote to full production
        run: |
          python scripts/deploy_model.py \
            --model-version ${{ github.event.inputs.model_version }} \
            --environment production \
            --traffic-percentage 100
      - name: Notify team
        run: |
          python scripts/notify_deployment.py \
            --model-version ${{ github.event.inputs.model_version }} \
            --status success \
            --slack-channel "#ml-deployments"

The difference between ML CD and software CD: software CD typically deploys soon after tests pass. ML CD includes a monitoring period (the canary watch) where the new model runs on real traffic and automated systems check whether real-world behavior matches expectations. This monitoring period matters especially for ML because model behavior can be subtly wrong in ways that only become apparent at real production scale with real user distributions.
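The decision made at the end of a canary watch can be sketched as a small function over the metrics collected during the window. This is illustrative only: `CanaryStats`, `canary_verdict`, and the `min_requests` guard are assumptions, while the error-rate and latency limits mirror the flags passed to the monitoring step in the workflow above.

```python
from dataclasses import dataclass


@dataclass
class CanaryStats:
    requests: int
    errors: int
    latency_p99_ms: float


def canary_verdict(stats: CanaryStats,
                   max_error_rate: float = 0.01,
                   max_latency_p99_ms: float = 300.0,
                   min_requests: int = 500) -> str:
    """Return 'promote', 'rollback', or 'wait' for one canary window."""
    if stats.requests < min_requests:
        return "wait"  # not enough traffic yet to judge either way
    if stats.errors / stats.requests > max_error_rate:
        return "rollback"
    if stats.latency_p99_ms > max_latency_p99_ms:
        return "rollback"
    return "promote"


# Hypothetical window: 2,000 requests, 8 errors, 240 ms p99 latency.
verdict = canary_verdict(CanaryStats(requests=2000, errors=8, latency_p99_ms=240.0))
```

The explicit "wait" outcome guards against promoting or rolling back on a window that saw too little traffic to be meaningful.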
6. Model Testing Strategies
Testing ML models is fundamentally different from testing software functions. A software function either returns the correct output or it doesn’t — the test checks for correctness. A model produces probabilistic outputs that vary across inputs — the test checks for quality, consistency, robustness, and appropriate behavior across different input conditions.
Model testing has multiple levels, each answering different questions:
Behavioral testing (what-if testing) checks model behavior on specific inputs where you know what the correct behavior should be, even without knowing the exact correct output. This comes from Ribeiro et al.’s CheckList methodology:
# `model` and the create_* helpers are assumed to be provided by the test suite.
# The class name starts with "Test" so pytest's default discovery collects it.
class TestFraudModelBehavior:
    def test_high_amount_increases_fraud_probability(self):
        """Higher transaction amounts should generally increase fraud probability"""
        base_transaction = create_normal_transaction(amount=50)
        high_amount_transaction = base_transaction.copy()
        high_amount_transaction['amount'] = 10000
        base_prob = model.predict_proba([base_transaction])[0][1]
        high_prob = model.predict_proba([high_amount_transaction])[0][1]
        # High amounts should generally be riskier
        assert high_prob > base_prob, (
            f"High amount transaction ({high_prob:.3f}) should have higher "
            f"fraud probability than normal ({base_prob:.3f})"
        )

    def test_new_account_is_riskier(self):
        """Very new accounts should be considered riskier"""
        established = create_transaction(account_age_days=730)
        new_account = create_transaction(account_age_days=1)
        established_prob = model.predict_proba([established])[0][1]
        new_prob = model.predict_proba([new_account])[0][1]
        assert new_prob > established_prob

    def test_prediction_consistency(self):
        """Same input should always produce same output"""
        transaction = create_sample_transaction()
        predictions = [
            model.predict_proba([transaction])[0][1]
            for _ in range(10)
        ]
        assert len(set(round(p, 6) for p in predictions)) == 1, \
            "Model is non-deterministic"

    def test_invariance_to_irrelevant_features(self):
        """Changing irrelevant fields should not affect prediction"""
        transaction = create_sample_transaction()
        modified = transaction.copy()
        modified['internal_record_id'] = 99999  # Should not affect prediction
        original_prob = model.predict_proba([transaction])[0][1]
        modified_prob = model.predict_proba([modified])[0][1]
        assert abs(original_prob - modified_prob) < 1e-6

    def test_minimum_functionality(self):
        """Model must correctly classify obvious cases"""
        # Clear legitimate transactions
        legitimate = [create_normal_transaction() for _ in range(100)]
        # Clear fraud patterns
        fraudulent = [create_fraud_transaction() for _ in range(100)]
        legitimate_probs = model.predict_proba(legitimate)[:, 1]
        fraud_probs = model.predict_proba(fraudulent)[:, 1]
        # On average, fraud transactions should score much higher
        assert fraud_probs.mean() > legitimate_probs.mean() + 0.3

Robustness testing checks behavior under unusual but valid inputs:
def test_edge_case_inputs():
    """Model should handle edge cases without crashing"""
    edge_cases = [
        create_transaction(amount=0.01),             # Minimum amount
        create_transaction(amount=999999.99),        # Maximum amount
        create_transaction(account_age_days=0),      # Brand new account
        create_transaction(account_age_days=36500),  # Very old account (100 years)
        create_transaction(hour_of_day=0),           # Midnight
        create_transaction(hour_of_day=23),          # 11pm
    ]
    for case in edge_cases:
        prob = model.predict_proba([case])[0][1]
        assert 0 <= prob <= 1, f"Invalid probability {prob} for edge case"
        assert not np.isnan(prob), "NaN probability for edge case"


def test_adversarial_inputs():
    """Model should not produce extreme outputs for slightly perturbed inputs"""
    base = create_normal_transaction()
    for feature in ['amount', 'account_age_days']:
        perturbed = base.copy()
        perturbed[feature] = base[feature] * 1.01  # 1% change
        base_prob = model.predict_proba([base])[0][1]
        perturbed_prob = model.predict_proba([perturbed])[0][1]
        # A 1% input change should move the predicted probability by less than 0.1
        assert abs(base_prob - perturbed_prob) < 0.1, (
            f"Model too sensitive to small changes in {feature}"
        )

Fairness testing checks that the model doesn’t perform significantly differently across demographic groups:
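# `model`, `test_data`, and `feature_cols` are assumed to be provided by the test suite.
# Comparing false positive rates across groups is usually called FPR parity
# (predictive equality) rather than demographic parity, hence the test name.
def test_false_positive_rate_parity():
    """Model should not have significantly different FPR across groups"""
    threshold = 0.7
    # Overall false positive rate across all legitimate transactions
    all_legitimate = test_data[test_data['is_fraud'] == 0]
    overall_probs = model.predict_proba(all_legitimate[feature_cols])[:, 1]
    overall_fpr = (overall_probs > threshold).mean()
    for group in ['group_a', 'group_b', 'group_c']:
        group_data = test_data[test_data['demographic_group'] == group]
        # Compute false positive rate for this group
        legitimate = group_data[group_data['is_fraud'] == 0]
        probs = model.predict_proba(legitimate[feature_cols])[:, 1]
        fpr = (probs > threshold).mean()  # Fraction of legitimate flagged as fraud
        # FPR should be within 2x of the overall FPR
        assert fpr <= overall_fpr * 2, (
            f"False positive rate for {group} ({fpr:.3f}) is more than 2x "
            f"overall rate ({overall_fpr:.3f})"
        )

Performance testing ensures the model meets latency requirements: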
import time
import numpy as np


def test_inference_latency():
    """Model must meet P99 latency SLO of 200ms"""
    sample_requests = generate_sample_requests(n=1000)
    latencies = []
    for request in sample_requests:
        start = time.perf_counter()
        model.predict_proba([request])
        latencies.append((time.perf_counter() - start) * 1000)
    p99_ms = np.percentile(latencies, 99)
    assert p99_ms < 200, f"P99 latency {p99_ms:.1f}ms exceeds 200ms SLO"
    # Also check for batch inference throughput
    batch_start = time.perf_counter()
    model.predict_proba(sample_requests[:100])
    batch_duration = (time.perf_counter() - batch_start) * 1000
    throughput = 100 / (batch_duration / 1000)
    assert throughput > 500, f"Batch throughput {throughput:.0f} req/s below 500 req/s target"

7. Data Validation in Pipelines
Data validation is the process of automatically checking data quality and properties at each stage of the ML pipeline before the data is used for training or inference. It’s the safety net that prevents corrupted, drifted, or incorrectly formatted data from silently producing bad models.
The critical insight about data validation is that data problems are often silent failures. A training pipeline without validation will happily train a model on corrupted data, evaluate it against an equally corrupted test set, produce seemingly reasonable metrics, and deploy a model that makes terrible predictions in production. The failure is invisible until its effects are felt by users. Data validation makes silent failures visible and loud.
Schema validation is the first layer — checking that data has the expected structure:
import pandera as pa
from pandera import Column, DataFrameSchema, Check
# Define the expected schema for training data
training_data_schema = DataFrameSchema({
"transaction_id": Column(str, nullable=False),
"timestamp": Column(pa.Timestamp, nullable=False),
"transaction_amount": Column(
float,
checks=[
Check.greater_than_or_equal_to(0),
Check.less_than_or_equal_to(1_000_000),
],
nullable=False
),
"merchant_category": Column(
str,
checks=Check.isin(['grocery', 'electronics', 'restaurant',
'travel', 'other']),
nullable=False
),
"account_age_days": Column(
int,
checks=[
Check.greater_than_or_equal_to(0),
Check.less_than_or_equal_to(36500),
],
nullable=False
),
"is_fraud": Column(
bool,
nullable=False
),
})
def validate_training_data(df: pd.DataFrame) -> pd.DataFrame:
try:
validated = training_data_schema.validate(df)
logging.info(f"Schema validation passed: {len(df)} rows")
return validated
except pa.errors.SchemaError as e:
logging.error(f"Schema validation failed: {e}")
raise
Distribution validation checks that the data’s statistical properties match expectations, which catches data drift, pipeline bugs, and sampling errors:
import json
import logging
import pandas as pd
from scipy import stats
def validate_data_distribution(
current_df: pd.DataFrame,
reference_df: pd.DataFrame,
significance_level: float = 0.05
) -> dict:
results = {}
for column in ['transaction_amount', 'account_age_days']:
# Kolmogorov-Smirnov test: does current distribution match reference?
ks_stat, p_value = stats.ks_2samp(
reference_df[column].dropna(),
current_df[column].dropna()
)
results[column] = {
'ks_statistic': ks_stat,
'p_value': p_value,
'distribution_shifted': p_value < significance_level,
'reference_mean': reference_df[column].mean(),
'current_mean': current_df[column].mean(),
'mean_change_pct': abs(
current_df[column].mean() - reference_df[column].mean()
) / reference_df[column].mean() * 100
}
# Check class balance
fraud_rate = current_df['is_fraud'].mean()
expected_fraud_rate = reference_df['is_fraud'].mean()
results['class_balance'] = {
'current_fraud_rate': fraud_rate,
'expected_fraud_rate': expected_fraud_rate,
'deviation_pct': abs(fraud_rate - expected_fraud_rate) / expected_fraud_rate * 100,
'severe_imbalance': abs(fraud_rate - expected_fraud_rate) > 0.05
}
return results
def validate_and_fail_if_needed(
current_df: pd.DataFrame,
reference_df: pd.DataFrame
):
results = validate_data_distribution(current_df, reference_df)
failures = [
col for col, result in results.items()
if result.get('distribution_shifted') or result.get('severe_imbalance')
]
if failures:
raise ValueError(
f"Data distribution validation failed for: {failures}. "
# default=str keeps NumPy booleans and other non-JSON types from raising
f"Results: {json.dumps(results, indent=2, default=str)}"
)
logging.info("Distribution validation passed")
return results
Volume validation ensures the data pipeline delivered approximately the expected amount of data:
def validate_data_volume(
df: pd.DataFrame,
expected_min_rows: int,
expected_max_rows: int,
date_range: tuple
):
actual_rows = len(df)
if actual_rows < expected_min_rows:
raise ValueError(
f"Data volume too low: {actual_rows} rows "
f"(expected >= {expected_min_rows}). "
f"Possible upstream pipeline failure or data source issue."
)
if actual_rows > expected_max_rows:
raise ValueError(
f"Data volume too high: {actual_rows} rows "
f"(expected <= {expected_max_rows}). "
f"Possible data duplication or incorrect date range."
)
# Check for date coverage - no large gaps
timestamps = pd.to_datetime(df['timestamp'])
date_coverage = (timestamps.max() - timestamps.min()).days
expected_days = (date_range[1] - date_range[0]).days
if date_coverage < expected_days * 0.9:
raise ValueError(
f"Incomplete date coverage: {date_coverage} days "
f"(expected {expected_days} days). "
f"Possible gap in data."
)
logging.info(
f"Volume validation passed: {actual_rows} rows "
f"covering {date_coverage} days"
)
Great Expectations is a widely used library for production data validation in ML pipelines, providing a rich suite of expectations and integration with pipeline orchestrators:
import great_expectations as ge
def create_expectation_suite(suite_name: str) -> ge.ExpectationSuite:
context = ge.get_context()
suite = context.create_expectation_suite(suite_name, overwrite_existing=True)
validator = context.get_validator(
batch_request=...,
expectation_suite=suite
)
validator.expect_column_values_to_not_be_null("transaction_amount")
validator.expect_column_values_to_be_between(
"transaction_amount", min_value=0, max_value=1_000_000
)
validator.expect_column_proportion_of_unique_values_to_be_between(
"transaction_id", min_value=0.99 # Almost all IDs should be unique
)
validator.expect_column_values_to_be_in_set(
"merchant_category",
['grocery', 'electronics', 'restaurant', 'travel', 'other']
)
validator.save_expectation_suite()
return suite
8. Automated Model Evaluation
Automated model evaluation is the systematic, code-defined process of assessing whether a trained model meets the quality bar required for deployment. Rather than a data scientist manually reviewing metrics and making a judgment call, automated evaluation applies predefined criteria and makes a deterministic pass/fail decision.
The goal is to make the deployment decision consistent, auditable, and fast. Consistent because the same criteria are applied to every model version — no variation based on who’s reviewing or how busy they are. Auditable because the evaluation results are recorded and linked to the model version. Fast because it runs automatically as part of the pipeline without waiting for human review.
Absolute threshold evaluation checks that metrics exceed minimum acceptable values:
class ModelEvaluator:
def __init__(self, model, test_data: pd.DataFrame, config: dict):
self.model = model
self.test_data = test_data
self.config = config
self.results = {}
def evaluate(self) -> dict:
X_test = self.test_data[self.config['feature_columns']]
y_test = self.test_data[self.config['target_column']]
y_prob = self.model.predict_proba(X_test)[:, 1]
y_pred = (y_prob >= 0.5).astype(int)
self.results = {
'auc_roc': roc_auc_score(y_test, y_prob),
'auc_pr': average_precision_score(y_test, y_prob),
'precision': precision_score(y_test, y_pred),
'recall': recall_score(y_test, y_pred),
'f1': f1_score(y_test, y_pred),
'test_samples': len(y_test),
'positive_rate': y_test.mean(),
}
# Segment evaluation
for segment in self.config.get('evaluation_segments', []):
mask = self.test_data[segment['column']] == segment['value']
if mask.sum() >= 100: # Only evaluate if enough samples
segment_y = y_test[mask]
segment_prob = y_prob[mask]
self.results[f"auc_roc_{segment['name']}"] = (
roc_auc_score(segment_y, segment_prob)
)
return self.results
def check_thresholds(self, thresholds: dict) -> tuple[bool, list]:
"""Returns (passed, list of failures)"""
failures = []
for metric, min_value in thresholds.items():
if metric in self.results:
actual = self.results[metric]
if actual < min_value:
failures.append(
f"{metric}: {actual:.4f} < {min_value:.4f} (minimum)"
)
return len(failures) == 0, failures
Comparative evaluation compares the candidate model against the current production model, also known as the champion-challenger comparison:
def compare_against_production(
    candidate_version: str,
    production_version: str,
    test_data: pd.DataFrame,
    minimum_relative_improvement: float = -0.005  # Allow 0.5% degradation
) -> tuple[bool, dict]:
    candidate_model = load_model_version(candidate_version)
    production_model = load_model_version(production_version)
    # feature_columns is assumed to be defined at module level
    X = test_data[feature_columns]
    y = test_data['is_fraud']
    # Score the test set once per model and reuse the scores below
    candidate_probs = candidate_model.predict_proba(X)[:, 1]
    production_probs = production_model.predict_proba(X)[:, 1]
    candidate_auc = roc_auc_score(y, candidate_probs)
    production_auc = roc_auc_score(y, production_probs)
    relative_change = (candidate_auc - production_auc) / production_auc
    results = {
        'candidate_auc': candidate_auc,
        'production_auc': production_auc,
        'relative_change': relative_change,
        'absolute_change': candidate_auc - production_auc,
        'candidate_better': candidate_auc > production_auc,
    }
    # Paired t-test on the predicted scores. This tests whether the two models'
    # mean scores differ, not whether their AUCs differ, so it is recorded as a
    # diagnostic only and does not feed the approval decision.
    from scipy import stats
    t_stat, p_value = stats.ttest_rel(candidate_probs, production_probs)
    results['p_value'] = p_value
    results['statistically_significant'] = p_value < 0.05
    # Decision: approve unless AUC dropped by more than the allowed tolerance
    approved = relative_change >= minimum_relative_improvement
    if not approved:
        results['rejection_reason'] = (
            f"Candidate model ({candidate_auc:.4f}) is worse than production "
            f"({production_auc:.4f}) by more than the allowed tolerance: "
            f"{relative_change*100:.2f}% change"
        )
    return approved, results
Latency evaluation checks serving speed, because a model that’s accurate but too slow is not deployable:
import time
import numpy as np
def evaluate_latency(model, sample_inputs: list, slo_p99_ms: float) -> dict:
latencies_ms = []
# Warmup
for _ in range(10):
model.predict_proba(sample_inputs[:1])
# Measure
for input_batch in sample_inputs:
start = time.perf_counter()
model.predict_proba([input_batch])
latencies_ms.append((time.perf_counter() - start) * 1000)
results = {
'p50_ms': np.percentile(latencies_ms, 50),
'p95_ms': np.percentile(latencies_ms, 95),
'p99_ms': np.percentile(latencies_ms, 99),
'max_ms': max(latencies_ms),
'slo_p99_ms': slo_p99_ms,
# Cast to a plain bool so the result is recorded as 'True'/'False' when tagged
'slo_passed': bool(np.percentile(latencies_ms, 99) <= slo_p99_ms)
}
return results
Evaluation reporting records all evaluation results as model metadata for auditability:
def record_evaluation_results(
model_version: str,
evaluation_results: dict,
approved: bool,
approver: str = "automated"
):
client = mlflow.tracking.MlflowClient()
# Log all metrics
for metric_name, value in evaluation_results.items():
if isinstance(value, (int, float)):
client.set_model_version_tag(
name="fraud-detection",
version=model_version,
key=f"eval.{metric_name}",
value=str(value)
)
# Record decision
client.set_model_version_tag(
name="fraud-detection",
version=model_version,
key="eval.approved",
value=str(approved)
)
client.set_model_version_tag(
name="fraud-detection",
version=model_version,
key="eval.approver",
value=approver
)
client.set_model_version_tag(
name="fraud-detection",
version=model_version,
key="eval.timestamp",
value=datetime.utcnow().isoformat()
)
9. Model Promotion Strategies (Staging → Production)
Model promotion is the controlled process of moving a validated model version through environments toward production. The strategy for how promotion happens — what gates must pass, who must approve, how much traffic the model receives at each stage — determines the balance between deployment velocity and deployment risk.
The fundamental principle of model promotion is the same as the fundamental principle of canary deployments for software: validate on progressively larger audiences with progressively more real-world conditions before full exposure. The difference from software is that you’re also running the model against real user behavior, which is the most important validation possible — offline metrics sometimes don’t predict online performance.
The promotion stages:
Staging is a full production-like environment (same infrastructure, same feature pipeline, same monitoring) but not serving real users. You deploy the candidate model to staging, run integration tests against realistic synthetic data or replayed production traffic, and verify that the model’s serving infrastructure works correctly with the new version. The primary validation here is operational — does the model load correctly, does the API contract work, does monitoring detect the model correctly, does the health endpoint report readiness correctly.
Shadow (optional but powerful) deploys the candidate model to a shadow environment that receives copies of all production requests. The shadow model processes every request but its predictions are discarded — users receive responses from the production model only. This gives you real-world distribution validation without any user impact. You compare shadow predictions against production predictions to understand how the models agree and disagree.
Canary exposes the candidate model to a small percentage of real users (typically 5-10%). You watch online metrics carefully — error rates, latency, user engagement signals, business KPIs. If the canary shows good behavior, you progressively increase the percentage. If anything looks wrong, you immediately drain traffic back to the production model.
Full production — after successful canary validation, the candidate model handles 100% of traffic. The previous production model is archived but kept available for rapid rollback.
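The shadow stage described above depends on comparing the two models' outputs on the same requests. A minimal sketch of that comparison follows, assuming both models' scores have already been logged and aligned request by request; `ShadowComparison`, `compare_shadow_predictions`, and the 0.5 decision threshold are illustrative names and values, not part of any library.

```python
from dataclasses import dataclass


@dataclass
class ShadowComparison:
    n_requests: int
    agreement_rate: float       # fraction of requests where both models make the same decision
    mean_abs_score_diff: float  # average |production score - shadow score|


def compare_shadow_predictions(
    production_scores: list[float],
    shadow_scores: list[float],
    decision_threshold: float = 0.5,  # hypothetical threshold, chosen for illustration
) -> ShadowComparison:
    """Summarize how a shadow model's scores differ from production's on the same requests."""
    if len(production_scores) != len(shadow_scores):
        raise ValueError("Score lists must be aligned request by request")
    if not production_scores:
        raise ValueError("No requests to compare")

    n = len(production_scores)
    # Two scores "agree" when they fall on the same side of the decision threshold
    agreements = sum(
        (p >= decision_threshold) == (s >= decision_threshold)
        for p, s in zip(production_scores, shadow_scores)
    )
    total_abs_diff = sum(abs(p - s) for p, s in zip(production_scores, shadow_scores))
    return ShadowComparison(n, agreements / n, total_abs_diff / n)


summary = compare_shadow_predictions(
    production_scores=[0.10, 0.80, 0.40, 0.95],
    shadow_scores=[0.20, 0.70, 0.60, 0.90],
)
print(summary)
```

A low agreement rate does not by itself mean the candidate is worse. It identifies the requests worth inspecting before any user sees the candidate's predictions.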
Implementing promotion gates:
class ModelPromotionController:
def __init__(self, model_name: str, candidate_version: str):
self.model_name = model_name
self.candidate_version = candidate_version
self.client = mlflow.tracking.MlflowClient()
def can_promote_to_staging(self) -> tuple[bool, str]:
"""Check if model passed evaluation gates"""
version = self.client.get_model_version(
self.model_name, self.candidate_version
)
approved = version.tags.get('eval.approved', 'False') == 'True'
auc = float(version.tags.get('eval.auc_roc', 0))
latency_ok = version.tags.get('eval.slo_passed', 'False') == 'True'
if not approved:
return False, "Model evaluation not approved"
if auc < 0.92:
return False, f"AUC {auc:.4f} below minimum threshold 0.92"
if not latency_ok:
return False, "Model latency SLO not met"
return True, "All staging promotion gates passed"
def promote_to_staging(self) -> bool:
can_promote, reason = self.can_promote_to_staging()
if not can_promote:
logging.error(f"Cannot promote to staging: {reason}")
return False
self.client.transition_model_version_stage(
name=self.model_name,
version=self.candidate_version,
stage="Staging"
)
# Trigger deployment to staging environment
self._deploy_to_environment("staging", traffic_pct=100)
self._run_staging_smoke_tests()
logging.info(f"Successfully promoted {self.candidate_version} to Staging")
return True
def promote_to_canary(self, initial_traffic_pct: int = 5) -> bool:
"""Promote staging model to production canary"""
# Verify staging health
if not self._verify_staging_health():
return False
# Deploy as canary
self._deploy_to_environment("production", traffic_pct=initial_traffic_pct)
# Monitor canary
monitoring_results = self._monitor_canary(
duration_minutes=30,
traffic_pct=initial_traffic_pct
)
if not monitoring_results['healthy']:
logging.error(f"Canary unhealthy: {monitoring_results}")
self._drain_canary_traffic()
return False
return True
def promote_to_production(self) -> bool:
"""Promote canary to full production"""
# Progressively increase traffic
for traffic_pct in [10, 25, 50, 100]:
self._set_canary_traffic(traffic_pct)
if traffic_pct < 100:
monitoring_results = self._monitor_canary(
duration_minutes=15,
traffic_pct=traffic_pct
)
if not monitoring_results['healthy']:
self._drain_canary_traffic()
return False
# Archive previous production model
self._archive_previous_production()
# Update registry
self.client.transition_model_version_stage(
name=self.model_name,
version=self.candidate_version,
stage="Production"
)
return True
10. Feature Pipeline Automation
Feature pipelines are the systems that compute features — transforming raw data into the inputs your models use. Feature pipeline automation means these computations run reliably, on schedule or on trigger, producing fresh features that both training and serving can use with guaranteed consistency.
The central challenge for feature pipelines in ML systems is training-serving skew. If feature computation during training uses different code than feature computation during serving, the model is trained on one set of inputs and receives subtly different ones at inference. Even small differences (different null handling, different encoding logic, different aggregation windows) produce predictions that are consistently slightly wrong in ways that are very hard to diagnose.
Automated feature pipelines solve this by making feature computation a first-class, versioned, centrally managed concern rather than ad-hoc code in training notebooks and serving scripts.
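One way to picture "centrally managed" feature logic is a single function that both the training job and the serving path import. The sketch below is a simplified illustration using plain Python structures; `compute_user_stats` and its field names are assumptions made for the example, not an API from a feature store.

```python
from datetime import datetime, timedelta


def compute_user_stats(
    transactions: list[dict], as_of: datetime, lookback_days: int = 30
) -> dict:
    """Single definition of the user aggregates, shared by training and serving code."""
    window_start = as_of - timedelta(days=lookback_days)
    amounts = [
        t["amount"] for t in transactions
        if window_start <= t["timestamp"] <= as_of
    ]
    return {
        "txn_count_30d": len(amounts),
        # Null handling is decided once, here: users with no history get 0.0
        "avg_amount_30d": sum(amounts) / len(amounts) if amounts else 0.0,
        "max_amount_30d": max(amounts) if amounts else 0.0,
    }


history = [
    {"timestamp": datetime(2024, 1, 1), "amount": 20.0},
    {"timestamp": datetime(2024, 1, 20), "amount": 40.0},
    {"timestamp": datetime(2023, 11, 1), "amount": 500.0},  # outside the 30-day window
]

# Training: features as of the time the label was observed
training_features = compute_user_stats(history, as_of=datetime(2024, 1, 25))
# Serving: the same function, called with the request time
serving_features = compute_user_stats(history, as_of=datetime(2024, 1, 25))
```

Because the window boundaries and the empty-history default live in one place, a change to either is picked up by training and serving together.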
Batch feature pipeline runs on a schedule and computes features for all entities:
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import redis
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_raw_transactions(start_time: str, end_time: str) -> pd.DataFrame:
"""Fetch raw transaction data from data warehouse"""
query = f"""
SELECT
user_id,
transaction_id,
amount,
merchant_category,
timestamp,
is_fraud
FROM transactions
WHERE timestamp BETWEEN '{start_time}' AND '{end_time}'
"""
return execute_warehouse_query(query)
@task
def compute_user_aggregation_features(
transactions: pd.DataFrame,
lookback_days: int = 30
) -> pd.DataFrame:
"""Compute rolling aggregation features per user"""
transactions['timestamp'] = pd.to_datetime(transactions['timestamp'])
# Sort for correct rolling computation
transactions = transactions.sort_values(['user_id', 'timestamp'])
features = transactions.groupby('user_id').apply(
lambda user_txns: pd.Series({
'txn_count_30d': len(user_txns[
user_txns['timestamp'] >=
user_txns['timestamp'].max() - pd.Timedelta(days=lookback_days)
]),
'avg_amount_30d': user_txns[
user_txns['timestamp'] >=
user_txns['timestamp'].max() - pd.Timedelta(days=lookback_days)
]['amount'].mean(),
'unique_merchants_30d': user_txns[
user_txns['timestamp'] >=
user_txns['timestamp'].max() - pd.Timedelta(days=lookback_days)
]['merchant_category'].nunique(),
'max_amount_30d': user_txns[
user_txns['timestamp'] >=
user_txns['timestamp'].max() - pd.Timedelta(days=lookback_days)
]['amount'].max(),
})
).reset_index()
return features
@task
def write_features_to_store(
    features: pd.DataFrame,
    feature_group: str,
    timestamp: str
):
    """Write computed features to feature store"""
    r = redis.Redis(host='feature-store', port=6379)
    ttl_seconds = 86400 * 35  # 35 days, slightly longer than the lookback window
    for _, row in features.iterrows():
        feature_values = row.drop('user_id').to_dict()
        # Write a timestamped snapshot plus the ":latest" key that the
        # serving path reads at request time
        for suffix in (timestamp, "latest"):
            key = f"{feature_group}:{row['user_id']}:{suffix}"
            r.hset(key, mapping=feature_values)
            r.expire(key, ttl_seconds)
    # Also write to feature registry for training data assembly
    features['computed_at'] = timestamp
    features['feature_group'] = feature_group
    features.to_parquet(
        f"s3://feature-store/{feature_group}/{timestamp}.parquet"
    )
@flow(name="user-feature-pipeline")
def compute_user_features(
start_time: str,
end_time: str
):
"""Complete feature computation pipeline for user-level features"""
# Fetch raw data
raw_transactions = fetch_raw_transactions(start_time, end_time)
# Compute features
user_features = compute_user_aggregation_features(raw_transactions)
# Write to store
write_features_to_store(
user_features,
feature_group="user_transaction_stats",
timestamp=end_time
)
return user_features
if __name__ == "__main__":
# Deploy as a scheduled Prefect deployment
from prefect.deployments import Deployment
deployment = Deployment.build_from_flow(
flow=compute_user_features,
name="daily-user-features",
schedule={"cron": "0 2 * * *"} # Run at 2am daily
)
deployment.apply()
Real-time feature computation for serving uses the same feature logic but runs on demand for a specific user or entity at request time:
class FeatureServer:
def __init__(self):
self.redis = redis.Redis(host='feature-store', port=6379)
self.warehouse = WarehouseClient()
def get_features_for_inference(
self,
user_id: str,
transaction: dict
) -> dict:
"""Get complete feature set for real-time inference"""
# Real-time features from the current transaction
real_time_features = {
'transaction_amount': transaction['amount'],
'hour_of_day': pd.Timestamp(transaction['timestamp']).hour,
'day_of_week': pd.Timestamp(transaction['timestamp']).dayofweek,
'merchant_category_encoded': self._encode_merchant(
transaction['merchant_category']
)
}
# Pre-computed features from feature store
# These were computed by the batch pipeline
stored_key = f"user_transaction_stats:{user_id}:latest"
stored_features = self.redis.hgetall(stored_key)
if stored_features:
pre_computed = {
k.decode(): float(v)
for k, v in stored_features.items()
}
else:
# Fallback: compute on-demand if not in cache
pre_computed = self._compute_on_demand(user_id)
return {**real_time_features, **pre_computed}
This pattern (batch computation for expensive aggregate features, real-time computation for transaction-specific features, with the same feature logic used in both) removes the main source of training-serving skew at the feature level.
11. Trigger-Based Retraining
Scheduled retraining is simple but wasteful — you retrain on a fixed schedule whether or not the model actually needs it. Trigger-based retraining is smarter: retraining happens when specific conditions indicate that a new model version is needed, whether that’s a week or three months after the last training run.
The triggers for retraining fall into several categories, each indicating different types of model degradation or business change.
Data volume triggers — retrain when enough new labeled data has accumulated. If you need at minimum 100,000 new fraud labels to meaningfully improve the model, trigger retraining when 100,000 new labeled transactions are available since the last training run:
from prefect import flow, task
import mlflow
@task
def check_new_data_volume(
last_training_timestamp: str,
minimum_new_samples: int
) -> tuple[bool, int]:
"""Check if enough new labeled data exists for retraining"""
query = f"""
SELECT COUNT(*) as new_samples
FROM labeled_transactions
WHERE
labeled_at > '{last_training_timestamp}'
AND label_confidence >= 0.9
"""
result = execute_warehouse_query(query)
new_samples = result['new_samples'].iloc[0]
should_retrain = new_samples >= minimum_new_samples
if should_retrain:
logging.info(f"Retraining triggered: {new_samples} new labeled samples available")
else:
logging.info(
f"Not enough new data: {new_samples}/{minimum_new_samples} samples"
)
return should_retrain, new_samples
@task
def check_performance_degradation(
model_name: str,
stage: str = "Production",
degradation_threshold: float = 0.02
) -> tuple[bool, dict]:
"""Check if production model performance has degraded below threshold"""
# Get recent production metrics from monitoring system
recent_metrics = get_monitoring_metrics(
model_name=model_name,
metric="prediction_accuracy",
window_days=7
)
baseline_metrics = get_monitoring_metrics(
model_name=model_name,
metric="prediction_accuracy",
window_days=7,
offset_days=30 # Compare against a month ago
)
recent_accuracy = recent_metrics['accuracy'].mean()
baseline_accuracy = baseline_metrics['accuracy'].mean()
degradation = (baseline_accuracy - recent_accuracy) / baseline_accuracy
should_retrain = degradation >= degradation_threshold
return should_retrain, {
'recent_accuracy': recent_accuracy,
'baseline_accuracy': baseline_accuracy,
'degradation_pct': degradation * 100
}
@task
def check_data_drift(
reference_dataset_path: str,
current_window_days: int = 7,
drift_threshold: float = 0.1
) -> tuple[bool, dict]:
"""Check if input data distribution has drifted significantly"""
from evidently.metric_preset import DataDriftPreset
from evidently.report import Report
reference = pd.read_parquet(reference_dataset_path)
current = fetch_recent_data(days=current_window_days)
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(
reference_data=reference,
current_data=current
)
drift_result = drift_report.as_dict()
# How many features have drifted?
n_drifted = drift_result['metrics'][0]['result']['number_of_drifted_columns']
n_total = drift_result['metrics'][0]['result']['number_of_columns']
drift_fraction = n_drifted / n_total
should_retrain = drift_fraction >= drift_threshold
return should_retrain, {
'drifted_features': n_drifted,
'total_features': n_total,
'drift_fraction': drift_fraction,
'drift_details': drift_result
}
@flow(name="retraining-trigger-check")
def evaluate_retraining_triggers():
"""Evaluate all retraining triggers and launch training if needed"""
# Get last training run info
client = mlflow.tracking.MlflowClient()
latest_versions = client.get_latest_versions("fraud-detection", stages=["Production"])
if not latest_versions:
logging.info("No production model found - triggering training")
trigger_training_pipeline()
return
production_version = latest_versions[0]
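# MLflow reports creation_timestamp in milliseconds since the epoch, so convert it before using it in the SQL filter
last_training_time = pd.Timestamp(production_version.creation_timestamp, unit="ms").isoformat()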
# Check all triggers
data_trigger, new_samples = check_new_data_volume(
last_training_timestamp=last_training_time,
minimum_new_samples=50_000
)
perf_trigger, perf_metrics = check_performance_degradation("fraud-detection")
drift_trigger, drift_metrics = check_data_drift(
reference_dataset_path="s3://ml-data/reference/training_data.parquet"
)
# Retrain if any trigger fires
triggers_fired = {
'data_volume': data_trigger,
'performance_degradation': perf_trigger,
'data_drift': drift_trigger
}
if any(triggers_fired.values()):
logging.info(f"Retraining triggered by: {triggers_fired}")
trigger_training_pipeline(trigger_reason=triggers_fired)
else:
logging.info("No retraining triggers fired")
12. Model Drift Detection Concepts
Model drift is the degradation of model performance over time due to changes in the relationship between input features and the target variable, or changes in the distribution of input data itself. Understanding the types of drift and how to detect them is essential for maintaining model quality in production.
There are two fundamentally different types of drift, each requiring different detection approaches.
Data drift (covariate shift) — the distribution of input features changes, but the relationship between features and the target stays the same. Imagine a fraud detection model trained when most transactions were in-person retail. After e-commerce becomes dominant, the distribution of merchant categories, transaction amounts, and timing patterns is very different from training — but the patterns that indicate fraud haven’t fundamentally changed. The model may still work, but features are in distribution ranges the model rarely saw during training.
Concept drift — the underlying relationship between features and the target changes. The model’s learned patterns are no longer accurate representations of reality. A fraud model trained before a major fraudster technique evolved — say, before synthetic identity fraud became common — might see inputs that look normal by historical standards but are actually fraudulent by new techniques. The statistical relationship between the features and the fraud label has changed.
Data drift is detectable from features alone — you don’t need labels. Concept drift is only fully detectable from labels (comparing predictions to actual outcomes), which are often delayed or unavailable for real-time systems.
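Because data drift only needs feature values, a drift score can be computed without any labels. The sketch below works through the Population Stability Index on pre-binned fractions, using PSI = Σ (cᵢ − rᵢ) · ln(cᵢ / rᵢ), where rᵢ and cᵢ are the reference and current fractions in bin i. The function name, the `eps` smoothing value, and the example fractions are assumptions chosen for illustration.

```python
import math


def population_stability_index(
    ref_fractions: list[float],
    cur_fractions: list[float],
    eps: float = 1e-4,  # small constant so empty bins do not produce log(0)
) -> float:
    """PSI over fractions that were binned with the same bin edges."""
    if len(ref_fractions) != len(cur_fractions):
        raise ValueError("Both distributions must use the same bins")
    return sum(
        ((c + eps) - (r + eps)) * math.log((c + eps) / (r + eps))
        for r, c in zip(ref_fractions, cur_fractions)
    )


reference = [0.25, 0.25, 0.25, 0.25]   # e.g. transaction amounts split into quartiles
unchanged = [0.25, 0.25, 0.25, 0.25]
shifted = [0.10, 0.20, 0.30, 0.40]     # mass has moved toward the larger amounts

psi_unchanged = population_stability_index(reference, unchanged)
psi_shifted = population_stability_index(reference, shifted)
print(f"unchanged: {psi_unchanged:.4f}, shifted: {psi_shifted:.4f}")
```

With the commonly used cut-offs of 0.1 and 0.25, the unchanged distribution scores as stable and the shifted one falls in the moderate-drift band.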
Detecting data drift with statistical tests:
import pandas as pd
import numpy as np
from scipy import stats
from evidently.report import Report
from evidently.metrics import DataDriftTable
class DataDriftDetector:
def __init__(self, reference_data: pd.DataFrame, feature_columns: list):
self.reference = reference_data[feature_columns]
self.feature_columns = feature_columns
self.drift_history = []
    def detect_drift(
        self,
        current_data: pd.DataFrame,
        significance_level: float = 0.05
    ) -> dict:
        current = current_data[self.feature_columns]
        drift_results = {}
        for column in self.feature_columns:
            ref_values = self.reference[column].dropna()
            cur_values = current[column].dropna()
            is_continuous = pd.api.types.is_float_dtype(ref_values)
            is_numeric = (
                pd.api.types.is_numeric_dtype(ref_values)
                and not pd.api.types.is_bool_dtype(ref_values)
            )
            # Statistical test depends on feature type
            if is_continuous:
                # Continuous features: KS test
                statistic, p_value = stats.ks_2samp(ref_values, cur_values)
                test_name = "kolmogorov_smirnov"
            else:
                # Categorical features: chi-squared test
                # Align categories (as a sorted list, since pandas cannot reindex by a set)
                all_categories = sorted(set(ref_values) | set(cur_values))
                ref_counts = ref_values.value_counts().reindex(all_categories, fill_value=0)
                cur_counts = cur_values.value_counts().reindex(all_categories, fill_value=0)
                # chisquare needs expected counts that are non-zero and sum to the
                # observed total, so smooth and rescale the reference counts
                smoothed = ref_counts + 0.5
                expected = smoothed / smoothed.sum() * cur_counts.sum()
                statistic, p_value = stats.chisquare(cur_counts, f_exp=expected)
                test_name = "chi_squared"
            drifted = p_value < significance_level
            # Population Stability Index (PSI) for additional signal;
            # quantile binning only applies to numeric features
            psi = self._compute_psi(ref_values, cur_values) if is_numeric else None
            drift_results[column] = {
                'test': test_name,
                'statistic': statistic,
                'p_value': p_value,
                'drifted': drifted,
                'psi': psi,
                'psi_interpretation': (
                    None if psi is None
                    else 'stable' if psi < 0.1
                    else 'moderate_drift' if psi < 0.25
                    else 'severe_drift'
                ),
                'reference_mean': float(ref_values.mean()) if is_continuous else None,
                'current_mean': float(cur_values.mean()) if is_continuous else None,
            }
        n_drifted = sum(1 for v in drift_results.values() if v['drifted'])
        overall_drift = {
            'n_features_drifted': n_drifted,
            'n_features_total': len(self.feature_columns),
            'fraction_drifted': n_drifted / len(self.feature_columns),
            'feature_results': drift_results,
            'detection_timestamp': pd.Timestamp.now(tz="UTC").isoformat()
        }
        self.drift_history.append(overall_drift)
        return overall_drift
    def _compute_psi(self, reference: pd.Series, current: pd.Series, bins: int = 10) -> float:
        """Population Stability Index, a widely used drift metric"""
        # Bin edges come from the reference quantiles
        edges = np.percentile(reference, np.linspace(0, 100, bins + 1))
        # Open the outer bins so current values outside the reference range are still counted
        edges[0], edges[-1] = -np.inf, np.inf
        ref_counts, _ = np.histogram(reference, bins=edges)
        cur_counts, _ = np.histogram(current, bins=edges)
        # Convert to proportions
        ref_pct = ref_counts / len(reference) + 1e-4  # Avoid division by zero
        cur_pct = cur_counts / len(current) + 1e-4
        # PSI formula
        psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
        return float(psi)
Detecting concept drift with prediction monitoring:
class ConceptDriftDetector:
def __init__(self, model_name: str):
self.model_name = model_name
self.drift_detected = False
def monitor_prediction_calibration(
self,
predictions: np.ndarray,
actuals: np.ndarray,
window_days: int = 7
) -> dict:
"""
Compare model's calibration over time.
When a model becomes miscalibrated, it's a sign of concept drift.
"""
from sklearn.calibration import calibration_curve
prob_true, prob_pred = calibration_curve(
actuals, predictions, n_bins=10
)
# Measure calibration error
calibration_error = np.mean(np.abs(prob_true - prob_pred))
return {
'calibration_error': calibration_error,
'prob_true': prob_true.tolist(),
'prob_pred': prob_pred.tolist(),
'well_calibrated': calibration_error < 0.05
}
def detect_with_page_hinkley(
self,
error_stream: list,
threshold: float = 10,
delta: float = 0.005
) -> bool:
"""
Page-Hinkley test for detecting change in error rate.
Classic sequential change point detection algorithm.
"""
sum_changes = 0
min_sum = 0
mean_error = np.mean(error_stream[:100]) # Initial estimate
for error in error_stream[100:]:
sum_changes += error - mean_error - delta
min_sum = min(min_sum, sum_changes)
if sum_changes - min_sum > threshold:
logging.warning("Concept drift detected by Page-Hinkley test")
return True
return False
13. Automated Model Rollbacks
When a newly deployed model version shows problems — elevated error rates, unexpected prediction distributions, degraded business metrics, or failing health checks — you need to get back to a known-good state as quickly as possible. Automated rollbacks do this without waiting for human intervention during an incident.
The fundamental challenge of ML model rollbacks compared to software rollbacks is that ML failures can be subtle and delayed. A software rollback is triggered by obvious signals — the service is returning 500 errors, it’s crashed, requests are timing out. Model quality failures can be subtle — the error rate looks fine, the latency is normal, but prediction confidence has dropped or a specific user segment is being systematically mis-predicted. These require model-specific monitoring signals beyond standard infrastructure metrics.
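As one illustration of a model-specific signal, the sketch below compares how often each user segment is flagged under the previous and the current model version. The function names, the record layout (`segment`, `score`), and the 0.10 tolerance are assumptions made for the example. A real system would read these values from its monitoring store.

```python
from collections import defaultdict


def segment_flag_rates(predictions: list[dict], threshold: float = 0.5) -> dict[str, float]:
    """Fraction of predictions at or above the threshold, per segment."""
    totals: dict[str, int] = defaultdict(int)
    flagged: dict[str, int] = defaultdict(int)
    for p in predictions:
        totals[p["segment"]] += 1
        if p["score"] >= threshold:
            flagged[p["segment"]] += 1
    return {segment: flagged[segment] / totals[segment] for segment in totals}


def segments_with_shifted_flag_rate(
    baseline: list[dict],
    current: list[dict],
    max_abs_change: float = 0.10,  # hypothetical tolerance in absolute rate
) -> list[str]:
    """Segments whose flag rate moved by more than the tolerance between versions."""
    base_rates = segment_flag_rates(baseline)
    cur_rates = segment_flag_rates(current)
    return sorted(
        segment
        for segment in base_rates.keys() & cur_rates.keys()
        if abs(cur_rates[segment] - base_rates[segment]) > max_abs_change
    )


baseline = [
    {"segment": "web", "score": 0.1}, {"segment": "web", "score": 0.2},
    {"segment": "web", "score": 0.9}, {"segment": "web", "score": 0.3},
    {"segment": "mobile", "score": 0.1}, {"segment": "mobile", "score": 0.6},
]
current = [
    {"segment": "web", "score": 0.1}, {"segment": "web", "score": 0.2},
    {"segment": "web", "score": 0.8}, {"segment": "web", "score": 0.3},
    {"segment": "mobile", "score": 0.7}, {"segment": "mobile", "score": 0.9},
]

shifted = segments_with_shifted_flag_rate(baseline, current)
print(shifted)
```

In this example the overall service would look healthy on error rate and latency, while the mobile segment's flag rate has doubled. Standard infrastructure metrics do not show that kind of change.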
Automatic rollback triggers:
import logging
import time
from dataclasses import dataclass

import mlflow


@dataclass
class RollbackTrigger:
    metric_name: str
    threshold: float
    comparison: str  # 'greater_than', 'less_than'
    window_minutes: int
    consecutive_violations: int  # How many violations before triggering


class AutomaticRollbackController:
    def __init__(
        self,
        model_name: str,
        current_version: str,
        previous_version: str,
        triggers: list[RollbackTrigger]
    ):
        self.model_name = model_name
        self.current_version = current_version
        self.previous_version = previous_version
        self.triggers = triggers
        self.rollback_executed = False
        self.violation_counts = {t.metric_name: 0 for t in triggers}
    def monitor_and_rollback(self, monitoring_duration_minutes: int = 60):
        """Monitor deployment and roll back if triggers fire"""
        start_time = time.time()
        check_interval_seconds = 30
        while (time.time() - start_time) < monitoring_duration_minutes * 60:
            if self.rollback_executed:
                break
            # Check each trigger
            for trigger in self.triggers:
                current_value = self._get_metric(
                    trigger.metric_name,
                    trigger.window_minutes
                )
                violation = (
                    (trigger.comparison == 'greater_than' and current_value > trigger.threshold) or
                    (trigger.comparison == 'less_than' and current_value < trigger.threshold)
                )
                if violation:
                    self.violation_counts[trigger.metric_name] += 1
                    logging.warning(
                        f"Rollback trigger violation: {trigger.metric_name} = "
                        f"{current_value:.4f} (threshold: {trigger.threshold:.4f}). "
                        f"Violation count: {self.violation_counts[trigger.metric_name]}/"
                        f"{trigger.consecutive_violations}"
                    )
                    if self.violation_counts[trigger.metric_name] >= trigger.consecutive_violations:
                        self._execute_rollback(
                            reason=f"{trigger.metric_name} violated threshold "
                                   f"{trigger.consecutive_violations} times"
                        )
                        return
                else:
                    # Reset violation count on healthy check
                    self.violation_counts[trigger.metric_name] = 0
            time.sleep(check_interval_seconds)
        if not self.rollback_executed:
            logging.info("Monitoring period complete, no rollback needed")
    def _execute_rollback(self, reason: str):
        """Execute automatic rollback to previous model version"""
        logging.critical(
            f"AUTOMATIC ROLLBACK TRIGGERED for {self.model_name}. "
            f"Reason: {reason}"
        )
        try:
            # 1. Immediately drain traffic from current version
            self._update_traffic_routing(
                current_version_pct=0,
                previous_version_pct=100
            )
            # 2. Update model registry
            # Note: model stages are deprecated in recent MLflow releases
            # in favor of model version aliases
            client = mlflow.tracking.MlflowClient()
            client.transition_model_version_stage(
                name=self.model_name,
                version=self.current_version,
                stage="Staging"  # Demote from production
            )
            client.transition_model_version_stage(
                name=self.model_name,
                version=self.previous_version,
                stage="Production"  # Restore previous
            )
            # 3. Update GitOps repository
            self._update_gitops_configuration(
                target_version=self.previous_version,
                commit_message=f"AUTOMATIC ROLLBACK: {reason}"
            )
            # 4. Record rollback as incident
            self._create_incident(
                title=f"Automatic rollback: {self.model_name}",
                description=reason,
                severity="high"
            )
            # 5. Notify team
            self._send_alert(
                channel="#ml-alerts",
                message=(
                    f"🚨 AUTOMATIC ROLLBACK EXECUTED\n"
                    f"Model: {self.model_name}\n"
                    f"Rolled back from v{self.current_version} "
                    f"to v{self.previous_version}\n"
                    f"Reason: {reason}"
                )
            )
            self.rollback_executed = True
        except Exception as e:
            logging.critical(f"Rollback execution failed: {e}")
            self._escalate_to_oncall(
                message=f"Automatic rollback FAILED for {self.model_name}: {e}"
            )
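    def _get_metric(self, metric_name: str, window_minutes: int) -> float:
        """Query monitoring system for current metric value"""
        # metric_name and model_name are interpolated directly into the SQL,
        # so they must come from trusted configuration, never from user input
        query = f"""
            SELECT avg({metric_name})
            FROM model_metrics
            WHERE model_name = '{self.model_name}'
            AND timestamp > NOW() - INTERVAL '{window_minutes} minutes'
        """
        # execute_metrics_query is assumed to be defined elsewhere and to
        # return a DataFrame with an 'avg' column
        result = execute_metrics_query(query)
        return result['avg'].iloc[0]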
# Usage: deploy with automatic rollback monitoring
controller = AutomaticRollbackController(
    model_name="fraud-detection",
    current_version="15",
    previous_version="14",
    triggers=[
        RollbackTrigger(
            metric_name="error_rate",
            threshold=0.02,
            comparison="greater_than",
            window_minutes=5,
            consecutive_violations=3
        ),
        RollbackTrigger(
            metric_name="prediction_confidence_mean",
            threshold=0.5,
            comparison="less_than",
            window_minutes=5,
            consecutive_violations=5
        ),
        RollbackTrigger(
            metric_name="p99_latency_ms",
            threshold=500,
            comparison="greater_than",
            window_minutes=5,
            consecutive_violations=3
        ),
    ]
)
controller.monitor_and_rollback(monitoring_duration_minutes=60)
GitOps-based rollback. Updating the GitOps repository to the previous model version creates an auditable record of the rollback:
import re
import subprocess
import tempfile
from datetime import datetime, timezone


def rollback_via_gitops(
    model_name: str,
    rollback_to_version: str,
    reason: str
):
    """Rollback by updating GitOps repository"""
    # Clone into a fresh temporary directory so repeated rollbacks never collide
    repo_dir = tempfile.mkdtemp(prefix="gitops-")
    subprocess.run([
        "git", "clone",
        "https://github.com/myorg/gitops-config.git",
        repo_dir
    ], check=True)
    # Update model version in production values
    values_path = f"{repo_dir}/environments/production/{model_name}/values.yaml"
    with open(values_path, 'r') as f:
        content = f.read()
    content = re.sub(
        r'modelVersion:.*',
        f'modelVersion: "{rollback_to_version}"',
        content
    )
    with open(values_path, 'w') as f:
        f.write(content)
    # Commit with detailed message
    subprocess.run([
        "git", "-C", repo_dir, "commit", "-am",
        f"ROLLBACK: {model_name} to {rollback_to_version}\n\n"
        f"Reason: {reason}\n"
        f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n"
        f"Automated rollback triggered by monitoring system"
    ], check=True)
    subprocess.run(["git", "-C", repo_dir, "push"], check=True)
    # ArgoCD will detect the change and deploy automatically
14. GitOps for ML Deployments
GitOps for ML combines the GitOps principles we covered in the GitOps section with the specific requirements of ML model deployments. The result is a deployment model where Git is the source of truth not just for application configuration but also for which model version should be deployed where, with automated reconciliation ensuring that the running system always matches what Git declares.
The unique challenge for ML GitOps compared to software GitOps is that ML deployments have more state: the deployed code (the serving infrastructure), the deployed model version (the artifacts), the deployed configuration (feature pipeline settings, evaluation thresholds), and the monitoring configuration (drift detection settings, alert thresholds). All of this state needs to be captured in Git.
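One way to picture the four kinds of state is as a single typed record that must be fully pinned before it is committed. The sketch below is illustrative only: `MLDeploymentState`, its field names, and `unpinned_fields` are hypothetical, and the rule that a value ending in `latest` counts as unpinned is an assumption, not a GitOps standard.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class MLDeploymentState:
    serving_image: str           # deployed code (serving infrastructure)
    model_version: str           # deployed model artifact
    feature_schema_version: str  # deployed configuration
    alert_rules_revision: str    # monitoring configuration


def unpinned_fields(state):
    """Return names of fields that are empty or point at a floating 'latest'."""
    names = []
    for f in fields(state):
        value = getattr(state, f.name).strip()
        if value == "" or value.endswith("latest"):
            names.append(f.name)
    return names
```

A CI job could run a check like this against the production overlay and refuse to merge while any field is unpinned, so that a Git commit always identifies one exact, reproducible deployment.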
GitOps repository structure for ML:
gitops-ml/
├── applications/
│   ├── fraud-detection/
│   │   ├── base/
│   │   │   ├── kustomization.yaml
│   │   │   ├── deployment.yaml
│   │   │   └── service.yaml
│   │   └── overlays/
│   │       ├── staging/
│   │       │   ├── kustomization.yaml
│   │       │   └── model-config.yaml  # Staging model version
│   │       └── production/
│   │           ├── kustomization.yaml
│   │           └── model-config.yaml  # Production model version
│   └── recommendation-model/
│       └── ...
├── pipelines/
│   ├── training/
│   │   └── fraud-detection-pipeline.yaml  # Kubeflow Pipeline definition
│   └── evaluation/
│       └── evaluation-config.yaml
├── monitoring/
│   ├── alerts/
│   │   └── fraud-detection-alerts.yaml
│   └── dashboards/
│       └── fraud-detection-dashboard.json
└── policies/
    └── model-promotion-policy.yaml
Model version as GitOps configuration:
# overlays/production/model-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fraud-detection-model-config
  namespace: ml-production
data:
  model_name: "fraud-detection"
  model_version: "14"  # This is what CI updates
  model_stage: "Production"
  registry_uri: "s3://mlflow-artifacts/1/abc123def456"
  serving_threshold: "0.7"
  feature_schema_version: "v2"
  deployed_at: "2024-03-15T14:23:00Z"
  deployed_by: "ci-bot"
  training_run_id: "run-9f7c3e2a"
The CI/CD to GitOps handoff for ML:
# GitHub Actions: Update GitOps after successful model training and evaluation
name: Update Model Deployment
on:
  workflow_dispatch:
    inputs:
      model_version:
        required: true
      environment:
        required: true
        default: 'staging'
jobs:
  update-gitops:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout GitOps repository
        uses: actions/checkout@v4
        with:
          repository: myorg/gitops-ml
          token: ${{ secrets.GITOPS_TOKEN }}
          path: gitops
      - name: Get model metadata from MLflow
        id: model-meta
        run: |
          python << 'EOF'
          import os
          import mlflow
          client = mlflow.tracking.MlflowClient()
          version = client.get_model_version("fraud-detection", "${{ github.event.inputs.model_version }}")
          # Write step outputs to $GITHUB_OUTPUT (the ::set-output command is deprecated)
          with open(os.environ["GITHUB_OUTPUT"], "a") as out:
              out.write(f"registry_uri={version.source}\n")
              out.write(f"training_run_id={version.run_id}\n")
          EOF
      - name: Update model version in GitOps
        run: |
          ENV="${{ github.event.inputs.environment }}"
          VERSION="${{ github.event.inputs.model_version }}"
          CONFIG_PATH="gitops/applications/fraud-detection/overlays/${ENV}/model-config.yaml"
          # Update model version and the metadata that must move with it
          yq eval ".data.model_version = \"${VERSION}\"" -i "$CONFIG_PATH"
          yq eval ".data.registry_uri = \"${{ steps.model-meta.outputs.registry_uri }}\"" -i "$CONFIG_PATH"
          yq eval ".data.deployed_at = \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"" -i "$CONFIG_PATH"
          yq eval ".data.deployed_by = \"${{ github.actor }}\"" -i "$CONFIG_PATH"
          yq eval ".data.training_run_id = \"${{ steps.model-meta.outputs.training_run_id }}\"" -i "$CONFIG_PATH"
      - name: Create pull request for production
        if: github.event.inputs.environment == 'production'
        uses: peter-evans/create-pull-request@v5
        with:
          path: gitops
          commit-message: "deploy: fraud-detection v${{ github.event.inputs.model_version }} to production"
          title: "Deploy fraud-detection v${{ github.event.inputs.model_version }} to production"
          body: |
            ## Model Deployment
            **Model:** fraud-detection
            **Version:** ${{ github.event.inputs.model_version }}
            **Training Run:** ${{ steps.model-meta.outputs.training_run_id }}
            ### Validation Results
            See MLflow experiment: [Training Run](${{ env.MLFLOW_URL }}/runs/${{ steps.model-meta.outputs.training_run_id }})
            ### Checklist
            - [ ] Reviewed evaluation metrics
            - [ ] Reviewed comparison against current production
            - [ ] Reviewed latency benchmarks
          reviewers: ml-platform-team
      - name: Auto-commit for non-production
        if: github.event.inputs.environment != 'production'
        run: |
          cd gitops
          git config user.email "ci@company.com"
          git config user.name "CI Bot"
          git add -A
          git commit -m "deploy: fraud-detection v${{ github.event.inputs.model_version }} to ${{ github.event.inputs.environment }}"
          git push
ArgoCD Application watching for model changes:
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: fraud-detection-production
  namespace: argocd
  annotations:
    notifications.argoproj.io/subscribe.on-sync-succeeded.slack: ml-deployments
    notifications.argoproj.io/subscribe.on-sync-failed.slack: ml-alerts
spec:
  project: ml-platform
  source:
    repoURL: https://github.com/myorg/gitops-ml.git
    targetRevision: main
    path: applications/fraud-detection/overlays/production
  destination:
    server: https://kubernetes.default.svc
    namespace: ml-production
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true
  # ignoreDifferences belongs at the spec level, alongside syncPolicy
  ignoreDifferences:
    - group: ""
      kind: ConfigMap
      jsonPointers:
        - /data/deployed_at  # Ignore timestamp field to prevent constant reconciliation
Self-healing for ML in GitOps. The selfHeal property means that if someone edits a managed resource directly on the cluster, for example by changing the model version in the ConfigMap or patching the Deployment, ArgoCD automatically reverts it to what Git declares. This is essential for ML deployments because it prevents the scenario where someone makes a “quick fix” model swap directly in the cluster without updating Git, leaving Git out of sync and creating confusion about what’s actually running in production.
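The reconciliation idea behind self-healing can be sketched in a few lines. This is a deliberately simplified model and not ArgoCD's actual algorithm: `reconcile` is a hypothetical function, and it treats both the Git-declared state and the live cluster state as flat dictionaries.

```python
def reconcile(declared, live, ignore=()):
    """Compare Git-declared state with live state.

    Returns (healed, drifted): the state after reverting drift, and the
    sorted list of keys that had drifted. Keys listed in `ignore` are never
    reported as drift and keep their live value.
    """
    drifted = sorted(
        key for key in declared.keys() | live.keys()
        if key not in ignore and declared.get(key) != live.get(key)
    )
    # Start from what Git declares, then preserve ignored live values
    healed = dict(declared)
    for key in ignore:
        if key in live:
            healed[key] = live[key]
    return healed, drifted
```

For example, if Git declares `model_version` "14" and someone has patched the live ConfigMap to "15", the call reports `model_version` as drifted and returns a healed state with "14" restored, while an ignored `deployed_at` difference is left alone.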
The thread connecting all of these topics is this: ML systems require all the automation discipline of software CI/CD plus an additional layer specific to the statistical, data-dependent nature of ML. Code can be tested for correctness. Models must be tested for quality, which is a different and harder problem. Data pipelines must be validated, not just run. Deployment must be graduated because model failures can be subtle and statistical rather than immediately obvious. Monitoring must cover not just infrastructure but model behavior over time. And the training process itself must be triggered intelligently based on signals that the model needs updating. Building this complete automation stack is what makes ML systems reliable and maintainable at production scale.