A 24/7 Llama 70B spot deployment on 4×8 H100 nodes costs $25,200/month, but spot reclamation at peak traffic causes 40-second service gaps (unacceptable). The optimal hybrid: 2 reserved nodes for baseline traffic ($12,600/month) plus on-demand bursting for traffic spikes — roughly $17,400/month total (31% savings) with zero SLA violations. Dynamo's cost optimizer implements this as a linear program that rebalances every 5 minutes based on traffic forecasts and spot pricing.
The Cost Model
Instance Types and Pricing
from dataclasses import dataclass
@dataclass
class InstanceType:
    """Static description of one GPU instance offering: its pricing across
    purchase tiers, spot-reclaim risk, and serving-capacity characteristics."""
    name: str                        # Unique key, e.g. "h100_8x"
    gpus: int                        # GPUs per instance
    gpu_type: str                    # Hardware model, e.g. "H100"
    hourly_cost_ondemand: float      # $/hour at on-demand pricing
    hourly_cost_reserved_1yr: float  # $/hour with a 1-year reservation
    hourly_cost_spot: float          # $/hour at spot pricing
    spot_reclaim_probability: float  # Per-hour probability of reclamation
    max_batch_size: int              # Max concurrent sequences
    prefill_throughput: float        # Tokens/sec for prefill
    decode_throughput: float         # Tokens/sec for decode
# Catalog of known instance offerings, keyed by InstanceType.name.
INSTANCE_TYPES = {
    # 8x NVIDIA H100 node: highest throughput, highest price.
    "h100_8x": InstanceType(
        name="h100_8x",
        gpus=8,
        gpu_type="H100",
        hourly_cost_ondemand=32.77,
        hourly_cost_reserved_1yr=19.06,  # ~42% below on-demand
        hourly_cost_spot=9.83,           # ~70% below on-demand
        spot_reclaim_probability=0.05,   # 5%/hour chance of reclamation
        max_batch_size=256,
        prefill_throughput=50000,        # tokens/sec
        decode_throughput=8000,          # tokens/sec
    ),
    # 8x NVIDIA A100-80G node: cheaper, ~62% of the H100's decode rate,
    # but with a higher spot-reclaim probability.
    "a100_8x": InstanceType(
        name="a100_8x",
        gpus=8,
        gpu_type="A100-80G",
        hourly_cost_ondemand=22.03,
        hourly_cost_reserved_1yr=13.22,
        hourly_cost_spot=6.61,
        spot_reclaim_probability=0.08,
        max_batch_size=128,
        prefill_throughput=30000,        # tokens/sec
        decode_throughput=5000,          # tokens/sec
    ),
}
The Optimization Objective
Minimize total hourly cost subject to throughput and latency constraints:

$$\min_{n_r,\, n_s,\, n_d} \; c_r n_r + c_s n_s + c_d n_d$$

Subject to:

$$T\,(n_r + \rho\, n_s + n_d) \ge D, \qquad n_r + n_d \ge 1, \qquad n_r,\, n_s,\, n_d \ge 0$$

