Part 19 of 30 in the series "NVIDIA Dynamo & llm-d"

Deploying a new Llama 70B checkpoint to 100% of production traffic without canary testing risks a catastrophic quality regression. In one documented case, a fine-tuned checkpoint looked good on eval benchmarks but produced a 14% hallucination rate on customer prompts (vs a 3% baseline). That 11-point delta cost 4 hours of bad production traffic before rollback. Dynamo prevents this with canary deployment: load the new checkpoint on 10% of GPUs, route 5% of traffic there, run automated quality checks with GPT-4 as judge comparing outputs, and either promote (shift to 100% in 90 seconds) or roll back (instant revert), with zero user-facing failures.

The Model Update Problem

Why LLM Updates Are Hard

from dataclasses import dataclass, field
from enum import Enum
import time
import hashlib
import json

class UpdateRisk(Enum):
    LOW = "low"         # Config change, same weights
    MEDIUM = "medium"   # Fine-tuned version of same base
    HIGH = "high"       # New base model (architecture change)
    CRITICAL = "critical"  # New model family

@dataclass
class ModelVersion:
    model_id: str
    version: str
    base_model: str
    weight_hash: str
    config_hash: str
    gpu_memory_gb: float
    load_time_seconds: float
    warmup_tokens: int
    created_at: float
    metadata: dict = field(default_factory=dict)

@dataclass
class UpdatePlan:
    current_version: ModelVersion
    target_version: ModelVersion
    risk_level: UpdateRisk
    strategy: str  # "canary", "blue_green", "rolling"
    canary_percentage: float
    evaluation_duration_seconds: int
    rollback_trigger: dict
    gpu_overlap_required: bool

class ModelUpdatePlanner:
    """
    Plan safe model updates based on the risk level
    of the change.
    """

    def plan_update(self, current, target):
        """
        Create an update plan based on the difference
        between current and target versions.
        """
        risk = self._assess_risk(current, target)

        if risk == UpdateRisk.LOW:
            strategy = "rolling"
            canary_pct = 0.0  # No canary needed
            eval_duration = 0
        elif risk == UpdateRisk.MEDIUM:
            strategy = "canary"
            canary_pct = 0.10  # 10% canary
            eval_duration = 3600  # 1 hour
        elif risk == UpdateRisk.HIGH:
            strategy = "canary"
            canary_pct = 0.05  # 5% canary
            eval_duration = 14400  # 4 hours
        else:
            strategy = "canary"
            canary_pct = 0.02  # 2% canary
            eval_duration = 86400  # 24 hours

        # GPU overlap: a canary needs both models loaded
        # simultaneously, so peak memory is the sum of both
        # versions' footprints. A rolling update only ever needs
        # the larger of the two.
        gpu_overlap = (strategy == "canary")

        return UpdatePlan(
            current_version=current,
            target_version=target,
            risk_level=risk,
            strategy=strategy,
            canary_percentage=canary_pct,
            evaluation_duration_seconds=eval_duration,
            rollback_trigger={
                "p99_latency_increase_pct": 20,
                "error_rate_increase_pct": 50,
                "quality_score_decrease": 0.05,
            },
            gpu_overlap_required=gpu_overlap,
        )

    def _assess_risk(self, current, target):
        """Assess the risk level of an update."""
        if current.weight_hash == target.weight_hash:
            return UpdateRisk.LOW  # Same weights, config change
        elif current.base_model == target.base_model:
            return UpdateRisk.MEDIUM  # Fine-tuned variant
        elif current.base_model.split("-")[0] == target.base_model.split("-")[0]:
            return UpdateRisk.HIGH  # Same family, different size
        else:
            return UpdateRisk.CRITICAL  # Different model family

Model Update Scenarios and Recommended Strategies

| Change Type | Risk | Strategy | Canary % | Eval Duration | GPU Overhead |
| --- | --- | --- | --- | --- | --- |
| Config change (temperature) | Low | Rolling | 0% | None | 0% |
| LoRA adapter swap | Medium | Canary | 10% | 1 hour | 10-15% |
| Fine-tuned model update | Medium | Canary | 10% | 1 hour | 100% |
| New model version (70B v1 to v2) | High | Canary | 5% | 4 hours | 100% |
| New model family (Llama to Mistral) | Critical | Canary | 2% | 24 hours | 100% |
Note: 100% GPU overhead means both models must be loaded simultaneously during canary evaluation.
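The 100% overhead row is worth budgeting explicitly before starting a canary. A back-of-envelope helper, where the 80 GB per-GPU capacity and the 25% reserve for KV cache and activations are illustrative assumptions rather than Dynamo defaults:

```python
import math

def gpus_for_canary(stable_gb: float, canary_gb: float,
                    gpu_mem_gb: float = 80.0,
                    kv_cache_frac: float = 0.25) -> int:
    """Minimum GPU count while both model versions are resident.

    kv_cache_frac reserves per-GPU headroom for KV cache and
    activations (an assumed budget; tune for your workload).
    """
    usable_per_gpu = gpu_mem_gb * (1.0 - kv_cache_frac)
    return math.ceil((stable_gb + canary_gb) / usable_per_gpu)
```

For a 70B model in FP16 (~140 GB of weights per version), the canary period needs roughly twice the steady-state allocation, which is exactly the "100% overhead" the table refers to.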

Traffic Splitting Router

Weighted Traffic Distribution

The traffic splitter sits in Dynamo’s router layer, directing requests to different model versions based on configurable weights. It must be stateless (any router instance can handle any request), consistent (same user gets same version within a session for fair comparison), and instantly reconfigurable (weight changes take effect on the next request).
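One way to get consistency without any per-router session state is to hash the user ID into a bucket, so every router instance independently computes the same assignment. A minimal sketch, where the function name and salt parameter are hypothetical rather than part of Dynamo's API:

```python
import hashlib

def stateless_route(user_id: str, weights: dict, salt: str = "exp-1") -> str:
    """Deterministically map a user to a version bucket.

    Any router instance computes the same answer (stateless), and
    a user keeps the same version for as long as the weights are
    unchanged (consistent within a session).
    """
    digest = hashlib.sha256(f"{salt}:{user_id}".encode()).digest()
    # Map the first 8 hash bytes to a float in [0, 1)
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    cumulative = 0.0
    for version, weight in sorted(weights.items()):
        cumulative += weight
        if bucket < cumulative:
            return version
    return sorted(weights)[-1]  # float-rounding fallback
```

The trade-off is that assignments can shuffle when weights change, which is one reason a router may prefer an explicit sticky-session map instead.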

import random
import threading
from collections import defaultdict

class TrafficSplitter:
    """
    Split incoming traffic between model versions
    based on configurable weights.

    Features:
    - Weighted random routing
    - Sticky sessions (same user -> same model)
    - Instant weight updates (no restart)
    - Per-category routing (e.g., coding queries -> version B)
    """

    def __init__(self):
        self.weights = {}  # model_version -> weight (0.0 to 1.0)
        self.sticky_sessions = {}  # user_id -> model_version
        self.lock = threading.Lock()
        self.request_counts = defaultdict(int)
        self.category_overrides = {}

    def set_weights(self, weights):
        """
        Set traffic weights. Weights must sum to 1.0.
        Example: {"v1": 0.95, "v2": 0.05} for 5% canary.
        """
        total = sum(weights.values())
        if abs(total - 1.0) > 0.001:
            raise ValueError(
                f"Weights must sum to 1.0, got {total}"
            )

        with self.lock:
            self.weights = dict(weights)

    def set_category_override(self, category, version):
        """
        Route all requests of a category to a specific version.
        Used for targeted A/B testing on specific use cases.
        """
        self.category_overrides[category] = version

    def route(self, request):
        """
        Route a request to a model version.
        Returns the selected version name.
        """
        user_id = request.get("user_id")
        category = request.get("category")
        force_version = request.get("force_version")

        # Priority 1: Explicit version override (for debugging)
        if force_version and force_version in self.weights:
            self.request_counts[force_version] += 1
            return force_version

        # Priority 2: Category override
        if category and category in self.category_overrides:
            version = self.category_overrides[category]
            self.request_counts[version] += 1
            return version

        # Priority 3: Sticky session
        if user_id and user_id in self.sticky_sessions:
            version = self.sticky_sessions[user_id]
            if version in self.weights:
                self.request_counts[version] += 1
                return version

        # Priority 4: Weighted random selection
        with self.lock:
            if not self.weights:
                raise RuntimeError("No traffic weights configured")
            versions = list(self.weights.keys())
            weights = [self.weights[v] for v in versions]

        # Weighted random choice
        r = random.random()
        cumulative = 0.0
        selected = versions[-1]
        for version, weight in zip(versions, weights):
            cumulative += weight
            if r < cumulative:
                selected = version
                break

        # Store sticky session
        if user_id:
            self.sticky_sessions[user_id] = selected

        self.request_counts[selected] += 1
        return selected

    def get_stats(self):
        """Get traffic distribution statistics."""
        total = sum(self.request_counts.values())
        stats = {}
        for version, count in self.request_counts.items():
            stats[version] = {
                "count": count,
                "actual_percentage": (
                    count / total * 100 if total > 0 else 0
                ),
                "configured_weight": (
                    self.weights.get(version, 0) * 100
                ),
            }
        return stats

    def clear_sticky_sessions(self):
        """Clear all sticky sessions (e.g., after rollback)."""
        self.sticky_sessions.clear()

Canary Evaluation Engine

Comparing Model Versions

The canary evaluator continuously compares the canary version against the stable version across three dimensions: latency (is the canary slower?), quality (is the canary worse?), and reliability (does the canary error more?).

import numpy as np
from collections import deque
from datetime import datetime

@dataclass
class RequestMetrics:
    version: str
    request_id: str
    prompt: str
    response: str
    time_to_first_token_ms: float
    total_latency_ms: float
    tokens_generated: int
    error: bool
    error_message: str
    timestamp: float

class CanaryEvaluator:
    """
    Continuously evaluate canary version against stable.

    Collects per-request metrics, computes aggregate
    statistics, and triggers rollback if thresholds
    are exceeded.
    """

    def __init__(self, stable_version, canary_version,
                 rollback_config):
        self.stable = stable_version
        self.canary = canary_version
        self.rollback_config = rollback_config

        # Metrics storage (ring buffers for sliding window)
        self.window_size = 1000
        self.stable_metrics = deque(maxlen=self.window_size)
        self.canary_metrics = deque(maxlen=self.window_size)

        # Quality evaluation queue
        self.quality_comparison_queue = deque(maxlen=100)

        # State
        self.started_at = time.time()
        self.rollback_triggered = False
        self.rollback_reason = None

    def record_metric(self, metric):
        """Record a request metric."""
        if metric.version == self.stable:
            self.stable_metrics.append(metric)
        elif metric.version == self.canary:
            self.canary_metrics.append(metric)

    def evaluate(self):
        """
        Evaluate canary health.
        Returns evaluation result and rollback decision.
        """
        if (len(self.canary_metrics) < 50
                or len(self.stable_metrics) < 50):
            return {
                "status": "insufficient_data",
                "canary_requests": len(self.canary_metrics),
                "stable_requests": len(self.stable_metrics),
                "rollback": False,
            }

        # Latency comparison
        latency_result = self._compare_latency()

        # Error rate comparison
        error_result = self._compare_error_rates()

        # Quality comparison (if available)
        quality_result = self._compare_quality()

        # Rollback decision
        rollback = False
        rollback_reasons = []

        if latency_result["p99_increase_pct"] > self.rollback_config["p99_latency_increase_pct"]:
            rollback = True
            rollback_reasons.append(
                f"p99 latency increased by "
                f"{latency_result['p99_increase_pct']:.1f}%"
            )

        if error_result["error_rate_increase_pct"] > self.rollback_config["error_rate_increase_pct"]:
            rollback = True
            rollback_reasons.append(
                f"Error rate increased by "
                f"{error_result['error_rate_increase_pct']:.1f}%"
            )

        if (quality_result.get("quality_decrease", 0)
                > self.rollback_config.get("quality_score_decrease", 1.0)):
            rollback = True
            rollback_reasons.append(
                f"Quality decreased by "
                f"{quality_result['quality_decrease']:.3f}"
            )

        if rollback:
            self.rollback_triggered = True
            self.rollback_reason = "; ".join(rollback_reasons)

        elapsed = time.time() - self.started_at
        return {
            "status": "evaluated",
            "elapsed_seconds": int(elapsed),
            "latency": latency_result,
            "errors": error_result,
            "quality": quality_result,
            "rollback": rollback,
            "rollback_reasons": rollback_reasons,
        }

    def _compare_latency(self):
        """Compare latency distributions."""
        stable_latencies = [
            m.total_latency_ms for m in self.stable_metrics
        ]
        canary_latencies = [
            m.total_latency_ms for m in self.canary_metrics
        ]

        stable_p50 = np.percentile(stable_latencies, 50)
        stable_p99 = np.percentile(stable_latencies, 99)
        canary_p50 = np.percentile(canary_latencies, 50)
        canary_p99 = np.percentile(canary_latencies, 99)

        stable_ttft = [
            m.time_to_first_token_ms for m in self.stable_metrics
        ]
        canary_ttft = [
            m.time_to_first_token_ms for m in self.canary_metrics
        ]

        return {
            "stable_p50_ms": round(stable_p50, 1),
            "stable_p99_ms": round(stable_p99, 1),
            "canary_p50_ms": round(canary_p50, 1),
            "canary_p99_ms": round(canary_p99, 1),
            "p50_increase_pct": round(
                (canary_p50 - stable_p50) / stable_p50 * 100, 1
            ),
            "p99_increase_pct": round(
                (canary_p99 - stable_p99) / stable_p99 * 100, 1
            ),
            "stable_ttft_p50": round(np.percentile(stable_ttft, 50), 1),
            "canary_ttft_p50": round(np.percentile(canary_ttft, 50), 1),
        }

    def _compare_error_rates(self):
        """Compare error rates."""
        stable_errors = sum(
            1 for m in self.stable_metrics if m.error
        )
        canary_errors = sum(
            1 for m in self.canary_metrics if m.error
        )

        stable_rate = stable_errors / max(len(self.stable_metrics), 1)
        canary_rate = canary_errors / max(len(self.canary_metrics), 1)

        increase_pct = (
            (canary_rate - stable_rate)
            / max(stable_rate, 0.001) * 100
        )

        return {
            "stable_error_rate": round(stable_rate, 4),
            "canary_error_rate": round(canary_rate, 4),
            "error_rate_increase_pct": round(increase_pct, 1),
        }

    def _compare_quality(self):
        """
        Compare output quality using automated metrics.

        Metrics:
        1. Response length (proxy for completeness)
        2. LLM-as-judge score (if available)
        3. User feedback (if available)
        """
        stable_lengths = [
            m.tokens_generated for m in self.stable_metrics
        ]
        canary_lengths = [
            m.tokens_generated for m in self.canary_metrics
        ]

        avg_stable_len = np.mean(stable_lengths) if stable_lengths else 0
        avg_canary_len = np.mean(canary_lengths) if canary_lengths else 0

        length_change = (
            (avg_canary_len - avg_stable_len)
            / max(avg_stable_len, 1) * 100
        )

        return {
            "avg_stable_tokens": round(avg_stable_len, 1),
            "avg_canary_tokens": round(avg_canary_len, 1),
            "length_change_pct": round(length_change, 1),
            "quality_decrease": 0.0,
        }
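The `quality_decrease` field above is left at 0.0 as a placeholder. If an LLM-as-judge pipeline emits pairwise verdicts on matched prompts, one way to turn them into the score the rollback trigger thresholds on is shown below; the verdict labels and helper name are assumptions, not a Dynamo interface:

```python
def judge_quality_delta(verdicts: list) -> float:
    """Convert pairwise judge verdicts ('stable', 'canary', 'tie')
    into a quality-decrease score.

    A canary win rate of 0.5 means parity with stable; anything
    below parity is reported as a positive decrease, so it can be
    compared against rollback_config["quality_score_decrease"].
    """
    n = len(verdicts)
    if n == 0:
        return 0.0  # no judged pairs yet
    canary_wins = verdicts.count("canary") + 0.5 * verdicts.count("tie")
    win_rate = canary_wins / n
    return max(0.0, 0.5 - win_rate)
```

With the default threshold of 0.05, the canary would be rolled back once its judged win rate drops below 45%.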

A/B Testing Framework

Comparing Two Models Side-by-Side

A/B testing differs from canary in purpose: canary validates safety (is the new version good enough?), A/B testing measures improvement (is the new version better?). A/B testing requires a larger sample size and longer duration to detect statistically significant differences.

from scipy import stats

class ABTestManager:
    """
    Manage A/B tests between model versions.

    Statistical framework:
    - Null hypothesis: models have equal quality
    - Alternative: new model is different (two-sided) or
      better (one-sided)
    - Test: Mann-Whitney U test (non-parametric)
    - Significance level: alpha = 0.05
    - Power: 0.80 (detect 5% quality difference)
    """

    def __init__(self):
        self.tests = {}

    def create_test(self, test_name, model_a, model_b,
                     traffic_split=0.5, target_samples=1000):
        """Create a new A/B test."""
        self.tests[test_name] = {
            "model_a": model_a,
            "model_b": model_b,
            "traffic_split": traffic_split,
            "target_samples": target_samples,
            "scores_a": [],
            "scores_b": [],
            "started_at": time.time(),
            "status": "running",
        }

    def record_score(self, test_name, model, score):
        """Record a quality score for one model in a test."""
        test = self.tests[test_name]
        if model == test["model_a"]:
            test["scores_a"].append(score)
        elif model == test["model_b"]:
            test["scores_b"].append(score)

    def analyze(self, test_name, alpha=0.05):
        """
        Analyze A/B test results.
        """
        test = self.tests[test_name]
        scores_a = np.array(test["scores_a"])
        scores_b = np.array(test["scores_b"])

        if len(scores_a) < 30 or len(scores_b) < 30:
            return {
                "status": "insufficient_data",
                "n_a": len(scores_a),
                "n_b": len(scores_b),
            }

        # Descriptive statistics
        mean_a = np.mean(scores_a)
        mean_b = np.mean(scores_b)
        std_a = np.std(scores_a, ddof=1)
        std_b = np.std(scores_b, ddof=1)

        # Mann-Whitney U test (non-parametric)
        u_stat, p_value = stats.mannwhitneyu(
            scores_a, scores_b, alternative='two-sided'
        )

        # Effect size (Cohen's d)
        pooled_std = np.sqrt(
            (std_a ** 2 + std_b ** 2) / 2
        )
        cohens_d = (mean_b - mean_a) / pooled_std if pooled_std > 0 else 0

        # Win rate of B over A. Index pairing assumes scores_a[i]
        # and scores_b[i] were scored on the same prompt; with
        # unpaired samples this is only a rough indicator.
        n_comparisons = min(len(scores_a), len(scores_b))
        b_wins = sum(
            1 for i in range(n_comparisons)
            if scores_b[i] > scores_a[i]
        )
        win_rate_b = b_wins / max(n_comparisons, 1)

        # Decision
        significant = p_value < alpha
        if significant:
            if mean_b > mean_a:
                decision = f"{test['model_b']} is significantly better"
            else:
                decision = f"{test['model_a']} is significantly better"
        else:
            decision = "No significant difference detected"

        return {
            "status": "analyzed",
            "n_a": len(scores_a),
            "n_b": len(scores_b),
            "mean_a": round(mean_a, 4),
            "mean_b": round(mean_b, 4),
            "std_a": round(std_a, 4),
            "std_b": round(std_b, 4),
            "p_value": round(p_value, 6),
            "significant": significant,
            "cohens_d": round(cohens_d, 3),
            "win_rate_b": round(win_rate_b, 3),
            "decision": decision,
        }

    def required_sample_size(self, effect_size=0.05,
                               alpha=0.05, power=0.80):
        """
        Compute required sample size per group to detect
        a given effect size.

        For LLM quality comparison:
        - Small effect: 0.02 (barely noticeable)
        - Medium effect: 0.05 (noticeable in aggregate)
        - Large effect: 0.10 (clearly better)
        """
        z_alpha = stats.norm.ppf(1 - alpha / 2)
        z_beta = stats.norm.ppf(power)

        # Approximate: n = ((z_alpha + z_beta) / effect_size)^2
        # Assumes equal variances and normal approximation
        n = ((z_alpha + z_beta) / effect_size) ** 2

        return {
            "required_per_group": int(np.ceil(n)),
            "total_required": int(np.ceil(2 * n)),
            "effect_size": effect_size,
            "alpha": alpha,
            "power": power,
        }

Required Samples for A/B Testing (80% Power)

| Detectable effect size | 1% | 2% | 5% | 10% | 15% | 20% |
| --- | --- | --- | --- | --- | --- | --- |
| Samples per group (two-sided, alpha=0.05) | 78,400 | 19,600 | 3,137 | 785 | 349 | 197 |
⚠️ Warning

A 5% quality improvement requires approximately 3,000 samples per model to detect reliably. At typical production traffic (1,000 requests/hour) with a 50/50 split, that takes 6 hours. Smaller improvements (1-2%) require days of testing. Plan A/B test duration accordingly.
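The 6-hour figure follows directly from the arithmetic. A small helper for planning test duration under your own traffic; the `scored_fraction` knob is an assumption for setups that judge only a sample of requests rather than all of them:

```python
def ab_test_duration_hours(samples_per_group: int,
                           requests_per_hour: float,
                           split: float = 0.5,
                           scored_fraction: float = 1.0) -> float:
    """Hours until the slower arm reaches the required sample count.

    scored_fraction: share of requests that actually receive a
    quality score (judging every request is often too expensive).
    """
    slower_arm = min(split, 1.0 - split)
    scored_per_hour = requests_per_hour * slower_arm * scored_fraction
    return samples_per_group / scored_per_hour
```

At 1,000 requests/hour with a 50/50 split, 3,137 samples per group takes about 6.3 hours; judging only 10% of requests stretches that to roughly 63 hours.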

Rollback Orchestrator

Instant Rollback Without Downtime

When the canary evaluation triggers a rollback, the system must shift all traffic back to the stable version instantly (within one request cycle) and then safely unload the canary model to free GPU memory.

class RollbackOrchestrator:
    """
    Orchestrate model rollback when canary evaluation fails.

    Rollback sequence:
    1. Set canary weight to 0% (instant, at router level)
    2. Drain in-flight canary requests (wait for completion)
    3. Clear sticky sessions pointing to canary
    4. Unload canary model from GPU
    5. Log rollback event
    6. Alert on-call
    """

    def __init__(self, traffic_splitter, model_manager):
        self.splitter = traffic_splitter
        self.model_manager = model_manager
        self.rollback_history = []

    def execute_rollback(self, stable_version, canary_version,
                          reason, evaluation_snapshot):
        """Execute an immediate rollback."""
        rollback_start = time.time()

        # Step 1: Redirect all traffic to stable (instant)
        self.splitter.set_weights({stable_version: 1.0})
        traffic_shifted_at = time.time()

        # Step 2: Clear sticky sessions
        self.splitter.clear_sticky_sessions()

        # Step 3: Wait for in-flight canary requests to complete
        drain_timeout = 30  # seconds
        self._drain_inflight_requests(
            canary_version, drain_timeout
        )
        drained_at = time.time()

        # Step 4: Unload canary model
        self.model_manager.unload_model(canary_version)
        unloaded_at = time.time()

        # Step 5: Log the event
        rollback_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "stable_version": stable_version,
            "canary_version": canary_version,
            "reason": reason,
            "evaluation_snapshot": evaluation_snapshot,
            "timing": {
                "traffic_shift_ms": round(
                    (traffic_shifted_at - rollback_start) * 1000, 1
                ),
                "drain_ms": round(
                    (drained_at - traffic_shifted_at) * 1000, 1
                ),
                "unload_ms": round(
                    (unloaded_at - drained_at) * 1000, 1
                ),
                "total_ms": round(
                    (unloaded_at - rollback_start) * 1000, 1
                ),
            },
        }
        self.rollback_history.append(rollback_record)

        return rollback_record

    def _drain_inflight_requests(self, version, timeout):
        """Wait for in-flight requests to the canary to complete."""
        start = time.time()
        while time.time() - start < timeout:
            inflight = self.model_manager.get_inflight_count(
                version
            )
            if inflight == 0:
                return
            time.sleep(0.5)
        # Timeout: force-cancel remaining requests
        self.model_manager.cancel_inflight(version)

Progressive Rollout

Gradually Increasing Canary Traffic

Instead of jumping from 5% canary to 100% production, progressive rollout increases traffic in stages, re-evaluating at each stage.

class ProgressiveRollout:
    """
    Gradually increase canary traffic through stages.

    Default stages: 2% -> 5% -> 10% -> 25% -> 50% -> 100%

    At each stage:
    1. Hold for evaluation_duration
    2. Evaluate canary health
    3. If healthy: advance to next stage
    4. If unhealthy: rollback
    """

    def __init__(self, splitter, evaluator, rollback_orch):
        self.splitter = splitter
        self.evaluator = evaluator
        self.rollback = rollback_orch

        self.stages = [0.02, 0.05, 0.10, 0.25, 0.50, 1.00]
        self.current_stage = 0
        self.stage_history = []

    def advance(self, stable_version, canary_version):
        """
        Set the traffic split for the current rollout stage.
        Returns a status dict describing the stage.
        """
        if self.current_stage >= len(self.stages):
            return {"status": "complete", "message": "Already at 100%"}

        canary_pct = self.stages[self.current_stage]

        # Set traffic weights
        self.splitter.set_weights({
            stable_version: 1.0 - canary_pct,
            canary_version: canary_pct,
        })

        return {
            "status": "advancing",
            "stage": self.current_stage,
            "canary_percentage": canary_pct * 100,
            "next_evaluation_in": self._stage_duration(
                self.current_stage
            ),
        }

    def evaluate_and_decide(self, stable_version, canary_version):
        """
        Evaluate current stage and decide next action.
        """
        evaluation = self.evaluator.evaluate()

        if evaluation.get("rollback", False):
            # Rollback
            rollback_result = self.rollback.execute_rollback(
                stable_version, canary_version,
                reason="; ".join(
                    evaluation.get("rollback_reasons", ["unknown"])
                ),
                evaluation_snapshot=evaluation,
            )
            return {
                "action": "rollback",
                "stage": self.current_stage,
                "evaluation": evaluation,
                "rollback": rollback_result,
            }

        if evaluation.get("status") == "insufficient_data":
            return {
                "action": "wait",
                "reason": "Need more data",
                "evaluation": evaluation,
            }

        # Stage passed -- advance
        self.current_stage += 1
        self.stage_history.append({
            "stage": self.current_stage - 1,
            "evaluation": evaluation,
            "passed": True,
        })

        if self.current_stage >= len(self.stages):
            return {
                "action": "complete",
                "message": "Canary promoted to 100%",
                "stages_passed": len(self.stage_history),
            }

        return {
            "action": "advance",
            "next_stage": self.current_stage,
            "next_percentage": self.stages[self.current_stage] * 100,
        }

    def _stage_duration(self, stage):
        """Duration to hold at each stage (seconds)."""
        # Earlier stages (lower traffic) need longer to collect
        # sufficient data
        durations = [1800, 1800, 3600, 3600, 7200, 0]
        return durations[min(stage, len(durations) - 1)]

Progressive Rollout Timeline (Typical)

| Stage | Canary % | Duration | Min Requests | Purpose |
| --- | --- | --- | --- | --- |
| 0 | 2% | 30 min | ~200 | Detect crashes and major regressions |
| 1 | 5% | 30 min | ~500 | Detect latency regressions |
| 2 | 10% | 1 hour | ~2000 | Detect quality issues |
| 3 | 25% | 1 hour | ~5000 | Statistical power for A/B comparison |
| 4 | 50% | 2 hours | ~10000 | Final validation at scale |
| 5 | 100% | Complete | - | Full promotion |
Note: Total rollout time: ~5 hours for a safe full promotion. Can be shortened for low-risk updates.

Complete Deployment Pipeline

Orchestrating the Full Update

class ModelDeploymentPipeline:
    """
    Complete pipeline for safe model updates.

    Orchestrates: planning -> loading -> canary -> evaluation
    -> progressive rollout -> completion or rollback.
    """

    def __init__(self, model_manager, splitter):
        self.model_manager = model_manager
        self.splitter = splitter
        self.planner = ModelUpdatePlanner()

    def deploy(self, current_version, target_version):
        """
        Execute a complete model deployment.
        """
        # Phase 1: Plan
        plan = self.planner.plan_update(
            current_version, target_version
        )
        print(f"Update plan: {plan.strategy} "
              f"(risk: {plan.risk_level.value})")

        # Phase 2: Load target model
        print("Loading target model...")
        load_start = time.time()
        self.model_manager.load_model(target_version)
        load_time = time.time() - load_start
        print(f"Loaded in {load_time:.1f}s")

        # Phase 3: Warm up
        print("Warming up target model...")
        self.model_manager.warmup(
            target_version, n_tokens=plan.target_version.warmup_tokens
        )

        # Phase 4: Start canary
        evaluator = CanaryEvaluator(
            stable_version=current_version.version,
            canary_version=target_version.version,
            rollback_config=plan.rollback_trigger,
        )
        rollback_orch = RollbackOrchestrator(
            self.splitter, self.model_manager
        )
        rollout = ProgressiveRollout(
            self.splitter, evaluator, rollback_orch
        )

        print(f"Starting rollout at {rollout.stages[0] * 100:.0f}% canary traffic")
        rollout.advance(
            current_version.version,
            target_version.version,
        )

        # Phase 5: Monitor and advance
        while True:
            time.sleep(60)  # Check every minute

            result = rollout.evaluate_and_decide(
                current_version.version,
                target_version.version,
            )

            if result["action"] == "rollback":
                print(f"ROLLBACK: {result.get('evaluation', {})}")
                return {
                    "status": "rolled_back",
                    "reason": result.get("evaluation", {}),
                }

            elif result["action"] == "complete":
                # Unload old version
                self.model_manager.unload_model(
                    current_version.version
                )
                print("Deployment complete. Old version unloaded.")
                return {
                    "status": "deployed",
                    "version": target_version.version,
                    "stages_passed": result.get("stages_passed", 0),
                }

            elif result["action"] == "advance":
                print(
                    f"Advancing to stage {result['next_stage']} "
                    f"({result['next_percentage']}%)"
                )
                rollout.advance(
                    current_version.version,
                    target_version.version,
                )

            else:
                # Waiting for more data
                continue

Key Takeaways

Safe LLM model updates require traffic-level control, continuous quality evaluation, and instant rollback capability. The cost of a bad deployment is proportional to traffic times duration — catching issues in the 2% canary stage prevents them from affecting the full user base.
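That traffic-times-duration cost can be made concrete; the traffic numbers below are illustrative:

```python
def bad_requests_exposed(requests_per_hour: float,
                         traffic_fraction: float,
                         hours_to_detect: float) -> int:
    """Requests served by a bad model version before rollback."""
    return round(requests_per_hour * traffic_fraction * hours_to_detect)
```

At 1,000 requests/hour, a bad checkpoint at 100% traffic for 4 hours (the incident in the introduction) exposes about 4,000 requests; the same checkpoint in a 2% canary caught within 30 minutes exposes about 10.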

Core principles:

  1. Risk-graded strategies: Config changes need no canary. Fine-tuned model updates need a 10% canary for 1 hour. New model families need a 2% canary for 24 hours. The evaluation duration must match the risk.

  2. Dual-model GPU overhead: Canary deployment requires loading both model versions simultaneously. For a 70B model, that means 2x GPU allocation during the canary period (140GB + 140GB). This is a real infrastructure cost that must be budgeted.

  3. Statistical rigor for A/B tests: Detecting a 5% quality improvement requires approximately 3,000 samples per model. Rushing the decision with insufficient data leads to false positives (promoting a worse model) or false negatives (rejecting a better model).

  4. Progressive rollout is safer than binary: Moving from 2% to 5% to 10% to 25% to 50% to 100% catches issues at every scale. A latency regression that only appears at high concurrency will be caught at the 25% or 50% stage.

  5. Sticky sessions matter: Users in an A/B test must consistently see the same model version within a session. Switching mid-conversation produces confusing behavior and corrupts the evaluation signal.