Deploying a new Llama 70B checkpoint to 100% of production traffic without canary testing risks a catastrophic quality regression. In one documented case, a fine-tuned checkpoint looked good on eval benchmarks but produced a 14% hallucination rate on customer prompts (versus a 3% baseline). That 11-point delta cost four hours of bad production traffic before rollback. Dynamo prevents this with canary deployment: load the new checkpoint on 10% of GPUs, route 5% of traffic there, run automated quality checks with GPT-4 as judge comparing outputs, and either promote (shift to 100% in 90 seconds) or roll back (instant revert), with zero user-facing failures.
## The Model Update Problem

### Why LLM Updates Are Hard
```python
from dataclasses import dataclass, field
from enum import Enum
import time


class UpdateRisk(Enum):
    LOW = "low"            # Config change, same weights
    MEDIUM = "medium"      # Fine-tuned version of same base
    HIGH = "high"          # New base model (architecture change)
    CRITICAL = "critical"  # New model family


@dataclass
class ModelVersion:
    model_id: str
    version: str
    base_model: str
    weight_hash: str
    config_hash: str
    gpu_memory_gb: float
    load_time_seconds: float
    warmup_tokens: int
    created_at: float
    metadata: dict = field(default_factory=dict)


@dataclass
class UpdatePlan:
    current_version: ModelVersion
    target_version: ModelVersion
    risk_level: UpdateRisk
    strategy: str  # "canary", "blue_green", "rolling"
    canary_percentage: float
    evaluation_duration_seconds: int
    rollback_trigger: dict
    gpu_overlap_required: bool


class ModelUpdatePlanner:
    """
    Plan safe model updates based on the risk level
    of the change.
    """

    def plan_update(self, current, target):
        """
        Create an update plan based on the difference
        between current and target versions.
        """
        risk = self._assess_risk(current, target)

        if risk == UpdateRisk.LOW:
            strategy = "rolling"
            canary_pct = 0.0       # No canary needed
            eval_duration = 0
        elif risk == UpdateRisk.MEDIUM:
            strategy = "canary"
            canary_pct = 0.10      # 10% canary
            eval_duration = 3600   # 1 hour
        elif risk == UpdateRisk.HIGH:
            strategy = "canary"
            canary_pct = 0.05      # 5% canary
            eval_duration = 14400  # 4 hours
        else:
            strategy = "canary"
            canary_pct = 0.02      # 2% canary
            eval_duration = 86400  # 24 hours

        # GPU overlap: need both models loaded simultaneously
        gpu_overlap = (strategy == "canary")

        # Peak GPU memory during the update (both versions
        # resident at once when overlapping)
        total_gpu_memory = (
            current.gpu_memory_gb + target.gpu_memory_gb
            if gpu_overlap
            else max(current.gpu_memory_gb, target.gpu_memory_gb)
        )

        return UpdatePlan(
            current_version=current,
            target_version=target,
            risk_level=risk,
            strategy=strategy,
            canary_percentage=canary_pct,
            evaluation_duration_seconds=eval_duration,
            rollback_trigger={
                "p99_latency_increase_pct": 20,
                "error_rate_increase_pct": 50,
                "quality_score_decrease": 0.05,
            },
            gpu_overlap_required=gpu_overlap,
        )

    def _assess_risk(self, current, target):
        """Assess the risk level of an update."""
        if current.weight_hash == target.weight_hash:
            return UpdateRisk.LOW       # Same weights, config change
        elif current.base_model == target.base_model:
            return UpdateRisk.MEDIUM    # Fine-tuned variant
        elif current.base_model.split("-")[0] == target.base_model.split("-")[0]:
            return UpdateRisk.HIGH      # Same family, different size
        else:
            return UpdateRisk.CRITICAL  # Different model family
```
**Model Update Scenarios and Recommended Strategies**
| Change Type | Risk | Strategy | Canary % | Eval Duration | GPU Overhead |
|---|---|---|---|---|---|
| Config change (temperature) | Low | Rolling | 0% | None | 0% |
| LoRA adapter swap | Medium | Canary | 10% | 1 hour | 10-15% |
| Fine-tuned model update | Medium | Canary | 10% | 1 hour | 100% |
| New model version (70B v1 to v2) | High | Canary | 5% | 4 hours | 100% |
| New model family (Llama to Mistral) | Critical | Canary | 2% | 24 hours | 100% |
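The GPU Overhead column in the table follows directly from whether both versions must be resident at once. A minimal sketch of the peak-memory rule, mirroring the planner's `gpu_overlap` logic (the `peak_gpu_memory_gb` helper is illustrative, not part of Dynamo):

```python
def peak_gpu_memory_gb(current_gb: float, target_gb: float,
                       strategy: str) -> float:
    """Peak GPU memory needed during an update.

    Canary and blue-green keep both versions loaded at once;
    rolling replaces weights in place, so only the larger
    footprint matters.
    """
    if strategy in ("canary", "blue_green"):
        return current_gb + target_gb
    return max(current_gb, target_gb)


# A 70B model in FP16 needs roughly 140 GB for weights, so a
# canary of a same-size replacement doubles the allocation:
print(peak_gpu_memory_gb(140, 140, "canary"))   # both resident
print(peak_gpu_memory_gb(140, 140, "rolling"))  # in-place swap
```

This is the overhead the planner's `gpu_overlap_required` flag exists to budget for.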
## Traffic Splitting Router

### Weighted Traffic Distribution

The traffic splitter sits in Dynamo’s router layer, directing requests to different model versions according to configurable weights. It must be stateless (any router instance can handle any request), consistent (the same user sees the same version within a session, so comparisons are fair), and instantly reconfigurable (weight changes take effect on the next request).
```python
import random
import threading
from collections import defaultdict


class TrafficSplitter:
    """
    Split incoming traffic between model versions
    based on configurable weights.

    Features:
    - Weighted random routing
    - Sticky sessions (same user -> same model)
    - Instant weight updates (no restart)
    - Per-category routing (e.g., coding queries -> version B)
    """

    def __init__(self):
        self.weights = {}          # model_version -> weight (0.0 to 1.0)
        self.sticky_sessions = {}  # user_id -> model_version
        self.lock = threading.Lock()
        self.request_counts = defaultdict(int)
        self.category_overrides = {}

    def set_weights(self, weights):
        """
        Set traffic weights. Weights must sum to 1.0.
        Example: {"v1": 0.95, "v2": 0.05} for 5% canary.
        """
        total = sum(weights.values())
        if abs(total - 1.0) > 0.001:
            raise ValueError(
                f"Weights must sum to 1.0, got {total}"
            )
        with self.lock:
            self.weights = dict(weights)

    def set_category_override(self, category, version):
        """
        Route all requests of a category to a specific version.
        Used for targeted A/B testing on specific use cases.
        """
        self.category_overrides[category] = version

    def route(self, request):
        """
        Route a request to a model version.
        Returns the selected version name.
        """
        user_id = request.get("user_id")
        category = request.get("category")
        force_version = request.get("force_version")

        # Priority 1: Explicit version override (for debugging)
        if force_version and force_version in self.weights:
            return force_version

        # Priority 2: Category override
        if category and category in self.category_overrides:
            version = self.category_overrides[category]
            self.request_counts[version] += 1
            return version

        # Priority 3: Sticky session
        if user_id and user_id in self.sticky_sessions:
            version = self.sticky_sessions[user_id]
            if version in self.weights:
                self.request_counts[version] += 1
                return version

        # Priority 4: Weighted random selection
        with self.lock:
            versions = list(self.weights.keys())
            weights = [self.weights[v] for v in versions]
        if not versions:
            raise RuntimeError("No traffic weights configured")

        # Weighted random choice
        r = random.random()
        cumulative = 0.0
        selected = versions[-1]
        for version, weight in zip(versions, weights):
            cumulative += weight
            if r < cumulative:
                selected = version
                break

        # Store sticky session
        if user_id:
            self.sticky_sessions[user_id] = selected
        self.request_counts[selected] += 1
        return selected

    def get_stats(self):
        """Get traffic distribution statistics."""
        total = sum(self.request_counts.values())
        stats = {}
        for version, count in self.request_counts.items():
            stats[version] = {
                "count": count,
                "actual_percentage": (
                    count / total * 100 if total > 0 else 0
                ),
                "configured_weight": (
                    self.weights.get(version, 0) * 100
                ),
            }
        return stats

    def clear_sticky_sessions(self):
        """Clear all sticky sessions (e.g., after rollback)."""
        self.sticky_sessions.clear()
```
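One caveat: the in-memory `sticky_sessions` dict is per-router-instance state, so strictly it compromises the "any router instance can handle any request" property unless sessions are shared. A common stateless alternative derives the assignment from a hash of the user id, so every router computes the same answer independently. A sketch under that assumption (the `hash_route` helper is hypothetical; note that hash-based assignment reshuffles some users whenever weights change, which is why a session table is still used above):

```python
import hashlib


def hash_route(user_id: str, weights: dict) -> str:
    """Deterministic, stateless sticky routing.

    Maps user_id to a point in [0, 1) and walks the cumulative
    weight distribution; every router instance computes the same
    version for the same user with no shared state.
    """
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    point = int(digest, 16) / 16 ** len(digest)  # uniform in [0, 1)
    cumulative = 0.0
    versions = sorted(weights)  # stable iteration order everywhere
    for version in versions:
        cumulative += weights[version]
        if point < cumulative:
            return version
    return versions[-1]  # guard against float rounding
```

Because the mapping is deterministic, A/B cohorts stay stable across router restarts with no session table to replicate.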
## Canary Evaluation Engine

### Comparing Model Versions

The canary evaluator continuously compares the canary version against the stable version across three dimensions: latency (is the canary slower?), quality (is the canary worse?), and reliability (does the canary fail more often?).
```python
import numpy as np
from collections import deque
from datetime import datetime


@dataclass
class RequestMetrics:
    version: str
    request_id: str
    prompt: str
    response: str
    time_to_first_token_ms: float
    total_latency_ms: float
    tokens_generated: int
    error: bool
    error_message: str
    timestamp: float


class CanaryEvaluator:
    """
    Continuously evaluate the canary version against stable.

    Collects per-request metrics, computes aggregate
    statistics, and triggers rollback if thresholds
    are exceeded.
    """

    def __init__(self, stable_version, canary_version,
                 rollback_config):
        self.stable = stable_version
        self.canary = canary_version
        self.rollback_config = rollback_config

        # Metrics storage (ring buffers for sliding window)
        self.window_size = 1000
        self.stable_metrics = deque(maxlen=self.window_size)
        self.canary_metrics = deque(maxlen=self.window_size)

        # Quality evaluation queue
        self.quality_comparison_queue = deque(maxlen=100)

        # State
        self.started_at = time.time()
        self.rollback_triggered = False
        self.rollback_reason = None

    def record_metric(self, metric):
        """Record a request metric."""
        if metric.version == self.stable:
            self.stable_metrics.append(metric)
        elif metric.version == self.canary:
            self.canary_metrics.append(metric)

    def evaluate(self):
        """
        Evaluate canary health.
        Returns evaluation result and rollback decision.
        """
        if (len(self.canary_metrics) < 50
                or len(self.stable_metrics) < 50):
            return {
                "status": "insufficient_data",
                "canary_requests": len(self.canary_metrics),
                "stable_requests": len(self.stable_metrics),
                "rollback": False,
            }

        # Latency comparison
        latency_result = self._compare_latency()

        # Error rate comparison
        error_result = self._compare_error_rates()

        # Quality comparison (if available)
        quality_result = self._compare_quality()

        # Rollback decision
        rollback = False
        rollback_reasons = []

        if (latency_result["p99_increase_pct"]
                > self.rollback_config["p99_latency_increase_pct"]):
            rollback = True
            rollback_reasons.append(
                f"p99 latency increased by "
                f"{latency_result['p99_increase_pct']:.1f}%"
            )

        if (error_result["error_rate_increase_pct"]
                > self.rollback_config["error_rate_increase_pct"]):
            rollback = True
            rollback_reasons.append(
                f"Error rate increased by "
                f"{error_result['error_rate_increase_pct']:.1f}%"
            )

        if (quality_result.get("quality_decrease", 0)
                > self.rollback_config.get("quality_score_decrease", 1.0)):
            rollback = True
            rollback_reasons.append(
                f"Quality decreased by "
                f"{quality_result['quality_decrease']:.3f}"
            )

        if rollback:
            self.rollback_triggered = True
            self.rollback_reason = "; ".join(rollback_reasons)

        elapsed = time.time() - self.started_at
        return {
            "status": "evaluated",
            "elapsed_seconds": int(elapsed),
            "latency": latency_result,
            "errors": error_result,
            "quality": quality_result,
            "rollback": rollback,
            "rollback_reasons": rollback_reasons,
        }

    def _compare_latency(self):
        """Compare latency distributions."""
        stable_latencies = [
            m.total_latency_ms for m in self.stable_metrics
        ]
        canary_latencies = [
            m.total_latency_ms for m in self.canary_metrics
        ]

        stable_p50 = np.percentile(stable_latencies, 50)
        stable_p99 = np.percentile(stable_latencies, 99)
        canary_p50 = np.percentile(canary_latencies, 50)
        canary_p99 = np.percentile(canary_latencies, 99)

        stable_ttft = [
            m.time_to_first_token_ms for m in self.stable_metrics
        ]
        canary_ttft = [
            m.time_to_first_token_ms for m in self.canary_metrics
        ]

        return {
            "stable_p50_ms": round(stable_p50, 1),
            "stable_p99_ms": round(stable_p99, 1),
            "canary_p50_ms": round(canary_p50, 1),
            "canary_p99_ms": round(canary_p99, 1),
            "p50_increase_pct": round(
                (canary_p50 - stable_p50) / stable_p50 * 100, 1
            ),
            "p99_increase_pct": round(
                (canary_p99 - stable_p99) / stable_p99 * 100, 1
            ),
            "stable_ttft_p50": round(np.percentile(stable_ttft, 50), 1),
            "canary_ttft_p50": round(np.percentile(canary_ttft, 50), 1),
        }

    def _compare_error_rates(self):
        """Compare error rates."""
        stable_errors = sum(
            1 for m in self.stable_metrics if m.error
        )
        canary_errors = sum(
            1 for m in self.canary_metrics if m.error
        )
        stable_rate = stable_errors / max(len(self.stable_metrics), 1)
        canary_rate = canary_errors / max(len(self.canary_metrics), 1)

        increase_pct = (
            (canary_rate - stable_rate)
            / max(stable_rate, 0.001) * 100
        )

        return {
            "stable_error_rate": round(stable_rate, 4),
            "canary_error_rate": round(canary_rate, 4),
            "error_rate_increase_pct": round(increase_pct, 1),
        }

    def _compare_quality(self):
        """
        Compare output quality using automated metrics.

        Metrics:
        1. Response length (proxy for completeness)
        2. LLM-as-judge score (if available)
        3. User feedback (if available)
        """
        stable_lengths = [
            m.tokens_generated for m in self.stable_metrics
        ]
        canary_lengths = [
            m.tokens_generated for m in self.canary_metrics
        ]

        avg_stable_len = np.mean(stable_lengths) if stable_lengths else 0
        avg_canary_len = np.mean(canary_lengths) if canary_lengths else 0

        length_change = (
            (avg_canary_len - avg_stable_len)
            / max(avg_stable_len, 1) * 100
        )

        return {
            "avg_stable_tokens": round(avg_stable_len, 1),
            "avg_canary_tokens": round(avg_canary_len, 1),
            "length_change_pct": round(length_change, 1),
            # Placeholder until LLM-as-judge scores are wired in
            "quality_decrease": 0.0,
        }
```
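To see the p99 trigger fire, here is a self-contained simulation using only the standard library. The 20% threshold matches the `rollback_trigger` in the update plan; the synthetic latencies are purely illustrative:

```python
import random
from statistics import quantiles

random.seed(7)

# Synthetic latency windows: stable ~N(800, 100) ms,
# canary drawn from the same shape but 30% slower.
stable = [random.gauss(800, 100) for _ in range(2000)]
canary = [1.3 * random.gauss(800, 100) for _ in range(2000)]


def p99(samples):
    return quantiles(samples, n=100)[98]  # 99th percentile


increase_pct = (p99(canary) - p99(stable)) / p99(stable) * 100
should_rollback = increase_pct > 20  # p99_latency_increase_pct trigger
print(f"p99 increase: {increase_pct:.1f}% -> rollback={should_rollback}")
```

A uniform 30% slowdown shifts the whole distribution, so the p99 comparison catches it well clear of the 20% threshold.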
## A/B Testing Framework

### Comparing Two Models Side-by-Side

A/B testing differs from canary deployment in purpose: a canary validates safety (is the new version good enough to serve?), while an A/B test measures improvement (is the new version actually better?). A/B testing requires larger sample sizes and longer durations to detect statistically significant differences.
```python
from scipy import stats


class ABTestManager:
    """
    Manage A/B tests between model versions.

    Statistical framework:
    - Null hypothesis: models have equal quality
    - Alternative: new model is different (two-sided) or
      better (one-sided)
    - Test: Mann-Whitney U test (non-parametric)
    - Significance level: alpha = 0.05
    - Power: 0.80 (detect 5% quality difference)
    """

    def __init__(self):
        self.tests = {}

    def create_test(self, test_name, model_a, model_b,
                    traffic_split=0.5, target_samples=1000):
        """Create a new A/B test."""
        self.tests[test_name] = {
            "model_a": model_a,
            "model_b": model_b,
            "traffic_split": traffic_split,
            "target_samples": target_samples,
            "scores_a": [],
            "scores_b": [],
            "started_at": time.time(),
            "status": "running",
        }

    def record_score(self, test_name, model, score):
        """Record a quality score for one model in a test."""
        test = self.tests[test_name]
        if model == test["model_a"]:
            test["scores_a"].append(score)
        elif model == test["model_b"]:
            test["scores_b"].append(score)

    def analyze(self, test_name, alpha=0.05):
        """Analyze A/B test results."""
        test = self.tests[test_name]
        scores_a = np.array(test["scores_a"])
        scores_b = np.array(test["scores_b"])

        if len(scores_a) < 30 or len(scores_b) < 30:
            return {
                "status": "insufficient_data",
                "n_a": len(scores_a),
                "n_b": len(scores_b),
            }

        # Descriptive statistics
        mean_a = np.mean(scores_a)
        mean_b = np.mean(scores_b)
        std_a = np.std(scores_a, ddof=1)
        std_b = np.std(scores_b, ddof=1)

        # Mann-Whitney U test (non-parametric)
        u_stat, p_value = stats.mannwhitneyu(
            scores_a, scores_b, alternative='two-sided'
        )

        # Effect size (Cohen's d)
        pooled_std = np.sqrt(
            (std_a ** 2 + std_b ** 2) / 2
        )
        cohens_d = (mean_b - mean_a) / pooled_std if pooled_std > 0 else 0

        # Win rate of B over A (pairs samples by arrival order,
        # a rough heuristic rather than a true paired design)
        n_comparisons = min(len(scores_a), len(scores_b))
        b_wins = sum(
            1 for i in range(n_comparisons)
            if scores_b[i] > scores_a[i]
        )
        win_rate_b = b_wins / max(n_comparisons, 1)

        # Decision
        significant = p_value < alpha
        if significant:
            if mean_b > mean_a:
                decision = f"{test['model_b']} is significantly better"
            else:
                decision = f"{test['model_a']} is significantly better"
        else:
            decision = "No significant difference detected"

        return {
            "status": "analyzed",
            "n_a": len(scores_a),
            "n_b": len(scores_b),
            "mean_a": round(mean_a, 4),
            "mean_b": round(mean_b, 4),
            "std_a": round(std_a, 4),
            "std_b": round(std_b, 4),
            "p_value": round(p_value, 6),
            "significant": significant,
            "cohens_d": round(cohens_d, 3),
            "win_rate_b": round(win_rate_b, 3),
            "decision": decision,
        }

    def required_sample_size(self, effect_size=0.05,
                             alpha=0.05, power=0.80):
        """
        Compute required sample size per group to detect
        a given effect size.

        For LLM quality comparison:
        - Small effect: 0.02 (barely noticeable)
        - Medium effect: 0.05 (noticeable in aggregate)
        - Large effect: 0.10 (clearly better)
        """
        z_alpha = stats.norm.ppf(1 - alpha / 2)
        z_beta = stats.norm.ppf(power)

        # Approximate: n = ((z_alpha + z_beta) / effect_size)^2
        # Assumes equal variances and normal approximation
        n = ((z_alpha + z_beta) / effect_size) ** 2

        return {
            "required_per_group": int(np.ceil(n)),
            "total_required": int(np.ceil(2 * n)),
            "effect_size": effect_size,
            "alpha": alpha,
            "power": power,
        }
```
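The effect-size and win-rate computations in `analyze` can be checked by hand on a toy sample, stdlib only. The scores below are synthetic (real judge scores would be far noisier), and pairing by index is the same rough heuristic the class uses:

```python
from statistics import mean, stdev

# Synthetic quality scores: model B shifted ~0.04 above model A.
scores_a = [0.70, 0.72, 0.68, 0.71, 0.69, 0.73, 0.70, 0.72]
scores_b = [0.74, 0.76, 0.73, 0.75, 0.72, 0.77, 0.74, 0.76]

# Cohen's d with a pooled standard deviation, as in analyze().
pooled_std = ((stdev(scores_a) ** 2 + stdev(scores_b) ** 2) / 2) ** 0.5
cohens_d = (mean(scores_b) - mean(scores_a)) / pooled_std

# Win rate of B over A, pairing samples by index.
win_rate_b = sum(b > a for a, b in zip(scores_a, scores_b)) / len(scores_a)
print(f"Cohen's d = {cohens_d:.2f}, win rate of B = {win_rate_b:.2f}")
```

With only 8 samples per side this sits far below the sample-size requirements discussed next, which is exactly why `analyze` refuses to run with fewer than 30 samples per group.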
**Required Samples per Group for A/B Testing (80% Power)**

| Effect size to detect | 1% | 2% | 5% | 10% | 15% | 20% |
|---|---|---|---|---|---|---|
| Two-sided test (alpha = 0.05) | ~78,500 | ~19,600 | ~3,100 | ~790 | ~350 | ~200 |
A 5% quality improvement requires approximately 3,000 samples per model to detect reliably. At typical production traffic (1,000 requests/hour) with a 50/50 split, that takes 6 hours. Smaller improvements (1-2%) require days of testing. Plan A/B test duration accordingly.
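The arithmetic behind "about 6 hours" can be reproduced with the standard library's `NormalDist`, no scipy required (the `ab_test_duration` helper name is illustrative):

```python
import math
from statistics import NormalDist


def ab_test_duration(effect_size, requests_per_hour, split=0.5,
                     alpha=0.05, power=0.80):
    """Samples per group, and wall-clock hours to collect them."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # two-sided
    z_beta = NormalDist().inv_cdf(power)
    n_per_group = math.ceil(((z_alpha + z_beta) / effect_size) ** 2)
    # The slower-filling arm gates the overall test duration.
    slowest_rate = requests_per_hour * min(split, 1 - split)
    return n_per_group, n_per_group / slowest_rate


n, hours = ab_test_duration(0.05, requests_per_hour=1000)
print(f"{n} samples per group, ~{hours:.1f} h at 1000 req/h")
```

At a 50/50 split each arm fills at 500 requests/hour, so the ~3,100 samples per group arrive in a little over six hours.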
## Rollback Orchestrator

### Instant Rollback Without Downtime
When the canary evaluation triggers a rollback, the system must shift all traffic back to the stable version instantly (within one request cycle) and then safely unload the canary model to free GPU memory.
```python
class RollbackOrchestrator:
    """
    Orchestrate model rollback when canary evaluation fails.

    Rollback sequence:
    1. Set canary weight to 0% (instant, at router level)
    2. Drain in-flight canary requests (wait for completion)
    3. Clear sticky sessions pointing to canary
    4. Unload canary model from GPU
    5. Log rollback event
    6. Alert on-call
    """

    def __init__(self, traffic_splitter, model_manager):
        self.splitter = traffic_splitter
        self.model_manager = model_manager
        self.rollback_history = []

    def execute_rollback(self, stable_version, canary_version,
                         reason, evaluation_snapshot):
        """Execute an immediate rollback."""
        rollback_start = time.time()

        # Step 1: Redirect all traffic to stable (instant)
        self.splitter.set_weights({stable_version: 1.0})
        traffic_shifted_at = time.time()

        # Step 2: Clear sticky sessions
        self.splitter.clear_sticky_sessions()

        # Step 3: Wait for in-flight canary requests to complete
        drain_timeout = 30  # seconds
        self._drain_inflight_requests(
            canary_version, drain_timeout
        )
        drained_at = time.time()

        # Step 4: Unload canary model
        self.model_manager.unload_model(canary_version)
        unloaded_at = time.time()

        # Step 5: Log the event
        rollback_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "stable_version": stable_version,
            "canary_version": canary_version,
            "reason": reason,
            "evaluation_snapshot": evaluation_snapshot,
            "timing": {
                "traffic_shift_ms": round(
                    (traffic_shifted_at - rollback_start) * 1000, 1
                ),
                "drain_ms": round(
                    (drained_at - traffic_shifted_at) * 1000, 1
                ),
                "unload_ms": round(
                    (unloaded_at - drained_at) * 1000, 1
                ),
                "total_ms": round(
                    (unloaded_at - rollback_start) * 1000, 1
                ),
            },
        }
        self.rollback_history.append(rollback_record)
        return rollback_record

    def _drain_inflight_requests(self, version, timeout):
        """Wait for in-flight requests to the canary to complete."""
        start = time.time()
        while time.time() - start < timeout:
            inflight = self.model_manager.get_inflight_count(
                version
            )
            if inflight == 0:
                return
            time.sleep(0.5)

        # Timeout: force-cancel remaining requests
        self.model_manager.cancel_inflight(version)
```
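The drain step is the only part of the rollback with nontrivial timing. A toy harness with a stub model manager shows the two exits, clean drain versus timeout-and-cancel (both `StubModelManager` and the poll cadence are made up for illustration; the loop mirrors `_drain_inflight_requests`):

```python
import time


class StubModelManager:
    """Stand-in manager: each poll observes one request finishing."""

    def __init__(self, inflight):
        self.inflight = inflight
        self.force_cancelled = False

    def get_inflight_count(self, version):
        count = self.inflight
        self.inflight = max(0, self.inflight - 1)  # simulate a completion
        return count

    def cancel_inflight(self, version):
        self.force_cancelled = True
        self.inflight = 0


def drain_inflight(manager, version, timeout_s, poll_s=0.01):
    """Same shape as RollbackOrchestrator._drain_inflight_requests."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if manager.get_inflight_count(version) == 0:
            return True   # clean drain
        time.sleep(poll_s)
    manager.cancel_inflight(version)
    return False          # timed out; remaining requests cancelled
```

The timeout bounds the blast radius: a hung canary request cannot delay the rollback indefinitely, at the cost of force-cancelling whatever is still in flight.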
## Progressive Rollout

### Gradually Increasing Canary Traffic
Instead of jumping from 5% canary to 100% production, progressive rollout increases traffic in stages, re-evaluating at each stage.
```python
class ProgressiveRollout:
    """
    Gradually increase canary traffic through stages.

    Default stages: 2% -> 5% -> 10% -> 25% -> 50% -> 100%

    At each stage:
    1. Hold for evaluation_duration
    2. Evaluate canary health
    3. If healthy: advance to next stage
    4. If unhealthy: rollback
    """

    def __init__(self, splitter, evaluator, rollback_orch):
        self.splitter = splitter
        self.evaluator = evaluator
        self.rollback = rollback_orch
        self.stages = [0.02, 0.05, 0.10, 0.25, 0.50, 1.00]
        self.current_stage = 0
        self.stage_history = []

    def advance(self, stable_version, canary_version):
        """
        Apply the traffic weights for the current rollout stage.
        Returns a status dict.
        """
        if self.current_stage >= len(self.stages):
            return {"status": "complete", "message": "Already at 100%"}

        canary_pct = self.stages[self.current_stage]

        # Set traffic weights
        self.splitter.set_weights({
            stable_version: 1.0 - canary_pct,
            canary_version: canary_pct,
        })

        return {
            "status": "advancing",
            "stage": self.current_stage,
            "canary_percentage": canary_pct * 100,
            "next_evaluation_in": self._stage_duration(
                self.current_stage
            ),
        }

    def evaluate_and_decide(self, stable_version, canary_version):
        """Evaluate current stage and decide next action."""
        evaluation = self.evaluator.evaluate()

        if evaluation.get("rollback", False):
            # Rollback
            rollback_result = self.rollback.execute_rollback(
                stable_version, canary_version,
                reason="; ".join(
                    evaluation.get("rollback_reasons", ["unknown"])
                ),
                evaluation_snapshot=evaluation,
            )
            return {
                "action": "rollback",
                "stage": self.current_stage,
                "evaluation": evaluation,
                "rollback": rollback_result,
            }

        if evaluation.get("status") == "insufficient_data":
            return {
                "action": "wait",
                "reason": "Need more data",
                "evaluation": evaluation,
            }

        # Stage passed -- advance
        self.current_stage += 1
        self.stage_history.append({
            "stage": self.current_stage - 1,
            "evaluation": evaluation,
            "passed": True,
        })

        if self.current_stage >= len(self.stages):
            return {
                "action": "complete",
                "message": "Canary promoted to 100%",
                "stages_passed": len(self.stage_history),
            }

        return {
            "action": "advance",
            "next_stage": self.current_stage,
            "next_percentage": self.stages[self.current_stage] * 100,
        }

    def _stage_duration(self, stage):
        """Duration to hold at each stage (seconds)."""
        # Early low-traffic stages hold briefly to fail fast on
        # gross regressions; later stages hold longer for
        # statistical power at scale
        durations = [1800, 1800, 3600, 3600, 7200, 0]
        return durations[min(stage, len(durations) - 1)]
```
**Progressive Rollout Timeline (Typical)**
| Stage | Canary % | Duration | Min Requests | Purpose |
|---|---|---|---|---|
| 0 | 2% | 30 min | ~200 | Detect crashes and major regressions |
| 1 | 5% | 30 min | ~500 | Detect latency regressions |
| 2 | 10% | 1 hour | ~2000 | Detect quality issues |
| 3 | 25% | 1 hour | ~5000 | Statistical power for A/B comparison |
| 4 | 50% | 2 hours | ~10000 | Final validation at scale |
| 5 | 100% | Complete | - | Full promotion |
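Putting the stage durations together gives the minimum wall-clock time for a fully clean rollout. A small sketch with the constants copied from `ProgressiveRollout` (the helper names are illustrative):

```python
STAGES = [0.02, 0.05, 0.10, 0.25, 0.50, 1.00]
STAGE_DURATIONS_S = [1800, 1800, 3600, 3600, 7200, 0]


def stage_weights(stage, stable="v1", canary="v2"):
    """Traffic weights handed to the splitter at a given stage."""
    pct = STAGES[stage]
    return {stable: round(1.0 - pct, 2), canary: pct}


def min_rollout_hours():
    """Wall-clock lower bound if every stage passes first try."""
    return sum(STAGE_DURATIONS_S) / 3600


print(stage_weights(3))       # the 25% stage
print(min_rollout_hours())    # sum of all stage holds, in hours
```

The lower bound comes to 5 hours, matching the 30 min + 30 min + 1 h + 1 h + 2 h column of the timeline table; real rollouts run longer whenever a low-traffic stage needs extra time to reach its minimum request count.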
## Complete Deployment Pipeline

### Orchestrating the Full Update
```python
class ModelDeploymentPipeline:
    """
    Complete pipeline for safe model updates.

    Orchestrates: planning -> loading -> canary -> evaluation
    -> progressive rollout -> completion or rollback.
    """

    def __init__(self, model_manager, splitter):
        self.model_manager = model_manager
        self.splitter = splitter
        self.planner = ModelUpdatePlanner()

    def deploy(self, current_version, target_version):
        """Execute a complete model deployment."""
        # Phase 1: Plan
        plan = self.planner.plan_update(
            current_version, target_version
        )
        print(f"Update plan: {plan.strategy} "
              f"(risk: {plan.risk_level.value})")

        # Phase 2: Load target model
        print("Loading target model...")
        load_start = time.time()
        self.model_manager.load_model(target_version)
        load_time = time.time() - load_start
        print(f"Loaded in {load_time:.1f}s")

        # Phase 3: Warm up
        print("Warming up target model...")
        self.model_manager.warmup(
            target_version, n_tokens=plan.target_version.warmup_tokens
        )

        # Phase 4: Start canary
        evaluator = CanaryEvaluator(
            stable_version=current_version.version,
            canary_version=target_version.version,
            rollback_config=plan.rollback_trigger,
        )
        rollback_orch = RollbackOrchestrator(
            self.splitter, self.model_manager
        )
        rollout = ProgressiveRollout(
            self.splitter, evaluator, rollback_orch
        )

        print(f"Starting progressive rollout at "
              f"{rollout.stages[0] * 100:.0f}% canary")
        rollout.advance(
            current_version.version,
            target_version.version,
        )

        # Phase 5: Monitor and advance
        while True:
            time.sleep(60)  # Check every minute

            result = rollout.evaluate_and_decide(
                current_version.version,
                target_version.version,
            )

            if result["action"] == "rollback":
                print(f"ROLLBACK: {result.get('evaluation', {})}")
                return {
                    "status": "rolled_back",
                    "reason": result.get("evaluation", {}),
                }
            elif result["action"] == "complete":
                # Unload old version
                self.model_manager.unload_model(
                    current_version.version
                )
                print("Deployment complete. Old version unloaded.")
                return {
                    "status": "deployed",
                    "version": target_version.version,
                    "stages_passed": result.get("stages_passed", 0),
                }
            elif result["action"] == "advance":
                print(
                    f"Advancing to stage {result['next_stage']} "
                    f"({result['next_percentage']}%)"
                )
                rollout.advance(
                    current_version.version,
                    target_version.version,
                )
            else:
                # Waiting for more data
                continue
```
## Key Takeaways

Safe LLM model updates require traffic-level control, continuous quality evaluation, and instant rollback capability. The cost of a bad deployment is proportional to traffic times duration: catching issues in the 2% canary stage prevents them from ever reaching the full user base.

Core principles:

- **Risk-graded strategies:** Config changes need no canary. Fine-tuned model updates need a 10% canary for 1 hour. New model families need a 2% canary for 24 hours. The evaluation duration must match the risk.
- **Dual-model GPU overhead:** Canary deployment requires loading both model versions simultaneously. For a 70B model, that means 2x GPU allocation during the canary period (140 GB + 140 GB). This is a real infrastructure cost that must be budgeted.
- **Statistical rigor for A/B tests:** Detecting a 5% quality improvement requires approximately 3,000 samples per model. Rushing the decision with insufficient data leads to false positives (promoting a worse model) or false negatives (rejecting a better model).
- **Progressive rollout is safer than a binary cutover:** Moving from 2% to 5% to 10% to 25% to 50% to 100% catches issues at every scale. A latency regression that only appears at high concurrency will be caught at the 25% or 50% stage.
- **Sticky sessions matter:** Users in an A/B test must consistently see the same model version within a session. Switching mid-conversation produces confusing behavior and corrupts the evaluation signal.