Part of Series The Dataset Frontier 11 of 27

Llama 2 found 2% exact n-gram overlap between the MMLU test set and its training corpus. The contaminated subset scored 4.2 points higher than clean subsets, evidence that memorization inflates scores. When you train on 15 trillion tokens scraped from GitHub, Reddit, Pastebin, and academic blogs, the probability that MMLU questions appear verbatim approaches 100%. Contamination is not a rare bug; it is the default outcome of web-scale training, and every benchmark score is suspect until proven clean.

Why Contamination Matters

The Scale of the Problem

Modern training corpora are assembled from web crawls that index billions of pages. The major evaluation benchmarks are relatively small:

Benchmark Sizes vs Training Corpus Size

| Benchmark | Examples | Tokens | % of 15T Training Corpus |
|---|---|---|---|
| MMLU | 14,042 | ~2M | 0.00001% |
| HumanEval | 164 | ~50K | 0.0000003% |
| GSM8K | 8,792 | ~1.5M | 0.00001% |
| HellaSwag | 10,042 | ~3M | 0.00002% |
| TruthfulQA | 817 | ~200K | 0.000001% |
| ARC-Challenge | 1,172 | ~300K | 0.000002% |

These benchmarks are negligible fractions of training data by volume. But the internet contains them many times over: MMLU questions appear on Quizlet, Chegg, course websites, study guides, and discussion forums. A single Common Crawl dump may contain 100+ copies of each benchmark question across different websites.
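
The percentages in the table follow from simple division, and the same arithmetic shows why duplication does not make benchmarks visible by volume. A minimal sketch, assuming the approximate token counts listed above (the function name `corpus_share_percent` is made up for this illustration):

```python
# Approximate token counts taken from the table above.
CORPUS_TOKENS = 15e12  # 15 trillion training tokens

BENCHMARK_TOKENS = {
    "MMLU": 2_000_000,
    "HumanEval": 50_000,
    "GSM8K": 1_500_000,
    "HellaSwag": 3_000_000,
    "TruthfulQA": 200_000,
    "ARC-Challenge": 300_000,
}

def corpus_share_percent(benchmark_tokens, corpus_tokens=CORPUS_TOKENS, copies=1):
    """Percentage of the corpus taken up by `copies` copies of a benchmark."""
    return 100.0 * benchmark_tokens * copies / corpus_tokens

for name, tokens in BENCHMARK_TOKENS.items():
    single = corpus_share_percent(tokens)
    hundred = corpus_share_percent(tokens, copies=100)
    print(f"{name:14s} 1 copy: {single:.7f}%   100 copies: {hundred:.5f}%")
```

Even at 100 copies, each benchmark stays far below a hundredth of a percent of the corpus, so no volume-based filter will notice it. Detection has to look for the benchmark text itself.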

The Impact on Scores

Contamination inflates scores. The magnitude depends on the contamination rate:

Estimated Score Inflation vs Contamination Rate

| Contamination rate | Score inflation (points) | Note |
|---|---|---|
| 0% | 0 | True performance |
| 5% | +2 | |
| 15% | +5 | |
| 30% | +10 | |
| 50% | +18 | |
| 80% | +30 | Scores meaningless |

A model with 15% of MMLU questions in its training data might score 5 points higher than its true capability. At 50% contamination, scores are essentially meaningless: the model is largely reciting memorized answers.

Detection Method 1: N-gram Overlap

The Approach

The simplest detection method: check whether sequences of $n$ consecutive words from the benchmark appear in the training data. If an 8-gram from an MMLU question appears in a training document, that document likely contains (part of) the question.
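
A toy illustration of the idea before the full detector. The question and the scraped page are invented for this example, and `word_ngrams` is a hypothetical helper, not part of any library:

```python
import re

def word_ngrams(text, n):
    """Return the set of word-level n-grams after simple normalization."""
    words = re.sub(r"[^\w\s]", "", text.lower()).split()
    return {" ".join(words[i:i + n]) for i in range(len(words) - n + 1)}

# Hypothetical benchmark question and scraped study-guide page.
question = (
    "Which of the following organelles is primarily responsible "
    "for producing ATP in eukaryotic cells?"
)
web_page = (
    "Study guide, unit 3. Q7: Which of the following organelles is primarily "
    "responsible for producing ATP in eukaryotic cells? A) Ribosome B) Mitochondrion"
)

# Any shared 8-gram is evidence that the page contains (part of) the question.
shared = word_ngrams(question, 8) & word_ngrams(web_page, 8)
print(f"{len(shared)} shared 8-grams")
for gram in sorted(shared):
    print(" ", gram)
```

Because the page embeds the question verbatim, every 8-gram of the question is found in it, even though the page adds its own text before and after.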

Implementation

import hashlib
import json
import re
from collections import defaultdict

class NgramContaminationDetector:
    """
    Detect benchmark contamination using n-gram overlap.

    Build an index of n-grams from benchmark questions,
    then scan training documents for matches.
    """

    def __init__(self, n=13):
        """
        n: n-gram size. Larger n = fewer false positives but
        misses paraphrased contamination. 13 is a good default
        (the GPT-3 paper used 13-gram overlap).
        """
        self.n = n
        self.benchmark_ngrams = {}  # ngram_hash -> set of benchmark IDs
        self.benchmark_data = {}  # benchmark_id -> original text

    def _normalize(self, text):
        """Lowercase, remove punctuation, collapse whitespace."""
        text = text.lower().strip()
        text = re.sub(r'[^\w\s]', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text

    def _hash_ngram(self, ngram):
        """Hash an n-gram for memory-efficient storage."""
        return hashlib.md5(ngram.encode()).hexdigest()[:12]

    def _extract_ngrams(self, text):
        """Extract word-level n-grams from normalized text."""
        words = self._normalize(text).split()
        if len(words) < self.n:
            return []
        return [
            " ".join(words[i:i+self.n])
            for i in range(len(words) - self.n + 1)
        ]

    def index_benchmark(self, benchmark_name, questions):
        """
        Index a benchmark's questions for contamination detection.

        benchmark_name: string identifier (e.g., 'mmlu')
        questions: list of question strings
        """
        for i, question in enumerate(questions):
            bid = f"{benchmark_name}_{i}"
            self.benchmark_data[bid] = question

            ngrams = self._extract_ngrams(question)
            for ngram in ngrams:
                h = self._hash_ngram(ngram)
                if h not in self.benchmark_ngrams:
                    self.benchmark_ngrams[h] = set()
                self.benchmark_ngrams[h].add(bid)

        print(f"Indexed {len(questions)} questions from "
              f"{benchmark_name} ({len(self.benchmark_ngrams)} "
              f"unique n-gram hashes)")

    def check_document(self, document_text):
        """
        Check a training document for benchmark contamination.

        Returns dict mapping benchmark_id -> overlap_count
        """
        doc_ngrams = self._extract_ngrams(document_text)
        matches = defaultdict(int)

        for ngram in doc_ngrams:
            h = self._hash_ngram(ngram)
            if h in self.benchmark_ngrams:
                for bid in self.benchmark_ngrams[h]:
                    matches[bid] += 1

        return dict(matches)

    def scan_corpus(self, document_iterator, report_threshold=3):
        """
        Scan an entire training corpus for contamination.

        document_iterator: yields (doc_id, text) tuples
        report_threshold: minimum n-gram matches to report

        Returns contamination report.
        """
        contaminated_docs = []
        benchmark_hit_counts = defaultdict(int)
        docs_scanned = 0

        for doc_id, text in document_iterator:
            docs_scanned += 1
            matches = self.check_document(text)

            for bid, count in matches.items():
                if count >= report_threshold:
                    contaminated_docs.append({
                        "document_id": doc_id,
                        "benchmark_id": bid,
                        "ngram_matches": count,
                    })
                    benchmark_hit_counts[bid] += 1

            if docs_scanned % 100000 == 0:
                print(f"Scanned {docs_scanned} documents, "
                      f"found {len(contaminated_docs)} contaminated")

        # Compute per-benchmark contamination rates
        report = {
            "documents_scanned": docs_scanned,
            "contaminated_documents": len(contaminated_docs),
            "per_benchmark": {},
        }

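        # Group question IDs by benchmark name. Compare the exact name
        # (everything before the final "_<index>") so that "mmlu" does
        # not also count questions from a benchmark named "mmlu_pro".
        benchmarks = set(
            bid.rsplit("_", 1)[0]
            for bid in self.benchmark_data
        )
        for benchmark in benchmarks:
            total_questions = sum(
                1 for bid in self.benchmark_data
                if bid.rsplit("_", 1)[0] == benchmark
            )
            contaminated_questions = sum(
                1 for bid in benchmark_hit_counts
                if bid.rsplit("_", 1)[0] == benchmark
            )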
            report["per_benchmark"][benchmark] = {
                "total_questions": total_questions,
                "contaminated_questions": contaminated_questions,
                "contamination_rate": (
                    contaminated_questions / max(total_questions, 1)
                ),
            }

        return report

Choosing N

The n-gram size trades off between false positives and false negatives:

N-gram Size vs Detection Characteristics

| N | False Positive Rate | False Negative Rate | Used By |
|---|---|---|---|
| 8 | High (common phrases match) | Low | PaLM |
| 10 | Moderate | Low | Llama 2 (token n-grams of 10+ tokens) |
| 13 | Low | Moderate | GPT-3 paper |
| 20 | Very low | High (misses paraphrased) | Conservative |
| 30 | Near zero | Very high | Only catches verbatim |

$n = 13$ is the standard choice. It is long enough that random 13-word sequences are unlikely to match by chance, but short enough to catch questions that have been lightly reformatted (e.g., whitespace changes, minor punctuation differences).
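
The trade-off is easy to see on a single example. The sketch below uses an invented 21-word question and a copy with one word substituted, then counts the n-grams that still match at several sizes (`word_ngrams` is a hypothetical helper):

```python
import re

def word_ngrams(text, n):
    """Return the set of word-level n-grams after simple normalization."""
    words = re.sub(r"[^\w\s]", "", text.lower()).split()
    return {" ".join(words[i:i + n]) for i in range(len(words) - n + 1)}

# Hypothetical question and a lightly edited copy ("traveling" -> "moving").
original = ("A train leaves the station at 9 am traveling at 60 miles per hour "
            "how far has it traveled by noon")
edited = ("A train leaves the station at 9 am moving at 60 miles per hour "
          "how far has it traveled by noon")

# Count surviving matches for each n-gram size.
matches_by_n = {
    n: len(word_ngrams(original, n) & word_ngrams(edited, n))
    for n in (5, 8, 10, 13)
}
for n, count in matches_by_n.items():
    print(f"n={n:2d}: {count} shared n-grams")
```

A single substituted word near the middle of a short question removes every 13-gram match while several 8-grams survive. Normalization absorbs whitespace and punctuation changes, but word-level edits to short questions are where larger values of n start to miss.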

Detection Method 2: Perplexity Comparison

The Intuition

A model that has memorized a benchmark question will assign it much lower perplexity than a similar but unseen question. If we compute the model's perplexity on benchmark questions and compare it to perplexity on paraphrased versions, a large gap indicates memorization.

$$\Delta_{\text{ppl}} = \text{PPL}(\text{paraphrase}) - \text{PPL}(\text{original})$$

If $\Delta_{\text{ppl}}$ is large and positive, the model has memorized the original but not the paraphrase. If $\Delta_{\text{ppl}}$ is near zero, the model has learned the underlying knowledge (both versions have similar perplexity).
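
A worked example of the arithmetic, using invented per-token log-probabilities rather than output from a real model. It also computes the ratio of the two perplexities, which is scale-free and therefore easier to threshold than the raw difference:

```python
import math

def perplexity(log_probs):
    """Perplexity = exp(-mean of natural-log token probabilities)."""
    return math.exp(-sum(log_probs) / len(log_probs))

# Hypothetical per-token log-probabilities, invented for illustration.
original_log_probs = [-0.2, -0.1, -0.3, -0.2, -0.2]    # confident on every token
paraphrase_log_probs = [-1.5, -2.0, -1.0, -1.8, -1.7]  # much less certain

ppl_orig = perplexity(original_log_probs)
ppl_para = perplexity(paraphrase_log_probs)
delta_ppl = ppl_para - ppl_orig   # the difference defined above
ratio = ppl_para / ppl_orig       # scale-free alternative

print(f"PPL(original)   = {ppl_orig:.3f}")
print(f"PPL(paraphrase) = {ppl_para:.3f}")
print(f"delta_ppl       = {delta_ppl:.3f}")
print(f"ratio           = {ratio:.3f}")
```

With these made-up numbers the paraphrase is roughly four times as surprising to the model as the original, the kind of gap that would warrant a closer look at that question.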

Implementation

import math
import numpy as np

class PerplexityContaminationDetector:
    """
    Detect contamination by comparing perplexity of original
    benchmark questions vs paraphrased versions.

    If the model has memorized the original, it will have
    much lower perplexity on the original than the paraphrase.
    """

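    def __init__(self, model, tokenizer):
        """
        model: object exposing score_tokens(token_ids), returning one
            natural-log probability per scored token (a hypothetical
            interface; adapt it to your framework)
        tokenizer: tokenizer with encode() method
        """
        self.model = model
        self.tokenizer = tokenizer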

    def compute_perplexity(self, text):
        """Compute per-token perplexity for a text string."""
        tokens = self.tokenizer.encode(text)
        if len(tokens) < 2:
            return float('inf')

        # Get log probabilities from model
        # This is a simplified version -- real implementation
        # would batch and use proper autoregressive scoring
        log_probs = self.model.score_tokens(tokens)

        # Perplexity = exp(-mean(log_probs))
        avg_log_prob = sum(log_probs) / len(log_probs)
        return math.exp(-avg_log_prob)

    def detect_memorization(
        self,
        original_questions,
        paraphrased_questions,
        threshold_ratio=2.0,
    ):
        """
        Compare perplexity of originals vs paraphrases.

        original_questions: list of benchmark question strings
        paraphrased_questions: list of paraphrased versions
            (same length, corresponding indices)
        threshold_ratio: PPL(paraphrase)/PPL(original) ratio
            above which we flag contamination

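        Returns a summary dict; "flagged_questions" lists the index,
        original_ppl, paraphrase_ppl, and ratio of each flagged question.
        """
        assert len(original_questions) == len(paraphrased_questions)

        flagged = []
        all_ratios = []

        for i in range(len(original_questions)):
            orig_ppl = self.compute_perplexity(original_questions[i])
            para_ppl = self.compute_perplexity(paraphrased_questions[i])

            # compute_perplexity returns inf for texts too short to score;
            # treat those pairs as "no signal" instead of propagating inf/nan
            if orig_ppl > 0 and math.isfinite(orig_ppl) and math.isfinite(para_ppl):
                ratio = para_ppl / orig_ppl
            else:
                ratio = 0.0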

            all_ratios.append(ratio)

            if ratio >= threshold_ratio:
                flagged.append({
                    "index": i,
                    "original_ppl": orig_ppl,
                    "paraphrase_ppl": para_ppl,
                    "ratio": ratio,
                })

        return {
            "flagged_count": len(flagged),
            "total_count": len(original_questions),
            "contamination_rate": len(flagged) / max(len(original_questions), 1),
            "mean_ratio": float(np.mean(all_ratios)),
            "median_ratio": float(np.median(all_ratios)),
            "flagged_questions": flagged,
        }

Generating Paraphrases

The quality of the perplexity-based detection depends on the quality of paraphrases. Good paraphrases preserve the meaning but change the surface form enough that a non-memorizing model would assign similar perplexity to both versions.

def generate_paraphrases_for_detection(questions, paraphrase_model):
    """
    Generate paraphrases for perplexity-based contamination detection.

    Requirements for good paraphrases:
    1. Preserve semantic meaning
    2. Change surface form (word choice, sentence structure)
    3. Maintain similar complexity/length
    4. Do NOT simplify or complexify the question
    """
    paraphrases = []

    for question in questions:
        prompt = (
            "Rewrite the following question to have the exact same "
            "meaning but different wording. Keep the same difficulty "
            "level and length. Do not add or remove information.\n\n"
            f"Original: {question}\n\n"
            "Rewritten:"
        )

        paraphrase = paraphrase_model.generate(prompt, max_tokens=500)
        paraphrases.append(paraphrase.strip())

    return paraphrases

def validate_paraphrases(originals, paraphrases, embedding_model):
    """
    Validate that paraphrases are semantically equivalent
    but lexically different from originals.
    """
    valid = []

    for orig, para in zip(originals, paraphrases):
        # Semantic similarity (should be high)
        orig_emb = embedding_model.encode(orig)
        para_emb = embedding_model.encode(para)
        cos_sim = float(np.dot(orig_emb, para_emb) / (
            np.linalg.norm(orig_emb) * np.linalg.norm(para_emb)
        ))

        # Lexical overlap (should be moderate, not too high)
        orig_words = set(orig.lower().split())
        para_words = set(para.lower().split())
        jaccard = len(orig_words & para_words) / max(
            len(orig_words | para_words), 1
        )

        # Good paraphrase: high semantic similarity, moderate lexical overlap
        is_valid = cos_sim > 0.85 and jaccard < 0.7

        valid.append({
            "original": orig,
            "paraphrase": para,
            "semantic_similarity": cos_sim,
            "lexical_overlap": jaccard,
            "is_valid": is_valid,
        })

    return valid

ℹ️ Perplexity Method Limitations

The perplexity method requires token-level log probabilities and a good paraphrase generator. It cannot be applied to closed-source models whose API does not return log probabilities. It also produces false positives on questions that happen to use common phrasing: "What is the capital of France?" will have low perplexity regardless of memorization because it is a simple, common sentence.

Detection Method 3: Exact Match After Canonicalization

Canonicalization

Before checking for exact matches, both the benchmark text and training text are canonicalized: lowercased, whitespace-collapsed, punctuation-removed, and optionally stemmed. This catches near-duplicates that differ only in formatting.

import re
import hashlib

class CanonicalMatchDetector:
    """
    Detect contamination using exact match after text canonicalization.
    Fast and precise, but misses paraphrased contamination.
    """

    def __init__(self, strip_punctuation=True, stem=False):
        self.strip_punctuation = strip_punctuation
        self.stem = stem
        self.benchmark_hashes = {}  # hash -> benchmark_id

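    def _canonicalize(self, text):
        """Normalize text to canonical form."""
        text = text.lower()

        if self.strip_punctuation:
            text = re.sub(r'[^\w\s]', '', text)

        # Collapse whitespace last: removing punctuation can leave
        # double spaces or leading/trailing spaces (e.g. "a - b")
        text = re.sub(r'\s+', ' ', text).strip()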

        if self.stem:
            # Simple suffix stripping (not a full stemmer)
            words = text.split()
            words = [
                w[:-3] if len(w) > 5 and w.endswith("ing") else
                w[:-2] if len(w) > 4 and w.endswith("ed") else
                w[:-1] if len(w) > 4 and w.endswith("s") else
                w
                for w in words
            ]
            text = " ".join(words)

        return text

    def _hash(self, text):
        return hashlib.sha256(text.encode()).hexdigest()

    def index_benchmark(self, benchmark_name, questions):
        """Index benchmark questions by their canonical hash."""
        for i, q in enumerate(questions):
            canonical = self._canonicalize(q)
            h = self._hash(canonical)
            self.benchmark_hashes[h] = f"{benchmark_name}_{i}"

        # Also index individual sentences of 8+ words so that partial
        # copies are caught. setdefault keeps the full-question ID when a
        # single-sentence question hashes to the same value.
        for i, q in enumerate(questions):
            sentences = re.split(r'[.!?]+', q)
            for sent in sentences:
                if len(sent.split()) >= 8:
                    canonical = self._canonicalize(sent)
                    h = self._hash(canonical)
                    self.benchmark_hashes.setdefault(
                        h, f"{benchmark_name}_{i}_partial"
                    )

    def check_document(self, text):
        """
        Check if a document contains exact matches of benchmark text.
        Returns list of matched benchmark IDs.
        """
        matches = []

        # Check full document
        canonical = self._canonicalize(text)
        h = self._hash(canonical)
        if h in self.benchmark_hashes:
            matches.append(self.benchmark_hashes[h])

        # Check paragraphs
        paragraphs = text.split("\n\n")
        for para in paragraphs:
            if len(para.split()) < 8:
                continue
            canonical = self._canonicalize(para)
            h = self._hash(canonical)
            if h in self.benchmark_hashes:
                matches.append(self.benchmark_hashes[h])

        # Check individual sentences (split on ., !, ?)
        sentences = re.split(r'[.!?]+', text)
        for sent in sentences:
            if len(sent.split()) < 8:
                continue
            canonical = self._canonicalize(sent)
            h = self._hash(canonical)
            if h in self.benchmark_hashes:
                matches.append(self.benchmark_hashes[h])

        return list(set(matches))
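
A quick standalone check of what canonical matching does and does not catch. The three strings are invented for this example, and `canonical_hash` is a simplified stand-in for the class above, not a replacement for it:

```python
import hashlib
import re

def canonical_hash(text):
    """Lowercase, drop punctuation, collapse whitespace, then SHA-256."""
    text = re.sub(r"[^\w\s]", "", text.lower())
    text = re.sub(r"\s+", " ", text).strip()
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

# Hypothetical benchmark question and two variants found "in the wild".
benchmark = "What is the boiling point of water at sea level, in degrees Celsius?"
reformatted = "  WHAT is the boiling point of water\nat sea level in degrees celsius  "
paraphrased = "At sea level, water boils at what temperature in Celsius?"

same_after_reformat = canonical_hash(benchmark) == canonical_hash(reformatted)
same_after_paraphrase = canonical_hash(benchmark) == canonical_hash(paraphrased)

print("reformatted copy matches:", same_after_reformat)
print("paraphrased copy matches:", same_after_paraphrase)
```

Case, punctuation, and line-break differences vanish under canonicalization, so the reformatted copy hashes identically. The paraphrase shares the meaning but not the word sequence, so it produces a different hash and goes undetected by this method.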

Combining Detection Methods

The Ensemble Approach

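No single detection method catches all contamination. The ensemble below layers the two document-level methods, canonical matching and n-gram overlap. The perplexity check scores a trained model rather than a document, so it runs separately as a post-training audit:

class EnsembleContaminationDetector:
    """
    Combine multiple detection methods for robust contamination detection.

    Detection levels:
    - Level 1 (certain): Exact canonical match
    - Level 2 (likely): 5 or more matching n-grams for one benchmark question
    - Level 3 (possible): 2 to 4 matching n-grams

    perplexity_detector is stored for a separate model-level audit;
    classify_document() does not use it.
    """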

    def __init__(
        self,
        ngram_detector,
        canonical_detector,
        perplexity_detector=None,
    ):
        self.ngram = ngram_detector
        self.canonical = canonical_detector
        self.perplexity = perplexity_detector

    def classify_document(self, document_text):
        """
        Classify a document's contamination level.

        Returns:
        - "clean": no contamination detected
        - "certain": exact match found
        - "likely": strong n-gram overlap
        - "possible": moderate signals
        """
        # Level 1: Exact match
        canonical_matches = self.canonical.check_document(document_text)
        if canonical_matches:
            return "certain", canonical_matches

        # Level 2: N-gram overlap
        ngram_matches = self.ngram.check_document(document_text)
        strong_matches = [
            bid for bid, count in ngram_matches.items()
            if count >= 5
        ]
        if strong_matches:
            return "likely", strong_matches

        # Level 3: Weak n-gram overlap
        weak_matches = [
            bid for bid, count in ngram_matches.items()
            if count >= 2
        ]
        if weak_matches:
            return "possible", weak_matches

        return "clean", []

    def scan_and_report(self, document_iterator):
        """
        Scan corpus and produce a contamination report.
        """
        counts = {
            "clean": 0,
            "certain": 0,
            "likely": 0,
            "possible": 0,
        }

        contaminated_docs = []

        for doc_id, text in document_iterator:
            level, matches = self.classify_document(text)
            counts[level] += 1

            if level != "clean":
                contaminated_docs.append({
                    "doc_id": doc_id,
                    "level": level,
                    "matched_benchmarks": matches,
                })

        total = sum(counts.values())
        return {
            "total_documents": total,
            "counts": counts,
            "rates": {
                k: v / max(total, 1)
                for k, v in counts.items()
            },
            "contaminated_documents": contaminated_docs,
        }

Detection Method Coverage (What Each Method Catches)

| Method | What it catches | % of contaminated docs detected |
|---|---|---|
| Exact match | Verbatim copies only | 100 |
| N-gram (n=13) | + reformatted copies | 85 |
| Perplexity | + some paraphrases | 65 |
| Ensemble | Combined coverage | 92 |

Prevention Strategies

Strategy 1: Hold-Out Evaluation Sets

Never publish your evaluation set online. This sounds obvious but fails in practice because:

  1. Researchers publish evaluation results with example questions in papers
  2. Models are evaluated on public benchmarks that already exist online
  3. Even "private" evaluation sets leak through API queries that get logged and scraped

import datetime
import json
import random

class PrivateEvalManager:
    """
    Manage private evaluation sets that are never exposed
    to the internet.
    """

    def __init__(self, eval_data, access_log_path):
        self.eval_data = eval_data  # In-memory only
        self.access_log = access_log_path
        # NEVER serialize eval_data to a web-accessible location

    def evaluate_model(self, model_fn, subset_size=None):
        """
        Run evaluation locally. The model function receives
        questions one at a time and returns predictions.

        model_fn: callable(question) -> answer
        """
        data = self.eval_data
        if subset_size and subset_size < len(data):
            import random
            data = random.sample(data, subset_size)

        correct = 0
        total = 0

        for item in data:
            prediction = model_fn(item["question"])
            is_correct = self._check_answer(
                prediction, item["answer"]
            )
            correct += int(is_correct)
            total += 1

        # Log evaluation (without the questions/answers)
        self._log_access(total, correct / max(total, 1))

        return {
            "accuracy": correct / max(total, 1),
            "total": total,
            "correct": correct,
        }

    def _check_answer(self, prediction, ground_truth):
        """Compare prediction to ground truth."""
        pred_clean = prediction.strip().lower()
        truth_clean = ground_truth.strip().lower()
        return pred_clean == truth_clean

    def _log_access(self, num_evaluated, accuracy):
        """Log who evaluated what, without logging the actual data."""
        import datetime
        entry = {
            "timestamp": datetime.datetime.now().isoformat(),
            "num_evaluated": num_evaluated,
            "accuracy": accuracy,
        }
        with open(self.access_log, "a") as f:
            f.write(json.dumps(entry) + "\n")

Strategy 2: Dynamic Benchmarks

Generate fresh evaluation questions for each evaluation run, so the exact questions are very unlikely to already be in any training corpus.

class DynamicBenchmarkGenerator:
    """
    Generate fresh evaluation questions per run,
    making contamination impossible.
    """

    def __init__(self, question_templates, knowledge_base):
        """
        question_templates: parameterized question patterns
        knowledge_base: facts to populate templates
        """
        self.templates = question_templates
        self.kb = knowledge_base

    def generate_eval_set(self, size, seed):
        """
        Generate a deterministic but unique evaluation set.

        Different seeds produce different questions -- the same
        seed reproduces the same set for reproducibility.
        """
        import random
        rng = random.Random(seed)

        questions = []
        for _ in range(size):
            # Select random template
            template = rng.choice(self.templates)

            # Fill in with random knowledge
            params = {}
            for param_name, kb_key in template["params"].items():
                options = self.kb[kb_key]
                params[param_name] = rng.choice(options)

            question_text = template["text"].format(**params)
            answer = template["answer_fn"](params)

            questions.append({
                "question": question_text,
                "answer": answer,
                "template_id": template["id"],
            })

        return questions

# Example: dynamic math evaluation
templates = [
    {
        "id": "arithmetic_1",
        "text": "What is {a} * {b} + {c}?",
        "params": {
            "a": "integers",
            "b": "integers",
            "c": "integers",
        },
        "answer_fn": lambda p: str(
            int(p["a"]) * int(p["b"]) + int(p["c"])
        ),
    },
]

kb = {
    "integers": [str(i) for i in range(2, 100)],
}

generator = DynamicBenchmarkGenerator(templates, kb)
eval_set_1 = generator.generate_eval_set(size=100, seed=42)
eval_set_2 = generator.generate_eval_set(size=100, seed=43)
# Different questions, same difficulty distribution

Strategy 3: Canary Strings

Embed unique canary strings in evaluation data. If a model can reproduce these strings when prompted, the evaluation set has leaked into its training data.

import secrets

def create_canary_protected_benchmark(questions, canary_prefix="EVAL_CANARY"):
    """
    Add canary strings to benchmark questions.
    If a model can complete the canary, the benchmark leaked.
    """
    protected = []
    canary_registry = {}

    for i, q in enumerate(questions):
        # Generate unique canary
        canary = f"{canary_prefix}_{secrets.token_hex(8)}"
        canary_registry[canary] = i

        # Prepend the canary as a visible tag. It carries no meaning for
        # the task but is detectable if the model memorized the document.
        protected_q = f"[{canary}] {q}"

        protected.append({
            "question": q,
            "canary_question": protected_q,
            "canary": canary,
        })

    return protected, canary_registry

def check_for_canary_leakage(model_fn, canary_registry):
    """
    Test if a model has memorized canary strings.
    Prompt the model with partial canaries and check if it
    can complete them.
    """
    leaked = []

    for canary, question_idx in canary_registry.items():
        # Give model the first half of the canary
        prefix = canary[:len(canary)//2]
        suffix = canary[len(prefix):]
        prompt = f"Complete this string: {prefix}"

        completion = model_fn(prompt)

        # Leaked if the model produces the unseen second half, whether
        # or not it repeats the prefix in its answer
        if suffix in completion:
            leaked.append({
                "canary": canary,
                "question_index": question_idx,
            })

    return {
        "total_canaries": len(canary_registry),
        "leaked": len(leaked),
        "leak_rate": len(leaked) / max(len(canary_registry), 1),
        "leaked_canaries": leaked,
    }
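
Prompting a model is one way to look for canaries. When you control the training corpus, a cheaper complementary check is to scan the documents directly before training. A minimal sketch, assuming canaries were generated with the default `EVAL_CANARY` prefix and `secrets.token_hex(8)` as above (the function name and sample documents are made up):

```python
import re

# Matches canaries of the form EVAL_CANARY_<16 lowercase hex characters>.
CANARY_PATTERN = re.compile(r"EVAL_CANARY_[0-9a-f]{16}")

def find_canaries_in_corpus(documents, known_canaries):
    """
    Scan (doc_id, text) pairs for known canary strings.

    Returns a dict mapping each leaked canary to the list of
    document IDs in which it was found.
    """
    hits = {}
    for doc_id, text in documents:
        # set() so a canary repeated within one document counts once
        for found in set(CANARY_PATTERN.findall(text)):
            if found in known_canaries:
                hits.setdefault(found, []).append(doc_id)
    return hits

# Hypothetical registry and corpus, invented for illustration.
known = {"EVAL_CANARY_0123456789abcdef", "EVAL_CANARY_fedcba9876543210"}
corpus = [
    ("doc-1", "Unrelated forum post about gardening."),
    ("doc-2", "[EVAL_CANARY_0123456789abcdef] What is 17 * 23 + 4?"),
    ("doc-3", "Mirror of doc-2: [EVAL_CANARY_0123456789abcdef] What is 17 * 23 + 4?"),
]

leaks = find_canaries_in_corpus(corpus, known)
print(leaks)
```

Any hit means the protected benchmark has reached the crawl, and the matching documents can be removed before training.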

Quantifying Contamination Impact

Controlled Experiments

The gold standard for measuring contamination impact: train two models on identical data, except one includes benchmark questions and the other does not.

def controlled_contamination_experiment(
    base_training_data,
    benchmark_questions,
    contamination_rates,
    train_fn,
    eval_fn,
):
    """
    Measure the impact of contamination at different rates.

    contamination_rates: list of floats, e.g. [0.0, 0.01, 0.05, 0.1, 0.5]
    train_fn: callable(data) -> model
    eval_fn: callable(model, questions) -> accuracy
    """
    results = []

    for rate in contamination_rates:
        # Create contaminated dataset
        num_to_inject = int(len(benchmark_questions) * rate)
        injected = benchmark_questions[:num_to_inject]

        # Format benchmark questions as training documents
        contamination_docs = []
        for q in injected:
            # Simulate how benchmarks appear on the web
            doc = (
                f"Question: {q['question']}\n"
                f"Answer: {q['answer']}\n"
            )
            contamination_docs.append(doc)

        training_data = base_training_data + contamination_docs

        # Train and evaluate
        model = train_fn(training_data)
        accuracy = eval_fn(model, benchmark_questions)

        results.append({
            "contamination_rate": rate,
            "num_injected": num_to_inject,
            "accuracy": accuracy,
        })

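    # Compute inflation relative to the clean (0% contamination) run.
    # Falls back to the first run if contamination_rates has no 0.0 entry.
    baseline = next(
        (r["accuracy"] for r in results if r["contamination_rate"] == 0.0),
        results[0]["accuracy"],
    )
    for r in results:
        r["inflation"] = r["accuracy"] - baseline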

    return results

Expected Results

Contamination Impact (Controlled Experiment on 7B Model, MMLU)

| Contamination Rate | Accuracy | Inflation (points) | Meaningful? |
|---|---|---|---|
| 0% (clean) | 63.2% | 0.0 | Yes |
| 1% | 63.8% | +0.6 | Yes |
| 5% | 65.4% | +2.2 | Marginal |
| 15% | 69.1% | +5.9 | No |
| 50% | 81.3% | +18.1 | No |
| 100% | 94.7% | +31.5 | No (memorized) |

🚨 The 5% Threshold

At 5% contamination rate, the score inflation (+2.2 points) is within the noise range of typical evaluation variance. This makes low-level contamination nearly undetectable through scores alone. The n-gram and perplexity detection methods are essential precisely because they catch contamination that score-based analysis cannot.
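
How much of that noise comes from sampling alone depends on benchmark size. The sketch below uses the standard binomial standard error for an accuracy estimate. Applying the 63.2% clean baseline to ARC-Challenge and HumanEval is an assumption made purely for illustration, and prompt format, few-shot selection, and answer parsing add further variance that this formula does not capture.

```python
import math

def accuracy_standard_error(accuracy, num_questions):
    """Binomial standard error of an accuracy estimate, in percentage points."""
    return 100.0 * math.sqrt(accuracy * (1.0 - accuracy) / num_questions)

BASELINE_ACCURACY = 0.632  # clean-run accuracy from the table above

# Benchmark sizes from the table earlier in this article.
BENCHMARK_SIZES = {"MMLU": 14_042, "ARC-Challenge": 1_172, "HumanEval": 164}

standard_errors = {
    name: accuracy_standard_error(BASELINE_ACCURACY, n)
    for name, n in BENCHMARK_SIZES.items()
}

for name, se in standard_errors.items():
    # 1.96 standard errors approximates a 95% confidence interval
    print(f"{name:14s} SE = {se:.2f} pts   95% CI = +/-{1.96 * se:.2f} pts")
```

Under these assumptions, sampling error alone is under one point on the full MMLU test set, while on benchmarks the size of ARC-Challenge or HumanEval the 95% interval is wider than the +2.2-point inflation. The smaller the benchmark, the easier it is for low-level contamination to hide inside ordinary score variation.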

The Complete Decontamination Pipeline

Integration

class DecontaminationPipeline:
    """
    Complete pipeline: detect and remove contamination from
    a training corpus before model training.
    """

    def __init__(self, benchmarks):
        """
        benchmarks: dict mapping name -> list of question strings
        """
        # Build all detectors
        self.ngram_detector = NgramContaminationDetector(n=13)
        self.canonical_detector = CanonicalMatchDetector()

        for name, questions in benchmarks.items():
            self.ngram_detector.index_benchmark(name, questions)
            self.canonical_detector.index_benchmark(name, questions)

        self.ensemble = EnsembleContaminationDetector(
            self.ngram_detector,
            self.canonical_detector,
        )

    def decontaminate(self, documents, strict=True):
        """
        Remove contaminated documents from training corpus.

        documents: list of (doc_id, text) tuples
        strict: if True, remove "possible" contamination too

        Returns (clean_docs, removed_docs, report)
        """
        clean = []
        removed = []

        threshold = "possible" if strict else "likely"
        severity_order = ["certain", "likely", "possible", "clean"]

        for doc_id, text in documents:
            level, matches = self.ensemble.classify_document(text)

            level_idx = severity_order.index(level)
            threshold_idx = severity_order.index(threshold)

            if level_idx <= threshold_idx:
                removed.append((doc_id, text, level, matches))
            else:
                clean.append((doc_id, text))

        report = {
            "total": len(documents),
            "clean": len(clean),
            "removed": len(removed),
            "removal_rate": len(removed) / max(len(documents), 1),
            "by_level": {
                level: sum(
                    1 for _, _, l, _ in removed if l == level
                )
                for level in ["certain", "likely", "possible"]
            },
        }

        return clean, removed, report

💡 The Key Insight

Data contamination detection is an arms race with no final solution. N-gram overlap catches verbatim copies. Perplexity comparison catches memorized but reformatted content. Canonical matching catches near-duplicates. But none of these catch deep paraphrases or questions that test the same knowledge through different surface forms. The ultimate defense is not detection but prevention: private evaluation sets that never touch the internet, dynamic benchmarks regenerated per evaluation, and canary strings that detect leakage. Build your evaluation infrastructure assuming contamination will happen, and design around it.