Part of Series Transformer Anatomy 20 of 36

Distributed Data Parallel (DDP) is the foundation of all large-scale training. Each GPU has a complete copy of the model and processes a different mini-batch. After the backward pass, gradients must be synchronized so all GPUs have identical gradient values before the optimizer step. The synchronization method — bucket-based all-reduce overlapped with the backward pass — determines whether distributed training scales efficiently or collapses under communication overhead.

Why DDP Exists

Single-GPU training is limited by memory and throughput. A 7B parameter model with FP32 gradients and AdamW optimizer states needs roughly 120 GB — already exceeding a single H100’s 80 GB HBM3. Even if it fits, training on billions of tokens at batch size 1 would take months.

DDP solves the throughput problem: N GPUs process N mini-batches in parallel, producing N gradient sets. Average them, and mathematically it’s identical to processing a batch N times larger on a single GPU. The global batch size scales linearly with GPU count while per-GPU computation stays constant.

The mathematical guarantee: for loss function $L$ and data batches $B_1, \dots, B_N$:

$$\nabla L_{\text{global}} = \frac{1}{N} \sum_{i=1}^{N} \nabla L(B_i) = \nabla L\left(\bigcup_{i=1}^{N} B_i\right)$$

This equality holds exactly — DDP introduces zero approximation error. The only question is how fast you can average the gradients.
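The equality is easy to check numerically in a single process. The sketch below splits one batch across two simulated "GPUs" and compares the averaged per-shard gradients against the gradient of the full batch (a toy least-squares loss; none of these names come from the article):

```python
import torch

# Single-process sanity check: the average of per-shard gradients equals
# the gradient of the mean loss over the combined batch.
torch.manual_seed(0)
w = torch.randn(4, requires_grad=True)
data = torch.randn(8, 4)          # global batch of 8 samples
target = torch.randn(8)

def grad_of(batch_x, batch_y):
    loss = ((batch_x @ w - batch_y) ** 2).mean()
    g, = torch.autograd.grad(loss, w)
    return g

# "Two GPUs": each computes gradients on half the batch
g0 = grad_of(data[:4], target[:4])
g1 = grad_of(data[4:], target[4:])
averaged = (g0 + g1) / 2

# One GPU processing the full batch
g_full = grad_of(data, target)

print(torch.allclose(averaged, g_full, atol=1e-6))  # True
```

The equality relies on the per-shard losses being means over equal-sized shards; with uneven shards the average must be weighted by shard size.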

Ring All-Reduce

The standard gradient synchronization algorithm: arrange N GPUs in a logical ring. The all-reduce computes the element-wise average of N tensors so every GPU ends up with the same result.

The ring algorithm proceeds in two phases:

Phase 1 — Reduce-Scatter (N-1 steps): Each GPU divides its gradient tensor into N chunks. In each step, every GPU sends one chunk to its right neighbor and receives one chunk from its left neighbor. The received chunk is accumulated (summed) with the local chunk. After N-1 steps, each GPU holds the complete sum for one chunk.

Phase 2 — All-Gather (N-1 steps): Each GPU broadcasts its fully-reduced chunk around the ring. After N-1 steps, every GPU has all N chunks, each containing the complete sum.

Total steps: 2(N-1). Communication volume per GPU:

$$V = 2 \times \frac{N-1}{N} \times |\text{params}| \times \text{bytes\_per\_param}$$

The critical insight: volume per GPU is nearly independent of N. As $N \to \infty$, $\frac{N-1}{N} \to 1$, so each GPU sends approximately $2 \times |\text{params}| \times \text{bytes\_per\_param}$ regardless of cluster size. This is why ring all-reduce scales.
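Plugging numbers into the formula makes the saturation visible. A throwaway helper (allreduce_volume_gb, not from the article) computes the per-GPU volume for a 70B-parameter model with FP32 gradients:

```python
def allreduce_volume_gb(n_gpus, n_params, bytes_per_param=4):
    """Per-GPU communication volume for ring all-reduce, in GB."""
    return 2 * (n_gpus - 1) / n_gpus * n_params * bytes_per_param / 1e9

# 70B parameters, FP32 gradients: volume approaches the 2 x 280 GB = 560 GB
# asymptote as the GPU count grows
for n in (2, 4, 8, 64):
    print(n, round(allreduce_volume_gb(n, 70e9), 1))
```

At 2 GPUs each rank sends 280 GB; by 64 GPUs the volume has only grown to roughly 551 GB, within a few percent of the asymptote.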

import torch
import torch.distributed as dist

def ring_all_reduce(tensor, world_size, rank):
    """
    Ring all-reduce implementation (educational — use NCCL in practice).

    Args:
        tensor: local gradient tensor to reduce
        world_size: total number of GPUs
        rank: this GPU's rank (0 to world_size-1)
    """
    # Split tensor into world_size chunks
    # (assumes tensor.numel() is divisible by world_size so all chunks match
    # the buffer size below; production code pads the tensor first)
    chunks = list(tensor.chunk(world_size))

    left_neighbor = (rank - 1) % world_size
    right_neighbor = (rank + 1) % world_size

    # Phase 1: Reduce-Scatter
    # After this phase, chunks[rank] on each GPU contains the global sum for that chunk
    send_buf = torch.empty_like(chunks[0])
    recv_buf = torch.empty_like(chunks[0])

    for step in range(world_size - 1):
        # Send chunk (rank - step) to right, receive from left
        send_idx = (rank - step) % world_size
        recv_idx = (rank - step - 1) % world_size

        send_buf.copy_(chunks[send_idx])

        # Non-blocking send/recv
        send_req = dist.isend(send_buf, right_neighbor)
        dist.recv(recv_buf, left_neighbor)
        send_req.wait()

        # Accumulate received chunk
        chunks[recv_idx] += recv_buf

    # Phase 2: All-Gather
    # Broadcast each GPU's fully-reduced chunk to all others
    for step in range(world_size - 1):
        send_idx = (rank - step + 1) % world_size
        recv_idx = (rank - step) % world_size

        send_buf.copy_(chunks[send_idx])

        send_req = dist.isend(send_buf, right_neighbor)
        dist.recv(recv_buf, left_neighbor)
        send_req.wait()

        chunks[recv_idx].copy_(recv_buf)

    # Average (divide by world_size)
    for chunk in chunks:
        chunk /= world_size

    # Reconstruct full tensor
    tensor.copy_(torch.cat(chunks))

All-Reduce Time by Interconnect (70B Model, FP32 Gradients)

GPUs   Volume/GPU   NVLink (900 GB/s)   IB NDR (50 GB/s)   PCIe Gen5 (28 GB/s)
2      280 GB       0.31 s              5.6 s              10.0 s
4      420 GB       0.47 s              8.4 s              15.0 s
8      490 GB       0.54 s              9.8 s              17.5 s
64     548 GB       0.61 s              11.0 s             19.6 s
Note: Volume approaches 2x model size as N grows. NVLink makes all-reduce nearly free. PCIe makes it prohibitive for large models.
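The two phases can also be checked in a single process. The sketch below (simulate_ring_all_reduce, a hypothetical helper, not the ring_all_reduce code above) replays the same send/receive chunk indices on plain Python lists, collecting each step's transfers before applying them so every "GPU" acts on pre-step values:

```python
import copy

def simulate_ring_all_reduce(per_gpu_chunks):
    """Simulate ring all-reduce on N 'GPUs', each holding N numeric chunks."""
    n = len(per_gpu_chunks)
    chunks = copy.deepcopy(per_gpu_chunks)

    # Phase 1: reduce-scatter — after n-1 steps, each GPU holds the full
    # sum for exactly one chunk
    for step in range(n - 1):
        transfers = []
        for rank in range(n):
            send_idx = (rank - step) % n
            right = (rank + 1) % n
            transfers.append((right, send_idx, chunks[rank][send_idx]))
        for dst, idx, value in transfers:
            chunks[dst][idx] += value   # receiver accumulates

    # Phase 2: all-gather — circulate each fully-reduced chunk around the ring
    for step in range(n - 1):
        transfers = []
        for rank in range(n):
            send_idx = (rank - step + 1) % n
            right = (rank + 1) % n
            transfers.append((right, send_idx, chunks[rank][send_idx]))
        for dst, idx, value in transfers:
            chunks[dst][idx] = value    # receiver overwrites
    return chunks

gpus = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]  # 3 GPUs, 3 chunks each
result = simulate_ring_all_reduce(gpus)
print(result[0])  # [111, 222, 333] — every GPU ends with the chunk-wise sums
```

After 2(N-1) steps all three lists are identical, confirming the index arithmetic (the averaging step is omitted here; the simulation stops at the sums).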

Bucket-Based Overlap with Backward

The naive approach: run backward, wait for all gradients, then all-reduce everything. This wastes time — the backward pass takes ~3x the forward pass, and all-reduce must wait for all of it.

The optimization: divide parameters into buckets and start all-reduce as soon as a bucket’s gradients are ready. Since backward computes gradients from the last layer first, buckets containing last-layer parameters finish first and can begin all-reduce while backward continues on earlier layers.

Bucket Construction

Parameters are grouped into buckets in reverse order (matching backward computation order). Default bucket size in PyTorch DDP: 25 MB. Larger buckets have better bandwidth utilization (less per-message overhead) but worse overlap (longer wait before starting all-reduce).

import torch
import torch.distributed as dist

class BucketDDP:
    """DDP implementation with bucket-based gradient synchronization."""

    def __init__(self, model, bucket_size_mb=25, process_group=None):
        self.model = model
        self.bucket_size = bucket_size_mb * 1024 * 1024  # bytes
        self.process_group = process_group or dist.group.WORLD
        self.world_size = dist.get_world_size(self.process_group)

        # Create buckets in reverse parameter order (last layer first)
        self.buckets = self._create_buckets()
        self.bucket_ready_count = {}  # Track how many grads are ready per bucket
        self.bucket_pending = set()   # Buckets with active all-reduce

        # Map each parameter to its bucket index
        self.param_to_bucket = {}
        for bucket_id, bucket in enumerate(self.buckets):
            for param in bucket["params"]:
                self.param_to_bucket[id(param)] = bucket_id

        # Register gradient hooks
        self._register_hooks()

    def _create_buckets(self):
        """Group parameters into fixed-size buckets in reverse order."""
        buckets = []
        current_params = []
        current_size = 0

        # Reverse order: backward computes last-layer gradients first
        for param in reversed(list(self.model.parameters())):
            if not param.requires_grad:
                continue
            param_size = param.numel() * param.element_size()

            if current_size + param_size > self.bucket_size and current_params:
                buckets.append({
                    "params": current_params,
                    "size": current_size,
                    "num_params": len(current_params),
                })
                current_params = []
                current_size = 0

            current_params.append(param)
            current_size += param_size

        if current_params:
            buckets.append({
                "params": current_params,
                "size": current_size,
                "num_params": len(current_params),
            })

        return buckets

    def _register_hooks(self):
        """Register backward hooks that trigger all-reduce when a bucket is ready."""
        for param in self.model.parameters():
            if not param.requires_grad:
                continue
            param.register_post_accumulate_grad_hook(self._grad_ready_hook)

    def _grad_ready_hook(self, param):
        """Called when a parameter's gradient is computed during backward."""
        bucket_id = self.param_to_bucket[id(param)]

        if bucket_id not in self.bucket_ready_count:
            self.bucket_ready_count[bucket_id] = 0
        self.bucket_ready_count[bucket_id] += 1

        # All gradients in this bucket are ready — launch all-reduce
        if self.bucket_ready_count[bucket_id] == self.buckets[bucket_id]["num_params"]:
            self._all_reduce_bucket(bucket_id)

    def _all_reduce_bucket(self, bucket_id):
        """Flatten bucket gradients, all-reduce, unflatten."""
        bucket = self.buckets[bucket_id]
        grads = [p.grad.data for p in bucket["params"]]

        # Flatten all gradients into a single contiguous tensor
        flat_grads = torch.cat([g.flatten() for g in grads])

        # All-reduce (average across GPUs) — non-blocking for overlap
        handle = dist.all_reduce(
            flat_grads, op=dist.ReduceOp.AVG,
            group=self.process_group, async_op=True
        )
        # tuple(grads): set members must be hashable, and lists are not
        self.bucket_pending.add((bucket_id, handle, flat_grads, tuple(grads)))

    def finish_sync(self):
        """Wait for all pending all-reduce operations and unflatten gradients."""
        for bucket_id, handle, flat_grads, grads in self.bucket_pending:
            handle.wait()
            # Unflatten back to individual parameter gradients
            offset = 0
            for g in grads:
                numel = g.numel()
                g.copy_(flat_grads[offset:offset + numel].view_as(g))
                offset += numel

        self.bucket_pending.clear()
        self.bucket_ready_count.clear()

The Overlap Timeline

For a 12-layer transformer on 4 GPUs:

Layer 12 backward: |=====|
Layer 11 backward:       |=====|
Layer 10 backward:             |=====|
...
Layer 1 backward:                                           |=====|

Bucket 1 all-reduce:          |========|      (layers 12-11 grads)
Bucket 2 all-reduce:                   |========|  (layers 10-9 grads)
...
Last bucket all-reduce:                                            |========|
                                                                    ^ Only this
                                                                      is not
                                                                      overlapped
Overlap Hides 80-95% of Communication

The backward pass takes roughly 3x the forward pass time (backward computes gradients with respect to both activations and parameters, and with gradient checkpointing it also recomputes activations). All-reduce of early buckets runs concurrently with backward computation of later layers. For well-balanced models on NVLink, 80-95% of all-reduce time is hidden behind backward computation. The effective overhead: only 5-20% of training step time, not the full all-reduce duration.
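A back-of-envelope model makes the payoff concrete. The helper below (step_time_ms, with illustrative timings I've made up, not measured) charges only the unhidden fraction of the all-reduce to the step:

```python
def step_time_ms(t_fwd, t_bwd, t_comm, overlap_frac):
    """Step time when a fraction of all-reduce is hidden behind backward."""
    exposed_comm = t_comm * (1 - overlap_frac)
    return t_fwd + t_bwd + exposed_comm

# Hypothetical step: 100 ms forward, 300 ms backward, 80 ms all-reduce,
# 90% of the communication overlapped with backward
print(round(step_time_ms(100, 300, 80, 0.90), 1))  # 408.0 ms vs 480 ms serial
```

Only 8 ms of the 80 ms all-reduce shows up in the step time, a 2% overhead instead of 20%.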

Tuning Bucket Size

The bucket size trades off two concerns:

  • Too small (e.g., 1 MB): Many small all-reduce operations. Each has fixed overhead (kernel launch, NCCL setup). Bandwidth utilization drops because the interconnect never reaches steady-state throughput for small transfers.
  • Too large (e.g., 500 MB): Fewer operations but less overlap opportunity. Must wait for many layers of backward computation before starting the first all-reduce.
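The small-message penalty follows from a simple latency-plus-bandwidth (alpha-beta) cost model. The sketch below uses illustrative numbers (450 GB/s effective peak, 20 microseconds of per-message launch/setup latency, both assumptions, not measurements):

```python
def effective_bandwidth_gbps(msg_bytes, peak_gbps=450.0, latency_us=20.0):
    """Effective bandwidth for one message under an alpha-beta cost model."""
    transfer_s = msg_bytes / (peak_gbps * 1e9)   # pure wire time
    total_s = latency_us * 1e-6 + transfer_s     # plus fixed per-message cost
    return msg_bytes / total_s / 1e9

for size_mb in (1, 10, 25, 100):
    size = size_mb * 1024 * 1024
    print(size_mb, "MB:", round(effective_bandwidth_gbps(size), 1), "GB/s")
```

With these assumptions a 1 MB message achieves only about a tenth of peak bandwidth, while 100 MB gets within ~10% of it, which is exactly the tension the bucket size has to balance against overlap.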

Bucket Size Impact on Training Throughput (7B Model, 8x H100)

Bucket Size       Buckets   Overlap Efficiency   All-Reduce Overhead      Step Time
1 MB              28,000    95%                  +15% (kernel overhead)   1.15x baseline
10 MB             2,800     92%                  +3%                      0.97x baseline
25 MB (default)   1,120     88%                  +5%                      1.00x baseline
100 MB            280       70%                  +8%                      1.04x baseline
500 MB            56        40%                  +18%                     1.12x baseline
Note: Sweet spot is 10-50 MB. PyTorch default of 25 MB is a good starting point. Profile with torch.profiler to tune for your specific model.

Gradient Compression

For clusters connected by InfiniBand (50 GB/s) rather than NVLink (900 GB/s), communication is the bottleneck even with bucket overlap. Gradient compression reduces the volume at the cost of some approximation.

FP16 Gradient All-Reduce

The simplest compression: cast gradients to FP16 before all-reduce, cast back to FP32 after. Halves communication volume.

def all_reduce_fp16(flat_grads_fp32):
    """All-reduce in FP16 to halve communication volume."""
    flat_fp16 = flat_grads_fp32.half()
    dist.all_reduce(flat_fp16, op=dist.ReduceOp.AVG)
    flat_grads_fp32.copy_(flat_fp16.float())

This is safe for most training scenarios — gradient values are already noisy from mini-batch sampling, so the FP16 quantization error is negligible relative to the stochastic noise.
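The "quantization error is negligible" claim is easy to quantify. This sketch round-trips synthetic gradient values through FP16 and measures the aggregate relative error (the 1e-3 gradient scale is an assumption chosen to sit comfortably inside FP16's normal range):

```python
import torch

# Relative error introduced by casting gradients to FP16 and back
torch.manual_seed(0)
grads = torch.randn(1_000_000) * 1e-3     # synthetic gradient values
roundtrip = grads.half().float()

rel_err = (roundtrip - grads).abs().mean() / grads.abs().mean()
print(f"{rel_err.item():.2e}")  # on the order of 1e-4
```

A relative error around 1e-4 is orders of magnitude below typical mini-batch gradient noise; the picture changes only if gradient magnitudes drift toward FP16's subnormal range, which is one reason loss scaling exists.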

PowerSGD: Low-Rank Gradient Compression

For extreme bandwidth constraints, compress gradients using low-rank approximation. Represent each gradient matrix as a product of two smaller matrices.

class PowerSGD:
    """Low-rank gradient compression via power iteration (after Vogels et al.)."""

    def __init__(self, rank=4, warmup_steps=100):
        self.rank = rank
        self.warmup_steps = warmup_steps
        self.step = 0
        self.error_feedback = {}  # Accumulated compression error per parameter
        self.q_memory = {}        # Previous step's Q, reused as a warm start

    def compress_and_allreduce(self, param_name, grad):
        """Compress gradient to rank-r, all-reduce, decompress."""
        self.step += 1

        # Use full gradients during warmup, and for 1D tensors (biases, norms)
        if self.step <= self.warmup_steps or grad.dim() < 2:
            dist.all_reduce(grad, op=dist.ReduceOp.AVG)
            return

        # Reshape to 2D for the low-rank decomposition
        original_shape = grad.shape
        M = grad.reshape(grad.shape[0], -1)  # (m, n)
        m, n = M.shape

        # Add error feedback from the previous step
        if param_name in self.error_feedback:
            M = M + self.error_feedback[param_name]

        # Q must be identical on every rank. Initialize from a fixed seed
        # (the same on all ranks), then warm-start from the previous step's Q,
        # which the all-reduce below has already synchronized.
        if param_name in self.q_memory:
            Q = self.q_memory[param_name]
        else:
            gen = torch.Generator(device=grad.device).manual_seed(0)
            Q = torch.randn(n, self.rank, generator=gen, device=grad.device)
        Q, _ = torch.linalg.qr(Q)

        # One step of power iteration. P is all-reduced *before* computing Q,
        # so every rank projects onto the same orthonormal basis.
        P = M @ Q                                # (m, rank)
        dist.all_reduce(P, op=dist.ReduceOp.AVG)
        P, _ = torch.linalg.qr(P)
        Q = M.T @ P                              # (n, rank)
        dist.all_reduce(Q, op=dist.ReduceOp.AVG)

        # Communication: (m + n) * rank values instead of m * n
        M_approx = P @ Q.T

        # Store the compression error (error feedback) and Q for the next step
        self.error_feedback[param_name] = M - M_approx
        self.q_memory[param_name] = Q

        # Write back
        grad.copy_(M_approx.reshape(original_shape))

Compression ratio: for a (4096, 4096) weight matrix at rank 4, communication drops from $4096^2 \approx 16.8\text{M}$ values to $(4096 + 4096) \times 4 = 32{,}768$ values — a 512x reduction. The error feedback mechanism ensures convergence matches uncompressed training over enough steps.
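The ratio follows directly from the communication counts. A small helper (powersgd_ratio, introduced here for illustration) makes it easy to check other shapes:

```python
def powersgd_ratio(m, n, rank):
    """Compression ratio of rank-r PowerSGD for an (m, n) gradient matrix."""
    return (m * n) / ((m + n) * rank)

print(powersgd_ratio(4096, 4096, 4))   # 512.0
print(powersgd_ratio(4096, 11008, 4))  # a wide FFN projection compresses even more
```

Wider matrices compress better because the m*n numerator grows faster than the (m+n)*rank denominator.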

Communication Volume per GPU by Compression Method (70B Model, 8 GPUs, per step)

  • FP32 (no compression): 490 GB
  • FP16 gradients: 245 GB
  • PowerSGD rank-4: ~12 GB
  • TopK (1% sparsity): ~5 GB

When DDP Is Not Enough

DDP requires each GPU to hold a complete copy of the model. For Llama 70B: 140 GB weights (FP16) + 140 GB gradients (FP16) + 560 GB optimizer states (FP32 momentum and variance, 8 bytes per parameter) = 840 GB per GPU. No single GPU has this capacity.

Solutions that build on DDP’s gradient synchronization:

  • ZeRO-1: Partition optimizer states across GPUs. Each GPU stores 1/N of the optimizer state. All-reduce still needed for gradients. Memory per GPU drops from 840 GB to ~350 GB.
  • ZeRO-2 (+ gradient sharding): Also partition gradients. Reduce-scatter instead of all-reduce — each GPU only keeps its shard. Memory drops to ~210 GB.
  • ZeRO-3 / FSDP: Partition everything — parameters, gradients, optimizer. All-gather parameters on demand for each forward/backward computation. Memory per GPU drops to ~105 GB.
  • TP + DDP: Tensor parallelism within a node (split individual layers across GPUs), DDP across nodes (replicate the TP groups). This is the standard approach for large training runs.
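The per-GPU figures above can be reproduced with a simple accounting sketch (memory_per_gpu_gb is a hypothetical helper; it uses the article's 2+2+8 bytes-per-parameter accounting, so ZeRO-2 comes out at 227.5 GB versus the rounded ~210 GB cited above):

```python
def memory_per_gpu_gb(n_params_b, n_gpus, stage):
    """Approximate per-GPU training memory (GB) under ZeRO sharding.

    weights: FP16 (2 B/param), grads: FP16 (2 B/param),
    optimizer: FP32 momentum + variance (8 B/param).
    stage: 0 = plain DDP, 1/2/3 = ZeRO stage.
    """
    w, g, o = 2 * n_params_b, 2 * n_params_b, 8 * n_params_b  # GB, for B params
    if stage >= 1:
        o /= n_gpus   # ZeRO-1: shard optimizer states
    if stage >= 2:
        g /= n_gpus   # ZeRO-2: also shard gradients
    if stage >= 3:
        w /= n_gpus   # ZeRO-3: also shard parameters
    return w + g + o

for stage in range(4):
    print(f"stage {stage}: {memory_per_gpu_gb(70, 8, stage)} GB")
```

The pattern is visible immediately: the optimizer states dominate, which is why ZeRO-1 alone recovers more than half the memory.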

Memory per GPU: DDP vs ZeRO Stages (70B Model, 8 GPUs)

  • DDP (full replicas): 840 GB — impossible on any single GPU
  • ZeRO-1 (shard optimizer states): 350 GB
  • ZeRO-2 (+ shard gradients): 210 GB
  • ZeRO-3 / FSDP (shard everything): 105 GB — still above one 80 GB H100, so more GPUs or offloading are needed
ℹ️ DDP vs FSDP: When to Use Which

Use DDP when the model fits on a single GPU with room for gradients and optimizer states. DDP is simpler, has less communication overhead, and is easier to debug. Switch to FSDP (ZeRO-3) only when memory requires it — FSDP adds all-gather overhead for every forward/backward pass. For models under ~13B parameters on H100 GPUs, DDP is usually the right choice.

Practical DDP Configuration

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup_ddp(rank, world_size):
    """Standard DDP setup."""
    dist.init_process_group(
        backend="nccl",           # NCCL for GPU, gloo for CPU
        init_method="env://",     # Use MASTER_ADDR and MASTER_PORT env vars
        rank=rank,
        world_size=world_size,
    )
    torch.cuda.set_device(rank)

def create_ddp_model(model, rank):
    """Wrap model in DDP with recommended settings."""
    model = model.to(rank)
    ddp_model = DDP(
        model,
        device_ids=[rank],
        output_device=rank,
        bucket_cap_mb=25,              # Default bucket size
        find_unused_parameters=False,  # Set True only if needed (adds overhead)
        gradient_as_bucket_view=True,  # Avoid gradient copy — use bucket memory directly
        static_graph=True,             # Enable optimizations for fixed computation graphs
    )
    return ddp_model

# Training loop
def train_step(ddp_model, optimizer, data, target):
    optimizer.zero_grad()
    output = ddp_model(data)
    loss = torch.nn.functional.cross_entropy(output, target)
    loss.backward()  # DDP hooks trigger all-reduce automatically
    optimizer.step()
    return loss.item()

Key flags:

  • gradient_as_bucket_view=True: Avoids copying gradients into bucket buffers — the gradient tensors directly reference the bucket memory. Saves memory and one memcpy per backward pass.
  • static_graph=True: Tells DDP that the computation graph doesn’t change between iterations (no conditional branches). Enables bucket reordering and fusion optimizations.
  • find_unused_parameters=False: The default. Setting it to True makes DDP traverse the autograd graph every iteration to detect parameters that received no gradient, which adds overhead. Enable it only for models with dynamic branching where some parameters are skipped in a given step.
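In real runs, torchrun sets MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE for each process before the script starts. The env:// rendezvous can be smoke-tested in a single process with the gloo backend (no GPU, no launcher — the address, port, and tensor values below are arbitrary):

```python
import os
import torch
import torch.distributed as dist

# Single-process smoke test of the env:// rendezvous: world_size=1 with
# gloo, so init_process_group and all_reduce run on any machine
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29517")
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")

dist.init_process_group(backend="gloo", init_method="env://")

t = torch.tensor([1.0, 2.0, 3.0])
dist.all_reduce(t, op=dist.ReduceOp.SUM)  # with one rank, the sum is a no-op
print(t.tolist())  # [1.0, 2.0, 3.0]

dist.destroy_process_group()
```

The same script scales to N processes unchanged when launched with torchrun, which is what makes env:// the standard init_method.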