GPT-4 exhibited 3-15% accuracy inflation on contaminated benchmarks: not from intentional cheating, but from MMLU questions appearing verbatim in filtered web crawls. When you train on 13 trillion tokens, even a 0.01% contamination rate means 1.3 billion tokens of benchmark overlap. The effect is measurable: a model scoring 85% on clean MMLU jumps to 92% on contaminated subsets, and that 7-point gap disappears the moment the model is evaluated on fresh problems. Decontamination at scale means fingerprinting 15T tokens and matching them against benchmark datasets in under 48 hours; exact string matching is too slow, so you use MinHash and Bloom filters.
This post covers the complete decontamination pipeline: n-gram fingerprinting, perplexity-based detection, paraphrase handling, and production-scale implementation.
Why Decontamination Matters
Measuring Contamination Impact
from dataclasses import dataclass
@dataclass
class ContaminationResult:
benchmark_name: str
total_examples: int
contaminated_count: int
contamination_rate: float
clean_accuracy: float
contaminated_accuracy: float
inflation: float # contaminated_accuracy - clean_accuracy
# Documented contamination impacts from published research
CONTAMINATION_IMPACTS = [
ContaminationResult("MMLU", 14042, 281, 0.020, 0.712, 0.834, 0.122),
ContaminationResult("HellaSwag", 10042, 502, 0.050, 0.821, 0.893, 0.072),
ContaminationResult("ARC-Challenge", 1172, 35, 0.030, 0.684, 0.743, 0.059),
ContaminationResult("GSM8K", 1319, 66, 0.050, 0.571, 0.682, 0.111),
ContaminationResult("HumanEval", 164, 8, 0.049, 0.672, 0.793, 0.121),
ContaminationResult("TruthfulQA", 817, 41, 0.050, 0.453, 0.502, 0.049),
]
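A quick sanity check on these figures: each `inflation` value is just the contaminated/clean accuracy gap, and the average gap across the six benchmarks is large enough to reorder a leaderboard. A minimal recomputation using the accuracy pairs documented above:

```python
# Clean vs. contaminated accuracy pairs, copied from the table above
pairs = [
    ("MMLU", 0.712, 0.834),
    ("HellaSwag", 0.821, 0.893),
    ("ARC-Challenge", 0.684, 0.743),
    ("GSM8K", 0.571, 0.682),
    ("HumanEval", 0.672, 0.793),
    ("TruthfulQA", 0.453, 0.502),
]
mean_inflation = sum(contam - clean for _, clean, contam in pairs) / len(pairs)
# Roughly 0.089: about nine points of illusory accuracy on average
```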
Accuracy Inflation from Benchmark Contamination
| Metric | MMLU | HellaSwag | ARC-C | GSM8K | HumanEval | TruthfulQA |
|---|---|---|---|---|---|---|
| Clean Accuracy | 71.2% | 82.1% | 68.4% | 57.1% | 67.2% | 45.3% |
| Contaminated Accuracy | 83.4% | 89.3% | 74.3% | 68.2% | 79.3% | 50.2% |
| Inflation | +12.2 pts | +7.2 pts | +5.9 pts | +11.1 pts | +12.1 pts | +4.9 pts |
Types of Contamination
class ContaminationType:
"""Classification of how benchmark data appears in training corpora."""
VERBATIM = "verbatim"
# Exact copy of benchmark question + answer
# Example: MMLU question copied into a study guide
REPHRASED = "rephrased"
# Same content, different wording
# Example: "What is the capital of France?" -> "Name France's capital city"
PARTIAL = "partial"
# Only the question or only the answer appears
# Example: Wikipedia article containing the answer but not the question
TRANSLATED = "translated"
# Benchmark content translated to/from another language
# Example: GSM8K problem translated to Chinese and back
DERIVED = "derived"
# Content derived from the same source as the benchmark
# Example: Both MMLU and training data sourced from the same textbook
STRUCTURAL = "structural"
# Same problem structure with different values
# Example: "2+3=?" in benchmark, "4+5=?" in training data
Verbatim contamination is the easiest to detect and the most damaging. A model that has memorized the exact answer to "What is the capital of France?" has learned nothing about geography; it has learned to pattern-match that specific string. Rephrased contamination is harder to detect but almost as damaging: the model may have memorized the answer pattern rather than the reasoning.
N-gram Fingerprinting
The Core Algorithm
N-gram fingerprinting is the workhorse of decontamination. Extract character or word n-grams from benchmark data, store them in a lookup structure, and scan training data for matches.
import hashlib
import struct
from collections import defaultdict
class NGramFingerprinter:
"""
N-gram based contamination detection.
Used by GPT-4, Llama, and most major model trainers.
"""
def __init__(self, n=13, char_level=False):
"""
Args:
n: N-gram size. 13-gram is standard (GPT-4 uses 13-gram).
char_level: Use character n-grams instead of word n-grams.
"""
self.n = n
self.char_level = char_level
self.benchmark_fingerprints = set()
self.benchmark_sources = defaultdict(list) # fingerprint -> benchmark names
def _tokenize(self, text):
"""Split text into tokens (words or characters)."""
text = text.lower().strip()
if self.char_level:
return list(text)
else:
# Word-level: split on whitespace, remove punctuation
import re
words = re.findall(r'\w+', text)
return words
def _extract_ngrams(self, tokens):
"""Extract all n-grams from a token sequence."""
ngrams = []
for i in range(len(tokens) - self.n + 1):
ngram = tuple(tokens[i:i + self.n])
ngrams.append(ngram)
return ngrams
def _hash_ngram(self, ngram):
"""Hash an n-gram to a 64-bit integer for compact storage."""
text = ' '.join(ngram)
h = hashlib.md5(text.encode('utf-8')).digest()
return struct.unpack('<Q', h[:8])[0]
def register_benchmark(self, benchmark_name, examples):
"""
Register a benchmark's examples for contamination detection.
Args:
benchmark_name: Name of the benchmark (e.g., "MMLU")
examples: List of strings (questions, answers, or both)
"""
for example in examples:
tokens = self._tokenize(example)
ngrams = self._extract_ngrams(tokens)
for ngram in ngrams:
h = self._hash_ngram(ngram)
self.benchmark_fingerprints.add(h)
self.benchmark_sources[h].append(benchmark_name)
print(f"Registered {benchmark_name}: {len(examples)} examples, "
f"{len(self.benchmark_fingerprints)} total fingerprints")
def check_document(self, document, threshold=0.8):
"""
Check a training document for benchmark contamination.
Args:
document: Text content of the training document
threshold: Fraction of document n-grams that must match
benchmark to flag as contaminated.
0.8 = 80% of 13-grams match.
Returns:
(is_contaminated, match_fraction, matching_benchmarks)
"""
tokens = self._tokenize(document)
ngrams = self._extract_ngrams(tokens)
if not ngrams:
return False, 0.0, []
matching = 0
matching_benchmarks = set()
for ngram in ngrams:
h = self._hash_ngram(ngram)
if h in self.benchmark_fingerprints:
matching += 1
matching_benchmarks.update(self.benchmark_sources.get(h, []))
match_fraction = matching / len(ngrams)
is_contaminated = match_fraction >= threshold
return is_contaminated, match_fraction, list(matching_benchmarks)
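For quick experiments outside the class above, the same check reduces to set intersection over word n-grams. A condensed, dependency-free sketch (the tokenization mirrors the class; one deliberate difference is that it counts distinct n-grams rather than occurrences, and the sentences are illustrative):

```python
import re

def ngram_set(text, n=13):
    """Word-level n-gram fingerprints, lowercased (as in NGramFingerprinter)."""
    words = re.findall(r'\w+', text.lower())
    return {tuple(words[i:i + n]) for i in range(len(words) - n + 1)}

def match_fraction(doc, benchmark_grams, n=13):
    """Fraction of the document's distinct n-grams that hit the benchmark set."""
    grams = ngram_set(doc, n)
    if not grams:
        return 0.0
    return len(grams & benchmark_grams) / len(grams)

bench = ngram_set("the quick brown fox jumps over the lazy dog "
                  "while the cat watches from the fence")
match_fraction("The quick brown fox jumps over the lazy dog "
               "while the cat watches from the fence", bench)   # 1.0: verbatim copy
```

An unrelated sentence of similar length scores 0.0: with 13-grams, any overlap at all is a strong signal.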
Choosing N-gram Size
NGRAM_SIZE_ANALYSIS = {
8: {
"false_positive_rate": 0.05,
"false_negative_rate": 0.01,
"issue": "Too many false positives from common phrases",
},
10: {
"false_positive_rate": 0.02,
"false_negative_rate": 0.02,
"issue": "Good balance but may catch common technical phrases",
},
13: {
"false_positive_rate": 0.005,
"false_negative_rate": 0.05,
"issue": "Standard choice. GPT-4 uses 13-grams.",
},
20: {
"false_positive_rate": 0.001,
"false_negative_rate": 0.15,
"issue": "Too few false positives but misses paraphrases",
},
50: {
"false_positive_rate": 0.0001,
"false_negative_rate": 0.40,
"issue": "Only catches near-verbatim copies",
},
}
# The standard: use 13-gram word overlap
# A matching 13-gram means 13 consecutive words are identical
# Probability of random 13-word match in English: approximately 1e-25
# So false positives are essentially impossible for word 13-grams
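The tiny-probability figure follows from a simple independence argument. If consecutive words behaved like independent draws with an effective per-word vocabulary around 100 (a loose assumption, but a plausible order of magnitude for running English text), the chance that a specific 13-gram recurs by coincidence is astronomically small:

```python
# Back-of-envelope: probability that 13 consecutive words coincide by chance,
# assuming independent draws with effective vocabulary V (an assumption,
# not a measured property of English)
V = 100
n = 13
p_random_match = V ** -n
# About 1e-26: a shared 13-gram is, in practice, evidence of copying
```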
Scaling to Trillions of Tokens
Bloom Filter for Memory Efficiency
Storing all benchmark n-gram hashes in a Python set works for small benchmarks but fails at scale. With 50 benchmarks averaging 10K examples, each with 100 13-grams, we have 50M fingerprints. A Bloom filter compresses this to a fixed-size bit array with configurable false positive rate.
import math
class BloomFilter:
"""
Bloom filter for memory-efficient n-gram storage.
Stores 50M fingerprints in ~90 MB with a 0.1% false positive rate.
"""
def __init__(self, expected_items, false_positive_rate=0.001):
"""
Args:
expected_items: Expected number of items to insert
false_positive_rate: Desired false positive rate
"""
self.size = self._optimal_size(expected_items, false_positive_rate)
self.num_hashes = self._optimal_hashes(self.size, expected_items)
self.bit_array = bytearray(self.size // 8 + 1)
self.count = 0
def _optimal_size(self, n, p):
"""Compute optimal bit array size."""
m = -n * math.log(p) / (math.log(2) ** 2)
return int(m)
def _optimal_hashes(self, m, n):
"""Compute optimal number of hash functions."""
k = (m / n) * math.log(2)
return max(1, int(k))
    def _hash_values(self, item):
        """
        Generate k hash values via double hashing.
        Uses a stable digest instead of built-in hash(), which is
        randomized per process for strings and would break a filter
        that is serialized and broadcast to other workers.
        """
        digest = hashlib.md5(str(item).encode('utf-8')).digest()
        h1 = struct.unpack('<Q', digest[:8])[0]
        h2 = struct.unpack('<Q', digest[8:16])[0]
        if h2 == 0:
            h2 = 1
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.size
def add(self, item):
"""Add an item to the filter."""
for pos in self._hash_values(item):
byte_idx = pos // 8
bit_idx = pos % 8
self.bit_array[byte_idx] |= (1 << bit_idx)
self.count += 1
def __contains__(self, item):
"""Check if an item might be in the filter."""
for pos in self._hash_values(item):
byte_idx = pos // 8
bit_idx = pos % 8
if not (self.bit_array[byte_idx] & (1 << bit_idx)):
return False
return True
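The formulas in `_optimal_size` and `_optimal_hashes` are the standard Bloom filter optima, m = -n ln p / (ln 2)^2 bits and k = (m/n) ln 2 hash functions. Plugging in the 50M-fingerprint scenario reproduces the ~90 MB figure quoted above (standalone arithmetic, no assumptions beyond the formulas):

```python
import math

def bloom_params(n_items, fp_rate):
    """Optimal Bloom filter size (bits) and hash count for n items at rate p."""
    m_bits = -n_items * math.log(fp_rate) / math.log(2) ** 2
    k = max(1, round(m_bits / n_items * math.log(2)))
    return int(m_bits), k

bits, k = bloom_params(50_000_000, 0.001)
mb = bits / 8 / 1e6
# About 719M bits, i.e. roughly 90 MB, with ~10 hash functions
```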
class ScalableDecontaminator:
"""
Decontamination at scale using Bloom filters.
Processes 15T tokens using streaming with constant memory.
"""
def __init__(self, benchmark_ngrams, false_positive_rate=0.001):
"""
Args:
benchmark_ngrams: Total number of benchmark n-grams
"""
self.bloom = BloomFilter(benchmark_ngrams, false_positive_rate)
self.n = 13
def register_benchmarks(self, benchmark_files):
"""Load all benchmark n-grams into the Bloom filter."""
total = 0
for name, filepath in benchmark_files.items():
with open(filepath) as f:
for line in f:
example = line.strip()
tokens = example.lower().split()
for i in range(len(tokens) - self.n + 1):
ngram = tuple(tokens[i:i + self.n])
h = self._hash_ngram(ngram)
self.bloom.add(h)
total += 1
memory_mb = len(self.bloom.bit_array) / 1e6
print(f"Registered {total} n-grams in {memory_mb:.1f} MB")
def _hash_ngram(self, ngram):
text = ' '.join(ngram)
return hashlib.md5(text.encode()).hexdigest()
def scan_corpus_streaming(self, corpus_path, output_path,
contamination_threshold=0.8):
"""
Scan a corpus file, streaming one document at a time.
Writes clean documents to output, discards contaminated ones.
"""
contaminated = 0
clean = 0
with open(corpus_path) as fin, open(output_path, 'w') as fout:
for line in fin:
doc = line.strip()
tokens = doc.lower().split()
if len(tokens) < self.n:
fout.write(line)
clean += 1
continue
# Count matching n-grams
total_ngrams = len(tokens) - self.n + 1
matching = 0
for i in range(total_ngrams):
ngram = tuple(tokens[i:i + self.n])
h = self._hash_ngram(ngram)
if h in self.bloom:
matching += 1
match_rate = matching / total_ngrams
if match_rate >= contamination_threshold:
contaminated += 1
else:
fout.write(line)
clean += 1
return {'clean': clean, 'contaminated': contaminated}
Bloom Filter Memory Usage for Decontamination
| Benchmark N-grams | FP Rate | Memory (MB) | Lookup Time (ns) |
|---|---|---|---|
| 1M | 0.1% | 1.8 | 50 |
| 10M | 0.1% | 18 | 55 |
| 50M | 0.1% | 90 | 60 |
| 100M | 0.1% | 180 | 65 |
| 50M | 0.01% | 120 | 70 |
MapReduce for Distributed Decontamination
class DistributedDecontaminator:
"""
Distribute decontamination across multiple machines.
Each machine gets a copy of the Bloom filter (small)
and processes a shard of the training corpus.
"""
def __init__(self, num_workers=64):
self.num_workers = num_workers
def create_job_config(self, corpus_shards, benchmark_bloom_path,
output_dir):
"""
Create configuration for distributed decontamination job.
The Bloom filter is broadcast to all workers (small, 100MB).
Each worker processes its corpus shard independently.
"""
jobs = []
for shard_idx, shard_path in enumerate(corpus_shards):
worker_id = shard_idx % self.num_workers
jobs.append({
'worker_id': worker_id,
'shard_path': shard_path,
'bloom_filter_path': benchmark_bloom_path,
'output_path': f"{output_dir}/clean_shard_{shard_idx:06d}.jsonl",
'contamination_threshold': 0.8,
'ngram_size': 13,
})
return jobs
def merge_results(self, output_dir):
"""
Merge results from all workers.
Aggregate contamination statistics.
"""
import glob
import json
total_clean = 0
total_contaminated = 0
contaminated_by_benchmark = defaultdict(int)
for stats_file in glob.glob(f"{output_dir}/stats_*.json"):
with open(stats_file) as f:
stats = json.load(f)
total_clean += stats['clean']
total_contaminated += stats['contaminated']
for bench, count in stats.get('by_benchmark', {}).items():
contaminated_by_benchmark[bench] += count
return {
'total_clean': total_clean,
'total_contaminated': total_contaminated,
'contamination_rate': total_contaminated / (total_clean + total_contaminated),
'by_benchmark': dict(contaminated_by_benchmark),
}
Perplexity-Based Detection
When N-grams Miss: Paraphrases
N-gram matching catches verbatim and near-verbatim copies but misses paraphrases. Perplexity-based detection catches a broader class of contamination: if the model has memorized content (even rephrased), it will assign unusually low perplexity to that content.
import torch
import math
class PerplexityDetector:
"""
Detect contamination using model perplexity.
Intuition: if a document appears in training data, the model
assigns it lower perplexity than similar but unseen documents.
"""
def __init__(self, model, tokenizer, device="cuda"):
self.model = model
self.tokenizer = tokenizer
self.device = device
self.model.eval()
def compute_perplexity(self, text, stride=512):
"""
Compute perplexity of a text using sliding window.
"""
encodings = self.tokenizer(text, return_tensors="pt")
input_ids = encodings.input_ids.to(self.device)
seq_len = input_ids.size(1)
max_length = self.model.config.max_position_embeddings
        nlls = []
        prev_end_loc = 0
        for begin_loc in range(0, seq_len, stride):
            end_loc = min(begin_loc + max_length, seq_len)
            trg_len = end_loc - prev_end_loc  # tokens not yet scored
            input_slice = input_ids[:, begin_loc:end_loc]
            target_ids = input_slice.clone()
            # Mask the overlap with the previous window so each token's
            # loss is counted exactly once
            target_ids[:, :-trg_len] = -100
            with torch.no_grad():
                outputs = self.model(input_slice, labels=target_ids)
            nlls.append(outputs.loss.item())
            prev_end_loc = end_loc
            if end_loc == seq_len:
                break
        ppl = math.exp(sum(nlls) / len(nlls))
        return ppl
def detect_contamination(self, benchmark_examples, reference_texts,
z_threshold=2.0):
"""
Detect contamination by comparing benchmark perplexity
against reference perplexity distribution.
Args:
benchmark_examples: List of benchmark texts to check
reference_texts: List of similar but known-clean texts
z_threshold: Z-score threshold for flagging contamination
"""
# Compute perplexity of reference texts
ref_ppls = []
for text in reference_texts:
ppl = self.compute_perplexity(text)
ref_ppls.append(ppl)
ref_mean = sum(ref_ppls) / len(ref_ppls)
ref_std = (sum((p - ref_mean) ** 2 for p in ref_ppls) /
len(ref_ppls)) ** 0.5
# Check benchmark examples
results = []
for example in benchmark_examples:
ppl = self.compute_perplexity(example)
z_score = (ppl - ref_mean) / ref_std if ref_std > 0 else 0
# Negative z-score means lower perplexity than reference
# = potential memorization
is_contaminated = z_score < -z_threshold
results.append({
'text': example[:100] + '...',
'perplexity': ppl,
'z_score': z_score,
'is_contaminated': is_contaminated,
})
return results
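The z-score logic in `detect_contamination` is easy to check on toy numbers. Suppose the reference texts cluster around perplexity 42 and a benchmark example comes in at 28 (illustrative values only):

```python
# Reference perplexities from known-clean texts (illustrative numbers)
ref_ppls = [42.0, 45.0, 39.0, 44.0, 40.0]
mean = sum(ref_ppls) / len(ref_ppls)                          # 42.0
std = (sum((p - mean) ** 2 for p in ref_ppls) / len(ref_ppls)) ** 0.5

z = (28.0 - mean) / std
# z is about -6.1, far below the -2.0 threshold: flag as likely memorized
```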
Perplexity-based detection has a fundamental limitation: it can only be run after training. This makes it a post-hoc audit tool, not a pre-training filter. Use n-gram fingerprinting for pre-training decontamination and perplexity-based methods for post-training verification.
Min-K% Probability Detection
A more robust perplexity variant: instead of average perplexity, look at the minimum-k% token probabilities. Memorized text has uniformly high token probabilities, while novel text has some high-probability tokens (common words) and some low-probability tokens (content-specific words).
class MinKPercentDetector:
"""
Min-K% Prob detection (Shi et al., 2023).
For memorized text, even the least-likely tokens have high probability.
For novel text, the least-likely tokens have much lower probability.
"""
def __init__(self, model, tokenizer, k_percent=20, device="cuda"):
self.model = model
self.tokenizer = tokenizer
self.k = k_percent / 100.0
self.device = device
def compute_min_k_score(self, text):
"""
Compute the Min-K% probability score.
        Higher score (closer to zero) = more likely memorized: even the
        least-likely tokens are assigned high probability.
        Lower score (more negative) = more likely novel: the least-likely
        tokens are genuinely improbable.
"""
inputs = self.tokenizer(text, return_tensors="pt").to(self.device)
input_ids = inputs.input_ids
with torch.no_grad():
outputs = self.model(**inputs)
logits = outputs.logits
# Compute per-token log probabilities
log_probs = torch.log_softmax(logits[:, :-1, :], dim=-1)
target_ids = input_ids[:, 1:]
# Get log prob of each actual token
token_log_probs = log_probs.gather(
2, target_ids.unsqueeze(-1)
).squeeze(-1)
# Get the bottom k% of log probabilities
token_log_probs_flat = token_log_probs.flatten()
k_count = max(1, int(len(token_log_probs_flat) * self.k))
bottom_k = torch.topk(token_log_probs_flat, k_count,
largest=False).values
# Score = average of bottom k% log probs
score = bottom_k.mean().item()
return score
def batch_detect(self, texts, threshold=None):
"""
Detect memorization in a batch of texts.
If threshold is None, return scores for manual analysis.
If threshold is provided, classify as memorized/novel.
"""
results = []
for text in texts:
score = self.compute_min_k_score(text)
results.append({
'text_preview': text[:80] + '...',
'min_k_score': score,
                'is_memorized': score > threshold if threshold is not None else None,
})
return results
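The effect is easy to see with hand-picked per-token log-probabilities (purely illustrative numbers): a memorized sequence has no badly-surprised tokens, so even its bottom 20% sits near zero, while novel text has a long tail of very negative values.

```python
def min_k_score(token_log_probs, k_percent=20):
    """Mean of the bottom-k% per-token log-probabilities (as in the class above)."""
    k = max(1, int(len(token_log_probs) * k_percent / 100))
    return sum(sorted(token_log_probs)[:k]) / k

memorized = [-0.1, -0.2, -0.1, -0.3, -0.2, -0.1, -0.2, -0.1, -0.3, -0.2]
novel = [-0.1, -4.5, -0.2, -6.0, -0.3, -5.2, -0.1, -0.4, -7.1, -0.2]

min_k_score(memorized)   # -0.3: even the worst tokens were expected
min_k_score(novel)       # -6.55: the worst tokens were genuine surprises
```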
Handling Paraphrases
Semantic Similarity Detection
class SemanticDecontaminator:
"""
Detect paraphrased benchmark content using semantic embeddings.
Catches reformulations that n-gram matching misses.
"""
def __init__(self, embedding_model_name="all-MiniLM-L6-v2"):
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer(embedding_model_name)
self.benchmark_embeddings = {} # benchmark_name -> (embeddings, texts)
def register_benchmark(self, name, examples):
"""Embed benchmark examples for similarity search."""
embeddings = self.model.encode(examples, show_progress_bar=True,
normalize_embeddings=True)
self.benchmark_embeddings[name] = (embeddings, examples)
def check_document_semantic(self, document, threshold=0.90):
"""
Check if a document semantically matches any benchmark example.
Uses cosine similarity between sentence embeddings.
Threshold 0.90 catches close paraphrases with low false positives.
"""
# Split document into sentences
sentences = self._split_sentences(document)
if not sentences:
return False, 0.0, []
doc_embeddings = self.model.encode(sentences,
normalize_embeddings=True)
matches = []
max_similarity = 0.0
for bench_name, (bench_embeds, bench_texts) in self.benchmark_embeddings.items():
# Compute similarity matrix: doc_sentences x benchmark_examples
import numpy as np
similarity = doc_embeddings @ bench_embeds.T
# Find maximum similarity for any sentence pair
max_sim = similarity.max()
if max_sim > max_similarity:
max_similarity = float(max_sim)
if max_sim >= threshold:
# Find which sentence and benchmark example matched
doc_idx, bench_idx = np.unravel_index(
similarity.argmax(), similarity.shape
)
matches.append({
'benchmark': bench_name,
'similarity': float(max_sim),
'document_sentence': sentences[doc_idx],
'benchmark_example': bench_texts[bench_idx],
})
is_contaminated = len(matches) > 0
return is_contaminated, max_similarity, matches
def _split_sentences(self, text):
"""Split text into sentences."""
import re
sentences = re.split(r'[.!?]+', text)
return [s.strip() for s in sentences if len(s.strip()) > 20]
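Underneath `check_document_semantic`, "similarity" is plain cosine similarity, which for the normalized embeddings used above reduces to a dot product. A dependency-free illustration on toy 3-d vectors (real sentence embeddings here are 384-d, and the vectors below are made up):

```python
import math

def norm(v):
    return math.sqrt(sum(x * x for x in v))

def cosine(a, b):
    """Cosine similarity; equals the dot product when inputs are unit-normalized."""
    return sum(x * y for x, y in zip(a, b)) / (norm(a) * norm(b))

close_paraphrase = cosine([0.6, 0.8, 0.0], [0.58, 0.81, 0.05])   # ~0.999
unrelated = cosine([0.6, 0.8, 0.0], [0.9, -0.3, 0.3])            # ~0.30
```

Only the first pair clears the 0.90 threshold used above.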
FAISS Index for Scale
class ScalableSemanticDecontaminator:
"""
Semantic decontamination at scale using FAISS.
Handles 15T tokens by embedding documents and running
approximate nearest neighbor search against benchmark embeddings.
"""
def __init__(self, embedding_dim=384):
import faiss
self.embedding_dim = embedding_dim
# IVF index for approximate search
quantizer = faiss.IndexFlatIP(embedding_dim)
self.index = faiss.IndexIVFFlat(
quantizer, embedding_dim, 1024, # 1024 clusters
faiss.METRIC_INNER_PRODUCT
)
self.trained = False
self.benchmark_texts = []
def build_index(self, benchmark_embeddings, benchmark_texts):
"""
Build FAISS index from benchmark embeddings.
Args:
benchmark_embeddings: numpy array (N, dim)
benchmark_texts: list of N strings
"""
import faiss
import numpy as np
# Normalize for cosine similarity via inner product
faiss.normalize_L2(benchmark_embeddings)
# Train the index
self.index.train(benchmark_embeddings)
self.index.add(benchmark_embeddings)
self.trained = True
self.benchmark_texts = benchmark_texts
print(f"Built index with {self.index.ntotal} vectors")
def search_batch(self, query_embeddings, k=1, threshold=0.90):
"""
Search for contaminated documents in batch.
Args:
query_embeddings: numpy array (M, dim)
k: Number of nearest neighbors
threshold: Similarity threshold
"""
import faiss
import numpy as np
faiss.normalize_L2(query_embeddings)
self.index.nprobe = 32 # Search 32 of 1024 clusters
distances, indices = self.index.search(query_embeddings, k)
results = []
for i in range(len(query_embeddings)):
max_sim = distances[i][0]
nearest_idx = indices[i][0]
results.append({
'similarity': float(max_sim),
'is_contaminated': max_sim >= threshold,
'nearest_benchmark': self.benchmark_texts[nearest_idx]
if nearest_idx >= 0 else None,
})
return results
Complete Decontamination Pipeline
Multi-Stage Pipeline
import json
import os
class DecontaminationPipeline:
"""
Complete decontamination pipeline combining all detection methods.
Stage 1: N-gram fingerprinting (fast, catches verbatim)
Stage 2: Semantic similarity (slower, catches paraphrases)
Stage 3: Manual review of borderline cases
"""
def __init__(self, config):
self.config = config
self.ngram_detector = NGramFingerprinter(n=config.get('ngram_size', 13))
self.bloom_detector = None
self.semantic_detector = None
def setup(self, benchmark_dir):
"""
Load all benchmarks and build detection indices.
"""
benchmark_files = {}
for filename in os.listdir(benchmark_dir):
if filename.endswith('.jsonl'):
benchmark_name = filename.replace('.jsonl', '')
filepath = os.path.join(benchmark_dir, filename)
examples = []
with open(filepath) as f:
for line in f:
data = json.loads(line)
# Concatenate question and answer
text = data.get('question', '') + ' ' + data.get('answer', '')
examples.append(text.strip())
self.ngram_detector.register_benchmark(benchmark_name, examples)
benchmark_files[benchmark_name] = examples
# Build Bloom filter for scale
total_ngrams = len(self.ngram_detector.benchmark_fingerprints)
self.bloom_detector = ScalableDecontaminator(
benchmark_ngrams=total_ngrams
)
# Build semantic index
if self.config.get('use_semantic', True):
self.semantic_detector = SemanticDecontaminator()
for name, examples in benchmark_files.items():
self.semantic_detector.register_benchmark(name, examples)
print(f"Setup complete: {len(benchmark_files)} benchmarks, "
f"{total_ngrams} n-gram fingerprints")
def process_document(self, document):
"""
Process a single document through all detection stages.
Returns (keep, contamination_info).
"""
# Stage 1: N-gram check (fast)
is_contaminated, match_frac, benchmarks = \
self.ngram_detector.check_document(
document,
threshold=self.config.get('ngram_threshold', 0.8)
)
if is_contaminated:
return False, {
'method': 'ngram',
'match_fraction': match_frac,
'benchmarks': benchmarks,
}
# Stage 2: Semantic check (if configured)
if self.semantic_detector and self.config.get('use_semantic', True):
is_contaminated, similarity, matches = \
self.semantic_detector.check_document_semantic(
document,
threshold=self.config.get('semantic_threshold', 0.90)
)
if is_contaminated:
return False, {
'method': 'semantic',
'max_similarity': similarity,
'matches': matches,
}
return True, {'method': 'clean'}
def process_corpus(self, input_path, clean_output, contaminated_log):
"""
Process an entire corpus, writing clean documents and logging removals.
"""
stats = {'clean': 0, 'contaminated_ngram': 0, 'contaminated_semantic': 0}
with open(input_path) as fin, \
open(clean_output, 'w') as fclean, \
open(contaminated_log, 'w') as flog:
for line_num, line in enumerate(fin):
doc = line.strip()
keep, info = self.process_document(doc)
if keep:
fclean.write(line)
stats['clean'] += 1
else:
flog.write(json.dumps({
'line_num': line_num,
'method': info['method'],
'details': info,
'preview': doc[:200],
}) + '\n')
stats[f"contaminated_{info['method']}"] += 1
if (line_num + 1) % 100000 == 0:
total = sum(stats.values())
contam_rate = (total - stats['clean']) / total if total > 0 else 0
print(f"Processed {line_num + 1}: "
f"{stats['clean']} clean, "
f"contamination rate: {contam_rate:.4%}")
return stats
In terms of throughput, the n-gram stage (whether backed by a Bloom filter or a hash set) processes documents far faster per document than the semantic stage (FAISS or brute force), so overall pipeline throughput is dominated by how many documents survive Stage 1 and reach the semantic check.
Validation and Reporting
Post-Decontamination Audit
class DecontaminationAuditor:
"""
Verify decontamination completeness and generate compliance reports.
"""
def __init__(self, pipeline):
self.pipeline = pipeline
def sample_audit(self, clean_corpus_path, sample_size=10000):
"""
Random sample audit of the clean corpus.
Verify that contamination rate is below acceptable threshold.
"""
        import random
        # Reservoir sampling: a uniform random sample without loading the
        # (potentially enormous) clean corpus into memory
        sample = []
        with open(clean_corpus_path) as f:
            for i, line in enumerate(f):
                if len(sample) < sample_size:
                    sample.append(line)
                else:
                    j = random.randrange(i + 1)
                    if j < sample_size:
                        sample[j] = line
# Run full detection on sample
contaminated = 0
for line in sample:
keep, info = self.pipeline.process_document(line.strip())
if not keep:
contaminated += 1
residual_rate = contaminated / len(sample)
return {
'sample_size': len(sample),
'residual_contaminated': contaminated,
'residual_rate': residual_rate,
'acceptable': residual_rate < 0.001, # Less than 0.1%
}
def generate_report(self, stats, audit_results):
"""Generate a decontamination compliance report."""
total = sum(stats.values())
report = {
'summary': {
'total_documents': total,
'clean_documents': stats['clean'],
'removed_documents': total - stats['clean'],
'removal_rate': (total - stats['clean']) / total,
},
'by_method': {
'ngram': stats.get('contaminated_ngram', 0),
'semantic': stats.get('contaminated_semantic', 0),
},
'audit': audit_results,
'conclusion': (
'PASS' if audit_results['acceptable']
else 'FAIL: residual contamination above threshold'
),
}
return report
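One caveat worth quantifying: a 10K-document sample only bounds the residual rate statistically. A Poisson back-of-envelope (illustrative, assuming independently sampled documents) shows the audit has real power against a true rate twice the threshold:

```python
import math

# If the true residual rate were 0.2% (double the 0.1% acceptance threshold),
# contaminated hits in a 10,000-doc sample are roughly Poisson(lambda = 20).
lam = 10_000 * 0.002
# Probability of 9 or fewer hits anyway, i.e. an observed rate under 0.1%
p_pass_anyway = sum(math.exp(-lam) * lam ** k / math.factorial(k)
                    for k in range(10))
# About 0.005: a passing audit makes a 0.2% residual rate very unlikely
```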
Run decontamination before every training run, not just once. Benchmarks evolve (new versions, new benchmarks), training data sources change, and previously clean sources may have been updated to include benchmark content. Keep the decontamination pipeline as a permanent stage in the data processing DAG, triggered whenever benchmarks or data sources are updated.
Decontamination is not optional. A model evaluated on contaminated benchmarks produces misleading results that waste downstream engineering effort: teams build on inflated capabilities that do not generalize. The pipeline described here (n-gram fingerprinting for verbatim matches, semantic similarity for paraphrases, Bloom filters for memory efficiency, and distributed processing for scale) can decontaminate 15T tokens in under 8 hours on a 64-machine cluster. The compute cost is negligible compared to training cost, and the integrity of evaluation results is worth far more.