where $n_r, n_s, n_d$ are the number of reserved, spot, and on-demand instances, $c_r, c_s, c_d$ are their hourly costs, $T$ is the effective per-instance throughput, $\rho$ discounts spot capacity by its reclaim probability, $D$ is the demanded throughput, and the SLO constraints ensure latency targets are met.
Monthly Cost Comparison: Llama 70B Serving (4 nodes, 24/7)
| Strategy | Reserved | On-Demand | Spot | Monthly Cost | Savings |
|---|---|---|---|---|---|
| All on-demand | 0 | 4 | 0 | $94,700 | Baseline |
| All reserved (1yr) | 4 | 0 | 0 | $55,000 | 42% |
| Reserved + spot burst | 2 | 0 | 2 (peak only) | $44,200 | 53% |
| Hybrid optimized | 2 | 0.5 avg | 1.5 avg | $38,500 | 59% |
Traffic Pattern Analysis
Demand Profiling
The cost optimizer needs a model of expected traffic. Most LLM workloads show predictable daily and weekly patterns:
import numpy as np
from collections import deque
class DemandProfiler:
    """Profile traffic patterns to predict future demand.

    Observations are bucketed by (weekday, intra-day time bin) so that the
    daily/weekly periodicity typical of LLM traffic can be exploited.
    """

    def __init__(self, history_days=14, resolution_minutes=5):
        self.history_days = history_days
        self.resolution = resolution_minutes
        self.bins_per_day = 24 * 60 // resolution_minutes
        # history[weekday][time_bin] -> list of observation dicts
        self.history = [
            [[] for _ in range(self.bins_per_day)] for _ in range(7)
        ]

    def _locate(self, timestamp):
        """Map a unix timestamp to its (weekday, time-bin) bucket."""
        import datetime
        when = datetime.datetime.fromtimestamp(timestamp)
        return when.weekday(), (when.hour * 60 + when.minute) // self.resolution

    def record(self, timestamp, rps, avg_prompt_len, avg_output_len):
        """Record a traffic observation."""
        weekday, time_bin = self._locate(timestamp)
        self.history[weekday][time_bin].append({
            'rps': rps,
            'prompt_len': avg_prompt_len,
            'output_len': avg_output_len,
            'tokens_per_sec': rps * (avg_prompt_len + avg_output_len),
        })

    def predict(self, timestamp):
        """Predict demand (RPS and tokens/sec percentiles) for a future time."""
        weekday, time_bin = self._locate(timestamp)
        samples = self.history[weekday][time_bin]
        if not samples:
            # Nothing recorded in this bucket yet -- use the global fallback.
            return self._global_average()
        rps_series = [s['rps'] for s in samples]
        tps_series = [s['tokens_per_sec'] for s in samples]
        return {
            'rps_expected': np.percentile(rps_series, 50),
            'rps_p75': np.percentile(rps_series, 75),
            'rps_p95': np.percentile(rps_series, 95),
            'tps_expected': np.percentile(tps_series, 50),
            'tps_p75': np.percentile(tps_series, 75),
            'tps_p95': np.percentile(tps_series, 95),
        }

    def _global_average(self):
        """Fallback for empty buckets: aggregate across all observations."""
        all_rps = [
            obs['rps']
            for day in self.history
            for time_bin in day
            for obs in time_bin
        ]
        if not all_rps:
            # No data at all: conservative hard-coded floor.
            return {'rps_expected': 1, 'rps_p75': 2, 'rps_p95': 5,
                    'tps_expected': 1000, 'tps_p75': 2000, 'tps_p95': 5000}
        # 1500 tokens/request is assumed when only RPS history is available.
        return {
            'rps_expected': np.median(all_rps),
            'rps_p75': np.percentile(all_rps, 75),
            'rps_p95': np.percentile(all_rps, 95),
            'tps_expected': np.median(all_rps) * 1500,
            'tps_p75': np.percentile(all_rps, 75) * 1500,
            'tps_p95': np.percentile(all_rps, 95) * 1500,
        }
Capacity Planning
class CapacityPlanner:
    """Compute required instances from demand forecast."""

    def __init__(self, instance_type, slo_config):
        self.instance = instance_type
        self.slo = slo_config

    def compute_required_instances(self, demand):
        """Compute the minimum instance count that meets demand under SLOs.

        Sizes for aggregate token throughput and for per-request
        concurrency, returning whichever requirement is larger
        (never less than one instance).
        """
        target_tps = demand['tps_p75']
        target_rps = demand['rps_p75']
        typical_prompt_tokens = 1500  # assumed average prompt length

        # At SLO-constrained operation batches run ~70% full, so usable
        # decode throughput is discounted accordingly.
        usable_tps = self.instance.decode_throughput * 0.70

        # Prefill steals decode capacity: the fraction of each second spent
        # prefilling scales with request rate and prompt size.
        prefill_fraction = (
            target_rps * typical_prompt_tokens / self.instance.prefill_throughput
        )
        throughput_after_prefill = usable_tps * (1 - prefill_fraction)
        # Never assume less than 30% of usable throughput survives prefill.
        throughput_after_prefill = max(throughput_after_prefill, usable_tps * 0.3)

        by_throughput = int(np.ceil(target_tps / throughput_after_prefill))

        # Concurrency check: enough batch slots for in-flight requests
        # (Little's-law style estimate assuming ~500 output tokens).
        in_flight = (
            target_rps * (typical_prompt_tokens + 500) / throughput_after_prefill
        )
        by_concurrency = int(np.ceil(in_flight / self.instance.max_batch_size))

        return max(by_throughput, by_concurrency, 1)
