The tokenizer is the first transformation applied to every piece of text that enters a language model and the last transformation applied to every piece of text that leaves it. A bad tokenizer wastes context window on redundant tokens, fragments common words, and creates artificial difficulty for the model. Llama 3’s tokenizer uses 128,256 tokens and achieves 3.5 bytes per token on English text. GPT-4’s cl100k_base uses 100,256 tokens and averages 3.7 bytes per token. The difference in compression ratio directly translates to the effective context length: 15% better compression means 15% more text fits in the same context window.
This post covers the complete process of training a BPE tokenizer from scratch: the algorithm, vocabulary size selection, byte-level fallback, compression ratio optimization, and the practical differences between SentencePiece and HuggingFace tokenizers.
The BPE Algorithm
Core Concept
Byte Pair Encoding starts with a base vocabulary of individual bytes (256 entries) and iteratively merges the most frequent adjacent pair into a new token. After k merges, the vocabulary has 256 + k entries.
from collections import Counter, defaultdict
class BPETrainer:
    """
    Train a byte-level BPE tokenizer from scratch (naive reference version).

    Starts from the 256 single-byte tokens and repeatedly merges the most
    frequent adjacent token pair until ``vocab_size`` entries exist.
    This is the actual algorithm, not a wrapper around a library.
    """

    def __init__(self, vocab_size=32000):
        self.vocab_size = vocab_size
        self.merges = []  # Ordered list of (pair, new_token)
        self.vocab = {}   # token_id -> bytes

    def _get_initial_tokens(self, text_bytes):
        """Convert bytes to the initial token sequence (one token per byte)."""
        return list(text_bytes)

    def _count_pairs(self, sequences):
        """Count frequency of adjacent token pairs across all sequences."""
        pair_counts = Counter()
        for seq in sequences:
            for i in range(len(seq) - 1):
                pair_counts[(seq[i], seq[i + 1])] += 1
        return pair_counts

    def _merge_pair(self, sequences, pair, new_token):
        """Replace all occurrences of ``pair`` with ``new_token`` in all sequences."""
        merged = []
        for seq in sequences:
            new_seq = []
            i = 0
            while i < len(seq):
                if i < len(seq) - 1 and (seq[i], seq[i + 1]) == pair:
                    new_seq.append(new_token)
                    i += 2  # skip both members of the merged pair
                else:
                    new_seq.append(seq[i])
                    i += 1
            merged.append(new_seq)
        return merged

    def train(self, corpus_bytes):
        """
        Train BPE on raw bytes.

        Args:
            corpus_bytes: Raw byte content of training corpus.

        Returns:
            (merges, vocab): the ordered merge list and the
            token_id -> bytes vocabulary.
        """
        # Initialize vocabulary with the 256 raw byte values.
        for i in range(256):
            self.vocab[i] = bytes([i])
        # Split corpus into "words" (sequences between whitespace).
        # Each word becomes a separate sequence, so merges never cross
        # whitespace boundaries.
        words = corpus_bytes.split()
        sequences = [self._get_initial_tokens(w) for w in words]
        next_token_id = 256
        num_merges = self.vocab_size - 256
        for step in range(num_merges):
            # Every occurrence of a word appears in `sequences`, so the
            # counts are inherently weighted by word frequency.
            # (The original version rebuilt this loop inline and carried
            # an unused word-frequency table and weight variable.)
            pair_counts = self._count_pairs(sequences)
            if not pair_counts:
                break  # nothing left to merge
            # Find the most frequent pair.
            best_pair = pair_counts.most_common(1)[0][0]
            best_count = pair_counts[best_pair]
            # Create the new token from the concatenated pair bytes.
            new_token_bytes = self.vocab[best_pair[0]] + self.vocab[best_pair[1]]
            self.vocab[next_token_id] = new_token_bytes
            self.merges.append((best_pair, next_token_id))
            # Replace all occurrences of this pair.
            sequences = self._merge_pair(sequences, best_pair, next_token_id)
            next_token_id += 1
            if step % 1000 == 0:
                print(f"Step {step}/{num_merges}: merged {best_pair} -> "
                      f"token {next_token_id - 1} "
                      f"('{new_token_bytes.decode('utf-8', errors='replace')}'), "
                      f"count={best_count}")
        return self.merges, self.vocab
The Efficient Version: Counting with Word Frequencies
The naive algorithm above is O(n·k), where n is the corpus size and k is the number of merges. The standard optimization pre-computes word frequencies so that identical words are counted once:
class EfficientBPETrainer:
    """
    BPE trainer driven by a word-frequency table.

    Identical words are collapsed into a single entry with a count, which
    reduces the naive O(n*k) corpus scan to O(V*k), where V is the number
    of unique words.
    """

    def __init__(self, vocab_size=32000):
        self.vocab_size = vocab_size
        self.merges = []
        self.vocab = {i: bytes([i]) for i in range(256)}

    def _preprocess(self, corpus_bytes):
        """
        Split the corpus into words and count how often each occurs.

        GPT-style convention: a space is kept as a prefix of the word
        that follows it.

        Returns: Counter mapping word_tuple -> count.
        """
        word_freq = Counter()
        pieces = corpus_bytes.split(b' ')
        for idx, piece in enumerate(pieces):
            word_bytes = piece if idx == 0 else b' ' + piece
            word_freq[tuple(word_bytes)] += 1
        return word_freq

    def _apply_merge(self, tokens, pair, new_id):
        """Return ``tokens`` with every occurrence of ``pair`` fused into ``new_id``."""
        out = []
        i = 0
        while i < len(tokens):
            if i + 1 < len(tokens) and (tokens[i], tokens[i + 1]) == pair:
                out.append(new_id)
                i += 2
            else:
                out.append(tokens[i])
                i += 1
        return out

    def train(self, corpus_bytes):
        """Train using the word frequency table; returns (merges, vocab)."""
        word_freq = self._preprocess(corpus_bytes)
        # word -> (current token sequence, frequency)
        word_tokens = {w: (list(w), f) for w, f in word_freq.items()}
        next_id = 256
        for _ in range(self.vocab_size - 256):
            # Tally adjacent pairs, weighted by word frequency.
            pair_freq = Counter()
            for tokens, freq in word_tokens.values():
                for left, right in zip(tokens, tokens[1:]):
                    pair_freq[(left, right)] += freq
            if not pair_freq:
                break  # every word is already a single token
            best_pair = max(pair_freq, key=pair_freq.get)
            # Apply the winning merge inside every word.
            word_tokens = {
                w: (self._apply_merge(tokens, best_pair, next_id), freq)
                for w, (tokens, freq) in word_tokens.items()
            }
            self.vocab[next_id] = (self.vocab[best_pair[0]] +
                                   self.vocab[best_pair[1]])
            self.merges.append((best_pair, next_id))
            next_id += 1
        return self.merges, self.vocab
