Llama 405B in FP16 requires 810 GB of memory. No single GPU can hold it — not even eight H100s with 640 GB combined. You need distributed execution across multiple nodes, and every forward pass becomes a choreographed dance of tensor communication over NVLink and Ethernet. vLLM v1 orchestrates this with Ray as the control plane, NCCL for GPU-to-GPU data movement, and a driver-worker architecture where scheduling stays on one node while computation distributes across the cluster. This post traces the complete distributed execution path: how Ray actors initialize, how NCCL process groups form across nodes, how model weights get sharded and distributed, and the per-step synchronization protocol that keeps 16 GPUs across 4 nodes working in lockstep.
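The arithmetic behind those numbers is worth making explicit; a back-of-envelope sketch (weights only, ignoring KV cache and activation memory):

```python
import math

def min_gpus(params_billions: float, bytes_per_param: int,
             gpu_mem_gb: int) -> int:
    """Minimum GPUs needed to hold the model weights alone."""
    weight_gb = params_billions * bytes_per_param  # billions of params -> GB
    return math.ceil(weight_gb / gpu_mem_gb)

print(405 * 2)               # 810 GB of FP16 weights
print(min_gpus(405, 2, 80))  # 11 GPUs for weights alone; KV cache and
                             # activation memory push real deployments to 16
```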
Architecture Overview
vLLM’s distributed execution has three layers:
- Control plane: The LLMEngine on the driver node schedules requests, manages the KV cache block table, and dispatches execution commands.
- Ray actors: Each GPU is managed by a Worker Ray actor. Actors can live on any node in the Ray cluster.
- Data plane: NCCL process groups handle all tensor communication between GPUs. No tensor data flows through Ray — Ray only carries control messages.
# Simplified distributed initialization
class DistributedExecutor:
    def __init__(self, engine_config: EngineConfig):
        self.tp_size = engine_config.parallel_config.tensor_parallel_size
        self.pp_size = engine_config.parallel_config.pipeline_parallel_size
        self.world_size = self.tp_size * self.pp_size

        # Create one Ray actor per GPU
        self.workers = []
        for rank in range(self.world_size):
            worker = RayWorkerWrapper.options(
                num_gpus=1,
                scheduling_strategy=self._placement_strategy(rank)
            ).remote(engine_config, rank)
            self.workers.append(worker)

        # Initialize NCCL groups (blocks until all workers are ready)
        ray.get([w.init_nccl.remote() for w in self.workers])
The key insight: Ray manages process lifecycle and placement. NCCL manages tensor communication. These two systems never overlap in function.
Ray Actor Placement
When deploying across multiple nodes, vLLM needs to place workers strategically. Workers in the same tensor-parallel group must minimize communication latency, ideally sitting on the same node connected via NVLink.
def _placement_strategy(self, rank: int) -> PlacementGroupSchedulingStrategy:
    tp_group_id = rank // self.tp_size
    local_rank = rank % self.tp_size

    # One placement group per TP group, created lazily and cached so that
    # every rank in the group shares the same bundles and therefore the
    # same node (one bundle per GPU)
    if not hasattr(self, "_placement_groups"):
        self._placement_groups = {}
    if tp_group_id not in self._placement_groups:
        bundles = [{"GPU": 1, "CPU": 1} for _ in range(self.tp_size)]
        self._placement_groups[tp_group_id] = ray.util.placement_group(
            bundles, strategy="STRICT_PACK"  # all bundles on the same node
        )

    return PlacementGroupSchedulingStrategy(
        placement_group=self._placement_groups[tp_group_id],
        placement_group_bundle_index=local_rank
    )
The STRICT_PACK strategy forces all GPUs in a tensor-parallel group onto the same physical node. Pipeline-parallel stages can span nodes because the communication pattern (send/recv of activations) is point-to-point and tolerates higher latency.
For a model with TP=4 and PP=2, vLLM creates 2 placement groups (one per PP stage), each requesting 4 GPUs on a single node. The total is 8 GPUs across 2 nodes. Each node handles one pipeline stage with 4-way tensor parallelism within the node.
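That layout can be written down as a pure function (a hypothetical helper mirroring the rank // tp_size convention used by the placement code above):

```python
def rank_layout(rank: int, tp_size: int) -> tuple:
    """Map a global rank to (pp_stage, tp_local_rank).

    Ranks are laid out TP-major: stage 0 owns ranks 0..tp_size-1,
    stage 1 owns the next tp_size ranks, and so on.
    """
    return rank // tp_size, rank % tp_size

# TP=4, PP=2: 8 ranks, stage 0 on node 0, stage 1 on node 1
print([rank_layout(r, 4) for r in range(8)])
# [(0, 0), (0, 1), (0, 2), (0, 3), (1, 0), (1, 1), (1, 2), (1, 3)]
```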
Topology Detection
vLLM detects the interconnect topology to validate placement decisions:
def detect_gpu_topology(self) -> dict:
    """Detect NVLink and PCIe topology for placement validation."""
    # Peer-to-peer capability is used as a proxy for NVLink here;
    # strictly speaking, P2P access can also work over PCIe on some platforms
    result = {}
    for i in range(torch.cuda.device_count()):
        for j in range(i + 1, torch.cuda.device_count()):
            can_p2p = torch.cuda.can_device_access_peer(i, j)
            result[(i, j)] = "nvlink" if can_p2p else "pcie"
    return result
If tensor-parallel workers end up on GPUs connected only via PCIe (no NVLink), the all-reduce bandwidth drops by 5-10x. vLLM logs a warning but does not fail.
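A validation pass over that topology map might look like this (a sketch; warn_if_pcie is a hypothetical helper, not a vLLM API):

```python
import itertools
import logging

logger = logging.getLogger(__name__)

def warn_if_pcie(topology: dict, tp_ranks: list) -> bool:
    """Return True if every GPU pair in the TP group is NVLink-connected,
    logging a warning (but not failing) otherwise."""
    all_nvlink = True
    for i, j in itertools.combinations(sorted(tp_ranks), 2):
        if topology.get((i, j)) != "nvlink":
            logger.warning(
                "TP group GPUs %d and %d are PCIe-only; "
                "all-reduce bandwidth will drop 5-10x", i, j)
            all_nvlink = False
    return all_nvlink

topo = {(0, 1): "nvlink", (0, 2): "nvlink", (1, 2): "pcie"}
print(warn_if_pcie(topo, [0, 1, 2]))  # False: the (1, 2) link is PCIe-only
```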
NCCL Process Group Initialization
After Ray actors are placed, vLLM initializes NCCL communicators. This is a multi-step process that must happen before any tensor operations.
class Worker:
    def init_nccl(self):
        """Initialize NCCL process groups for this worker."""
        # Step 1: join the global process group. With init_method="env://",
        # torch.distributed performs the NCCL unique-ID exchange through
        # the rendezvous at MASTER_ADDR:MASTER_PORT, so every rank holds
        # the same ID before ncclCommInitRank runs
        torch.cuda.set_device(self.local_rank)
        dist.init_process_group(
            backend="nccl",
            init_method="env://",
            world_size=self.world_size,  # tp_size * pp_size
            rank=self.rank               # global rank, not local
        )

        # Step 2: create the TP sub-group (contiguous ranks, same node)
        self.tp_group = dist.new_group(
            ranks=list(range(self.tp_rank_start,
                             self.tp_rank_start + self.tp_size))
        )

        # Step 3: create the PP sub-group (same position in each stage)
        if self.pp_size > 1:
            pp_ranks = [self.rank % self.tp_size + i * self.tp_size
                        for i in range(self.pp_size)]
            self.pp_group = dist.new_group(ranks=pp_ranks)
The NCCL unique ID exchange is the bootstrapping problem: one process generates a random ID, and all processes in the group must receive the same ID before calling ncclCommInitRank. vLLM uses Ray's object store as the broadcast medium, since every actor can already reach it; when initialization goes through torch.distributed, the env:// rendezvous performs the same exchange over a TCP store.
NCCL Ring Topology
For tensor parallelism, vLLM uses all-reduce operations. NCCL automatically selects the optimal algorithm (ring, tree, or collnet) based on message size and topology:
4-GPU NVLink ring all-reduce for tensor parallelism:
GPU 0 -> GPU 1 -> GPU 2 -> GPU 3 -> GPU 0
Bandwidth utilization:
Ring algorithm: 2 * (N-1)/N * bandwidth_per_link
For N=4: 1.5 * 300 GB/s (NVLink 3.0 on A100, per direction) = 450 GB/s aggregate
All-reduce of 4096-dim hidden state (FP16):
Data: 4096 * 2 = 8 KB per token
Time: 8 KB / 450 GB/s = 0.018 us (negligible)
All-reduce of full attention output [batch, seq, 4096]:
Data for batch=128: 128 * 4096 * 2 = 1 MB
Time: 1 MB / 450 GB/s = 2.2 us
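The same numbers fall out of a tiny cost model (bandwidth-only; the per-call launch latency, which dominates for small messages, is ignored, and the helper name is ours):

```python
def allreduce_time_us(n_bytes: int, n_gpus: int, link_bw_gb_s: float) -> float:
    """Ring all-reduce time in microseconds under the bandwidth-only model."""
    effective_bw = 2 * (n_gpus - 1) / n_gpus * link_bw_gb_s  # GB/s aggregate
    return n_bytes / (effective_bw * 1e9) * 1e6              # s -> us

# 4 GPUs at 300 GB/s per link -> 450 GB/s effective
token = 4096 * 2                # one FP16 hidden state: 8 KB
print(round(allreduce_time_us(token, 4, 300), 3))   # 0.018 us
batch = 128 * 4096 * 2          # full batch: ~1 MB
print(round(allreduce_time_us(batch, 4, 300), 1))   # 2.3 us
```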
Weight Distribution
Model weights are loaded once and distributed to all workers. vLLM uses two strategies depending on storage:
Strategy 1: Load on Driver, Scatter via Ray
class DistributedExecutor:
    def load_model_weights(self):
        """Load and distribute model weights."""
        # Driver loads the full model from disk/HuggingFace
        full_state_dict = load_safetensors(self.model_path)

        for layer_name, weight in full_state_dict.items():
            shard_spec = self.get_shard_spec(layer_name)
            if shard_spec.type == "column_parallel":
                # Split along the output dimension
                shards = weight.chunk(self.tp_size, dim=0)
            elif shard_spec.type == "row_parallel":
                # Split along the input dimension
                shards = weight.chunk(self.tp_size, dim=1)
            else:
                # Replicated (norms, embeddings): every rank gets a copy
                shards = [weight] * self.tp_size

            # Send each shard to its worker (through the Ray object store)
            futures = []
            for rank, shard in enumerate(shards):
                f = self.workers[rank].load_weight.remote(layer_name, shard)
                futures.append(f)
            ray.get(futures)
Strategy 2: Direct Load on Workers (Preferred)
For large models, loading all weights on the driver node requires 2x the memory (full model + shards). The preferred approach loads directly on workers:
class Worker:
    def load_model_direct(self):
        """Each worker loads only its shard from disk."""
        # Every worker reads the same safetensors files
        # but keeps only its own shard
        for shard_file in self.weight_files:
            with safe_open(shard_file, framework="pt") as f:
                for key in f.keys():
                    tensor = f.get_tensor(key)
                    my_shard = self.extract_shard(
                        tensor, key, self.tp_rank, self.tp_size
                    )
                    self.model.load_weight(key, my_shard)
                    del tensor  # free the full tensor immediately
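The extract_shard helper is referenced but not shown; a minimal sketch could look like the following (the substring-based dispatch on layer names is our assumption, and the helpers are hypothetical):

```python
def shard_bounds(dim_size: int, tp_rank: int, tp_size: int) -> tuple:
    """Half-open [start, end) slice of a dimension owned by tp_rank."""
    chunk = dim_size // tp_size  # assumes dim_size divisible by tp_size
    return tp_rank * chunk, (tp_rank + 1) * chunk

def extract_shard(tensor, key: str, tp_rank: int, tp_size: int):
    """Slice out this rank's shard based on the parallelism style of key."""
    if "qkv_proj" in key or "gate_up_proj" in key:   # column-parallel
        lo, hi = shard_bounds(tensor.shape[0], tp_rank, tp_size)
        return tensor[lo:hi, :]
    if "o_proj" in key or "down_proj" in key:        # row-parallel
        lo, hi = shard_bounds(tensor.shape[1], tp_rank, tp_size)
        return tensor[:, lo:hi]
    return tensor                                    # replicated (norms, etc.)
```

Only shape and slicing are used, so the same code works on torch tensors or NumPy arrays.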
Model Loading Time — Llama 70B
| Strategy | Nodes | GPUs | Load Time (s) | Peak Driver RAM (GB) |
|---|---|---|---|---|
| Driver scatter | 1 | 4xA100 | 85 | 280 |
| Direct load | 1 | 4xA100 | 48 | 35 |
| Driver scatter | 2 | 8xA100 | 120 | 280 |
| Direct load | 2 | 8xA100 | 52 | 35 |
| Direct load (NFS) | 2 | 8xA100 | 65 | 35 |
Direct loading is 1.5-2.3x faster and uses 8x less driver RAM. The main requirement is that all nodes can access the model files, either via shared filesystem (NFS, Lustre) or by pre-downloading to local SSD.
Per-Step Execution Protocol
During inference, each step follows a strict synchronization protocol between the driver and workers.
class DistributedExecutor:
    def execute_step(self, scheduled_batch: ScheduledBatch):
        """Execute one forward pass across all workers."""
        # Step 1: driver prepares execution metadata
        metadata = ExecutionMetadata(
            input_ids=scheduled_batch.input_ids,
            seq_ids=scheduled_batch.seq_ids,
            block_tables=scheduled_batch.block_tables,
            context_lens=scheduled_batch.context_lens,
            is_prefill=scheduled_batch.is_prefill,
        )

        # Step 2: broadcast metadata to all workers
        # (control data only; no tensors cross this boundary)
        futures = [
            w.execute_model.remote(metadata) for w in self.workers
        ]

        # Step 3: workers execute the forward pass; NCCL all-reduces
        # happen inside it. The driver needs results from rank 0 only,
        # since rank 0 holds the complete output after the final all-reduce
        output = ray.get(futures[0])
        ray.get(futures[1:])  # drain remaining futures (results are None)
        return output
Worker Forward Pass with TP Communication
Inside each worker, the forward pass interleaves computation with NCCL all-reduce operations:
class Worker:
    def execute_model(self, metadata: ExecutionMetadata):
        """Execute forward pass on this worker's GPU."""
        torch.cuda.set_device(self.local_rank)

        # Embed tokens (replicated across TP ranks)
        hidden = self.model.embed_tokens(metadata.input_ids)

        for layer in self.model.layers:
            # QKV projection: column-parallel (each rank owns 1/tp_size heads)
            qkv = layer.qkv_proj(hidden)               # local GEMM, no communication
            # Attention: each rank computes its own heads
            attn_out = layer.attention(qkv, metadata)  # local
            # Output projection: row-parallel, so each rank holds a
            # partial sum and an all-reduce is required
            attn_result = layer.o_proj(attn_out)       # local GEMM
            dist.all_reduce(attn_result, group=self.tp_group)
            hidden = hidden + attn_result              # residual

            # MLP: gate_up is column-parallel, down is row-parallel.
            # SiLU applies to the gate half only, which then gates the up half
            gate_up = layer.gate_up_proj(hidden)       # local
            gate, up = gate_up.chunk(2, dim=-1)
            mlp_out = layer.down_proj(F.silu(gate) * up)  # local
            dist.all_reduce(mlp_out, group=self.tp_group)
            hidden = hidden + mlp_out                  # residual

        # Final norm + LM head
        logits = self.model.lm_head(self.model.norm(hidden))
        return logits if self.tp_rank == 0 else None
Each transformer layer requires exactly 2 all-reduce operations: one after the attention output projection and one after the MLP down projection. For a 70B model with 80 layers, that is 160 all-reduce calls per forward pass.
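Putting numbers on that (a back-of-envelope estimate with hypothetical helpers; 8192 hidden size for Llama 70B, FP16, and the 450 GB/s effective ring bandwidth from earlier — real calls also pay a per-launch latency of several microseconds, which is why measured totals run higher):

```python
def allreduce_per_step(n_layers: int, hidden: int, batch_tokens: int,
                       bytes_per_elem: int = 2):
    """(number of all-reduce calls, total bytes moved) per forward pass."""
    calls = 2 * n_layers  # one after o_proj, one after down_proj per layer
    bytes_per_call = batch_tokens * hidden * bytes_per_elem
    return calls, calls * bytes_per_call

calls, total_bytes = allreduce_per_step(n_layers=80, hidden=8192,
                                        batch_tokens=128)
print(calls)                         # 160 all-reduce calls per step
print(round(total_bytes / 1e6, 1))   # ~335.5 MB moved per step
print(round(total_bytes / 450e9 * 1e3, 2))  # ~0.75 ms, bandwidth-only floor
```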
All-Reduce Overhead per Forward Pass — Llama 70B, TP=4
Pipeline Parallelism
Pipeline parallelism splits the model’s layers across multiple stages. Each stage processes a different micro-batch, overlapping computation with communication.
class PipelineParallelWorker(Worker):
    def __init__(self, config, rank):
        super().__init__(config, rank)
        self.pp_rank = rank // self.tp_size
        self.pp_size = config.parallel_config.pipeline_parallel_size

        # Determine which layers this stage owns
        total_layers = config.model_config.num_layers
        layers_per_stage = total_layers // self.pp_size
        self.start_layer = self.pp_rank * layers_per_stage
        self.end_layer = self.start_layer + layers_per_stage

    def execute_model(self, metadata):
        if self.pp_rank == 0:
            # First stage: embed tokens
            hidden = self.model.embed_tokens(metadata.input_ids)
        else:
            # Receive hidden states from the previous stage.
            # send/recv take global ranks, so the peer is this worker's
            # global rank shifted by one full TP group
            hidden = torch.empty(
                metadata.batch_size, self.hidden_size,
                dtype=torch.float16, device=self.device
            )
            dist.recv(hidden, src=self.rank - self.tp_size,
                      group=self.pp_group)

        # Process this stage's layers
        for i in range(self.start_layer, self.end_layer):
            hidden = self.model.layers[i](hidden, metadata)

        if self.pp_rank == self.pp_size - 1:
            # Last stage: compute logits
            return self.model.lm_head(self.model.norm(hidden))

        # Send hidden states to the next stage
        dist.send(hidden, dst=self.rank + self.tp_size,
                  group=self.pp_group)
        return None
Pipeline Bubble Analysis
The pipeline bubble is the fraction of time GPUs sit idle waiting for activations from other stages. For a pipeline with p stages and m micro-batches, the bubble fraction is (p - 1) / (m + p - 1):
Pipeline Bubble Fraction
| PP Stages | Micro-batches=4 | Micro-batches=8 | Micro-batches=16 | Micro-batches=32 |
|---|---|---|---|---|
| 2 | 20.0% | 11.1% | 5.9% | 3.0% |
| 4 | 42.9% | 27.3% | 15.8% | 8.6% |
| 8 | 63.6% | 46.7% | 30.4% | 17.9% |
| 16 | 78.9% | 65.2% | 48.4% | 31.9% |
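The table values follow directly from bubble = (p - 1) / (m + p - 1) for a GPipe-style schedule; a quick check in code (the helper name is ours) reproduces the PP=4 row:

```python
def bubble_fraction(p: int, m: int) -> float:
    """Idle fraction of a p-stage pipeline running m micro-batches."""
    return (p - 1) / (m + p - 1)

print([round(100 * bubble_fraction(4, m), 1) for m in (4, 8, 16, 32)])
# [42.9, 27.3, 15.8, 8.6]  -- matches the PP=4 row above
```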
Pipeline parallelism with PP=8 or higher requires at least 16 micro-batches to keep the bubble below 32%. For interactive serving where batch sizes are small, PP=2 is the practical maximum. Beyond that, tensor parallelism is more efficient.
Multi-Node NCCL Configuration
Cross-node NCCL communication requires careful configuration to achieve full bandwidth:
# Environment variables for multi-node NCCL
export NCCL_IB_DISABLE=0 # Enable InfiniBand
export NCCL_IB_HCA=mlx5 # IB device name
export NCCL_IB_GID_INDEX=3 # RoCE GID index
export NCCL_NET_GDR_LEVEL=5 # GPU Direct RDMA level
export NCCL_SOCKET_IFNAME=eth0 # Fallback network interface
export NCCL_DEBUG=INFO # Debug logging
# vLLM multi-node launch
ray start --head --port=6379 --num-gpus=4 # on node 0
ray start --address=node0:6379 --num-gpus=4 # on node 1
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Llama-2-70b-hf \
--tensor-parallel-size 4 \
--pipeline-parallel-size 2 \
--distributed-executor-backend ray
Bandwidth Allocation
For TP=4, PP=2 across 2 nodes with 4xA100 per node:
Node 0 (PP stage 0): GPU 0-3, TP group
Intra-node comm: NVLink, 600 GB/s bidirectional
TP all-reduce: 160 calls/step, ~1.8 ms total
Node 1 (PP stage 1): GPU 4-7, TP group
Intra-node comm: NVLink, 600 GB/s bidirectional
TP all-reduce: 160 calls/step, ~1.8 ms total
Cross-node (PP communication):
InfiniBand HDR: 200 Gb/s = 25 GB/s
PP send/recv per step: hidden_size * batch * 2 bytes
For hidden=8192, batch=128: 2 MB per transfer
Time: 2 MB / 25 GB/s = 80 us per transfer
Total PP overhead: 2 transfers/step = 160 us
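The cross-node hop can be cross-checked with the same bandwidth-only arithmetic (helper name is ours; the figures above round 2 MB to 2x10^6 bytes, which is why they quote 80 us):

```python
def pp_transfer_us(hidden: int, batch_tokens: int, link_gb_s: float,
                   bytes_per_elem: int = 2) -> float:
    """One pipeline send/recv of FP16 activations, in microseconds."""
    return batch_tokens * hidden * bytes_per_elem / (link_gb_s * 1e9) * 1e6

t = pp_transfer_us(hidden=8192, batch_tokens=128, link_gb_s=25)
print(round(t, 1))  # 83.9 us per transfer; ~168 us for 2 transfers/step
```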
Fault Tolerance
Ray provides actor restart on failure, but vLLM’s fault handling is more nuanced:
class DistributedExecutor:
    def handle_worker_failure(self, failed_rank: int):
        """Handle a worker crash during inference."""
        # Step 1: cancel all in-flight requests
        self.scheduler.abort_all()

        # Step 2: restart the failed worker and swap it into the
        # worker list before the collective steps below run
        new_worker = RayWorkerWrapper.options(
            num_gpus=1,
            scheduling_strategy=self._placement_strategy(failed_rank)
        ).remote(self.engine_config, failed_rank)
        self.workers[failed_rank] = new_worker

        # Step 3: reload model weights on the new worker
        ray.get(new_worker.load_model_direct.remote())

        # Step 4: re-initialize NCCL; every worker must participate,
        # since a communicator cannot survive losing a rank
        ray.get([w.reinit_nccl.remote() for w in self.workers])

        # Step 5: clear the KV cache (all cached state is invalid)
        ray.get([w.clear_kv_cache.remote() for w in self.workers])
NCCL does not support removing or adding a rank to an existing communicator. When a single GPU fails in a multi-node setup, all NCCL communicators must be destroyed and recreated. This means all in-flight requests are lost and the KV cache is invalidated. Recovery time is typically 30-60 seconds for a 70B model.
Benchmarks: Scaling Efficiency
Llama 70B Throughput Scaling — A100-80GB
| Config | GPUs | Throughput (tok/s) | Per-GPU Efficiency | Scaling Efficiency |
|---|---|---|---|---|
| TP=2 | 2 | 3,200 | 1,600 | 100% (baseline) |
| TP=4 | 4 | 5,900 | 1,475 | 92.2% |
| TP=8 (1 node) | 8 | 10,400 | 1,300 | 81.3% |
| TP=4 PP=2 (2 nodes) | 8 | 9,800 | 1,225 | 76.6% |
| TP=8 (2 nodes) | 8 | 7,200 | 900 | 56.3% |
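The efficiency columns are derived from the throughput column: per-GPU throughput divided by the 1,600 tok/s TP=2 baseline (a quick derivation check; helper name is ours):

```python
def scaling_efficiency(throughput: float, gpus: int,
                       baseline_per_gpu: float = 1600.0) -> float:
    """Per-GPU throughput relative to the 2-GPU baseline, in percent."""
    return throughput / gpus / baseline_per_gpu * 100

print(round(scaling_efficiency(5_900, 4), 1))  # 92.2  (TP=4)
print(round(scaling_efficiency(9_800, 8), 1))  # 76.6  (TP=4 PP=2)
```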
Key observations:
- TP=4 on a single node retains 92% efficiency — NVLink bandwidth is sufficient for the all-reduce volume.
- TP=8 on a single node drops to 81% due to NVLink congestion on the switch topology (DGX A100 uses NVSwitch, but each link shares bandwidth).
- TP=4 PP=2 across 2 nodes achieves 77% efficiency — the pipeline bubble at typical batch sizes costs roughly 15% and cross-node latency adds another 8%.
- TP=8 across 2 nodes is the worst configuration at 56% — cross-node all-reduce on every layer is devastating.
Practical Deployment Patterns
Pattern 1: Single Node, Maximum TP
Best for: interactive serving with low latency requirements.
# 4xA100, TP=4
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Llama-2-70b-hf \
--tensor-parallel-size 4 \
--max-model-len 4096 \
--gpu-memory-utilization 0.92
Pattern 2: Multi-Node, TP + PP
Best for: models that exceed single-node memory (405B+).
# 2 nodes, 8 GPUs total, TP=4 PP=2
# Node 0:
ray start --head --port=6379 --num-gpus=4
# Node 1:
ray start --address=node0:6379 --num-gpus=4
# Launch from head node:
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Meta-Llama-3.1-405B \
--tensor-parallel-size 4 \
--pipeline-parallel-size 2 \
--distributed-executor-backend ray
Pattern 3: Disaggregated Prefill/Decode
Best for: high-throughput batch processing with long contexts.
# Prefill cluster: 4 GPUs optimized for long-context processing
# Decode cluster: 4 GPUs optimized for token generation
# Requires vLLM's disaggregated serving mode
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Llama-2-70b-hf \
--tensor-parallel-size 4 \
--enable-disaggregated-prefill \
--prefill-tp-size 4 \
--decode-tp-size 4
Summary
vLLM v1’s distributed execution layer separates the control plane (Ray actors, scheduling metadata) from the data plane (NCCL tensor communication). Ray handles process lifecycle and placement, ensuring tensor-parallel workers share a node via STRICT_PACK placement groups. NCCL handles all tensor communication with zero-copy GPU-to-GPU transfers. Each transformer layer incurs exactly 2 all-reduce operations for tensor parallelism, and pipeline parallelism adds point-to-point send/recv between stages. The practical scaling limit is TP=4-8 within a node (92-81% efficiency) and PP=2 across nodes for models that require it. Cross-node tensor parallelism should be avoided when possible due to the 40%+ efficiency loss from all-reduce over InfiniBand.