The Hybrid Strategy
Three Tiers of Capacity
class HybridCapacityStrategy:
    """Allocate capacity across reserved, spot, and on-demand tiers."""

    def __init__(self, instance_type, demand_profiler):
        self.instance = instance_type
        self.profiler = demand_profiler

    def compute_allocation(self, planning_horizon_days=30):
        """Compute optimal allocation across tiers.

        Reserved covers the P25 baseline (needed 75%+ of the time), spot
        covers the P25-P75 band (cheap but reclaimable), and on-demand
        absorbs the P75-P99 peak (expensive but guaranteed).
        """
        import datetime

        # Forecast instance need for every 5-minute bin over the horizon.
        start = datetime.datetime.now()
        usable_tps = self.instance.decode_throughput * 0.70  # 70% batch utilization
        needed = []
        for day in range(planning_horizon_days):
            for hour in range(24):
                for minute in range(0, 60, 5):
                    when = start + datetime.timedelta(
                        days=day, hours=hour, minutes=minute
                    )
                    forecast = self.profiler.predict(when.timestamp())
                    needed.append(int(np.ceil(forecast['tps_p75'] / usable_tps)))

        # Tier sizes from the distribution of instance need.
        baseline = int(np.percentile(needed, 25))      # reserved floor
        typical_peak = int(np.percentile(needed, 75))  # spot band top
        extreme_peak = int(np.percentile(needed, 99))  # on-demand band top
        spot_count = typical_peak - baseline
        ondemand_max = extreme_peak - typical_peak

        # Cost estimate; spot assumed busy 60% of hours, on-demand 10%.
        hours = planning_horizon_days * 24
        cost_reserved = baseline * self.instance.hourly_cost_reserved_1yr * hours
        cost_spot = spot_count * self.instance.hourly_cost_spot * hours * 0.6
        cost_ondemand = ondemand_max * self.instance.hourly_cost_ondemand * hours * 0.1

        return {
            'reserved': baseline,
            'spot': spot_count,
            'ondemand_max': ondemand_max,
            'monthly_cost': cost_reserved + cost_spot + cost_ondemand,
            'cost_breakdown': {
                'reserved': cost_reserved,
                'spot': cost_spot,
                'ondemand': cost_ondemand,
            },
        }