The word-frequency optimization reduces the inner loop from iterating over the entire corpus (billions of bytes) to iterating over the unique word vocabulary (typically 1-10M entries). On a 100GB corpus, this reduces training time from weeks to hours.
Priority Queue Optimization
Even with word frequencies, re-scanning all words every step is O(V·k), where V is the unique word count and k is the number of merges. The production optimization uses a priority queue:
import heapq
from typing import Optional
class PriorityQueueBPE:
    """
    BPE with a lazy-deletion priority queue.

    Training cost is O(V * log(V) + k * avg_changes) instead of re-scanning
    every word on every merge step. This mirrors what SentencePiece and
    HuggingFace tokenizers actually do.
    """

    def __init__(self, vocab_size=32000):
        self.vocab_size = vocab_size

    def train(self, word_freq):
        """
        Train with a priority queue.

        Args:
            word_freq: dict mapping word (as tuple of ints) -> frequency.

        Returns:
            (merges, vocab): merges is the ordered list of merged pairs,
            vocab maps token_id -> bytes.
        """
        # Build initial pair counts and an inverted index pair -> words.
        pair_freq = Counter()
        pair_to_words = defaultdict(set)
        word_tokens = {}
        for word, freq in word_freq.items():
            tokens = list(word)
            word_tokens[word] = tokens
            for i in range(len(tokens) - 1):
                pair = (tokens[i], tokens[i + 1])
                pair_freq[pair] += freq
                pair_to_words[pair].add(word)
        # Max-heap via negated counts. Entries go stale when counts change;
        # they are validated lazily on pop.
        heap = [(-freq, pair) for pair, freq in pair_freq.items()]
        heapq.heapify(heap)
        merges = []
        vocab = {i: bytes([i]) for i in range(256)}
        next_id = 256
        num_merges = self.vocab_size - 256
        for step in range(num_merges):
            # Pop until we find an entry whose count is still accurate.
            # BUG FIX: the original fell through after popping a dead entry
            # (or exhausting the heap) and merged a stale/undefined pair;
            # we now skip dead entries and stop cleanly when nothing is left.
            best_pair = None
            while heap:
                neg_freq, pair = heapq.heappop(heap)
                actual_freq = pair_freq.get(pair, 0)
                if actual_freq <= 0:
                    continue  # dead entry: pair no longer occurs anywhere
                if actual_freq == -neg_freq:
                    best_pair = pair
                    break
                # Stale but still alive: re-push with the corrected count.
                heapq.heappush(heap, (-actual_freq, pair))
            if best_pair is None:
                break  # no pairs left: vocabulary is complete
            # Perform the merge.
            new_bytes = vocab[best_pair[0]] + vocab[best_pair[1]]
            vocab[next_id] = new_bytes
            merges.append(best_pair)
            # Update only the words that actually contain the pair.
            affected_words = pair_to_words.pop(best_pair, set())
            for word in affected_words:
                tokens = word_tokens[word]
                freq = word_freq[word]
                # Remove this word's contribution to all its old pairs.
                for i in range(len(tokens) - 1):
                    p = (tokens[i], tokens[i + 1])
                    pair_freq[p] -= freq
                    pair_to_words[p].discard(word)
                # Apply the merge within the word.
                new_tokens = []
                i = 0
                while i < len(tokens):
                    if (i < len(tokens) - 1 and
                            tokens[i] == best_pair[0] and
                            tokens[i + 1] == best_pair[1]):
                        new_tokens.append(next_id)
                        i += 2
                    else:
                        new_tokens.append(tokens[i])
                        i += 1
                word_tokens[word] = new_tokens
                # Re-add the word's contribution with its new pairs.
                for i in range(len(new_tokens) - 1):
                    p = (new_tokens[i], new_tokens[i + 1])
                    pair_freq[p] += freq
                    pair_to_words[p].add(word)
                    heapq.heappush(heap, (-pair_freq[p], p))
            next_id += 1
        return merges, vocab
