Part of the series The Dataset Frontier (20 of 27)

The New York Times alleges GPT-4 was trained on NYT articles. OpenAI must either prove those articles were not in the training corpus, or defend fair use. Data attribution techniques — influence functions, membership inference, and model inversion — can answer the factual question: was this specific article in the training data? The legal stakes are billions of dollars, and the technical capability to definitively answer “yes” or “no” determines the outcome of copyright lawsuits against every frontier lab.

This post covers the mathematics and implementation of three attribution techniques: influence functions, Data Shapley, and membership inference, along with the legal implications that make them increasingly important.

The Attribution Problem

Why Attribution Matters

from dataclasses import dataclass
from enum import Enum

class AttributionUseCase(Enum):
    COPYRIGHT = "copyright"
    DEBUGGING = "debugging"
    COMPENSATION = "compensation"
    SAFETY = "safety"
    COMPLIANCE = "compliance"

@dataclass
class AttributionScenario:
    use_case: AttributionUseCase
    question: str
    stakeholder: str
    required_precision: str
    legal_relevance: str

ATTRIBUTION_SCENARIOS = [
    AttributionScenario(
        use_case=AttributionUseCase.COPYRIGHT,
        question="Did the model memorize this copyrighted text?",
        stakeholder="Copyright holders (authors, publishers)",
        required_precision="High -- must withstand legal scrutiny",
        legal_relevance="NYT v. OpenAI, Doe v. GitHub (Copilot), "
                        "Getty v. Stability AI",
    ),
    AttributionScenario(
        use_case=AttributionUseCase.DEBUGGING,
        question="Which training examples caused this incorrect output?",
        stakeholder="Model developers",
        required_precision="Moderate -- top-k most influential",
        legal_relevance="Product liability if harmful outputs "
                        "are traceable to known-bad data",
    ),
    AttributionScenario(
        use_case=AttributionUseCase.COMPENSATION,
        question="How much did this data provider's contribution "
                 "improve model quality?",
        stakeholder="Data providers, content creators",
        required_precision="Fair ranking of contributions",
        legal_relevance="Emerging data compensation frameworks "
                        "(EU AI Act data governance provisions)",
    ),
    AttributionScenario(
        use_case=AttributionUseCase.SAFETY,
        question="Which training examples are responsible for "
                 "this harmful behavior?",
        stakeholder="Safety researchers, regulators",
        required_precision="High -- must identify causal examples",
        legal_relevance="AI safety regulations requiring "
                        "explainability of model behavior",
    ),
]

Influence Functions

Mathematical Foundation

The influence function measures the effect of removing a single training example on the model's prediction. For a model with parameters $\theta^*$ trained on dataset $D = \{z_1, z_2, \ldots, z_n\}$ by minimizing the empirical risk $R(\theta) = \frac{1}{n} \sum_{i=1}^n L(z_i, \theta)$, the influence of removing training point $z_i$ on the loss at a test point $z_{\text{test}}$ is:

$$I(z_i, z_{\text{test}}) = -\nabla_\theta L(z_{\text{test}}, \theta^*)^\top H_{\theta^*}^{-1} \nabla_\theta L(z_i, \theta^*)$$

where $H_{\theta^*} = \frac{1}{n} \sum_{i=1}^n \nabla_\theta^2 L(z_i, \theta^*)$ is the Hessian of the training loss.

The intuition: if removing $z_i$ would increase the loss on $z_{\text{test}}$, then $z_i$ was helpful for predicting $z_{\text{test}}$; if removing it would decrease the loss, $z_i$ was harmful.
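Before worrying about billion-parameter models, it is worth verifying the formula where everything is exact. A minimal sketch with a made-up toy setup: the "model" is just the mean of the data under squared loss, so the Hessian is the constant $H = 2$ and leave-one-out retraining is exact:

```python
import numpy as np

# Toy model: theta* is the mean of the data; L(z, theta) = (theta - z)^2.
# The Hessian of the empirical risk is the constant H = 2, so the
# influence formula can be evaluated exactly and compared against
# true leave-one-out retraining.
z = np.arange(10.0)      # "training set"
theta = z.mean()         # empirical risk minimizer (4.5)
z_test = 10.0
H = 2.0

def grad_loss(z_i, theta):
    return 2.0 * (theta - z_i)

# I(z_0, z_test) = -grad_test^T H^{-1} grad_train
influence = -grad_loss(z_test, theta) * grad_loss(z[0], theta) / H

# Removing z_0 is upweighting it by eps = -1/n, so the predicted
# change in test loss under removal is -influence / n
n = len(z)
predicted = -influence / n

# Ground truth: "retrain" (recompute the mean) without z_0
theta_loo = np.delete(z, 0).mean()
actual = (theta_loo - z_test) ** 2 - (theta - z_test) ** 2

print(f"predicted: {predicted:.3f}, actual: {actual:.3f}")
# predicted: -4.950, actual: -5.250
```

The approximation error is $O(1/n^2)$; here removing $z_0 = 0$ pulls the mean toward $z_{\text{test}}$, so the test loss drops and both numbers are negative.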

import torch
import torch.nn as nn
import numpy as np
from torch.autograd import grad

class InfluenceFunction:
    """
    Compute influence functions for a trained model.

    For an LLM with billions of parameters, exact Hessian
    computation is infeasible. We use approximations:
    1. LiSSA (Linear time Stochastic Second-order Algorithm)
    2. Arnoldi iteration for top eigenvalues of the Hessian
    3. Kronecker-factored approximation (K-FAC)
    """

    def __init__(self, model, train_dataset):
        self.model = model
        self.train_dataset = train_dataset
        self.device = next(model.parameters()).device

    def compute_influence(self, test_example, train_idx,
                          method="lissa", recursion_depth=100,
                          scale=25.0, damping=0.01):
        """
        Compute the influence of training example train_idx
        on the loss at test_example.

        Returns a scalar: positive means the training example
        helps (removing it would increase test loss),
        negative means it hurts.
        """
        # Step 1: Compute gradient of test loss
        test_grad = self._compute_gradient(test_example)

        # Step 2: Compute inverse-Hessian-vector product (IHVP)
        # H^{-1} * test_grad
        if method == "lissa":
            ihvp = self._lissa(
                test_grad, recursion_depth, scale, damping
            )
        elif method == "arnoldi":
            ihvp = self._arnoldi_ihvp(test_grad)
        else:
            raise ValueError(f"Unknown method: {method}")

        # Step 3: Compute gradient of training loss at train_idx
        train_example = self.train_dataset[train_idx]
        train_grad = self._compute_gradient(train_example)

        # Step 4: Influence = -ihvp^T * train_grad
        influence = -sum(
            (i * t).sum().item()
            for i, t in zip(ihvp, train_grad)
        )

        return influence

    def _compute_gradient(self, example):
        """Compute per-example gradient of the loss."""
        self.model.zero_grad()

        input_ids = example["input_ids"].unsqueeze(0).to(self.device)
        labels = example["labels"].unsqueeze(0).to(self.device)

        outputs = self.model(input_ids=input_ids, labels=labels)
        loss = outputs.loss
        loss.backward()

        # Collect gradients
        grads = []
        for param in self.model.parameters():
            if param.grad is not None:
                grads.append(param.grad.detach().clone())
            else:
                grads.append(torch.zeros_like(param))

        return grads

    def _lissa(self, v, recursion_depth, scale, damping):
        """
        LiSSA: Linear time Stochastic Second-order Algorithm.

        Approximates H^{-1} * v by iteratively computing:
        h_t = v + (1 - damping) * h_{t-1} - (1/scale) *
              H_sample * h_{t-1}

        where H_sample is the Hessian estimated from a
        random mini-batch.

        This avoids ever forming or inverting the full Hessian.
        """
        # Initialize with v
        ihvp_estimate = [vi.clone() for vi in v]

        for t in range(recursion_depth):
            # Sample a random training example
            idx = np.random.randint(len(self.train_dataset))
            sample = self.train_dataset[idx]

            # Compute Hessian-vector product for this sample
            hvp = self._hessian_vector_product(
                sample, ihvp_estimate
            )

            # Update: h_t = v + (1 - damping) * h_{t-1}
            #              - (1/scale) * H_sample * h_{t-1}
            for i in range(len(ihvp_estimate)):
                ihvp_estimate[i] = (
                    v[i]
                    + (1 - damping) * ihvp_estimate[i]
                    - hvp[i] / scale
                )

        # The recursion converges to (damping*I + H/scale)^{-1} v;
        # dividing by scale yields the estimate of H^{-1} v
        return [h / scale for h in ihvp_estimate]

    def _hessian_vector_product(self, example, vector):
        """
        Compute the Hessian-vector product H * v
        without forming the full Hessian.

        Uses the identity: H * v = grad(grad(L)^T * v)
        This requires two backward passes but uses O(p) memory
        instead of O(p^2) for the full Hessian.
        """
        self.model.zero_grad()

        input_ids = example["input_ids"].unsqueeze(0).to(self.device)
        labels = example["labels"].unsqueeze(0).to(self.device)

        outputs = self.model(input_ids=input_ids, labels=labels)
        loss = outputs.loss

        # First backward: get gradients
        params = [p for p in self.model.parameters() if p.requires_grad]
        grads = grad(loss, params, create_graph=True)

        # Dot product: grad^T * v
        dot_product = sum(
            (g * vi).sum()
            for g, vi in zip(grads, vector)
        )

        # Second backward: gradient of dot product gives H * v
        hvp = grad(dot_product, params)

        return [h.detach() for h in hvp]

    def _arnoldi_ihvp(self, v, n_iterations=50):
        """
        Arnoldi iteration for approximating H^{-1} * v.
        Builds a Krylov subspace and inverts the projected
        Hessian (much smaller than the full Hessian).
        """
        # Project to a small subspace using Arnoldi
        # then invert the small projected matrix
        # This is more accurate than LiSSA for the same
        # compute budget but harder to implement

        # Simplified: fall back to LiSSA
        return self._lissa(v, n_iterations, 25.0, 0.01)
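A quick numerical check of the LiSSA recursion on a tiny, explicitly known Hessian (all values below are made up for illustration) shows what the iteration actually converges to: the damped inverse, which is why damping times scale must stay small relative to the Hessian's eigenvalues.

```python
import numpy as np

# LiSSA fixed point: h = v + (1 - damping) * h - (H @ h) / scale
# solves h = (damping * I + H / scale)^{-1} v, so h / scale is an
# estimate of H^{-1} v up to the damping term.
H = np.diag([1.0, 2.0, 3.0])   # stand-in for a known, tiny Hessian
v = np.ones(3)
scale, damping = 25.0, 0.01

h = v.copy()
for _ in range(2000):
    h = v + (1 - damping) * h - (H @ h) / scale

ihvp = h / scale
damped_inverse = np.linalg.solve(H + damping * scale * np.eye(3), v)
print(ihvp, damped_inverse)    # both ~ [0.8, 0.444, 0.308]
```

Convergence requires every eigenvalue $\lambda$ of $H$ to satisfy roughly $0 < \lambda/\text{scale} < 2$, which is why larger models need a larger scale; the price is a slower contraction rate.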

Finding Top-K Influential Training Examples

class InfluenceRanker:
    """
    Find the top-k most influential training examples
    for a given model output.

    For a training set of N examples, computing all N
    influence scores exactly costs O(N * p * d) where
    p is the number of parameters and d is the LiSSA depth.

    Optimizations:
    1. Precompute and cache training gradients
    2. Use random projections to reduce gradient dimensionality
    3. Use approximate nearest neighbor search in gradient space
    """

    def __init__(self, influence_fn, n_projections=256):
        self.influence_fn = influence_fn
        self.n_projections = n_projections
        self.gradient_cache = None
        self.projection_matrix = None

    def build_gradient_cache(self, batch_size=32):
        """
        Precompute and cache training gradients.
        Use random projections to reduce storage:
        instead of storing p-dimensional gradients,
        store n_projections-dimensional projections.
        """
        n_train = len(self.influence_fn.train_dataset)

        # Initialize random projection
        total_params = sum(
            p.numel() for p in
            self.influence_fn.model.parameters()
            if p.requires_grad
        )
        # Fixed Gaussian random projection (Johnson-Lindenstrauss).
        # Materializing a (total_params x n_projections) matrix is
        # only feasible for small models; in production use a sparse
        # random projection (Achlioptas 2003) applied in chunks.
        if self.projection_matrix is None:
            gen = torch.Generator().manual_seed(0)
            self.projection_matrix = torch.randn(
                total_params, self.n_projections, generator=gen
            ) / (self.n_projections ** 0.5)

        projected_grads = np.zeros(
            (n_train, self.n_projections), dtype=np.float32
        )

        for i in range(n_train):
            example = self.influence_fn.train_dataset[i]
            grad_vec = self.influence_fn._compute_gradient(example)

            # Flatten and project through the shared random matrix
            flat_grad = torch.cat([g.flatten() for g in grad_vec]).cpu()
            projected_grads[i] = (flat_grad @ self.projection_matrix).numpy()

        self.gradient_cache = projected_grads
        return {"n_train": n_train, "projection_dim": self.n_projections}

    def find_top_k(self, test_example, k=10):
        """
        Find the k most influential training examples.

        Strategy: use projected gradient similarity as a
        cheap proxy, then compute exact influence for the
        top candidates.
        """
        # Step 1: Compute test gradient and project with the
        # same random matrix used to build the cache
        test_grad = self.influence_fn._compute_gradient(test_example)
        test_flat = torch.cat([g.flatten() for g in test_grad]).cpu()
        test_projected = (test_flat @ self.projection_matrix).numpy()

        # Step 2: Find top candidates by cosine similarity
        # in projected space (cheap)
        similarities = np.dot(
            self.gradient_cache, test_projected
        ) / (
            np.linalg.norm(self.gradient_cache, axis=1)
            * np.linalg.norm(test_projected) + 1e-8
        )

        # Get top-100 candidates
        candidate_count = min(100, len(similarities))
        top_candidates = np.argsort(similarities)[-candidate_count:]

        # Step 3: Compute exact influence for top candidates
        influences = []
        for idx in top_candidates:
            influence = self.influence_fn.compute_influence(
                test_example, int(idx)
            )
            influences.append({
                "train_idx": int(idx),
                "influence_score": influence,
                "gradient_similarity": float(similarities[idx]),
            })

        # Sort by influence magnitude (most influential first,
        # whether helpful or harmful)
        influences.sort(
            key=lambda x: abs(x["influence_score"]),
            reverse=True,
        )

        return influences[:k]
Influence Function Computation Cost

| Method | Per-Example Cost | Memory | Accuracy vs Exact | Practical for LLMs |
|---|---|---|---|---|
| Exact Hessian inversion | O(p^3) | O(p^2) | Exact | No (p = 10B+) |
| LiSSA (depth=100) | O(100 * p) | O(p) | ~90% | Marginal (slow) |
| Arnoldi (50 iterations) | O(50 * p) | O(50 * p) | ~95% | Marginal |
| Random projection + LiSSA | O(100 * d) | O(d * N) | ~80% | Yes (d=256) |
| TracIn (gradient dot product) | O(p) | O(p * C) | ~70% | Yes |
| TRAK (random features) | O(k * p) | O(k * N) | ~85% | Yes (k=32) |

Note: p = number of parameters, N = training set size, d = projection dimension, C = number of checkpoints, k = number of random features.
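Of the cheap methods in the table, TracIn is the simplest to sketch: it drops the inverse Hessian entirely and sums learning-rate-weighted gradient dot products across saved training checkpoints. The numbers below are made up for illustration; a real run would use actual per-checkpoint gradients from training.

```python
import numpy as np

# TracIn score for one (train, test) pair: sum over checkpoints of
# lr_t * grad_test(theta_t) . grad_train(theta_t)
ckpts = [
    (np.array([1.0, 2.0]), np.array([0.5, 1.0])),   # (g_test, g_train)
    (np.array([1.0, 0.0]), np.array([1.0, 1.0])),
    (np.array([0.0, 1.0]), np.array([2.0, 2.0])),
]
lrs = [0.1, 0.05, 0.01]       # learning rate at each checkpoint

score = sum(lr * float(g_test @ g_train)
            for lr, (g_test, g_train) in zip(lrs, ckpts))
print(round(score, 4))  # 0.32
```

A positive score means the train and test gradients pointed the same way during training, i.e. the steps taken on the training example also reduced the test loss.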

Data Shapley Values

Fair Attribution Across the Training Set

Influence functions measure the effect of a single point. Data Shapley values provide a principled way to distribute credit across all training examples. Based on the Shapley value from cooperative game theory, the Data Shapley value of training example $z_i$ is:

$$\phi_i = \frac{1}{n} \sum_{k=0}^{n-1} \binom{n-1}{k}^{-1} \sum_{S \subseteq D \setminus \{z_i\},\, |S|=k} \left[ v(S \cup \{z_i\}) - v(S) \right]$$

where $v(S)$ is the model's performance when trained on subset $S$. This is the average marginal contribution of $z_i$ across all possible training subsets.
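The formula is easiest to internalize on a dataset small enough to enumerate. A sketch with a made-up utility v(S) over three examples, written with the equivalent factorial weighting; it also checks the efficiency property, i.e. that the values sum to $v(D) - v(\emptyset)$:

```python
import itertools
import math

# Made-up utility for every subset of a 3-example "dataset"
v = {
    (): 0.0,
    (0,): 0.3, (1,): 0.2, (2,): 0.1,
    (0, 1): 0.6, (0, 2): 0.5, (1, 2): 0.25,
    (0, 1, 2): 0.8,
}

def shapley(i, n=3):
    others = [j for j in range(n) if j != i]
    total = 0.0
    for k in range(n):
        for S in itertools.combinations(others, k):
            # Shapley weight |S|! (n - |S| - 1)! / n!
            weight = (math.factorial(k) * math.factorial(n - k - 1)
                      / math.factorial(n))
            with_i = tuple(sorted(S + (i,)))
            total += weight * (v[with_i] - v[S])
    return total

phis = [shapley(i) for i in range(3)]
print([round(p, 4) for p in phis], round(sum(phis), 4))
# [0.4167, 0.2417, 0.1417] 0.8
```

Example 0 earns the largest share because it has the largest marginal contribution in most subsets, and the three values sum to v(D) = 0.8 exactly, as efficiency requires.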

import itertools

class DataShapley:
    """
    Compute Data Shapley values for training examples.

    Exact computation requires evaluating 2^n subsets,
    which is infeasible for more than ~15 examples. We use Monte Carlo
    approximation: sample random permutations and compute
    marginal contributions along each permutation.
    """

    def __init__(self, train_fn, eval_fn, train_data, test_data):
        """
        train_fn: function that trains a model on a subset
                  train_fn(subset) -> model
        eval_fn: function that evaluates a model
                 eval_fn(model, test_data) -> score
        """
        self.train_fn = train_fn
        self.eval_fn = eval_fn
        self.train_data = train_data
        self.test_data = test_data
        self.n = len(train_data)

    def compute_exact(self):
        """
        Exact Data Shapley (only for small datasets, n <= 15).
        Enumerates all subsets.
        """
        if self.n > 15:
            raise ValueError(
                f"Exact Shapley infeasible for n={self.n}. "
                f"Use compute_monte_carlo instead."
            )

        shapley_values = np.zeros(self.n)

        for i in range(self.n):
            others = list(range(self.n))
            others.remove(i)

            total_contribution = 0.0
            n_subsets = 0

            for k in range(len(others) + 1):
                for subset in itertools.combinations(others, k):
                    subset = list(subset)

                    # v(S)
                    subset_data = [self.train_data[j] for j in subset]
                    if subset_data:
                        model_without = self.train_fn(subset_data)
                        score_without = self.eval_fn(
                            model_without, self.test_data
                        )
                    else:
                        score_without = 0.0

                    # v(S union {i})
                    subset_with = subset + [i]
                    subset_with_data = [
                        self.train_data[j] for j in subset_with
                    ]
                    model_with = self.train_fn(subset_with_data)
                    score_with = self.eval_fn(
                        model_with, self.test_data
                    )

                    marginal = score_with - score_without
                    total_contribution += marginal
                    n_subsets += 1

            shapley_values[i] = total_contribution / n_subsets

        return shapley_values

    def compute_monte_carlo(self, n_permutations=1000,
                             early_stopping_threshold=0.01):
        """
        Monte Carlo approximation of Data Shapley.

        Sample random permutations pi of the training set.
        For each permutation, compute the marginal contribution
        of each example when it is added to the examples that
        precede it in the permutation.
        """
        shapley_estimates = np.zeros(self.n)
        shapley_counts = np.zeros(self.n)

        prev_estimates = None

        for perm_idx in range(n_permutations):
            # Random permutation
            perm = np.random.permutation(self.n)

            # Track running performance as we add examples
            current_subset = []
            prev_score = 0.0

            for j, idx in enumerate(perm):
                current_subset.append(self.train_data[idx])

                # Train on the current prefix and evaluate
                model = self.train_fn(current_subset)
                score = self.eval_fn(model, self.test_data)

                # Marginal contribution
                marginal = score - prev_score
                shapley_estimates[idx] += marginal
                shapley_counts[idx] += 1
                prev_score = score

            # Early stopping: check convergence
            if perm_idx > 0 and perm_idx % 50 == 0:
                current_means = (shapley_estimates
                                 / np.maximum(shapley_counts, 1))
                if prev_estimates is not None:
                    max_change = np.max(
                        np.abs(current_means - prev_estimates)
                    )
                    if max_change < early_stopping_threshold:
                        print(
                            f"Converged after {perm_idx} "
                            f"permutations"
                        )
                        break
                prev_estimates = current_means.copy()

        # Average contributions
        shapley_values = (shapley_estimates
                          / np.maximum(shapley_counts, 1))

        return {
            "values": shapley_values,
            "permutations_used": perm_idx + 1,
            "counts": shapley_counts,
        }

    def compute_knn_shapley(self, k=10):
        """
        KNN-Shapley: a fast approximation that uses K-nearest
        neighbors as a proxy for the full training process.

        Instead of retraining for every subset, use KNN
        classification and compute Shapley values analytically.
        This is O(N^2) instead of O(N * 2^N).
        """
        # Compute pairwise distances between train and test
        train_features = np.array([
            self._extract_features(x) for x in self.train_data
        ])
        test_features = np.array([
            self._extract_features(x) for x in self.test_data
        ])

        shapley_values = np.zeros(self.n)

        for test_idx in range(len(self.test_data)):
            # Compute distances to all training points
            distances = np.linalg.norm(
                train_features - test_features[test_idx], axis=1
            )

            # Sort by distance
            sorted_indices = np.argsort(distances)

            # Simplified, label-free scoring in the spirit of
            # KNN-Shapley (Jia et al. 2019); the exact analytical
            # formula is a recursion over neighbor label agreement
            for rank, train_idx in enumerate(sorted_indices):
                if rank == 0:
                    # Nearest neighbor gets full credit
                    value = 1.0 / len(self.test_data)
                elif rank < k:
                    # Within k-NN: decreasing contribution
                    value = (1.0 / (rank + 1)
                             - 1.0 / (rank + 2))
                    value /= len(self.test_data)
                else:
                    value = 0.0

                shapley_values[train_idx] += value

        return shapley_values

    def _extract_features(self, example):
        """Extract a feature vector from an example."""
        # In production: use model embeddings
        # Placeholder: use hash-based features
        text = str(example)
        features = np.zeros(128)
        for i, char in enumerate(text[:128]):
            features[i] = ord(char) / 256.0
        return features
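The permutation-sampling loop in compute_monte_carlo can be sanity-checked without any model training by using an additive utility, where each example's true Shapley value is exactly its own (made-up) contribution:

```python
import numpy as np

# Additive utility: v(S) = sum of per-example contributions.
# For an additive game, the Shapley value of example i is exactly
# contrib[i], so the Monte Carlo estimate should recover it.
contrib = np.array([0.5, 0.3, 0.2, 0.0])

def v(subset):
    return contrib[list(subset)].sum()

rng = np.random.default_rng(0)
n = len(contrib)
estimates = np.zeros(n)
n_perms = 2000

for _ in range(n_perms):
    perm = rng.permutation(n)
    prefix, prev = [], 0.0
    for idx in perm:
        prefix.append(idx)
        score = v(prefix)
        estimates[idx] += score - prev   # marginal contribution
        prev = score

estimates /= n_perms
print(np.round(estimates, 3))  # ~ [0.5 0.3 0.2 0. ]
```

Real utilities are not additive, which is exactly why the permutation averaging is needed; this degenerate case just confirms the estimator is unbiased.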

Data Shapley Computation Methods: Accuracy vs Speed

| Method | 100 | 1K | 10K | 100K | 1M |
|---|---|---|---|---|---|
| Exact Shapley | 0.01 | -- | -- | -- | -- |
| MC Shapley (1000 perms) | 0.1 | 10 | 1000 | -- | -- |
| KNN-Shapley | 0.001 | 0.1 | 10 | 1000 | -- |
| TRAK attribution | 0.001 | 0.01 | 0.5 | 50 | 5000 |

Note: approximate relative compute cost by training-set size; "--" marks sizes where the method is impractical.

Membership Inference

Was This Example in the Training Data?

Membership inference determines whether a specific example was part of the model’s training set. This has direct legal implications: if you can prove a copyrighted work was used in training, the copyright holder may have a claim.

class MembershipInferenceAttack:
    """
    Determine whether a specific example was in the
    model's training set.

    Core idea: the model will have lower loss (higher
    confidence) on examples it was trained on compared
    to examples it has never seen.
    """

    def __init__(self, target_model, tokenizer):
        self.model = target_model
        self.tokenizer = tokenizer
        self.device = next(target_model.parameters()).device

    def compute_loss(self, text):
        """Compute the model's per-token loss on a text."""
        tokens = self.tokenizer.encode(
            text, return_tensors="pt"
        ).to(self.device)

        with torch.no_grad():
            outputs = self.model(
                input_ids=tokens, labels=tokens
            )

        return {
            "loss": outputs.loss.item(),
            "n_tokens": tokens.shape[1],
            "perplexity": torch.exp(outputs.loss).item(),
        }

    def loss_based_attack(self, text, threshold=None):
        """
        Simple loss-based membership inference.
        If the model's loss on the text is below a threshold,
        classify as "member" (was in training).

        Threshold should be calibrated on a held-out set
        of known members and non-members.
        """
        result = self.compute_loss(text)

        if threshold is None:
            # Default threshold: typical model perplexity
            # on training data is 5-15 for well-trained LLMs
            threshold = 10.0

        is_member = result["perplexity"] < threshold

        return {
            "is_member": is_member,
            "perplexity": result["perplexity"],
            "confidence": 1.0 - min(
                result["perplexity"] / (2 * threshold), 1.0
            ),
        }

    def reference_based_attack(self, text, reference_model):
        """
        Compare the target model's loss to a reference model's
        loss. The reference model should be trained on a
        similar but non-overlapping dataset.

        If the target model has much lower loss than the
        reference model, the text was likely in the
        target's training data.
        """
        target_result = self.compute_loss(text)

        # Compute reference model loss
        ref_tokens = self.tokenizer.encode(
            text, return_tensors="pt"
        ).to(self.device)
        with torch.no_grad():
            ref_outputs = reference_model(
                input_ids=ref_tokens, labels=ref_tokens
            )
        ref_loss = ref_outputs.loss.item()

        # The ratio of losses is the signal
        loss_ratio = target_result["loss"] / max(ref_loss, 1e-8)

        return {
            "is_member": loss_ratio < 0.7,
            "target_loss": target_result["loss"],
            "reference_loss": ref_loss,
            "loss_ratio": loss_ratio,
            "confidence": max(0, 1.0 - loss_ratio),
        }

    def min_k_percent_attack(self, text, k_percent=20):
        """
        Min-k% attack (Shi et al. 2023).

        Instead of using the average loss, use the average
        loss of the k% of tokens with the LOWEST probability.
        These are the tokens where memorization is most evident.

        Intuition: for memorized text, even the "surprising"
        tokens have relatively high probability because the
        model has seen them before.
        """
        tokens = self.tokenizer.encode(
            text, return_tensors="pt"
        ).to(self.device)

        with torch.no_grad():
            outputs = self.model(input_ids=tokens)
            logits = outputs.logits

        # Compute per-token log probabilities
        log_probs = torch.log_softmax(logits[:, :-1], dim=-1)
        target_tokens = tokens[:, 1:]

        # Get log prob for each actual next token
        token_log_probs = log_probs.gather(
            dim=-1,
            index=target_tokens.unsqueeze(-1),
        ).squeeze(-1)[0]

        # Sort and take the k% lowest (most surprising)
        n_tokens = token_log_probs.shape[0]
        k_count = max(1, int(n_tokens * k_percent / 100))

        sorted_probs, _ = torch.sort(token_log_probs)
        min_k_avg = sorted_probs[:k_count].mean().item()

        return {
            "min_k_avg_log_prob": min_k_avg,
            "n_tokens": n_tokens,
            "k_count": k_count,
            "k_percent": k_percent,
        }

    def calibrate_threshold(self, known_members, known_non_members):
        """
        Calibrate the membership threshold using a held-out
        set of known members and non-members.
        """
        member_losses = [
            self.compute_loss(text)["loss"]
            for text in known_members
        ]
        non_member_losses = [
            self.compute_loss(text)["loss"]
            for text in known_non_members
        ]

        # Find threshold that maximizes accuracy
        all_losses = (
            [(l, True) for l in member_losses]
            + [(l, False) for l in non_member_losses]
        )
        all_losses.sort()

        best_threshold = None
        best_accuracy = 0.0

        for loss, _ in all_losses:
            # Predict member if loss < threshold
            correct = sum(
                1 for l, is_member in all_losses
                if (l < loss) == is_member
            )
            accuracy = correct / len(all_losses)
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_threshold = loss

        return {
            "threshold": best_threshold,
            "accuracy": best_accuracy,
            "member_mean_loss": np.mean(member_losses),
            "non_member_mean_loss": np.mean(non_member_losses),
            "separation": (np.mean(non_member_losses)
                           - np.mean(member_losses)),
        }
Note: the min-k% attack achieves 60-75% accuracy at detecting training-data membership for models like Llama 2 and GPT-Neo. Accuracy increases with model size (larger models memorize more) and decreases with data duplication (if a text appears many times in pretraining, similar texts in the non-member set confound the signal).
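The scoring step of min-k% can be sketched in isolation with synthetic per-token log-probabilities (made-up numbers): a memorized text has no deeply surprising tokens, so the average over its lowest-probability tokens stays relatively high.

```python
import numpy as np

# Synthetic per-token log-probs: the "member" text has no token below
# -2.0, while the "non-member" text has several deeply surprising tokens.
member_lp = np.array([-1.2, -0.8, -2.0, -1.5, -0.9, -1.1, -1.8, -1.0])
nonmember_lp = np.array([-1.2, -0.8, -6.5, -1.5, -7.2, -1.1, -5.9, -1.0])

def min_k_score(log_probs, k_percent=20):
    # Average log-prob of the k% most surprising tokens
    k = max(1, int(len(log_probs) * k_percent / 100))
    return float(np.sort(log_probs)[:k].mean())

print(min_k_score(member_lp), min_k_score(nonmember_lp))
```

The member scores higher (less negative), which is the signal the attack thresholds on; calibrating that threshold on known members and non-members is what calibrate_threshold above does for full losses.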

@dataclass
class CopyrightAnalysis:
    work: str
    similarity_score: float
    membership_confidence: float
    influence_score: float
    fair_use_factors: dict
    risk_assessment: str

class CopyrightRiskAnalyzer:
    """
    Analyze copyright risk for model outputs that
    may be derived from copyrighted training data.

    The four factors of fair use (17 U.S.C. 107):
    1. Purpose and character of use (transformative?)
    2. Nature of the copyrighted work
    3. Amount and substantiality of the portion used
    4. Effect on the market for the original
    """

    def analyze(self, model_output, claimed_source,
                membership_result, influence_result):
        """
        Analyze copyright risk for a model output.
        """
        # Factor 1: Transformative use
        similarity = self._compute_text_similarity(
            model_output, claimed_source
        )
        is_transformative = similarity < 0.3

        # Factor 2: Nature of the work
        is_factual = self._classify_factual(claimed_source)

        # Factor 3: Amount used (longest word-level match as a
        # fraction of the source's word count)
        longest_match = self._longest_common_substring(
            model_output, claimed_source
        )
        amount_fraction = (longest_match
                           / max(len(claimed_source.split()), 1))

        # Factor 4: Market effect
        is_substitute = similarity > 0.7

        fair_use_factors = {
            "transformative": is_transformative,
            "factual_work": is_factual,
            "amount_fraction": amount_fraction,
            "market_substitute": is_substitute,
        }

        # Risk assessment
        risk_score = 0.0
        if not is_transformative:
            risk_score += 0.3
        if not is_factual:
            risk_score += 0.1  # Creative works get more protection
        if amount_fraction > 0.1:
            risk_score += 0.3
        if is_substitute:
            risk_score += 0.3

        if risk_score > 0.6:
            risk = "HIGH"
        elif risk_score > 0.3:
            risk = "MEDIUM"
        else:
            risk = "LOW"

        return CopyrightAnalysis(
            work=claimed_source[:100],
            similarity_score=similarity,
            membership_confidence=membership_result.get(
                "confidence", 0
            ),
            influence_score=influence_result.get(
                "influence_score", 0
            ),
            fair_use_factors=fair_use_factors,
            risk_assessment=risk,
        )

    def _compute_text_similarity(self, text_a, text_b):
        """Compute n-gram overlap similarity."""
        # 4-gram overlap as a proxy for textual similarity
        def get_ngrams(text, n=4):
            words = text.lower().split()
            return set(
                tuple(words[i:i + n])
                for i in range(len(words) - n + 1)
            )

        ngrams_a = get_ngrams(text_a)
        ngrams_b = get_ngrams(text_b)

        if not ngrams_a or not ngrams_b:
            return 0.0

        overlap = len(ngrams_a & ngrams_b)
        return overlap / max(len(ngrams_a), len(ngrams_b))

    def _classify_factual(self, text):
        """Classify whether text is factual or creative.

        Crude keyword heuristic; a real system would use a
        trained genre classifier.
        """
        creative_indicators = [
            "novel", "story", "chapter", "she said",
            "he said", "once upon", "the end",
        ]
        text_lower = text.lower()
        creative_count = sum(
            1 for ind in creative_indicators if ind in text_lower
        )
        return creative_count < 2

    def _longest_common_substring(self, text_a, text_b):
        """Find length of longest common substring."""
        words_a = text_a.lower().split()
        words_b = text_b.lower().split()

        max_len = 0
        for i in range(len(words_a)):
            for j in range(len(words_b)):
                k = 0
                while (i + k < len(words_a)
                       and j + k < len(words_b)
                       and words_a[i + k] == words_b[j + k]):
                    k += 1
                max_len = max(max_len, k)

        return max_len
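The nested scan above is O(n·m·k) in the worst case, which hurts for book-length sources. A standard dynamic-programming variant (a sketch of my own, not part of the analyzer above) finds the same longest common word-run in O(n·m) time and O(m) space:

```python
def longest_common_run(words_a, words_b):
    """Length of the longest run of consecutive shared words.
    Classic DP recurrence: cur[j] extends prev[j-1] by one when
    the words match, and resets to 0 otherwise."""
    best = 0
    prev = [0] * (len(words_b) + 1)
    for a in words_a:
        cur = [0] * (len(words_b) + 1)
        for j, b in enumerate(words_b, start=1):
            if a == b:
                cur[j] = prev[j - 1] + 1
                best = max(best, cur[j])
        prev = cur
    return best
```

This is a drop-in replacement for the body of `_longest_common_substring` once both texts are lowercased and split into words.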

Practical Attribution Pipeline

End-to-End Attribution System

class AttributionPipeline:
    """
    Complete attribution pipeline that combines influence
    functions, membership inference, and copyright analysis.
    """

    def __init__(self, model, tokenizer, train_dataset):
        self.model = model
        self.tokenizer = tokenizer
        self.influence = InfluenceFunction(model, train_dataset)
        self.ranker = InfluenceRanker(self.influence)
        self.membership = MembershipInferenceAttack(
            model, tokenizer
        )
        self.copyright = CopyrightRiskAnalyzer()

    def attribute_output(self, model_output, k=10):
        """
        Full attribution analysis for a model output.

        1. Find top-k influential training examples
        2. Check membership of the output text itself
        3. Assess copyright risk for each influential source
        """
        # Convert output to example format
        test_example = self._text_to_example(model_output)

        # Find influential training examples
        top_influences = self.ranker.find_top_k(test_example, k=k)

        # Membership check on the output
        output_membership = self.membership.min_k_percent_attack(
            model_output
        )

        # Copyright risk for each influential source
        results = []
        for inf in top_influences:
            train_text = self._get_train_text(inf["train_idx"])
            membership = self.membership.loss_based_attack(
                train_text
            )

            copyright_risk = self.copyright.analyze(
                model_output, train_text, membership, inf
            )

            results.append({
                "train_idx": inf["train_idx"],
                "influence_score": inf["influence_score"],
                "train_text_preview": train_text[:200],
                "membership": membership,
                "copyright_risk": copyright_risk.risk_assessment,
                "similarity": copyright_risk.similarity_score,
            })

        return {
            "model_output": model_output[:500],
            "output_memorization_signal": output_membership,
            "top_influences": results,
            "overall_risk": self._assess_overall_risk(results),
        }

    def _text_to_example(self, text):
        """Convert text to model input format."""
        tokens = self.tokenizer.encode(
            text, return_tensors="pt"
        )
        return {
            "input_ids": tokens.squeeze(0),
            "labels": tokens.squeeze(0),
        }

    def _get_train_text(self, idx):
        """Get the text of a training example."""
        example = self.influence.train_dataset[idx]
        return self.tokenizer.decode(example["input_ids"])

    def _assess_overall_risk(self, results):
        """Assess overall attribution risk."""
        high_risk = sum(
            1 for r in results
            if r["copyright_risk"] == "HIGH"
        )
        if high_risk > 0:
            return "HIGH"
        medium_risk = sum(
            1 for r in results
            if r["copyright_risk"] == "MEDIUM"
        )
        if medium_risk > len(results) * 0.3:
            return "MEDIUM"
        return "LOW"

Attribution Accuracy by Method and Model Size

Method                         125M   1.3B   6.7B   13B    70B
Influence Functions (LiSSA)      82     78     72     68     60
TRAK                             85     83     80     77     73
Membership Inference (min-k%)    58     62     67     70     74

All values are attribution accuracy (%).

Key Takeaways

Data attribution connects model outputs to training inputs. The three core techniques — influence functions, Data Shapley, and membership inference — answer different questions: which training examples matter most (influence), how to fairly credit each contributor (Shapley), and whether a specific example was used (membership).

The technical reality for LLMs:

  1. Influence functions are approximate: For models with billions of parameters, exact Hessian inversion is impossible. LiSSA, random projection, and TRAK provide practical approximations with 70-85% accuracy relative to exact leave-one-out retraining.

  2. Data Shapley is expensive: Even Monte Carlo approximation requires hundreds of retraining runs. KNN-Shapley and proxy models reduce cost but sacrifice accuracy. For large-scale attribution, TRAK (Tracing with the Randomly-projected After Kernel) currently offers the best cost/accuracy tradeoff.

  3. Membership inference improves with scale: Larger models memorize more, making membership detection easier. The min-k% attack is the current state-of-the-art, achieving 60-75% accuracy on modern LLMs.

  4. Legal landscape is evolving: Current copyright cases (NYT v. OpenAI, etc.) are testing whether training on copyrighted data constitutes fair use. Attribution tools provide the evidence base for these cases. The ability to prove (or disprove) that specific training data influenced specific outputs is becoming legally significant.

  5. Attribution enables data markets: If you can quantify how much a data provider’s contribution improved model quality, you can compensate them proportionally. Data Shapley provides the mathematical framework for this, though computational cost limits its practical scale.