MLOps Pipelines: Automating the ML Lifecycle
Build end-to-end ML pipelines with automated training and deployment—but complexity breeds hidden failures
MLOps brings DevOps practices to machine learning. This guide builds production ML pipelines step by step: MLflow experiment tracking, Kubeflow orchestration, DVC data versioning, and continuous retraining.
MLflow Experiment Tracking
Track experiments with automatic logging:
import os

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from sklearn.metrics import accuracy_score, f1_score


class MLflowTrainer:
    """Training with comprehensive experiment tracking."""

    def __init__(self, experiment_name="my_experiment"):
        mlflow.set_experiment(experiment_name)
        self.run = None

    def train(self, model, train_loader, val_loader, config):
        """Train with automatic logging."""
        with mlflow.start_run() as run:
            self.run = run

            # Log hyperparameters
            mlflow.log_params({
                'learning_rate': config['lr'],
                'batch_size': config['batch_size'],
                'epochs': config['epochs'],
                'optimizer': config['optimizer'],
                'model_architecture': model.__class__.__name__
            })

            # Log model architecture
            mlflow.set_tag("model_class", str(type(model)))

            # Training loop
            optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
            criterion = nn.CrossEntropyLoss()

            for epoch in range(config['epochs']):
                # Training
                model.train()
                train_loss = 0
                for data, target in train_loader:
                    optimizer.zero_grad()
                    output = model(data)
                    loss = criterion(output, target)
                    loss.backward()
                    optimizer.step()
                    train_loss += loss.item()

                # Validation
                model.eval()
                val_loss = 0
                all_preds = []
                all_targets = []
                with torch.no_grad():
                    for data, target in val_loader:
                        output = model(data)
                        val_loss += criterion(output, target).item()
                        preds = output.argmax(dim=1)
                        all_preds.extend(preds.cpu().numpy())
                        all_targets.extend(target.cpu().numpy())

                # Calculate metrics
                accuracy = accuracy_score(all_targets, all_preds)
                f1 = f1_score(all_targets, all_preds, average='weighted')

                # Log metrics
                mlflow.log_metrics({
                    'train_loss': train_loss / len(train_loader),
                    'val_loss': val_loss / len(val_loader),
                    'val_accuracy': accuracy,
                    'val_f1': f1
                }, step=epoch)

                print(f"Epoch {epoch}: val_acc={accuracy:.4f}, val_f1={f1:.4f}")

                # ⚠️ Detect training anomalies
                if epoch > 5 and accuracy < 0.1:
                    mlflow.set_tag("status", "failed_to_learn")
                    print("⚠️ Model not learning, aborting")
                    break

            # Log final model
            mlflow.pytorch.log_model(model, "model")

            # Log model size
            model_path = "temp_model.pt"
            torch.save(model.state_dict(), model_path)
            model_size_mb = os.path.getsize(model_path) / 1e6
            mlflow.log_metric("model_size_mb", model_size_mb)
            os.remove(model_path)

            # Register model if performance threshold met
            if accuracy > 0.9:
                model_uri = f"runs:/{run.info.run_id}/model"
                mlflow.register_model(model_uri, "ProductionModel")
                print("✅ Model registered for production")
            else:
                print(f"⚠️ Model accuracy {accuracy:.2%} below threshold, not registered")

        return model


# Usage (model, train_loader, val_loader defined elsewhere)
trainer = MLflowTrainer("image_classification")
config = {
    'lr': 0.001,
    'batch_size': 64,
    'epochs': 50,
    'optimizer': 'Adam'
}
trained_model = trainer.train(model, train_loader, val_loader, config)
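Once a run clears the threshold and registers, downstream services can pull the model back out of the registry by name and version instead of copying files around. A minimal sketch, assuming the "ProductionModel" name registered above and a hypothetical tracking server URI:

import mlflow
import mlflow.pytorch

# Point at the same tracking server the trainer logged to
# (URI is an assumption; use your actual deployment)
mlflow.set_tracking_uri("http://mlflow.example.com:5000")

# Load a specific registered version for inference
model = mlflow.pytorch.load_model("models:/ProductionModel/1")
model.eval()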
Kubeflow Pipeline
Orchestrate end-to-end ML workflow:
import kfp
from kfp import dsl
from kfp.components import func_to_container_op  # KFP v1 SDK


@func_to_container_op
def preprocess_data(input_path: str, output_path: str) -> str:
    """Data preprocessing component"""
    import pickle

    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    # Load raw data
    df = pd.read_csv(input_path)

    # Clean data
    df = df.dropna()
    df = df[df['value'] > 0]  # Remove invalid values

    # Feature engineering
    df['feature_ratio'] = df['feature_a'] / (df['feature_b'] + 1e-8)
    df['feature_log'] = np.log1p(df['feature_c'])

    # Scale features
    scaler = StandardScaler()
    feature_cols = [c for c in df.columns if c != 'target']
    df[feature_cols] = scaler.fit_transform(df[feature_cols])

    # Save processed data and scaler
    df.to_csv(output_path, index=False)
    with open(f"{output_path}.scaler.pkl", 'wb') as f:
        pickle.dump(scaler, f)

    print(f"Processed {len(df)} samples")
    return output_path


@func_to_container_op
def train_model(data_path: str, model_output_path: str, hyperparams: dict) -> str:
    """Training component"""
    import pandas as pd
    import torch
    import torch.nn as nn
    from sklearn.model_selection import train_test_split

    # Load data
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1).values
    y = df['target'].values
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)

    # Define model
    class SimpleNN(nn.Module):
        def __init__(self, input_dim, hidden_dim, output_dim):
            super().__init__()
            self.fc1 = nn.Linear(input_dim, hidden_dim)
            self.fc2 = nn.Linear(hidden_dim, output_dim)

        def forward(self, x):
            x = torch.relu(self.fc1(x))
            return self.fc2(x)

    model = SimpleNN(X_train.shape[1], hyperparams['hidden_dim'], 10)

    # Train (full-batch for brevity; use a DataLoader for large datasets)
    optimizer = torch.optim.Adam(model.parameters(), lr=hyperparams['lr'])
    criterion = nn.CrossEntropyLoss()
    X_train_t = torch.tensor(X_train, dtype=torch.float32)
    y_train_t = torch.tensor(y_train, dtype=torch.long)
    for epoch in range(hyperparams['epochs']):
        optimizer.zero_grad()
        loss = criterion(model(X_train_t), y_train_t)
        loss.backward()
        optimizer.step()

    # Save model
    torch.save(model.state_dict(), model_output_path)
    return model_output_path


@func_to_container_op
def evaluate_model(model_path: str, test_data_path: str) -> dict:
    """Evaluation component"""
    import pandas as pd
    import torch
    from sklearn.metrics import accuracy_score, classification_report

    # Load the model and test data, run inference, compute metrics here.
    # Placeholder values shown for illustration:
    metrics = {
        'accuracy': 0.95,
        'f1_score': 0.93
    }
    return metrics


@func_to_container_op
def deploy_model(model_path: str, metrics: dict, threshold: float = 0.9):
    """Conditional deployment based on metrics"""
    if metrics['accuracy'] >= threshold:
        print(f"✅ Deploying model (accuracy: {metrics['accuracy']})")
        # Deploy to production here, e.g.:
        #   kubectl apply -f deployment.yaml
        #   or a TorchServe management API call
    else:
        print(f"⚠️ Model accuracy {metrics['accuracy']} below threshold {threshold}")
        raise RuntimeError("Model performance insufficient for deployment")


@dsl.pipeline(
    name='End-to-End ML Pipeline',
    description='Automated training and deployment pipeline'
)
def ml_pipeline(
    input_data_path: str = 's3://my-bucket/raw-data.csv',
    model_registry: str = 's3://my-bucket/models/',
    accuracy_threshold: float = 0.9
):
    """Complete ML pipeline"""
    # Step 1: Preprocess
    preprocess_task = preprocess_data(
        input_path=input_data_path,
        output_path='/tmp/processed_data.csv'
    )

    # Step 2: Train
    hyperparams = {'lr': 0.001, 'hidden_dim': 128, 'epochs': 50}
    train_task = train_model(
        data_path=preprocess_task.output,
        model_output_path='/tmp/model.pt',
        hyperparams=hyperparams
    )
    train_task.after(preprocess_task)

    # Step 3: Evaluate
    eval_task = evaluate_model(
        model_path=train_task.output,
        test_data_path='/tmp/test_data.csv'
    )
    eval_task.after(train_task)

    # Step 4: Deploy (conditional)
    deploy_task = deploy_model(
        model_path=train_task.output,
        metrics=eval_task.output,
        threshold=accuracy_threshold
    )
    deploy_task.after(eval_task)


# Compile and run pipeline
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

    client = kfp.Client(host='http://kubeflow.example.com')
    run = client.create_run_from_pipeline_func(
        ml_pipeline,
        arguments={
            'input_data_path': 's3://data/train.csv',
            'accuracy_threshold': 0.92
        }
    )
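Ad hoc runs work for development, but the scheduled retraining covered later in this guide can also live server-side: the KFP v1 client supports recurring runs created from the compiled package. A sketch, with the experiment name and cron schedule as assumptions:

import kfp

client = kfp.Client(host='http://kubeflow.example.com')
experiment = client.create_experiment('nightly-training')

# Nightly run at 02:00 from the compiled package
# (KFP cron expressions include a leading seconds field)
client.create_recurring_run(
    experiment_id=experiment.id,
    job_name='nightly-ml-pipeline',
    cron_expression='0 0 2 * * *',
    pipeline_package_path='ml_pipeline.yaml',
    params={'accuracy_threshold': '0.92'}
)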
Data Versioning with DVC
Version control for datasets:
# Initialize DVC
dvc init
# Track dataset
dvc add data/train.csv
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data v1"
# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push
# Create a pipeline stage (dvc stage add replaced dvc run in DVC 2.0+)
dvc stage add -n preprocess \
    -d data/raw.csv \
    -o data/processed.csv \
    python preprocess.py
# Reproduce pipeline
dvc repro
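DVC also exposes a Python API, so training code can pin itself to an exact dataset revision instead of whatever currently sits on disk. A minimal sketch, assuming a v1 git tag marks the commit created above:

import pandas as pd
import dvc.api

# Stream the DVC-tracked file as of a specific git revision
with dvc.api.open('data/train.csv', rev='v1') as f:
    df = pd.read_csv(f)

# Or resolve the remote storage URL without downloading
url = dvc.api.get_url('data/train.csv', rev='v1')
print(f"Loaded {len(df)} rows (stored at {url})")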
Continuous Training
Automatically retrain on new data:
import time
from datetime import datetime

import kfp
import schedule

# Assumes ml_pipeline from the Kubeflow section is importable here.


class ContinuousTrainer:
    """Automated retraining system"""

    def __init__(self, data_source, model_registry):
        self.data_source = data_source
        self.model_registry = model_registry
        self.last_training_time = None
        self.performance_history = []

    def check_data_drift(self):
        """Detect if retraining is needed"""
        from scipy.stats import ks_2samp

        # Load current production data vs new data
        # (load_production_data / load_new_data are placeholders to implement)
        prod_data = load_production_data()
        new_data = load_new_data(self.data_source)

        # Kolmogorov-Smirnov test for distribution shift
        for feature in prod_data.columns:
            statistic, pvalue = ks_2samp(prod_data[feature], new_data[feature])
            if pvalue < 0.05:  # Significant drift detected
                print(f"⚠️ Data drift detected in {feature} (p={pvalue:.4f})")
                return True
        return False

    def check_performance_degradation(self):
        """Monitor production model performance"""
        # get_production_accuracy is a placeholder for your monitoring query
        current_accuracy = get_production_accuracy()

        if self.performance_history:
            baseline_accuracy = self.performance_history[0]
            if current_accuracy < baseline_accuracy * 0.95:
                print(f"⚠️ Performance degradation: {current_accuracy:.2%} vs {baseline_accuracy:.2%}")
                return True

        self.performance_history.append(current_accuracy)
        return False

    def trigger_training(self):
        """Execute training pipeline"""
        print(f"🔄 Triggering retraining at {datetime.now()}")

        # Run the Kubeflow pipeline defined above
        client = kfp.Client(host='http://kubeflow.example.com')
        run = client.create_run_from_pipeline_func(ml_pipeline, arguments={})
        self.last_training_time = datetime.now()

        # Monitor pipeline execution
        run.wait_for_run_completion(timeout=3600)
        print("✅ Retraining complete")

    def scheduled_check(self):
        """Periodic check for retraining triggers"""
        print(f"Checking retraining conditions at {datetime.now()}")
        if self.check_data_drift() or self.check_performance_degradation():
            self.trigger_training()
        else:
            print("No retraining needed")


# Schedule daily checks
trainer = ContinuousTrainer(data_source='s3://bucket/data', model_registry='s3://bucket/models')
schedule.every().day.at("02:00").do(trainer.scheduled_check)

while True:
    schedule.run_pending()
    time.sleep(60)  # Wake often enough to hit the 02:00 slot on time
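One caveat on the drift check above: running a KS test per feature at p < 0.05 practically guarantees false alarms once the feature count grows, which means needless retraining runs. A sketch of the same check with a Bonferroni-corrected threshold (prod_data and new_data are the DataFrames loaded by the placeholders above):

from scipy.stats import ks_2samp

def drift_detected(prod_data, new_data, alpha=0.05):
    """KS test per feature with a Bonferroni-corrected significance level."""
    corrected_alpha = alpha / len(prod_data.columns)
    drifted = []
    for feature in prod_data.columns:
        _, pvalue = ks_2samp(prod_data[feature], new_data[feature])
        if pvalue < corrected_alpha:
            drifted.append((feature, pvalue))
    for feature, pvalue in drifted:
        print(f"⚠️ Data drift in {feature} (p={pvalue:.2e}, threshold={corrected_alpha:.2e})")
    return bool(drifted)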
Warnings ⚠️
Pipeline Complexity: Multi-stage pipelines accumulate failure modes. The 2034 "Pipeline Cascade" occurred when 300-step ML pipelines became impossible to debug.
Hidden Dependencies: When data lineage tracking breaks down, upstream schema and distribution changes surface as silent data quality issues downstream.
Automation Runaway: Continuous training without human oversight deployed progressively worse models for weeks before detection.
Related Chronicles: The MLOps Meltdown (2034) - Automated systems deploying broken models
Tools: Kubeflow, MLflow, DVC, Airflow, Prefect, Weights & Biases
Research: Continuous learning systems, online learning, model monitoring