Vocabulary Size Selection
The Compression-Efficiency Tradeoff
Vocabulary size controls the tradeoff between compression (tokens per byte) and embedding table size. Larger vocabularies compress text better but require more parameters in the embedding layer and LM head, both of which are vocab_size × d_model matrices.
Vocabulary Size Impact on Model Architecture
| Vocab Size | Embedding Params (d=4096) | Bytes/Token (English) | Bytes/Token (Code) | Bytes/Token (Chinese) |
|---|---|---|---|---|
| 32K | 134M (0.5% of 7B) | 3.1 | 2.8 | 1.9 |
| 50K | 205M (0.8%) | 3.3 | 3.0 | 2.2 |
| 100K | 410M (1.5%) | 3.7 | 3.4 | 2.8 |
| 128K | 524M (1.9%) | 3.8 | 3.5 | 3.1 |
| 256K | 1,049M (3.8%) | 4.0 | 3.7 | 3.4 |
Computing Optimal Vocabulary Size
The objective is to maximize effective information density: minimize the total number of tokens needed to represent a fixed corpus, subject to the embedding parameter budget.
import numpy as np
def compute_optimal_vocab_size(
    corpus_bytes,
    model_dim=4096,
    total_param_budget_B=7.0,
    embedding_budget_fraction=0.02,
    candidate_sizes=None,
):
    """
    Find the optimal vocab size by training BPE at multiple sizes
    and measuring compression ratio.

    Args:
        corpus_bytes: Raw training corpus as bytes.
        model_dim: Model hidden dimension (embedding width).
        total_param_budget_B: Total model parameter budget, in billions.
        embedding_budget_fraction: Max fraction of params spent on embeddings.
        candidate_sizes: Vocab sizes to evaluate (default: a fixed grid).

    Returns:
        (optimal_size, results_table)

    Raises:
        ValueError: if no candidate size fits within the embedding budget.
    """
    # Third-party dependency; hoisted out of the loop so it is resolved once.
    from tokenizers import Tokenizer, models, trainers, pre_tokenizers
    if candidate_sizes is None:
        candidate_sizes = [16384, 32768, 49152, 65536, 98304, 131072, 196608, 262144]
    max_embedding_params = total_param_budget_B * 1e9 * embedding_budget_fraction
    # embedding_params = 2 * vocab_size * model_dim (input embedding + LM head)
    max_vocab = int(max_embedding_params / (2 * model_dim))
    # Decode once instead of once per candidate size.
    corpus_text = corpus_bytes.decode('utf-8', errors='replace')
    results = []
    for vocab_size in candidate_sizes:
        if vocab_size > max_vocab:
            continue  # exceeds the embedding parameter budget
        # Train BPE at this vocab size (using HuggingFace for speed).
        tokenizer = Tokenizer(models.BPE())
        tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)
        trainer = trainers.BpeTrainer(
            vocab_size=vocab_size,
            special_tokens=["<|begin_of_text|>", "<|end_of_text|>", "<|pad|>"],
            show_progress=False,
        )
        tokenizer.train_from_iterator(corpus_text.split('\n'), trainer=trainer)
        # Measure compression on a 10MB sample.
        sample = corpus_bytes[:10_000_000]
        encoded = tokenizer.encode(sample.decode('utf-8', errors='replace'))
        num_tokens = len(encoded.ids)
        bytes_per_token = len(sample) / num_tokens
        embedding_params = 2 * vocab_size * model_dim
        results.append({
            'vocab_size': vocab_size,
            'bytes_per_token': bytes_per_token,
            'tokens_for_corpus': len(corpus_bytes) / bytes_per_token,
            'embedding_params_M': embedding_params / 1e6,
            'embedding_fraction': embedding_params / (total_param_budget_B * 1e9),
        })
    if not results:
        raise ValueError(
            "No candidate vocab size fits the embedding parameter budget "
            f"(max vocab = {max_vocab})."
        )
    # Optimal = highest bytes_per_token within budget.
    best = max(results, key=lambda r: r['bytes_per_token'])
    return best['vocab_size'], results
Vocabulary Size by Use Case
The right vocabulary size depends on the target languages and domains:
# Rule-of-thumb vocabulary sizes per deployment scenario.
# "bytes_per_token_*" entries are the approximate compression achieved on
# that domain (higher = more text per token).
VOCAB_SIZE_GUIDELINES = {
    "english_only_small": {
        "vocab_size": 32000,
        "rationale": "Sufficient for English. Used by Llama 1/2.",
        "bytes_per_token_english": 3.1,
        "bytes_per_token_code": 2.8,
    },
    "english_plus_code": {
        "vocab_size": 50000,
        "rationale": "Extra capacity for code tokens (indentation, common identifiers).",
        "bytes_per_token_english": 3.3,
        "bytes_per_token_code": 3.2,
    },
    "multilingual_100_languages": {
        "vocab_size": 100000,
        "rationale": "GPT-4 / cl100k_base. Good coverage of CJK, Arabic, Devanagari.",
        "bytes_per_token_english": 3.7,
        "bytes_per_token_chinese": 2.8,
    },
    "multilingual_plus_code_large": {
        "vocab_size": 128256,
        "rationale": "Llama 3. Extensive multilingual + code coverage.",
        "bytes_per_token_english": 3.8,
        "bytes_per_token_chinese": 3.1,
    },
}
Byte-Level Fallback
Why Byte-Level BPE
Pure character-level or word-level tokenizers face a coverage problem: any character not in the vocabulary cannot be encoded. Byte-level BPE solves this by treating raw bytes (0-255) as the base vocabulary. Every possible input can be encoded, even binary data.
class ByteLevelBPETokenizer:
    """
    BPE tokenizer with byte-level fallback.

    Because the base vocabulary is the 256 raw byte values, every possible
    byte sequence (including binary data) can be tokenized.
    """

    def __init__(self, merges, vocab):
        """
        Args:
            merges: List of (pair, new_token_id) produced by training.
            vocab: Dict of token_id -> bytes.
        """
        self.merges = merges
        self.vocab = vocab
        # pair -> (new_token_id, rank); lower rank = learned earlier.
        self.merge_lookup = {
            pair: (token_id, rank)
            for rank, (pair, token_id) in enumerate(merges)
        }
        # Alias used on the decode path.
        self.id_to_bytes = vocab

    def _best_merge(self, tokens):
        """Return (index, pair) of the earliest-learned applicable merge, or None."""
        found_idx = None
        found_pair = None
        found_rank = float('inf')
        for idx in range(len(tokens) - 1):
            pair = (tokens[idx], tokens[idx + 1])
            entry = self.merge_lookup.get(pair)
            if entry is not None and entry[1] < found_rank:
                found_rank = entry[1]
                found_pair = pair
                found_idx = idx
        if found_pair is None:
            return None
        return found_idx, found_pair

    def encode(self, text):
        """Encode ``text`` into a list of token IDs."""
        tokens = list(text.encode('utf-8'))  # start from individual bytes
        while len(tokens) >= 2:
            candidate = self._best_merge(tokens)
            if candidate is None:
                break  # no learned merge applies anymore
            idx, pair = candidate
            merged_id = self.merge_lookup[pair][0]
            tokens[idx:idx + 2] = [merged_id]
        return tokens

    def decode(self, token_ids):
        """Decode token IDs back to text (invalid UTF-8 becomes U+FFFD)."""
        raw = b''.join(self.id_to_bytes[t] for t in token_ids)
        return raw.decode('utf-8', errors='replace')
