Data versioning is not a nice-to-have; it is mandatory infrastructure when training runs cost more than most engineers’ annual salaries.
This post covers the complete data versioning stack: content-addressable storage for large datasets, efficient change detection, metadata tracking, integration with training pipelines, and production-scale implementation.
The Versioning Problem for Training Data
Why Git Alone Fails
Git tracks file changes efficiently through delta compression and content-addressable storage. But training datasets are fundamentally different from source code:
from dataclasses import dataclass
@dataclass
class VersioningChallenge:
problem: str
git_limit: str
required_solution: str
CHALLENGES = [
VersioningChallenge(
problem="Dataset size (1-50 TB for pretraining)",
git_limit="Git stores full snapshots in .git, "
"cloning a 50TB repo is impractical",
required_solution="Content-addressable storage with "
"lazy fetching (download on demand)",
),
VersioningChallenge(
problem="Binary formats (Parquet, Arrow, safetensors)",
git_limit="Git diff cannot meaningfully diff binary files, "
"delta compression is poor on binary data",
required_solution="Semantic diffing: compare schemas, "
"row counts, column stats, sample rows",
),
VersioningChallenge(
problem="Streaming datasets (continuously updated)",
git_limit="Git commits are point-in-time snapshots, "
"no concept of append-only streams",
required_solution="Append-only log with checkpoint commits "
"at significant points",
),
VersioningChallenge(
problem="Data lineage (this row came from CommonCrawl "
"dump 2024-09, filtered by quality score > 0.8)",
git_limit="Git tracks file-level changes, not "
"row-level provenance",
required_solution="Per-row metadata with source tracking "
"and transformation history",
),
VersioningChallenge(
problem="Distributed teams (50 people contributing data)",
git_limit="Git merge conflicts on binary data are unresolvable",
required_solution="Partition-level locking with "
"append-only semantics per partition",
),
]
Storage Backend Comparison for Dataset Versioning
| Backend | Max Dataset Size | Clone Time (1TB) | Diff Speed | Streaming Support |
|---|---|---|---|---|
| Git (plain) | ~5 GB practical | N/A | Line-level | No |
| Git LFS | ~100 GB practical | ~2 hours | File-level only | No |
| DVC + S3 | Unlimited | Seconds (lazy) | File-level + stats | Partial |
| LakeFS | Unlimited | Seconds (branching) | Row-level | Yes |
| Delta Lake | Unlimited | N/A (query) | Row-level | Yes |
| HF Datasets | ~5 TB practical | Minutes (streaming) | Commit-level | Yes |
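The "lazy fetching" that makes DVC- and LakeFS-style clones take seconds is simple in principle: clone only pointers, and download object bytes on first read. A minimal stdlib sketch of the idea (the `LazyObjectStore` name and directory layout are illustrative, not any tool's real API):

```python
import hashlib
import shutil
import tempfile
from pathlib import Path

class LazyObjectStore:
    """Serve content-addressed objects, fetching from the 'remote' only on first read."""

    def __init__(self, remote_dir, cache_dir):
        self.remote = Path(remote_dir)
        self.cache = Path(cache_dir)
        self.cache.mkdir(parents=True, exist_ok=True)
        self.fetches = 0  # remote round-trips, counted for demonstration

    def read(self, content_hash: str) -> bytes:
        local = self.cache / content_hash
        if not local.exists():  # cache miss: one-time fetch from remote
            self.fetches += 1
            shutil.copy2(self.remote / content_hash, local)
        return local.read_bytes()

# Demo: the second read is served from the local cache
tmp = Path(tempfile.mkdtemp())
(tmp / "remote").mkdir()
shard = b"training shard 0"
digest = hashlib.sha256(shard).hexdigest()
(tmp / "remote" / digest).write_bytes(shard)

store = LazyObjectStore(tmp / "remote", tmp / "cache")
assert store.read(digest) == shard and store.fetches == 1
assert store.read(digest) == shard and store.fetches == 1  # no second fetch
```

A "clone" under this scheme copies only the manifest of hashes; a multi-terabyte dataset costs nothing until a training job actually touches its shards.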
Content-Addressable Dataset Storage
Hashing for Integrity
Every dataset version is identified by a cryptographic hash of its contents. This provides three guarantees: integrity (any modification changes the hash), deduplication (identical data stored once), and immutability (a hash always refers to the same data).
import hashlib
import struct
import os
from pathlib import Path
from typing import Iterator
class DatasetHasher:
"""
Content-addressable hashing for datasets.
Supports both file-level and chunk-level hashing
for efficient deduplication.
"""
def __init__(self, algorithm="sha256", chunk_size=64 * 1024 * 1024):
self.algorithm = algorithm
self.chunk_size = chunk_size # 64MB chunks
def hash_file(self, path):
"""
Hash a single file. For large files, this reads
in chunks to avoid loading the entire file into memory.
"""
h = hashlib.new(self.algorithm)
file_size = os.path.getsize(path)
with open(path, "rb") as f:
while True:
chunk = f.read(self.chunk_size)
if not chunk:
break
h.update(chunk)
return {
"hash": h.hexdigest(),
"algorithm": self.algorithm,
"size": file_size,
"path": str(path),
}
def hash_directory(self, directory):
"""
Hash an entire dataset directory.
The directory hash is computed from the sorted list
of (path, hash) pairs, making it deterministic
regardless of filesystem ordering.
"""
directory = Path(directory)
file_hashes = []
for path in sorted(directory.rglob("*")):
if path.is_file():
rel_path = path.relative_to(directory)
file_hash = self.hash_file(path)
file_hashes.append((str(rel_path), file_hash["hash"]))
# Hash the sorted list of (path, hash) pairs
h = hashlib.new(self.algorithm)
for rel_path, file_hash in file_hashes:
h.update(rel_path.encode("utf-8"))
h.update(file_hash.encode("utf-8"))
return {
"directory_hash": h.hexdigest(),
"file_count": len(file_hashes),
"file_hashes": dict(file_hashes),
}
def hash_streaming(self, data_iterator):
"""
Hash a streaming dataset without materializing it.
Each record is serialized and fed to the hasher.
"""
h = hashlib.new(self.algorithm)
record_count = 0
for record in data_iterator:
# Serialize record deterministically
serialized = self._deterministic_serialize(record)
h.update(serialized)
record_count += 1
return {
"hash": h.hexdigest(),
"record_count": record_count,
}
def _deterministic_serialize(self, record):
"""
Serialize a record in a deterministic way.
Dict keys are sorted and strings/bytes are length-prefixed,
so the same logical record always produces the same byte stream.
"""
if isinstance(record, dict):
items = sorted(record.items())
parts = []
for key, value in items:
parts.append(
self._deterministic_serialize(key)
+ b":"
+ self._deterministic_serialize(value)
)
return b"{" + b",".join(parts) + b"}"
elif isinstance(record, str):
encoded = record.encode("utf-8")
return struct.pack(">I", len(encoded)) + encoded
elif isinstance(record, (int, float)):
return str(record).encode("utf-8")
elif isinstance(record, bytes):
return struct.pack(">I", len(record)) + record
elif isinstance(record, (list, tuple)):
parts = [self._deterministic_serialize(item)
for item in record]
return b"[" + b",".join(parts) + b"]"
else:
return str(record).encode("utf-8")
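As a quick sanity check on the idea behind `_deterministic_serialize`, here is a deliberately simplified digest (plain string concatenation, no length prefixes, so it has ambiguities the class above avoids) that shows why key order must be normalized before hashing:

```python
import hashlib

def record_digest(record: dict) -> str:
    """Hash a record with sorted keys so dict insertion order doesn't matter."""
    h = hashlib.sha256()
    for key in sorted(record):
        h.update(str(key).encode("utf-8"))
        h.update(b":")
        h.update(str(record[key]).encode("utf-8"))
        h.update(b",")
    return h.hexdigest()

a = {"text": "hello", "score": 0.9}
b = {"score": 0.9, "text": "hello"}  # same content, different insertion order
assert record_digest(a) == record_digest(b)
assert record_digest({"text": "hello!", "score": 0.9}) != record_digest(a)
```

Without the sort, two pipelines that build the same records in different orders would report different dataset hashes, defeating deduplication.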
Content-Addressable Store
import json
import shutil
from datetime import datetime
class ContentAddressableStore:
"""
Store dataset files by their content hash.
Similar to how Git stores objects, but optimized
for large binary files.
Storage layout:
store/
objects/
ab/cdef1234... (first 2 chars as directory)
refs/
datasets/
my-dataset/
v1.json (points to object hashes)
v2.json
manifests/
abc123.json (maps version to file hashes)
"""
def __init__(self, store_path):
self.store_path = Path(store_path)
self.objects_dir = self.store_path / "objects"
self.refs_dir = self.store_path / "refs" / "datasets"
self.manifests_dir = self.store_path / "manifests"
for d in [self.objects_dir, self.refs_dir, self.manifests_dir]:
d.mkdir(parents=True, exist_ok=True)
self.hasher = DatasetHasher()
def store_file(self, source_path):
"""
Store a file by its content hash.
Returns the hash. If the file already exists
(same content), this is a no-op.
"""
file_info = self.hasher.hash_file(source_path)
content_hash = file_info["hash"]
# Store in objects directory with a two-level fan-out
# (first two hex chars of the hash become a subdirectory)
obj_dir = self.objects_dir / content_hash[:2]
obj_path = obj_dir / content_hash[2:]
if not obj_path.exists():
obj_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(source_path, obj_path)
return content_hash
def store_dataset(self, dataset_dir, name, version):
"""
Store an entire dataset directory.
Creates a manifest mapping filenames to content hashes.
"""
dataset_dir = Path(dataset_dir)
manifest = {
"name": name,
"version": version,
"created": datetime.utcnow().isoformat(),
"files": {},
}
total_size = 0
for path in sorted(dataset_dir.rglob("*")):
if path.is_file():
rel_path = str(path.relative_to(dataset_dir))
content_hash = self.store_file(path)
file_size = os.path.getsize(path)
manifest["files"][rel_path] = {
"hash": content_hash,
"size": file_size,
}
total_size += file_size
manifest["total_size"] = total_size
manifest["file_count"] = len(manifest["files"])
# Hash the manifest itself for the version reference
manifest_hash = hashlib.sha256(
json.dumps(manifest, sort_keys=True).encode()
).hexdigest()
manifest["manifest_hash"] = manifest_hash
# Save manifest
manifest_path = self.manifests_dir / f"{manifest_hash}.json"
with open(manifest_path, "w") as f:
json.dump(manifest, f, indent=2)
# Update version reference
ref_dir = self.refs_dir / name
ref_dir.mkdir(parents=True, exist_ok=True)
ref_path = ref_dir / f"{version}.json"
with open(ref_path, "w") as f:
json.dump({
"manifest_hash": manifest_hash,
"version": version,
"created": manifest["created"],
}, f, indent=2)
return manifest
def restore_dataset(self, name, version, target_dir):
"""
Restore a dataset version to a target directory.
"""
target_dir = Path(target_dir)
target_dir.mkdir(parents=True, exist_ok=True)
# Load version reference
ref_path = self.refs_dir / name / f"{version}.json"
with open(ref_path) as f:
ref = json.load(f)
# Load manifest
manifest_path = self.manifests_dir / f"{ref['manifest_hash']}.json"
with open(manifest_path) as f:
manifest = json.load(f)
# Restore files
for rel_path, file_info in manifest["files"].items():
content_hash = file_info["hash"]
obj_path = (self.objects_dir / content_hash[:2]
/ content_hash[2:])
dest_path = target_dir / rel_path
dest_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(obj_path, dest_path)
return manifest
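The two-level `objects/ab/cdef…` layout and the dedup check are worth seeing in isolation. A minimal stdlib sketch (the `store_blob` helper is illustrative, not part of the class above):

```python
import hashlib
from pathlib import Path

def store_blob(objects_dir: Path, payload: bytes) -> Path:
    """Write a blob at objects/<first-2-hex-chars>/<rest>; skip if already present."""
    digest = hashlib.sha256(payload).hexdigest()
    obj_path = objects_dir / digest[:2] / digest[2:]
    if not obj_path.exists():  # dedup: identical bytes are stored exactly once
        obj_path.parent.mkdir(parents=True, exist_ok=True)
        obj_path.write_bytes(payload)
    return obj_path
```

The fan-out keeps any single directory from accumulating millions of entries, which degrades lookup performance on many filesystems; Git uses the same trick in `.git/objects`.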
Efficient Change Detection
Diffing Dataset Versions
When a training run regresses, the first question is: what changed in the data? A dataset diff compares two versions at multiple levels: file-level (which files were added/removed/modified), schema-level (did columns change), statistics-level (did distributions shift), and row-level (which specific rows changed).
from collections import Counter
@dataclass
class DatasetDiff:
version_a: str
version_b: str
files_added: list
files_removed: list
files_modified: list
files_unchanged: list
total_size_change: int
row_count_change: int
schema_changes: list
statistics_changes: dict
class DatasetDiffer:
"""
Compute diffs between two dataset versions.
Uses content hashes for fast file-level comparison,
then drills into modified files for detailed diffs.
"""
def __init__(self, store):
self.store = store
def diff(self, name, version_a, version_b):
"""
Compute the diff between two dataset versions.
"""
manifest_a = self._load_manifest(name, version_a)
manifest_b = self._load_manifest(name, version_b)
files_a = set(manifest_a["files"].keys())
files_b = set(manifest_b["files"].keys())
added = files_b - files_a
removed = files_a - files_b
common = files_a & files_b
modified = []
unchanged = []
for f in common:
hash_a = manifest_a["files"][f]["hash"]
hash_b = manifest_b["files"][f]["hash"]
if hash_a != hash_b:
modified.append(f)
else:
unchanged.append(f)
# Compute size change
size_a = manifest_a.get("total_size", 0)
size_b = manifest_b.get("total_size", 0)
# Compute detailed stats for modified files
stats_changes = {}
for f in modified:
stats_changes[f] = self._compute_file_stats_diff(
manifest_a["files"][f],
manifest_b["files"][f],
)
return DatasetDiff(
version_a=version_a,
version_b=version_b,
files_added=sorted(added),
files_removed=sorted(removed),
files_modified=sorted(modified),
files_unchanged=sorted(unchanged),
total_size_change=size_b - size_a,
row_count_change=0, # Computed per-file
schema_changes=[], # Computed per-file
statistics_changes=stats_changes,
)
def diff_detailed(self, name, version_a, version_b,
sample_rows=100):
"""
Compute a detailed diff including row-level changes
for modified files. This is more expensive because
it reads actual file contents.
"""
basic_diff = self.diff(name, version_a, version_b)
detailed = {
"basic": basic_diff,
"modified_file_details": {},
}
for f in basic_diff.files_modified:
# Load both versions of the file
data_a = self._load_file_data(name, version_a, f)
data_b = self._load_file_data(name, version_b, f)
detail = {
"row_count_a": len(data_a),
"row_count_b": len(data_b),
"row_count_change": len(data_b) - len(data_a),
}
# Schema comparison
if data_a and data_b:
cols_a = set(data_a[0].keys()) if isinstance(data_a[0], dict) else set()
cols_b = set(data_b[0].keys()) if isinstance(data_b[0], dict) else set()
detail["columns_added"] = list(cols_b - cols_a)
detail["columns_removed"] = list(cols_a - cols_b)
# Statistical comparison
detail["stats"] = self._compare_column_stats(
data_a, data_b
)
# Sample of changed rows
detail["sample_changes"] = self._sample_row_changes(
data_a, data_b, sample_rows
)
detailed["modified_file_details"][f] = detail
return detailed
def _load_manifest(self, name, version):
"""Load a version manifest."""
ref_path = self.store.refs_dir / name / f"{version}.json"
with open(ref_path) as f:
ref = json.load(f)
manifest_path = (self.store.manifests_dir
/ f"{ref['manifest_hash']}.json")
with open(manifest_path) as f:
return json.load(f)
def _compute_file_stats_diff(self, info_a, info_b):
"""Compare basic file stats."""
return {
"size_change": info_b["size"] - info_a["size"],
"size_change_pct": (
(info_b["size"] - info_a["size"])
/ max(info_a["size"], 1) * 100
),
}
def _load_file_data(self, name, version, filename):
"""Load actual file data for detailed comparison."""
# In production: read Parquet/Arrow files
return []
def _compare_column_stats(self, data_a, data_b):
"""Compare column-level statistics between versions."""
stats = {}
if not data_a or not data_b:
return stats
if not isinstance(data_a[0], dict):
return stats
for col in data_a[0].keys():
vals_a = [row.get(col) for row in data_a
if col in row]
vals_b = [row.get(col) for row in data_b
if col in row]
if vals_a and vals_b:
if isinstance(vals_a[0], (int, float)):
stats[col] = {
"mean_a": sum(vals_a) / len(vals_a),
"mean_b": sum(vals_b) / len(vals_b),
"min_a": min(vals_a),
"min_b": min(vals_b),
"max_a": max(vals_a),
"max_b": max(vals_b),
}
elif isinstance(vals_a[0], str):
len_a = [len(v) for v in vals_a]
len_b = [len(v) for v in vals_b]
stats[col] = {
"avg_length_a": sum(len_a) / len(len_a),
"avg_length_b": sum(len_b) / len(len_b),
"unique_a": len(set(vals_a)),
"unique_b": len(set(vals_b)),
}
return stats
def _sample_row_changes(self, data_a, data_b, n):
"""Sample rows that differ between versions."""
# Simple: hash each row, find non-matching hashes
hashes_a = set()
for row in data_a:
h = hashlib.sha256(str(sorted(row.items())).encode()
if isinstance(row, dict)
else str(row).encode()).hexdigest()
hashes_a.add(h)
new_rows = []
for row in data_b:
h = hashlib.sha256(str(sorted(row.items())).encode()
if isinstance(row, dict)
else str(row).encode()).hexdigest()
if h not in hashes_a:
new_rows.append(row)
if len(new_rows) >= n:
break
return new_rows
For datasets stored in Parquet format, column-level statistics (min, max, count, null count) are stored in the Parquet footer. You can compare these statistics between versions without reading any row data, making file-level statistical diffs nearly instantaneous even for multi-terabyte datasets.
Metadata and Lineage Tracking
Tracking Data Transformations
Training data goes through multiple transformations: filtering, deduplication, quality scoring, tokenization. Each transformation modifies the dataset. The lineage tracker records every transformation so you can reconstruct the exact pipeline that produced any dataset version.
from datetime import datetime
import uuid
@dataclass
class TransformRecord:
transform_id: str
transform_type: str # "filter", "dedup", "score", "tokenize"
input_version: str
output_version: str
parameters: dict
timestamp: str
input_row_count: int
output_row_count: int
rows_removed: int
rows_added: int
duration_seconds: float
class LineageTracker:
"""
Track the lineage of dataset transformations.
Every operation on a dataset is recorded, creating
a DAG (directed acyclic graph) of transformations
from raw sources to final training data.
"""
def __init__(self, store_path):
self.store_path = Path(store_path)
self.lineage_dir = self.store_path / "lineage"
self.lineage_dir.mkdir(parents=True, exist_ok=True)
self.records = []
def record_transform(self, transform_type, input_version,
output_version, parameters,
input_count, output_count, duration):
"""Record a dataset transformation."""
record = TransformRecord(
transform_id=str(uuid.uuid4())[:8],
transform_type=transform_type,
input_version=input_version,
output_version=output_version,
parameters=parameters,
timestamp=datetime.utcnow().isoformat(),
input_row_count=input_count,
output_row_count=output_count,
rows_removed=max(0, input_count - output_count),
rows_added=max(0, output_count - input_count),
duration_seconds=duration,
)
self.records.append(record)
self._persist_record(record)
return record
def get_lineage(self, version):
"""
Trace the full lineage of a dataset version.
Returns the chain of transformations from raw source
to the specified version.
"""
chain = []
current = version
# Walk backwards through the transformation graph
while True:
record = self._find_record_producing(current)
if record is None:
break
chain.append(record)
current = record.input_version
chain.reverse()
return chain
def get_lineage_summary(self, version):
"""
Human-readable summary of dataset lineage.
"""
chain = self.get_lineage(version)
summary_lines = [f"Lineage for version: {version}"]
summary_lines.append(f"Total transformations: {len(chain)}")
summary_lines.append("")
for i, record in enumerate(chain):
summary_lines.append(
f"Step {i + 1}: {record.transform_type}"
)
summary_lines.append(
f" Input: {record.input_version} "
f"({record.input_row_count:,} rows)"
)
summary_lines.append(
f" Output: {record.output_version} "
f"({record.output_row_count:,} rows)"
)
if record.rows_removed > 0:
pct = (record.rows_removed
/ record.input_row_count * 100)
summary_lines.append(
f" Removed: {record.rows_removed:,} rows "
f"({pct:.1f}%)"
)
summary_lines.append(
f" Duration: {record.duration_seconds:.1f}s"
)
summary_lines.append(
f" Parameters: {json.dumps(record.parameters)}"
)
summary_lines.append("")
return "\n".join(summary_lines)
def _find_record_producing(self, version):
"""Find the transform that produced a version."""
for record in reversed(self.records):
if record.output_version == version:
return record
return None
def _persist_record(self, record):
"""Save record to disk."""
path = self.lineage_dir / f"{record.transform_id}.json"
with open(path, "w") as f:
json.dump({
"transform_id": record.transform_id,
"transform_type": record.transform_type,
"input_version": record.input_version,
"output_version": record.output_version,
"parameters": record.parameters,
"timestamp": record.timestamp,
"input_row_count": record.input_row_count,
"output_row_count": record.output_row_count,
"rows_removed": record.rows_removed,
"rows_added": record.rows_added,
"duration_seconds": record.duration_seconds,
}, f, indent=2)
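The backward walk in `get_lineage` is easy to show standalone. A minimal sketch in which each transform record is reduced to a `(transform_type, input_version, output_version)` tuple (a simplification of `TransformRecord`):

```python
def trace_lineage(records, version):
    """Walk transform records backwards from `version` to the raw source,
    then return the chain in forward (source-first) order."""
    by_output = {out: (kind, inp) for kind, inp, out in records}
    chain = []
    current = version
    while current in by_output:
        kind, parent = by_output[current]
        chain.append((kind, parent, current))
        current = parent
    chain.reverse()  # raw source first
    return chain

records = [
    ("filter", "raw-v1", "filtered-v1"),
    ("dedup", "filtered-v1", "dedup-v1"),
    ("tokenize", "dedup-v1", "tokens-v1"),
]
assert trace_lineage(records, "tokens-v1") == records
```

Indexing records by output version makes each step an O(1) lookup, so tracing stays cheap even with thousands of recorded transforms.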
Training Run Manifest
Binding Data Versions to Training Runs
Every training run should produce a manifest that records exactly which dataset versions were used. This manifest is the key artifact for reproducibility.
@dataclass
class TrainingRunManifest:
run_id: str
model_name: str
started_at: str
completed_at: str
datasets: dict # name -> version mapping
hyperparameters: dict
hardware: dict
git_commit: str # Code version
metrics: dict # Final evaluation metrics
data_hashes: dict # dataset name -> content hash
class TrainingRunTracker:
"""
Track the complete specification of a training run.
This is the artifact that enables exact reproducibility.
"""
def __init__(self, store):
self.store = store
self.runs_dir = store.store_path / "runs"
self.runs_dir.mkdir(parents=True, exist_ok=True)
def start_run(self, run_id, model_name, datasets,
hyperparameters, hardware, git_commit):
"""
Record the start of a training run.
Captures all dataset versions before training begins.
"""
# Capture exact dataset hashes
data_hashes = {}
for name, version in datasets.items():
manifest = self._load_dataset_manifest(name, version)
if manifest:
data_hashes[name] = manifest.get("manifest_hash", "")
run_manifest = TrainingRunManifest(
run_id=run_id,
model_name=model_name,
started_at=datetime.utcnow().isoformat(),
completed_at="",
datasets=datasets,
hyperparameters=hyperparameters,
hardware=hardware,
git_commit=git_commit,
metrics={},
data_hashes=data_hashes,
)
self._save_manifest(run_manifest)
return run_manifest
def complete_run(self, run_id, metrics):
"""Record the completion of a training run."""
manifest = self._load_run_manifest(run_id)
manifest.completed_at = datetime.utcnow().isoformat()
manifest.metrics = metrics
self._save_manifest(manifest)
return manifest
def compare_runs(self, run_id_a, run_id_b):
"""
Compare two training runs to identify differences.
This is the debugging tool: when run B is worse than
run A, what changed?
"""
manifest_a = self._load_run_manifest(run_id_a)
manifest_b = self._load_run_manifest(run_id_b)
comparison = {
"data_changes": {},
"hyperparameter_changes": {},
"hardware_changes": {},
"metric_changes": {},
}
# Compare datasets
all_datasets = set(
list(manifest_a.datasets.keys())
+ list(manifest_b.datasets.keys())
)
for name in all_datasets:
ver_a = manifest_a.datasets.get(name)
ver_b = manifest_b.datasets.get(name)
if ver_a != ver_b:
comparison["data_changes"][name] = {
"run_a": ver_a,
"run_b": ver_b,
}
# If both versions exist, compute the diff
if ver_a and ver_b:
differ = DatasetDiffer(self.store)
diff = differ.diff(name, ver_a, ver_b)
comparison["data_changes"][name]["diff"] = {
"files_added": len(diff.files_added),
"files_removed": len(diff.files_removed),
"files_modified": len(diff.files_modified),
"size_change": diff.total_size_change,
}
# Compare hyperparameters
all_params = set(
list(manifest_a.hyperparameters.keys())
+ list(manifest_b.hyperparameters.keys())
)
for param in all_params:
val_a = manifest_a.hyperparameters.get(param)
val_b = manifest_b.hyperparameters.get(param)
if val_a != val_b:
comparison["hyperparameter_changes"][param] = {
"run_a": val_a,
"run_b": val_b,
}
# Compare metrics
all_metrics = set(
list(manifest_a.metrics.keys())
+ list(manifest_b.metrics.keys())
)
for metric in all_metrics:
val_a = manifest_a.metrics.get(metric, 0)
val_b = manifest_b.metrics.get(metric, 0)
comparison["metric_changes"][metric] = {
"run_a": val_a,
"run_b": val_b,
"change": val_b - val_a if isinstance(val_a, (int, float)) else "N/A",
}
return comparison
def _load_dataset_manifest(self, name, version):
"""Load dataset manifest."""
ref_path = self.store.refs_dir / name / f"{version}.json"
if not ref_path.exists():
return None
with open(ref_path) as f:
ref = json.load(f)
manifest_path = (self.store.manifests_dir
/ f"{ref['manifest_hash']}.json")
with open(manifest_path) as f:
return json.load(f)
def _load_run_manifest(self, run_id):
"""Load a training run manifest."""
path = self.runs_dir / f"{run_id}.json"
with open(path) as f:
data = json.load(f)
return TrainingRunManifest(**data)
def _save_manifest(self, manifest):
"""Save a training run manifest."""
path = self.runs_dir / f"{manifest.run_id}.json"
with open(path, "w") as f:
json.dump({
"run_id": manifest.run_id,
"model_name": manifest.model_name,
"started_at": manifest.started_at,
"completed_at": manifest.completed_at,
"datasets": manifest.datasets,
"hyperparameters": manifest.hyperparameters,
"hardware": manifest.hardware,
"git_commit": manifest.git_commit,
"metrics": manifest.metrics,
"data_hashes": manifest.data_hashes,
}, f, indent=2)
The most common debugging pattern: run B regresses on a benchmark, compare_runs(A, B) shows a data version change, diff_detailed(dataset, v1, v2) shows that a filtering step was tightened, removing 5% of math-related training examples. The benchmark regression was on GSM8K. Cause identified.
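At its core, `compare_runs` is the same symmetric dict diff applied three times: to datasets, hyperparameters, and metrics. A minimal standalone sketch (the `dict_diff` helper and sample values are illustrative):

```python
def dict_diff(a: dict, b: dict) -> dict:
    """Return {key: (value_in_a, value_in_b)} for every key whose value differs.
    Keys present in only one dict show up with None on the other side."""
    return {
        key: (a.get(key), b.get(key))
        for key in sorted(set(a) | set(b))
        if a.get(key) != b.get(key)
    }

run_a = {"lr": 3e-4, "batch_size": 1024, "warmup_steps": 2000}
run_b = {"lr": 3e-4, "batch_size": 2048, "seq_len": 4096}
assert dict_diff(run_a, run_b) == {
    "batch_size": (1024, 2048),
    "seq_len": (None, 4096),
    "warmup_steps": (2000, None),
}
```

Taking the union of keys first is the important detail: a parameter that was silently dropped between runs is exactly the kind of change this comparison exists to surface.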
DVC Integration
Using DVC for Dataset Versioning
DVC (Data Version Control) is one of the most widely used tools for versioning ML datasets. It stores pointers (.dvc files) in Git while the actual data lives in remote storage (S3, GCS, Azure Blob).
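What Git ends up tracking is only a small pointer file. For a directory tracked with `dvc add`, the `.dvc` file looks roughly like this (the hash, sizes, and path are placeholders; exact fields vary by DVC version):

```yaml
outs:
- md5: 1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d.dir
  size: 1099511627776
  nfiles: 2048
  path: data/pretraining_corpus
```

The `.dir` suffix marks a directory hash: the value is the hash of a manifest listing each file's own hash, the same content-addressable pattern built by hand earlier in this post.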
import subprocess
class DVCManager:
"""
Manage dataset versioning through DVC.
Wraps DVC CLI commands with Python interfaces.
"""
def __init__(self, repo_path):
self.repo_path = Path(repo_path)
def init(self):
"""Initialize DVC in the repo."""
self._run_dvc("init")
def add_remote(self, name, url):
"""Add a DVC remote storage backend."""
self._run_dvc(f"remote add {name} {url}")
def track_dataset(self, dataset_path):
"""
Start tracking a dataset with DVC.
This creates a .dvc file that Git tracks,
while the actual data is managed by DVC.
"""
self._run_dvc(f"add {dataset_path}")
# The .dvc file contains the md5 hash of the data
dvc_file = Path(f"{dataset_path}.dvc")
return self._parse_dvc_file(dvc_file)
def push(self, remote="origin"):
"""Push data to remote storage."""
self._run_dvc(f"push -r {remote}")
def pull(self, remote="origin"):
"""Pull data from remote storage."""
self._run_dvc(f"pull -r {remote}")
def checkout(self, git_ref):
"""
Check out a specific version of the data.
First checkout the Git ref (which updates .dvc files),
then DVC checkout (which updates the actual data files).
"""
# Git checkout updates .dvc pointer files
subprocess.run(
["git", "checkout", git_ref],
cwd=self.repo_path, check=True,
)
# DVC checkout updates actual data to match pointers
self._run_dvc("checkout")
def diff(self, rev_a, rev_b):
"""Compare data between two Git revisions."""
result = self._run_dvc(f"diff {rev_a} {rev_b}")
return result
def get_data_hash(self, dataset_path):
"""Get the current content hash of tracked data."""
dvc_file = Path(f"{dataset_path}.dvc")
if dvc_file.exists():
return self._parse_dvc_file(dvc_file)
return None
def _run_dvc(self, command):
"""Run a DVC command."""
result = subprocess.run(
f"dvc {command}",
shell=True,
cwd=self.repo_path,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(
f"DVC command failed: {result.stderr}"
)
return result.stdout
def _parse_dvc_file(self, dvc_file_path):
"""Parse a .dvc file to extract hash and metadata."""
# DVC files are YAML
content = dvc_file_path.read_text()
# Simple parser -- in production use yaml library
lines = content.strip().split("\n")
metadata = {}
for line in lines:
if ":" in line:
key, value = line.split(":", 1)
metadata[key.strip()] = value.strip()
return metadata
Integrity Verification
Detecting Data Corruption
Data corruption can happen silently: a disk error flips a few bytes in a Parquet file, a network interruption truncates a download, a bug in preprocessing corrupts a fraction of rows. Integrity verification catches these issues before they affect training.
class IntegrityVerifier:
"""
Verify dataset integrity at multiple levels:
1. File-level: content hash matches stored hash
2. Record-level: no corrupted/truncated records
3. Schema-level: all files have consistent schemas
4. Statistical-level: distributions are within expected ranges
"""
def __init__(self, store):
self.store = store
self.hasher = DatasetHasher()
def verify_version(self, name, version):
"""
Full integrity check for a dataset version.
Returns a report with any issues found.
"""
report = {
"name": name,
"version": version,
"checks": [],
"passed": True,
}
manifest = self._load_manifest(name, version)
if manifest is None:
report["passed"] = False
report["checks"].append({
"check": "manifest_exists",
"passed": False,
"detail": "Manifest not found",
})
return report
# Check 1: All referenced objects exist
missing_objects = []
for rel_path, file_info in manifest["files"].items():
content_hash = file_info["hash"]
obj_path = (self.store.objects_dir / content_hash[:2]
/ content_hash[2:])
if not obj_path.exists():
missing_objects.append(rel_path)
report["checks"].append({
"check": "objects_exist",
"passed": len(missing_objects) == 0,
"detail": f"{len(missing_objects)} missing objects"
if missing_objects else "All objects present",
"missing": missing_objects[:10],
})
if missing_objects:
report["passed"] = False
# Check 2: Content hashes match
hash_mismatches = []
for rel_path, file_info in manifest["files"].items():
content_hash = file_info["hash"]
obj_path = (self.store.objects_dir / content_hash[:2]
/ content_hash[2:])
if obj_path.exists():
actual = self.hasher.hash_file(obj_path)
if actual["hash"] != content_hash:
hash_mismatches.append({
"file": rel_path,
"expected": content_hash,
"actual": actual["hash"],
})
report["checks"].append({
"check": "hash_integrity",
"passed": len(hash_mismatches) == 0,
"detail": f"{len(hash_mismatches)} hash mismatches"
if hash_mismatches else "All hashes valid",
"mismatches": hash_mismatches[:10],
})
if hash_mismatches:
report["passed"] = False
# Check 3: File sizes match
size_mismatches = []
for rel_path, file_info in manifest["files"].items():
expected_size = file_info["size"]
content_hash = file_info["hash"]
obj_path = (self.store.objects_dir / content_hash[:2]
/ content_hash[2:])
if obj_path.exists():
actual_size = os.path.getsize(obj_path)
if actual_size != expected_size:
size_mismatches.append({
"file": rel_path,
"expected": expected_size,
"actual": actual_size,
})
report["checks"].append({
"check": "size_integrity",
"passed": len(size_mismatches) == 0,
"detail": f"{len(size_mismatches)} size mismatches"
if size_mismatches else "All sizes valid",
})
if size_mismatches:
report["passed"] = False
return report
def _load_manifest(self, name, version):
"""Load a version manifest."""
ref_path = self.store.refs_dir / name / f"{version}.json"
if not ref_path.exists():
return None
with open(ref_path) as f:
ref = json.load(f)
manifest_path = (self.store.manifests_dir
/ f"{ref['manifest_hash']}.json")
if not manifest_path.exists():
return None
with open(manifest_path) as f:
return json.load(f)
Automated Audit Trails
Compliance and Legal Requirements
Training on copyrighted data, personal data, or licensed content creates legal obligations. An audit trail proves what data was used, when, and for what purpose.
class AuditTrail:
"""
Maintain a tamper-evident audit trail for dataset operations.
Each entry is linked to the previous entry's hash,
creating a blockchain-like chain of custody.
"""
def __init__(self, store_path):
self.store_path = Path(store_path)
self.audit_dir = self.store_path / "audit"
self.audit_dir.mkdir(parents=True, exist_ok=True)
self.chain = self._load_chain()
def log_event(self, event_type, details):
"""
Log an auditable event.
Events are chained: each event's hash includes
        the previous event's hash.
        """
        previous_hash = (
            self.chain[-1]["event_hash"]
            if self.chain else "genesis"
        )
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": event_type,
            "timestamp": datetime.utcnow().isoformat(),
            "details": details,
            "previous_hash": previous_hash,
        }
        # Compute event hash (includes previous hash for chaining)
        event_data = json.dumps(event, sort_keys=True)
        event["event_hash"] = hashlib.sha256(
            event_data.encode()
        ).hexdigest()
        self.chain.append(event)
        self._persist_event(event)
        return event

    def verify_chain(self):
        """
        Verify the integrity of the audit chain.
        If any event has been tampered with, the chain breaks.
        """
        if not self.chain:
            return {"valid": True, "length": 0}
        for i, event in enumerate(self.chain):
            # Check previous-hash linkage
            if i == 0:
                expected_prev = "genesis"
            else:
                expected_prev = self.chain[i - 1]["event_hash"]
            if event["previous_hash"] != expected_prev:
                return {
                    "valid": False,
                    "break_at": i,
                    "event_id": event["event_id"],
                    "reason": "Previous hash mismatch",
                }
            # Verify the event's own hash
            event_copy = dict(event)
            stored_hash = event_copy.pop("event_hash")
            computed_hash = hashlib.sha256(
                json.dumps(event_copy, sort_keys=True).encode()
            ).hexdigest()
            if computed_hash != stored_hash:
                return {
                    "valid": False,
                    "break_at": i,
                    "event_id": event["event_id"],
                    "reason": "Event hash mismatch (tampered)",
                }
        return {"valid": True, "length": len(self.chain)}

    def get_events_for_dataset(self, dataset_name):
        """Get all audit events for a specific dataset."""
        return [
            e for e in self.chain
            if e["details"].get("dataset") == dataset_name
        ]

    def generate_compliance_report(self, dataset_name, version):
        """
        Generate a compliance report showing the full chain
        of custody for a dataset version.
        """
        events = self.get_events_for_dataset(dataset_name)
        version_events = [
            e for e in events
            if e["details"].get("version") == version
            or e["details"].get("output_version") == version
        ]
        report = {
            "dataset": dataset_name,
            "version": version,
            "generated_at": datetime.utcnow().isoformat(),
            "chain_valid": self.verify_chain()["valid"],
            "events": version_events,
            "summary": {
                "total_events": len(version_events),
                "event_types": Counter(
                    e["event_type"] for e in version_events
                ),
            },
        }
        return report

    def _load_chain(self):
        """Load the audit chain from disk."""
        chain = []
        for path in sorted(self.audit_dir.glob("*.json")):
            with open(path) as f:
                chain.append(json.load(f))
        return chain

    def _persist_event(self, event):
        """Save an audit event to disk."""
        # Use timestamp + event_id for stable filename ordering
        filename = f"{event['timestamp']}_{event['event_id'][:8]}.json"
        path = self.audit_dir / filename
        with open(path, "w") as f:
            json.dump(event, f, indent=2)
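To make the hash chaining concrete, here is a minimal, self-contained sketch of the same idea: an in-memory chain with hypothetical helper names (`append_event`, `chain_is_valid`), not the `AuditTrail` API above. Editing any past event breaks every subsequent link.

```python
import hashlib
import json

def append_event(chain, event_type, details):
    """Append an event whose hash covers the previous event's hash."""
    prev = chain[-1]["event_hash"] if chain else "genesis"
    event = {"event_type": event_type, "details": details,
             "previous_hash": prev}
    payload = json.dumps(event, sort_keys=True).encode()
    event["event_hash"] = hashlib.sha256(payload).hexdigest()
    chain.append(event)
    return event

def chain_is_valid(chain):
    """Recompute every hash; any edit to a past event breaks a link."""
    prev = "genesis"
    for event in chain:
        if event["previous_hash"] != prev:
            return False
        body = {k: v for k, v in event.items() if k != "event_hash"}
        payload = json.dumps(body, sort_keys=True).encode()
        if hashlib.sha256(payload).hexdigest() != event["event_hash"]:
            return False
        prev = event["event_hash"]
    return True

chain = []
append_event(chain, "dataset_ingested", {"dataset": "web", "version": "v1"})
append_event(chain, "dataset_transformed", {"dataset": "web", "output_version": "v2"})
assert chain_is_valid(chain)

# Tamper with history: the chain no longer verifies.
chain[0]["details"]["version"] = "v999"
assert not chain_is_valid(chain)
```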
Complete Versioning Pipeline
Putting It All Together
class DataVersioningPipeline:
    """
    Complete data versioning pipeline that integrates:
    - Content-addressable storage
    - Change detection
    - Lineage tracking
    - Training run binding
    - Integrity verification
    - Audit logging
    """

    def __init__(self, store_path):
        self.store = ContentAddressableStore(store_path)
        self.lineage = LineageTracker(store_path)
        self.run_tracker = TrainingRunTracker(self.store)
        self.verifier = IntegrityVerifier(self.store)
        self.audit = AuditTrail(store_path)
        self.differ = DatasetDiffer(self.store)

    def ingest_dataset(self, source_dir, name, version,
                       description=""):
        """Ingest a new dataset version."""
        # Store the data
        manifest = self.store.store_dataset(
            source_dir, name, version
        )
        # Log the ingestion
        self.audit.log_event("dataset_ingested", {
            "dataset": name,
            "version": version,
            "file_count": manifest["file_count"],
            "total_size": manifest["total_size"],
            "manifest_hash": manifest["manifest_hash"],
            "description": description,
        })
        # Verify integrity immediately
        report = self.verifier.verify_version(name, version)
        if not report["passed"]:
            raise RuntimeError(
                f"Integrity verification failed: {report}"
            )
        return manifest

    def transform_dataset(self, name, input_version,
                          output_version, transform_fn,
                          transform_type, parameters):
        """Apply a transformation and track it."""
        import time

        # Restore input version
        input_dir = Path(f"/tmp/dvp_input_{input_version}")
        self.store.restore_dataset(name, input_version, input_dir)
        # Apply transformation
        output_dir = Path(f"/tmp/dvp_output_{output_version}")
        start = time.time()
        input_count, output_count = transform_fn(
            input_dir, output_dir
        )
        duration = time.time() - start
        # Store output
        self.ingest_dataset(output_dir, name, output_version)
        # Record lineage
        self.lineage.record_transform(
            transform_type=transform_type,
            input_version=input_version,
            output_version=output_version,
            parameters=parameters,
            input_count=input_count,
            output_count=output_count,
            duration=duration,
        )
        # Audit log
        self.audit.log_event("dataset_transformed", {
            "dataset": name,
            "input_version": input_version,
            "output_version": output_version,
            "transform_type": transform_type,
            "rows_in": input_count,
            "rows_out": output_count,
        })
        return {
            "input_count": input_count,
            "output_count": output_count,
            "duration": duration,
        }

    def start_training(self, run_id, model_name, datasets,
                       hyperparameters, hardware, git_commit):
        """Start a training run with full data binding."""
        # Verify all dataset versions before training
        for name, version in datasets.items():
            report = self.verifier.verify_version(name, version)
            if not report["passed"]:
                raise RuntimeError(
                    f"Dataset {name} v{version} failed "
                    f"integrity check: {report}"
                )
        manifest = self.run_tracker.start_run(
            run_id, model_name, datasets,
            hyperparameters, hardware, git_commit,
        )
        self.audit.log_event("training_started", {
            "run_id": run_id,
            "model": model_name,
            "datasets": datasets,
        })
        return manifest

    def debug_regression(self, run_a, run_b):
        """
        Debug a regression between two training runs.
        Returns a complete analysis of what changed.
        """
        comparison = self.run_tracker.compare_runs(run_a, run_b)
        # For each data change, get a detailed diff
        for name, change in comparison["data_changes"].items():
            if change.get("run_a") and change.get("run_b"):
                detailed = self.differ.diff_detailed(
                    name, change["run_a"], change["run_b"]
                )
                change["detailed_diff"] = detailed
                # Get lineage for both versions
                lineage_a = self.lineage.get_lineage_summary(
                    change["run_a"]
                )
                lineage_b = self.lineage.get_lineage_summary(
                    change["run_b"]
                )
                change["lineage_a"] = lineage_a
                change["lineage_b"] = lineage_b
        return comparison
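The pipeline above rests on the content-addressable store doing deduplication automatically. As a standalone illustration of that idea (a toy in-memory store, not the `ContentAddressableStore` API), storing two dataset versions that share most files duplicates only the changed content:

```python
import hashlib

def store_blob(store, data: bytes) -> str:
    """Store data under its SHA-256 digest; identical content is stored once."""
    digest = hashlib.sha256(data).hexdigest()
    store.setdefault(digest, data)
    return digest

store = {}

# Version 1: two shards, each stored under its content hash.
v1 = {name: store_blob(store, data) for name, data in {
    "shard-00.txt": b"quick brown fox",
    "shard-01.txt": b"lazy dog",
}.items()}

# Version 2 changes one shard; the unchanged shard maps to the same address.
v2 = {name: store_blob(store, data) for name, data in {
    "shard-00.txt": b"quick brown fox",
    "shard-01.txt": b"lazy dog, revised",
}.items()}

assert v1["shard-00.txt"] == v2["shard-00.txt"]  # deduplicated
assert v1["shard-01.txt"] != v2["shard-01.txt"]  # new blob
assert len(store) == 3  # two versions, but only three unique blobs
```

The per-version manifests (`v1`, `v2`) play the role of commits: comparing two versions at the file level is just comparing hash maps, with no need to touch the underlying bytes.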
Data Versioning Overhead per Operation
(Benchmark values not recoverable. The table reported wall-clock time in seconds for: hashing 1GB, hashing 100GB, storing 1GB, storing 100GB, a metadata-level diff, a detailed diff, and verifying 100GB.)
Key Takeaways
Data versioning is infrastructure, not overhead. When a $3 million training run regresses, the ability to diff dataset versions and identify the root cause in minutes rather than days pays for the versioning system many times over.
The critical components:
- **Content-addressable storage:** Every dataset version is identified by a cryptographic hash. This provides integrity, deduplication, and immutability. The hash is the version.
- **Multi-level diffing:** File-level diffs are fast (compare hashes). Schema-level and statistical diffs catch subtle changes. Row-level diffs pinpoint exact modifications but are expensive for large datasets.
- **Lineage tracking:** Record every transformation applied to the data. When debugging, trace the full pipeline from raw source to final training data. The lineage DAG is the data equivalent of a stack trace.
- **Training run binding:** Every training run manifest records exact dataset versions, hyperparameters, code commit, and hardware. The `compare_runs` function is the primary debugging tool.
- **Audit trails:** Chain-linked event logs provide tamper-evident records for compliance. Every dataset operation is logged with enough detail to reconstruct the full chain of custody.
The cost of versioning is small relative to training: storing two versions of a 10TB dataset with content-addressable deduplication typically adds 10-20% storage overhead (only changed files are duplicated). The compute cost of hashing is linear in dataset size, approximately 1GB/s on modern hardware. For a 15TB pretraining dataset, the full hash takes about 4 hours — negligible compared to the weeks of GPU time for training.
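The hashing estimate is easy to check, assuming the stated ~1GB/s throughput:

```python
dataset_bytes = 15e12   # 15 TB pretraining dataset
throughput = 1e9        # ~1 GB/s sequential SHA-256 hashing (assumed)

hours = dataset_bytes / throughput / 3600
print(f"{hours:.1f} hours")  # → 4.2 hours
```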