Spot Instance Management
Preemption Handling
Cloud providers give a 2-minute warning before reclaiming spot instances. Dynamo must gracefully handle this:
class SpotInstanceManager:
    """Handle spot instance lifecycle including preemption.

    Coordinates the router (request placement) and the KV cache manager
    so that a preempted spot instance drains without dropping work.
    """

    def __init__(self, dynamo_router, kv_cache_manager):
        self.router = dynamo_router
        self.kv_manager = kv_cache_manager
        # instance_id -> {'worker', 'status', 'active_requests'}
        self.spot_instances = {}

    def register_spot(self, instance_id, worker):
        """Register a new spot instance and start routing traffic to it."""
        self.spot_instances[instance_id] = {
            'worker': worker,
            'status': 'active',
            'active_requests': set(),
        }
        self.router.add_worker(worker)

    def on_preemption_warning(self, instance_id):
        """Handle 2-minute preemption warning.

        Order matters: routing is stopped first so no new work lands on
        the instance while in-flight requests are migrated away.
        """
        if instance_id not in self.spot_instances:
            return
        spot = self.spot_instances[instance_id]
        spot['status'] = 'draining'
        # Step 1: Stop routing new requests to this instance
        self.router.remove_worker(spot['worker'])
        # Step 2: Migrate active requests to other instances
        # (snapshot the set in case migration mutates it)
        active_requests = list(spot['active_requests'])
        for request_id in active_requests:
            self._migrate_request(request_id, spot['worker'])
        # Step 3: Transfer valuable KV cache blocks
        self._transfer_kv_cache(spot['worker'])

    def _migrate_request(self, request_id, from_worker):
        """Migrate an active request to another worker.

        Falls back to cancelling and requeueing when no target worker
        is available.
        """
        # Find the best target worker
        target = self.router.find_best_worker_excluding(from_worker)
        if target is None:
            # No available workers -- request will be requeued
            from_worker.cancel_request(request_id)
            self.router.requeue_request(request_id)
            return
        # Get the request's current state
        request_state = from_worker.get_request_state(request_id)
        # Option A: Transfer KV cache and resume (fast if NVLink)
        # NOTE(review): this path resumes on the target without cancelling
        # on from_worker -- presumably the source drops it at preemption;
        # confirm against the worker API.
        if self._can_transfer_kv(from_worker, target):
            kv_blocks = from_worker.get_kv_blocks(request_id)
            target.receive_kv_blocks(kv_blocks)
            target.resume_request(request_id, request_state)
        else:
            # Option B: Re-prefill on target (slower but always works)
            from_worker.cancel_request(request_id)
            target.restart_request(request_id, request_state)

    def _transfer_kv_cache(self, from_worker):
        """Transfer hot KV cache blocks to surviving workers."""
        # Identify frequently accessed prefixes; only the top 100 blocks are
        # moved -- the 2-minute window cannot fit a full cache transfer.
        hot_blocks = from_worker.get_hot_kv_blocks(limit=100)
        # Distribute to workers that serve similar traffic
        for block_hash, block_data in hot_blocks:
            # Find workers that would benefit from this prefix
            workers_needing = self.router.kv_index.find_workers_for_prefix(block_hash)
            if workers_needing:
                target = workers_needing[0]
                target.receive_kv_block(block_hash, block_data)

    def _can_transfer_kv(self, source, target):
        """Check if fast KV transfer is possible.

        Same-node (NVLink or PCIe) transfer fits the 2-minute window;
        cross-node transfer does not.
        """
        return source.node_id == target.node_id
Request Draining Strategy
When a spot instance is being reclaimed, ongoing decode requests should be drained gracefully:
class SpotDrainStrategy:
    """Strategies for draining requests from preempted spot instances.

    Strategies:
      - "migrate_top_n": walk requests closest to completion first within
        the time budget; requests with < 50 remaining tokens finish in
        place, others are migrated.
      - "let_short_finish": migrate only requests with more than 100
        tokens left; shorter ones finish before preemption.

    NOTE(review): `_migrate` is not defined in this class -- it must be
    supplied by the integration layer (e.g. via subclassing).
    """

    def __init__(self, strategy="migrate_top_n"):
        self.strategy = strategy

    def drain(self, spot_worker, available_workers, time_budget_seconds=90):
        """Drain requests from spot worker within the time budget.

        Returns the number of requests migrated (both strategies; the
        "let_short_finish" branch previously returned None).
        """
        import time  # fix: `time` was never imported, so time.time() raised NameError

        requests = spot_worker.get_active_requests()
        migrated = 0
        if self.strategy == "migrate_top_n":
            # Migrate requests that are closest to completion first
            requests.sort(key=lambda r: r.remaining_tokens)
            for request in requests:
                if time.time() > spot_worker.preemption_deadline - 30:
                    break  # Reserve 30s for cleanup
                if request.remaining_tokens < 50:
                    # Almost done -- let it finish on the spot instance
                    continue
                target = self._find_target(request, available_workers)
                if target:
                    self._migrate(request, spot_worker, target)
                    migrated += 1
        elif self.strategy == "let_short_finish":
            # Let short requests finish, migrate long ones
            short_threshold = 100  # tokens
            for request in requests:
                if request.remaining_tokens <= short_threshold:
                    continue  # Will finish before preemption
                target = self._find_target(request, available_workers)
                if target:
                    self._migrate(request, spot_worker, target)
                    migrated += 1
        return migrated

    def _find_target(self, request, workers):
        """Return the worker with the most KV-cache prefix overlap, or None."""
        best = None
        best_overlap = -1
        for worker in workers:
            overlap = worker.kv_overlap(request)
            if overlap > best_overlap:
                best_overlap = overlap
                best = worker
        return best