GPT-2’s Byte-to-Unicode Mapping
GPT-2 introduced a trick to make byte-level BPE work with text-based tools: map each byte to a printable Unicode character. This lets you inspect the vocabulary with standard text editors.
def bytes_to_unicode():
    """
    GPT-2's byte-to-unicode mapping.

    Maps the 256 byte values to 256 distinct Unicode characters, keeping
    the printable Latin-1 ranges as themselves and relocating the remaining
    control/whitespace bytes to code points 256 and above so every entry
    stays printable.
    """
    # Bytes that map to themselves: the printable ranges of Latin-1.
    keep = (
        list(range(ord("!"), ord("~") + 1)) +        # 33-126
        list(range(ord("\xa1"), ord("\xac") + 1)) +  # 161-172
        list(range(ord("\xae"), ord("\xff") + 1))    # 174-255
    )
    byte_values = list(keep)
    char_codes = list(keep)
    # Everything else (0-32, 127-160, 173) is shifted above 255.
    offset = 0
    for byte in range(256):
        if byte not in keep:
            byte_values.append(byte)
            char_codes.append(256 + offset)
            offset += 1
    return {b: chr(c) for b, c in zip(byte_values, char_codes)}
# Example: byte 0x20 (space) maps to chr(0x120) = 'Ġ', so "Hello world"
# appears in a BPE vocabulary as the entries "Hello" and "Ġworld".
byte_map = bytes_to_unicode()
print(f"Space (0x20) maps to: {byte_map[0x20]!r}")    # 'Ġ'
print(f"Newline (0x0A) maps to: {byte_map[0x0A]!r}")  # 'Ċ'
The ‘Ġ’ prefix you see in GPT-2/GPT-3 vocabulary entries is not a random character — it is the byte-to-Unicode mapping of the space character (0x20). When you see “Ġworld” in the vocabulary, it means “space followed by world”. This is how the tokenizer preserves whitespace information.
Compression Ratio Analysis
Measuring Compression
Compression ratio determines how effectively the tokenizer uses the model’s context window. The key metric is bytes per token (BPT): higher is better.
import json
class CompressionAnalyzer:
    """Analyze tokenizer compression across languages and domains."""

    def __init__(self, tokenizer):
        # tokenizer must expose .encode(text) returning either a plain list
        # of ids or an object with an ``.ids`` list (HuggingFace style).
        self.tokenizer = tokenizer

    def _num_tokens(self, encoded):
        """Token count for either a plain list or a HF ``Encoding`` object."""
        return len(encoded) if isinstance(encoded, list) else len(encoded.ids)

    def measure_compression(self, text):
        """
        Compute compression metrics for a text sample.

        Returns dict with bytes_per_token, chars_per_token, tokens_per_word.
        Per-token ratios are reported as 0 for empty input instead of
        raising ZeroDivisionError (the original crashed on "").
        """
        raw_bytes = text.encode('utf-8')
        num_tokens = self._num_tokens(self.tokenizer.encode(text))
        words = text.split()
        return {
            'num_bytes': len(raw_bytes),
            'num_chars': len(text),
            'num_tokens': num_tokens,
            'num_words': len(words),
            'bytes_per_token': len(raw_bytes) / num_tokens if num_tokens else 0,
            'chars_per_token': len(text) / num_tokens if num_tokens else 0,
            'tokens_per_word': num_tokens / len(words) if words else 0,
        }

    def compare_domains(self, samples):
        """
        Compare compression across domains.

        Args:
            samples: dict of domain_name -> text_sample
        """
        return {domain: self.measure_compression(text)
                for domain, text in samples.items()}

    def fertility_analysis(self, text, language_name="unknown"):
        """
        Fertility = average number of tokens per word.
        Low fertility = good compression for this language.

        Empty input returns all-zero statistics instead of NaN / crashing.
        """
        words = text.split()
        word_fertilities = [
            self._num_tokens(self.tokenizer.encode(word))
            for word in words[:10000]  # sample first 10K words
        ]
        if not word_fertilities:
            return {
                'language': language_name,
                'mean_fertility': 0.0,
                'median_fertility': 0.0,
                'p95_fertility': 0.0,
                'single_token_fraction': 0.0,
            }
        return {
            'language': language_name,
            'mean_fertility': np.mean(word_fertilities),
            'median_fertility': np.median(word_fertilities),
            'p95_fertility': np.percentile(word_fertilities, 95),
            'single_token_fraction':
                sum(1 for f in word_fertilities if f == 1) / len(word_fertilities),
        }
Compression Ratio by Language (Llama 3 Tokenizer, 128K vocab)
| Metric | English | French | German | Spanish | Chinese | Japanese | Korean | Hindi | Arabic | Python | C++ | JSON |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Bytes per Token |
Why Compression Varies by Language
The BPE algorithm creates merge rules biased toward the training corpus distribution. If 60% of the training corpus is English, English words dominate the vocabulary. Non-Latin scripts suffer from lower compression because:
- UTF-8 encodes CJK characters as 3 bytes, Devanagari as 3 bytes, Arabic as 2 bytes
- Fewer merges are allocated to less-represented scripts
- Rare scripts may tokenize almost character-by-character
def analyze_tokenizer_bias(tokenizer, vocab):
    """
    Analyze vocabulary allocation across Unicode scripts.

    Args:
        tokenizer: unused; kept for interface compatibility.
        vocab: dict mapping token_id -> bytes.

    Returns:
        List of dicts for the 20 most common scripts, each with the token
        count, fraction of the vocabulary, and average token length in bytes.
    """
    import unicodedata
    script_counts = Counter()
    script_bytes = Counter()
    # Iterate the mapping directly: the original used range(len(vocab)) and
    # raised KeyError for any vocabulary with non-contiguous token ids.
    for token_id, token_bytes in vocab.items():
        try:
            text = token_bytes.decode('utf-8')
        except UnicodeDecodeError:
            script_counts['binary'] += 1
            # Previously uncounted, which made avg_token_bytes for 'binary' 0.
            script_bytes['binary'] += len(token_bytes)
            continue
        # Classify by the first non-space character.
        for char in text:
            if char.strip():
                try:
                    # NOTE: unicodedata.script does not exist in the CPython
                    # stdlib; the hasattr guard means stock interpreters
                    # classify everything as 'Unknown'.
                    script = unicodedata.script(char) if hasattr(unicodedata, 'script') else 'Unknown'
                except ValueError:
                    script = 'Unknown'
                script_counts[script] += 1
                script_bytes[script] += len(token_bytes)
                break
        else:
            # Token is entirely whitespace (or empty).
            script_counts['whitespace'] += 1
    total = sum(script_counts.values())
    results = []
    for script, count in script_counts.most_common(20):
        avg_bytes = script_bytes[script] / count if count > 0 else 0
        results.append({
            'script': script,
            'token_count': count,
            'fraction': count / total,
            'avg_token_bytes': avg_bytes,
        })
    return results
