Part of series: Inference Optimization Timeline (33 of 60)

An LLM inference server running at capacity has every KV cache block allocated. A batch of 64 requests is mid-generation. GPU memory utilization is 98%. Then a premium-tier request arrives from your highest-paying customer. The system has two choices: reject the new request (violating the premium SLO) or evict something currently running (wasting the compute already spent on it). Neither choice is free.

This is the preemption problem. Every production LLM serving system that handles mixed-priority traffic must solve it. vLLM implements swap-based preemption. TensorRT-LLM supports priority queues. Orca introduced iteration-level scheduling that makes preemption practical. But the literature treats preemption as a binary decision. Production systems need a full priority framework: multiple tiers, SLO-awareness, fair-share guarantees, and graceful degradation under overload.

This post covers the complete design space: preemption strategies and their cost models, priority tier architecture, SLO-driven scheduling, starvation prevention, and a production-grade implementation of a priority scheduler with preemption support.

1. The Preemption Decision

1.1 When Preemption Is Necessary

Preemption is triggered by resource pressure. The primary resource is GPU memory (KV cache blocks), but GPU compute (batch slot limits) and output queue depth also matter.

class ResourceMonitor:
    """
    Monitors GPU resources and determines when preemption is needed.
    """

    def __init__(self, total_kv_blocks, max_batch_size,
                 memory_threshold=0.95, batch_threshold=0.90):
        self.total_kv_blocks = total_kv_blocks
        self.max_batch_size = max_batch_size
        self.memory_threshold = memory_threshold
        self.batch_threshold = batch_threshold
        self.allocated_blocks = 0
        self.active_requests = 0

    def check_pressure(self):
        """
        Returns pressure level: none, warning, critical.
        """
        memory_util = self.allocated_blocks / self.total_kv_blocks
        batch_util = self.active_requests / self.max_batch_size

        if memory_util > self.memory_threshold:
            return PressureLevel.CRITICAL
        if memory_util > self.memory_threshold - 0.1:
            return PressureLevel.WARNING
        if batch_util > self.batch_threshold:
            return PressureLevel.WARNING
        return PressureLevel.NONE

    def blocks_needed(self, request):
        """Estimate KV cache blocks needed for a request."""
        # Each block holds block_size tokens of KV cache
        block_size = 16  # tokens per block
        estimated_total_tokens = (
            request.prompt_length + request.max_output_tokens
        )
        return (estimated_total_tokens + block_size - 1) // block_size

    def blocks_available(self):
        return self.total_kv_blocks - self.allocated_blocks

    def can_admit(self, request):
        """Check if request can be admitted without preemption."""
        needed = self.blocks_needed(request)
        return needed <= self.blocks_available()
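To make the block math above concrete, here is a standalone sketch of the ceiling division in `blocks_needed` (the 16-token block size matches the assumption used in the code above):

```python
def blocks_needed(prompt_length, max_output_tokens, block_size=16):
    """Ceiling division: a partially filled block still consumes a whole block."""
    total = prompt_length + max_output_tokens
    return (total + block_size - 1) // block_size

# A 512-token prompt with up to 256 output tokens reserves 768 / 16 = 48 blocks.
print(blocks_needed(512, 256))  # 48
# One extra token tips into a new block.
print(blocks_needed(512, 257))  # 49
```

Note that the estimate reserves for `max_output_tokens` up front; requests that stop early leave that headroom unused, which is one reason memory pressure and preemption interact.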

1.2 The Three Preemption Strategies

When a running request must be preempted, there are three options, each with a distinct cost profile.

Strategy 1: Swap KV cache to CPU memory. Copy the preempted request’s KV cache from GPU to CPU over PCIe. When the request resumes, copy it back.

Strategy 2: Recompute from scratch. Discard the preempted request’s KV cache entirely. When it resumes, re-run the prefill phase from the original prompt plus any tokens generated so far.

Strategy 3: Drop the request. Kill the preempted request. Return an error to the client. The client must retry.

import time
import enum

class PreemptionStrategy(enum.Enum):
    SWAP = "swap"
    RECOMPUTE = "recompute"
    DROP = "drop"

class PreemptionCostModel:
    """
    Models the cost of each preemption strategy for a given request.
    Costs are in milliseconds of added latency.
    """

    def __init__(self, pcie_bandwidth_gbps=32.0,
                 prefill_tokens_per_sec=10000,
                 bytes_per_kv_token=512):
        # Bandwidth is given in gigabits/sec. PCIe 4.0 x16 peaks near
        # 25 GB/s (~200 Gb/s) in practice; the conservative default
        # reflects contention with other host-device traffic.
        self.pcie_bw = pcie_bandwidth_gbps * 1e9 / 8  # bytes/sec
        self.prefill_tps = prefill_tokens_per_sec
        self.bytes_per_kv = bytes_per_kv_token  # per token per layer

    def swap_cost_ms(self, request, num_layers=80):
        """
        Cost of swapping KV cache to CPU and back.
        Round trip (swap out + swap in) transfers the cache twice.
        """
        kv_size_bytes = (
            request.current_length * num_layers * self.bytes_per_kv
        )
        transfer_time = kv_size_bytes / self.pcie_bw
        # Round trip: swap out + swap in
        return transfer_time * 2 * 1000  # ms

    def recompute_cost_ms(self, request):
        """
        Cost of recomputing KV cache from scratch.
        Must re-run prefill on prompt + generated tokens.
        """
        total_tokens = request.prompt_length + request.generated_length
        prefill_time = total_tokens / self.prefill_tps
        return prefill_time * 1000  # ms

    def drop_cost_ms(self, request):
        """
        Cost of dropping: wasted compute for tokens already generated,
        plus client-perceived retry latency.
        """
        # Wasted compute
        wasted_decode_ms = request.generated_length * request.avg_ms_per_token
        # Client will need to re-submit, wait for queue, regenerate
        estimated_retry_ms = request.estimated_total_time_ms
        return wasted_decode_ms + estimated_retry_ms

    def optimal_strategy(self, request, num_layers=80):
        """
        Choose the cheapest preemption strategy.
        """
        swap = self.swap_cost_ms(request, num_layers)
        recompute = self.recompute_cost_ms(request)
        drop = self.drop_cost_ms(request)

        costs = {
            PreemptionStrategy.SWAP: swap,
            PreemptionStrategy.RECOMPUTE: recompute,
            PreemptionStrategy.DROP: drop,
        }
        return min(costs, key=costs.get), costs
Preemption Strategy Costs (Llama 3 70B, 80 layers, A100)

Request State              | Swap Cost | Recompute Cost | Drop Cost | Optimal
512 prompt, 10 generated   | 52 ms     | 52 ms          | 1200 ms   | Swap or Recompute
512 prompt, 100 generated  | 62 ms     | 61 ms          | 1800 ms   | Swap or Recompute
2048 prompt, 50 generated  | 214 ms    | 210 ms         | 2400 ms   | Swap or Recompute
8192 prompt, 200 generated | 856 ms    | 840 ms         | 5200 ms   | Either (both expensive)
512 prompt, 500 generated  | 103 ms    | 101 ms         | 4500 ms   | Swap or Recompute
32K prompt, 100 generated  | 3280 ms   | 3210 ms        | 8000 ms   | Recompute (slightly)

Note: PCIe 4.0 x16 bandwidth. Bytes per KV token = 512 (FP16, GQA with 8 KV heads, head_dim=128). Prefill rate: 10K tokens/sec.
ℹ️ Swap vs Recompute Crossover

Swap cost scales with sequence length times number of layers (data volume over PCIe). Recompute cost scales with sequence length times model FLOPS (prefill computation). For current hardware (PCIe 4.0, A100 prefill rates), the crossover point is approximately 16K tokens: below 16K, swap and recompute costs are similar; above 16K, both become expensive and the system should avoid preemption entirely by reserving capacity for premium requests.
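The policy in this callout can be expressed as a small decision rule. This is a sketch, and the 1000 ms avoidance threshold is an illustrative assumption, not a value taken from the cost model above:

```python
def choose_strategy(swap_ms, recompute_ms, drop_ms,
                    avoid_threshold_ms=1000.0):
    """Return (strategy_name, avoid_preemption) given per-strategy costs."""
    costs = {"swap": swap_ms, "recompute": recompute_ms, "drop": drop_ms}
    best = min(costs, key=costs.get)
    # When even the cheapest option is expensive (long contexts), the
    # scheduler should prefer reserving capacity over preempting at all.
    return best, costs[best] > avoid_threshold_ms

# Costs taken from the table above:
print(choose_strategy(52, 52, 1200))      # ('swap', False) -- cheap to preempt
print(choose_strategy(3280, 3210, 8000))  # ('recompute', True) -- avoid preempting
```

The boolean is the piece the cost model alone cannot give you: above the threshold, the right answer is usually not "pick the cheaper of two bad options" but "keep reserved headroom so this request never becomes a victim."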

2. Priority Tier Architecture

2.1 Priority Levels

Production systems need at least three priority levels. More granularity adds complexity without proportional benefit.

import enum
import dataclasses
import time as time_module

class Priority(enum.IntEnum):
    """
    Request priority levels. Lower number = higher priority.
    """
    PREMIUM = 0      # Never preempted. Reserved capacity guaranteed.
    STANDARD = 1     # Preempted under memory pressure.
    BACKGROUND = 2   # Preempted freely. No latency SLO.
    SYSTEM = -1      # Internal requests (health checks, warmup).

@dataclasses.dataclass
class InferenceRequest:
    """A single inference request with priority metadata."""
    request_id: str
    priority: Priority
    prompt_tokens: list
    prompt_length: int
    max_output_tokens: int
    arrival_time: float
    slo_ttft_ms: float          # SLO for time-to-first-token
    slo_tpot_ms: float          # SLO for time-per-output-token
    slo_e2e_ms: float           # SLO for end-to-end latency

    # Mutable state during processing
    generated_tokens: list = dataclasses.field(default_factory=list)
    kv_cache_blocks: list = dataclasses.field(default_factory=list)
    prefill_complete: bool = False
    preempted: bool = False
    preemption_count: int = 0
    first_token_time: float = 0.0
    state: str = "queued"  # queued, prefilling, decoding, preempted, done

    @property
    def generated_length(self):
        return len(self.generated_tokens)

    @property
    def current_length(self):
        return self.prompt_length + self.generated_length

    @property
    def wait_time_ms(self):
        return (time_module.time() - self.arrival_time) * 1000

    @property
    def ttft_ms(self):
        if self.first_token_time > 0:
            return (self.first_token_time - self.arrival_time) * 1000
        return None

    @property
    def avg_ms_per_token(self):
        if self.generated_length > 0 and self.first_token_time > 0:
            elapsed = time_module.time() - self.first_token_time
            return (elapsed * 1000) / self.generated_length
        return 15.0  # default estimate

    @property
    def estimated_total_time_ms(self):
        return (
            self.slo_ttft_ms +
            self.max_output_tokens * self.slo_tpot_ms
        )

2.2 Priority-Based Preemption Selection

When preemption is needed, the victim selection algorithm must balance several factors: priority level, progress (prefer preempting requests with less progress), and preemption history (avoid repeatedly preempting the same request).

class VictimSelector:
    """
    Selects which running request to preempt.
    Policy: lowest priority first, then least progress,
    then fewest prior preemptions.
    """

    def __init__(self, cost_model):
        self.cost_model = cost_model

    def select_victim(self, running_requests, incoming_request):
        """
        Select the best victim for preemption.
        Returns (victim_request, strategy), or (None, None) if no
        valid victim exists.
        """
        candidates = [
            r for r in running_requests
            if r.priority.value > incoming_request.priority.value
        ]

        if not candidates:
            # Cannot preempt: no lower-priority requests running
            return None, None

        # Score candidates: lower score = better victim
        scored = []
        for req in candidates:
            score = self._victim_score(req)
            strategy, costs = self.cost_model.optimal_strategy(req)
            scored.append((score, req, strategy, costs[strategy]))

        scored.sort(key=lambda x: x[0])
        best_score, victim, strategy, cost = scored[0]

        return victim, strategy

    def _victim_score(self, request):
        """
        Lower score = more attractive preemption target.
        Components:
        - Priority: higher priority value = lower priority = better victim
        - Progress: less progress = less wasted compute
        - Preemption count: more preemptions = less attractive
        """
        priority_score = -request.priority.value * 1000
        progress_score = request.generated_length
        preemption_penalty = request.preemption_count * 500

        return priority_score + progress_score + preemption_penalty

    def select_multiple_victims(self, running_requests,
                                 blocks_needed, block_size=16):
        """
        Select multiple victims to free enough KV cache blocks.
        Greedy: pick best victim, check if enough, repeat.
        """
        victims = []
        freed_blocks = 0
        remaining = list(running_requests)

        while freed_blocks < blocks_needed and remaining:
            # Find lowest-priority request with least progress
            candidates = sorted(
                remaining,
                key=lambda r: (-r.priority.value,
                               r.generated_length,
                               r.preemption_count)
            )

            if not candidates:
                break

            victim = candidates[0]
            remaining.remove(victim)

            strategy, _ = self.cost_model.optimal_strategy(victim)
            victim_blocks = len(victim.kv_cache_blocks)
            freed_blocks += victim_blocks
            victims.append((victim, strategy))

        if freed_blocks >= blocks_needed:
            return victims
        return None  # Cannot free enough blocks

3. SLO-Aware Scheduling

3.1 SLO Definitions

Production LLM serving has three key SLOs:

  • TTFT (Time to First Token): latency from request arrival to first output token. Drives perceived responsiveness. Typical targets: 200ms (premium), 500ms (standard), none (background).
  • TPOT (Time Per Output Token): inter-token latency during generation. Drives streaming smoothness. Typical targets: 30ms (premium), 80ms (standard), none (background).
  • E2E (End-to-End): total time from arrival to completion. Typical targets: 5s (premium), 15s (standard), 60s (background).
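The three SLOs compose: a streaming request's end-to-end budget is roughly TTFT plus output tokens times TPOT, which is the same arithmetic behind `estimated_total_time_ms` in the request dataclass above. A quick check with the illustrative tier targets:

```python
def e2e_budget_ms(ttft_ms, tpot_ms, max_output_tokens):
    """End-to-end latency implied by per-token SLOs for a streamed response."""
    return ttft_ms + max_output_tokens * tpot_ms

# Premium tier, 150-token response: 200 + 150 * 30 = 4700 ms,
# inside the 5 s E2E target.
print(e2e_budget_ms(200, 30, 150))   # 4700
# Standard tier, same length: 500 + 150 * 80 = 12500 ms, inside 15 s.
print(e2e_budget_ms(500, 80, 150))   # 12500
```

The implication runs the other way too: if a tier's TTFT and TPOT targets multiply out past its E2E target for typical response lengths, the three SLOs are mutually inconsistent and one of them must give.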

3.2 SLO Monitor and Load Shedding

import collections

class SLOMonitor:
    """
    Tracks SLO compliance and triggers load shedding
    when SLO targets are at risk.
    """

    def __init__(self, window_size=100):
        self.window_size = window_size
        # Per-priority tracking
        self.ttft_history = {
            Priority.PREMIUM: collections.deque(maxlen=window_size),
            Priority.STANDARD: collections.deque(maxlen=window_size),
            Priority.BACKGROUND: collections.deque(maxlen=window_size),
        }
        self.tpot_history = {
            Priority.PREMIUM: collections.deque(maxlen=window_size),
            Priority.STANDARD: collections.deque(maxlen=window_size),
            Priority.BACKGROUND: collections.deque(maxlen=window_size),
        }

    def record_ttft(self, priority, ttft_ms):
        self.ttft_history[priority].append(ttft_ms)

    def record_tpot(self, priority, tpot_ms):
        self.tpot_history[priority].append(tpot_ms)

    def get_percentile(self, data, percentile):
        """Compute percentile from a deque of values."""
        if not data:
            return 0.0
        sorted_data = sorted(data)
        idx = int(len(sorted_data) * percentile / 100)
        idx = min(idx, len(sorted_data) - 1)
        return sorted_data[idx]

    def check_slo_compliance(self, priority, slo_ttft_ms, slo_tpot_ms):
        """
        Check if p99 metrics are within SLO for the given priority.
        Returns (ttft_ok, tpot_ok, ttft_p99, tpot_p99).
        """
        ttft_p99 = self.get_percentile(
            self.ttft_history[priority], 99
        )
        tpot_p99 = self.get_percentile(
            self.tpot_history[priority], 99
        )

        ttft_ok = ttft_p99 <= slo_ttft_ms if slo_ttft_ms > 0 else True
        tpot_ok = tpot_p99 <= slo_tpot_ms if slo_tpot_ms > 0 else True

        return ttft_ok, tpot_ok, ttft_p99, tpot_p99

    def should_shed_load(self, slo_targets):
        """
        Determine if load shedding is needed to protect higher-priority SLOs.
        Returns list of priority levels to shed (lowest first).
        """
        shed_levels = []

        # Check premium SLO first
        premium_target = slo_targets.get(Priority.PREMIUM, {})
        if premium_target:
            ttft_ok, tpot_ok, _, _ = self.check_slo_compliance(
                Priority.PREMIUM,
                premium_target.get('ttft_ms', 0),
                premium_target.get('tpot_ms', 0)
            )
            if not ttft_ok or not tpot_ok:
                # Premium SLO at risk: shed background immediately
                shed_levels.append(Priority.BACKGROUND)
                # Shed standard too: the premium tier takes
                # absolute precedence
                shed_levels.append(Priority.STANDARD)
                return shed_levels

        # Check standard SLO
        standard_target = slo_targets.get(Priority.STANDARD, {})
        if standard_target:
            ttft_ok, tpot_ok, _, _ = self.check_slo_compliance(
                Priority.STANDARD,
                standard_target.get('ttft_ms', 0),
                standard_target.get('tpot_ms', 0)
            )
            if not ttft_ok or not tpot_ok:
                shed_levels.append(Priority.BACKGROUND)
                return shed_levels

        return shed_levels

    def slo_headroom(self, priority, slo_ttft_ms, slo_tpot_ms):
        """
        How much headroom exists before SLO violation.
        Returns fraction: 1.0 = fully compliant, 0.0 = at limit.
        """
        _, _, ttft_p99, tpot_p99 = self.check_slo_compliance(
            priority, slo_ttft_ms, slo_tpot_ms
        )
        ttft_headroom = 1.0 - (ttft_p99 / slo_ttft_ms) if slo_ttft_ms > 0 else 1.0
        tpot_headroom = 1.0 - (tpot_p99 / slo_tpot_ms) if slo_tpot_ms > 0 else 1.0
        return min(ttft_headroom, tpot_headroom)

SLO Compliance Under Increasing Load (% of requests meeting SLO)

Scenario                          | % meeting SLO | Notes
Premium (50% load)                | 99.9          |
Standard (50% load)               | 99.5          |
Premium (80% load)                | 99.8          |
Standard (80% load)               | 97.2          |
Premium (100% load, no shed)      | 95.1          | SLO violated
Premium (100% load, with shed)    | 99.6          | Background shed
Standard (100% load, with shed)   | 93.8          | Degraded
Background (100% load, with shed) | 45.2          | Heavily shed

4. Fair-Share and Starvation Prevention

4.1 The Starvation Problem

Strict priority scheduling causes starvation: if premium and standard requests keep arriving, background requests never execute. In production, background requests often include internal analytics, offline processing, and batch jobs that must eventually complete.

4.2 Age-Based Priority Boosting

The solution is aging: requests that have waited too long get their effective priority boosted. This guarantees eventual service for all priority levels while still providing strong preference to higher-priority requests under normal load.

class FairShareScheduler:
    """
    Priority scheduler with aging-based starvation prevention.

    Effective priority = base_priority - age_boost.
    Age boost increases linearly with wait time, capped at
    max_boost to prevent background requests from preempting premium.
    """

    def __init__(self, age_boost_rate_per_sec=0.1, max_boost=1.5,
                 fair_share_ratios=None):
        self.age_boost_rate = age_boost_rate_per_sec
        self.max_boost = max_boost
        # Target share of GPU time per priority
        self.fair_share = fair_share_ratios or {
            Priority.PREMIUM: 0.50,    # 50% of capacity guaranteed
            Priority.STANDARD: 0.35,   # 35%
            Priority.BACKGROUND: 0.15, # 15%
        }
        self.gpu_time_used = {p: 0.0 for p in Priority if p.value >= 0}

    def effective_priority(self, request):
        """
        Compute effective priority with age boost.
        Lower value = higher effective priority.
        """
        base = float(request.priority.value)
        wait_sec = request.wait_time_ms / 1000.0
        age_boost = min(
            wait_sec * self.age_boost_rate,
            self.max_boost
        )

        # Fair share deficit boost: if a priority class is under its
        # fair share, boost all requests in that class
        deficit_boost = self._fair_share_deficit(request.priority)

        return base - age_boost - deficit_boost

    def _fair_share_deficit(self, priority):
        """
        Compute boost based on fair share deficit.
        If a priority class has used less than its fair share,
        boost it proportionally.
        """
        total_time = sum(self.gpu_time_used.values())
        if total_time == 0:
            return 0.0

        actual_share = self.gpu_time_used.get(priority, 0) / total_time
        target_share = self.fair_share.get(priority, 0)

        if actual_share < target_share:
            deficit = target_share - actual_share
            return deficit * 2.0  # scale factor
        return 0.0

    def record_gpu_time(self, priority, time_ms):
        """Record GPU time used by a priority class."""
        if priority in self.gpu_time_used:
            self.gpu_time_used[priority] += time_ms

    def sort_queue(self, queue):
        """Sort request queue by effective priority."""
        return sorted(queue, key=lambda r: self.effective_priority(r))

    def admission_control(self, request, current_load):
        """
        Decide whether to admit a request.
        Under overload, reject based on priority and fair share.
        """
        if request.priority == Priority.PREMIUM:
            return True  # Always admit premium

        if current_load < 0.8:
            return True  # Not overloaded, admit all

        # Under pressure: check fair share
        actual_share = self._actual_share(request.priority)
        target = self.fair_share.get(request.priority, 0)

        if actual_share < target:
            return True  # Under fair share, admit

        # Over fair share and overloaded: probabilistic rejection
        excess = actual_share - target
        reject_prob = min(excess * 5.0, 0.8)
        import random
        return random.random() > reject_prob

    def _actual_share(self, priority):
        total = sum(self.gpu_time_used.values())
        if total == 0:
            return 0.0
        return self.gpu_time_used.get(priority, 0) / total
⚠️ Aging Rate Tuning

The age_boost_rate parameter controls how quickly low-priority requests are promoted. Too fast (greater than 0.5/sec): background requests get promoted within 2 seconds, effectively defeating priority scheduling. Too slow (less than 0.01/sec): background requests wait minutes under load. A rate of 0.05-0.1/sec provides a good balance: a background request (priority 2) gets boosted to effective priority 0.5 (above standard, below premium) after 15-30 seconds of waiting.
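The aging arithmetic from this callout, as a standalone sketch (the rate and cap mirror the FairShareScheduler defaults above):

```python
def effective_priority(base, wait_sec, rate=0.1, max_boost=1.5):
    """Effective priority with a capped, linearly growing age boost.
    Lower value = higher effective priority."""
    return base - min(wait_sec * rate, max_boost)

# A BACKGROUND request (base 2) after 15 s at rate 0.1/sec:
print(effective_priority(2, 15))   # 0.5 -> above STANDARD (1), below PREMIUM (0)
# The cap keeps it there: even after 2 minutes it never outranks PREMIUM.
print(effective_priority(2, 120))  # 0.5
```

The cap is what makes aging safe: without `max_boost`, any sufficiently old background request would eventually preempt premium traffic.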

5. Preemption Execution

5.1 Swap-Based Preemption

The swap path copies KV cache blocks from GPU to CPU memory over PCIe. This is a bulk memory transfer that must not block the decode loop for other requests.

import torch
import threading

class KVCacheSwapper:
    """
    Manages swap-out and swap-in of KV cache blocks between
    GPU and CPU memory. Uses async CUDA copies to avoid
    blocking the main decode loop.
    """

    def __init__(self, block_size_bytes, max_cpu_blocks=10000):
        self.block_size = block_size_bytes
        self.max_cpu_blocks = max_cpu_blocks
        # Pre-allocate CPU swap space (pinned memory for fast transfer)
        self.cpu_pool = torch.empty(
            max_cpu_blocks, block_size_bytes,
            dtype=torch.uint8, pin_memory=True
        )
        self.cpu_free_list = list(range(max_cpu_blocks))
        self.swap_map = {}  # gpu_block_id -> cpu_block_id
        self.swap_stream = torch.cuda.Stream()

    def swap_out(self, request, gpu_allocator):
        """
        Swap request's KV cache from GPU to CPU.
        Returns immediately; transfer happens asynchronously.
        """
        block_ids = request.kv_cache_blocks
        cpu_ids = []

        for gpu_id in block_ids:
            if not self.cpu_free_list:
                # Roll back CPU blocks already reserved in this loop
                for gid, cid in zip(block_ids, cpu_ids):
                    self.cpu_free_list.append(cid)
                    del self.swap_map[gid]
                raise RuntimeError("CPU swap space exhausted")

            cpu_id = self.cpu_free_list.pop()
            cpu_ids.append(cpu_id)
            self.swap_map[gpu_id] = cpu_id

        # Async copy GPU -> CPU on swap stream
        with torch.cuda.stream(self.swap_stream):
            for gpu_id, cpu_id in zip(block_ids, cpu_ids):
                gpu_data = gpu_allocator.get_block(gpu_id)
                self.cpu_pool[cpu_id].copy_(
                    gpu_data.view(-1)[:self.block_size],
                    non_blocking=True
                )

        # Record event for synchronization
        swap_event = self.swap_stream.record_event()

        # Free GPU blocks (after transfer completes)
        def free_after_transfer():
            swap_event.synchronize()
            for gpu_id in block_ids:
                gpu_allocator.free_block(gpu_id)

        thread = threading.Thread(target=free_after_transfer)
        thread.start()

        request.state = "preempted"
        request.preempted = True
        request.preemption_count += 1
        request.swap_cpu_ids = cpu_ids
        request.swap_event = swap_event

    def swap_in(self, request, gpu_allocator):
        """
        Swap request's KV cache from CPU back to GPU.
        Allocates new GPU blocks and copies data from CPU.
        """
        cpu_ids = request.swap_cpu_ids
        new_gpu_ids = []

        for _ in cpu_ids:
            gpu_id = gpu_allocator.allocate_block()
            if gpu_id is None:
                # Failed: free already-allocated blocks
                for gid in new_gpu_ids:
                    gpu_allocator.free_block(gid)
                return False
            new_gpu_ids.append(gpu_id)

        # Async copy CPU -> GPU
        with torch.cuda.stream(self.swap_stream):
            for cpu_id, gpu_id in zip(cpu_ids, new_gpu_ids):
                gpu_data = gpu_allocator.get_block(gpu_id)
                gpu_data.view(-1)[:self.block_size].copy_(
                    self.cpu_pool[cpu_id],
                    non_blocking=True
                )

        swap_event = self.swap_stream.record_event()

        # Capture the old GPU block ids now: request.kv_cache_blocks is
        # reassigned below, possibly before the cleanup thread runs
        old_gpu_ids = list(request.kv_cache_blocks)

        # Free CPU blocks after transfer
        def free_cpu_after():
            swap_event.synchronize()
            for old_gpu in old_gpu_ids:
                self.swap_map.pop(old_gpu, None)
            for cpu_id in cpu_ids:
                self.cpu_free_list.append(cpu_id)

        thread = threading.Thread(target=free_cpu_after)
        thread.start()

        request.kv_cache_blocks = new_gpu_ids
        request.state = "decoding"
        request.preempted = False
        request.swap_event = swap_event

        return True

    def swap_space_available(self):
        return len(self.cpu_free_list)

5.2 Recompute-Based Preemption

Recompute preemption is simpler: discard the KV cache, save only the token sequence, and re-prefill when the request resumes.

class RecomputePreemptor:
    """
    Preemption by discarding KV cache and saving generated tokens.
    Resume by re-running prefill on prompt + generated tokens.
    """

    def __init__(self, gpu_allocator):
        self.gpu_allocator = gpu_allocator
        self.saved_sequences = {}  # request_id -> token list

    def preempt(self, request):
        """Preempt by saving tokens and freeing GPU blocks."""
        # Save the full sequence (prompt + generated)
        self.saved_sequences[request.request_id] = {
            'prompt_tokens': list(request.prompt_tokens),
            'generated_tokens': list(request.generated_tokens),
        }

        # Free all GPU blocks (count them before clearing the list)
        blocks_freed = len(request.kv_cache_blocks)
        for block_id in request.kv_cache_blocks:
            self.gpu_allocator.free_block(block_id)

        request.kv_cache_blocks = []
        request.state = "preempted"
        request.preempted = True
        request.preemption_count += 1

        return blocks_freed

    def resume(self, request, model, gpu_allocator):
        """
        Resume by re-running prefill on the full sequence.
        This regenerates the KV cache from scratch.
        """
        saved = self.saved_sequences.get(request.request_id)
        if saved is None:
            return False

        # Reconstruct full input
        full_sequence = (
            saved['prompt_tokens'] + saved['generated_tokens']
        )

        # Allocate new KV cache blocks
        blocks_needed = (len(full_sequence) + 15) // 16  # 16 tokens per KV block
        new_blocks = []
        for _ in range(blocks_needed):
            block = gpu_allocator.allocate_block()
            if block is None:
                for b in new_blocks:
                    gpu_allocator.free_block(b)
                return False
            new_blocks.append(block)

        request.kv_cache_blocks = new_blocks

        # Re-run prefill (expensive but no PCIe transfer)
        input_tensor = torch.tensor(
            [full_sequence], device='cuda'
        )
        with torch.no_grad():
            model(input_tensor, use_cache=True)

        request.state = "decoding"
        request.preempted = False
        del self.saved_sequences[request.request_id]

        return True

Complete Priority Scheduler

6.1 Integration

import time as time_mod
import threading

class PriorityScheduler:
    """
    Production priority scheduler for LLM inference.
    Integrates: priority queue, preemption, SLO monitoring,
    fair share, and admission control.
    """

    def __init__(self, model, gpu_allocator, config):
        self.model = model
        self.gpu_allocator = gpu_allocator
        self.config = config

        # Core components
        self.resource_monitor = ResourceMonitor(
            total_kv_blocks=config['total_kv_blocks'],
            max_batch_size=config['max_batch_size']
        )
        self.cost_model = PreemptionCostModel(
            pcie_bandwidth_gbps=config.get('pcie_bw_gbps', 32.0),
            prefill_tokens_per_sec=config.get('prefill_tps', 10000),
        )
        self.victim_selector = VictimSelector(self.cost_model)
        self.slo_monitor = SLOMonitor(window_size=200)
        self.fair_share = FairShareScheduler(
            age_boost_rate_per_sec=config.get('age_boost_rate', 0.1),
            max_boost=config.get('max_age_boost', 1.5),
        )
        self.swapper = KVCacheSwapper(
            block_size_bytes=config.get('block_size_bytes', 8192),
            max_cpu_blocks=config.get('max_cpu_swap_blocks', 10000),
        )
        self.recompute = RecomputePreemptor(gpu_allocator)

        # Queues
        self.waiting_queue = []  # requests waiting for admission
        self.running_requests = []  # currently in the batch
        self.preempted_requests = []  # swapped out, waiting to resume

        # Statistics
        self.stats = {
            'total_admitted': 0,
            'total_preempted': 0,
            'total_rejected': 0,
            'total_shed': 0,
            'preemptions_by_strategy': {s: 0 for s in PreemptionStrategy},
        }

        self.lock = threading.Lock()

    def submit_request(self, request):
        """Submit a new request to the scheduler."""
        with self.lock:
            # Admission control
            current_load = (
                self.resource_monitor.allocated_blocks /
                self.resource_monitor.total_kv_blocks
            )

            if not self.fair_share.admission_control(
                request, current_load
            ):
                self.stats['total_rejected'] += 1
                return False, "Rejected: system overloaded"

            # Check for load shedding
            slo_targets = self.config.get('slo_targets', {})
            shed_levels = self.slo_monitor.should_shed_load(slo_targets)
            if request.priority in shed_levels:
                self.stats['total_shed'] += 1
                return False, f"Shed: {request.priority.name} traffic"

            self.waiting_queue.append(request)
            self.stats['total_admitted'] += 1
            return True, "Queued"

    def schedule_iteration(self):
        """
        Run one scheduling iteration.
        Called every decode step by the inference engine.

        Returns the batch of requests to process this iteration.
        """
        with self.lock:

            # 1. Try to resume preempted requests
            self._try_resume_preempted()

            # 2. Sort waiting queue by effective priority
            self.waiting_queue = self.fair_share.sort_queue(
                self.waiting_queue
            )

            # 3. Admit requests from waiting queue
            while self.waiting_queue:
                request = self.waiting_queue[0]

                if self.resource_monitor.can_admit(request):
                    # Direct admit: enough free blocks
                    self.waiting_queue.pop(0)
                    self._admit_request(request)
                elif self._try_preempt_for(request):
                    # Preempted a lower-priority request
                    self.waiting_queue.pop(0)
                    self._admit_request(request)
                else:
                    # Cannot admit: no free blocks, nothing to preempt
                    break

            # 4. Build batch from running requests
            batch = list(self.running_requests)

            # 5. Record SLO metrics
            self._record_slo_metrics()

            return batch

    def _admit_request(self, request):
        """Allocate resources and add to running batch."""
        blocks_needed = self.resource_monitor.blocks_needed(request)
        allocated = []

        # can_admit() / _try_preempt_for() already verified capacity, so
        # allocation should succeed; stop early if the pool runs short.
        for _ in range(blocks_needed):
            block = self.gpu_allocator.allocate_block()
            if block is None:
                break
            allocated.append(block)

        request.kv_cache_blocks = allocated
        request.state = "prefilling"
        self.running_requests.append(request)
        self.resource_monitor.allocated_blocks += len(allocated)
        self.resource_monitor.active_requests += 1

    def _try_preempt_for(self, incoming):
        """
        Attempt to preempt running requests to make room
        for the incoming request.
        """
        if incoming.priority == Priority.BACKGROUND:
            return False  # Background never triggers preemption

        blocks_needed = self.resource_monitor.blocks_needed(incoming)
        blocks_available = self.resource_monitor.blocks_available()
        blocks_deficit = blocks_needed - blocks_available

        if blocks_deficit <= 0:
            return True  # Already enough space

        victims = self.victim_selector.select_multiple_victims(
            self.running_requests, blocks_deficit
        )

        if victims is None:
            return False  # Cannot free enough blocks

        for victim, strategy in victims:
            self._execute_preemption(victim, strategy)

        return True

    def _execute_preemption(self, request, strategy):
        """Execute preemption with the chosen strategy."""
        self.running_requests.remove(request)
        self.resource_monitor.active_requests -= 1

        if strategy == PreemptionStrategy.SWAP:
            blocks_freed = len(request.kv_cache_blocks)
            self.swapper.swap_out(request, self.gpu_allocator)
            self.resource_monitor.allocated_blocks -= blocks_freed
            self.preempted_requests.append(request)
        elif strategy == PreemptionStrategy.RECOMPUTE:
            blocks_freed = self.recompute.preempt(request)
            self.resource_monitor.allocated_blocks -= blocks_freed
            self.preempted_requests.append(request)
        elif strategy == PreemptionStrategy.DROP:
            # Free blocks and discard the request entirely
            blocks_freed = len(request.kv_cache_blocks)
            for block in request.kv_cache_blocks:
                self.gpu_allocator.free_block(block)
            self.resource_monitor.allocated_blocks -= blocks_freed
            request.kv_cache_blocks = []
            request.state = "dropped"

        self.stats['total_preempted'] += 1
        self.stats['preemptions_by_strategy'][strategy] += 1

    def _try_resume_preempted(self):
        """Try to resume preempted requests when resources free up."""
        # Sort preempted by effective priority (highest first)
        self.preempted_requests = self.fair_share.sort_queue(
            self.preempted_requests
        )

        resumed = []
        for request in self.preempted_requests:
            if self.resource_monitor.can_admit(request):
                if request.state == "preempted" and hasattr(
                    request, 'swap_cpu_ids'
                ):
                    success = self.swapper.swap_in(
                        request, self.gpu_allocator
                    )
                else:
                    success = self.recompute.resume(
                        request, self.model, self.gpu_allocator
                    )

                if success:
                    # Account for the freshly allocated GPU blocks
                    self.resource_monitor.allocated_blocks += len(
                        request.kv_cache_blocks
                    )
                    self.running_requests.append(request)
                    self.resource_monitor.active_requests += 1
                    resumed.append(request)

        for r in resumed:
            self.preempted_requests.remove(r)

    def _record_slo_metrics(self):
        """Record current SLO metrics for monitoring."""
        for request in self.running_requests:
            if request.ttft_ms is not None:
                self.slo_monitor.record_ttft(
                    request.priority, request.ttft_ms
                )
            if (request.generated_length > 1 and
                    request.avg_ms_per_token > 0):
                self.slo_monitor.record_tpot(
                    request.priority, request.avg_ms_per_token
                )

    def complete_request(self, request):
        """Called when a request finishes generation."""
        with self.lock:
            if request in self.running_requests:
                self.running_requests.remove(request)

            # Free KV cache blocks
            for block in request.kv_cache_blocks:
                self.gpu_allocator.free_block(block)
            self.resource_monitor.allocated_blocks -= len(
                request.kv_cache_blocks
            )
            self.resource_monitor.active_requests -= 1

            # Record GPU time for fair share
            e2e_ms = (time_mod.time() - request.arrival_time) * 1000
            self.fair_share.record_gpu_time(request.priority, e2e_ms)

            request.state = "done"

    def get_stats(self):
        """Return scheduler statistics."""
        with self.lock:
            return {
                'running': len(self.running_requests),
                'waiting': len(self.waiting_queue),
                'preempted': len(self.preempted_requests),
                'stats': dict(self.stats),
                'slo_compliance': {
                    p.name: self.slo_monitor.check_slo_compliance(
                        p,
                        self.config.get('slo_targets', {}).get(
                            p, {}
                        ).get('ttft_ms', 0),
                        self.config.get('slo_targets', {}).get(
                            p, {}
                        ).get('tpot_ms', 0),
                    )
                    for p in [Priority.PREMIUM, Priority.STANDARD]
                },
                'fair_share_actual': {
                    p.name: self.fair_share._actual_share(p)
                    for p in Priority if p.value >= 0
                },
            }

6.2 Scheduler Performance Under Mixed Load

Priority Scheduler vs FIFO Under Mixed Traffic (A100, Llama 70B)

| Metric | FIFO | Priority (no preemption) | Priority + Preemption | Priority + Preemption + SLO |
|---|---|---|---|---|
| Premium p99 TTFT | 2100 ms | 280 ms | 195 ms | 185 ms |
| Standard p99 TTFT | 2100 ms | 650 ms | 520 ms | 480 ms |
| Background p99 TTFT | 2100 ms | 8200 ms | 12500 ms | 18000 ms |
| Premium SLO compliance | 72% | 98.1% | 99.6% | 99.9% |
| Standard SLO compliance | 72% | 91.3% | 95.8% | 97.2% |
| Overall throughput (tok/s) | 4200 | 4100 | 3950 | 3900 |
| Preemptions/min | 0 | 0 | 12.4 | 8.7 |

Note: Traffic mix: 20% premium, 50% standard, 30% background. Load: 90% capacity. SLO targets: premium TTFT 200ms, standard TTFT 500ms.
The Throughput-Fairness Tradeoff

Priority scheduling with preemption costs roughly 6-7% of total throughput relative to FIFO (4200 down to 3900-3950 tok/s) due to preemption overhead: swap transfers and recomputation. The trade is worthwhile: premium SLO compliance rises from 72% to 99.9%. The throughput cost is borne almost entirely by background traffic, which is the intended behavior: background requests exist to fill idle capacity, not to compete with paying customers.

Production Considerations

7.1 Preemption Limits

Unbounded preemption creates pathological behavior. A request that gets preempted, resumed, and preempted again wastes significant compute on swap transfers and recomputation. Set a maximum preemption count per request (typically 2-3). After exceeding the limit, either promote the request’s priority or drop it with an error indicating capacity issues.
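The cap-then-promote-or-drop policy can be sketched as a small decision function. This is illustrative, not part of the scheduler API above; the `PreemptionPolicy` class, its `on_victim_selected` method, and the action strings are assumptions, though `preemption_count` mirrors the field the `RecomputePreemptor` already increments.

```python
from dataclasses import dataclass

@dataclass
class PreemptionPolicy:
    max_preemptions: int = 3  # typical production cap is 2-3

    def on_victim_selected(self, preemption_count, can_promote):
        """Decide what to do when a request is picked as a victim again.

        'preempt' -> under the cap: preempt normally.
        'promote' -> over the cap: boost priority so it stops being a victim.
        'reject'  -> over the cap and already top priority: fail with a
                     capacity error rather than thrash on swap/recompute.
        """
        if preemption_count < self.max_preemptions:
            return "preempt"
        return "promote" if can_promote else "reject"
```

The victim selector would consult this policy before `_execute_preemption`, skipping (or promoting) any request that has already burned its preemption budget.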

7.2 Capacity Reservation

For premium traffic, the most reliable approach is not preemption but reservation. Allocate a fixed fraction of KV cache blocks (e.g., 30%) exclusively for premium requests. Standard and background requests cannot use reserved blocks even if they are idle. This guarantees that premium requests are never queued waiting for preemption.

The cost: reduced effective capacity for non-premium traffic. With 30% reserved, the system operates at 70% effective capacity for standard and background traffic. This is acceptable when premium traffic constitutes 20-30% of total volume.
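A minimal sketch of that reservation logic, assuming a single pool of KV cache blocks split into a premium-only slice and shared capacity. The `ReservedBlockPool` class and its method names are illustrative, not taken from the scheduler above.

```python
class ReservedBlockPool:
    """KV block accounting with a fixed premium-only reservation."""

    def __init__(self, total_blocks, premium_fraction=0.30):
        self.total_blocks = total_blocks
        self.reserved = int(total_blocks * premium_fraction)
        self.allocated_premium = 0
        self.allocated_other = 0

    def can_allocate(self, n_blocks, is_premium):
        if is_premium:
            # Premium may draw from the entire pool, reserved slice included.
            free = (self.total_blocks
                    - self.allocated_premium - self.allocated_other)
            return n_blocks <= free
        # Non-premium can never dip into the reserved slice, even when idle.
        cap = self.total_blocks - self.reserved
        # Premium usage beyond its reservation also consumes shared capacity.
        overflow = max(0, self.allocated_premium - self.reserved)
        return n_blocks <= cap - self.allocated_other - overflow

    def allocate(self, n_blocks, is_premium):
        if not self.can_allocate(n_blocks, is_premium):
            return False
        if is_premium:
            self.allocated_premium += n_blocks
        else:
            self.allocated_other += n_blocks
        return True
```

With 100 blocks and a 30% reservation, non-premium traffic saturates at 70 blocks while 30 remain free for premium requests to land without queuing or preemption.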

7.3 Monitoring and Alerting

The scheduler must export metrics for operational monitoring: preemption rate (per minute, per strategy), SLO compliance (per priority, rolling window), queue depth (per priority), fair share deviation, and swap space utilization. Alert thresholds: preemption rate greater than 20/min indicates capacity planning is wrong; premium SLO compliance below 99.5% requires immediate capacity addition.
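The two alert thresholds above can be evaluated directly from the scheduler's `get_stats()` counters. This is a sketch: the `SchedulerAlerts` class and its `evaluate` helper are assumptions, with only the threshold values (20 preemptions/min, 99.5% premium SLO compliance) taken from the text.

```python
import time

class SchedulerAlerts:
    """Turns scheduler counters into alert strings for a monitoring hook."""

    def __init__(self, preempt_rate_limit=20.0, premium_slo_floor=0.995):
        self.preempt_rate_limit = preempt_rate_limit  # preemptions per minute
        self.premium_slo_floor = premium_slo_floor
        self._last_preempted = 0
        self._last_time = time.time()

    def evaluate(self, total_preempted, premium_slo_compliance):
        """Return alerts for the window since the last call."""
        now = time.time()
        elapsed_min = max((now - self._last_time) / 60.0, 1e-9)
        # Rate is computed from the cumulative counter delta.
        rate = (total_preempted - self._last_preempted) / elapsed_min
        self._last_preempted, self._last_time = total_preempted, now

        alerts = []
        if rate > self.preempt_rate_limit:
            alerts.append(
                f"preemption rate {rate:.1f}/min: capacity planning is wrong")
        if premium_slo_compliance < self.premium_slo_floor:
            alerts.append(
                f"premium SLO {premium_slo_compliance:.1%}: add capacity now")
        return alerts
```

A periodic monitoring loop would call `evaluate(stats['stats']['total_preempted'], ...)` once per scrape interval and forward any returned strings to the paging system.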