Cloud spot instance reclamation is not theoretical. During popular GPU periods (conference deadlines, product launches), spot reclamation rates can spike to 15-20% per hour. Your system must handle graceful degradation: when spot capacity disappears, fall back to fewer instances at reduced throughput rather than dropping requests.
The Cost Optimizer
Linear Programming Formulation
import numpy as np
from scipy.optimize import linprog
class CostOptimizer:
    """
    Optimize GPU allocation across reserved, spot, and on-demand.
    Runs every 5 minutes based on current and predicted demand.
    """

    def __init__(self, instance_type, slo_config):
        self.instance = instance_type
        self.slo = slo_config

    def optimize(self, current_demand, predicted_demand_1h):
        """
        Find minimum-cost allocation meeting SLOs.

        Solves an LP over [n_reserved, n_spot, n_ondemand], then rounds up
        to whole instances. The returned 'hourly_cost' and
        'total_instances' are computed from the rounded counts so they
        agree with the allocation (previously the fractional LP objective
        was reported, which understated the actual cost).
        """
        # Hourly price vector for [reserved, spot, on-demand].
        c = np.array([
            self.instance.hourly_cost_reserved_1yr,
            self.instance.hourly_cost_spot,
            self.instance.hourly_cost_ondemand,
        ])

        # Effective throughput per instance (70% batch utilization).
        tps_per_instance = self.instance.decode_throughput * 0.70

        # Spot capacity is discounted by its reclamation probability.
        spot_reliability = 1.0 - self.instance.spot_reclaim_probability

        # Constraint 1: meet current demand (P75).
        # tps*(n_r + reliability*n_s + n_d) >= demand, in linprog's <= form.
        A_throughput = -np.array([
            tps_per_instance,
            tps_per_instance * spot_reliability,
            tps_per_instance,
        ])
        b_throughput = -current_demand['tps_p75']

        # Constraint 2: meet predicted demand one hour ahead (P95).
        A_predicted = A_throughput.copy()
        b_predicted = -predicted_demand_1h['tps_p95']

        # Constraint 3: at least one non-spot instance for the latency SLO.
        A_reliability = np.array([[-1, 0, -1]])  # -(n_r + n_d) <= -1
        b_reliability = np.array([-1])

        A_ub = np.vstack([
            A_throughput.reshape(1, -1),
            A_predicted.reshape(1, -1),
            A_reliability,
        ])
        b_ub = np.array([b_throughput, b_predicted, b_reliability[0]])

        # All tiers scale from zero upward. NOTE(review): reserved capacity
        # cannot actually change instantly; an upper bound tied to the
        # currently committed reserved count would tighten this model.
        bounds = [
            (0, None),  # reserved
            (0, None),  # spot
            (0, None),  # on-demand
        ]

        result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')

        if result.success:
            n_r, n_s, n_d = (int(np.ceil(v)) for v in result.x)
            return {
                'reserved': n_r,
                'spot': n_s,
                'ondemand': n_d,
                # Price the integer allocation, not the fractional optimum.
                'hourly_cost': float(c @ np.array([n_r, n_s, n_d])),
                'total_instances': n_r + n_s + n_d,
            }
        # Fallback: conservatively cover P95 demand with on-demand only.
        n_total = int(np.ceil(current_demand['tps_p95'] / tps_per_instance))
        return {
            'reserved': 0, 'spot': 0, 'ondemand': n_total,
            'hourly_cost': n_total * self.instance.hourly_cost_ondemand,
            'total_instances': n_total,
        }