SentencePiece vs HuggingFace Tokenizers
Architecture Comparison
# SentencePiece: C++ library with Python bindings
# Used by: Llama, T5, mBART, ALBERT
import sentencepiece as spm
def train_sentencepiece(input_file, model_prefix, vocab_size=32000):
    """
    Train a SentencePiece BPE model and return the loaded processor.

    SentencePiece operates on raw Unicode rather than pre-tokenized text,
    representing whitespace with the meta symbol U+2581 (_).
    """
    train_options = dict(
        input=input_file,
        model_prefix=model_prefix,
        vocab_size=vocab_size,
        model_type='bpe',
        # Fraction of characters the vocabulary must cover;
        # 1.0 covers everything, 0.9995 skips extremely rare characters.
        character_coverage=0.9995,
        # Unknown characters fall back to raw byte tokens.
        byte_fallback=True,
        # Training parallelism.
        num_threads=16,
        # Maximum sentence length, in bytes.
        max_sentence_length=16384,
        # Digits are always split into single characters.
        split_digits=True,
        # Train on a random sample of this many sentences.
        input_sentence_size=10_000_000,
        shuffle_input_sentence=True,
        # Special token ids.
        pad_id=0,
        unk_id=1,
        bos_id=2,
        eos_id=3,
    )
    spm.SentencePieceTrainer.train(**train_options)
    # Load the freshly trained model.
    sp = spm.SentencePieceProcessor()
    sp.load(f'{model_prefix}.model')
    return sp
# HuggingFace tokenizers: Rust library with Python bindings
# Used by: GPT-2, GPT-3, GPT-4, Llama 3
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders
from tokenizers.normalizers import NFC
def train_huggingface_bpe(files, vocab_size=32000, save_path="tokenizer.json"):
    """
    Train a HuggingFace BPE tokenizer.

    HuggingFace tokenizers apply pre-tokenization (regex splitting)
    before BPE, unlike SentencePiece which operates on raw text.

    Args:
        files: list of training text file paths.
        vocab_size: target vocabulary size.
        save_path: where to write the tokenizer JSON file.
    """
    # Regex wrapper so the pattern is interpreted as a regular expression;
    # a plain str passed to Split is matched as a literal string.
    from tokenizers import Regex
    tokenizer = Tokenizer(models.BPE())
    # Pre-tokenizer: split on contractions / letters / 1-3 digit runs /
    # punctuation / newlines / whitespace (the GPT-2 / Llama 3 pattern),
    # then apply the byte-level mapping.
    # FIX: the original nested one pre_tokenizers.Split inside another just
    # to reuse its .pattern; a single Split with a Regex pattern is correct.
    tokenizer.pre_tokenizer = pre_tokenizers.Sequence([
        pre_tokenizers.Split(
            pattern=Regex(
                "(?i:'s|'t|'re|'ve|'m|'ll|'d)|"
                "[^\\r\\n\\p{L}\\p{N}]?\\p{L}+|"
                "\\p{N}{1,3}|"
                " ?[^\\s\\p{L}\\p{N}]+[\\r\\n]*|"
                "\\s*[\\r\\n]+|"
                "\\s+(?!\\S)|"
                "\\s+"
            ),
            behavior="isolated",
        ),
        pre_tokenizers.ByteLevel(add_prefix_space=False),
    ])
    # Decoder: reverse the byte-level encoding.
    tokenizer.decoder = decoders.ByteLevel()
    # Trainer configuration (special tokens count toward vocab_size).
    trainer = trainers.BpeTrainer(
        vocab_size=vocab_size,
        special_tokens=[
            "<|begin_of_text|>",
            "<|end_of_text|>",
            "<|reserved_special_token_0|>",
            "<|reserved_special_token_1|>",
            "<|finetune_right_pad_id|>",
            "<|step_id|>",
            "<|start_header_id|>",
            "<|end_header_id|>",
            "<|eom_id|>",
            "<|eot_id|>",
            "<|python_tag|>",
        ],
        min_frequency=2,
        show_progress=True,
    )
    tokenizer.train(files, trainer)
    tokenizer.save(save_path)
    return tokenizer
SentencePiece vs HuggingFace Tokenizers
| Feature | SentencePiece | HuggingFace Tokenizers |
|---|---|---|
| Language | C++ | Rust |
| Pre-tokenization | None (raw Unicode) | Regex-based splitting |
| Whitespace handling | _ prefix (U+2581) | Byte-level (Ġ prefix) |
| Unknown handling | Byte fallback | Inherent (byte-level) |
| Training speed (1GB) | ~15 minutes | ~8 minutes |
| Encoding speed | ~50K tokens/sec | ~200K tokens/sec |
| Model file format | Binary .model | JSON .json |
| Used by | Llama 1/2, T5, mBART | GPT-2/3/4, Llama 3 |
Key Difference: Pre-tokenization
The fundamental architectural difference is pre-tokenization. SentencePiece operates on raw Unicode: the BPE algorithm can merge across word boundaries. HuggingFace tokenizers first split text into “pre-tokens” using a regex pattern, then apply BPE within each pre-token.
def demonstrate_pretokenization_difference():
    """
    Illustrate how pre-tokenization changes what BPE can merge.

    SentencePiece runs BPE on raw Unicode, so merges may cross word
    boundaries ("New_York" can become a single token — which is why
    T5 / Llama 1/2 sometimes have multi-word tokens). HuggingFace
    tokenizers first split text with a regex into pre-tokens; BPE then
    merges only WITHIN each pre-token, so "New" and " York" can never
    fuse. The result: more predictable vocabularies, but no multi-word
    tokens.
    """
    # "_" marks a SentencePiece space; "Ġ" marks a byte-level space.
    sp_tokens = ["_New", "_York", "_City"]
    hf_tokens = ["New", "ĠYork", "ĠCity"]
    return sp_tokens, hf_tokens
