DDP replicates the entire model on every GPU. For Llama 70B (140 GB in FP16), that means every GPU needs 140 GB — impossible on a single 80 GB H100. Tensor Parallelism (TP) splits each layer's weight matrices across GPUs: with N GPUs, each GPU holds 1/N of the weights and performs 1/N of the computation. Communication (all-reduce) synchronizes the results.
Column-Parallel Linear
Split the weight matrix by columns. Each GPU computes a partial output:
```python
import torch
import torch.nn as nn
import torch.distributed as dist


class ColumnParallelLinear(nn.Module):
    """Split weight columns across TP ranks. No communication in forward.

    Full weight:    [in_features, out_features]
    Per-GPU weight: [in_features, out_features // tp_size]
    """

    def __init__(self, in_features, out_features, tp_size, tp_rank, bias=False):
        super().__init__()
        assert out_features % tp_size == 0
        self.out_per_rank = out_features // tp_size
        # tp_rank selects which column shard this GPU owns when loading
        # from a full checkpoint; here we just initialize randomly.
        self.weight = nn.Parameter(
            torch.randn(in_features, self.out_per_rank) * 0.02
        )
        self.bias = nn.Parameter(torch.zeros(self.out_per_rank)) if bias else None

    def forward(self, x):
        # x: [B, S, in_features] — same on all GPUs
        # output: [B, S, out_features // tp_size] — different per GPU
        output = x @ self.weight
        if self.bias is not None:
            output = output + self.bias
        return output
```
No communication needed: each GPU multiplies the full input by its column shard independently.
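This is easy to verify in a single process, with no real `torch.distributed` setup: simulate the ranks as column chunks of one full weight and check that concatenating the per-rank outputs reproduces the full matmul (a minimal sketch; the shapes are arbitrary):

```python
import torch

torch.manual_seed(0)
B, S, d_in, d_out, tp_size = 2, 4, 16, 32, 4

x = torch.randn(B, S, d_in)
W = torch.randn(d_in, d_out)       # full weight
shards = W.chunk(tp_size, dim=1)   # column shards, one per simulated rank

# Each "rank" sees the full input and multiplies by its column shard.
partial_outputs = [x @ Ws for Ws in shards]   # each [B, S, d_out // tp_size]

# Concatenating shard outputs along the feature dim equals the full matmul.
full = x @ W
assert torch.allclose(torch.cat(partial_outputs, dim=-1), full, atol=1e-5)
```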
Row-Parallel Linear
Split the weight matrix by rows. Each GPU gets a partial input and computes a partial result. All-reduce sums the partial results:
```python
class RowParallelLinear(nn.Module):
    """Split weight rows across TP ranks. All-reduce in forward.

    Full weight:    [in_features, out_features]
    Per-GPU weight: [in_features // tp_size, out_features]
    """

    def __init__(self, in_features, out_features, tp_size, tp_rank, bias=False):
        super().__init__()
        assert in_features % tp_size == 0
        self.in_per_rank = in_features // tp_size
        self.weight = nn.Parameter(
            torch.randn(self.in_per_rank, out_features) * 0.02
        )
        self.bias = nn.Parameter(torch.zeros(out_features)) if bias else None
        self.tp_group = None  # Set during initialization

    def forward(self, x):
        # x: [B, S, in_features // tp_size] — partial input (from column-parallel)
        # Partial output: [B, S, out_features]
        output = x @ self.weight
        # All-reduce: sum partial outputs across all TP ranks
        dist.all_reduce(output, op=dist.ReduceOp.SUM, group=self.tp_group)
        # Bias is added AFTER the all-reduce so it is applied exactly once,
        # not summed tp_size times.
        if self.bias is not None:
            output = output + self.bias
        return output
```
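The same single-process trick verifies the row-parallel math: split the input and the weight rows the same way, and the "all-reduce" is just an elementwise sum over the simulated ranks (a sketch with arbitrary shapes):

```python
import torch

torch.manual_seed(0)
B, S, d_in, d_out, tp_size = 2, 4, 16, 32, 4

x = torch.randn(B, S, d_in)
W = torch.randn(d_in, d_out)

x_shards = x.chunk(tp_size, dim=-1)   # each rank holds a slice of the input
W_shards = W.chunk(tp_size, dim=0)    # matching row shards of the weight

# Each "rank" computes a full-shaped but partial output ...
partials = [xs @ Ws for xs, Ws in zip(x_shards, W_shards)]
# ... and the all-reduce is an elementwise sum across ranks.
reduced = sum(partials)

assert torch.allclose(reduced, x @ W, atol=1e-4)
```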
The Megatron Pattern
The key insight from Megatron-LM: pair a column-parallel first linear with a row-parallel second linear. The column-parallel layer leaves its output sharded exactly the way the row-parallel layer expects its input, and the element-wise activation in between needs no communication, so each sublayer requires only ONE all-reduce:
Megatron TP Pattern for FFN
```python
class TPSwiGLUFFN(nn.Module):
    """Tensor-parallel SwiGLU FFN using the Megatron pattern."""

    def __init__(self, d_model, d_ff, tp_size, tp_rank):
        super().__init__()
        # Column-parallel: split output dim
        self.w1 = ColumnParallelLinear(d_model, d_ff, tp_size, tp_rank)
        self.w3 = ColumnParallelLinear(d_model, d_ff, tp_size, tp_rank)
        # Row-parallel: split input dim, all-reduce output
        self.w2 = RowParallelLinear(d_ff, d_model, tp_size, tp_rank)

    def forward(self, x):
        # x: [B, S, d_model] — same on all GPUs
        gate = torch.nn.functional.silu(self.w1(x))  # [B, S, d_ff/N]
        up = self.w3(x)                              # [B, S, d_ff/N]
        hidden = gate * up                           # [B, S, d_ff/N]
        output = self.w2(hidden)                     # [B, S, d_model] — all-reduced
        return output
```
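Why this is exact, not approximate: concatenating the hidden shards and multiplying by the full down projection is the same as summing per-shard block products. A single-process simulation of the whole sublayer (plain tensors standing in for the modules above, the final sum standing in for the all-reduce) confirms it:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
B, S, d_model, d_ff, tp_size = 2, 4, 16, 64, 4

x = torch.randn(B, S, d_model)
W1 = torch.randn(d_model, d_ff) * 0.02   # gate proj (column-parallel)
W3 = torch.randn(d_model, d_ff) * 0.02   # up proj   (column-parallel)
W2 = torch.randn(d_ff, d_model) * 0.02   # down proj (row-parallel)

# Reference: unsharded SwiGLU FFN.
full = (F.silu(x @ W1) * (x @ W3)) @ W2

# Sharded: each rank runs its column shards of W1/W3, the elementwise
# SwiGLU on its slice, then its row shard of W2. The only communication
# needed is the final sum (the one all-reduce).
partials = []
for W1c, W3c, W2r in zip(W1.chunk(tp_size, dim=1),
                         W3.chunk(tp_size, dim=1),
                         W2.chunk(tp_size, dim=0)):
    hidden = F.silu(x @ W1c) * (x @ W3c)   # [B, S, d_ff // tp_size]
    partials.append(hidden @ W2r)          # [B, S, d_model], partial
result = sum(partials)                     # simulated all-reduce

assert torch.allclose(result, full, atol=1e-5)
```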
Same pattern for attention: QKV projection is column-parallel (split heads across GPUs), output projection is row-parallel (all-reduce).
```python
class TPAttention(nn.Module):
    """Tensor-parallel attention using the Megatron pattern."""

    def __init__(self, d_model, n_heads, n_kv_heads, tp_size, tp_rank):
        super().__init__()
        assert n_heads % tp_size == 0
        assert n_kv_heads % tp_size == 0
        self.heads_per_rank = n_heads // tp_size
        self.kv_heads_per_rank = n_kv_heads // tp_size
        d_head = d_model // n_heads
        # Column-parallel: each GPU gets a subset of heads
        self.W_q = ColumnParallelLinear(d_model, self.heads_per_rank * d_head, tp_size, tp_rank)
        self.W_k = ColumnParallelLinear(d_model, self.kv_heads_per_rank * d_head, tp_size, tp_rank)
        self.W_v = ColumnParallelLinear(d_model, self.kv_heads_per_rank * d_head, tp_size, tp_rank)
        # Row-parallel: all-reduce after output projection
        self.W_o = RowParallelLinear(self.heads_per_rank * d_head, d_model, tp_size, tp_rank)

    def forward(self, x, kv_cache=None):
        Q = self.W_q(x)  # [B, S, heads_per_rank * d_head]
        K = self.W_k(x)  # [B, S, kv_heads_per_rank * d_head]
        V = self.W_v(x)  # [B, S, kv_heads_per_rank * d_head]
        # ... attention computation on local heads produces attn_output ...
        output = self.W_o(attn_output)  # All-reduced to [B, S, d_model]
        return output
```
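Head sharding is also exact, because softmax is computed per head and never mixes heads. A single-process sketch (standard MHA only, no GQA or KV cache, with plain tensors in place of the modules above) shows that per-rank attention over a head subset, followed by the matching row shard of the output projection and a sum, matches full attention:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
B, S, d_model, n_heads, tp_size = 2, 5, 16, 4, 2
d_head = d_model // n_heads

x = torch.randn(B, S, d_model)
Wq = torch.randn(d_model, d_model) * 0.02
Wk = torch.randn(d_model, d_model) * 0.02
Wv = torch.randn(d_model, d_model) * 0.02
Wo = torch.randn(d_model, d_model) * 0.02

def attend(q, k, v):
    # q, k, v: [B, n, S, d_head] -> [B, n, S, d_head]
    scores = q @ k.transpose(-2, -1) / d_head ** 0.5
    return F.softmax(scores, dim=-1) @ v

def split_heads(t, n):
    return t.view(B, S, n, d_head).transpose(1, 2)

# Reference: full multi-head attention.
q, k, v = (split_heads(x @ W, n_heads) for W in (Wq, Wk, Wv))
full = attend(q, k, v).transpose(1, 2).reshape(B, S, d_model) @ Wo

# Sharded: rank r owns heads_per_rank heads, i.e. a contiguous column
# slice of Wq/Wk/Wv and the matching row slice of Wo.
h = n_heads // tp_size
partials = []
for r in range(tp_size):
    cols = slice(r * h * d_head, (r + 1) * h * d_head)
    qr = split_heads(x @ Wq[:, cols], h)
    kr = split_heads(x @ Wk[:, cols], h)
    vr = split_heads(x @ Wv[:, cols], h)
    local = attend(qr, kr, vr).transpose(1, 2).reshape(B, S, h * d_head)
    partials.append(local @ Wo[cols, :])   # row shard of Wo
result = sum(partials)                     # simulated all-reduce

assert torch.allclose(result, full, atol=1e-5)
```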
Communication Cost
A ring all-reduce across N GPUs transfers 2(N − 1)/N × M bytes per GPU, where M is the message size (the activations of shape [B, S, d_model]). Per transformer layer, the forward pass performs 2 all-reduces: one after attention, one after the FFN.
For Llama 70B with batch_size × seq_len = 4096 tokens, d_model = 8192, and FP16 activations, M = 4096 × 8192 × 2 bytes = 64 MB per all-reduce:
TP Communication Overhead (Llama 70B, B*S=4096)
| TP Size | All-Reduce Volume | Time (NVLink) | % of Forward Pass |
|---|---|---|---|
| TP=2 | 64 MB/layer | 0.071 ms | 0.6% |
| TP=4 | 96 MB/layer | 0.107 ms | 1.7% |
| TP=8 (1 node) | 112 MB/layer | 0.124 ms | 4.0% |
| TP=8 (InfiniBand) | 112 MB/layer | 2.24 ms | 72% (too slow!) |
At TP=8 on NVLink: 4% overhead (acceptable). At TP=8 on InfiniBand: 72% overhead (unacceptable). This is why TP is always intra-node (8 GPUs connected by NVLink) and pipeline parallelism is used for inter-node communication (less frequent, larger messages).
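The numbers above can be roughly reproduced in a few lines. This is a sketch, and the bandwidth figures are assumptions consistent with the table, not measurements: roughly 900 GB/s effective all-reduce bandwidth per GPU over NVLink, roughly 50 GB/s over InfiniBand:

```python
# Assumed effective per-GPU all-reduce bandwidths (GB/s), not measured:
# ~900 for intra-node NVLink, ~50 for inter-node InfiniBand.
B_S = 4096          # batch_size * seq_len (tokens)
d_model = 8192
bytes_per_elem = 2  # FP16

M = B_S * d_model * bytes_per_elem  # message size per all-reduce: 64 MB

def all_reduce_volume(n_gpus, msg_bytes):
    # Ring all-reduce: each GPU sends/receives 2*(N-1)/N * M bytes.
    return 2 * (n_gpus - 1) / n_gpus * msg_bytes

for tp, bw_gbps in [(2, 900), (4, 900), (8, 900), (8, 50)]:
    vol = all_reduce_volume(tp, M)
    t_ms = vol / (bw_gbps * 1e9) * 1e3
    print(f"TP={tp} at {bw_gbps} GB/s: {vol / 2**20:.0f} MB/layer, {t_ms:.3f} ms")
```

The 64/96/112 MB volumes match the table exactly; the times land within a few percent of it, depending on whether MB is read as 10^6 or 2^20 bytes.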
When to Use TP vs DDP vs PP
Parallelism Strategy Decision
| Model Size | GPUs Available | Interconnect | Recommended Strategy |
|---|---|---|---|
| 7B | 1-8 | Any | DDP only (model fits on 1 GPU) |
| 70B | 8 (1 node) | NVLink | TP=8 (split across node) |
| 70B | 64 (8 nodes) | NVLink + IB | TP=8 intra-node, DP=8 across nodes |
| 405B | 64 (8 nodes) | NVLink + IB | TP=8, PP=4, DP=2 |
| 671B MoE | 2048 | NVLink + IB | TP=4, EP=64, PP=4, DP=2 |