A 64-GPU Dynamo cluster serving Llama 70B appears stable at 400 QPS with P99 TTFT of 450ms. At 420 QPS, P99 jumps to 2.1 seconds. At 450 QPS, requests time out. That 20 QPS gap between “acceptable” and “production-down” is your safety margin—you need to know it before launch day. Load testing finds this saturation point, measures cost-per-token at various utilization levels (throughput goes from 142K tokens/sec at 60% GPU util to 89K at 90% util), and validates SLA compliance. This post provides the complete testing framework with working load generation scripts.
Load Testing Objectives
Load testing answers five critical questions:
- Maximum throughput: How many tokens/sec can the cluster sustain at steady state?
- Latency profile: What is the P50/P95/P99 time-to-first-token (TTFT) and inter-token latency (ITL) at various load levels?
- Saturation point: At what request rate does the system transition from stable to unstable?
- SLA compliance: At what utilization level can you guarantee your latency SLAs?
- Failure behavior: How does the system degrade when components fail under load?
Key Metrics for LLM Load Testing
| Metric | Definition | Good (Chat) | Good (Batch API) | How to Measure |
|---|---|---|---|---|
| TTFT P50 | Time to first token (median) | Less than 200 ms | Less than 2 s | Client-side timestamp delta |
| TTFT P99 | Time to first token (tail) | Less than 1 s | Less than 10 s | Client-side timestamp delta |
| ITL P50 | Inter-token latency (median) | Less than 30 ms | N/A | Streaming token timestamps |
| ITL P99 | Inter-token latency (tail) | Less than 100 ms | N/A | Streaming token timestamps |
| Throughput | Total output tokens/sec across cluster | Maximize | Maximize | Server-side counter |
| Error rate | Failed or timed-out requests | Less than 0.1% | Less than 0.01% | HTTP status codes |
| GPU utilization | SM activity during serving | 70-85% | 90%+ | nvidia-smi or DCGM |
Test Harness Architecture
import asyncio
import aiohttp
import time
import json
import numpy as np
from dataclasses import dataclass, field
@dataclass
class RequestConfig:
"""Configuration for a single request."""
prompt: str
max_tokens: int
temperature: float = 0.7
stream: bool = True
@dataclass
class RequestResult:
"""Result of a single request."""
request_id: int
start_time: float
ttft: float = 0.0 # Time to first token (seconds)
total_time: float = 0.0 # Total request time
output_tokens: int = 0
input_tokens: int = 0
itl_values: list = field(default_factory=list) # Inter-token latencies
    error: str | None = None
status_code: int = 0
class LoadTestHarness:
"""Async load testing harness for LLM serving endpoints."""
def __init__(self, base_url, model_name, concurrency=32):
self.base_url = base_url
self.model_name = model_name
self.concurrency = concurrency
self.results = []
async def send_request(self, session, config, request_id):
"""Send a single request and measure latencies."""
result = RequestResult(request_id=request_id, start_time=time.time())
payload = {
"model": self.model_name,
"prompt": config.prompt,
"max_tokens": config.max_tokens,
"temperature": config.temperature,
"stream": config.stream,
}
try:
async with session.post(
f"{self.base_url}/v1/completions",
json=payload,
timeout=aiohttp.ClientTimeout(total=120)
) as resp:
result.status_code = resp.status
if config.stream and resp.status == 200:
first_token_received = False
last_token_time = time.time()
token_count = 0
async for line in resp.content:
line = line.decode('utf-8').strip()
if not line or line == 'data: [DONE]':
continue
if line.startswith('data: '):
now = time.time()
data = json.loads(line[6:])
                            choices = data.get('choices') or [{}]
                            if choices[0].get('text', ''):
token_count += 1
if not first_token_received:
result.ttft = now - result.start_time
first_token_received = True
else:
itl = now - last_token_time
result.itl_values.append(itl)
last_token_time = now
result.output_tokens = token_count
elif resp.status == 200:
data = await resp.json()
result.ttft = time.time() - result.start_time
result.output_tokens = data.get('usage', {}).get(
'completion_tokens', 0
)
else:
result.error = f"HTTP {resp.status}"
except asyncio.TimeoutError:
result.error = "Timeout"
except Exception as e:
result.error = str(e)
result.total_time = time.time() - result.start_time
return result
async def run_constant_rate(self, rate_rps, duration_seconds,
request_generator):
"""Send requests at a constant rate."""
interval = 1.0 / rate_rps
num_requests = int(rate_rps * duration_seconds)
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(num_requests):
config = request_generator()
task = asyncio.create_task(
self.send_request(session, config, i)
)
tasks.append(task)
await asyncio.sleep(interval)
results = await asyncio.gather(*tasks)
self.results.extend(results)
return results
async def run_ramp(self, start_rps, end_rps, step_rps,
step_duration_seconds, request_generator):
"""Ramp load from start_rps to end_rps."""
all_results = {}
current_rps = start_rps
while current_rps <= end_rps:
print(f"\n=== Testing at {current_rps} RPS ===")
results = await self.run_constant_rate(
current_rps, step_duration_seconds, request_generator
)
all_results[current_rps] = self._summarize(results)
self._print_summary(current_rps, all_results[current_rps])
current_rps += step_rps
return all_results
def _summarize(self, results):
"""Compute summary statistics from results."""
successful = [r for r in results if r.error is None]
errors = [r for r in results if r.error is not None]
if not successful:
return {'error_rate': 1.0}
ttft_values = [r.ttft for r in successful if r.ttft > 0]
total_times = [r.total_time for r in successful]
total_tokens = sum(r.output_tokens for r in successful)
duration = max(r.start_time + r.total_time for r in successful) - \
min(r.start_time for r in successful)
all_itl = []
for r in successful:
all_itl.extend(r.itl_values)
return {
'num_requests': len(results),
'num_successful': len(successful),
'error_rate': len(errors) / len(results),
'ttft_p50': np.percentile(ttft_values, 50) if ttft_values else 0,
'ttft_p95': np.percentile(ttft_values, 95) if ttft_values else 0,
'ttft_p99': np.percentile(ttft_values, 99) if ttft_values else 0,
'itl_p50': np.percentile(all_itl, 50) * 1000 if all_itl else 0, # ms
'itl_p99': np.percentile(all_itl, 99) * 1000 if all_itl else 0, # ms
'throughput_tok_s': total_tokens / duration if duration > 0 else 0,
'avg_output_tokens': np.mean([r.output_tokens for r in successful]),
}
def _print_summary(self, rps, summary):
"""Print human-readable summary."""
print(f" Requests: {summary.get('num_successful', 0)}/{summary.get('num_requests', 0)}")
print(f" Error rate: {summary.get('error_rate', 0):.1%}")
print(f" TTFT P50/P95/P99: {summary.get('ttft_p50', 0)*1000:.0f} / "
f"{summary.get('ttft_p95', 0)*1000:.0f} / "
f"{summary.get('ttft_p99', 0)*1000:.0f} ms")
print(f" ITL P50/P99: {summary.get('itl_p50', 0):.0f} / "
f"{summary.get('itl_p99', 0):.0f} ms")
print(f" Throughput: {summary.get('throughput_tok_s', 0):.0f} tok/s")
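The `run_constant_rate` loop above paces requests with a fixed gap of `1/rate_rps` seconds. Real traffic is burstier: arrivals are closer to a Poisson process, whose exponentially distributed gaps stress queueing much harder at the same average rate. A minimal sketch of Poisson pacing (the `poisson_intervals` helper is hypothetical, not part of the harness above):

```python
import random

def poisson_intervals(rate_rps: float, duration_seconds: float, seed: int = 0):
    """Yield exponential inter-arrival gaps with mean 1/rate_rps.

    A Poisson arrival process has exponentially distributed gaps, so the
    same average RPS arrives in bursts -- a harsher (and more realistic)
    load than evenly spaced requests.
    """
    rng = random.Random(seed)
    elapsed = 0.0
    while True:
        gap = rng.expovariate(rate_rps)
        elapsed += gap
        if elapsed >= duration_seconds:
            break
        yield gap

gaps = list(poisson_intervals(rate_rps=10, duration_seconds=60))
print(f"{len(gaps)} requests, mean gap {sum(gaps)/len(gaps)*1000:.0f} ms")
```

To use it, draw each sleep from this generator instead of the fixed `interval` inside `run_constant_rate`.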
Request Generation Strategies
The quality of load testing depends on generating realistic requests. Uniform random prompts miss the distribution effects that matter in production.
import random
class RequestGenerator:
"""Generate realistic request distributions for load testing."""
# Distribution from production traffic analysis
INPUT_LENGTH_DISTRIBUTION = {
'chat': {'mean': 256, 'std': 200, 'min': 10, 'max': 4096},
'code': {'mean': 512, 'std': 300, 'min': 50, 'max': 8192},
'summarization': {'mean': 2048, 'std': 1000, 'min': 500, 'max': 16384},
}
OUTPUT_LENGTH_DISTRIBUTION = {
'chat': {'mean': 128, 'std': 100, 'min': 10, 'max': 2048},
'code': {'mean': 256, 'std': 200, 'min': 20, 'max': 4096},
'summarization': {'mean': 256, 'std': 128, 'min': 50, 'max': 1024},
}
def __init__(self, workload_mix=None, prompt_corpus=None):
"""
Args:
workload_mix: dict like {'chat': 0.6, 'code': 0.3, 'summarization': 0.1}
prompt_corpus: list of real prompts for realistic token distributions
"""
self.workload_mix = workload_mix or {'chat': 0.7, 'code': 0.2, 'summarization': 0.1}
self.prompt_corpus = prompt_corpus or self._generate_synthetic_prompts()
def generate(self):
"""Generate a single request with realistic characteristics."""
# Select workload type
workload = random.choices(
list(self.workload_mix.keys()),
weights=list(self.workload_mix.values())
)[0]
# Sample input/output lengths
input_dist = self.INPUT_LENGTH_DISTRIBUTION[workload]
output_dist = self.OUTPUT_LENGTH_DISTRIBUTION[workload]
input_len = int(np.clip(
np.random.normal(input_dist['mean'], input_dist['std']),
input_dist['min'], input_dist['max']
))
output_len = int(np.clip(
np.random.normal(output_dist['mean'], output_dist['std']),
output_dist['min'], output_dist['max']
))
# Select prompt of appropriate length
prompt = self._select_prompt(input_len)
return RequestConfig(
prompt=prompt,
max_tokens=output_len,
stream=True
)
def _select_prompt(self, target_length):
"""Select a prompt closest to target token length."""
# In practice, use real prompts tokenized to known lengths
# For synthetic testing, use repeated text blocks
words_per_token = 0.75 # Approximate
num_words = int(target_length * words_per_token)
base = "Analyze the following technical document and provide detailed insights. "
return (base * (num_words // len(base.split()) + 1))[:num_words * 5]
def _generate_synthetic_prompts(self):
"""Generate synthetic prompts for testing."""
prompts = []
for _ in range(1000):
length = random.randint(50, 4000)
prompts.append("x " * length)
return prompts
Synthetic load tests with uniform prompt lengths overestimate throughput by 15-30% compared to real traffic. Production traffic has long-tail distributions: most requests are short, but a few are very long. Those long requests consume disproportionate KV cache memory and prefill compute, causing head-of-line blocking. Always validate synthetic results against production traffic replays.
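The effect is easy to see by comparing a uniform length distribution with a long-tailed one of the same mean. The sketch below uses illustrative parameters, not production data:

```python
import numpy as np

rng = np.random.default_rng(0)

# Two input-length distributions with the same mean (~512 tokens):
uniform_lens = rng.uniform(256, 768, size=100_000)       # what naive tests send
mu, sigma = np.log(512) - 0.5 * 0.8 ** 2, 0.8            # lognormal with mean ~512
longtail_lens = rng.lognormal(mu, sigma, size=100_000)   # closer to production

for name, lens in [("uniform", uniform_lens), ("long-tail", longtail_lens)]:
    print(f"{name}: mean={lens.mean():.0f}  P50={np.percentile(lens, 50):.0f}  "
          f"P99={np.percentile(lens, 99):.0f}  max={lens.max():.0f}")
```

The means match, but the long-tail P99 is several times the uniform distribution's maximum; those tail requests are the ones that fill the KV cache and stall the batch.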
Saturation Testing
The saturation test ramps load until the system breaks. This identifies the maximum throughput and the failure mode at saturation.
async def saturation_test(harness, generator, max_rps=100):
"""Find the saturation point of the serving cluster."""
results = await harness.run_ramp(
start_rps=1,
end_rps=max_rps,
step_rps=2,
step_duration_seconds=60,
request_generator=generator.generate
)
# Find saturation point: where P99 TTFT exceeds 2x baseline
baseline_ttft = results[1]['ttft_p99'] # 1 RPS baseline
saturation_rps = None
max_throughput_rps = None
max_throughput = 0
for rps, summary in sorted(results.items()):
if summary.get('throughput_tok_s', 0) > max_throughput:
max_throughput = summary['throughput_tok_s']
max_throughput_rps = rps
if (saturation_rps is None and
summary.get('ttft_p99', 0) > baseline_ttft * 2):
saturation_rps = rps
    print(f"\n=== Saturation Analysis ===")
    print(f"Baseline TTFT P99 (1 RPS): {baseline_ttft*1000:.0f} ms")
    print(f"Max throughput: {max_throughput:.0f} tok/s at {max_throughput_rps} RPS")
    if saturation_rps is not None:
        print(f"Saturation point: {saturation_rps} RPS")
        print(f"Recommended operating point: {saturation_rps * 0.7:.0f} RPS "
              f"(70% of saturation)")
    else:
        print(f"No saturation detected up to {max_rps} RPS; raise max_rps and rerun")
    return results
Typical Latency-Throughput Curve (Dynamo, 4x H100, Llama 70B INT4)
(Figure: TTFT P99 in ms vs. offered load)

The latency-throughput curve has a characteristic “hockey stick” shape. Latency is nearly flat at low load (the system has spare capacity), then rises gradually, then explodes at saturation. The recommended operating point is 60-70% of the saturation RPS, giving headroom for traffic spikes.
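Choosing the operating point from ramp results can be automated. A sketch (the `pick_operating_point` helper and its thresholds are assumptions, matching the summary-dict shape that `run_ramp` produces):

```python
def pick_operating_point(ramp_results: dict, ttft_slo_s: float = 1.0,
                         headroom: float = 0.7) -> float:
    """Pick a recommended operating RPS from ramp-test summaries.

    ramp_results maps RPS -> summary dict. The 'knee' is taken as the
    highest tested RPS whose P99 TTFT still meets the SLO; the operating
    point backs off from it by a headroom factor.
    """
    within_slo = [rps for rps, s in sorted(ramp_results.items())
                  if s.get('ttft_p99', float('inf')) <= ttft_slo_s]
    if not within_slo:
        raise ValueError("No tested rate meets the TTFT SLO")
    return max(within_slo) * headroom

ramp = {10: {'ttft_p99': 0.25}, 20: {'ttft_p99': 0.40},
        30: {'ttft_p99': 0.90}, 40: {'ttft_p99': 2.10}}
print(pick_operating_point(ramp))  # 70% of the 30 RPS knee
```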
SLA Validation Test
async def sla_validation_test(harness, generator, sla_config, target_rps,
duration_minutes=30):
"""Validate that SLAs are met at target load for sustained period."""
sla = sla_config # {'ttft_p99_ms': 1000, 'itl_p99_ms': 100, 'error_rate': 0.001}
results = await harness.run_constant_rate(
rate_rps=target_rps,
duration_seconds=duration_minutes * 60,
request_generator=generator.generate
)
summary = harness._summarize(results)
# Check each SLA
checks = {
'TTFT P99': {
'actual': summary['ttft_p99'] * 1000,
'limit': sla['ttft_p99_ms'],
'passed': summary['ttft_p99'] * 1000 <= sla['ttft_p99_ms']
},
'ITL P99': {
'actual': summary['itl_p99'],
'limit': sla['itl_p99_ms'],
'passed': summary['itl_p99'] <= sla['itl_p99_ms']
},
'Error Rate': {
'actual': summary['error_rate'],
'limit': sla['error_rate'],
'passed': summary['error_rate'] <= sla['error_rate']
}
}
print(f"\n=== SLA Validation at {target_rps} RPS for {duration_minutes} min ===")
all_passed = True
for name, check in checks.items():
status = "PASS" if check['passed'] else "FAIL"
print(f" {name}: {check['actual']:.1f} (limit: {check['limit']}) [{status}]")
if not check['passed']:
all_passed = False
overall = "ALL SLAs MET" if all_passed else "SLA VIOLATION"
print(f"\nResult: {overall}")
return all_passed, checks
SLA Validation Results (Dynamo, 4x H100, Llama 70B AWQ INT4, 30-min test)
| Metric | SLA Limit | @ 20 RPS | @ 30 RPS | @ 40 RPS |
|---|---|---|---|---|
| TTFT P99 | 1000 ms | 210 ms (PASS) | 380 ms (PASS) | 890 ms (PASS) |
| ITL P99 | 100 ms | 28 ms (PASS) | 42 ms (PASS) | 95 ms (PASS) |
| Error rate | 0.1% | 0.0% (PASS) | 0.02% (PASS) | 0.8% (FAIL) |
| Throughput | N/A | 2,400 tok/s | 3,200 tok/s | 3,600 tok/s |
| Overall | All pass | PASS | PASS | FAIL (error rate) |
Bottleneck Identification
During load testing, instrument the system to identify which component saturates first.
class BottleneckAnalyzer:
"""Identify serving bottlenecks during load testing."""
def __init__(self, prometheus_url):
self.prom_url = prometheus_url
def identify_bottleneck(self, test_start, test_end):
"""Query metrics to identify the primary bottleneck."""
bottlenecks = {}
# GPU compute utilization
gpu_util = self._query_avg(
'avg(DCGM_FI_DEV_GPU_UTIL)',
test_start, test_end
)
bottlenecks['gpu_compute'] = gpu_util
# GPU memory utilization
gpu_mem = self._query_avg(
'avg(DCGM_FI_DEV_MEM_COPY_UTIL)',
test_start, test_end
)
bottlenecks['gpu_memory_bw'] = gpu_mem
# KV cache utilization
kv_util = self._query_avg(
'vllm_gpu_cache_usage_perc',
test_start, test_end
)
bottlenecks['kv_cache'] = kv_util
# Queue depth
queue_depth = self._query_avg(
'vllm_num_requests_waiting',
test_start, test_end
)
bottlenecks['queue_depth'] = queue_depth
# NVLink utilization (for TP)
nvlink_util = self._query_avg(
'avg(DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL)',
test_start, test_end
)
bottlenecks['nvlink'] = nvlink_util
# Identify primary bottleneck
primary = max(bottlenecks, key=bottlenecks.get)
print(f"\n=== Bottleneck Analysis ===")
for name, value in sorted(bottlenecks.items(),
key=lambda x: x[1], reverse=True):
indicator = " <-- PRIMARY" if name == primary else ""
print(f" {name}: {value:.1f}%{indicator}")
return primary, bottlenecks
def _query_avg(self, query, start, end):
"""Query Prometheus for average value over time range."""
import requests
resp = requests.get(f"{self.prom_url}/api/v1/query_range", params={
'query': query,
'start': start,
'end': end,
'step': '15s'
})
data = resp.json()
if data['status'] == 'success' and data['data']['result']:
values = [float(v[1]) for v in data['data']['result'][0]['values']]
return np.mean(values)
return 0.0
Bottleneck Profile at Saturation (4x H100 TP, Llama 70B, 40 RPS)
(Figure: per-component utilization in %)

In autoregressive LLM serving with continuous batching, the KV cache fills up before GPU compute saturates. When the KV cache is full, new requests must wait in a queue until existing requests complete and free KV blocks. This is why improving KV cache efficiency (PagedAttention, KV cache quantization) has more impact on serving capacity than faster kernels.
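A back-of-envelope calculation shows why. KV cache per token is `2 * layers * kv_heads * head_dim * bytes` (the 2 covers K and V). The architecture and memory numbers below are assumptions for a Llama-70B-class model with GQA, not measured values:

```python
def kv_bytes_per_token(n_layers=80, n_kv_heads=8, head_dim=128, dtype_bytes=2):
    """KV cache bytes per token: a K and a V vector for every layer."""
    return 2 * n_layers * n_kv_heads * head_dim * dtype_bytes

# Llama-70B-style architecture (GQA, 8 KV heads), FP16 KV cache -- assumed numbers
per_token = kv_bytes_per_token()
seq_len = 4096
per_request_gb = per_token * seq_len / 1e9

# Rough free HBM on 4x H100 (80 GB each) after INT4 weights and overhead -- assumed
free_gb = 4 * 80 - 35 - 20
max_resident = int(free_gb * 1e9 / (per_token * seq_len))
print(f"{per_token/1024:.0f} KiB/token, {per_request_gb:.2f} GB per 4K-token request")
print(f"~{max_resident} concurrent 4K-token requests before the KV cache is full")
```

At roughly 1.3 GB of KV cache per 4K-token request, a couple hundred concurrent requests exhaust HBM long before the SMs run out of work.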
Failure Injection Testing
Test how the system behaves when components fail under load.
import subprocess

class ChaosTest:
"""Inject failures during load testing to validate resilience."""
def __init__(self, harness, ssh_hosts):
self.harness = harness
self.ssh_hosts = ssh_hosts # List of node hostnames
async def test_gpu_failure(self, target_rps, generator):
"""Simulate GPU failure during serving."""
print("=== Chaos Test: GPU Failure ===")
# Start load test
load_task = asyncio.create_task(
self.harness.run_constant_rate(
target_rps, 120, generator.generate
)
)
# Wait 30 seconds for steady state
await asyncio.sleep(30)
# Kill one GPU's serving process
print("Injecting GPU failure on node 0, GPU 0...")
self._inject_gpu_failure(self.ssh_hosts[0], gpu_id=0)
# Let the system respond for 60 more seconds
await asyncio.sleep(60)
# Restore GPU
print("Restoring GPU...")
self._restore_gpu(self.ssh_hosts[0], gpu_id=0)
results = await load_task
# Analyze impact window
        # start_time is an absolute epoch timestamp, so bucket requests
        # by their offset relative to the first request
        test_start = min(r.start_time for r in results)
        failure_time = 30  # seconds into test
        recovery_time = 90
        pre_failure = [r for r in results
                       if r.start_time - test_start < failure_time]
        during_failure = [r for r in results
                          if failure_time <= r.start_time - test_start < recovery_time]
        post_recovery = [r for r in results
                         if r.start_time - test_start >= recovery_time]
for phase, reqs in [("Pre-failure", pre_failure),
("During failure", during_failure),
("Post-recovery", post_recovery)]:
errors = sum(1 for r in reqs if r.error)
ttft = np.median([r.ttft for r in reqs if r.ttft > 0]) if reqs else 0
print(f" {phase}: {len(reqs)} requests, "
f"{errors} errors, TTFT P50: {ttft*1000:.0f} ms")
def _inject_gpu_failure(self, host, gpu_id):
"""Simulate GPU failure by setting compute mode to prohibited."""
subprocess.run(
            ['ssh', host, f'nvidia-smi -i {gpu_id} -c PROHIBITED'],
timeout=10
)
def _restore_gpu(self, host, gpu_id):
"""Restore GPU to default compute mode."""
subprocess.run(
['ssh', host, f'nvidia-smi -i {gpu_id} -c DEFAULT'],
timeout=10
)
Failure Injection Test Results (4x H100, 30 RPS)
| Failure Type | Detection Time | Recovery Time | Error Rate During | Throughput Impact | TTFT Impact |
|---|---|---|---|---|---|
| 1 GPU failure (of 4) | 2 seconds | 15 seconds | 3.2% | -25% | +45% |
| 2 GPU failure (of 4) | 2 seconds | 20 seconds | 12.5% | -50% | +180% |
| Network partition (1 node) | 5 seconds | 30 seconds | 8.1% | -50% | +120% |
| NVLink degradation | 10 seconds | N/A (persists) | 0.1% | -15% | +22% |
| Load spike (2x normal) | Immediate | Auto (queue drain) | 1.5% | Queue grows | +300% |
Cost Analysis
def compute_serving_cost(
num_gpus: int,
gpu_cost_per_hour: float,
throughput_tok_s: float,
utilization_pct: float = 70.0
):
"""Compute cost per million tokens served."""
# Adjust throughput for target utilization
# (load tests find max; production runs at 70% of max)
production_throughput = throughput_tok_s * (utilization_pct / 100)
# Tokens per hour
tokens_per_hour = production_throughput * 3600
# Cost per hour
total_cost_per_hour = num_gpus * gpu_cost_per_hour
# Cost per million tokens
cost_per_million = (total_cost_per_hour / tokens_per_hour) * 1_000_000
return {
'production_throughput_tok_s': production_throughput,
'tokens_per_hour': tokens_per_hour,
'cost_per_hour': total_cost_per_hour,
'cost_per_million_tokens': cost_per_million,
'cost_per_1k_tokens': cost_per_million / 1000,
}
# Cost analysis for different configurations
configs = [
("4x H100 FP16 (TP=4)", 4, 3.50, 3200),
("2x H100 FP8 (TP=2)", 2, 3.50, 3000),
("1x H100 INT4 (AWQ)", 1, 3.50, 2800),
("1x A100 INT4 (AWQ)", 1, 2.00, 1600),
]
for name, gpus, cost_hr, throughput in configs:
result = compute_serving_cost(gpus, cost_hr, throughput)
print(f"{name}: ${result['cost_per_million_tokens']:.2f}/M tokens, "
f"{result['production_throughput_tok_s']:.0f} tok/s sustained")
Cost per Million Output Tokens (Llama 70B, at 70% utilization)
(Figure: cost in USD per million tokens)

Automated Benchmarking Pipeline
#!/bin/bash
# automated_benchmark.sh - Run complete benchmark suite
set -euo pipefail
ENDPOINT="http://dynamo-gateway:8000"
MODEL="Llama-2-70B-AWQ"
OUTPUT_DIR="./benchmark_results/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$OUTPUT_DIR"
echo "=== Phase 1: Warmup ==="
python load_test.py \
--endpoint "$ENDPOINT" \
--model "$MODEL" \
--rps 5 \
--duration 60 \
--output "$OUTPUT_DIR/warmup.json"
echo "=== Phase 2: Baseline (single request) ==="
python load_test.py \
--endpoint "$ENDPOINT" \
--model "$MODEL" \
--rps 1 \
--duration 120 \
--output "$OUTPUT_DIR/baseline.json"
echo "=== Phase 3: Ramp test ==="
python load_test.py \
--endpoint "$ENDPOINT" \
--model "$MODEL" \
--ramp-start 5 \
--ramp-end 100 \
--ramp-step 5 \
--step-duration 60 \
--output "$OUTPUT_DIR/ramp.json"
echo "=== Phase 4: SLA validation ==="
# Use 70% of saturation point from ramp test
SATURATION_RPS=$(python -c "
import json
data = json.load(open('$OUTPUT_DIR/ramp.json'))
# Find saturation: first RPS where error_rate > 0.5%
for rps, summary in sorted(data.items(), key=lambda x: float(x[0])):
    if summary.get('error_rate', 0) > 0.005:
        print(int(float(rps) * 0.7))
        break
else:
    # No saturation seen in the ramp range; fall back to the highest tested rate
    print(int(float(max(data, key=float)) * 0.7))
")
python load_test.py \
--endpoint "$ENDPOINT" \
--model "$MODEL" \
--rps "$SATURATION_RPS" \
--duration 1800 \
--sla-ttft-p99-ms 1000 \
--sla-itl-p99-ms 100 \
--sla-error-rate 0.001 \
--output "$OUTPUT_DIR/sla_validation.json"
echo "=== Phase 5: Generate report ==="
python generate_report.py \
--input-dir "$OUTPUT_DIR" \
--output "$OUTPUT_DIR/report.html"
echo "Results saved to $OUTPUT_DIR"
Benchmark Suite Execution Time
| Phase | Duration | Purpose | Key Output |
|---|---|---|---|
| Warmup | 1 min | Load model, warm caches | None (discard results) |
| Baseline | 2 min | Single-request latency | Min achievable TTFT/ITL |
| Ramp test | 20 min | Find saturation point | Latency-throughput curve |
| SLA validation | 30 min | Sustained load test | Pass/fail per SLA metric |
| Report generation | 1 min | Aggregate results | HTML report with charts |
| Total | ~54 min | Complete benchmark | Full performance profile |
Summary
Load testing a Dynamo cluster requires four test types: baseline measurement (single-request latency), ramp testing (find the saturation point), SLA validation (sustained load at target utilization), and failure injection (verify resilience). The critical finding in most deployments is that KV cache memory — not GPU compute — is the first bottleneck, and the saturation point is typically 30-40% lower than theoretical maximum throughput due to queuing effects and traffic variability. Operating at 60-70% of the measured saturation point provides sufficient headroom for traffic spikes while maintaining SLA compliance. Automate the benchmark suite and run it before every deployment, configuration change, or model swap.
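"Run it before every deployment" implies an automated pass/fail gate. A sketch of a regression check over two benchmark summaries (keys match the harness `_summarize` output; the thresholds are illustrative):

```python
def detect_regression(baseline: dict, current: dict,
                      max_latency_growth=0.10, max_throughput_drop=0.05):
    """Compare two benchmark summaries and list any regressions.

    Thresholds are illustrative: more than 10% tail-latency growth or
    more than 5% throughput loss versus baseline counts as a failure.
    """
    failures = []
    for key in ('ttft_p99', 'itl_p99'):
        if current[key] > baseline[key] * (1 + max_latency_growth):
            failures.append(f"{key}: {baseline[key]:.3f} -> {current[key]:.3f}")
    if current['throughput_tok_s'] < baseline['throughput_tok_s'] * (1 - max_throughput_drop):
        failures.append("throughput dropped")
    return failures

prev = {'ttft_p99': 0.89, 'itl_p99': 95.0, 'throughput_tok_s': 3200}
curr = {'ttft_p99': 1.05, 'itl_p99': 96.0, 'throughput_tok_s': 3150}
print(detect_regression(prev, curr))  # TTFT P99 grew ~18% -> flagged
```

An empty list means the new build is safe to ship; anything else blocks the deployment.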