Special Tokens and Chat Templates
Designing the Special Token Set
# Llama 3 special-token id assignments. All special tokens live ABOVE the
# 128K learned BPE vocabulary (ids 128000-128255), so they can never collide
# with, or be split by, BPE merges.
SPECIAL_TOKENS = {
    # Sequence boundaries
    "<|begin_of_text|>": 128000,
    "<|end_of_text|>": 128001,
    # Chat formatting
    "<|start_header_id|>": 128006,
    "<|end_header_id|>": 128007,
    "<|eom_id|>": 128008,  # End of message (expects reply)
    "<|eot_id|>": 128009,  # End of turn (doesn't expect reply)
    # Tool use
    "<|python_tag|>": 128010,
    # Reserved for future use
    # Llama 3 reserves 128002-128005 and 128011-128255
}
def apply_chat_template(messages, tokenizer):
    """
    Apply the Llama 3 chat template.

    Format:
        <|begin_of_text|><|start_header_id|>system<|end_header_id|>

        {system_prompt}<|eot_id|><|start_header_id|>user<|end_header_id|>

        {user_message}<|eot_id|>...

    Args:
        messages: list of {"role": ..., "content": ...} dicts.
        tokenizer: object whose .encode(str) returns a list of token ids.

    Returns:
        Flat list of token ids, special tokens included.
    """
    ids = [SPECIAL_TOKENS["<|begin_of_text|>"]]
    for message in messages:
        # Role header: <|start_header_id|>{role}<|end_header_id|>\n\n
        ids.append(SPECIAL_TOKENS["<|start_header_id|>"])
        ids.extend(tokenizer.encode(message["role"]))
        ids.append(SPECIAL_TOKENS["<|end_header_id|>"])
        ids.extend(tokenizer.encode("\n\n"))
        # Message body, closed by the end-of-turn marker.
        ids.extend(tokenizer.encode(message["content"]))
        ids.append(SPECIAL_TOKENS["<|eot_id|>"])
    return ids
Special tokens must be added to the tokenizer AFTER training, not included in the BPE training corpus. If special tokens participate in BPE merges, they may be split or merged with adjacent text, breaking the chat template. Llama 3 uses token IDs 128000-128255 (above the 128K BPE vocabulary) for special tokens.
Training Corpus Design for Tokenizer
Corpus Composition Determines Vocabulary Allocation
The tokenizer training corpus determines which scripts and domains get the most vocabulary entries. This is a critical design decision:
# Example corpus mixtures for tokenizer training. Each recipe maps data
# source -> sampling fraction (fractions sum to 1.0); 'total_size_GB' and
# 'rationale' are metadata, not data sources.
TOKENIZER_CORPUS_RECIPES = {
    "llama3_style": {
        "english_web": 0.40,
        "code_all_languages": 0.20,
        "multilingual_top50": 0.25,
        "math_latex": 0.05,
        "structured_data_json_xml": 0.05,
        "special_domains_medical_legal": 0.05,
        "total_size_GB": 100,
        "rationale": "Heavy code allocation for better code compression. "
                     "Multilingual ensures CJK and Indic scripts get tokens.",
    },
    "english_focused": {
        "english_web": 0.60,
        "code_python_js": 0.20,
        "english_books": 0.10,
        "english_academic": 0.10,
        "total_size_GB": 50,
        "rationale": "Maximum English compression. "
                     "Non-Latin scripts will tokenize poorly.",
    },
    "balanced_multilingual": {
        "english_web": 0.20,
        "code_all_languages": 0.15,
        "chinese": 0.15,
        "european_languages": 0.15,
        "japanese_korean": 0.10,
        "indic_languages": 0.10,
        "arabic_hebrew": 0.05,
        "other_languages": 0.05,
        "math_structured": 0.05,
        "total_size_GB": 200,
        "rationale": "Equal compression across languages. "
                     "English compression will be worse than English-focused.",
    },
}
def sample_tokenizer_corpus(data_sources, recipe, target_size_gb):
    """
    Sample a balanced corpus for tokenizer training.

    Key insight: the tokenizer corpus should be SMALLER than the model
    training corpus but have the SAME distribution. 50-200GB is typical.

    Args:
        data_sources: dict of source name -> raw bytes.
        recipe: dict of source name -> fraction (plus metadata keys).
        target_size_gb: total corpus size to sample, in GB.
    """
    metadata_keys = ('total_size_GB', 'rationale')
    corpus_parts = []
    for source, fraction in recipe.items():
        if source in metadata_keys:
            continue  # recipe bookkeeping, not a data source
        if source not in data_sources:
            continue
        budget = int(target_size_gb * 1e9 * fraction)
        data = data_sources[source]
        if len(data) > budget:
            # Sample complete documents rather than slicing mid-document.
            # NOTE(review): random_sample_documents is not defined in this
            # file — assumed to be provided elsewhere.
            corpus_parts.append(random_sample_documents(data, budget))
        else:
            corpus_parts.append(data)
    return b'\n'.join(corpus_parts)
