Part of the series "The Dataset Frontier" (14 of 27)

CLIP was trained on 400 million image-text pairs, but LAION-5B expanded that to 5.85 billion — a 14x increase in scale. The result: LAION-trained models match or beat CLIP on zero-shot ImageNet despite using the same architecture. Multimodal intelligence scales with data volume, but only if the data is clean: LAION’s initial release included 2.1% NSFW content and 8.3% misaligned pairs where captions described unrelated images. The curation pipeline that filters 5.85B raw pairs to 3.2B high-quality pairs is where the value lives.

This post covers each stage in detail, with working code for every component.

Image-Text Pair Collection

Web Crawl: Alt-Text Extraction

The largest source of image-text pairs is the web. HTML img tags contain alt attributes that describe the image content. Common Crawl provides petabytes of archived web pages.

import io
import hashlib
import requests
from dataclasses import dataclass, field
from html.parser import HTMLParser
from urllib.parse import urljoin

@dataclass
class ImageTextPair:
    image_url: str
    alt_text: str
    page_url: str
    image_hash: str = ""
    text_quality_score: float = 0.0
    image_width: int = 0
    image_height: int = 0

class AltTextExtractor(HTMLParser):
    """Extract image-alt text pairs from HTML."""

    def __init__(self, base_url):
        super().__init__()
        self.base_url = base_url
        self.pairs = []
        self.current_context = []  # Surrounding text for context

    def handle_starttag(self, tag, attrs):
        if tag == 'img':
            attr_dict = dict(attrs)
            alt = attr_dict.get('alt', '').strip()
            src = attr_dict.get('src', '')

            if alt and src and len(alt) > 5:
                full_url = urljoin(self.base_url, src)
                self.pairs.append(ImageTextPair(
                    image_url=full_url,
                    alt_text=alt,
                    page_url=self.base_url,
                ))

    def handle_data(self, data):
        text = data.strip()
        if text:
            self.current_context.append(text)

def extract_pairs_from_warc(warc_path):
    """
    Extract image-text pairs from a WARC (Web ARChive) file.
    Common Crawl distributes data in WARC format.
    """
    import warc  # legacy 'warc' package; 'warcio' is the maintained successor

    pairs = []
    with warc.open(warc_path) as f:
        for record in f:
            if record['WARC-Type'] != 'response':
                continue

            content_type = record['Content-Type']
            if 'text/html' not in str(content_type):
                continue

            url = record['WARC-Target-URI']
            payload = record.payload.read().decode('utf-8', errors='replace')

            # Split HTTP header from body
            if '\r\n\r\n' in payload:
                _, body = payload.split('\r\n\r\n', 1)
            else:
                continue

            extractor = AltTextExtractor(url)
            try:
                extractor.feed(body)
            except Exception:
                continue

            pairs.extend(extractor.pairs)

    return pairs

LAION and DataComp: Existing Datasets

# LAION-5B: 5.85 billion image-text pairs from Common Crawl
# DataComp: 12.8 billion image-text pairs with quality scores

DATASET_COMPARISON = {
    "LAION-400M": {
        "size": 400_000_000,
        "source": "Common Crawl (subset)",
        "filtering": "CLIP score > 0.28, English language detection",
        "avg_caption_length_words": 12,
        "estimated_noise_rate": 0.30,  # 30% poor alignment
    },
    "LAION-5B": {
        "size": 5_850_000_000,
        "source": "Common Crawl (comprehensive)",
        "filtering": "CLIP score > 0.28, language detection, NSFW filter",
        "avg_caption_length_words": 14,
        "estimated_noise_rate": 0.25,
    },
    "DataComp-1B": {
        "size": 1_280_000_000,
        "source": "Common Crawl (curated subset of 12.8B pool)",
        "filtering": "CLIP score filtering + text-based filtering + image-based filtering",
        "avg_caption_length_words": 16,
        "estimated_noise_rate": 0.15,
    },
}

Image-Text Dataset Comparison

Dataset      | Size        | Avg Caption Words | CLIP Score Threshold | Noise Rate
LAION-400M   | 400M pairs  | 12                | 0.28                 | ~30%
LAION-5B     | 5.85B pairs | 14                | 0.28                 | ~25%
DataComp-1B  | 1.28B pairs | 16                | 0.30                 | ~15%
CC12M        | 12M pairs   | 20                | Manual               | ~5%
SBU Captions | 1M pairs    | 25                | Manual               | ~3%
Note: Larger datasets have higher noise rates. DataComp achieves low noise through aggressive multi-stage filtering from a 12.8B candidate pool.
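As a rough sanity check on the comparison, multiplying each dataset's size by one minus its estimated noise rate gives a ballpark count of well-aligned pairs. The noise rates below are the estimates quoted above, not measured values:

```python
# (size, estimated noise rate) from the comparison above -- rough
# estimates, not ground truth
DATASETS = {
    "LAION-400M": (400_000_000, 0.30),
    "LAION-5B": (5_850_000_000, 0.25),
    "DataComp-1B": (1_280_000_000, 0.15),
}

def effective_pairs(size, noise_rate):
    """Approximate count of well-aligned pairs after discounting noise."""
    return round(size * (1 - noise_rate))

for name, (size, noise) in DATASETS.items():
    print(f"{name}: ~{effective_pairs(size, noise) / 1e9:.2f}B usable pairs")
```

By this estimate LAION-5B still contains roughly 4.4B usable pairs despite its higher noise rate, which is why scale plus filtering beats filtering alone.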

Synthetic Caption Generation

Re-Captioning with VLMs

Alt-text from the web is noisy: it contains SEO spam, file names, accessibility placeholders (“image”), and descriptions that do not match the image. Re-captioning uses a vision-language model to generate accurate descriptions.

import torch
from PIL import Image
from transformers import AutoProcessor, LlavaNextForConditionalGeneration

class SyntheticCaptionGenerator:
    """
    Generate high-quality captions using a vision-language model.
    This replaces noisy alt-text with accurate descriptions.
    """

    def __init__(self, model_name="llava-hf/llava-v1.6-34b-hf", device="cuda"):
        self.processor = AutoProcessor.from_pretrained(model_name)
        # LLaVA-NeXT checkpoints load via the conditional-generation
        # class, not AutoModelForCausalLM
        self.model = LlavaNextForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
        self.device = device

    def generate_caption(self, image, caption_type="detailed"):
        """
        Generate a caption for a single image.

        caption_type: "short" (1 sentence), "detailed" (2-4 sentences),
                     "structured" (JSON with objects, actions, scene)
        """
        prompts = {
            "short": "Describe this image in one sentence.",
            "detailed": (
                "Describe this image in detail. Include the main subjects, "
                "their actions, the setting, colors, and any text visible."
            ),
            "structured": (
                "Describe this image as JSON with fields: "
                "main_subject, action, setting, objects, text_visible, mood."
            ),
        }

        prompt = prompts[caption_type]
        inputs = self.processor(
            text=f"<image>\n{prompt}",
            images=image,
            return_tensors="pt",
        ).to(self.device)

        with torch.no_grad():
            output_ids = self.model.generate(
                **inputs,
                max_new_tokens=256,
                temperature=0.2,
                do_sample=True,
            )

        # Decode only the generated tokens
        generated = output_ids[0][inputs['input_ids'].shape[1]:]
        caption = self.processor.decode(generated, skip_special_tokens=True)
        return caption.strip()

    def batch_recaption(self, image_paths, batch_size=16):
        """
        Re-caption a list of images, loading them batch_size at a time.
        Returns a list of (image_path, original_alt, new_caption); the
        original alt-text slot is left empty here, and captions are
        still generated one image at a time.
        """
        results = []
        for i in range(0, len(image_paths), batch_size):
            batch = image_paths[i:i + batch_size]
            images = []
            for path in batch:
                try:
                    img = Image.open(path).convert('RGB')
                    images.append(img)
                except Exception:
                    images.append(None)

            for img, path in zip(images, batch):
                if img is None:
                    results.append((path, "", "ERROR: Could not load image"))
                    continue
                caption = self.generate_caption(img, "detailed")
                results.append((path, "", caption))

        return results

Multi-Granularity Captioning

Production systems generate multiple caption types per image and use different ones for different training stages:

class MultiGranularityCaptioner:
    """
    Generate captions at multiple levels of detail.
    Short captions for contrastive pre-training (CLIP-style).
    Detailed captions for generative training (LLaVA-style).
    """

    def __init__(self, vlm_model):
        self.vlm = vlm_model

    def generate_all_granularities(self, image):
        """Generate short, medium, detailed, and OCR captions."""
        return {
            "short": self.vlm.generate_caption(image, "short"),
            "detailed": self.vlm.generate_caption(image, "detailed"),
            "structured": self.vlm.generate_caption(image, "structured"),
        }

    def create_training_example(self, image_path, captions):
        """
        Create training examples from multi-granularity captions.

        For contrastive learning: use short caption
        For instruction tuning: create Q&A pairs from detailed caption
        """
        qa_pairs = [
            {
                "question": "What is in this image?",
                "answer": captions["short"],
            },
            {
                "question": "Describe this image in detail.",
                "answer": captions["detailed"],
            },
            {
                "question": "What objects are visible in this image?",
                "answer": captions["structured"],
            },
        ]

        return {
            "image": image_path,
            "conversations": qa_pairs,
        }
Performance

Re-captioning 1B images with a 34B VLM requires approximately 8,000 H100-hours. At $3/GPU-hour, this costs $24,000. The quality improvement over raw alt-text is substantial: average CLIP-score alignment improves from 0.28 to 0.35+, and downstream VLM accuracy improves 3-5% on benchmarks.
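The cost figure is simple throughput arithmetic; this sketch restates it so the implied per-GPU throughput is explicit (all inputs are the approximate numbers quoted above):

```python
# Back-of-envelope re-captioning cost model. Inputs are the rough
# figures from the text: 1B images, 8,000 H100-hours, $3/GPU-hour.
def recaption_cost(num_images, gpu_hours, price_per_gpu_hour):
    throughput = num_images / gpu_hours          # images per GPU-hour
    total_cost = gpu_hours * price_per_gpu_hour  # dollars
    cost_per_million = total_cost / (num_images / 1e6)
    return throughput, total_cost, cost_per_million

tput, cost, per_m = recaption_cost(1_000_000_000, 8_000, 3.0)
print(f"{tput:,.0f} images/GPU-hour, ${cost:,.0f} total, ${per_m:.2f}/1M images")
```

The implied throughput (125,000 images per GPU-hour, ~35 images/sec) assumes short generations and aggressive batching; slower decoding scales the cost linearly.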

Video Caption Generation

Frame Extraction and Temporal Captioning

Video data adds temporal information: actions, state changes, and narrative flow. The pipeline extracts keyframes, captions them individually, and generates a temporal description.

import cv2
import numpy as np

class VideoCaptionPipeline:
    """
    Generate captions for video content.
    Extracts keyframes, captions each, and creates temporal descriptions.
    """

    def __init__(self, vlm_model, frames_per_clip=8):
        self.vlm = vlm_model
        self.frames_per_clip = frames_per_clip

    def extract_keyframes(self, video_path, method="uniform"):
        """
        Extract keyframes from video.

        Methods:
        - uniform: evenly spaced frames
        - scene_change: frames at scene boundaries
        - motion: frames with significant motion
        """
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0

        frames = []
        timestamps = []

        if method == "uniform":
            indices = np.linspace(0, total_frames - 1,
                                  self.frames_per_clip, dtype=int)
            for idx in indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if ret:
                    frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    timestamps.append(idx / fps)

        elif method == "scene_change":
            prev_hist = None
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Compute color histogram
                hist = cv2.calcHist([frame], [0, 1, 2], None,
                                    [8, 8, 8], [0, 256, 0, 256, 0, 256])
                hist = cv2.normalize(hist, hist).flatten()

                if prev_hist is not None:
                    # Chi-squared distance between histograms
                    diff = cv2.compareHist(prev_hist, hist,
                                          cv2.HISTCMP_CHISQR)
                    if diff > 50.0:  # Scene change threshold
                        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                        timestamps.append(frame_idx / fps)

                prev_hist = hist
                frame_idx += 1

            # Subsample if too many scene changes
            if len(frames) > self.frames_per_clip:
                indices = np.linspace(0, len(frames) - 1,
                                      self.frames_per_clip, dtype=int)
                frames = [frames[i] for i in indices]
                timestamps = [timestamps[i] for i in indices]

        cap.release()

        return frames, timestamps, duration

    def caption_video(self, video_path):
        """
        Generate a temporal caption for a video.
        Returns per-frame captions and a summary.
        """
        frames, timestamps, duration = self.extract_keyframes(
            video_path, method="scene_change"
        )

        # Caption each frame
        frame_captions = []
        for frame, ts in zip(frames, timestamps):
            pil_image = Image.fromarray(frame)
            caption = self.vlm.generate_caption(pil_image, "short")
            frame_captions.append({
                "timestamp": round(ts, 2),
                "caption": caption,
            })

        # Generate temporal summary
        frame_descriptions = "\n".join(
            f"[{fc['timestamp']:.1f}s] {fc['caption']}"
            for fc in frame_captions
        )

        summary_prompt = (
            f"Video duration: {duration:.1f}s\n"
            f"Frame descriptions:\n{frame_descriptions}\n\n"
            f"Write a 2-3 sentence summary of what happens in this video."
        )

        # Use text-only LLM for summary (no image needed)
        summary = self._generate_text_summary(summary_prompt)

        return {
            "video_path": video_path,
            "duration": duration,
            "frame_captions": frame_captions,
            "summary": summary,
        }

    def _generate_text_summary(self, prompt):
        """Generate text summary using LLM."""
        # Placeholder: use any text LLM
        return prompt  # Replace with actual LLM call

Video Captioning Dataset Comparison

Dataset      | Videos     | Avg Duration | Caption Type          | Source
WebVid-10M   | 10.7M      | 18s          | Alt-text              | Web scrape
HowTo100M    | 136M clips | 4s           | ASR transcript        | YouTube
InternVid    | 7M         | 12s          | Generated (ViCLIP)    | YouTube
Panda-70M    | 70M        | 8s           | Generated (multi-VLM) | YouTube
HD-VILA-100M | 103M       | 13s          | Alt-text + ASR        | Web scrape
Note: Synthetic captions (InternVid, Panda-70M) are higher quality than raw alt-text or ASR transcripts. ASR captions describe what is said, not what is shown.

Audio Transcription Datasets

Speech-to-Text for Multimodal Training

class AudioTranscriptionPipeline:
    """
    Generate transcriptions for audio data.
    Used for speech-understanding in multimodal models.
    """

    def __init__(self, whisper_model_size="large-v3"):
        import whisper
        self.model = whisper.load_model(whisper_model_size)

    def transcribe_with_timestamps(self, audio_path):
        """
        Transcribe audio with word-level timestamps.
        Returns segments with start/end times and text.
        """
        result = self.model.transcribe(
            audio_path,
            word_timestamps=True,
            language=None,  # Auto-detect
        )

        segments = []
        for segment in result['segments']:
            segments.append({
                'start': segment['start'],
                'end': segment['end'],
                'text': segment['text'].strip(),
                'words': [
                    {
                        'word': w['word'],
                        'start': w['start'],
                        'end': w['end'],
                        'probability': w['probability'],
                    }
                    for w in segment.get('words', [])
                ],
            })

        return {
            'language': result['language'],
            'segments': segments,
            'full_text': result['text'],
        }

    def create_audio_text_pairs(self, audio_dir, output_file):
        """
        Process a directory of audio files into training pairs.
        Pairs: (audio_segment, transcription, timestamps)
        """
        import glob
        import json

        pairs = []
        audio_files = glob.glob(f"{audio_dir}/**/*.mp3", recursive=True)
        audio_files += glob.glob(f"{audio_dir}/**/*.wav", recursive=True)

        for audio_path in audio_files:
            try:
                result = self.transcribe_with_timestamps(audio_path)
                pairs.append({
                    'audio_path': audio_path,
                    'language': result['language'],
                    'segments': result['segments'],
                    'full_text': result['full_text'],
                })
            except Exception as e:
                print(f"Error processing {audio_path}: {e}")
                continue

        with open(output_file, 'w') as f:
            for pair in pairs:
                f.write(json.dumps(pair) + '\n')

        return len(pairs)

Interleaved Document Format

Why Interleaved Data Matters

Web pages naturally interleave text and images. A Wikipedia article has images placed between paragraphs, with captions that reference the surrounding text. Training on interleaved data teaches models to understand images in context, not just in isolation.

import json
from enum import Enum

class ContentType(Enum):
    TEXT = "text"
    IMAGE = "image"
    VIDEO = "video"
    AUDIO = "audio"
    CODE = "code"
    TABLE = "table"

@dataclass
class ContentBlock:
    content_type: ContentType
    content: str  # Text content or media path/URL
    metadata: dict = field(default_factory=dict)

@dataclass
class InterleavedDocument:
    doc_id: str
    source_url: str
    blocks: list  # List of ContentBlock
    metadata: dict = field(default_factory=dict)

class InterleavedDocumentExtractor:
    """
    Extract interleaved text-image documents from HTML.
    Preserves the natural ordering of text and images.
    """

    def __init__(self, min_text_length=50, min_image_size=100):
        self.min_text_length = min_text_length
        self.min_image_size = min_image_size

    def extract_from_html(self, html_content, base_url):
        """
        Parse HTML into an ordered sequence of text and image blocks.
        """
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove script, style, nav, footer
        for tag in soup.find_all(['script', 'style', 'nav', 'footer',
                                  'header', 'aside']):
            tag.decompose()

        blocks = []
        current_text = []

        def flush_text():
            text = ' '.join(current_text).strip()
            if len(text) >= self.min_text_length:
                blocks.append(ContentBlock(
                    content_type=ContentType.TEXT,
                    content=text,
                ))
            current_text.clear()

        # Walk the DOM in document order
        for element in soup.body.descendants if soup.body else []:
            if element.name == 'img':
                flush_text()
                src = element.get('src', '')
                alt = element.get('alt', '')
                if src:
                    full_url = urljoin(base_url, src)
                    blocks.append(ContentBlock(
                        content_type=ContentType.IMAGE,
                        content=full_url,
                        metadata={'alt_text': alt},
                    ))

            elif element.name in ('p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
                                   'li', 'td', 'th', 'blockquote'):
                text = element.get_text(strip=True)
                if text:
                    current_text.append(text)

        flush_text()

        return InterleavedDocument(
            doc_id=hashlib.md5(base_url.encode()).hexdigest(),
            source_url=base_url,
            blocks=blocks,
        )

    def to_training_format(self, doc):
        """
        Convert to the training format used by multimodal models.

        Format: sequence of text and <image> tokens.
        Example:
        "The Eiffel Tower is 330m tall. <image> Built in 1889, it was..."
        """
        parts = []
        image_paths = []

        for block in doc.blocks:
            if block.content_type == ContentType.TEXT:
                parts.append(block.content)
            elif block.content_type == ContentType.IMAGE:
                parts.append("<image>")
                image_paths.append(block.content)

        return {
            'text': ' '.join(parts),
            'images': image_paths,
            'doc_id': doc.doc_id,
        }

The MMC4 and OBELICS Format

# MMC4 (Multimodal C4): 101M documents with interleaved images
# OBELICS: 141M documents from Common Crawl

INTERLEAVED_DATASETS = {
    "MMC4": {
        "documents": 101_000_000,
        "images": 585_000_000,
        "format": "jsonl with <image> placeholders",
        "avg_images_per_doc": 5.8,
        "filtering": "CLIP similarity between image and surrounding text",
        "total_size_TB": 2.1,
    },
    "OBELICS": {
        "documents": 141_000_000,
        "images": 353_000_000,
        "format": "jsonl with image URLs and text blocks",
        "avg_images_per_doc": 2.5,
        "filtering": "Text quality + image-text alignment + deduplication",
        "total_size_TB": 1.5,
    },
}

def convert_obelics_to_training(obelics_doc):
    """
    Convert OBELICS format to model training format.

    OBELICS stores documents as alternating text/image entries:
    [
        {"type": "text", "content": "..."},
        {"type": "image", "url": "...", "alt": "..."},
        {"type": "text", "content": "..."},
    ]
    """
    text_parts = []
    image_urls = []
    image_idx = 0

    for entry in obelics_doc:
        if entry['type'] == 'text':
            text_parts.append(entry['content'])
        elif entry['type'] == 'image':
            text_parts.append(f"<image_{image_idx}>")
            image_urls.append(entry['url'])
            image_idx += 1

    return {
        'text': '\n'.join(text_parts),
        'images': image_urls,
        'num_images': image_idx,
    }
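A quick standalone check of the conversion on a toy document (the entries are invented; the loop restates the logic of convert_obelics_to_training so the snippet runs on its own):

```python
# Toy OBELICS-style document (hypothetical content) converted with the
# same alternating text/image logic as convert_obelics_to_training.
doc = [
    {"type": "text", "content": "A tabby cat sleeps on a windowsill."},
    {"type": "image", "url": "https://example.com/cat.jpg", "alt": "cat"},
    {"type": "text", "content": "Later it wakes up for dinner."},
]

text_parts, image_urls = [], []
for entry in doc:
    if entry["type"] == "text":
        text_parts.append(entry["content"])
    elif entry["type"] == "image":
        # Placeholder index matches position in image_urls
        text_parts.append(f"<image_{len(image_urls)}>")
        image_urls.append(entry["url"])

result = {"text": "\n".join(text_parts), "images": image_urls,
          "num_images": len(image_urls)}
print(result["text"])
```

The placeholder stays between the two text blocks, preserving the document order that interleaved training depends on.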

Quality Filtering for Multimodal Data

Multi-Stage Filtering Pipeline

class MultimodalQualityFilter:
    """
    Multi-stage quality filtering for image-text pairs.
    Each stage eliminates low-quality pairs with increasing sophistication.
    """

    def __init__(self, clip_model_name="ViT-L-14"):
        self.clip_model = None  # Lazy load
        self.clip_model_name = clip_model_name

    def stage1_metadata_filter(self, pair):
        """
        Fast metadata-based filtering. No model inference needed.
        Eliminates 40-60% of pairs.
        """
        # Image size
        if pair.image_width < 150 or pair.image_height < 150:
            return False, "image_too_small"

        # Aspect ratio (reject extreme crops)
        aspect = max(pair.image_width, pair.image_height) / min(pair.image_width, pair.image_height)
        if aspect > 5.0:
            return False, "extreme_aspect_ratio"

        # Text length
        if len(pair.alt_text) < 5 or len(pair.alt_text) > 1000:
            return False, "text_length"

        # Blacklisted patterns in alt text
        blacklist = [
            "click here", "thumbnail", "image", "photo", "picture",
            "untitled", "dsc_", "img_", "screenshot", "logo",
            ".jpg", ".png", ".gif", "http://", "https://",
        ]
        alt_lower = pair.alt_text.lower()
        for pattern in blacklist:
            if alt_lower == pattern or alt_lower.startswith(pattern):
                return False, f"blacklisted:{pattern}"

        return True, "passed"

    def stage2_text_quality(self, pair):
        """
        Text quality filtering using heuristics.
        Eliminates 10-20% of remaining pairs.
        """
        text = pair.alt_text

        # Language detection
        words = text.split()
        if len(words) < 3:
            return False, "too_few_words"
        if len(words) > 100:
            return False, "too_many_words"

        # Repetition check
        unique_words = set(w.lower() for w in words)
        if len(unique_words) / len(words) < 0.5:
            return False, "too_repetitive"

        # Capitalization heuristic (ALL CAPS = spam)
        upper_frac = sum(1 for c in text if c.isupper()) / max(len(text), 1)
        if upper_frac > 0.7 and len(text) > 20:
            return False, "all_caps"

        return True, "passed"

    def stage3_clip_score(self, image, text, threshold=0.28):
        """
        CLIP-based image-text alignment filtering.
        Most expensive stage. Eliminates 10-30% of remaining pairs.
        """
        import open_clip
        if self.clip_model is None:
            self.clip_model, _, self.preprocess = open_clip.create_model_and_transforms(
                self.clip_model_name, pretrained='openai'
            )
            self.tokenizer = open_clip.get_tokenizer(self.clip_model_name)
            self.clip_model.eval()

        image_tensor = self.preprocess(image).unsqueeze(0)
        text_tokens = self.tokenizer([text])

        with torch.no_grad():
            image_features = self.clip_model.encode_image(image_tensor)
            text_features = self.clip_model.encode_text(text_tokens)

            # Normalize
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)

            # Cosine similarity
            score = (image_features @ text_features.T).item()

        if score < threshold:
            return False, f"clip_score:{score:.3f}"
        return True, f"passed:{score:.3f}"

    def stage4_nsfw_detection(self, image):
        """
        NSFW content detection.
        Uses a lightweight classifier on CLIP embeddings.
        """
        # Simplified: use CLIP embedding + linear classifier
        # In production, use dedicated NSFW models
        if self.clip_model is None:
            return True, "skipped"  # Need stage3 first

        image_tensor = self.preprocess(image).unsqueeze(0)
        with torch.no_grad():
            features = self.clip_model.encode_image(image_tensor)

        # Placeholder: real implementation uses a trained classifier
        nsfw_score = 0.0  # Replace with actual classifier
        if nsfw_score > 0.5:
            return False, f"nsfw:{nsfw_score:.3f}"
        return True, "passed"

    def filter_batch(self, pairs_with_images):
        """
        Run all filter stages on a batch of pairs.
        Returns filtered pairs with quality scores.
        """
        results = []
        stage_counts = {
            'input': len(pairs_with_images),
            'stage1_pass': 0,
            'stage2_pass': 0,
            'stage3_pass': 0,
            'stage4_pass': 0,
        }

        for pair, image in pairs_with_images:
            passed, reason = self.stage1_metadata_filter(pair)
            if not passed:
                continue
            stage_counts['stage1_pass'] += 1

            passed, reason = self.stage2_text_quality(pair)
            if not passed:
                continue
            stage_counts['stage2_pass'] += 1

            passed, clip_reason = self.stage3_clip_score(image, pair.alt_text)
            if not passed:
                continue
            stage_counts['stage3_pass'] += 1

            passed, reason = self.stage4_nsfw_detection(image)
            if not passed:
                continue
            stage_counts['stage4_pass'] += 1

            # Stage 3 reports "passed:<score>"; keep the CLIP score as
            # the pair's quality signal (stage 4's reason would
            # otherwise overwrite it)
            pair.text_quality_score = float(clip_reason.split(':')[1])
            results.append(pair)

        return results, stage_counts

Filter Stage Survival Rate (DataComp-12.8B Pool)

Stage                 | Surviving Pairs (B)
Raw pool              | 12.8
Stage 1: Metadata     | 7.2
Stage 2: Text quality | 5.1
Stage 3: CLIP score   | 2.3
Stage 4: NSFW + dedup | 1.28
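The per-stage retention implied by these counts falls out directly (counts in billions, as above):

```python
# Survival counts (billions of pairs) at each filter stage, taken
# from the table above
stages = [
    ("raw pool", 12.8),
    ("stage 1: metadata", 7.2),
    ("stage 2: text quality", 5.1),
    ("stage 3: CLIP score", 2.3),
    ("stage 4: NSFW + dedup", 1.28),
]

# Retention of each stage relative to the previous one
for (prev_name, prev), (name, count) in zip(stages, stages[1:]):
    print(f"{name}: kept {count / prev:.0%} of {prev_name}")

overall = stages[-1][1] / stages[0][1]
print(f"overall retention: {overall:.0%}")
```

Only about 10% of the raw pool survives end to end, with the metadata stage doing the cheapest and largest single cut.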

Image Deduplication

Perceptual Hashing at Scale

class ImageDeduplicator:
    """
    Remove duplicate and near-duplicate images using perceptual hashing.
    Handles billions of images using locality-sensitive hashing.
    """

    def __init__(self, hash_size=16, threshold=5):
        self.hash_size = hash_size
        self.threshold = threshold  # Maximum Hamming distance for "duplicate"

    def compute_phash(self, image):
        """
        Compute a perceptual hash for an image. Despite the name, this
        uses a difference hash (dHash): faster than DCT-based pHash and
        still robust to resizing, compression, and minor crops.
        """
        img = image.convert('L')  # Grayscale
        img = img.resize((self.hash_size + 1, self.hash_size), Image.LANCZOS)

        pixels = np.array(img, dtype=np.float64)

        # Compute difference hash (faster than DCT-based pHash)
        diff = pixels[:, 1:] > pixels[:, :-1]
        return diff.flatten()

    def compute_hash_int(self, image):
        """Compute hash as integer for efficient storage and comparison."""
        hash_bits = self.compute_phash(image)
        hash_int = 0
        for bit in hash_bits:
            hash_int = (hash_int << 1) | int(bit)
        return hash_int

    def hamming_distance(self, hash1, hash2):
        """Compute Hamming distance between two integer hashes."""
        # On Python 3.10+, (hash1 ^ hash2).bit_count() does this in one call
        xor = hash1 ^ hash2
        distance = 0
        while xor:
            distance += xor & 1
            xor >>= 1
        return distance

    def deduplicate_batch(self, image_hashes):
        """
        Remove duplicates from a batch of (id, hash) pairs.

        This reference version does an exact-match lookup plus a
        brute-force O(n^2) near-duplicate scan. For billions of images,
        bucket the hashes with locality-sensitive hashing or use an
        approximate nearest-neighbor index (FAISS, Annoy) instead.
        """
        seen = {}  # hash -> first_id
        duplicates = set()

        for img_id, img_hash in image_hashes:
            is_dup = False
            # Check exact match first
            if img_hash in seen:
                duplicates.add(img_id)
                is_dup = True
            else:
                # Check near-duplicates (expensive for large sets)
                for existing_hash, existing_id in seen.items():
                    if self.hamming_distance(img_hash, existing_hash) <= self.threshold:
                        duplicates.add(img_id)
                        is_dup = True
                        break

            if not is_dup:
                seen[img_hash] = img_id

        return duplicates
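A quick sanity check of the Hamming-distance threshold on hand-built integer hashes (the values are illustrative, not real image hashes):

```python
def hamming(a: int, b: int) -> int:
    # Population count of the XOR; on Python 3.10+, (a ^ b).bit_count()
    # is the faster equivalent.
    return bin(a ^ b).count("1")

# Three synthetic 16-bit "hashes": b differs from a in 2 bits
# (a near-duplicate under threshold=5), c differs in all 16.
a = 0b1010101010101010
b = a ^ 0b0000000000000011   # flip the two low bits
c = 0b0101010101010101

threshold = 5
assert hamming(a, b) <= threshold   # near-duplicate: b would be dropped
assert hamming(a, c) > threshold    # distinct: c is kept
```

With `hash_size=16` the real hashes are 256 bits, so a threshold of 5 tolerates only tiny perturbations, which is the intended behavior for compression artifacts and minor crops.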
ℹ️ Note

LAION-5B contains approximately 15-20% near-duplicate images. After deduplication, the effective dataset size drops from 5.85B to approximately 4.7B unique image-text pairs. Deduplication improves training efficiency because the model does not waste compute learning the same visual patterns multiple times.
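The arithmetic behind that estimate, taking the upper end of the quoted duplicate-rate range:

```python
laion_total = 5.85e9   # LAION-5B image-text pairs
dup_rate = 0.20        # upper end of the 15-20% near-duplicate estimate

unique = laion_total * (1 - dup_rate)
print(f"unique pairs: {unique:.2e}")   # about 4.7e9
```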

Putting It All Together: Production Pipeline

class MultimodalDataPipeline:
    """
    End-to-end pipeline: crawl -> filter -> caption -> deduplicate -> format.
    """

    def __init__(self, config):
        self.config = config
        self.quality_filter = MultimodalQualityFilter(
            clip_model_name=config.get('clip_model', 'ViT-L-14')
        )
        self.captioner = SyntheticCaptionGenerator(
            model_name=config.get('captioner_model', 'llava-hf/llava-v1.6-34b-hf')
        )
        self.deduplicator = ImageDeduplicator(
            hash_size=config.get('hash_size', 16),
            threshold=config.get('dedup_threshold', 5),
        )

    def process_warc_file(self, warc_path, output_dir):
        """Process a single WARC file through the complete pipeline."""
        # Step 1: Extract image-text pairs
        pairs = extract_pairs_from_warc(warc_path)
        print(f"Extracted {len(pairs)} pairs from {warc_path}")

        # Step 2: Download images and filter
        filtered_pairs = []
        for pair in pairs:
            try:
                resp = requests.get(pair.image_url, timeout=5)
                image = Image.open(io.BytesIO(resp.content)).convert('RGB')
                pair.image_width, pair.image_height = image.size

                # Run quality filters
                passed, _ = self.quality_filter.stage1_metadata_filter(pair)
                if not passed:
                    continue
                passed, _ = self.quality_filter.stage2_text_quality(pair)
                if not passed:
                    continue
                passed, _ = self.quality_filter.stage3_clip_score(
                    image, pair.alt_text
                )
                if not passed:
                    continue

                # Re-caption with the VLM (in production, deduplicate
                # first so duplicates never incur VLM inference cost)
                new_caption = self.captioner.generate_caption(image, "detailed")
                pair.alt_text = new_caption

                filtered_pairs.append((pair, image))

            except Exception:
                continue

        print(f"Filtered to {len(filtered_pairs)} pairs")

        # Step 3: Deduplicate
        hashes = []
        for pair, image in filtered_pairs:
            h = self.deduplicator.compute_hash_int(image)
            pair.image_hash = str(h)
            hashes.append((pair.image_url, h))

        duplicates = self.deduplicator.deduplicate_batch(hashes)
        deduped = [(p, img) for p, img in filtered_pairs
                   if p.image_url not in duplicates]

        print(f"After dedup: {len(deduped)} pairs")

        # Step 4: Save
        output_file = os.path.join(output_dir,
                                    os.path.basename(warc_path) + '.jsonl')
        with open(output_file, 'w') as f:
            for pair, image in deduped:
                record = {
                    'image_url': pair.image_url,
                    'caption': pair.alt_text,
                    'source_url': pair.page_url,
                    'image_hash': pair.image_hash,
                    'quality_score': pair.text_quality_score,
                }
                f.write(json.dumps(record) + '\n')

        return len(deduped)
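The config keys that `__init__` reads can be collected in a plain dict. The values below are the defaults from the code; the WARC path and output directory in the commented usage are hypothetical:

```python
pipeline_config = {
    'clip_model': 'ViT-L-14',
    'captioner_model': 'llava-hf/llava-v1.6-34b-hf',
    'hash_size': 16,
    'dedup_threshold': 5,
}

# pipeline = MultimodalDataPipeline(pipeline_config)
# n_kept = pipeline.process_warc_file('crawl-00001.warc.gz', 'output/')
```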
📊 Multimodal Pipeline Throughput and Cost

Pipeline Stage          Throughput        Cost per 1M pairs   Hardware
WARC extraction         100K pairs/min    $0.10               32-core CPU
Image download          10K images/min    $5.00 (bandwidth)   100 Mbps
Metadata filtering      500K pairs/min    $0.05               CPU
CLIP scoring            5K pairs/min      $3.00               1x A100
VLM re-captioning       500 pairs/min     $30.00              1x H100
Deduplication (pHash)   50K images/min    $0.50               CPU + RAM
Note: VLM re-captioning dominates cost. CLIP filtering should run before re-captioning to minimize the number of images that need expensive VLM inference.
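To see how much the ordering matters, here is a rough cost model that charges each stage only for the pairs that actually reach it, with survival fractions approximated from the DataComp funnel table above. A sketch under those assumptions; real numbers depend on the crawl:

```python
# (stage, cost per 1M pairs entering the stage, fraction of raw pool
#  that reaches the stage). Fractions approximated from the funnel:
# 12.8B -> 7.2B -> 5.1B -> 2.3B.
stages = [
    ("warc_extraction",   0.10, 1.00),
    ("image_download",    5.00, 1.00),
    ("metadata_filter",   0.05, 1.00),
    ("clip_scoring",      3.00, 0.56),   # after metadata filter: 7.2/12.8
    ("vlm_recaptioning", 30.00, 0.40),   # after text quality: 5.1/12.8
    ("dedup_phash",       0.50, 0.18),   # after CLIP score: 2.3/12.8
]

effective = sum(cost * frac for _, cost, frac in stages)
print(f"effective cost per 1M raw pairs: ${effective:.2f}")
```

Filtering before re-captioning cuts the dominant VLM bill by roughly 60% in this model; re-captioning everything would cost the full $30 per 1M raw pairs instead of about $12.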

The multimodal data pipeline is a funnel: start with billions of raw image-text pairs from web crawls, progressively filter them through metadata checks, text quality, CLIP alignment, NSFW detection, and deduplication, then re-caption the survivors with a VLM. The output is a high-quality dataset in which every image-text pair is accurately aligned, unique, and safe. The raw-to-final ratio is typically between 5:1 and 10:1, which makes efficient filtering the central engineering challenge.