vLLM v1 Distributed Execution: Ray Integration and Multi-Node Coordination

Part of the series "vLLM v1 & Omni Internals"

Llama 405B in FP16 requires 810 GB of memory. No single GPU can hold it — not even eight H100s with 640 GB combined. You need distributed execution across multiple nodes, and every forward pass becomes a choreographed dance of tensor communication over NVLink and Ethernet. vLLM v1 orchestrates this with Ray as the control plane, NCCL for GPU-to-GPU data movement, and a driver-worker architecture where scheduling stays on one node while computation distributes across the cluster. This post traces the complete distributed execution path: how Ray actors initialize, how NCCL process groups form across nodes, how model weights get sharded and distributed, and the per-step synchronization protocol that keeps 16 GPUs across 4 nodes working in lockstep.

Architecture Overview

vLLM’s distributed execution has three layers:

  1. Control plane: The LLMEngine on the driver node schedules requests, manages the KV cache block table, and dispatches execution commands.
  2. Ray actors: Each GPU is managed by a Worker Ray actor. Actors can live on any node in the Ray cluster.
  3. Data plane: NCCL process groups handle all tensor communication between GPUs. No tensor data flows through Ray — Ray only carries control messages.
# Simplified distributed initialization
class DistributedExecutor:
    def __init__(self, engine_config: EngineConfig):
        self.tp_size = engine_config.parallel_config.tensor_parallel_size
        self.pp_size = engine_config.parallel_config.pipeline_parallel_size
        self.world_size = self.tp_size * self.pp_size

        # Create one Ray actor per GPU
        self.workers = []
        for rank in range(self.world_size):
            worker = RayWorkerWrapper.options(
                num_gpus=1,
                scheduling_strategy=self._placement_strategy(rank)
            ).remote(engine_config, rank)
            self.workers.append(worker)

        # Initialize NCCL groups (blocks until all workers are ready)
        ray.get([w.init_nccl.remote() for w in self.workers])

The key insight: Ray manages process lifecycle and placement. NCCL manages tensor communication. These two systems never overlap in function.

Ray Actor Placement

When deploying across multiple nodes, vLLM needs to place workers strategically. Workers in the same tensor-parallel group must minimize communication latency, ideally sitting on the same node connected via NVLink.

def _placement_strategy(self, rank: int) -> PlacementGroupSchedulingStrategy:
    tp_group_id = rank // self.tp_size
    local_rank = rank % self.tp_size

    # One placement group per TP group, created once and cached, so every
    # worker in the same TP group shares it (and therefore a node)
    if not hasattr(self, "_placement_groups"):
        self._placement_groups = {}
    if tp_group_id not in self._placement_groups:
        bundles = [{"GPU": 1, "CPU": 1} for _ in range(self.tp_size)]
        self._placement_groups[tp_group_id] = ray.util.placement_group(
            bundles, strategy="STRICT_PACK"  # all bundles on the same node
        )

    return PlacementGroupSchedulingStrategy(
        placement_group=self._placement_groups[tp_group_id],
        placement_group_bundle_index=local_rank,
    )

The STRICT_PACK strategy forces all GPUs in a tensor-parallel group onto the same physical node. Pipeline-parallel stages can span nodes because the communication pattern (send/recv of activations) is point-to-point and tolerates higher latency.

ℹ️ Note

For a model with TP=4 and PP=2, vLLM creates 2 placement groups (one per PP stage), each requesting 4 GPUs on a single node. The total is 8 GPUs across 2 nodes. Each node handles one pipeline stage with 4-way tensor parallelism within the node.
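The note's layout reduces to simple rank arithmetic. A tiny helper (illustrative only; the name is hypothetical, not vLLM's API) makes the mapping explicit:

```python
def placement_for(rank: int, tp_size: int) -> tuple:
    """Map a global rank to (pp_stage, bundle_index) under the TP-packed
    layout: each TP group fills one placement group on one node."""
    return rank // tp_size, rank % tp_size

# TP=4, PP=2: ranks 0-3 land on node 0 (stage 0), ranks 4-7 on node 1
stage, bundle = placement_for(5, tp_size=4)
```

Rank 5 maps to bundle 1 of the second placement group, i.e. the second GPU of pipeline stage 1.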

Topology Detection

vLLM detects the interconnect topology to validate placement decisions:

def detect_gpu_topology(self) -> dict:
    """Detect NVLink and PCIe topology for placement validation."""
    result = {}
    for i in range(torch.cuda.device_count()):
        for j in range(i + 1, torch.cuda.device_count()):
            can_p2p = torch.cuda.can_device_access_peer(i, j)
            # P2P capability is used as a proxy for NVLink here; PCIe P2P
            # also exists, so a more precise check queries NVML link info
            result[(i, j)] = "nvlink" if can_p2p else "pcie"
    return result

If tensor-parallel workers end up on GPUs connected only via PCIe (no NVLink), the all-reduce bandwidth drops by 5-10x. vLLM logs a warning but does not fail.
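Given the dict returned by detect_gpu_topology, the warn-but-continue check can be sketched like this (validate_tp_topology is a hypothetical name, not vLLM's actual function):

```python
def validate_tp_topology(topology: dict, tp_ranks: list) -> bool:
    """Return False (and warn) if any GPU pair in the TP group lacks a
    peer-to-peer NVLink path; mirrors the warn-but-continue policy."""
    slow_pairs = [(i, j) for i in tp_ranks for j in tp_ranks
                  if i < j and topology.get((i, j)) != "nvlink"]
    for i, j in slow_pairs:
        print(f"WARNING: GPUs {i} and {j} fall back to PCIe; "
              f"expect 5-10x lower all-reduce bandwidth")
    return not slow_pairs
```

The function only logs; callers are free to proceed with a degraded topology, exactly as vLLM does.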

NCCL Process Group Initialization

After Ray actors are placed, vLLM initializes NCCL communicators. This is a multi-step process that must happen before any tensor operations.

class Worker:
    def init_nccl(self):
        """Initialize NCCL process groups for this worker."""
        # Step 1: rendezvous. Every rank in a communicator must hold the
        # same NCCL unique ID before ncclCommInitRank. torch.distributed
        # performs this exchange via MASTER_ADDR/MASTER_PORT (env://);
        # vLLM can also broadcast the raw ID through Ray's object store.
        torch.cuda.set_device(self.local_rank)
        dist.init_process_group(
            backend="nccl",
            init_method="env://",
            world_size=self.world_size,  # global group: tp_size * pp_size
            rank=self.rank,
        )

        # Step 2: carve out the TP sub-group (contiguous global ranks).
        # new_group must be called by ALL ranks with identical arguments.
        tp_rank_start = (self.rank // self.tp_size) * self.tp_size
        self.tp_group = dist.new_group(
            ranks=list(range(tp_rank_start, tp_rank_start + self.tp_size))
        )

        # Step 3: carve out the PP sub-group (same TP-local rank across
        # pipeline stages)
        if self.pp_size > 1:
            pp_ranks = [self.rank % self.tp_size + i * self.tp_size
                        for i in range(self.pp_size)]
            self.pp_group = dist.new_group(ranks=pp_ranks)

The NCCL unique ID exchange is the bootstrapping problem. One process generates a random ID, and all processes in the group must receive the same ID before calling ncclCommInitRank. vLLM uses Ray’s object store as the broadcast medium.
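The handshake can be illustrated without Ray at all. Below, a plain dict-backed ObjectStore stands in for Ray's object store; all names here are hypothetical, for illustration only:

```python
import os
import uuid

class ObjectStore:
    """Stand-in for Ray's object store (hypothetical, illustration only)."""
    def __init__(self):
        self._objects = {}

    def put(self, obj) -> str:
        ref = uuid.uuid4().hex        # plays the role of an ObjectRef
        self._objects[ref] = obj
        return ref

    def get(self, ref):
        return self._objects[ref]

def bootstrap_nccl_id(rank: int, store: ObjectStore, shared: dict) -> bytes:
    """Rank 0 generates the unique ID; every rank must end up with the
    exact same bytes before it may call ncclCommInitRank."""
    if rank == 0:
        nccl_id = os.urandom(128)     # real NCCL unique IDs are 128 bytes
        shared["ref"] = store.put(nccl_id)
    return store.get(shared["ref"])

store, shared = ObjectStore(), {}
# Rank 0 must run first; in vLLM the other workers block on the ref
ids = [bootstrap_nccl_id(r, store, shared) for r in range(4)]
assert all(i == ids[0] for i in ids)  # every rank holds the same ID
```

The ordering constraint is the whole point: the root's put must complete before any other rank's get, which Ray enforces naturally by blocking on the object reference.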

NCCL Ring Topology

For tensor parallelism, vLLM uses all-reduce operations. NCCL automatically selects the optimal algorithm (ring, tree, or collnet) based on message size and topology:

4-GPU NVLink ring all-reduce for tensor parallelism:
  GPU 0 -> GPU 1 -> GPU 2 -> GPU 3 -> GPU 0

Bandwidth accounting (ring algorithm):
  Each GPU sends and receives 2 * (N-1)/N * payload bytes.
  Effective payload bandwidth = link_bandwidth / (2 * (N-1)/N)
  For N=4 at ~300 GB/s per direction: 300 / 1.5 = 200 GB/s

  All-reduce of one 4096-dim hidden state (FP16):
  Data: 4096 * 2 bytes = 8 KB per token
  Time: 8 KB / 200 GB/s = 0.04 us (launch latency dominates)

  All-reduce of a decode batch's attention output [batch, 4096]:
  Data for batch=128: 128 * 4096 * 2 bytes = 1 MB
  Time: 1 MB / 200 GB/s = 5.2 us
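This arithmetic generalizes to any group size and link speed. A small helper (a sketch under the stated assumptions, not vLLM code) makes the ring all-reduce cost explicit:

```python
def ring_allreduce_us(data_bytes: int, n_gpus: int, link_gbps: float) -> float:
    """Transfer time of one ring all-reduce in microseconds, ignoring
    kernel-launch latency. Each GPU moves 2*(N-1)/N times the payload."""
    traffic_factor = 2 * (n_gpus - 1) / n_gpus
    # bytes / (GB/s) gives seconds; the 1e3 factor converts to microseconds
    return data_bytes * traffic_factor / (link_gbps * 1e3)

# 8 KB hidden state, 4 GPUs, ~300 GB/s NVLink: well under a microsecond,
# so per-call launch overhead, not bandwidth, dominates at decode sizes
t = ring_allreduce_us(8192, 4, 300)
```

For small decode-time tensors the transfer term is negligible; bandwidth only starts to matter at megabyte-scale payloads.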

Weight Distribution

Model weights are loaded once and distributed to all workers. vLLM uses two strategies depending on storage:

Strategy 1: Load on Driver, Scatter via NCCL

class DistributedExecutor:
    def load_model_weights(self):
        """Load and distribute model weights."""
        # Driver loads full model from disk/HuggingFace
        full_state_dict = load_safetensors(self.model_path)

        for layer_name, weight in full_state_dict.items():
            shard_spec = self.get_shard_spec(layer_name)

            if shard_spec.type == "column_parallel":
                # Split along the output dimension (dim 0 of [out, in])
                shards = weight.chunk(self.tp_size, dim=0)
            elif shard_spec.type == "row_parallel":
                # Split along the input dimension
                shards = weight.chunk(self.tp_size, dim=1)
            else:
                # Norms, biases, and other non-sharded weights: replicate
                shards = [weight] * self.tp_size

            # Send each shard to its worker
            futures = []
            for rank, shard in enumerate(shards):
                f = self.workers[rank].load_weight.remote(
                    layer_name, shard
                )
                futures.append(f)
            ray.get(futures)

Strategy 2: Direct Load on Workers (Preferred)

For large models, loading all weights on the driver node requires 2x the memory (full model + shards). The preferred approach loads directly on workers:

class Worker:
    def load_model_direct(self):
        """Each worker loads only its shard from disk."""
        # Each worker reads the same safetensors files
        # but extracts only its shard
        for shard_file in self.weight_files:
            with safe_open(shard_file, framework="pt") as f:
                for key in f.keys():
                    tensor = f.get_tensor(key)
                    my_shard = self.extract_shard(
                        tensor, key, self.tp_rank, self.tp_size
                    )
                    self.model.load_weight(key, my_shard)
                    del tensor  # free full tensor immediately
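The extract_shard call above reduces to slice arithmetic. Here is a minimal sketch (shard_slice is a hypothetical helper; real loaders also handle padding when a dimension is not evenly divisible):

```python
def shard_slice(dim_size: int, tp_rank: int, tp_size: int) -> tuple:
    """(start, end) of this rank's slice along the sharded dimension."""
    assert dim_size % tp_size == 0, "sketch assumes an evenly divisible dim"
    shard = dim_size // tp_size
    return tp_rank * shard, (tp_rank + 1) * shard

# Column-parallel weight of shape [out=12288, in=4096] under TP=4:
# rank 1 owns output rows 3072..6144
start, end = shard_slice(12288, tp_rank=1, tp_size=4)
```

Because each worker computes its own slice, no shard ever transits the driver, which is exactly why direct loading avoids the 2x driver memory cost.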
Model Loading Time — Llama 70B

Strategy           Nodes  GPUs     Load Time (s)  Peak Driver RAM (GB)
Driver scatter     1      4xA100   85             280
Direct load        1      4xA100   48             35
Driver scatter     2      8xA100   120            280
Direct load        2      8xA100   52             35
Direct load (NFS)  2      8xA100   65             35

Direct loading is 1.5-2.3x faster and uses 8x less driver RAM. The main requirement is that all nodes can access the model files, either via shared filesystem (NFS, Lustre) or by pre-downloading to local SSD.

Per-Step Execution Protocol

During inference, each step follows a strict synchronization protocol between the driver and workers.

class DistributedExecutor:
    def execute_step(self, scheduled_batch: ScheduledBatch):
        """Execute one forward pass across all workers."""
        # Step 1: Driver prepares execution metadata
        metadata = ExecutionMetadata(
            seq_ids=scheduled_batch.seq_ids,
            block_tables=scheduled_batch.block_tables,
            context_lens=scheduled_batch.context_lens,
            is_prefill=scheduled_batch.is_prefill,
        )

        # Step 2: Broadcast metadata to all workers
        # Only control data — no tensors
        futures = [
            w.execute_model.remote(metadata) for w in self.workers
        ]

        # Step 3: Workers execute forward pass
        # NCCL all-reduces happen inside worker forward passes
        # Driver waits for results from rank 0 only
        output = ray.get(futures[0])

        # Other workers' futures are collected but results discarded
        # (rank 0 has the complete output after final all-reduce)
        ray.get(futures[1:])

        return output

Worker Forward Pass with TP Communication

Inside each worker, the forward pass interleaves computation with NCCL all-reduce operations:

class Worker:
    def execute_model(self, metadata: ExecutionMetadata):
        """Execute forward pass on this worker's GPU."""
        torch.cuda.set_device(self.local_rank)

        # Embed tokens (replicated across TP ranks)
        hidden = self.model.embed_tokens(metadata.input_ids)

        for layer in self.model.layers:
            # QKV projection: column-parallel (each rank has 1/tp_size heads)
            qkv = layer.qkv_proj(hidden)  # local GEMM, no communication

            # Attention: each rank computes its own heads
            attn_out = layer.attention(qkv, metadata)  # local

            # Output projection: row-parallel
            # Each rank has partial result, need all-reduce
            attn_result = layer.o_proj(attn_out)  # local GEMM
            dist.all_reduce(attn_result, group=self.tp_group)

            hidden = hidden + attn_result  # residual

            # MLP: gate_up is column-parallel, down is row-parallel.
            # gate_up_proj packs the gate and up projections; SwiGLU is
            # silu(gate) * up, not silu of the concatenated tensor
            gate, up = layer.gate_up_proj(hidden).chunk(2, dim=-1)  # local
            mlp_out = layer.down_proj(silu(gate) * up)  # local GEMM
            dist.all_reduce(mlp_out, group=self.tp_group)

            hidden = hidden + mlp_out  # residual

        # Final norm + LM head
        logits = self.model.lm_head(self.model.norm(hidden))
        return logits if self.tp_rank == 0 else None

Each transformer layer requires exactly 2 all-reduce operations: one after the attention output projection and one after the MLP down projection. For a 70B model with 80 layers, that is 160 all-reduce calls per forward pass.

All-Reduce Overhead per Forward Pass — Llama 70B, TP=4

Interconnect                 Overhead (ms/step)
NVLink (A100)                1.8
NVLink (H100)                1.2
PCIe Gen4                    12.5
InfiniBand (cross-node)      8.4
Ethernet 100G (cross-node)   45
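Those per-interconnect numbers are roughly reproduced by a latency-plus-bandwidth model. The 8 us per-call launch/sync overhead below is an assumed figure for illustration, not a measured vLLM constant:

```python
def tp_allreduce_overhead_ms(num_layers: int, bytes_per_call: int,
                             link_gbps: float, n_gpus: int,
                             launch_us: float = 8.0) -> float:
    """Estimated all-reduce cost of one forward pass in milliseconds:
    2 calls per layer, each paying a fixed launch/sync latency plus
    the ring transfer time."""
    calls = 2 * num_layers                      # o_proj + down_proj per layer
    traffic_factor = 2 * (n_gpus - 1) / n_gpus  # ring algorithm
    transfer_us = bytes_per_call * traffic_factor / (link_gbps * 1e3)
    return calls * (launch_us + transfer_us) / 1e3

# 80 layers, 1 MB per call, 4 GPUs on ~300 GB/s NVLink: about 2 ms,
# in the same ballpark as the measured NVLink (A100) row above
estimate = tp_allreduce_overhead_ms(80, 1_048_576, 300, 4)
```

The model also explains the PCIe and cross-node rows: at lower bandwidth the transfer term grows from microseconds to tens of microseconds per call, multiplied by 160 calls per step.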

Pipeline Parallelism

Pipeline parallelism splits the model’s layers across multiple stages. Each stage processes a different micro-batch, overlapping computation with communication.

class PipelineParallelWorker(Worker):
    def __init__(self, config, rank):
        super().__init__(config, rank)
        self.pp_rank = rank // self.tp_size
        self.pp_size = config.parallel_config.pipeline_parallel_size

        # Determine which layers this stage owns
        total_layers = config.model_config.num_layers
        layers_per_stage = total_layers // self.pp_size
        self.start_layer = self.pp_rank * layers_per_stage
        self.end_layer = self.start_layer + layers_per_stage

    def execute_model(self, metadata):
        if self.pp_rank == 0:
            # First stage: embed tokens
            hidden = self.model.embed_tokens(metadata.input_ids)
        else:
            # Receive hidden states from the previous stage.
            # dist.send/recv address peers by *global* rank: the matching
            # rank one stage earlier is tp_size ranks lower.
            hidden = torch.empty(
                metadata.batch_size, self.hidden_size,
                dtype=torch.float16, device=self.device
            )
            dist.recv(hidden, src=self.rank - self.tp_size,
                      group=self.pp_group)

        # Process this stage's layers
        for i in range(self.start_layer, self.end_layer):
            hidden = self.model.layers[i](hidden, metadata)

        if self.pp_rank == self.pp_size - 1:
            # Last stage: compute logits
            logits = self.model.lm_head(self.model.norm(hidden))
            return logits
        else:
            # Send hidden states to the next stage
            dist.send(hidden, dst=self.rank + self.tp_size,
                      group=self.pp_group)
            return None

Pipeline Bubble Analysis

The pipeline bubble is the fraction of time GPUs sit idle waiting for activations from other stages. For a pipeline with P stages and M micro-batches:

  bubble fraction = (P - 1) / (M + P - 1)

Pipeline Bubble Fraction

PP Stages  M=4     M=8     M=16    M=32
2          20.0%   11.1%   5.9%    3.0%
4          42.9%   27.3%   15.8%   8.6%
8          63.6%   46.7%   30.4%   17.9%
16         78.9%   65.2%   48.4%   31.9%
⚠️ Warning

Pipeline parallelism with PP=8 or higher requires at least 16 micro-batches to keep the bubble below 32%. For interactive serving where batch sizes are small, PP=2 is the practical maximum. Beyond that, tensor parallelism is more efficient.
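The bubble formula is easy to turn into a planning helper (a sketch; it assumes the synchronous GPipe-style schedule the formula describes):

```python
def bubble_fraction(p: int, m: int) -> float:
    """Idle fraction of a synchronous pipeline: p stages, m micro-batches."""
    return (p - 1) / (m + p - 1)

def min_microbatches(p: int, max_bubble: float) -> int:
    """Smallest micro-batch count keeping the bubble at or below max_bubble."""
    m = 1
    while bubble_fraction(p, m) > max_bubble:
        m += 1
    return m
```

For P=4 and M=8 this gives 27.3%, matching the table; sweeping min_microbatches over target bubbles shows how quickly deep pipelines demand large batches.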

Multi-Node NCCL Configuration

Cross-node NCCL communication requires careful configuration to achieve full bandwidth:

# Environment variables for multi-node NCCL
export NCCL_IB_DISABLE=0          # Enable InfiniBand
export NCCL_IB_HCA=mlx5           # IB device name
export NCCL_IB_GID_INDEX=3        # RoCE GID index
export NCCL_NET_GDR_LEVEL=5       # GPU Direct RDMA level
export NCCL_SOCKET_IFNAME=eth0    # Fallback network interface
export NCCL_DEBUG=INFO             # Debug logging

# vLLM multi-node launch
ray start --head --port=6379 --num-gpus=4  # on node 0
ray start --address=node0:6379 --num-gpus=4  # on node 1

python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Llama-2-70b-hf \
    --tensor-parallel-size 4 \
    --pipeline-parallel-size 2 \
    --distributed-executor-backend ray

Bandwidth Allocation

For TP=4, PP=2 across 2 nodes with 4xA100 per node:

Node 0 (PP stage 0): GPU 0-3, TP group
  Intra-node comm: NVLink, 600 GB/s bidirectional
  TP all-reduce: 160 calls/step, ~1.8 ms total

Node 1 (PP stage 1): GPU 4-7, TP group
  Intra-node comm: NVLink, 600 GB/s bidirectional
  TP all-reduce: 160 calls/step, ~1.8 ms total

Cross-node (PP communication):
  InfiniBand HDR: 200 Gb/s = 25 GB/s
  PP send/recv per step: hidden_size * batch * 2 bytes
  For hidden=8192, batch=128: 2 MB per transfer
  Time: 2 MB / 25 GB/s = 80 us per transfer
  Total PP overhead: 2 transfers/step = 160 us
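The send/recv cost above can be computed directly; exact byte counts give about 84 us per transfer rather than the rounded 80 us (a sketch, not vLLM code):

```python
def pp_transfer_us(hidden_size: int, batch: int, bw_gbps: float,
                   dtype_bytes: int = 2) -> float:
    """Time in microseconds for one pipeline-stage activation transfer."""
    data_bytes = hidden_size * batch * dtype_bytes
    return data_bytes / (bw_gbps * 1e3)
```

At two transfers per step, PP overhead stays well under a millisecond on InfiniBand, which is why PP tolerates cross-node links that would cripple TP.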

Fault Tolerance

Ray provides actor restart on failure, but vLLM’s fault handling is more nuanced:

class DistributedExecutor:
    def handle_worker_failure(self, failed_rank: int):
        """Handle worker crash during inference."""
        # Step 1: Cancel all in-flight requests
        self.scheduler.abort_all()

        # Step 2: Restart the failed worker and swap it into the worker
        # list before any collective call that iterates over workers
        new_worker = RayWorkerWrapper.options(
            num_gpus=1,
            scheduling_strategy=self._placement_strategy(failed_rank)
        ).remote(self.engine_config, failed_rank)
        self.workers[failed_rank] = new_worker

        # Step 3: Reload model weights on the new worker
        ray.get(new_worker.load_model_direct.remote())

        # Step 4: Re-initialize NCCL groups
        # (all workers, old and new, must participate)
        ray.get([w.reinit_nccl.remote() for w in self.workers])

        # Step 5: Clear KV cache (all cached state is invalid)
        ray.get([w.clear_kv_cache.remote() for w in self.workers])

🚨 Danger

NCCL does not support removing or adding a rank to an existing communicator. When a single GPU fails in a multi-node setup, all NCCL communicators must be destroyed and recreated. This means all in-flight requests are lost and the KV cache is invalidated. Recovery time is typically 30-60 seconds for a 70B model.

Benchmarks: Scaling Efficiency

Llama 70B Throughput Scaling — A100-80GB

Config               GPUs  Throughput (tok/s)  Per-GPU (tok/s)  Scaling Efficiency
TP=2                 2     3,200               1,600            100% (baseline)
TP=4                 4     5,900               1,475            92.2%
TP=8 (1 node)        8     10,400              1,300            81.3%
TP=4 PP=2 (2 nodes)  8     9,800               1,225            76.6%
TP=8 (2 nodes)       8     7,200               900              56.3%


Key observations:

  1. TP=4 on a single node retains 92% efficiency — NVLink bandwidth is sufficient for the all-reduce volume.
  2. TP=8 on a single node drops to 81% due to NVLink congestion on the switch topology (DGX A100 uses NVSwitch, but each link shares bandwidth).
  3. TP=4 PP=2 across 2 nodes achieves 77% efficiency — the pipeline bubble at typical batch sizes costs roughly 15% and cross-node latency adds another 8%.
  4. TP=8 across 2 nodes is the worst configuration at 56% — cross-node all-reduce on every layer is devastating.
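These observations reduce to a simple rule of thumb, sketched below. This is a heuristic for illustration, not vLLM's actual logic, and it ignores whether the model fits in the resulting aggregate memory:

```python
def choose_parallel_config(gpus_per_node: int, num_nodes: int) -> dict:
    """Keep all-reduce-heavy TP inside a node; span nodes with PP only."""
    return {
        "tensor_parallel_size": min(gpus_per_node, 8),
        "pipeline_parallel_size": num_nodes,
    }

# 2 nodes of 4 GPUs -> TP=4, PP=2: the 77%-efficiency configuration above
config = choose_parallel_config(gpus_per_node=4, num_nodes=2)
```

The one case this heuristic never emits is the worst row of the table: tensor parallelism stretched across a node boundary.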

Practical Deployment Patterns

Pattern 1: Single Node, Maximum TP

Best for: interactive serving with low latency requirements.

# 4xA100, TP=4
python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Llama-2-70b-hf \
    --tensor-parallel-size 4 \
    --max-model-len 4096 \
    --gpu-memory-utilization 0.92

Pattern 2: Multi-Node, TP + PP

Best for: models that exceed single-node memory (405B+).

# 2 nodes, 8 GPUs total, TP=4 PP=2
# Node 0:
ray start --head --port=6379 --num-gpus=4
# Node 1:
ray start --address=node0:6379 --num-gpus=4
# Launch from head node:
python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Meta-Llama-3.1-405B \
    --tensor-parallel-size 4 \
    --pipeline-parallel-size 2 \
    --distributed-executor-backend ray

Pattern 3: Disaggregated Prefill/Decode

Best for: high-throughput batch processing with long contexts.

# Prefill cluster: 4 GPUs optimized for long-context processing
# Decode cluster: 4 GPUs optimized for token generation
# Requires vLLM's disaggregated serving mode; the flags below are
# illustrative and the exact interface varies by version
python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Llama-2-70b-hf \
    --tensor-parallel-size 4 \
    --enable-disaggregated-prefill \
    --prefill-tp-size 4 \
    --decode-tp-size 4

Summary

vLLM v1’s distributed execution layer separates the control plane (Ray actors, scheduling metadata) from the data plane (NCCL tensor communication). Ray handles process lifecycle and placement, ensuring tensor-parallel workers share a node via STRICT_PACK placement groups. NCCL handles all tensor communication with zero-copy GPU-to-GPU transfers. Each transformer layer incurs exactly 2 all-reduce operations for tensor parallelism, and pipeline parallelism adds point-to-point send/recv between stages. The practical scaling limit is TP=4-8 within a node (92-81% efficiency) and PP=2 across nodes for models that require it. Cross-node tensor parallelism should be avoided when possible due to the 40%+ efficiency loss from all-reduce over InfiniBand.