Part 18 of 27 in the series "The Dataset Frontier"

A $4M training run fails to reproduce the previous checkpoint's quality. The hyperparameters match. The architecture matches. But the training data is unknowable: it was assembled from six sources, filtered through three quality classifiers, deduplicated with MinHash, and sampled with a random seed that nobody recorded. Without data versioning, you cannot debug this regression. You can only guess, rerun, and burn another $4M. Data versioning is not a nice-to-have; it is mandatory infrastructure when training runs cost more than most engineers' annual salaries.

This post covers the complete data versioning stack: content-addressable storage for large datasets, efficient change detection, metadata tracking, integration with training pipelines, and production-scale implementation.

The Versioning Problem for Training Data

Why Git Alone Fails

Git tracks file changes efficiently through delta compression and content-addressable storage. But training datasets are fundamentally different from source code:

from dataclasses import dataclass

@dataclass
class VersioningChallenge:
    problem: str
    git_limit: str
    required_solution: str

CHALLENGES = [
    VersioningChallenge(
        problem="Dataset size (1-50 TB for pretraining)",
        git_limit="Git stores full snapshots in .git, "
                  "cloning a 50TB repo is impractical",
        required_solution="Content-addressable storage with "
                          "lazy fetching (download on demand)",
    ),
    VersioningChallenge(
        problem="Binary formats (Parquet, Arrow, safetensors)",
        git_limit="Git diff cannot meaningfully diff binary files, "
                  "delta compression is poor on binary data",
        required_solution="Semantic diffing: compare schemas, "
                          "row counts, column stats, sample rows",
    ),
    VersioningChallenge(
        problem="Streaming datasets (continuously updated)",
        git_limit="Git commits are point-in-time snapshots, "
                  "no concept of append-only streams",
        required_solution="Append-only log with checkpoint commits "
                          "at significant points",
    ),
    VersioningChallenge(
        problem="Data lineage (this row came from CommonCrawl "
                "dump 2024-09, filtered by quality score > 0.8)",
        git_limit="Git tracks file-level changes, not "
                  "row-level provenance",
        required_solution="Per-row metadata with source tracking "
                          "and transformation history",
    ),
    VersioningChallenge(
        problem="Distributed teams (50 people contributing data)",
        git_limit="Git merge conflicts on binary data are unresolvable",
        required_solution="Partition-level locking with "
                          "append-only semantics per partition",
    ),
]

Storage Backend Comparison for Dataset Versioning

| Backend | Max Dataset Size | Clone Time (1 TB) | Diff Speed | Streaming Support |
|---|---|---|---|---|
| Git (bare) | ~5 GB practical | N/A | Line-level | No |
| Git LFS | ~100 GB practical | ~2 hours | File-level only | No |
| DVC + S3 | Unlimited | Seconds (lazy) | File-level + stats | Partial |
| LakeFS | Unlimited | Seconds (branching) | Row-level | Yes |
| Delta Lake | Unlimited | N/A (query) | Row-level | Yes |
| HF Datasets | ~5 TB practical | Minutes (streaming) | Commit-level | Yes |
Note: Clone time for DVC/LakeFS is fast because they use lazy fetching -- only metadata is fetched initially.
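The "seconds to clone" numbers come from lazy fetching: a clone downloads only a small manifest of content hashes, and object bytes are pulled from remote storage the first time they are read. A minimal sketch of the idea (the `remote` dict stands in for S3; all names here are illustrative, not any tool's real API):

```python
import hashlib

# Stand-in for remote object storage: content hash -> bytes.
remote = {}

def put(data: bytes) -> str:
    """Upload an object and return its content hash."""
    h = hashlib.sha256(data).hexdigest()
    remote[h] = data
    return h

class LazyDataset:
    """A 'clone' holds only the manifest; objects are fetched on first read."""

    def __init__(self, manifest: dict):
        self.manifest = manifest  # filename -> content hash (tiny)
        self.cache = {}           # locally materialized objects

    def read(self, filename: str) -> bytes:
        h = self.manifest[filename]
        if h not in self.cache:   # fetch from remote on demand
            self.cache[h] = remote[h]
        return self.cache[h]

# "Cloning" transfers only the manifest, not the terabytes behind it.
manifest = {"shard-000.parquet": put(b"rows..."),
            "shard-001.parquet": put(b"more rows...")}
ds = LazyDataset(manifest)
assert ds.read("shard-000.parquet") == b"rows..."
```

The manifest is a few kilobytes regardless of dataset size, which is why a 1 TB "clone" can complete in seconds while the actual shards stream in later.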

Content-Addressable Dataset Storage

Hashing for Integrity

Every dataset version is identified by a cryptographic hash of its contents. This provides three guarantees: integrity (any modification changes the hash), deduplication (identical data stored once), and immutability (a hash always refers to the same data).

import hashlib
import struct
import os
from pathlib import Path
from typing import Iterator

class DatasetHasher:
    """
    Content-addressable hashing for datasets.
    Supports both file-level and chunk-level hashing
    for efficient deduplication.
    """

    def __init__(self, algorithm="sha256", chunk_size=64 * 1024 * 1024):
        self.algorithm = algorithm
        self.chunk_size = chunk_size  # 64MB chunks

    def hash_file(self, path):
        """
        Hash a single file. For large files, this reads
        in chunks to avoid loading the entire file into memory.
        """
        h = hashlib.new(self.algorithm)
        file_size = os.path.getsize(path)

        with open(path, "rb") as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                h.update(chunk)

        return {
            "hash": h.hexdigest(),
            "algorithm": self.algorithm,
            "size": file_size,
            "path": str(path),
        }

    def hash_directory(self, directory):
        """
        Hash an entire dataset directory.
        The directory hash is computed from the sorted list
        of (path, hash) pairs, making it deterministic
        regardless of filesystem ordering.
        """
        directory = Path(directory)
        file_hashes = []

        for path in sorted(directory.rglob("*")):
            if path.is_file():
                rel_path = path.relative_to(directory)
                file_hash = self.hash_file(path)
                file_hashes.append((str(rel_path), file_hash["hash"]))

        # Hash the sorted list of (path, hash) pairs
        h = hashlib.new(self.algorithm)
        for rel_path, file_hash in file_hashes:
            h.update(rel_path.encode("utf-8"))
            h.update(file_hash.encode("utf-8"))

        return {
            "directory_hash": h.hexdigest(),
            "file_count": len(file_hashes),
            "file_hashes": dict(file_hashes),
        }

    def hash_streaming(self, data_iterator):
        """
        Hash a streaming dataset without materializing it.
        Each record is serialized and fed to the hasher.
        """
        h = hashlib.new(self.algorithm)
        record_count = 0

        for record in data_iterator:
            # Serialize record deterministically
            serialized = self._deterministic_serialize(record)
            h.update(serialized)
            record_count += 1

        return {
            "hash": h.hexdigest(),
            "record_count": record_count,
        }

    def _deterministic_serialize(self, record):
        """
        Serialize a record in a deterministic way.
        Dict keys are sorted, floats are rounded to avoid
        platform-dependent precision differences.
        """
        if isinstance(record, dict):
            items = sorted(record.items())
            parts = []
            for key, value in items:
                parts.append(
                    self._deterministic_serialize(key)
                    + b":"
                    + self._deterministic_serialize(value)
                )
            return b"{" + b",".join(parts) + b"}"
        elif isinstance(record, str):
            encoded = record.encode("utf-8")
            return struct.pack(">I", len(encoded)) + encoded
        elif isinstance(record, (int, float)):
            return str(record).encode("utf-8")
        elif isinstance(record, bytes):
            return struct.pack(">I", len(record)) + record
        elif isinstance(record, (list, tuple)):
            parts = [self._deterministic_serialize(item)
                     for item in record]
            return b"[" + b",".join(parts) + b"]"
        else:
            return str(record).encode("utf-8")

Content-Addressable Store

import json
import shutil
from datetime import datetime

class ContentAddressableStore:
    """
    Store dataset files by their content hash.
    Similar to how Git stores objects, but optimized
    for large binary files.

    Storage layout:
    store/
      objects/
        ab/cdef1234...  (first 2 chars as directory)
      refs/
        datasets/
          my-dataset/
            v1.json  (points to object hashes)
            v2.json
      manifests/
        abc123.json  (maps version to file hashes)
    """

    def __init__(self, store_path):
        self.store_path = Path(store_path)
        self.objects_dir = self.store_path / "objects"
        self.refs_dir = self.store_path / "refs" / "datasets"
        self.manifests_dir = self.store_path / "manifests"

        for d in [self.objects_dir, self.refs_dir, self.manifests_dir]:
            d.mkdir(parents=True, exist_ok=True)

        self.hasher = DatasetHasher()

    def store_file(self, source_path):
        """
        Store a file by its content hash.
        Returns the hash. If the file already exists
        (same content), this is a no-op.
        """
        file_info = self.hasher.hash_file(source_path)
        content_hash = file_info["hash"]

        # Store in objects directory with two-level hashing
        obj_dir = self.objects_dir / content_hash[:2]
        obj_path = obj_dir / content_hash[2:]

        if not obj_path.exists():
            obj_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source_path, obj_path)

        return content_hash

    def store_dataset(self, dataset_dir, name, version):
        """
        Store an entire dataset directory.
        Creates a manifest mapping filenames to content hashes.
        """
        dataset_dir = Path(dataset_dir)
        manifest = {
            "name": name,
            "version": version,
            "created": datetime.utcnow().isoformat(),
            "files": {},
        }

        total_size = 0
        for path in sorted(dataset_dir.rglob("*")):
            if path.is_file():
                rel_path = str(path.relative_to(dataset_dir))
                content_hash = self.store_file(path)
                file_size = os.path.getsize(path)
                manifest["files"][rel_path] = {
                    "hash": content_hash,
                    "size": file_size,
                }
                total_size += file_size

        manifest["total_size"] = total_size
        manifest["file_count"] = len(manifest["files"])

        # Hash the manifest itself for the version reference
        manifest_hash = hashlib.sha256(
            json.dumps(manifest, sort_keys=True).encode()
        ).hexdigest()
        manifest["manifest_hash"] = manifest_hash

        # Save manifest
        manifest_path = self.manifests_dir / f"{manifest_hash}.json"
        with open(manifest_path, "w") as f:
            json.dump(manifest, f, indent=2)

        # Update version reference
        ref_dir = self.refs_dir / name
        ref_dir.mkdir(parents=True, exist_ok=True)
        ref_path = ref_dir / f"{version}.json"
        with open(ref_path, "w") as f:
            json.dump({
                "manifest_hash": manifest_hash,
                "version": version,
                "created": manifest["created"],
            }, f, indent=2)

        return manifest

    def restore_dataset(self, name, version, target_dir):
        """
        Restore a dataset version to a target directory.
        """
        target_dir = Path(target_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        # Load version reference
        ref_path = self.refs_dir / name / f"{version}.json"
        with open(ref_path) as f:
            ref = json.load(f)

        # Load manifest
        manifest_path = self.manifests_dir / f"{ref['manifest_hash']}.json"
        with open(manifest_path) as f:
            manifest = json.load(f)

        # Restore files
        for rel_path, file_info in manifest["files"].items():
            content_hash = file_info["hash"]
            obj_path = (self.objects_dir / content_hash[:2]
                        / content_hash[2:])
            dest_path = target_dir / rel_path
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(obj_path, dest_path)

        return manifest
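A quick way to see the deduplication guarantee: two files with identical bytes map to the same object path, so re-storing an unchanged dataset writes almost nothing. A self-contained toy version of the `objects/` layout (paths and contents are illustrative):

```python
import hashlib
import tempfile
from pathlib import Path

def store_bytes(objects_dir: Path, data: bytes) -> str:
    """Store data under objects/<first 2 hex chars>/<rest>; no-op if present."""
    h = hashlib.sha256(data).hexdigest()
    obj = objects_dir / h[:2] / h[2:]
    if not obj.exists():
        obj.parent.mkdir(parents=True, exist_ok=True)
        obj.write_bytes(data)
    return h

objects_dir = Path(tempfile.mkdtemp()) / "objects"
h1 = store_bytes(objects_dir, b"same shard")
h2 = store_bytes(objects_dir, b"same shard")     # dedup: nothing new written
h3 = store_bytes(objects_dir, b"changed shard")  # new content, new object

assert h1 == h2 and h1 != h3
assert sum(1 for p in objects_dir.rglob("*") if p.is_file()) == 2
```

This is why storing version N+1 of a 10 TB dataset where only one shard changed costs one shard of storage, not 10 TB.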

Efficient Change Detection

Diffing Dataset Versions

When a training run regresses, the first question is: what changed in the data? A dataset diff compares two versions at multiple levels: file-level (which files were added/removed/modified), schema-level (did columns change), statistics-level (did distributions shift), and row-level (which specific rows changed).


@dataclass
class DatasetDiff:
    version_a: str
    version_b: str
    files_added: list
    files_removed: list
    files_modified: list
    files_unchanged: list
    total_size_change: int
    row_count_change: int
    schema_changes: list
    statistics_changes: dict

class DatasetDiffer:
    """
    Compute diffs between two dataset versions.
    Uses content hashes for fast file-level comparison,
    then drills into modified files for detailed diffs.
    """

    def __init__(self, store):
        self.store = store

    def diff(self, name, version_a, version_b):
        """
        Compute the diff between two dataset versions.
        """
        manifest_a = self._load_manifest(name, version_a)
        manifest_b = self._load_manifest(name, version_b)

        files_a = set(manifest_a["files"].keys())
        files_b = set(manifest_b["files"].keys())

        added = files_b - files_a
        removed = files_a - files_b
        common = files_a & files_b

        modified = []
        unchanged = []
        for f in common:
            hash_a = manifest_a["files"][f]["hash"]
            hash_b = manifest_b["files"][f]["hash"]
            if hash_a != hash_b:
                modified.append(f)
            else:
                unchanged.append(f)

        # Compute size change
        size_a = manifest_a.get("total_size", 0)
        size_b = manifest_b.get("total_size", 0)

        # Compute detailed stats for modified files
        stats_changes = {}
        for f in modified:
            stats_changes[f] = self._compute_file_stats_diff(
                manifest_a["files"][f],
                manifest_b["files"][f],
            )

        return DatasetDiff(
            version_a=version_a,
            version_b=version_b,
            files_added=sorted(added),
            files_removed=sorted(removed),
            files_modified=sorted(modified),
            files_unchanged=sorted(unchanged),
            total_size_change=size_b - size_a,
            row_count_change=0,  # Computed per-file
            schema_changes=[],   # Computed per-file
            statistics_changes=stats_changes,
        )

    def diff_detailed(self, name, version_a, version_b,
                      sample_rows=100):
        """
        Compute a detailed diff including row-level changes
        for modified files. This is more expensive because
        it reads actual file contents.
        """
        basic_diff = self.diff(name, version_a, version_b)

        detailed = {
            "basic": basic_diff,
            "modified_file_details": {},
        }

        for f in basic_diff.files_modified:
            # Load both versions of the file
            data_a = self._load_file_data(name, version_a, f)
            data_b = self._load_file_data(name, version_b, f)

            detail = {
                "row_count_a": len(data_a),
                "row_count_b": len(data_b),
                "row_count_change": len(data_b) - len(data_a),
            }

            # Schema comparison
            if data_a and data_b:
                cols_a = set(data_a[0].keys()) if isinstance(data_a[0], dict) else set()
                cols_b = set(data_b[0].keys()) if isinstance(data_b[0], dict) else set()
                detail["columns_added"] = list(cols_b - cols_a)
                detail["columns_removed"] = list(cols_a - cols_b)

            # Statistical comparison
            detail["stats"] = self._compare_column_stats(
                data_a, data_b
            )

            # Sample of changed rows
            detail["sample_changes"] = self._sample_row_changes(
                data_a, data_b, sample_rows
            )

            detailed["modified_file_details"][f] = detail

        return detailed

    def _load_manifest(self, name, version):
        """Load a version manifest."""
        ref_path = self.store.refs_dir / name / f"{version}.json"
        with open(ref_path) as f:
            ref = json.load(f)
        manifest_path = (self.store.manifests_dir
                         / f"{ref['manifest_hash']}.json")
        with open(manifest_path) as f:
            return json.load(f)

    def _compute_file_stats_diff(self, info_a, info_b):
        """Compare basic file stats."""
        return {
            "size_change": info_b["size"] - info_a["size"],
            "size_change_pct": (
                (info_b["size"] - info_a["size"])
                / max(info_a["size"], 1) * 100
            ),
        }

    def _load_file_data(self, name, version, filename):
        """Load actual file data for detailed comparison."""
        # In production: read Parquet/Arrow files
        return []

    def _compare_column_stats(self, data_a, data_b):
        """Compare column-level statistics between versions."""
        stats = {}
        if not data_a or not data_b:
            return stats
        if not isinstance(data_a[0], dict):
            return stats

        for col in data_a[0].keys():
            vals_a = [row.get(col) for row in data_a
                      if col in row]
            vals_b = [row.get(col) for row in data_b
                      if col in row]

            if vals_a and vals_b:
                if isinstance(vals_a[0], (int, float)):
                    stats[col] = {
                        "mean_a": sum(vals_a) / len(vals_a),
                        "mean_b": sum(vals_b) / len(vals_b),
                        "min_a": min(vals_a),
                        "min_b": min(vals_b),
                        "max_a": max(vals_a),
                        "max_b": max(vals_b),
                    }
                elif isinstance(vals_a[0], str):
                    len_a = [len(v) for v in vals_a]
                    len_b = [len(v) for v in vals_b]
                    stats[col] = {
                        "avg_length_a": sum(len_a) / len(len_a),
                        "avg_length_b": sum(len_b) / len(len_b),
                        "unique_a": len(set(vals_a)),
                        "unique_b": len(set(vals_b)),
                    }
        return stats

    def _sample_row_changes(self, data_a, data_b, n):
        """Sample rows that differ between versions."""
        # Simple: hash each row, find non-matching hashes
        hasher = DatasetHasher()
        hashes_a = set()
        for row in data_a:
            h = hashlib.sha256(str(sorted(row.items())).encode()
                               if isinstance(row, dict)
                               else str(row).encode()).hexdigest()
            hashes_a.add(h)

        new_rows = []
        for row in data_b:
            h = hashlib.sha256(str(sorted(row.items())).encode()
                               if isinstance(row, dict)
                               else str(row).encode()).hexdigest()
            if h not in hashes_a:
                new_rows.append(row)
                if len(new_rows) >= n:
                    break

        return new_rows
ℹ️ Note

For datasets stored in Parquet format, column-level statistics (min, max, count, null count) are stored in the Parquet footer. You can compare these statistics between versions without reading any row data, making file-level statistical diffs nearly instantaneous even for multi-terabyte datasets.

Metadata and Lineage Tracking

Tracking Data Transformations

Training data goes through multiple transformations: filtering, deduplication, quality scoring, tokenization. Each transformation modifies the dataset. The lineage tracker records every transformation so you can reconstruct the exact pipeline that produced any dataset version.

from datetime import datetime
import uuid

@dataclass
class TransformRecord:
    transform_id: str
    transform_type: str  # "filter", "dedup", "score", "tokenize"
    input_version: str
    output_version: str
    parameters: dict
    timestamp: str
    input_row_count: int
    output_row_count: int
    rows_removed: int
    rows_added: int
    duration_seconds: float

class LineageTracker:
    """
    Track the lineage of dataset transformations.
    Every operation on a dataset is recorded, creating
    a DAG (directed acyclic graph) of transformations
    from raw sources to final training data.
    """

    def __init__(self, store_path):
        self.store_path = Path(store_path)
        self.lineage_dir = self.store_path / "lineage"
        self.lineage_dir.mkdir(parents=True, exist_ok=True)
        self.records = []

    def record_transform(self, transform_type, input_version,
                          output_version, parameters,
                          input_count, output_count, duration):
        """Record a dataset transformation."""
        record = TransformRecord(
            transform_id=str(uuid.uuid4())[:8],
            transform_type=transform_type,
            input_version=input_version,
            output_version=output_version,
            parameters=parameters,
            timestamp=datetime.utcnow().isoformat(),
            input_row_count=input_count,
            output_row_count=output_count,
            rows_removed=max(0, input_count - output_count),
            rows_added=max(0, output_count - input_count),
            duration_seconds=duration,
        )
        self.records.append(record)
        self._persist_record(record)
        return record

    def get_lineage(self, version):
        """
        Trace the full lineage of a dataset version.
        Returns the chain of transformations from raw source
        to the specified version.
        """
        chain = []
        current = version

        # Walk backwards through the transformation graph
        while True:
            record = self._find_record_producing(current)
            if record is None:
                break
            chain.append(record)
            current = record.input_version

        chain.reverse()
        return chain

    def get_lineage_summary(self, version):
        """
        Human-readable summary of dataset lineage.
        """
        chain = self.get_lineage(version)
        summary_lines = [f"Lineage for version: {version}"]
        summary_lines.append(f"Total transformations: {len(chain)}")
        summary_lines.append("")

        for i, record in enumerate(chain):
            summary_lines.append(
                f"Step {i + 1}: {record.transform_type}"
            )
            summary_lines.append(
                f"  Input: {record.input_version} "
                f"({record.input_row_count:,} rows)"
            )
            summary_lines.append(
                f"  Output: {record.output_version} "
                f"({record.output_row_count:,} rows)"
            )
            if record.rows_removed > 0:
                pct = (record.rows_removed
                       / record.input_row_count * 100)
                summary_lines.append(
                    f"  Removed: {record.rows_removed:,} rows "
                    f"({pct:.1f}%)"
                )
            summary_lines.append(
                f"  Duration: {record.duration_seconds:.1f}s"
            )
            summary_lines.append(
                f"  Parameters: {json.dumps(record.parameters)}"
            )
            summary_lines.append("")

        return "\n".join(summary_lines)

    def _find_record_producing(self, version):
        """Find the transform that produced a version."""
        for record in reversed(self.records):
            if record.output_version == version:
                return record
        return None

    def _persist_record(self, record):
        """Save record to disk."""
        path = self.lineage_dir / f"{record.transform_id}.json"
        with open(path, "w") as f:
            json.dump({
                "transform_id": record.transform_id,
                "transform_type": record.transform_type,
                "input_version": record.input_version,
                "output_version": record.output_version,
                "parameters": record.parameters,
                "timestamp": record.timestamp,
                "input_row_count": record.input_row_count,
                "output_row_count": record.output_row_count,
                "rows_removed": record.rows_removed,
                "rows_added": record.rows_added,
                "duration_seconds": record.duration_seconds,
            }, f, indent=2)

Training Run Manifest

Binding Data Versions to Training Runs

Every training run should produce a manifest that records exactly which dataset versions were used. This manifest is the key artifact for reproducibility.

@dataclass
class TrainingRunManifest:
    run_id: str
    model_name: str
    started_at: str
    completed_at: str
    datasets: dict      # name -> version mapping
    hyperparameters: dict
    hardware: dict
    git_commit: str     # Code version
    metrics: dict       # Final evaluation metrics
    data_hashes: dict   # dataset name -> content hash

class TrainingRunTracker:
    """
    Track the complete specification of a training run.
    This is the artifact that enables exact reproducibility.
    """

    def __init__(self, store):
        self.store = store
        self.runs_dir = store.store_path / "runs"
        self.runs_dir.mkdir(parents=True, exist_ok=True)

    def start_run(self, run_id, model_name, datasets,
                  hyperparameters, hardware, git_commit):
        """
        Record the start of a training run.
        Captures all dataset versions before training begins.
        """
        # Capture exact dataset hashes
        data_hashes = {}
        for name, version in datasets.items():
            manifest = self._load_dataset_manifest(name, version)
            if manifest:
                data_hashes[name] = manifest.get("manifest_hash", "")

        run_manifest = TrainingRunManifest(
            run_id=run_id,
            model_name=model_name,
            started_at=datetime.utcnow().isoformat(),
            completed_at="",
            datasets=datasets,
            hyperparameters=hyperparameters,
            hardware=hardware,
            git_commit=git_commit,
            metrics={},
            data_hashes=data_hashes,
        )

        self._save_manifest(run_manifest)
        return run_manifest

    def complete_run(self, run_id, metrics):
        """Record the completion of a training run."""
        manifest = self._load_run_manifest(run_id)
        manifest.completed_at = datetime.utcnow().isoformat()
        manifest.metrics = metrics
        self._save_manifest(manifest)
        return manifest

    def compare_runs(self, run_id_a, run_id_b):
        """
        Compare two training runs to identify differences.
        This is the debugging tool: when run B is worse than
        run A, what changed?
        """
        manifest_a = self._load_run_manifest(run_id_a)
        manifest_b = self._load_run_manifest(run_id_b)

        comparison = {
            "data_changes": {},
            "hyperparameter_changes": {},
            "hardware_changes": {},
            "metric_changes": {},
        }

        # Compare datasets
        all_datasets = set(
            list(manifest_a.datasets.keys())
            + list(manifest_b.datasets.keys())
        )
        for name in all_datasets:
            ver_a = manifest_a.datasets.get(name)
            ver_b = manifest_b.datasets.get(name)
            if ver_a != ver_b:
                comparison["data_changes"][name] = {
                    "run_a": ver_a,
                    "run_b": ver_b,
                }
                # If both versions exist, compute the diff
                if ver_a and ver_b:
                    differ = DatasetDiffer(self.store)
                    diff = differ.diff(name, ver_a, ver_b)
                    comparison["data_changes"][name]["diff"] = {
                        "files_added": len(diff.files_added),
                        "files_removed": len(diff.files_removed),
                        "files_modified": len(diff.files_modified),
                        "size_change": diff.total_size_change,
                    }

        # Compare hyperparameters
        all_params = set(
            list(manifest_a.hyperparameters.keys())
            + list(manifest_b.hyperparameters.keys())
        )
        for param in all_params:
            val_a = manifest_a.hyperparameters.get(param)
            val_b = manifest_b.hyperparameters.get(param)
            if val_a != val_b:
                comparison["hyperparameter_changes"][param] = {
                    "run_a": val_a,
                    "run_b": val_b,
                }

        # Compare metrics
        all_metrics = set(
            list(manifest_a.metrics.keys())
            + list(manifest_b.metrics.keys())
        )
        for metric in all_metrics:
            val_a = manifest_a.metrics.get(metric, 0)
            val_b = manifest_b.metrics.get(metric, 0)
            comparison["metric_changes"][metric] = {
                "run_a": val_a,
                "run_b": val_b,
                "change": (val_b - val_a
                           if isinstance(val_a, (int, float))
                           and isinstance(val_b, (int, float))
                           else "N/A"),
            }

        return comparison

    def _load_dataset_manifest(self, name, version):
        """Load dataset manifest."""
        ref_path = self.store.refs_dir / name / f"{version}.json"
        if not ref_path.exists():
            return None
        with open(ref_path) as f:
            ref = json.load(f)
        manifest_path = (self.store.manifests_dir
                         / f"{ref['manifest_hash']}.json")
        with open(manifest_path) as f:
            return json.load(f)

    def _load_run_manifest(self, run_id):
        """Load a training run manifest."""
        path = self.runs_dir / f"{run_id}.json"
        with open(path) as f:
            data = json.load(f)
        return TrainingRunManifest(**data)

    def _save_manifest(self, manifest):
        """Save a training run manifest."""
        path = self.runs_dir / f"{manifest.run_id}.json"
        with open(path, "w") as f:
            json.dump({
                "run_id": manifest.run_id,
                "model_name": manifest.model_name,
                "started_at": manifest.started_at,
                "completed_at": manifest.completed_at,
                "datasets": manifest.datasets,
                "hyperparameters": manifest.hyperparameters,
                "hardware": manifest.hardware,
                "git_commit": manifest.git_commit,
                "metrics": manifest.metrics,
                "data_hashes": manifest.data_hashes,
            }, f, indent=2)
💡 Tip

The most common debugging pattern: run B regresses on a benchmark. compare_runs(A, B) shows a data version change; diff_detailed(dataset, v1, v2) shows that a filtering step was tightened, removing 5% of math-related training examples. The regressed benchmark was GSM8K. Cause identified.

DVC Integration

Using DVC for Dataset Versioning

DVC (Data Version Control) is the most widely used tool for versioning ML datasets. It stores pointers (.dvc files) in Git while the actual data lives in remote storage (S3, GCS, Azure Blob).

import subprocess

class DVCManager:
    """
    Manage dataset versioning through DVC.
    Wraps DVC CLI commands with Python interfaces.
    """

    def __init__(self, repo_path):
        self.repo_path = Path(repo_path)

    def init(self):
        """Initialize DVC in the repo."""
        self._run_dvc("init")

    def add_remote(self, name, url):
        """Add a DVC remote storage backend."""
        self._run_dvc(f"remote add {name} {url}")

    def track_dataset(self, dataset_path):
        """
        Start tracking a dataset with DVC.
        This creates a .dvc file that Git tracks,
        while the actual data is managed by DVC.
        """
        self._run_dvc(f"add {dataset_path}")

        # The .dvc file contains the md5 hash of the data;
        # resolve it relative to the repo, not the caller's cwd
        dvc_file = self.repo_path / f"{dataset_path}.dvc"
        return self._parse_dvc_file(dvc_file)

    def push(self, remote="origin"):
        """Push data to remote storage."""
        self._run_dvc(f"push -r {remote}")

    def pull(self, remote="origin"):
        """Pull data from remote storage."""
        self._run_dvc(f"pull -r {remote}")

    def checkout(self, git_ref):
        """
        Check out a specific version of the data.
        First checkout the Git ref (which updates .dvc files),
        then DVC checkout (which updates the actual data files).
        """
        # Git checkout updates .dvc pointer files
        subprocess.run(
            ["git", "checkout", git_ref],
            cwd=self.repo_path, check=True,
        )
        # DVC checkout updates actual data to match pointers
        self._run_dvc("checkout")

    def diff(self, rev_a, rev_b):
        """Compare data between two Git revisions."""
        result = self._run_dvc(f"diff {rev_a} {rev_b}")
        return result

    def get_data_hash(self, dataset_path):
        """Get the current content hash of tracked data."""
        dvc_file = self.repo_path / f"{dataset_path}.dvc"
        if dvc_file.exists():
            return self._parse_dvc_file(dvc_file)
        return None

    def _run_dvc(self, command):
        """Run a DVC command (tokenized; avoids shell quoting issues)."""
        result = subprocess.run(
            ["dvc", *command.split()],
            cwd=self.repo_path,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            raise RuntimeError(
                f"DVC command failed: {result.stderr}"
            )
        return result.stdout

    def _parse_dvc_file(self, dvc_file_path):
        """Parse a .dvc file to extract hash and metadata."""
        # DVC files are YAML
        content = dvc_file_path.read_text()
        # Simple parser -- in production use yaml library
        lines = content.strip().split("\n")
        metadata = {}
        for line in lines:
            if ":" in line:
                key, value = line.split(":", 1)
                # Strip YAML list markers so "- md5" parses as "md5"
                metadata[key.strip().lstrip("- ")] = value.strip()
        return metadata
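
The pointer-file mechanics behind `dvc add` can be illustrated without DVC installed. A minimal, self-contained sketch (the `write_pointer` helper and its JSON format are illustrative, not DVC's actual YAML/md5 format):

```python
import hashlib
import json
from pathlib import Path

def write_pointer(data_path, pointer_dir):
    """Mimic `dvc add`: hash a file's contents and write a small
    pointer file that a Git repo can track in place of the data.
    (Real DVC writes YAML with an md5; this sketch uses sha256.)"""
    data = Path(data_path).read_bytes()
    digest = hashlib.sha256(data).hexdigest()
    pointer = {
        "path": Path(data_path).name,
        "sha256": digest,
        "size": len(data),
    }
    out = Path(pointer_dir) / f"{Path(data_path).name}.ptr.json"
    out.write_text(json.dumps(pointer, indent=2))
    return pointer
```

Committing the pointer instead of the data is the whole trick: Git history stays small, and any checkout can re-fetch the exact bytes by hash.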

Integrity Verification

Detecting Data Corruption

Data corruption can happen silently: a disk error flips a few bytes in a Parquet file, a network interruption truncates a download, a bug in preprocessing corrupts a fraction of rows. Integrity verification catches these issues before they affect training.

class IntegrityVerifier:
    """
    Verify dataset integrity at multiple levels:
    1. File-level: content hash matches stored hash
    2. Record-level: no corrupted/truncated records
    3. Schema-level: all files have consistent schemas
    4. Statistical-level: distributions are within expected ranges
    """

    def __init__(self, store):
        self.store = store
        self.hasher = DatasetHasher()

    def verify_version(self, name, version):
        """
        Full integrity check for a dataset version.
        Returns a report with any issues found.
        """
        report = {
            "name": name,
            "version": version,
            "checks": [],
            "passed": True,
        }

        manifest = self._load_manifest(name, version)
        if manifest is None:
            report["passed"] = False
            report["checks"].append({
                "check": "manifest_exists",
                "passed": False,
                "detail": "Manifest not found",
            })
            return report

        # Check 1: All referenced objects exist
        missing_objects = []
        for rel_path, file_info in manifest["files"].items():
            content_hash = file_info["hash"]
            obj_path = (self.store.objects_dir / content_hash[:2]
                        / content_hash[2:])
            if not obj_path.exists():
                missing_objects.append(rel_path)

        report["checks"].append({
            "check": "objects_exist",
            "passed": len(missing_objects) == 0,
            "detail": f"{len(missing_objects)} missing objects"
                      if missing_objects else "All objects present",
            "missing": missing_objects[:10],
        })
        if missing_objects:
            report["passed"] = False

        # Check 2: Content hashes match
        hash_mismatches = []
        for rel_path, file_info in manifest["files"].items():
            content_hash = file_info["hash"]
            obj_path = (self.store.objects_dir / content_hash[:2]
                        / content_hash[2:])
            if obj_path.exists():
                actual = self.hasher.hash_file(obj_path)
                if actual["hash"] != content_hash:
                    hash_mismatches.append({
                        "file": rel_path,
                        "expected": content_hash,
                        "actual": actual["hash"],
                    })

        report["checks"].append({
            "check": "hash_integrity",
            "passed": len(hash_mismatches) == 0,
            "detail": f"{len(hash_mismatches)} hash mismatches"
                      if hash_mismatches else "All hashes valid",
            "mismatches": hash_mismatches[:10],
        })
        if hash_mismatches:
            report["passed"] = False

        # Check 3: File sizes match
        size_mismatches = []
        for rel_path, file_info in manifest["files"].items():
            expected_size = file_info["size"]
            content_hash = file_info["hash"]
            obj_path = (self.store.objects_dir / content_hash[:2]
                        / content_hash[2:])
            if obj_path.exists():
                actual_size = os.path.getsize(obj_path)
                if actual_size != expected_size:
                    size_mismatches.append({
                        "file": rel_path,
                        "expected": expected_size,
                        "actual": actual_size,
                    })

        report["checks"].append({
            "check": "size_integrity",
            "passed": len(size_mismatches) == 0,
            "detail": f"{len(size_mismatches)} size mismatches"
                      if size_mismatches else "All sizes valid",
        })
        if size_mismatches:
            report["passed"] = False

        return report

    def _load_manifest(self, name, version):
        """Load a version manifest."""
        ref_path = self.store.refs_dir / name / f"{version}.json"
        if not ref_path.exists():
            return None
        with open(ref_path) as f:
            ref = json.load(f)
        manifest_path = (self.store.manifests_dir
                         / f"{ref['manifest_hash']}.json")
        if not manifest_path.exists():
            return None
        with open(manifest_path) as f:
            return json.load(f)
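
The core of checks 1-3 is just "recompute and compare." A standalone sketch (helper names are mine, not the article's classes) showing that a single flipped bit is enough to change the digest and be caught:

```python
import hashlib

def file_hash(path):
    """SHA-256 of a file, streamed in 1 MB chunks so large
    Parquet shards never have to fit in memory."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

def verify_file(path, expected_hash):
    """File-level integrity check: recompute and compare."""
    return file_hash(path) == expected_hash
```

Because the digest is an avalanche function of every byte, silent disk errors and truncated downloads cannot slip past this check.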

Automated Audit Trails

Training on copyrighted data, personal data, or licensed content creates legal obligations. An audit trail proves what data was used, when, and for what purpose.

class AuditTrail:
    """
    Maintain a tamper-evident audit trail for dataset operations.
    Each entry is linked to the previous entry's hash,
    creating a blockchain-like chain of custody.
    """

    def __init__(self, store_path):
        self.store_path = Path(store_path)
        self.audit_dir = self.store_path / "audit"
        self.audit_dir.mkdir(parents=True, exist_ok=True)
        self.chain = self._load_chain()

    def log_event(self, event_type, details):
        """
        Log an auditable event.
        Events are chained: each event's hash includes
        the previous event's hash.
        """
        previous_hash = (
            self.chain[-1]["event_hash"]
            if self.chain else "genesis"
        )

        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": event_type,
            "timestamp": datetime.utcnow().isoformat(),
            "details": details,
            "previous_hash": previous_hash,
        }

        # Compute event hash (includes previous hash for chaining)
        event_data = json.dumps(event, sort_keys=True)
        event["event_hash"] = hashlib.sha256(
            event_data.encode()
        ).hexdigest()

        self.chain.append(event)
        self._persist_event(event)

        return event

    def verify_chain(self):
        """
        Verify the integrity of the audit chain.
        If any event has been tampered with, the chain breaks.
        """
        if not self.chain:
            return {"valid": True, "length": 0}

        for i, event in enumerate(self.chain):
            # Check previous hash linkage
            if i == 0:
                expected_prev = "genesis"
            else:
                expected_prev = self.chain[i - 1]["event_hash"]

            if event["previous_hash"] != expected_prev:
                return {
                    "valid": False,
                    "break_at": i,
                    "event_id": event["event_id"],
                    "reason": "Previous hash mismatch",
                }

            # Verify event hash
            event_copy = dict(event)
            stored_hash = event_copy.pop("event_hash")
            computed_hash = hashlib.sha256(
                json.dumps(event_copy, sort_keys=True).encode()
            ).hexdigest()

            if computed_hash != stored_hash:
                return {
                    "valid": False,
                    "break_at": i,
                    "event_id": event["event_id"],
                    "reason": "Event hash mismatch (tampered)",
                }

        return {"valid": True, "length": len(self.chain)}

    def get_events_for_dataset(self, dataset_name):
        """Get all audit events for a specific dataset."""
        return [
            e for e in self.chain
            if e["details"].get("dataset") == dataset_name
        ]

    def generate_compliance_report(self, dataset_name, version):
        """
        Generate a compliance report showing the full chain
        of custody for a dataset version.
        """
        events = self.get_events_for_dataset(dataset_name)
        version_events = [
            e for e in events
            if e["details"].get("version") == version
            or e["details"].get("output_version") == version
        ]

        report = {
            "dataset": dataset_name,
            "version": version,
            "generated_at": datetime.utcnow().isoformat(),
            "chain_valid": self.verify_chain()["valid"],
            "events": version_events,
            "summary": {
                "total_events": len(version_events),
                "event_types": Counter(
                    e["event_type"] for e in version_events
                ),
            },
        }

        return report

    def _load_chain(self):
        """Load the audit chain from disk."""
        chain = []
        for path in sorted(self.audit_dir.glob("*.json")):
            with open(path) as f:
                chain.append(json.load(f))
        return chain

    def _persist_event(self, event):
        """Save an audit event to disk."""
        # Use timestamp + event_id for filename ordering
        # (replace ":" so names are valid on all filesystems)
        safe_ts = event["timestamp"].replace(":", "-")
        filename = f"{safe_ts}_{event['event_id'][:8]}.json"
        path = self.audit_dir / filename
        with open(path, "w") as f:
            json.dump(event, f, indent=2)
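
The tamper-evidence property is easiest to see in a stripped-down chain (a standalone sketch of the same hashing scheme, not the full AuditTrail):

```python
import hashlib
import json

def chain_event(prev_hash, payload):
    """Link an event to its predecessor: the event hash covers
    both the payload and the previous event's hash."""
    event = {"payload": payload, "previous_hash": prev_hash}
    event["event_hash"] = hashlib.sha256(
        json.dumps(event, sort_keys=True).encode()
    ).hexdigest()
    return event

def first_break(chain):
    """Recompute every link; return the index of the first broken
    event, or -1 if the whole chain is intact."""
    prev = "genesis"
    for i, event in enumerate(chain):
        body = {k: v for k, v in event.items() if k != "event_hash"}
        recomputed = hashlib.sha256(
            json.dumps(body, sort_keys=True).encode()
        ).hexdigest()
        if event["previous_hash"] != prev or recomputed != event["event_hash"]:
            return i
        prev = event["event_hash"]
    return -1
```

Editing any past event changes its recomputed hash, so verification breaks at that index; re-hashing the edited event instead breaks the next event's previous_hash link.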

Complete Versioning Pipeline

Putting It All Together

class DataVersioningPipeline:
    """
    Complete data versioning pipeline that integrates:
    - Content-addressable storage
    - Change detection
    - Lineage tracking
    - Training run binding
    - Integrity verification
    - Audit logging
    """

    def __init__(self, store_path):
        self.store = ContentAddressableStore(store_path)
        self.lineage = LineageTracker(store_path)
        self.run_tracker = TrainingRunTracker(self.store)
        self.verifier = IntegrityVerifier(self.store)
        self.audit = AuditTrail(store_path)
        self.differ = DatasetDiffer(self.store)

    def ingest_dataset(self, source_dir, name, version,
                        description=""):
        """Ingest a new dataset version."""
        # Store the data
        manifest = self.store.store_dataset(
            source_dir, name, version
        )

        # Log the ingestion
        self.audit.log_event("dataset_ingested", {
            "dataset": name,
            "version": version,
            "file_count": manifest["file_count"],
            "total_size": manifest["total_size"],
            "manifest_hash": manifest["manifest_hash"],
            "description": description,
        })

        # Verify integrity immediately
        report = self.verifier.verify_version(name, version)
        if not report["passed"]:
            raise RuntimeError(
                f"Integrity verification failed: {report}"
            )

        return manifest

    def transform_dataset(self, name, input_version,
                           output_version, transform_fn,
                           transform_type, parameters):
        """
        Apply a transformation and track it.
        """
        import time

        # Restore input version
        input_dir = Path(f"/tmp/dvp_input_{input_version}")
        self.store.restore_dataset(name, input_version, input_dir)

        # Apply transformation
        output_dir = Path(f"/tmp/dvp_output_{output_version}")
        start = time.time()
        input_count, output_count = transform_fn(
            input_dir, output_dir
        )
        duration = time.time() - start

        # Store output
        self.ingest_dataset(output_dir, name, output_version)

        # Record lineage
        self.lineage.record_transform(
            transform_type=transform_type,
            input_version=input_version,
            output_version=output_version,
            parameters=parameters,
            input_count=input_count,
            output_count=output_count,
            duration=duration,
        )

        # Audit log
        self.audit.log_event("dataset_transformed", {
            "dataset": name,
            "input_version": input_version,
            "output_version": output_version,
            "transform_type": transform_type,
            "rows_in": input_count,
            "rows_out": output_count,
        })

        return {
            "input_count": input_count,
            "output_count": output_count,
            "duration": duration,
        }

    def start_training(self, run_id, model_name, datasets,
                        hyperparameters, hardware, git_commit):
        """Start a training run with full data binding."""
        # Verify all dataset versions before training
        for name, version in datasets.items():
            report = self.verifier.verify_version(name, version)
            if not report["passed"]:
                raise RuntimeError(
                    f"Dataset {name} v{version} failed "
                    f"integrity check: {report}"
                )

        manifest = self.run_tracker.start_run(
            run_id, model_name, datasets,
            hyperparameters, hardware, git_commit,
        )

        self.audit.log_event("training_started", {
            "run_id": run_id,
            "model": model_name,
            "datasets": datasets,
        })

        return manifest

    def debug_regression(self, run_a, run_b):
        """
        Debug a regression between two training runs.
        Returns a complete analysis of what changed.
        """
        comparison = self.run_tracker.compare_runs(run_a, run_b)

        # For each data change, get detailed diff
        for name, change in comparison["data_changes"].items():
            if change.get("run_a") and change.get("run_b"):
                detailed = self.differ.diff_detailed(
                    name, change["run_a"], change["run_b"]
                )
                change["detailed_diff"] = detailed

                # Get lineage for both versions
                lineage_a = self.lineage.get_lineage_summary(
                    change["run_a"]
                )
                lineage_b = self.lineage.get_lineage_summary(
                    change["run_b"]
                )
                change["lineage_a"] = lineage_a
                change["lineage_b"] = lineage_b

        return comparison

Data Versioning Overhead per Operation

Operation          Time (seconds)
Hash 1 GB          2.5
Hash 100 GB        250
Store 1 GB         8
Store 100 GB       800
Diff (metadata)    0.1
Diff (detailed)    30
Verify 100 GB      260

Key Takeaways

Data versioning is infrastructure, not overhead. When a $3 million training run regresses, the ability to diff dataset versions and identify the root cause in minutes rather than days pays for the versioning system many times over.

The critical components:

  1. Content-addressable storage: Every dataset version is identified by a cryptographic hash. This provides integrity, deduplication, and immutability. The hash is the version.

  2. Multi-level diffing: File-level diffs are fast (compare hashes). Schema-level and statistical-level diffs catch subtle changes. Row-level diffs pinpoint exact modifications but are expensive for large datasets.

  3. Lineage tracking: Record every transformation applied to the data. When debugging, trace the full pipeline from raw source to final training data. The lineage DAG is the data equivalent of a stack trace.

  4. Training run binding: Every training run manifest records exact dataset versions, hyperparameters, code commit, and hardware. The compare_runs function is the primary debugging tool.

  5. Audit trails: Chain-linked event logs provide tamper-evident records for compliance. Every dataset operation is logged with enough detail to reconstruct the full chain of custody.
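
Takeaway 1 in miniature: a version id derived purely from content. A sketch of the idea (the article's store computes per-file hashes plus a manifest hash in the same spirit; the exact format here is assumed):

```python
import hashlib

def version_id(file_hashes):
    """Derive a version id from content alone: hash the sorted
    (path, hash) pairs so the same files always produce the same
    id, regardless of ingestion order."""
    manifest = "\n".join(
        f"{path} {h}" for path, h in sorted(file_hashes.items())
    )
    return hashlib.sha256(manifest.encode()).hexdigest()
```

Two ingestions of identical data collide on purpose (that collision is the deduplication); any single changed file changes the id.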

The cost of versioning is small relative to training: storing two versions of a 10TB dataset with content-addressable deduplication typically adds 10-20% storage overhead (only changed files are duplicated). The compute cost of hashing is O(n) in dataset size, approximately 1 GB/s on modern hardware. For a 15TB pretraining dataset, the full hash takes about 4 hours, negligible compared to the weeks of GPU time for training.
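
The closing arithmetic, spelled out (the 1 GB/s figure is the text's ballpark; actual throughput depends on hardware and hash choice):

```python
dataset_bytes = 15e12      # 15 TB pretraining dataset
hash_throughput = 1e9      # ~1 GB/s hashing rate
hours = dataset_bytes / hash_throughput / 3600
print(f"{hours:.1f} hours")  # → 4.2 hours
```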