Distributed Data Parallel (DDP) is the foundation of all large-scale training. Each GPU has a complete copy of the model and processes a different mini-batch. After the backward pass, gradients must be synchronized so all GPUs have identical gradient values before the optimizer step. The synchronization method — bucket-based all-reduce overlapped with the backward pass — determines whether distributed training scales efficiently or collapses under communication overhead.
Why DDP Exists
Single-GPU training is limited by memory and throughput. A 7B parameter model with FP32 gradients and AdamW optimizer states needs roughly 120 GB — already exceeding a single H100’s 80 GB HBM3. Even if it fits, training on billions of tokens at batch size 1 would take months.
DDP solves the throughput problem: N GPUs process N mini-batches in parallel, producing N gradient sets. Average them, and mathematically it’s identical to processing a batch N times larger on a single GPU. The global batch size scales linearly with GPU count while per-GPU computation stays constant.
The mathematical guarantee: for loss function $\mathcal{L}$ and data batches $B_1, \dots, B_N$ forming the global batch $B = B_1 \cup \dots \cup B_N$:

$$\nabla_\theta \mathcal{L}(B) = \frac{1}{N} \sum_{i=1}^{N} \nabla_\theta \mathcal{L}(B_i)$$

This equality holds exactly for equal-sized mini-batches and a mean-reduced loss — DDP introduces zero approximation error. The only question is how fast you can average the gradients.
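The equality is easy to check numerically. A minimal sketch with a toy linear model and two simulated GPUs (all names are illustrative):

```python
import torch

torch.manual_seed(0)
w = torch.randn(4, requires_grad=True)   # model parameters
x = torch.randn(8, 4)                    # global batch of 8 samples
y = torch.randn(8)

def grad_of_batch(xb, yb):
    """Gradient of the mean squared error over one mini-batch."""
    loss = ((xb @ w - yb) ** 2).mean()
    (g,) = torch.autograd.grad(loss, w)
    return g

# One GPU processing the full batch
g_full = grad_of_batch(x, y)

# Two simulated GPUs, each processing half, then averaging (the all-reduce)
g_avg = (grad_of_batch(x[:4], y[:4]) + grad_of_batch(x[4:], y[4:])) / 2

print(torch.allclose(g_full, g_avg, atol=1e-6))  # True
```

The averaged half-batch gradients match the full-batch gradient up to floating-point rounding.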
Ring All-Reduce
The standard gradient synchronization algorithm: arrange N GPUs in a logical ring. The all-reduce computes the element-wise average of N tensors so every GPU ends up with the same result.
The ring algorithm proceeds in two phases:
Phase 1 — Reduce-Scatter ($N-1$ steps): Each GPU divides its gradient tensor into N chunks. In each step, every GPU sends one chunk to its right neighbor and receives one chunk from its left neighbor. The received chunk is accumulated (summed) with the local chunk. After $N-1$ steps, each GPU holds the complete sum for one chunk.
Phase 2 — All-Gather ($N-1$ steps): Each GPU broadcasts its fully-reduced chunk around the ring. After $N-1$ steps, every GPU has all N chunks, each containing the complete sum.
Total steps: $2(N-1)$. Communication volume per GPU, for a gradient tensor of $D$ bytes:

$$V = 2 \cdot \frac{N-1}{N} \cdot D$$

The critical insight: volume per GPU is nearly independent of N. As $N \to \infty$, $\frac{N-1}{N} \to 1$, so each GPU sends approximately $2D$ regardless of cluster size. This is why ring all-reduce scales.
```python
import torch
import torch.distributed as dist


def ring_all_reduce(tensor, world_size, rank):
    """
    Ring all-reduce implementation (educational — use NCCL in practice).
    Assumes tensor.numel() is divisible by world_size so all chunks
    have the same size.

    Args:
        tensor: local gradient tensor to reduce
        world_size: total number of GPUs
        rank: this GPU's rank (0 to world_size-1)
    """
    # Split tensor into world_size equal chunks
    chunks = list(tensor.chunk(world_size))
    left_neighbor = (rank - 1) % world_size
    right_neighbor = (rank + 1) % world_size

    # Phase 1: Reduce-Scatter
    # After this phase, chunks[(rank + 1) % world_size] on each GPU
    # contains the global sum for that chunk
    send_buf = torch.empty_like(chunks[0])
    recv_buf = torch.empty_like(chunks[0])
    for step in range(world_size - 1):
        # Send chunk (rank - step) to right, receive chunk (rank - step - 1) from left
        send_idx = (rank - step) % world_size
        recv_idx = (rank - step - 1) % world_size
        send_buf.copy_(chunks[send_idx])
        # Non-blocking send, blocking receive
        send_req = dist.isend(send_buf, right_neighbor)
        dist.recv(recv_buf, left_neighbor)
        send_req.wait()
        # Accumulate received chunk
        chunks[recv_idx] += recv_buf

    # Phase 2: All-Gather
    # Circulate each GPU's fully-reduced chunk around the ring
    for step in range(world_size - 1):
        send_idx = (rank - step + 1) % world_size
        recv_idx = (rank - step) % world_size
        send_buf.copy_(chunks[send_idx])
        send_req = dist.isend(send_buf, right_neighbor)
        dist.recv(recv_buf, left_neighbor)
        send_req.wait()
        chunks[recv_idx].copy_(recv_buf)

    # Average (divide by world_size)
    for chunk in chunks:
        chunk /= world_size

    # Reconstruct full tensor
    tensor.copy_(torch.cat(chunks))
```
All-Reduce Time by Interconnect (70B Model, FP32 Gradients)
| GPUs | Volume/GPU | NVLink (900 GB/s) | IB NDR (50 GB/s) | PCIe Gen5 (28 GB/s) |
|---|---|---|---|---|
| 2 | 280 GB | 0.31s | 5.6s | 10.0s |
| 4 | 420 GB | 0.47s | 8.4s | 15.0s |
| 8 | 490 GB | 0.54s | 9.8s | 17.5s |
| 64 | 551 GB | 0.61s | 11.0s | 19.7s |
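The table values follow directly from the volume formula. A quick sketch using a bandwidth-only cost model (latency and protocol overhead ignored; table figures are rounded):

```python
def ring_volume_gb(n_gpus, grad_gb):
    """Per-GPU ring all-reduce volume: 2 * (N - 1) / N * D."""
    return 2 * (n_gpus - 1) / n_gpus * grad_gb

D = 280.0  # 70B params * 4 bytes (FP32 gradients), in GB
for n in (2, 4, 8, 64):
    vol = ring_volume_gb(n, D)
    print(f"{n:>3} GPUs: {vol:6.2f} GB/GPU -> "
          f"NVLink {vol / 900:.2f}s, IB NDR {vol / 50:.1f}s, PCIe {vol / 28:.1f}s")
```

Note how the per-GPU volume saturates near $2D = 560$ GB as the GPU count grows.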
Bucket-Based Overlap with Backward
The naive approach: run backward, wait for all gradients, then all-reduce everything. This wastes time — the backward pass takes ~3x the forward pass, and all-reduce must wait for all of it.
The optimization: divide parameters into buckets and start all-reduce as soon as a bucket’s gradients are ready. Since backward computes gradients from the last layer first, buckets containing last-layer parameters finish first and can begin all-reduce while backward continues on earlier layers.
Bucket Construction
Parameters are grouped into buckets in reverse order (matching backward computation order). Default bucket size in PyTorch DDP: 25 MB. Larger buckets have better bandwidth utilization (less per-message overhead) but worse overlap (longer wait before starting all-reduce).
```python
import torch
import torch.distributed as dist


class BucketDDP:
    """DDP implementation with bucket-based gradient synchronization."""

    def __init__(self, model, bucket_size_mb=25, process_group=None):
        self.model = model
        self.bucket_size = bucket_size_mb * 1024 * 1024  # bytes
        self.process_group = process_group or dist.group.WORLD
        self.world_size = dist.get_world_size(self.process_group)

        # Create buckets in reverse parameter order (last layer first)
        self.buckets = self._create_buckets()
        self.bucket_ready_count = {}  # Track how many grads are ready per bucket
        self.bucket_pending = []      # Buckets with an in-flight all-reduce

        # Map each parameter to its bucket index
        self.param_to_bucket = {}
        for bucket_id, bucket in enumerate(self.buckets):
            for param in bucket["params"]:
                self.param_to_bucket[id(param)] = bucket_id

        # Register gradient hooks
        self._register_hooks()

    def _create_buckets(self):
        """Group parameters into fixed-size buckets in reverse order."""
        buckets = []
        current_params = []
        current_size = 0
        # Reverse order: backward computes last-layer gradients first
        for param in reversed(list(self.model.parameters())):
            if not param.requires_grad:
                continue
            param_size = param.numel() * param.element_size()
            if current_size + param_size > self.bucket_size and current_params:
                buckets.append({
                    "params": current_params,
                    "size": current_size,
                    "num_params": len(current_params),
                })
                current_params = []
                current_size = 0
            current_params.append(param)
            current_size += param_size
        if current_params:
            buckets.append({
                "params": current_params,
                "size": current_size,
                "num_params": len(current_params),
            })
        return buckets

    def _register_hooks(self):
        """Register backward hooks that trigger all-reduce when a bucket is ready."""
        for param in self.model.parameters():
            if not param.requires_grad:
                continue
            param.register_post_accumulate_grad_hook(self._grad_ready_hook)

    def _grad_ready_hook(self, param):
        """Called when a parameter's gradient is computed during backward."""
        bucket_id = self.param_to_bucket[id(param)]
        self.bucket_ready_count[bucket_id] = self.bucket_ready_count.get(bucket_id, 0) + 1
        # All gradients in this bucket are ready — launch all-reduce
        if self.bucket_ready_count[bucket_id] == self.buckets[bucket_id]["num_params"]:
            self._all_reduce_bucket(bucket_id)

    def _all_reduce_bucket(self, bucket_id):
        """Flatten bucket gradients, all-reduce, unflatten."""
        bucket = self.buckets[bucket_id]
        grads = [p.grad.data for p in bucket["params"]]
        # Flatten all gradients into a single contiguous tensor
        flat_grads = torch.cat([g.flatten() for g in grads])
        # All-reduce (average across GPUs) — non-blocking for overlap
        handle = dist.all_reduce(
            flat_grads, op=dist.ReduceOp.AVG,
            group=self.process_group, async_op=True
        )
        # A list, not a set: the tuple holds a list of tensors and is unhashable
        self.bucket_pending.append((bucket_id, handle, flat_grads, grads))

    def finish_sync(self):
        """Wait for all pending all-reduce operations and unflatten gradients."""
        for bucket_id, handle, flat_grads, grads in self.bucket_pending:
            handle.wait()
            # Unflatten back to individual parameter gradients
            offset = 0
            for g in grads:
                numel = g.numel()
                g.copy_(flat_grads[offset:offset + numel].view_as(g))
                offset += numel
        self.bucket_pending.clear()
        self.bucket_ready_count.clear()
```
The Overlap Timeline
For a 12-layer transformer on 4 GPUs:
```
Layer 12 backward:       |=====|
Layer 11 backward:             |=====|
Layer 10 backward:                   |=====|
...
Layer 1 backward:                                      |=====|

Bucket 1 all-reduce:           |========|                       (layers 12-11 grads)
Bucket 2 all-reduce:                       |========|           (layers 10-9 grads)
...
Last bucket all-reduce:                                      |========|
                                                             ^ only this tail
                                                               is not overlapped
```
The backward pass takes roughly 3x the forward pass time (gradient computation through attention + FFN involves both activation recomputation and parameter gradient accumulation). All-reduce of early buckets runs concurrently with backward computation of later layers. For well-balanced models on NVLink, 80-95% of all-reduce time is hidden behind backward computation. The effective overhead: only 5-20% of training step time, not the full all-reduce duration.
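To translate the timeline into numbers, a small sketch (the timings are hypothetical, chosen to match the ~3x forward/backward ratio above):

```python
def effective_overhead(fwd_ms, bwd_ms, allreduce_ms, overlap_frac):
    """Fraction of compute time added by the non-overlapped tail of all-reduce."""
    exposed_ms = allreduce_ms * (1.0 - overlap_frac)
    return exposed_ms / (fwd_ms + bwd_ms)

# Hypothetical step: 100 ms forward, 300 ms backward, 80 ms all-reduce, 90% hidden
overhead = effective_overhead(100.0, 300.0, 80.0, 0.90)
print(f"effective all-reduce overhead: {overhead:.1%}")  # -> 2.0%
```

Only the exposed 8 ms counts against the step; the other 72 ms runs under backward computation.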
Tuning Bucket Size
The bucket size trades off two concerns:
- Too small (e.g., 1 MB): Many small all-reduce operations. Each has fixed overhead (kernel launch, NCCL setup). Bandwidth utilization drops because the interconnect never reaches steady-state throughput for small transfers.
- Too large (e.g., 500 MB): Fewer operations but less overlap opportunity. Must wait for many layers of backward computation before starting the first all-reduce.
Bucket Size Impact on Training Throughput (7B Model, 8x H100)
| Bucket Size | Number of Buckets | Overlap Efficiency | All-Reduce Overhead | Step Time |
|---|---|---|---|---|
| 1 MB | 28,000 | 95% | +15% (kernel overhead) | 1.15x baseline |
| 10 MB | 2,800 | 92% | +3% | 0.97x baseline |
| 25 MB (default) | 1,120 | 88% | +5% | 1.00x baseline |
| 100 MB | 280 | 70% | +8% | 1.04x baseline |
| 500 MB | 56 | 40% | +18% | 1.12x baseline |
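The bucket counts in the table can be reproduced with a one-liner (decimal megabytes and FP32 gradients assumed; the function name is ours):

```python
import math

def num_buckets(n_params, bytes_per_grad=4, bucket_mb=25):
    """Bucket count for a given gradient size, using decimal MB as in the table."""
    return math.ceil(n_params * bytes_per_grad / (bucket_mb * 1_000_000))

print(num_buckets(7_000_000_000, bucket_mb=1))    # 28000
print(num_buckets(7_000_000_000, bucket_mb=25))   # 1120
print(num_buckets(7_000_000_000, bucket_mb=500))  # 56
```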
Gradient Compression
For clusters connected by InfiniBand (50 GB/s) rather than NVLink (900 GB/s), communication is the bottleneck even with bucket overlap. Gradient compression reduces the volume at the cost of some approximation.
FP16 Gradient All-Reduce
The simplest compression: cast gradients to FP16 before all-reduce, cast back to FP32 after. Halves communication volume.
```python
import torch.distributed as dist


def all_reduce_fp16(flat_grads_fp32):
    """All-reduce in FP16 to halve communication volume."""
    flat_fp16 = flat_grads_fp32.half()
    dist.all_reduce(flat_fp16, op=dist.ReduceOp.AVG)
    flat_grads_fp32.copy_(flat_fp16.float())
```
This is safe for most training scenarios — gradient values are already noisy from mini-batch sampling, so the FP16 quantization error is negligible relative to the stochastic noise.
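The size of that quantization error is easy to measure. A sketch on synthetic gradient-scale values:

```python
import torch

torch.manual_seed(0)
g = torch.randn(1_000_000) * 1e-3   # gradient-scale values
g_rt = g.half().float()             # cast to FP16 and back to FP32

# Norm-relative round-trip error, bounded by FP16's ~11-bit mantissa (~5e-4)
rel_err = ((g - g_rt).norm() / g.norm()).item()
print(f"relative FP16 round-trip error: {rel_err:.1e}")
```

The relative error lands around $10^{-4}$, well below the batch-to-batch variation of the gradients themselves.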
PowerSGD: Low-Rank Gradient Compression
For extreme bandwidth constraints, compress gradients using low-rank approximation. Represent each gradient matrix as a product of two smaller matrices.
```python
import torch
import torch.distributed as dist


class PowerSGD:
    """Low-rank gradient compression via power iteration."""

    def __init__(self, rank=4, warmup_steps=100):
        self.rank = rank
        self.warmup_steps = warmup_steps
        self.step = 0
        self.error_feedback = {}  # Accumulated compression error per parameter
        self.q_memory = {}        # Reused Q factors (warm start across steps)

    def compress_and_allreduce(self, param_name, grad):
        """Compress gradient to rank-r, all-reduce the factors, decompress."""
        self.step += 1
        # Skip compression during warmup (use full gradients)
        if self.step <= self.warmup_steps:
            dist.all_reduce(grad, op=dist.ReduceOp.AVG)
            return
        # Only compress 2D+ tensors (skip biases, norms)
        if grad.dim() < 2:
            dist.all_reduce(grad, op=dist.ReduceOp.AVG)
            return

        # Reshape to 2D for the low-rank decomposition
        original_shape = grad.shape
        M = grad.reshape(grad.shape[0], -1)  # (m, n)
        m, n = M.shape

        # Add error feedback from the previous step
        if param_name in self.error_feedback:
            M = M + self.error_feedback[param_name]

        # Q must be IDENTICAL on every rank, or averaging P is meaningless.
        # Warm-start from the previous step's Q; seed the first one identically.
        if param_name not in self.q_memory:
            gen = torch.Generator().manual_seed(42)
            self.q_memory[param_name] = torch.randn(
                n, self.rank, generator=gen
            ).to(grad.device)
        Q = self.q_memory[param_name]

        # One step of power iteration on the (implicitly averaged) gradient
        P = M @ Q                                 # (m, rank)
        dist.all_reduce(P, op=dist.ReduceOp.AVG)  # small: m * rank values
        P, _ = torch.linalg.qr(P)                 # orthonormalize columns
        Q = M.T @ P                               # (n, rank)
        dist.all_reduce(Q, op=dist.ReduceOp.AVG)  # small: n * rank values

        # Reconstruct the rank-r approximation of the averaged gradient
        # Communication: (m + n) * rank * 4 bytes instead of m * n * 4 bytes
        M_approx = P @ Q.T

        # Store local error for next step (error feedback) and reuse Q
        self.error_feedback[param_name] = M - M_approx
        self.q_memory[param_name] = Q

        # Write back
        grad.copy_(M_approx.reshape(original_shape))
```
Compression ratio: for a (4096, 4096) weight matrix at rank 4, communication drops from $m \times n = 16{,}777{,}216$ values to $(m + n) \times r = 32{,}768$ values — a 512x reduction. The error feedback mechanism keeps the accumulated compression error bounded, so over enough steps convergence closely tracks uncompressed training.
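The arithmetic behind that claim, as a sketch (the function name is ours):

```python
def powersgd_volume(m, n, rank):
    """Values communicated: full gradient vs. the P (m x r) and Q (n x r) factors."""
    full = m * n
    compressed = (m + n) * rank
    return full, compressed, full / compressed

full, comp, ratio = powersgd_volume(4096, 4096, 4)
print(full, comp, ratio)  # 16777216 32768 512.0
```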
Communication Volume per GPU: Compression Methods (70B Model, 8 GPUs)
(Figure: communication volume per GPU, in GB per step, for each compression method.)
When DDP Is Not Enough
DDP requires each GPU to hold a complete copy of the model. For Llama 70B: 140 GB weights (FP16) + 140 GB gradients (FP16) + 560 GB optimizer states (FP32 momentum + variance, 8 bytes per parameter) = 840 GB per GPU. No single GPU has this capacity.
Solutions that build on DDP’s gradient synchronization:
- ZeRO-1: Partition optimizer states across GPUs. Each GPU stores 1/N of the optimizer state. All-reduce still needed for gradients. Memory per GPU drops from 840 GB to ~350 GB.
- ZeRO-2 (+ gradient sharding): Also partition gradients. Reduce-scatter instead of all-reduce — each GPU only keeps its shard. Memory drops to ~210 GB.
- ZeRO-3 / FSDP: Partition everything — parameters, gradients, optimizer. All-gather parameters on demand for each forward/backward computation. Memory per GPU drops to ~105 GB.
- TP + DDP: Tensor parallelism within a node (split individual layers across GPUs), DDP across nodes (replicate the TP groups). This is the standard approach for large training runs.
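A rough per-GPU memory calculator for these stages, as a sketch under the section's accounting (2 bytes/param for FP16 weights and gradients, 8 bytes/param for FP32 Adam states; activations ignored, so the figures are approximate and the ZeRO-2 value lands near, not exactly on, the quoted one):

```python
def memory_per_gpu_gb(n_params, n_gpus, stage):
    """Approximate per-GPU memory in (decimal) GB for DDP and the ZeRO stages."""
    weights = 2 * n_params      # FP16
    grads = 2 * n_params        # FP16
    optim = 8 * n_params        # FP32 momentum + variance
    if stage == "ddp":
        total = weights + grads + optim
    elif stage == "zero1":
        total = weights + grads + optim / n_gpus
    elif stage == "zero2":
        total = weights + (grads + optim) / n_gpus
    elif stage == "zero3":
        total = (weights + grads + optim) / n_gpus
    else:
        raise ValueError(stage)
    return total / 1e9

for stage in ("ddp", "zero1", "zero2", "zero3"):
    print(f"{stage:>5}: {memory_per_gpu_gb(70e9, 8, stage):7.1f} GB")
```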
Memory per GPU: DDP vs ZeRO Stages (70B Model, 8 GPUs)
(Figure: memory per GPU, in GB, for DDP and each ZeRO stage.)

Use DDP when the model fits on a single GPU with room for gradients and optimizer states. DDP is simpler, has less communication overhead, and is easier to debug. Switch to FSDP (ZeRO-3) only when memory requires it — FSDP adds all-gather overhead for every forward/backward pass. For models under ~13B parameters on H100 GPUs, DDP is usually the right choice.
Practical DDP Configuration
```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def setup_ddp(rank, world_size):
    """Standard DDP setup."""
    dist.init_process_group(
        backend="nccl",        # NCCL for GPU, gloo for CPU
        init_method="env://",  # Use MASTER_ADDR and MASTER_PORT env vars
        rank=rank,
        world_size=world_size,
    )
    torch.cuda.set_device(rank)


def create_ddp_model(model, rank):
    """Wrap model in DDP with recommended settings."""
    model = model.to(rank)
    ddp_model = DDP(
        model,
        device_ids=[rank],
        output_device=rank,
        bucket_cap_mb=25,              # Default bucket size
        find_unused_parameters=False,  # Set True only if needed (adds overhead)
        gradient_as_bucket_view=True,  # Avoid gradient copy — use bucket memory directly
        static_graph=True,             # Enable optimizations for fixed computation graphs
    )
    return ddp_model


# Training loop
def train_step(ddp_model, optimizer, data, target):
    optimizer.zero_grad()
    output = ddp_model(data)
    loss = torch.nn.functional.cross_entropy(output, target)
    loss.backward()  # DDP hooks trigger all-reduce automatically
    optimizer.step()
    return loss.item()
```
Key flags:
- `gradient_as_bucket_view=True`: Avoids copying gradients into bucket buffers — the gradient tensors directly reference the bucket memory. Saves memory and one memcpy per backward pass.
- `static_graph=True`: Tells DDP that the computation graph doesn't change between iterations (no conditional branches). Enables bucket reordering and fusion optimizations.
- `find_unused_parameters=False`: The default. Setting it to `True` adds a graph traversal each iteration to detect parameters that received no gradient. Only enable it for models with dynamic branching where some parameters don't receive gradients every step.
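The `env://` init method above expects `MASTER_ADDR`, `MASTER_PORT`, `RANK`, and `WORLD_SIZE` in the environment; `torchrun` sets them automatically. A typical launch, where the script name and rendezvous address are placeholders:

```shell
# Single node, 8 GPUs
torchrun --standalone --nproc_per_node=8 train.py

# Two nodes x 8 GPUs; run on each node, pointing at a shared rendezvous host
torchrun --nnodes=2 --nproc_per_node=8 \
    --rdzv_backend=c10d --rdzv_endpoint=10.0.0.1:29500 \
    train.py
```

Under `torchrun`, read the rank from the `RANK` / `LOCAL_RANK` environment variables rather than passing it by hand.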