Rolling Optimization
The optimizer runs periodically, adjusting allocation based on demand changes:
class RollingCostOptimizer:
    """Continuously optimize cost allocation."""

    def __init__(self, optimizer, profiler, interval_seconds=300):
        self.optimizer = optimizer
        self.profiler = profiler
        self.interval = interval_seconds
        self.current_allocation = None
        self.allocation_history = []

    def run_step(self):
        """Run one optimization step: measure, predict, optimize, apply."""
        import time as t
        now = t.time()
        # NOTE(review): _measure_current_demand is not defined in this
        # class -- it is expected to be supplied by the integration layer.
        current = self._measure_current_demand()
        predicted = self.profiler.predict(now + 3600)  # one hour ahead
        proposal = self.optimizer.optimize(current, predicted)
        # Damp step size to avoid allocation thrashing.
        if self.current_allocation:
            allocation = self._damp_changes(self.current_allocation, proposal)
        else:
            allocation = proposal
        self._apply_allocation(allocation)
        self.current_allocation = allocation
        self.allocation_history.append({
            'timestamp': now,
            'allocation': allocation,
            'demand': current,
        })

    def _damp_changes(self, current, proposed):
        """Limit per-step movement: grow by at most 2, shrink by at most 1."""
        rate_attr = {
            'reserved': 'hourly_cost_reserved_1yr',
            'spot': 'hourly_cost_spot',
            'ondemand': 'hourly_cost_ondemand',
        }
        damped = {}
        for tier in ('reserved', 'spot', 'ondemand'):
            have = current.get(tier, 0)
            want = proposed.get(tier, 0)
            if want > have:
                damped[tier] = min(want, have + 2)
            elif want < have:
                damped[tier] = max(want, have - 1)
            else:
                damped[tier] = have
        damped['hourly_cost'] = sum(
            damped[tier] * getattr(self.optimizer.instance, rate_attr[tier])
            for tier in ('reserved', 'spot', 'ondemand')
        )
        return damped

    def _apply_allocation(self, allocation):
        """Scale instances to match allocation (cloud-API integration point).

        Reserved: no action (committed long-term). Spot and on-demand:
        request or release instances as needed.
        """
        pass  # Integration with cloud API
Cost Monitoring and Reporting
class CostReporter:
    """Track and report serving costs.

    Args:
        instance_type: pricing source exposing hourly_cost_reserved_1yr,
            hourly_cost_spot and hourly_cost_ondemand. Defaults to the
            h100_8x entry of INSTANCE_TYPES for backward compatibility
            (previously that entry was hard-coded).
    """

    def __init__(self, instance_type=None):
        self.instance_type = instance_type
        self.hourly_costs = []    # $/hour rate observed in each interval
        self.interval_costs = []  # absolute $ spent in each interval
        self.instance_hours = {'reserved': 0, 'spot': 0, 'ondemand': 0}
        self.tokens_generated = 0
        self.requests_served = 0

    def _pricing(self):
        """Resolve the pricing table (lazy so the default global is only
        required when actually used)."""
        if self.instance_type is not None:
            return self.instance_type
        return INSTANCE_TYPES['h100_8x']

    def record_interval(self, allocation, tokens, requests, duration_hours):
        """Accumulate cost and usage for one reporting interval."""
        pricing = self._pricing()
        cost = (
            allocation['reserved'] * pricing.hourly_cost_reserved_1yr +
            allocation['spot'] * pricing.hourly_cost_spot +
            allocation['ondemand'] * pricing.hourly_cost_ondemand
        ) * duration_hours
        self.interval_costs.append(cost)
        self.hourly_costs.append(cost / duration_hours)
        self.instance_hours['reserved'] += allocation['reserved'] * duration_hours
        self.instance_hours['spot'] += allocation['spot'] * duration_hours
        self.instance_hours['ondemand'] += allocation['ondemand'] * duration_hours
        self.tokens_generated += tokens
        self.requests_served += requests

    def report(self):
        """Summarize total/unit costs and the spot share of instance-hours."""
        # Fix: total cost is the sum of interval costs. The previous code
        # summed hourly *rates*, which is only correct when every interval
        # happens to be exactly one hour long.
        total_cost = sum(self.interval_costs)
        total_hours = sum(self.instance_hours.values())
        return {
            'total_cost': total_cost,
            'avg_hourly_cost': np.mean(self.hourly_costs) if self.hourly_costs else 0,
            'cost_per_1M_tokens': (
                total_cost / (self.tokens_generated / 1e6)
                if self.tokens_generated > 0 else 0
            ),
            'cost_per_request': (
                total_cost / self.requests_served
                if self.requests_served > 0 else 0
            ),
            'instance_hours': self.instance_hours,
            'spot_fraction': (
                self.instance_hours['spot'] / total_hours
                if total_hours > 0 else 0
            ),
        }
