Part 9 of 30 in the series: NVIDIA Dynamo & llm-d

A 24/7 Llama 70B deployment on 4Γ—8H100 nodes costs $100,800/month on-demand. Switching to 100% spot reduces cost to $25,200/month, but spot reclamation at peak traffic causes 40-second service gaps (unacceptable). The optimal hybrid: 2 reserved nodes for baseline traffic ($52,800/month), 2 spot nodes for peak hours ($12,600/month), and on-demand bursting for traffic spikes ($4,200/month estimated). Total: $69,600/month (31% savings) with zero SLA violations. Dynamo’s cost optimizer implements this as a linear program that rebalances every 5 minutes based on traffic forecasts and spot pricing.

The Cost Model

Instance Types and Pricing

from dataclasses import dataclass

@dataclass
class InstanceType:
    name: str
    gpus: int
    gpu_type: str
    hourly_cost_ondemand: float
    hourly_cost_reserved_1yr: float
    hourly_cost_spot: float
    spot_reclaim_probability: float  # Per-hour probability of reclamation
    max_batch_size: int              # Max concurrent sequences
    prefill_throughput: float        # Tokens/sec for prefill
    decode_throughput: float         # Tokens/sec for decode

INSTANCE_TYPES = {
    "h100_8x": InstanceType(
        name="h100_8x",
        gpus=8,
        gpu_type="H100",
        hourly_cost_ondemand=32.77,
        hourly_cost_reserved_1yr=19.06,
        hourly_cost_spot=9.83,
        spot_reclaim_probability=0.05,
        max_batch_size=256,
        prefill_throughput=50000,
        decode_throughput=8000,
    ),
    "a100_8x": InstanceType(
        name="a100_8x",
        gpus=8,
        gpu_type="A100-80G",
        hourly_cost_ondemand=22.03,
        hourly_cost_reserved_1yr=13.22,
        hourly_cost_spot=6.61,
        spot_reclaim_probability=0.08,
        max_batch_size=128,
        prefill_throughput=30000,
        decode_throughput=5000,
    ),
}
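As a sanity check on the pricing above, the reserved and spot rates line up with the commonly quoted cloud discounts (roughly 42% for 1-year reserved and 70% for spot on the H100 figures):

```python
# Hourly rates from INSTANCE_TYPES["h100_8x"]
od, reserved, spot = 32.77, 19.06, 9.83

reserved_discount = 1 - reserved / od
spot_discount = 1 - spot / od

print(round(reserved_discount, 2), round(spot_discount, 2))  # -> 0.42 0.7
```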

The Optimization Objective

Minimize total hourly cost subject to throughput and latency constraints:

$$\min_{n_r,\, n_s,\, n_d} \quad c_r n_r + c_s n_s + c_d n_d$$

Subject to:

$$T(n_r, n_s, n_d) \geq T_{\text{required}}$$

$$\text{TTFT}_{P95}(n_r, n_s, n_d) \leq \text{SLO}_{\text{TTFT}}$$

$$\text{TBT}_{P95}(n_r, n_s, n_d) \leq \text{SLO}_{\text{TBT}}$$

$$n_r \geq 0, \quad n_s \geq 0, \quad n_d \geq 0$$

where $n_r, n_s, n_d$ are the numbers of reserved, spot, and on-demand instances, $c_r, c_s, c_d$ are their hourly costs, $T$ is total throughput, and the SLO constraints ensure latency targets are met.

📊

Monthly Cost Comparison: Llama 70B Serving (4 nodes, 24/7)

| Strategy | Reserved | On-Demand | Spot | Monthly Cost | Savings |
|---|---|---|---|---|---|
| All on-demand | 0 | 4 | 0 | $94,700 | Baseline |
| All reserved (1yr) | 4 | 0 | 0 | $55,000 | 42% |
| Reserved + spot burst | 2 | 0 | 2 (peak only) | $44,200 | 53% |
| Hybrid optimized | 2 | 0.5 avg | 1.5 avg | $38,500 | 59% |
Note: Peak traffic: 8 hours/day requiring 4 nodes. Off-peak: 2 nodes sufficient. Spot pricing assumes 70% discount with 5% hourly reclaim probability.

Traffic Pattern Analysis

Demand Profiling

The cost optimizer needs a model of expected traffic. Most LLM workloads show predictable daily and weekly patterns:

import datetime
import numpy as np

class DemandProfiler:
    """Profile traffic patterns to predict future demand."""

    def __init__(self, history_days=14, resolution_minutes=5):
        self.history_days = history_days
        self.resolution = resolution_minutes
        self.bins_per_day = 24 * 60 // resolution_minutes

        # Store historical demand as [day_of_week][time_bin] -> [observations]
        self.history = [[[] for _ in range(self.bins_per_day)] for _ in range(7)]

    def record(self, timestamp, rps, avg_prompt_len, avg_output_len):
        """Record a traffic observation."""
        import datetime
        dt = datetime.datetime.fromtimestamp(timestamp)
        day = dt.weekday()
        bin_idx = (dt.hour * 60 + dt.minute) // self.resolution

        self.history[day][bin_idx].append({
            'rps': rps,
            'prompt_len': avg_prompt_len,
            'output_len': avg_output_len,
            'tokens_per_sec': rps * (avg_prompt_len + avg_output_len),
        })

    def predict(self, timestamp):
        """Predict demand for a future time."""
        import datetime
        dt = datetime.datetime.fromtimestamp(timestamp)
        day = dt.weekday()
        bin_idx = (dt.hour * 60 + dt.minute) // self.resolution

        observations = self.history[day][bin_idx]
        if not observations:
            return self._global_average()

        # Use P75 as the planning target (handle typical peaks)
        rps_values = [o['rps'] for o in observations]
        tps_values = [o['tokens_per_sec'] for o in observations]

        return {
            'rps_expected': np.percentile(rps_values, 50),
            'rps_p75': np.percentile(rps_values, 75),
            'rps_p95': np.percentile(rps_values, 95),
            'tps_expected': np.percentile(tps_values, 50),
            'tps_p75': np.percentile(tps_values, 75),
            'tps_p95': np.percentile(tps_values, 95),
        }

    def _global_average(self):
        all_rps = []
        for day in self.history:
            for time_bin in day:
                for obs in time_bin:
                    all_rps.append(obs['rps'])
        if not all_rps:
            return {'rps_expected': 1, 'rps_p75': 2, 'rps_p95': 5,
                    'tps_expected': 1000, 'tps_p75': 2000, 'tps_p95': 5000}
        return {
            'rps_expected': np.median(all_rps),
            'rps_p75': np.percentile(all_rps, 75),
            'rps_p95': np.percentile(all_rps, 95),
            'tps_expected': np.median(all_rps) * 1500,
            'tps_p75': np.percentile(all_rps, 75) * 1500,
            'tps_p95': np.percentile(all_rps, 95) * 1500,
        }
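The (weekday, time-bin) indexing above is the heart of the profiler. It can be restated standalone; this hypothetical `demand_bin` helper takes a `datetime` directly to sidestep the timezone handling in `fromtimestamp`:

```python
import datetime

def demand_bin(dt: datetime.datetime, resolution_minutes: int = 5):
    """Map a datetime to the (weekday, time_bin) key used for demand history."""
    return dt.weekday(), (dt.hour * 60 + dt.minute) // resolution_minutes

# Monday 09:37 lands in weekday 0, bin 115 of the 288 five-minute bins per day
print(demand_bin(datetime.datetime(2025, 1, 6, 9, 37)))  # -> (0, 115)
```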

Capacity Planning

class CapacityPlanner:
    """Compute required instances from demand forecast."""

    def __init__(self, instance_type, slo_config):
        self.instance = instance_type
        self.slo = slo_config

    def compute_required_instances(self, demand):
        """Compute minimum instances to meet demand with SLOs."""
        tps_required = demand['tps_p75']

        # Each instance can handle max_decode_throughput tokens/sec
        # But actual throughput depends on batch utilization
        # At SLO-constrained operation, typical utilization is 70%
        effective_tps_per_instance = self.instance.decode_throughput * 0.70

        # Also consider prefill overhead: each request needs prefill time
        # which reduces decode capacity
        rps = demand['rps_p75']
        avg_prompt_tokens = 1500  # Typical
        prefill_overhead_fraction = (
            rps * avg_prompt_tokens / self.instance.prefill_throughput
        )

        adjusted_tps = effective_tps_per_instance * (1 - prefill_overhead_fraction)
        adjusted_tps = max(adjusted_tps, effective_tps_per_instance * 0.3)

        instances_for_throughput = int(np.ceil(tps_required / adjusted_tps))

        # Also check: enough instances for request concurrency
        avg_active_requests = rps * (avg_prompt_tokens + 500) / adjusted_tps
        instances_for_concurrency = int(np.ceil(
            avg_active_requests / self.instance.max_batch_size
        ))

        return max(instances_for_throughput, instances_for_concurrency, 1)
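To make the formula concrete, here is a standalone copy of the same arithmetic with the `h100_8x` figures plugged in as defaults (the 20 req/s load and 1,500/500 token lengths are illustrative assumptions, not measurements):

```python
import math

def required_instances(tps_required, rps, decode_tps=8000, prefill_tps=50000,
                       max_batch=256, utilization=0.70,
                       avg_prompt=1500, avg_output=500):
    """Standalone copy of CapacityPlanner.compute_required_instances."""
    effective = decode_tps * utilization
    # Prefill steals decode capacity; floor at 30% of effective throughput
    prefill_overhead = rps * avg_prompt / prefill_tps
    adjusted = max(effective * (1 - prefill_overhead), effective * 0.3)
    for_throughput = math.ceil(tps_required / adjusted)
    # Concurrency check against the max batch size
    active = rps * (avg_prompt + avg_output) / adjusted
    for_concurrency = math.ceil(active / max_batch)
    return max(for_throughput, for_concurrency, 1)

# 50k tokens/sec of demand at 20 req/s on h100_8x figures
print(required_instances(50_000, 20))  # -> 23
```

Note how prefill overhead dominates here: 20 req/s of 1,500-token prompts consumes 60% of prefill throughput, cutting each instance's adjusted decode capacity from 5,600 to 2,240 tokens/sec.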

The Hybrid Strategy

Three Tiers of Capacity

class HybridCapacityStrategy:
    """Allocate capacity across reserved, spot, and on-demand tiers."""

    def __init__(self, instance_type, demand_profiler):
        self.instance = instance_type
        self.profiler = demand_profiler

    def compute_allocation(self, planning_horizon_days=30):
        """Compute optimal allocation across tiers."""
        # Analyze demand across all time periods
        demands = []
        import datetime
        now = datetime.datetime.now()

        for day_offset in range(planning_horizon_days):
            for hour in range(24):
                for minute_bin in range(0, 60, 5):
                    ts = (now + datetime.timedelta(
                        days=day_offset, hours=hour, minutes=minute_bin
                    )).timestamp()
                    demands.append(self.profiler.predict(ts))

        # Compute capacity needs at different percentiles
        all_tps = [d['tps_p75'] for d in demands]
        eff_tps = self.instance.decode_throughput * 0.70

        instances_needed = [int(np.ceil(tps / eff_tps)) for tps in all_tps]

        # Reserved: covers the baseline (P25 of demand)
        # This capacity is needed 75%+ of the time
        reserved_count = int(np.percentile(instances_needed, 25))

        # Spot: covers typical peaks (P25 to P75)
        # Cheap but can be reclaimed
        peak_75 = int(np.percentile(instances_needed, 75))
        spot_count = peak_75 - reserved_count

        # On-demand: covers extreme peaks (P75 to P99)
        # Expensive but guaranteed
        peak_99 = int(np.percentile(instances_needed, 99))
        ondemand_max = peak_99 - peak_75

        # Compute costs
        hours_per_month = planning_horizon_days * 24
        reserved_cost = reserved_count * self.instance.hourly_cost_reserved_1yr * hours_per_month
        spot_cost = spot_count * self.instance.hourly_cost_spot * hours_per_month * 0.6  # 60% utilization
        ondemand_cost = ondemand_max * self.instance.hourly_cost_ondemand * hours_per_month * 0.1  # 10% utilization

        return {
            'reserved': reserved_count,
            'spot': spot_count,
            'ondemand_max': ondemand_max,
            'monthly_cost': reserved_cost + spot_cost + ondemand_cost,
            'cost_breakdown': {
                'reserved': reserved_cost,
                'spot': spot_cost,
                'ondemand': ondemand_cost,
            },
        }
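The tiering logic reduces to three percentile cuts over the forecast. A stdlib-only sketch using a nearest-rank percentile (np.percentile with linear interpolation would give similar splits) over a synthetic day of 5-minute bins:

```python
def pct(values, p):
    """Nearest-rank percentile over a list of instance counts."""
    s = sorted(values)
    idx = min(len(s) - 1, int(p / 100 * len(s)))
    return s[idx]

# Synthetic demand: mostly 2 nodes, an evening peak of 3-4, rare spikes to 6
instances_needed = [2] * 180 + [3] * 60 + [4] * 40 + [6] * 8

reserved = pct(instances_needed, 25)                          # baseline tier
spot = pct(instances_needed, 75) - reserved                   # typical-peak tier
ondemand_max = pct(instances_needed, 99) - pct(instances_needed, 75)  # spike tier

print(reserved, spot, ondemand_max)  # -> 2 1 3
```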

Spot Instance Management

Preemption Handling

Cloud providers give a 2-minute warning before reclaiming spot instances. Dynamo must gracefully handle this:

class SpotInstanceManager:
    """Handle spot instance lifecycle including preemption."""

    def __init__(self, dynamo_router, kv_cache_manager):
        self.router = dynamo_router
        self.kv_manager = kv_cache_manager
        self.spot_instances = {}

    def register_spot(self, instance_id, worker):
        """Register a new spot instance."""
        self.spot_instances[instance_id] = {
            'worker': worker,
            'status': 'active',
            'active_requests': set(),
        }
        self.router.add_worker(worker)

    def on_preemption_warning(self, instance_id):
        """Handle 2-minute preemption warning."""
        if instance_id not in self.spot_instances:
            return

        spot = self.spot_instances[instance_id]
        spot['status'] = 'draining'

        # Step 1: Stop routing new requests to this instance
        self.router.remove_worker(spot['worker'])

        # Step 2: Migrate active requests to other instances
        active_requests = list(spot['active_requests'])

        for request_id in active_requests:
            self._migrate_request(request_id, spot['worker'])

        # Step 3: Transfer valuable KV cache blocks
        self._transfer_kv_cache(spot['worker'])

    def _migrate_request(self, request_id, from_worker):
        """Migrate an active request to another worker."""
        # Find the best target worker
        target = self.router.find_best_worker_excluding(from_worker)

        if target is None:
            # No available workers -- request will be requeued
            from_worker.cancel_request(request_id)
            self.router.requeue_request(request_id)
            return

        # Get the request's current state
        request_state = from_worker.get_request_state(request_id)

        # Option A: Transfer KV cache and resume (fast if NVLink)
        if self._can_transfer_kv(from_worker, target):
            kv_blocks = from_worker.get_kv_blocks(request_id)
            target.receive_kv_blocks(kv_blocks)
            target.resume_request(request_id, request_state)
        else:
            # Option B: Re-prefill on target (slower but always works)
            from_worker.cancel_request(request_id)
            target.restart_request(request_id, request_state)

    def _transfer_kv_cache(self, from_worker):
        """Transfer hot KV cache blocks to surviving workers."""
        # Identify frequently accessed prefixes
        hot_blocks = from_worker.get_hot_kv_blocks(limit=100)

        # Distribute to workers that serve similar traffic
        for block_hash, block_data in hot_blocks:
            # Find workers that would benefit from this prefix
            workers_needing = self.router.kv_index.find_workers_for_prefix(block_hash)
            if workers_needing:
                target = workers_needing[0]
                target.receive_kv_block(block_hash, block_data)

    def _can_transfer_kv(self, source, target):
        """Check if fast KV transfer is possible."""
        # NVLink or same-node PCIe: fast transfer
        # Cross-node: too slow for 2-minute window
        return source.node_id == target.node_id
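How the preemption warning is detected is cloud-specific. On AWS, for example, a pending reclamation appears at the `spot/instance-action` path of the instance metadata service (a 404 means nothing is scheduled). A minimal polling sketch, with the parsing split out so it can be exercised offline:

```python
import json
import urllib.request

SPOT_ACTION_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"

def parse_instance_action(body: str):
    """Parse a spot instance-action document into (action, time), or None."""
    try:
        doc = json.loads(body)
        return doc["action"], doc["time"]
    except (ValueError, KeyError, TypeError):
        return None

def check_preemption(timeout: float = 0.5):
    """Poll the metadata endpoint; returns None when no reclamation is pending."""
    try:
        with urllib.request.urlopen(SPOT_ACTION_URL, timeout=timeout) as resp:
            return parse_instance_action(resp.read().decode())
    except OSError:
        return None  # 404 (no warning pending) or not running on a spot instance
```

A monitoring thread would call `check_preemption` every few seconds and invoke `on_preemption_warning` when it returns a non-None action.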

Request Draining Strategy

When a spot instance is being reclaimed, ongoing decode requests should be drained gracefully:

import time

class SpotDrainStrategy:
    """Strategies for draining requests from preempted spot instances."""

    def __init__(self, strategy="migrate_top_n"):
        self.strategy = strategy

    def drain(self, spot_worker, available_workers, time_budget_seconds=90):
        """Drain requests from spot worker within time budget."""
        requests = spot_worker.get_active_requests()

        if self.strategy == "migrate_top_n":
            # Migrate requests that are closest to completion first
            requests.sort(key=lambda r: r.remaining_tokens)

            migrated = 0
            for request in requests:
                if time.time() > spot_worker.preemption_deadline - 30:
                    break  # Reserve 30s for cleanup

                if request.remaining_tokens < 50:
                    # Almost done -- let it finish on the spot instance
                    continue

                # Migrate
                target = self._find_target(request, available_workers)
                if target:
                    self._migrate(request, spot_worker, target)
                    migrated += 1

            return migrated

        elif self.strategy == "let_short_finish":
            # Let short requests finish, migrate long ones
            short_threshold = 100  # tokens

            for request in requests:
                if request.remaining_tokens <= short_threshold:
                    continue  # Will finish before preemption
                target = self._find_target(request, available_workers)
                if target:
                    self._migrate(request, spot_worker, target)

    def _find_target(self, request, workers):
        """Find the best target for migration."""
        # Prefer worker with cached prefix
        best = None
        best_overlap = -1
        for worker in workers:
            overlap = worker.kv_overlap(request)
            if overlap > best_overlap:
                best_overlap = overlap
                best = worker
        return best
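Exercised on a toy set of requests, the `migrate_top_n` ordering sorts by remaining tokens and leaves nearly finished requests (under the 50-token cutoff) on the dying instance:

```python
# (request_id, remaining_tokens) pairs on a spot instance being drained
active = [("a", 30), ("b", 400), ("c", 70), ("d", 1200)]

active.sort(key=lambda r: r[1])  # closest to completion first
finish_in_place = [rid for rid, rem in active if rem < 50]   # let these finish
migrate = [rid for rid, rem in active if rem >= 50]          # move the rest

print(finish_in_place, migrate)  # -> ['a'] ['c', 'b', 'd']
```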
⚠️ Spot Preemption Is Real

Cloud spot instance reclamation is not theoretical. During periods of high GPU demand (conference deadlines, product launches), reclamation rates can spike to 15-20% per hour. Your system must degrade gracefully: when spot capacity disappears, fall back to fewer instances at reduced throughput rather than dropping requests.

The Cost Optimizer

Linear Programming Formulation

import numpy as np
from scipy.optimize import linprog

class CostOptimizer:
    """
    Optimize GPU allocation across reserved, spot, and on-demand.
    Runs every 5 minutes based on current and predicted demand.
    """

    def __init__(self, instance_type, slo_config):
        self.instance = instance_type
        self.slo = slo_config

    def optimize(self, current_demand, predicted_demand_1h):
        """
        Find minimum-cost allocation meeting SLOs.

        Variables: [n_reserved, n_spot, n_ondemand]
        """
        # Costs per instance per hour
        c = np.array([
            self.instance.hourly_cost_reserved_1yr,
            self.instance.hourly_cost_spot,
            self.instance.hourly_cost_ondemand,
        ])

        # Throughput per instance (decode tokens/sec)
        tps_per_instance = self.instance.decode_throughput * 0.70

        # Constraint 1: Meet current throughput demand
        # n_r * tps + n_s * tps + n_d * tps >= demand_tps
        # Spot instances have discounted reliability
        spot_reliability = 1.0 - self.instance.spot_reclaim_probability
        A_throughput = -np.array([
            tps_per_instance,
            tps_per_instance * spot_reliability,
            tps_per_instance,
        ])
        b_throughput = -current_demand['tps_p75']

        # Constraint 2: Meet predicted demand (1h ahead)
        A_predicted = -np.array([
            tps_per_instance,
            tps_per_instance * spot_reliability,
            tps_per_instance,
        ])
        b_predicted = -predicted_demand_1h['tps_p95']

        # Constraint 3: Minimum instances for latency SLO
        # At least 1 instance must be non-spot (for SLO guarantee)
        A_reliability = np.array([[-1, 0, -1]])  # -(n_r + n_d) <= -1
        b_reliability = np.array([-1])

        # Stack constraints
        A_ub = np.vstack([
            A_throughput.reshape(1, -1),
            A_predicted.reshape(1, -1),
            A_reliability,
        ])
        b_ub = np.array([b_throughput, b_predicted, b_reliability[0]])

        # Bounds: all tiers >= 0. In production you would also cap n_reserved
        # at the committed reservation count, since it cannot change instantly.
        bounds = [
            (0, None),  # reserved (already committed)
            (0, None),  # spot (can scale quickly)
            (0, None),  # on-demand (can scale quickly)
        ]

        result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')

        if result.success:
            n_r, n_s, n_d = result.x
            return {
                'reserved': int(np.ceil(n_r)),
                'spot': int(np.ceil(n_s)),
                'ondemand': int(np.ceil(n_d)),
                'hourly_cost': result.fun,
                'total_instances': int(np.ceil(n_r + n_s + n_d)),
            }
        else:
            # Fallback: scale on-demand to meet demand
            n_total = int(np.ceil(current_demand['tps_p95'] / tps_per_instance))
            return {
                'reserved': 0, 'spot': 0, 'ondemand': n_total,
                'hourly_cost': n_total * self.instance.hourly_cost_ondemand,
                'total_instances': n_total,
            }

Rolling Optimization

The optimizer runs periodically, adjusting allocation based on demand changes:

class RollingCostOptimizer:
    """Continuously optimize cost allocation."""

    def __init__(self, optimizer, profiler, interval_seconds=300):
        self.optimizer = optimizer
        self.profiler = profiler
        self.interval = interval_seconds
        self.current_allocation = None
        self.allocation_history = []

    def run_step(self):
        """Run one optimization step."""
        import time as t
        now = t.time()

        # Get current demand
        current = self._measure_current_demand()

        # Predict demand 1 hour ahead
        predicted = self.profiler.predict(now + 3600)

        # Optimize
        new_allocation = self.optimizer.optimize(current, predicted)

        # Apply changes (with damping to avoid thrashing)
        if self.current_allocation:
            damped = self._damp_changes(self.current_allocation, new_allocation)
        else:
            damped = new_allocation

        # Execute scaling actions
        self._apply_allocation(damped)

        self.current_allocation = damped
        self.allocation_history.append({
            'timestamp': now,
            'allocation': damped,
            'demand': current,
        })

    def _damp_changes(self, current, proposed):
        """Prevent rapid oscillation in allocation."""
        damped = {}
        for key in ['reserved', 'spot', 'ondemand']:
            curr = current.get(key, 0)
            prop = proposed.get(key, 0)
            # Max change per step: +2 instances or -1 instance
            if prop > curr:
                damped[key] = min(prop, curr + 2)
            elif prop < curr:
                damped[key] = max(prop, curr - 1)
            else:
                damped[key] = curr
        damped['hourly_cost'] = sum(
            damped[k] * getattr(self.optimizer.instance, f'hourly_cost_{"reserved_1yr" if k == "reserved" else k}')
            for k in ['reserved', 'spot', 'ondemand']
        )
        return damped

    def _apply_allocation(self, allocation):
        """Scale instances to match allocation."""
        # Reserved: no action (already committed long-term)
        # Spot: request or release spot instances
        # On-demand: launch or terminate on-demand instances
        pass  # Integration with cloud API
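The damping rule in `_damp_changes` is easiest to see standalone: scale-ups are capped at +2 instances per step and scale-downs at -1, so a large proposed jump is spread over several 5-minute cycles:

```python
def damp(current: dict, proposed: dict) -> dict:
    """Cap per-step allocation changes at +2 / -1 instances per tier."""
    out = {}
    for key in ("reserved", "spot", "ondemand"):
        curr, prop = current.get(key, 0), proposed.get(key, 0)
        if prop > curr:
            out[key] = min(prop, curr + 2)   # scale up at most 2 per step
        elif prop < curr:
            out[key] = max(prop, curr - 1)   # scale down at most 1 per step
        else:
            out[key] = curr
    return out

# A big spot/on-demand jump is applied incrementally
print(damp({"reserved": 2, "spot": 1, "ondemand": 0},
           {"reserved": 2, "spot": 5, "ondemand": 3}))
# -> {'reserved': 2, 'spot': 3, 'ondemand': 2}
```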

Cost Monitoring and Reporting

class CostReporter:
    """Track and report serving costs."""

    def __init__(self):
        self.hourly_costs = []
        self.instance_hours = {'reserved': 0, 'spot': 0, 'ondemand': 0}
        self.tokens_generated = 0
        self.requests_served = 0

    def record_interval(self, allocation, tokens, requests, duration_hours):
        cost = (
            allocation['reserved'] * INSTANCE_TYPES['h100_8x'].hourly_cost_reserved_1yr +
            allocation['spot'] * INSTANCE_TYPES['h100_8x'].hourly_cost_spot +
            allocation['ondemand'] * INSTANCE_TYPES['h100_8x'].hourly_cost_ondemand
        ) * duration_hours

        self.hourly_costs.append(cost / duration_hours)
        self.instance_hours['reserved'] += allocation['reserved'] * duration_hours
        self.instance_hours['spot'] += allocation['spot'] * duration_hours
        self.instance_hours['ondemand'] += allocation['ondemand'] * duration_hours
        self.tokens_generated += tokens
        self.requests_served += requests

    def report(self):
        total_cost = sum(self.hourly_costs)
        return {
            'total_cost': total_cost,
            'avg_hourly_cost': np.mean(self.hourly_costs) if self.hourly_costs else 0,
            'cost_per_1M_tokens': (
                total_cost / (self.tokens_generated / 1e6)
                if self.tokens_generated > 0 else 0
            ),
            'cost_per_request': (
                total_cost / self.requests_served
                if self.requests_served > 0 else 0
            ),
            'instance_hours': self.instance_hours,
            'spot_fraction': (
                self.instance_hours['spot'] /
                sum(self.instance_hours.values())
                if sum(self.instance_hours.values()) > 0 else 0
            ),
        }
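As a back-of-envelope check on `cost_per_1M_tokens`: four h100_8x nodes on-demand, decoding at the same 70% utilization assumed throughout, land around $1.63 per million output tokens (rates from INSTANCE_TYPES; real numbers depend on traffic mix and prefill overhead):

```python
nodes = 4
hourly_cost = 32.77 * nodes                      # on-demand $/hour for the fleet
tokens_per_hour = 8000 * 0.70 * nodes * 3600     # decode tokens/hour at 70% util

cost_per_1m_tokens = hourly_cost / (tokens_per_hour / 1e6)
print(round(cost_per_1m_tokens, 2))  # -> 1.63
```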

Cost Savings: Hybrid Strategy vs. All On-Demand (Monthly, 4-node average)

- All on-demand: $94,700/month
- All reserved: $55,000/month
- Reserved + spot: $44,200/month
- Fully optimized: $38,500/month (59% savings)

Complete Cost Optimizer System

class DynamoCostOptimizer:
    """
    Complete cost optimization system for Dynamo.
    Manages hybrid capacity allocation with graceful spot preemption.
    """

    def __init__(self, config):
        self.instance_type = INSTANCE_TYPES[config.instance_type]
        self.slo = config.slo_config

        # Components
        self.profiler = DemandProfiler()
        self.optimizer = CostOptimizer(self.instance_type, self.slo)
        self.rolling = RollingCostOptimizer(self.optimizer, self.profiler)
        self.spot_manager = SpotInstanceManager(
            dynamo_router=config.router,
            kv_cache_manager=config.kv_manager,
        )
        self.reporter = CostReporter()

        # State
        self.current_instances = {
            'reserved': [],
            'spot': [],
            'ondemand': [],
        }

    def start(self):
        """Start the cost optimizer loop."""
        import threading
        import time

        # Optimization loop
        def optimize_loop():
            while True:
                self.rolling.run_step()
                self._apply_allocation_changes()
                time.sleep(300)  # Every 5 minutes

        # Spot monitoring loop
        def spot_monitor_loop():
            while True:
                self._check_spot_health()
                time.sleep(10)  # Every 10 seconds

        threading.Thread(target=optimize_loop, daemon=True).start()
        threading.Thread(target=spot_monitor_loop, daemon=True).start()

    def _apply_allocation_changes(self):
        """Apply allocation changes from optimizer."""
        target = self.rolling.current_allocation
        if not target:
            return

        # Scale spot instances
        current_spot = len(self.current_instances['spot'])
        target_spot = target['spot']

        if target_spot > current_spot:
            for _ in range(target_spot - current_spot):
                instance = self._launch_spot_instance()
                if instance:
                    self.current_instances['spot'].append(instance)
        elif target_spot < current_spot:
            for _ in range(current_spot - target_spot):
                instance = self.current_instances['spot'].pop()
                self._terminate_gracefully(instance)

        # Scale on-demand similarly
        current_od = len(self.current_instances['ondemand'])
        target_od = target['ondemand']

        if target_od > current_od:
            for _ in range(target_od - current_od):
                instance = self._launch_ondemand_instance()
                if instance:
                    self.current_instances['ondemand'].append(instance)
        elif target_od < current_od:
            for _ in range(current_od - target_od):
                instance = self.current_instances['ondemand'].pop()
                self._terminate_gracefully(instance)

    def _check_spot_health(self):
        """Monitor spot instances for preemption warnings."""
        # Iterate over a copy: preempted instances are removed from the list
        for instance in list(self.current_instances['spot']):
            if instance.has_preemption_warning():
                self.spot_manager.on_preemption_warning(instance.id)
                self.current_instances['spot'].remove(instance)

    def get_cost_report(self):
        return self.reporter.report()

    def _launch_spot_instance(self):
        """Launch a spot GPU instance."""
        # Cloud API integration
        pass

    def _launch_ondemand_instance(self):
        """Launch an on-demand GPU instance."""
        pass

    def _terminate_gracefully(self, instance):
        """Gracefully terminate an instance after draining."""
        drain = SpotDrainStrategy(strategy="let_short_finish")
        drain.drain(instance.worker, self._get_available_workers())
        instance.terminate()

    def _get_available_workers(self):
        workers = []
        for tier in self.current_instances.values():
            for instance in tier:
                workers.append(instance.worker)
        return workers