End-to-End Implementation
Complete Tokenizer Training Pipeline
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
class TokenizerPipeline:
    """
    Complete pipeline: corpus preparation -> training -> evaluation.

    Config keys used: 'output_dir', 'vocab_size', 'corpus_recipe', and
    optionally 'corpus_size_gb' (default 100), 'special_tokens',
    'min_frequency' (default 2).
    """

    def __init__(self, config):
        # config: dict of pipeline settings (see class docstring).
        self.config = config
        self.output_dir = Path(config['output_dir'])
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def prepare_corpus(self, data_dir):
        """
        Step 1: Prepare tokenizer training corpus.
        Sample from each data source according to recipe.

        Each recipe key is expected to name a subdirectory of data_dir
        containing *.txt shards. Returns the corpus file path as a string.
        """
        recipe = self.config['corpus_recipe']
        target_gb = self.config.get('corpus_size_gb', 100)
        corpus_file = self.output_dir / 'tokenizer_corpus.txt'
        with open(corpus_file, 'wb') as f:
            for source_name, fraction in recipe.items():
                source_dir = Path(data_dir) / source_name
                if not source_dir.exists():
                    continue  # missing sources are silently skipped
                target_bytes = int(target_gb * 1e9 * fraction)
                written = 0
                # Deterministic shard order; stop once this source's
                # byte budget is reached.
                for file_path in sorted(source_dir.glob('*.txt')):
                    with open(file_path, 'rb') as sf:
                        data = sf.read()
                    f.write(data)
                    f.write(b'\n')
                    written += len(data)
                    if written >= target_bytes:
                        break
                print(f" {source_name}: {written / 1e9:.2f} GB")
        return str(corpus_file)

    def train_bpe(self, corpus_file):
        """
        Step 2: Train BPE tokenizer.
        Uses HuggingFace tokenizers for speed.
        """
        from tokenizers import Tokenizer, models, trainers
        from tokenizers import pre_tokenizers, decoders, normalizers
        tokenizer = Tokenizer(models.BPE())
        # Normalizer: NFC normalization for consistent Unicode
        tokenizer.normalizer = normalizers.NFC()
        # Pre-tokenizer: byte-level with regex splitting
        tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(
            add_prefix_space=False,
            use_regex=True,
        )
        tokenizer.decoder = decoders.ByteLevel()
        vocab_size = self.config['vocab_size']
        special_tokens = self.config.get('special_tokens', [
            "<|begin_of_text|>", "<|end_of_text|>",
            "<|pad|>", "<|unk|>",
        ])
        # NOTE(review): BpeTrainer's vocab_size appears to already include
        # the special tokens it is given, so subtracting len(special_tokens)
        # here may undershoot the intended final vocabulary size — confirm
        # against the tokenizers library documentation.
        trainer = trainers.BpeTrainer(
            vocab_size=vocab_size - len(special_tokens),
            special_tokens=special_tokens,
            min_frequency=self.config.get('min_frequency', 2),
            show_progress=True,
            initial_alphabet=pre_tokenizers.ByteLevel.alphabet(),
        )
        tokenizer.train([corpus_file], trainer)
        # Save
        save_path = str(self.output_dir / 'tokenizer.json')
        tokenizer.save(save_path)
        print(f"Saved tokenizer to {save_path}")
        print(f"Vocabulary size: {tokenizer.get_vocab_size()}")
        return tokenizer

    def evaluate(self, tokenizer, test_samples):
        """
        Step 3: Evaluate compression across domains and languages.

        Returns dict of sample name -> metrics, including a lossless
        roundtrip check per sample.
        """
        results = {}
        for name, text in test_samples.items():
            encoded = tokenizer.encode(text)
            num_tokens = len(encoded.ids)
            num_bytes = len(text.encode('utf-8'))
            bpt = num_bytes / num_tokens
            # Roundtrip check: decoding must reproduce the input exactly.
            decoded = tokenizer.decode(encoded.ids)
            roundtrip_ok = (decoded == text)
            results[name] = {
                'bytes_per_token': round(bpt, 2),
                'num_tokens': num_tokens,
                'num_bytes': num_bytes,
                'roundtrip_ok': roundtrip_ok,
            }
        return results

    def run(self, data_dir, test_samples):
        """Run the complete pipeline and persist evaluation results."""
        print("Step 1: Preparing corpus...")
        corpus_file = self.prepare_corpus(data_dir)
        print("\nStep 2: Training BPE...")
        tokenizer = self.train_bpe(corpus_file)
        print("\nStep 3: Evaluating...")
        results = self.evaluate(tokenizer, test_samples)
        print("\nResults:")
        for name, metrics in results.items():
            print(f" {name}: {metrics['bytes_per_token']} bytes/token, "
                  f"roundtrip={'OK' if metrics['roundtrip_ok'] else 'FAIL'}")
        # Save results
        results_file = self.output_dir / 'evaluation_results.json'
        with open(results_file, 'w') as f:
            json.dump(results, f, indent=2)
        return tokenizer, results
Running the Pipeline
# Pipeline configuration: Llama-3-scale vocabulary trained on a 100GB
# mixed-domain corpus sample.
config = {
    'output_dir': './tokenizer_output',
    'vocab_size': 128256,
    'min_frequency': 2,
    'corpus_size_gb': 100,
    'corpus_recipe': {
        'english_web': 0.40,
        'code': 0.20,
        'multilingual': 0.25,
        'math': 0.05,
        'structured': 0.05,
        'domain_specific': 0.05,
    },
    'special_tokens': [
        "<|begin_of_text|>",
        "<|end_of_text|>",
        "<|start_header_id|>",
        "<|end_header_id|>",
        "<|eom_id|>",
        "<|eot_id|>",
        "<|python_tag|>",
    ],
}


def _read_sample(path):
    """Read a UTF-8 test sample, closing the file handle promptly.

    (The original used bare open(...).read(), which leaked file handles
    and relied on the platform default encoding.)
    """
    with open(path, encoding='utf-8') as f:
        return f.read()


