Part of Series: The Dataset Frontier (16 of 27)

GPT-4 exhibited 3-15% accuracy inflation on contaminated benchmarks: not from intentional cheating, but from MMLU questions appearing verbatim in filtered web crawls. When you train on 13 trillion tokens, even a 0.01% contamination rate means 1.3 billion tokens of benchmark overlap. The effect is measurable: a model scoring 85% on clean MMLU jumps to 92% on contaminated subsets, and that 7-point gap disappears instantly when evaluated on fresh problems. Decontamination at scale requires fingerprinting 15T tokens and matching against benchmark datasets in under 48 hours. Exact string matching is too slow, so you use MinHash and Bloom filters.

This post covers the complete decontamination pipeline: n-gram fingerprinting, perplexity-based detection, paraphrase handling, and production-scale implementation.

Why Decontamination Matters

Measuring Contamination Impact

from dataclasses import dataclass

@dataclass
class ContaminationResult:
    benchmark_name: str
    total_examples: int
    contaminated_count: int
    contamination_rate: float
    clean_accuracy: float
    contaminated_accuracy: float
    inflation: float  # contaminated_accuracy - clean_accuracy

# Documented contamination impacts from published research
CONTAMINATION_IMPACTS = [
    ContaminationResult("MMLU", 14042, 281, 0.020, 0.712, 0.834, 0.122),
    ContaminationResult("HellaSwag", 10042, 502, 0.050, 0.821, 0.893, 0.072),
    ContaminationResult("ARC-Challenge", 1172, 35, 0.030, 0.684, 0.743, 0.059),
    ContaminationResult("GSM8K", 1319, 66, 0.050, 0.571, 0.682, 0.111),
    ContaminationResult("HumanEval", 164, 8, 0.049, 0.672, 0.793, 0.121),
    ContaminationResult("TruthfulQA", 817, 41, 0.050, 0.453, 0.502, 0.049),
]

Accuracy Inflation from Benchmark Contamination

Metric                  MMLU   HellaSwag   ARC-C   GSM8K   HumanEval   TruthfulQA
Clean Accuracy          71.2   82.1        68.4    57.1    67.2        45.3
Contaminated Accuracy   83.4   89.3        74.3    68.2    79.3        50.2
Inflation               +12.2  +7.2        +5.9    +11.1   +12.1       +4.9

Types of Contamination

class ContaminationType:
    """Classification of how benchmark data appears in training corpora."""

    VERBATIM = "verbatim"
    # Exact copy of benchmark question + answer
    # Example: MMLU question copied into a study guide

    REPHRASED = "rephrased"
    # Same content, different wording
    # Example: "What is the capital of France?" -> "Name France's capital city"

    PARTIAL = "partial"
    # Only the question or only the answer appears
    # Example: Wikipedia article containing the answer but not the question

    TRANSLATED = "translated"
    # Benchmark content translated to/from another language
    # Example: GSM8K problem translated to Chinese and back

    DERIVED = "derived"
    # Content derived from the same source as the benchmark
    # Example: Both MMLU and training data sourced from the same textbook

    STRUCTURAL = "structural"
    # Same problem structure with different values
    # Example: "2+3=?" in benchmark, "4+5=?" in training data
⚠️ Warning

Verbatim contamination is the easiest to detect and the most damaging. A model that has memorized the exact answer to "What is the capital of France?" has learned nothing about geography; it has learned to pattern-match that specific string. Rephrased contamination is harder to detect but almost as damaging: the model may have memorized the answer pattern rather than the reasoning.

N-gram Fingerprinting

The Core Algorithm

N-gram fingerprinting is the workhorse of decontamination. Extract character or word n-grams from benchmark data, store them in a lookup structure, and scan training data for matches.

import hashlib
import struct
from collections import defaultdict

class NGramFingerprinter:
    """
    N-gram based contamination detection.
    Used by GPT-4, Llama, and most major model trainers.
    """

    def __init__(self, n=13, char_level=False):
        """
        Args:
            n: N-gram size. 13-gram is standard (GPT-4 uses 13-gram).
            char_level: Use character n-grams instead of word n-grams.
        """
        self.n = n
        self.char_level = char_level
        self.benchmark_fingerprints = set()
        self.benchmark_sources = defaultdict(list)  # fingerprint -> benchmark names

    def _tokenize(self, text):
        """Split text into tokens (words or characters)."""
        text = text.lower().strip()
        if self.char_level:
            return list(text)
        else:
            # Word-level: split on whitespace, remove punctuation
            import re
            words = re.findall(r'\w+', text)
            return words

    def _extract_ngrams(self, tokens):
        """Extract all n-grams from a token sequence."""
        ngrams = []
        for i in range(len(tokens) - self.n + 1):
            ngram = tuple(tokens[i:i + self.n])
            ngrams.append(ngram)
        return ngrams

    def _hash_ngram(self, ngram):
        """Hash an n-gram to a 64-bit integer for compact storage."""
        text = ' '.join(ngram)
        h = hashlib.md5(text.encode('utf-8')).digest()
        return struct.unpack('<Q', h[:8])[0]

    def register_benchmark(self, benchmark_name, examples):
        """
        Register a benchmark's examples for contamination detection.

        Args:
            benchmark_name: Name of the benchmark (e.g., "MMLU")
            examples: List of strings (questions, answers, or both)
        """
        for example in examples:
            tokens = self._tokenize(example)
            ngrams = self._extract_ngrams(tokens)
            for ngram in ngrams:
                h = self._hash_ngram(ngram)
                self.benchmark_fingerprints.add(h)
                self.benchmark_sources[h].append(benchmark_name)

        print(f"Registered {benchmark_name}: {len(examples)} examples, "
              f"{len(self.benchmark_fingerprints)} total fingerprints")

    def check_document(self, document, threshold=0.8):
        """
        Check a training document for benchmark contamination.

        Args:
            document: Text content of the training document
            threshold: Fraction of document n-grams that must match
                      benchmark to flag as contaminated.
                      0.8 = 80% of 13-grams match.

        Returns:
            (is_contaminated, match_fraction, matching_benchmarks)
        """
        tokens = self._tokenize(document)
        ngrams = self._extract_ngrams(tokens)

        if not ngrams:
            return False, 0.0, []

        matching = 0
        matching_benchmarks = set()

        for ngram in ngrams:
            h = self._hash_ngram(ngram)
            if h in self.benchmark_fingerprints:
                matching += 1
                matching_benchmarks.update(self.benchmark_sources.get(h, []))

        match_fraction = matching / len(ngrams)
        is_contaminated = match_fraction >= threshold

        return is_contaminated, match_fraction, list(matching_benchmarks)

Choosing N-gram Size

NGRAM_SIZE_ANALYSIS = {
    8: {
        "false_positive_rate": 0.05,
        "false_negative_rate": 0.01,
        "issue": "Too many false positives from common phrases",
    },
    10: {
        "false_positive_rate": 0.02,
        "false_negative_rate": 0.02,
        "issue": "Good balance but may catch common technical phrases",
    },
    13: {
        "false_positive_rate": 0.005,
        "false_negative_rate": 0.05,
        "issue": "Standard choice. GPT-4 uses 13-grams.",
    },
    20: {
        "false_positive_rate": 0.001,
        "false_negative_rate": 0.15,
        "issue": "Too few false positives but misses paraphrases",
    },
    50: {
        "false_positive_rate": 0.0001,
        "false_negative_rate": 0.40,
        "issue": "Only catches near-verbatim copies",
    },
}

# The standard: use 13-gram word overlap
# A matching 13-gram means 13 consecutive words are identical
# Probability of random 13-word match in English: approximately 1e-25
# So false positives are essentially impossible for word 13-grams
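To see that precision concretely, here is a tiny self-contained sketch using the same tokenization as the fingerprinter above (the sentences are toy examples, not drawn from any real benchmark): a verbatim copy shares 13-grams with the source, while even a mild reordering paraphrase shares none.

```python
import re

def word_ngrams(text, n=13):
    # Same tokenization as the fingerprinter: lowercase, word characters only
    words = re.findall(r'\w+', text.lower())
    return {tuple(words[i:i + n]) for i in range(len(words) - n + 1)}

bench = ("The mitochondria is the organelle responsible for producing ATP "
         "through oxidative phosphorylation in eukaryotic cells")
verbatim = "Study guide: " + bench
paraphrase = ("Within eukaryotic cells it is the mitochondria that produces "
              "ATP by means of oxidative phosphorylation")

fingerprints = word_ngrams(bench)
print(bool(fingerprints & word_ngrams(verbatim)))    # True: copy shares 13-grams
print(bool(fingerprints & word_ngrams(paraphrase)))  # False: reordering breaks every 13-gram
```

This is also why 13-grams miss paraphrases entirely: a single reordering is enough to escape detection, which motivates the semantic methods later in this post.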

Scaling to Trillions of Tokens

Bloom Filter for Memory Efficiency

Storing all benchmark n-gram hashes in a Python set works for small benchmarks but fails at scale. With 50 benchmarks averaging 10K examples, each with 100 13-grams, we have 50M fingerprints. A Bloom filter compresses this to a fixed-size bit array with configurable false positive rate.

import hashlib
import math
import struct

class BloomFilter:
    """
    Bloom filter for memory-efficient n-gram storage.
    Stores 50M fingerprints in 100MB with 0.1% false positive rate.
    """

    def __init__(self, expected_items, false_positive_rate=0.001):
        """
        Args:
            expected_items: Expected number of items to insert
            false_positive_rate: Desired false positive rate
        """
        self.size = self._optimal_size(expected_items, false_positive_rate)
        self.num_hashes = self._optimal_hashes(self.size, expected_items)
        self.bit_array = bytearray(self.size // 8 + 1)
        self.count = 0

    def _optimal_size(self, n, p):
        """Compute optimal bit array size."""
        m = -n * math.log(p) / (math.log(2) ** 2)
        return int(m)

    def _optimal_hashes(self, m, n):
        """Compute optimal number of hash functions."""
        k = (m / n) * math.log(2)
        return max(1, int(k))

    def _hash_values(self, item):
        """Generate k hash positions via double hashing (stable across runs)."""
        # Built-in hash() is randomized per process (PYTHONHASHSEED), which
        # would break a filter that is saved to disk and broadcast to workers,
        # so derive both base hashes from a stable digest instead.
        digest = hashlib.md5(str(item).encode('utf-8')).digest()
        h1 = struct.unpack('<Q', digest[:8])[0]
        h2 = struct.unpack('<Q', digest[8:16])[0] or 1

        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.size

    def add(self, item):
        """Add an item to the filter."""
        for pos in self._hash_values(item):
            byte_idx = pos // 8
            bit_idx = pos % 8
            self.bit_array[byte_idx] |= (1 << bit_idx)
        self.count += 1

    def __contains__(self, item):
        """Check if an item might be in the filter."""
        for pos in self._hash_values(item):
            byte_idx = pos // 8
            bit_idx = pos % 8
            if not (self.bit_array[byte_idx] & (1 << bit_idx)):
                return False
        return True

class ScalableDecontaminator:
    """
    Decontamination at scale using Bloom filters.
    Processes 15T tokens using streaming with constant memory.
    """

    def __init__(self, benchmark_ngrams, false_positive_rate=0.001):
        """
        Args:
            benchmark_ngrams: Total number of benchmark n-grams
        """
        self.bloom = BloomFilter(benchmark_ngrams, false_positive_rate)
        self.n = 13

    def register_benchmarks(self, benchmark_files):
        """Load all benchmark n-grams into the Bloom filter."""
        total = 0
        for name, filepath in benchmark_files.items():
            with open(filepath) as f:
                for line in f:
                    example = line.strip()
                    tokens = example.lower().split()
                    for i in range(len(tokens) - self.n + 1):
                        ngram = tuple(tokens[i:i + self.n])
                        h = self._hash_ngram(ngram)
                        self.bloom.add(h)
                        total += 1

        memory_mb = len(self.bloom.bit_array) / 1e6
        print(f"Registered {total} n-grams in {memory_mb:.1f} MB")

    def _hash_ngram(self, ngram):
        text = ' '.join(ngram)
        return hashlib.md5(text.encode()).hexdigest()

    def scan_corpus_streaming(self, corpus_path, output_path,
                              contamination_threshold=0.8):
        """
        Scan a corpus file, streaming one document at a time.
        Writes clean documents to output, discards contaminated ones.
        """
        contaminated = 0
        clean = 0

        with open(corpus_path) as fin, open(output_path, 'w') as fout:
            for line in fin:
                doc = line.strip()
                tokens = doc.lower().split()

                if len(tokens) < self.n:
                    fout.write(line)
                    clean += 1
                    continue

                # Count matching n-grams
                total_ngrams = len(tokens) - self.n + 1
                matching = 0

                for i in range(total_ngrams):
                    ngram = tuple(tokens[i:i + self.n])
                    h = self._hash_ngram(ngram)
                    if h in self.bloom:
                        matching += 1

                match_rate = matching / total_ngrams
                if match_rate >= contamination_threshold:
                    contaminated += 1
                else:
                    fout.write(line)
                    clean += 1

        return {'clean': clean, 'contaminated': contaminated}
Bloom Filter Memory Usage for Decontamination

Benchmark N-grams   FP Rate   Memory (MB)   Lookup Time (ns)
1M                  0.1%      1.8           50
10M                 0.1%      18            55
50M                 0.1%      90            60
100M                0.1%      180           65
50M                 0.01%     120           70

Note: At 0.1% false positive rate, 50M benchmark n-grams fit in 90MB. Processing 15T tokens at an aggregate rate of 1M documents/sec takes approximately 4 hours.
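The memory column can be sanity-checked against the standard Bloom sizing formula m = -n ln(p) / (ln 2)^2, the same one `_optimal_size` implements:

```python
import math

def bloom_size_mb(n_items, fp_rate):
    # Optimal Bloom filter bit-array size, converted to megabytes
    bits = -n_items * math.log(fp_rate) / (math.log(2) ** 2)
    return bits / 8 / 1e6

print(round(bloom_size_mb(50_000_000, 0.001)))   # 90 -> matches the 90 MB row
print(round(bloom_size_mb(50_000_000, 0.0001)))  # 120 -> matches the 120 MB row
```

Tightening the false positive rate by 10x costs only about a third more memory, which is why broadcasting the filter to every worker stays cheap.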

MapReduce for Distributed Decontamination

class DistributedDecontaminator:
    """
    Distribute decontamination across multiple machines.
    Each machine gets a copy of the Bloom filter (small)
    and processes a shard of the training corpus.
    """

    def __init__(self, num_workers=64):
        self.num_workers = num_workers

    def create_job_config(self, corpus_shards, benchmark_bloom_path,
                          output_dir):
        """
        Create configuration for distributed decontamination job.

        The Bloom filter is broadcast to all workers (small, 100MB).
        Each worker processes its corpus shard independently.
        """
        jobs = []
        for shard_idx, shard_path in enumerate(corpus_shards):
            worker_id = shard_idx % self.num_workers
            jobs.append({
                'worker_id': worker_id,
                'shard_path': shard_path,
                'bloom_filter_path': benchmark_bloom_path,
                'output_path': f"{output_dir}/clean_shard_{shard_idx:06d}.jsonl",
                'contamination_threshold': 0.8,
                'ngram_size': 13,
            })

        return jobs

    def merge_results(self, output_dir):
        """
        Merge results from all workers.
        Aggregate contamination statistics.
        """
        import glob
        import json

        total_clean = 0
        total_contaminated = 0
        contaminated_by_benchmark = defaultdict(int)

        for stats_file in glob.glob(f"{output_dir}/stats_*.json"):
            with open(stats_file) as f:
                stats = json.load(f)
            total_clean += stats['clean']
            total_contaminated += stats['contaminated']
            for bench, count in stats.get('by_benchmark', {}).items():
                contaminated_by_benchmark[bench] += count

        return {
            'total_clean': total_clean,
            'total_contaminated': total_contaminated,
            'contamination_rate': total_contaminated / (total_clean + total_contaminated),
            'by_benchmark': dict(contaminated_by_benchmark),
        }
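As a sketch of what the merge step computes, here is the same aggregation run on two hypothetical worker stats dicts, shaped like the stats_*.json files above (all numbers invented for illustration):

```python
from collections import defaultdict

# Hypothetical per-worker stats, shaped like the stats_*.json files
worker_stats = [
    {'clean': 980_000, 'contaminated': 20_000,
     'by_benchmark': {'MMLU': 12_000, 'GSM8K': 8_000}},
    {'clean': 995_000, 'contaminated': 5_000,
     'by_benchmark': {'MMLU': 5_000}},
]

totals = {'clean': 0, 'contaminated': 0}
by_benchmark = defaultdict(int)
for s in worker_stats:
    totals['clean'] += s['clean']
    totals['contaminated'] += s['contaminated']
    for bench, count in s['by_benchmark'].items():
        by_benchmark[bench] += count

rate = totals['contaminated'] / (totals['clean'] + totals['contaminated'])
print(f"{rate:.4%}")        # 1.2500%
print(dict(by_benchmark))   # {'MMLU': 17000, 'GSM8K': 8000}
```

Because each shard is filtered independently, the merge is embarrassingly parallel: no worker ever needs another worker's output, only its summary statistics.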

Perplexity-Based Detection

When N-grams Miss: Paraphrases

N-gram matching catches verbatim and near-verbatim copies but misses paraphrases. Perplexity-based detection catches a broader class of contamination: if the model has memorized content (even rephrased), it will assign unusually low perplexity to that content.

import torch
import math

class PerplexityDetector:
    """
    Detect contamination using model perplexity.

    Intuition: if a document appears in training data, the model
    assigns it lower perplexity than similar but unseen documents.
    """

    def __init__(self, model, tokenizer, device="cuda"):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.model.eval()

    def compute_perplexity(self, text, stride=512):
        """
        Compute perplexity of a text using sliding window.
        """
        encodings = self.tokenizer(text, return_tensors="pt")
        input_ids = encodings.input_ids.to(self.device)
        seq_len = input_ids.size(1)
        max_length = self.model.config.max_position_embeddings

        nlls = []
        prev_end_loc = 0
        for begin_loc in range(0, seq_len, stride):
            end_loc = min(begin_loc + max_length, seq_len)
            trg_len = end_loc - prev_end_loc  # new tokens scored in this window

            input_slice = input_ids[:, begin_loc:end_loc]
            target_ids = input_slice.clone()
            # Mask tokens already scored by the previous window so the
            # overlapping region is not double-counted.
            target_ids[:, :-trg_len] = -100

            with torch.no_grad():
                outputs = self.model(input_slice, labels=target_ids)
                neg_log_likelihood = outputs.loss

            # loss is the mean over scored tokens; weight by window length
            nlls.append(neg_log_likelihood.item() * trg_len)
            prev_end_loc = end_loc

            if end_loc == seq_len:
                break

        ppl = math.exp(sum(nlls) / seq_len)
        return ppl

    def detect_contamination(self, benchmark_examples, reference_texts,
                            z_threshold=2.0):
        """
        Detect contamination by comparing benchmark perplexity
        against reference perplexity distribution.

        Args:
            benchmark_examples: List of benchmark texts to check
            reference_texts: List of similar but known-clean texts
            z_threshold: Z-score threshold for flagging contamination
        """
        # Compute perplexity of reference texts
        ref_ppls = []
        for text in reference_texts:
            ppl = self.compute_perplexity(text)
            ref_ppls.append(ppl)

        ref_mean = sum(ref_ppls) / len(ref_ppls)
        ref_std = (sum((p - ref_mean) ** 2 for p in ref_ppls) /
                   len(ref_ppls)) ** 0.5

        # Check benchmark examples
        results = []
        for example in benchmark_examples:
            ppl = self.compute_perplexity(example)
            z_score = (ppl - ref_mean) / ref_std if ref_std > 0 else 0

            # Negative z-score means lower perplexity than reference
            # = potential memorization
            is_contaminated = z_score < -z_threshold

            results.append({
                'text': example[:100] + '...',
                'perplexity': ppl,
                'z_score': z_score,
                'is_contaminated': is_contaminated,
            })

        return results
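The z-score logic can be illustrated without a GPU. Assuming some made-up perplexity values for the reference set, a benchmark example far below the clean distribution gets flagged:

```python
import statistics

# Hypothetical perplexities for known-clean reference texts
ref_ppls = [12.1, 11.8, 13.0, 12.5, 11.9, 12.7]
candidate_ppl = 6.2  # a benchmark example the model finds suspiciously easy

mean = statistics.fmean(ref_ppls)
std = statistics.pstdev(ref_ppls)
z = (candidate_ppl - mean) / std

print(z < -2.0)  # True: far below the clean distribution, flag for review
```

The reference texts must be genuinely comparable (same domain, length, and style as the benchmark); otherwise the z-score measures domain mismatch rather than memorization.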
ℹ️ Note

Perplexity-based detection has a fundamental limitation: it can only be run after training. This makes it a post-hoc audit tool, not a pre-training filter. Use n-gram fingerprinting for pre-training decontamination and perplexity-based methods for post-training verification.

Min-K% Probability Detection

A more robust perplexity variant: instead of average perplexity, look at the minimum-k% token probabilities. Memorized text has uniformly high token probabilities, while novel text has some high-probability tokens (common words) and some low-probability tokens (content-specific words).

class MinKPercentDetector:
    """
    Min-K% Prob detection (Shi et al., 2023).

    For memorized text, even the least-likely tokens have high probability.
    For novel text, the least-likely tokens have much lower probability.
    """

    def __init__(self, model, tokenizer, k_percent=20, device="cuda"):
        self.model = model
        self.tokenizer = tokenizer
        self.k = k_percent / 100.0
        self.device = device

    def compute_min_k_score(self, text):
        """
        Compute the Min-K% probability score.

        Higher score = more likely memorized (even the least-likely tokens are probable).
        Lower score = more likely novel (some tokens are genuinely improbable).
        """
        inputs = self.tokenizer(text, return_tensors="pt").to(self.device)
        input_ids = inputs.input_ids

        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits

        # Compute per-token log probabilities
        log_probs = torch.log_softmax(logits[:, :-1, :], dim=-1)
        target_ids = input_ids[:, 1:]

        # Get log prob of each actual token
        token_log_probs = log_probs.gather(
            2, target_ids.unsqueeze(-1)
        ).squeeze(-1)

        # Get the bottom k% of log probabilities
        token_log_probs_flat = token_log_probs.flatten()
        k_count = max(1, int(len(token_log_probs_flat) * self.k))
        bottom_k = torch.topk(token_log_probs_flat, k_count,
                              largest=False).values

        # Score = average of bottom k% log probs
        score = bottom_k.mean().item()
        return score

    def batch_detect(self, texts, threshold=None):
        """
        Detect memorization in a batch of texts.

        If threshold is None, return scores for manual analysis.
        If threshold is provided, classify as memorized/novel.
        """
        results = []
        for text in texts:
            score = self.compute_min_k_score(text)
            results.append({
                'text_preview': text[:80] + '...',
                'min_k_score': score,
                'is_memorized': score > threshold if threshold is not None else None,
            })
        return results
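Stripped of the model, the scoring rule itself is a few lines. With hypothetical per-token log probabilities, the memorized text's bottom-20% floor sits well above the novel text's:

```python
def min_k_score(token_log_probs, k_percent=20):
    # Average log probability of the bottom k% of tokens
    k = max(1, int(len(token_log_probs) * k_percent / 100))
    return sum(sorted(token_log_probs)[:k]) / k

# Hypothetical per-token log probs: memorized text has no truly surprising
# tokens, novel text has a few content words the model finds improbable
memorized = [-0.1, -0.2, -0.15, -0.3, -0.25, -0.1, -0.2, -0.18, -0.22, -0.12]
novel     = [-0.1, -0.2, -4.5, -0.3, -6.1, -0.1, -5.2, -0.18, -0.22, -0.12]

print(min_k_score(memorized) > min_k_score(novel))  # True: memorized floor is higher
```

Averaging only the worst tokens is what makes the statistic robust: common function words are highly probable in both cases and would wash out the signal in a plain perplexity average.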

Handling Paraphrases

Semantic Similarity Detection

class SemanticDecontaminator:
    """
    Detect paraphrased benchmark content using semantic embeddings.
    Catches reformulations that n-gram matching misses.
    """

    def __init__(self, embedding_model_name="all-MiniLM-L6-v2"):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(embedding_model_name)
        self.benchmark_embeddings = {}  # benchmark_name -> (embeddings, texts)

    def register_benchmark(self, name, examples):
        """Embed benchmark examples for similarity search."""
        embeddings = self.model.encode(examples, show_progress_bar=True,
                                        normalize_embeddings=True)
        self.benchmark_embeddings[name] = (embeddings, examples)

    def check_document_semantic(self, document, threshold=0.90):
        """
        Check if a document semantically matches any benchmark example.

        Uses cosine similarity between sentence embeddings.
        Threshold 0.90 catches close paraphrases with low false positives.
        """
        # Split document into sentences
        sentences = self._split_sentences(document)
        if not sentences:
            return False, 0.0, []

        doc_embeddings = self.model.encode(sentences,
                                           normalize_embeddings=True)

        import numpy as np

        matches = []
        max_similarity = 0.0

        for bench_name, (bench_embeds, bench_texts) in self.benchmark_embeddings.items():
            # Compute similarity matrix: doc_sentences x benchmark_examples
            similarity = doc_embeddings @ bench_embeds.T

            # Find maximum similarity for any sentence pair
            max_sim = similarity.max()
            if max_sim > max_similarity:
                max_similarity = float(max_sim)

            if max_sim >= threshold:
                # Find which sentence and benchmark example matched
                doc_idx, bench_idx = np.unravel_index(
                    similarity.argmax(), similarity.shape
                )
                matches.append({
                    'benchmark': bench_name,
                    'similarity': float(max_sim),
                    'document_sentence': sentences[doc_idx],
                    'benchmark_example': bench_texts[bench_idx],
                })

        is_contaminated = len(matches) > 0
        return is_contaminated, max_similarity, matches

    def _split_sentences(self, text):
        """Split text into sentences."""
        import re
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if len(s.strip()) > 20]

FAISS Index for Scale

class ScalableSemanticDecontaminator:
    """
    Semantic decontamination at scale using FAISS.
    Handles 15T tokens by embedding documents and running
    approximate nearest neighbor search against benchmark embeddings.
    """

    def __init__(self, embedding_dim=384):
        import faiss
        self.embedding_dim = embedding_dim
        # IVF index for approximate search
        quantizer = faiss.IndexFlatIP(embedding_dim)
        self.index = faiss.IndexIVFFlat(
            quantizer, embedding_dim, 1024,  # 1024 clusters
            faiss.METRIC_INNER_PRODUCT
        )
        self.trained = False
        self.benchmark_texts = []

    def build_index(self, benchmark_embeddings, benchmark_texts):
        """
        Build FAISS index from benchmark embeddings.

        Args:
            benchmark_embeddings: numpy array (N, dim)
            benchmark_texts: list of N strings
        """
        import faiss
        import numpy as np

        # Normalize for cosine similarity via inner product
        faiss.normalize_L2(benchmark_embeddings)

        # Train the index
        self.index.train(benchmark_embeddings)
        self.index.add(benchmark_embeddings)
        self.trained = True
        self.benchmark_texts = benchmark_texts

        print(f"Built index with {self.index.ntotal} vectors")

    def search_batch(self, query_embeddings, k=1, threshold=0.90):
        """
        Search for contaminated documents in batch.

        Args:
            query_embeddings: numpy array (M, dim)
            k: Number of nearest neighbors
            threshold: Similarity threshold
        """
        import faiss
        import numpy as np

        faiss.normalize_L2(query_embeddings)
        self.index.nprobe = 32  # Search 32 of 1024 clusters

        distances, indices = self.index.search(query_embeddings, k)

        results = []
        for i in range(len(query_embeddings)):
            max_sim = distances[i][0]
            nearest_idx = indices[i][0]

            results.append({
                'similarity': float(max_sim),
                'is_contaminated': max_sim >= threshold,
                'nearest_benchmark': self.benchmark_texts[nearest_idx]
                    if nearest_idx >= 0 else None,
            })

        return results
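What the IVF index approximates is plain exact inner-product search. A brute-force numpy sketch with toy random vectors (not real embeddings): a slightly perturbed copy of benchmark example 42 comes back as its own nearest neighbor, above the 0.90 threshold.

```python
import numpy as np

# Toy "benchmark embeddings": 100 random unit vectors in 64 dims
rng = np.random.default_rng(0)
bench = rng.normal(size=(100, 64)).astype(np.float32)
bench /= np.linalg.norm(bench, axis=1, keepdims=True)

# A near-duplicate of benchmark example 42 (paraphrase-like perturbation)
query = bench[42] + 0.01 * rng.normal(size=64).astype(np.float32)
query /= np.linalg.norm(query)

sims = bench @ query            # inner product == cosine for unit vectors
nearest = int(sims.argmax())
print(nearest, sims[nearest] >= 0.90)  # 42 True
```

The IVF index trades a small recall loss (it searches only nprobe of the 1024 clusters) for a large constant-factor speedup over this exhaustive scan.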

Complete Decontamination Pipeline

Multi-Stage Pipeline

import json
import os

class DecontaminationPipeline:
    """
    Complete decontamination pipeline combining all detection methods.

    Stage 1: N-gram fingerprinting (fast, catches verbatim)
    Stage 2: Semantic similarity (slower, catches paraphrases)
    Stage 3: Manual review of borderline cases
    """

    def __init__(self, config):
        self.config = config
        self.ngram_detector = NGramFingerprinter(n=config.get('ngram_size', 13))
        self.bloom_detector = None
        self.semantic_detector = None

    def setup(self, benchmark_dir):
        """
        Load all benchmarks and build detection indices.
        """
        benchmark_files = {}
        for filename in os.listdir(benchmark_dir):
            if filename.endswith('.jsonl'):
                benchmark_name = filename.replace('.jsonl', '')
                filepath = os.path.join(benchmark_dir, filename)
                examples = []
                with open(filepath) as f:
                    for line in f:
                        data = json.loads(line)
                        # Concatenate question and answer
                        text = data.get('question', '') + ' ' + data.get('answer', '')
                        examples.append(text.strip())

                self.ngram_detector.register_benchmark(benchmark_name, examples)
                benchmark_files[benchmark_name] = examples

        # Build Bloom filter for the large-scale streaming path
        # (process_document below uses the exact hash set; swap in
        # self.bloom_detector once the fingerprint set outgrows memory)
        total_ngrams = len(self.ngram_detector.benchmark_fingerprints)
        self.bloom_detector = ScalableDecontaminator(
            benchmark_ngrams=total_ngrams
        )

        # Build semantic index
        if self.config.get('use_semantic', True):
            self.semantic_detector = SemanticDecontaminator()
            for name, examples in benchmark_files.items():
                self.semantic_detector.register_benchmark(name, examples)

        print(f"Setup complete: {len(benchmark_files)} benchmarks, "
              f"{total_ngrams} n-gram fingerprints")

    def process_document(self, document):
        """
        Process a single document through all detection stages.
        Returns (keep, contamination_info).
        """
        # Stage 1: N-gram check (fast)
        is_contaminated, match_frac, benchmarks = \
            self.ngram_detector.check_document(
                document,
                threshold=self.config.get('ngram_threshold', 0.8)
            )

        if is_contaminated:
            return False, {
                'method': 'ngram',
                'match_fraction': match_frac,
                'benchmarks': benchmarks,
            }

        # Stage 2: Semantic check (if configured)
        if self.semantic_detector and self.config.get('use_semantic', True):
            is_contaminated, similarity, matches = \
                self.semantic_detector.check_document_semantic(
                    document,
                    threshold=self.config.get('semantic_threshold', 0.90)
                )

            if is_contaminated:
                return False, {
                    'method': 'semantic',
                    'max_similarity': similarity,
                    'matches': matches,
                }

        return True, {'method': 'clean'}

    def process_corpus(self, input_path, clean_output, contaminated_log):
        """
        Process an entire corpus, writing clean documents and logging removals.
        """
        stats = {'clean': 0, 'contaminated_ngram': 0, 'contaminated_semantic': 0}

        with open(input_path) as fin, \
             open(clean_output, 'w') as fclean, \
             open(contaminated_log, 'w') as flog:

            for line_num, line in enumerate(fin):
                doc = line.strip()
                keep, info = self.process_document(doc)

                if keep:
                    fclean.write(line)
                    stats['clean'] += 1
                else:
                    flog.write(json.dumps({
                        'line_num': line_num,
                        'method': info['method'],
                        'details': info,
                        'preview': doc[:200],
                    }) + '\n')
                    stats[f"contaminated_{info['method']}"] += 1

                if (line_num + 1) % 100000 == 0:
                    total = sum(stats.values())
                    contam_rate = (total - stats['clean']) / total if total > 0 else 0
                    print(f"Processed {line_num + 1}: "
                          f"{stats['clean']} clean, "
                          f"contamination rate: {contam_rate:.4%}")

        return stats

Decontamination Pipeline Throughput

Metric                 | N-gram (Bloom) | N-gram (HashSet) | Semantic (FAISS) | Semantic (Brute) | Full Pipeline
Throughput (docs/sec)  | 50,000         | 25,000           | 2,000            | 100              | 1,800
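The Bloom filter's 2x throughput edge over a plain hash set in the table comes from constant, cache-friendly memory: billions of benchmark n-grams fit in a fixed bit array, at the cost of a tunable false-positive rate (never false negatives). A minimal sketch of the idea, with a hypothetical BloomFilter class rather than the ScalableDecontaminator used above:

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: fixed memory, no false negatives,
    small false-positive rate set by num_bits and num_hashes."""

    def __init__(self, num_bits=1 << 20, num_hashes=7):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, item):
        # Derive k independent bit positions from salted SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))

# Register benchmark n-grams once, then probe per document n-gram.
bf = BloomFilter()
bf.add("the quick brown fox jumps over the lazy dog")
print("the quick brown fox jumps over the lazy dog" in bf)  # True: no false negatives
```

A production filter would size `num_bits` from the expected n-gram count and a target false-positive rate; since any hit is re-verified downstream (or the document is simply dropped), a rate around 0.1 percent is usually acceptable.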

Validation and Reporting

Post-Decontamination Audit

class DecontaminationAuditor:
    """
    Verify decontamination completeness and generate compliance reports.
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline

    def sample_audit(self, clean_corpus_path, sample_size=10000):
        """
        Random sample audit of the clean corpus.
        Verify that contamination rate is below acceptable threshold.
        """
        import random

        # Reservoir sampling: a uniform random sample in one streaming
        # pass, without loading a multi-terabyte corpus into memory.
        sample = []
        with open(clean_corpus_path) as f:
            for i, line in enumerate(f):
                if len(sample) < sample_size:
                    sample.append(line)
                else:
                    j = random.randrange(i + 1)
                    if j < sample_size:
                        sample[j] = line

        # Run full detection on sample
        contaminated = 0
        for line in sample:
            keep, info = self.pipeline.process_document(line.strip())
            if not keep:
                contaminated += 1

        residual_rate = contaminated / len(sample) if sample else 0.0
        return {
            'sample_size': len(sample),
            'residual_contaminated': contaminated,
            'residual_rate': residual_rate,
            'acceptable': residual_rate < 0.001,  # Less than 0.1%
        }

    def generate_report(self, stats, audit_results):
        """Generate a decontamination compliance report."""
        total = sum(stats.values())
        report = {
            'summary': {
                'total_documents': total,
                'clean_documents': stats['clean'],
                'removed_documents': total - stats['clean'],
                'removal_rate': (total - stats['clean']) / total,
            },
            'by_method': {
                'ngram': stats.get('contaminated_ngram', 0),
                'semantic': stats.get('contaminated_semantic', 0),
            },
            'audit': audit_results,
            'conclusion': (
                'PASS' if audit_results['acceptable']
                else 'FAIL: residual contamination above threshold'
            ),
        }
        return report
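One statistical caveat on the sample audit: finding zero contaminated documents in a 10,000-document sample does not prove a zero residual rate. The rule of three gives a quick one-sided 95% upper confidence bound of roughly 3/n. A sketch (the nonzero-count branch uses a normal approximation, which is rough for small counts):

```python
import math

def residual_rate_upper_bound(contaminated, sample_size, confidence=0.95):
    """
    One-sided upper confidence bound on the residual contamination rate.
    With zero observed hits this is the exact binomial bound, which
    reduces to the rule of three: roughly 3 / n at 95% confidence.
    """
    if contaminated == 0:
        # Solve (1 - p)^n = 1 - confidence for p.
        return 1 - (1 - confidence) ** (1 / sample_size)
    # Normal approximation for nonzero counts.
    p = contaminated / sample_size
    z = 1.6449  # one-sided 95% z-score
    return p + z * math.sqrt(p * (1 - p) / sample_size)

print(residual_rate_upper_bound(0, 10_000))  # ~0.0003, the rule of three
```

With a perfectly clean 10,000-document sample, the strongest claim at 95% confidence is a residual rate below about 0.03%, which sits comfortably under the 0.1% acceptance threshold used above; a tighter guarantee requires a proportionally larger sample.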
💡 Tip

Run decontamination before every training run, not just once. Benchmarks evolve (new versions, new benchmarks), training data sources change, and previously clean sources may have been updated to include benchmark content. Keep the decontamination pipeline as a permanent stage in the data processing DAG, triggered whenever benchmarks or data sources are updated.
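One way to wire the "triggered whenever benchmarks or data sources are updated" rule into a data processing DAG is to fingerprint the inputs and re-run only on change. A minimal sketch; the manifest filename and file layout here are assumptions, not part of the pipeline above:

```python
import hashlib
import json
import pathlib

def fingerprint(paths):
    """SHA-256 over file paths and contents in sorted order, so the
    digest changes iff any benchmark or source file changes."""
    h = hashlib.sha256()
    for path in sorted(str(p) for p in paths):
        h.update(path.encode())
        h.update(pathlib.Path(path).read_bytes())
    return h.hexdigest()

def needs_decontamination(benchmark_files, manifest="decontam_manifest.json"):
    """Compare the current fingerprint against the one recorded at the
    last decontamination run; record it and return True on any change."""
    current = fingerprint(benchmark_files)
    try:
        previous = json.loads(pathlib.Path(manifest).read_text())["fingerprint"]
    except (FileNotFoundError, KeyError):
        previous = None
    if current != previous:
        pathlib.Path(manifest).write_text(json.dumps({"fingerprint": current}))
        return True
    return False
```

The same check extends naturally to training-data source manifests, so a new benchmark version or a refreshed crawl both force the decontamination stage to re-run while unchanged inputs skip it.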

Decontamination is not optional. A model evaluated on contaminated benchmarks produces misleading results that waste downstream engineering effort: teams build on inflated capabilities that do not generalize. The pipeline described here β€” n-gram fingerprinting for verbatim matches, semantic similarity for paraphrases, Bloom filters for memory efficiency, and distributed processing for scale β€” can decontaminate 15T tokens in under 8 hours on a 64-machine cluster. The compute cost is negligible compared to training cost, and the integrity of evaluation results is worth far more.