Part of the NVIDIA Dynamo & llm-d series (27 of 30).

A 64-GPU Dynamo cluster serving Llama 70B appears stable at 400 QPS with a P99 TTFT of 450 ms. At 420 QPS, P99 jumps to 2.1 seconds. At 450 QPS, requests time out. That 50 QPS band between "acceptable" and "production-down" is your safety margin, and you need to know where it sits before launch day. Load testing finds this saturation point, measures cost-per-token at various utilization levels (SLA-compliant goodput can fall from 142K tokens/sec at 60% GPU utilization to 89K at 90%, as queuing pushes requests past their latency targets), and validates SLA compliance. This post provides a complete testing framework with working load-generation scripts.

Load Testing Objectives

Load testing answers five critical questions:

  1. Maximum throughput: How many tokens/sec can the cluster sustain at steady state?
  2. Latency profile: What is the P50/P95/P99 time-to-first-token (TTFT) and inter-token latency (ITL) at various load levels?
  3. Saturation point: At what request rate does the system transition from stable to unstable?
  4. SLA compliance: At what utilization level can you guarantee your latency SLAs?
  5. Failure behavior: How does the system degrade when components fail under load?

Key Metrics for LLM Load Testing

| Metric | Definition | Good (Chat) | Good (Batch API) | How to Measure |
|---|---|---|---|---|
| TTFT P50 | Time to first token (median) | < 200 ms | < 2 s | Client-side timestamp delta |
| TTFT P99 | Time to first token (tail) | < 1 s | < 10 s | Client-side timestamp delta |
| ITL P50 | Inter-token latency (median) | < 30 ms | N/A | Streaming token timestamps |
| ITL P99 | Inter-token latency (tail) | < 100 ms | N/A | Streaming token timestamps |
| Throughput | Total output tokens/sec across cluster | Maximize | Maximize | Server-side counter |
| Error rate | Failed or timed-out requests | < 0.1% | < 0.01% | HTTP status codes |
| GPU utilization | SM activity during serving | 70-85% | 90%+ | nvidia-smi or DCGM |
Note: TTFT is the most user-visible metric for chat. Throughput is the primary metric for batch workloads.
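The "client-side timestamp delta" and "streaming token timestamps" measurements in the table reduce to a few lines of arithmetic. A minimal sketch (the helper name `latency_metrics` is ours, not part of any Dynamo API):

```python
import numpy as np

def latency_metrics(request_start: float, token_times: list[float]) -> dict:
    """Derive TTFT and ITL statistics from client-side token-arrival times.

    request_start and token_times are epoch timestamps in seconds.
    """
    if not token_times:
        return {}
    itl = np.diff(token_times)  # gaps between consecutive token arrivals
    return {
        "ttft_s": token_times[0] - request_start,
        "itl_p50_ms": float(np.percentile(itl, 50)) * 1000 if itl.size else 0.0,
        "itl_p99_ms": float(np.percentile(itl, 99)) * 1000 if itl.size else 0.0,
    }
```

Computing ITL from individual arrival gaps, rather than dividing total time by token count, is what exposes tail stalls such as a preempted decode step.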

Test Harness Architecture

import asyncio
import aiohttp
import time
import json
import numpy as np
from dataclasses import dataclass, field

@dataclass
class RequestConfig:
    """Configuration for a single request."""
    prompt: str
    max_tokens: int
    temperature: float = 0.7
    stream: bool = True

@dataclass
class RequestResult:
    """Result of a single request."""
    request_id: int
    start_time: float
    ttft: float = 0.0          # Time to first token (seconds)
    total_time: float = 0.0    # Total request time
    output_tokens: int = 0
    input_tokens: int = 0
    itl_values: list = field(default_factory=list)  # Inter-token latencies
    error: str | None = None
    status_code: int = 0

class LoadTestHarness:
    """Async load testing harness for LLM serving endpoints."""

    def __init__(self, base_url, model_name, concurrency=32):
        self.base_url = base_url
        self.model_name = model_name
        self.concurrency = concurrency
        self.results = []

    async def send_request(self, session, config, request_id):
        """Send a single request and measure latencies."""
        result = RequestResult(request_id=request_id, start_time=time.time())
        payload = {
            "model": self.model_name,
            "prompt": config.prompt,
            "max_tokens": config.max_tokens,
            "temperature": config.temperature,
            "stream": config.stream,
        }

        try:
            async with session.post(
                f"{self.base_url}/v1/completions",
                json=payload,
                timeout=aiohttp.ClientTimeout(total=120)
            ) as resp:
                result.status_code = resp.status

                if config.stream and resp.status == 200:
                    first_token_received = False
                    last_token_time = time.time()
                    token_count = 0

                    async for line in resp.content:
                        line = line.decode('utf-8').strip()
                        if not line or line == 'data: [DONE]':
                            continue
                        if line.startswith('data: '):
                            now = time.time()
                            data = json.loads(line[6:])
                            if data.get('choices', [{}])[0].get('text', ''):
                                token_count += 1
                                if not first_token_received:
                                    result.ttft = now - result.start_time
                                    first_token_received = True
                                else:
                                    itl = now - last_token_time
                                    result.itl_values.append(itl)
                                last_token_time = now

                    result.output_tokens = token_count
                elif resp.status == 200:
                    data = await resp.json()
                    result.ttft = time.time() - result.start_time
                    result.output_tokens = data.get('usage', {}).get(
                        'completion_tokens', 0
                    )
                else:
                    result.error = f"HTTP {resp.status}"

        except asyncio.TimeoutError:
            result.error = "Timeout"
        except Exception as e:
            result.error = str(e)

        result.total_time = time.time() - result.start_time
        return result

    async def run_constant_rate(self, rate_rps, duration_seconds,
                                 request_generator):
        """Send requests at a constant rate."""
        interval = 1.0 / rate_rps
        num_requests = int(rate_rps * duration_seconds)

        async with aiohttp.ClientSession() as session:
            tasks = []
            start = time.time()
            for i in range(num_requests):
                config = request_generator()
                task = asyncio.create_task(
                    self.send_request(session, config, i)
                )
                tasks.append(task)
                # Pace on an absolute schedule so per-iteration overhead
                # does not silently lower the effective request rate
                await asyncio.sleep(max(0.0, start + (i + 1) * interval - time.time()))

            results = await asyncio.gather(*tasks)
            self.results.extend(results)
            return results

    async def run_ramp(self, start_rps, end_rps, step_rps,
                        step_duration_seconds, request_generator):
        """Ramp load from start_rps to end_rps."""
        all_results = {}
        current_rps = start_rps

        while current_rps <= end_rps:
            print(f"\n=== Testing at {current_rps} RPS ===")
            results = await self.run_constant_rate(
                current_rps, step_duration_seconds, request_generator
            )
            all_results[current_rps] = self._summarize(results)
            self._print_summary(current_rps, all_results[current_rps])
            current_rps += step_rps

        return all_results

    def _summarize(self, results):
        """Compute summary statistics from results."""
        successful = [r for r in results if r.error is None]
        errors = [r for r in results if r.error is not None]

        if not successful:
            return {'error_rate': 1.0}

        ttft_values = [r.ttft for r in successful if r.ttft > 0]
        total_times = [r.total_time for r in successful]
        total_tokens = sum(r.output_tokens for r in successful)
        duration = max(r.start_time + r.total_time for r in successful) - \
                   min(r.start_time for r in successful)

        all_itl = []
        for r in successful:
            all_itl.extend(r.itl_values)

        return {
            'num_requests': len(results),
            'num_successful': len(successful),
            'error_rate': len(errors) / len(results),
            'ttft_p50': np.percentile(ttft_values, 50) if ttft_values else 0,
            'ttft_p95': np.percentile(ttft_values, 95) if ttft_values else 0,
            'ttft_p99': np.percentile(ttft_values, 99) if ttft_values else 0,
            'itl_p50': np.percentile(all_itl, 50) * 1000 if all_itl else 0,  # ms
            'itl_p99': np.percentile(all_itl, 99) * 1000 if all_itl else 0,  # ms
            'throughput_tok_s': total_tokens / duration if duration > 0 else 0,
            'avg_output_tokens': np.mean([r.output_tokens for r in successful]),
        }

    def _print_summary(self, rps, summary):
        """Print human-readable summary."""
        print(f"  Requests: {summary.get('num_successful', 0)}/{summary.get('num_requests', 0)}")
        print(f"  Error rate: {summary.get('error_rate', 0):.1%}")
        print(f"  TTFT P50/P95/P99: {summary.get('ttft_p50', 0)*1000:.0f} / "
              f"{summary.get('ttft_p95', 0)*1000:.0f} / "
              f"{summary.get('ttft_p99', 0)*1000:.0f} ms")
        print(f"  ITL P50/P99: {summary.get('itl_p50', 0):.0f} / "
              f"{summary.get('itl_p99', 0):.0f} ms")
        print(f"  Throughput: {summary.get('throughput_tok_s', 0):.0f} tok/s")

Request Generation Strategies

The quality of load testing depends on generating realistic requests. Uniform random prompts miss the distribution effects that matter in production.

import random

class RequestGenerator:
    """Generate realistic request distributions for load testing."""

    # Distribution from production traffic analysis
    INPUT_LENGTH_DISTRIBUTION = {
        'chat': {'mean': 256, 'std': 200, 'min': 10, 'max': 4096},
        'code': {'mean': 512, 'std': 300, 'min': 50, 'max': 8192},
        'summarization': {'mean': 2048, 'std': 1000, 'min': 500, 'max': 16384},
    }

    OUTPUT_LENGTH_DISTRIBUTION = {
        'chat': {'mean': 128, 'std': 100, 'min': 10, 'max': 2048},
        'code': {'mean': 256, 'std': 200, 'min': 20, 'max': 4096},
        'summarization': {'mean': 256, 'std': 128, 'min': 50, 'max': 1024},
    }

    def __init__(self, workload_mix=None, prompt_corpus=None):
        """
        Args:
            workload_mix: dict like {'chat': 0.6, 'code': 0.3, 'summarization': 0.1}
            prompt_corpus: list of real prompts for realistic token distributions
        """
        self.workload_mix = workload_mix or {'chat': 0.7, 'code': 0.2, 'summarization': 0.1}
        self.prompt_corpus = prompt_corpus or self._generate_synthetic_prompts()

    def generate(self):
        """Generate a single request with realistic characteristics."""
        # Select workload type
        workload = random.choices(
            list(self.workload_mix.keys()),
            weights=list(self.workload_mix.values())
        )[0]

        # Sample input/output lengths
        input_dist = self.INPUT_LENGTH_DISTRIBUTION[workload]
        output_dist = self.OUTPUT_LENGTH_DISTRIBUTION[workload]

        input_len = int(np.clip(
            np.random.normal(input_dist['mean'], input_dist['std']),
            input_dist['min'], input_dist['max']
        ))
        output_len = int(np.clip(
            np.random.normal(output_dist['mean'], output_dist['std']),
            output_dist['min'], output_dist['max']
        ))

        # Select prompt of appropriate length
        prompt = self._select_prompt(input_len)

        return RequestConfig(
            prompt=prompt,
            max_tokens=output_len,
            stream=True
        )

    def _select_prompt(self, target_length):
        """Select a prompt closest to target token length."""
        # In practice, use real prompts tokenized to known lengths
        # For synthetic testing, use repeated text blocks
        words_per_token = 0.75  # Approximate
        num_words = int(target_length * words_per_token)
        base = "Analyze the following technical document and provide detailed insights. "
        return (base * (num_words // len(base.split()) + 1))[:num_words * 5]

    def _generate_synthetic_prompts(self):
        """Generate synthetic prompts for testing."""
        prompts = []
        for _ in range(1000):
            length = random.randint(50, 4000)
            prompts.append("x " * length)
        return prompts
⚠️ Synthetic Traffic Is Not Real Traffic

Synthetic load tests with uniform prompt lengths overestimate throughput by 15-30% compared to real traffic. Production traffic has long-tail distributions: most requests are short, but a few are very long. Those long requests consume disproportionate KV cache memory and prefill compute, causing head-of-line blocking. Always validate synthetic results against production traffic replays.
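One way to do that validation is to replay logged production requests through the same harness. A minimal sketch, assuming requests are exported as JSONL with `prompt` and `max_tokens` fields (the file schema is our assumption; `RequestConfig` matches the harness dataclass above):

```python
import json
import random
from dataclasses import dataclass

@dataclass
class RequestConfig:  # same shape as the harness dataclass above
    prompt: str
    max_tokens: int
    temperature: float = 0.7
    stream: bool = True

class TraceReplayGenerator:
    """Replay logged production requests instead of synthetic prompts."""

    def __init__(self, trace_path: str, shuffle: bool = True):
        # One JSON object per line, e.g. {"prompt": "...", "max_tokens": 128}
        with open(trace_path) as f:
            self.records = [json.loads(line) for line in f if line.strip()]
        if shuffle:
            random.shuffle(self.records)
        self.idx = 0

    def generate(self) -> RequestConfig:
        # Cycle through the trace, preserving the real length distribution
        rec = self.records[self.idx % len(self.records)]
        self.idx += 1
        return RequestConfig(prompt=rec["prompt"],
                             max_tokens=rec["max_tokens"],
                             stream=True)
```

Pass `gen.generate` anywhere the harness expects a `request_generator`, exactly as with `RequestGenerator`.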

Saturation Testing

The saturation test ramps load until the system breaks. This identifies the maximum throughput and the failure mode at saturation.

async def saturation_test(harness, generator, max_rps=100):
    """Find the saturation point of the serving cluster."""
    results = await harness.run_ramp(
        start_rps=1,
        end_rps=max_rps,
        step_rps=2,
        step_duration_seconds=60,
        request_generator=generator.generate
    )

    # Find saturation point: where P99 TTFT exceeds 2x baseline
    baseline_ttft = results[1]['ttft_p99']  # 1 RPS baseline
    saturation_rps = None
    max_throughput_rps = None
    max_throughput = 0

    for rps, summary in sorted(results.items()):
        if summary.get('throughput_tok_s', 0) > max_throughput:
            max_throughput = summary['throughput_tok_s']
            max_throughput_rps = rps

        if (saturation_rps is None and
            summary.get('ttft_p99', 0) > baseline_ttft * 2):
            saturation_rps = rps

    print("\n=== Saturation Analysis ===")
    print(f"Baseline TTFT P99 (1 RPS): {baseline_ttft*1000:.0f} ms")
    if saturation_rps is None:
        print(f"No saturation observed up to {max_rps} RPS; increase max_rps")
    else:
        print(f"Saturation point: {saturation_rps} RPS")
        print(f"Recommended operating point: {saturation_rps * 0.7:.0f} RPS "
              f"(70% of saturation)")
    print(f"Max throughput: {max_throughput:.0f} tok/s at {max_throughput_rps} RPS")

    return results

Typical Latency-Throughput Curve (Dynamo, 4x H100, Llama 70B INT4)

| RPS | TTFT P99 (ms) | Note |
|---|---|---|
| 5 | 120 | |
| 10 | 145 | |
| 20 | 210 | |
| 30 | 380 | SLA threshold |
| 40 | 890 | Saturation begins |
| 50 | 2,400 | |
| 60 | 8,500 | Timeouts start |

The latency-throughput curve has a characteristic “hockey stick” shape. Latency is nearly flat at low load (the system has spare capacity), then rises gradually, then explodes at saturation. The recommended operating point is 60-70% of the saturation RPS, giving headroom for traffic spikes.
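Picking that operating point can be automated from the measured curve. A sketch that flags the knee as the first tested rate whose P99 TTFT exceeds twice the lowest-load baseline, then backs off to 70% (both thresholds are the heuristics stated above, not Dynamo defaults):

```python
def recommend_operating_point(curve: dict[int, float],
                              knee_factor: float = 2.0,
                              headroom: float = 0.7) -> float:
    """curve maps RPS -> TTFT P99 (ms); returns a recommended RPS.

    The knee is the first tested rate whose P99 exceeds knee_factor times
    the lowest-load baseline; the recommendation backs off by headroom.
    """
    rates = sorted(curve)
    baseline = curve[rates[0]]
    for rps in rates:
        if curve[rps] > baseline * knee_factor:
            return rps * headroom
    return rates[-1] * headroom  # never saturated within the tested range

# Using the example latency-throughput data above: knee at 30 RPS (380 > 240)
curve = {5: 120, 10: 145, 20: 210, 30: 380, 40: 890, 50: 2400, 60: 8500}
print(recommend_operating_point(curve))  # 21.0 (70% of the 30 RPS knee)
```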

SLA Validation Test

async def sla_validation_test(harness, generator, sla_config, target_rps,
                                duration_minutes=30):
    """Validate that SLAs are met at target load for sustained period."""
    sla = sla_config  # {'ttft_p99_ms': 1000, 'itl_p99_ms': 100, 'error_rate': 0.001}

    results = await harness.run_constant_rate(
        rate_rps=target_rps,
        duration_seconds=duration_minutes * 60,
        request_generator=generator.generate
    )

    summary = harness._summarize(results)

    # Check each SLA
    # Check each SLA; use .get so a fully failed run (no successful
    # requests, hence no latency keys) does not raise KeyError
    checks = {
        'TTFT P99': {
            'actual': summary.get('ttft_p99', 0) * 1000,
            'limit': sla['ttft_p99_ms'],
            'passed': summary.get('ttft_p99', 0) * 1000 <= sla['ttft_p99_ms']
        },
        'ITL P99': {
            'actual': summary.get('itl_p99', 0),
            'limit': sla['itl_p99_ms'],
            'passed': summary.get('itl_p99', 0) <= sla['itl_p99_ms']
        },
        'Error Rate': {
            'actual': summary.get('error_rate', 1.0),
            'limit': sla['error_rate'],
            'passed': summary.get('error_rate', 1.0) <= sla['error_rate']
        }
    }

    print(f"\n=== SLA Validation at {target_rps} RPS for {duration_minutes} min ===")
    all_passed = True
    for name, check in checks.items():
        status = "PASS" if check['passed'] else "FAIL"
        print(f"  {name}: {check['actual']:.1f} (limit: {check['limit']}) [{status}]")
        if not check['passed']:
            all_passed = False

    overall = "ALL SLAs MET" if all_passed else "SLA VIOLATION"
    print(f"\nResult: {overall}")
    return all_passed, checks

SLA Validation Results (Dynamo, 4x H100, Llama 70B AWQ INT4, 30-min test)

| Metric | SLA Limit | @ 20 RPS | @ 30 RPS | @ 40 RPS |
|---|---|---|---|---|
| TTFT P99 | 1000 ms | 210 ms (PASS) | 380 ms (PASS) | 890 ms (PASS) |
| ITL P99 | 100 ms | 28 ms (PASS) | 42 ms (PASS) | 95 ms (PASS) |
| Error rate | 0.1% | 0.0% (PASS) | 0.02% (PASS) | 0.8% (FAIL) |
| Throughput | N/A | 2,400 tok/s | 3,200 tok/s | 3,600 tok/s |
| Overall | All pass | PASS | PASS | FAIL (error rate) |
Note: At 40 RPS, latency SLAs are still met but error rate exceeds the 0.1% threshold. Maximum SLA-compliant load is ~35 RPS for this configuration.

Bottleneck Identification

During load testing, instrument the system to identify which component saturates first.

class BottleneckAnalyzer:
    """Identify serving bottlenecks during load testing."""

    def __init__(self, prometheus_url):
        self.prom_url = prometheus_url

    def identify_bottleneck(self, test_start, test_end):
        """Query metrics to identify the primary bottleneck."""
        bottlenecks = {}

        # GPU compute utilization
        gpu_util = self._query_avg(
            'avg(DCGM_FI_DEV_GPU_UTIL)',
            test_start, test_end
        )
        bottlenecks['gpu_compute'] = gpu_util

        # GPU memory utilization
        gpu_mem = self._query_avg(
            'avg(DCGM_FI_DEV_MEM_COPY_UTIL)',
            test_start, test_end
        )
        bottlenecks['gpu_memory_bw'] = gpu_mem

        # KV cache utilization
        kv_util = self._query_avg(
            'vllm_gpu_cache_usage_perc',
            test_start, test_end
        )
        bottlenecks['kv_cache'] = kv_util

        # Queue depth (an absolute request count, not a percentage)
        queue_depth = self._query_avg(
            'vllm_num_requests_waiting',
            test_start, test_end
        )
        bottlenecks['queue_depth'] = queue_depth

        # NVLink utilization (for TP)
        nvlink_util = self._query_avg(
            'avg(DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL)',
            test_start, test_end
        )
        bottlenecks['nvlink'] = nvlink_util

        # Rank only percentage-valued metrics; queue depth is measured in
        # requests and cannot be compared against utilization percentages
        pct_metrics = {k: v for k, v in bottlenecks.items()
                       if k != 'queue_depth'}
        primary = max(pct_metrics, key=pct_metrics.get)
        print("\n=== Bottleneck Analysis ===")
        for name, value in sorted(bottlenecks.items(),
                                  key=lambda x: x[1], reverse=True):
            unit = '' if name == 'queue_depth' else '%'
            indicator = " <-- PRIMARY" if name == primary else ""
            print(f"  {name}: {value:.1f}{unit}{indicator}")

        return primary, bottlenecks

    def _query_avg(self, query, start, end):
        """Query Prometheus for average value over time range."""
        import requests
        resp = requests.get(f"{self.prom_url}/api/v1/query_range", params={
            'query': query,
            'start': start,
            'end': end,
            'step': '15s'
        })
        data = resp.json()
        if data['status'] == 'success' and data['data']['result']:
            values = [float(v[1]) for v in data['data']['result'][0]['values']]
            return np.mean(values)
        return 0.0

Bottleneck Profile at Saturation (4x H100 TP, Llama 70B, 40 RPS)

| Component | Utilization |
|---|---|
| KV cache memory (PRIMARY bottleneck) | 95% |
| GPU compute (prefill) | 72% |
| HBM bandwidth (decode) | 68% |
| NVLink (AllReduce) | 35% |
| CPU (scheduling) | 12% |
KV Cache Is Almost Always the First Bottleneck

In autoregressive LLM serving with continuous batching, the KV cache fills up before GPU compute saturates. When the KV cache is full, new requests must wait in a queue until existing requests complete and free KV blocks. This is why increasing KV cache efficiency (PagedAttention, KV cache quantization) has more impact on serving capacity than faster kernels.
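The arithmetic behind this is worth checking for your own model. A sketch using Llama-2-70B-style dimensions (80 layers, 8 KV heads via GQA, head dim 128, FP16 KV cache; treat these as illustrative assumptions):

```python
def kv_cache_bytes_per_token(num_layers: int = 80, num_kv_heads: int = 8,
                             head_dim: int = 128, dtype_bytes: int = 2) -> int:
    """Per-token KV cache: a K and a V vector for every layer."""
    return 2 * num_layers * num_kv_heads * head_dim * dtype_bytes

def max_concurrent_requests(kv_budget_gib: float, seq_len: int) -> int:
    """How many seq_len-token requests fit in a given KV cache budget."""
    per_request = kv_cache_bytes_per_token() * seq_len
    return int(kv_budget_gib * 2**30 // per_request)

print(kv_cache_bytes_per_token())          # 327680 bytes (~320 KiB) per token
print(max_concurrent_requests(100, 4096))  # 80 requests in a 100 GiB budget
```

At roughly 1.25 GiB of KV cache per 4K-token request, cache memory runs out long before the SMs do, which is why KV cache leads the bottleneck profile above.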

Failure Injection Testing

Test how the system behaves when components fail under load.

import subprocess

class ChaosTest:
    """Inject failures during load testing to validate resilience."""

    def __init__(self, harness, ssh_hosts):
        self.harness = harness
        self.ssh_hosts = ssh_hosts  # List of node hostnames

    async def test_gpu_failure(self, target_rps, generator):
        """Simulate GPU failure during serving."""
        print("=== Chaos Test: GPU Failure ===")

        # Start load test
        load_task = asyncio.create_task(
            self.harness.run_constant_rate(
                target_rps, 120, generator.generate
            )
        )

        # Wait 30 seconds for steady state
        await asyncio.sleep(30)

        # Kill one GPU's serving process
        print("Injecting GPU failure on node 0, GPU 0...")
        self._inject_gpu_failure(self.ssh_hosts[0], gpu_id=0)

        # Let the system respond for 60 more seconds
        await asyncio.sleep(60)

        # Restore GPU
        print("Restoring GPU...")
        self._restore_gpu(self.ssh_hosts[0], gpu_id=0)

        results = await load_task

        # Analyze impact window. RequestResult.start_time is an absolute
        # epoch timestamp, so convert to offsets from the first request.
        test_start = min(r.start_time for r in results)
        failure_time = 30   # seconds into test
        recovery_time = 90

        pre_failure = [r for r in results
                       if r.start_time - test_start < failure_time]
        during_failure = [r for r in results
                          if failure_time <= r.start_time - test_start < recovery_time]
        post_recovery = [r for r in results
                         if r.start_time - test_start >= recovery_time]

        for phase, reqs in [("Pre-failure", pre_failure),
                            ("During failure", during_failure),
                            ("Post-recovery", post_recovery)]:
            errors = sum(1 for r in reqs if r.error)
            ttft = np.median([r.ttft for r in reqs if r.ttft > 0]) if reqs else 0
            print(f"  {phase}: {len(reqs)} requests, "
                  f"{errors} errors, TTFT P50: {ttft*1000:.0f} ms")

    def _inject_gpu_failure(self, host, gpu_id):
        """Simulate GPU failure by setting compute mode to PROHIBITED.

        This blocks new CUDA contexts on the GPU; already-running worker
        processes survive, so for a harder failure kill the worker instead.
        """
        subprocess.run(
            ['ssh', host, f'nvidia-smi -i {gpu_id} -c PROHIBITED'],
            timeout=10
        )

    def _restore_gpu(self, host, gpu_id):
        """Restore GPU to default compute mode."""
        subprocess.run(
            ['ssh', host, f'nvidia-smi -i {gpu_id} -c DEFAULT'],
            timeout=10
        )

Failure Injection Test Results (4x H100, 30 RPS)

| Failure Type | Detection Time | Recovery Time | Error Rate During | Throughput Impact | TTFT Impact |
|---|---|---|---|---|---|
| 1 GPU failure (of 4) | 2 seconds | 15 seconds | 3.2% | -25% | +45% |
| 2 GPU failure (of 4) | 2 seconds | 20 seconds | 12.5% | -50% | +180% |
| Network partition (1 node) | 5 seconds | 30 seconds | 8.1% | -50% | +120% |
| NVLink degradation | 10 seconds | N/A (persists) | 0.1% | -15% | +22% |
| Load spike (2x normal) | Immediate | Auto (queue drain) | 1.5% | Queue grows | +300% |
Note: Dynamo routes around failed GPUs within 2-5 seconds. The error burst during detection is unavoidable: in-flight requests on the failed GPU will time out.
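On the client side, the detection-window error burst can be absorbed by retrying idempotent requests with jittered exponential backoff, which gives the router its 2-5 second window to reroute. A minimal sketch (the retry policy values are assumptions, not Dynamo defaults):

```python
import asyncio
import random

async def send_with_retry(send_fn, max_attempts: int = 3,
                          base_delay: float = 0.5):
    """Retry a failed request with exponential backoff and full jitter.

    send_fn is a zero-argument coroutine function returning an object with
    an `error` attribute (None on success), like RequestResult above.
    """
    result = None
    for attempt in range(max_attempts):
        result = await send_fn()
        if result.error is None:
            return result
        if attempt < max_attempts - 1:
            # Full jitter spreads retries so a rerouted burst does not
            # re-spike the surviving replicas all at once
            await asyncio.sleep(base_delay * (2 ** attempt) * random.random())
    return result  # last failed result
```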

Cost Analysis

def compute_serving_cost(
    num_gpus: int,
    gpu_cost_per_hour: float,
    throughput_tok_s: float,
    utilization_pct: float = 70.0
):
    """Compute cost per million tokens served."""
    # Adjust throughput for target utilization
    # (load tests find max; production runs at 70% of max)
    production_throughput = throughput_tok_s * (utilization_pct / 100)

    # Tokens per hour
    tokens_per_hour = production_throughput * 3600

    # Cost per hour
    total_cost_per_hour = num_gpus * gpu_cost_per_hour

    # Cost per million tokens
    cost_per_million = (total_cost_per_hour / tokens_per_hour) * 1_000_000

    return {
        'production_throughput_tok_s': production_throughput,
        'tokens_per_hour': tokens_per_hour,
        'cost_per_hour': total_cost_per_hour,
        'cost_per_million_tokens': cost_per_million,
        'cost_per_1k_tokens': cost_per_million / 1000,
    }

# Cost analysis for different configurations
configs = [
    ("4x H100 FP16 (TP=4)", 4, 3.50, 3200),
    ("2x H100 FP8 (TP=2)", 2, 3.50, 3000),
    ("1x H100 INT4 (AWQ)", 1, 3.50, 2800),
    ("1x A100 INT4 (AWQ)", 1, 2.00, 1600),
]

for name, gpus, cost_hr, throughput in configs:
    result = compute_serving_cost(gpus, cost_hr, throughput)
    print(f"{name}: ${result['cost_per_million_tokens']:.2f}/M tokens, "
          f"{result['production_throughput_tok_s']:.0f} tok/s sustained")

Cost per Million Output Tokens (Llama 70B, at 70% utilization, as computed by compute_serving_cost above)

| Configuration | USD per million tokens |
|---|---|
| 4x H100 FP16 (TP=4) | $1.74 |
| 2x H100 FP8 (TP=2) | $0.93 |
| 1x H100 INT4 (AWQ) | $0.50 (best cost/token) |
| 1x A100 INT4 (AWQ) | $0.50 |

Automated Benchmarking Pipeline

#!/bin/bash
# automated_benchmark.sh - Run complete benchmark suite
set -euo pipefail

ENDPOINT="http://dynamo-gateway:8000"
MODEL="Llama-2-70B-AWQ"
OUTPUT_DIR="./benchmark_results/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$OUTPUT_DIR"

echo "=== Phase 1: Warmup ==="
python load_test.py \
    --endpoint "$ENDPOINT" \
    --model "$MODEL" \
    --rps 5 \
    --duration 60 \
    --output "$OUTPUT_DIR/warmup.json"

echo "=== Phase 2: Baseline (single request) ==="
python load_test.py \
    --endpoint "$ENDPOINT" \
    --model "$MODEL" \
    --rps 1 \
    --duration 120 \
    --output "$OUTPUT_DIR/baseline.json"

echo "=== Phase 3: Ramp test ==="
python load_test.py \
    --endpoint "$ENDPOINT" \
    --model "$MODEL" \
    --ramp-start 5 \
    --ramp-end 100 \
    --ramp-step 5 \
    --step-duration 60 \
    --output "$OUTPUT_DIR/ramp.json"

echo "=== Phase 4: SLA validation ==="
# Use 70% of saturation point from ramp test
SATURATION_RPS=$(python -c "
import json
data = json.load(open('$OUTPUT_DIR/ramp.json'))
# Find saturation: first RPS where error_rate > 0.5%
for rps, summary in sorted(data.items(), key=lambda x: float(x[0])):
    if summary.get('error_rate', 0) > 0.005:
        print(int(float(rps) * 0.7))
        break
")
# If no ramp step exceeded the error threshold, fall back to 70% of ramp-end
SATURATION_RPS="${SATURATION_RPS:-70}"

python load_test.py \
    --endpoint "$ENDPOINT" \
    --model "$MODEL" \
    --rps "$SATURATION_RPS" \
    --duration 1800 \
    --sla-ttft-p99-ms 1000 \
    --sla-itl-p99-ms 100 \
    --sla-error-rate 0.001 \
    --output "$OUTPUT_DIR/sla_validation.json"

echo "=== Phase 5: Generate report ==="
python generate_report.py \
    --input-dir "$OUTPUT_DIR" \
    --output "$OUTPUT_DIR/report.html"

echo "Results saved to $OUTPUT_DIR"

Benchmark Suite Execution Time

| Phase | Duration | Purpose | Key Output |
|---|---|---|---|
| Warmup | 1 min | Load model, warm caches | None (discard results) |
| Baseline | 2 min | Single-request latency | Min achievable TTFT/ITL |
| Ramp test | 20 min | Find saturation point | Latency-throughput curve |
| SLA validation | 30 min | Sustained load test | Pass/fail per SLA metric |
| Report generation | 1 min | Aggregate results | HTML report with charts |
| Total | ~54 min | Complete benchmark | Full performance profile |
Note: Run the complete suite before any production deployment or configuration change.
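The report step is also a natural deployment gate. A sketch of a regression check that compares a new run's summary (in the `_summarize` output format above) against a stored known-good baseline; the tolerance values are assumptions, not recommendations:

```python
import json

def check_regression(baseline: dict, current: dict,
                     max_latency_regression: float = 0.10,
                     max_throughput_regression: float = 0.05) -> list[str]:
    """Compare two _summarize()-style dicts; return failure descriptions.

    An empty list means the new run is within tolerance of the baseline.
    """
    failures = []
    for key in ("ttft_p99", "itl_p99"):
        old, new = baseline.get(key, 0), current.get(key, 0)
        if old > 0 and new > old * (1 + max_latency_regression):
            failures.append(f"{key} regressed: {new:.3f} vs baseline {old:.3f}")
    old_tp = baseline.get("throughput_tok_s", 0)
    new_tp = current.get("throughput_tok_s", 0)
    if old_tp > 0 and new_tp < old_tp * (1 - max_throughput_regression):
        failures.append(f"throughput regressed: {new_tp:.0f} vs {old_tp:.0f}")
    return failures

# Example gate, with hypothetical file paths:
# baseline = json.load(open("benchmark_results/known_good/sla_validation.json"))
# current = json.load(open("benchmark_results/latest/sla_validation.json"))
# assert not check_regression(baseline, current)
```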

Summary

Load testing a Dynamo cluster requires four test types: baseline measurement (single-request latency), ramp testing (find the saturation point), SLA validation (sustained load at target utilization), and failure injection (verify resilience). The critical finding in most deployments is that KV cache memory — not GPU compute — is the first bottleneck, and the saturation point is typically 30-40% lower than theoretical maximum throughput due to queuing effects and traffic variability. Operating at 60-70% of the measured saturation point provides sufficient headroom for traffic spikes while maintaining SLA compliance. Automate the benchmark suite and run it before every deployment, configuration change, or model swap.