pipeline = TokenizerPipeline(config)
tokenizer, results = pipeline.run(
    data_dir="/data/tokenizer_corpus",
    test_samples={
        "english_news": _read_sample("test/english.txt"),
        "python_code": _read_sample("test/python.txt"),
        "chinese_web": _read_sample("test/chinese.txt"),
        "japanese_web": _read_sample("test/japanese.txt"),
        "latex_math": _read_sample("test/math.txt"),
        "json_data": _read_sample("test/structured.txt"),
    },
)
Train the tokenizer on a representative sample (50-200GB), not the full training corpus (15T tokens). The BPE algorithm converges on merge rules well before processing all data. Using a larger sample mainly adds rare tokens that each appear only a few times, wasting vocabulary capacity.
Common Pitfalls and Debugging
Pitfall 1: Numeric Tokenization
def demonstrate_number_tokenization():
    """
    Numbers tokenize inconsistently unless special-cased.
    "1000" might be one token, "1001" might be three tokens.
    This causes arithmetic difficulty for the model.

    Documentation-only function: the body is illustrative comments.
    """
    # Without digit splitting:
    # "1000" -> ["1000"] (common number, single token)
    # "1037" -> ["10", "37"] (less common, split)
    # "9847" -> ["98", "47"] (different split)
    # With digit splitting (Llama 3 approach):
    # "1000" -> ["1", "0", "0", "0"]
    # "1037" -> ["1", "0", "3", "7"]
    # "9847" -> ["9", "8", "4", "7"]
    # Consistent tokenization makes arithmetic patterns learnable
    # But costs more tokens for numbers
    # SentencePiece config:
    # split_digits=True
    # HuggingFace pre-tokenizer regex:
    # \\p{N}{1,3} (split numbers into 1-3 digit chunks)
    pass
Pitfall 2: Whitespace Sensitivity
def demonstrate_whitespace_issue():
    """
    Indentation in code is critical but can tokenize poorly.
    4 spaces might be 1 token, 3 spaces might be 3 tokens.

    Documentation-only function: the body is illustrative comments.
    """
    # Code indentation patterns:
    # "    " (4 spaces) -> should be 1 token for Python
    # "        " (8 spaces) -> should be 1-2 tokens
    # "\t" (tab) -> should be 1 token
    # Llama 3 vocabulary includes specific indentation tokens:
    # Token for 2 spaces, 4 spaces, 8 spaces, 16 spaces
    # This makes code tokenization much more efficient
    # Without these: Python code uses 30-50% more tokens
    # due to indentation overhead
    pass
Pitfall 3: Tokenizer-Model Mismatch
def check_tokenizer_model_compatibility(tokenizer, model_config):
    """
    Verify that a tokenizer is compatible with a model configuration.

    Mismatches cause silent errors or crashes, so every problem found is
    collected rather than raising on the first one.

    Returns:
        List of human-readable error strings (empty if fully compatible).
    """
    errors = []
    # Check 1: Vocabulary size matches model embedding
    vocab_size = tokenizer.get_vocab_size()
    model_vocab = model_config.get('vocab_size')
    if vocab_size != model_vocab:
        errors.append(
            f"Tokenizer vocab ({vocab_size}) != "
            f"model embedding ({model_vocab})"
        )
    # Check 2: Special tokens exist and have correct IDs
    for role, token in (('bos_token', '<|begin_of_text|>'),
                        ('eos_token', '<|end_of_text|>')):
        if tokenizer.token_to_id(token) is None:
            errors.append(f"Missing special token: {token} ({role})")
    # Check 3: Roundtrip encoding/decoding works
    for text in ("Hello, world!",
                 "def foo():\n return 42",
                 "The value is $100."):
        decoded = tokenizer.decode(tokenizer.encode(text).ids)
        if decoded != text:
            errors.append(f"Roundtrip failure: {text!r} -> {decoded!r}")
    return errors
Training Time vs Vocabulary Size (100GB corpus)
| Metric | 16K | 32K | 64K | 128K | 256K |
|---|---|---|---|---|---|
| SentencePiece | |||||
| HuggingFace Tokenizers |
Production Considerations
Tokenizer Versioning
Once a model is trained with a specific tokenizer, the tokenizer is frozen. Changing the tokenizer requires retraining the model from scratch. This means tokenizer bugs persist for the lifetime of the model.
class TokenizerRegistry:
    """
    Version and track tokenizers for production use.

    Once a model is trained, its tokenizer must never change, so every
    registered tokenizer is stored under a content hash and can always
    be retrieved byte-identically.
    """

    def __init__(self, registry_path):
        self.registry_path = Path(registry_path)
        self.registry_path.mkdir(parents=True, exist_ok=True)

    def register(self, name, tokenizer_path, metadata):
        """
        Register a tokenizer under ``name``, keyed by its content hash.

        Args:
            name: Logical tokenizer name (e.g. "llama3-128k").
            tokenizer_path: Path to the tokenizer JSON file.
            metadata: Dict of extra info; 'hash' and 'registered_at'
                keys are added in place before being persisted.

        Returns:
            The 16-hex-digit content hash identifying this version.
        """
        with open(tokenizer_path, 'rb') as f:
            content = f.read()
        content_hash = hashlib.sha256(content).hexdigest()[:16]
        version_dir = self.registry_path / name / content_hash
        version_dir.mkdir(parents=True, exist_ok=True)
        # Copy tokenizer file into the registry.
        import shutil
        shutil.copy2(tokenizer_path, version_dir / 'tokenizer.json')
        # Save metadata. BUG FIX: record the actual registration time —
        # the previous implementation stored the source file's mtime,
        # which is not when the tokenizer was registered.
        metadata['hash'] = content_hash
        metadata['registered_at'] = datetime.now(timezone.utc).isoformat()
        with open(version_dir / 'metadata.json', 'w') as f:
            json.dump(metadata, f, indent=2)
        return content_hash

    def load(self, name, content_hash):
        """Load a specific tokenizer version from the registry."""
        from tokenizers import Tokenizer
        path = self.registry_path / name / content_hash / 'tokenizer.json'
        return Tokenizer.from_file(str(path))
The tokenizer is a small file (1-5MB) that determines how billions of dollars of training compute interprets text. Getting it right before training starts is worth weeks of analysis. Getting it wrong means restarting from scratch.