Cost Savings: Hybrid Strategy vs. All-On-Demand (Monthly, 4-node average, $/month)
Complete Cost Optimizer System
class DynamoCostOptimizer:
    """
    Complete cost optimization system for Dynamo.
    Manages hybrid capacity allocation with graceful spot preemption.
    """

    def __init__(self, config):
        self.instance_type = INSTANCE_TYPES[config.instance_type]
        self.slo = config.slo_config
        # Components
        self.profiler = DemandProfiler()
        self.optimizer = CostOptimizer(self.instance_type, self.slo)
        self.rolling = RollingCostOptimizer(self.optimizer, self.profiler)
        self.spot_manager = SpotInstanceManager(
            dynamo_router=config.router,
            kv_cache_manager=config.kv_manager,
        )
        self.reporter = CostReporter()
        # Live instance handles per pricing tier.
        self.current_instances = {
            'reserved': [],
            'spot': [],
            'ondemand': [],
        }

    def start(self):
        """Start the background optimization and spot-monitoring loops."""
        import threading
        import time  # fix: `time` was never imported; both loops raised NameError

        def optimize_loop():
            while True:
                self.rolling.run_step()
                self._apply_allocation_changes()
                time.sleep(300)  # Every 5 minutes

        def spot_monitor_loop():
            while True:
                self._check_spot_health()
                time.sleep(10)  # Every 10 seconds

        threading.Thread(target=optimize_loop, daemon=True).start()
        threading.Thread(target=spot_monitor_loop, daemon=True).start()

    def _apply_allocation_changes(self):
        """Scale spot and on-demand pools toward the optimizer's target."""
        target = self.rolling.current_allocation
        if not target:
            return
        self._scale_tier('spot', target['spot'], self._launch_spot_instance)
        self._scale_tier('ondemand', target['ondemand'],
                         self._launch_ondemand_instance)

    def _scale_tier(self, tier, target_count, launcher):
        """Launch or gracefully drain instances in one tier toward target_count."""
        pool = self.current_instances[tier]
        deficit = target_count - len(pool)
        if deficit > 0:
            for _ in range(deficit):
                instance = launcher()
                # A failed launch (e.g. no spot capacity) is skipped;
                # the next optimization cycle retries.
                if instance:
                    pool.append(instance)
        elif deficit < 0:
            for _ in range(-deficit):
                self._terminate_gracefully(pool.pop())

    def _check_spot_health(self):
        """Poll spot instances and drain any with a preemption warning."""
        # Fix: iterate over a snapshot -- the original removed items from
        # the list while iterating it, which skips the following element.
        for instance in list(self.current_instances['spot']):
            if instance.has_preemption_warning():
                self.spot_manager.on_preemption_warning(instance.id)
                self.current_instances['spot'].remove(instance)

    def get_cost_report(self):
        """Return the accumulated cost report."""
        return self.reporter.report()

    def _launch_spot_instance(self):
        """Launch a spot GPU instance (cloud-API integration point)."""
        pass

    def _launch_ondemand_instance(self):
        """Launch an on-demand GPU instance (cloud-API integration point)."""
        pass

    def _terminate_gracefully(self, instance):
        """Drain remaining work from an instance, then terminate it."""
        drain = SpotDrainStrategy(strategy="let_short_finish")
        drain.drain(instance.worker, self._get_available_workers())
        instance.terminate()

    def _get_available_workers(self):
        """All workers across every tier."""
        return [
            instance.worker
            for tier in self.current_instances.values()
            for instance in tier
        ]