Common Crawl provides 100 TB of raw HTML each month. After extraction, filtering, quality classification, and deduplication, you are left with 0.8 TB of training-ready text — a 99.2% rejection rate. The gap between raw and clean is where model quality lives: GPT-3 trained on unfiltered Common Crawl would score 30-40% on MMLU, while the same architecture trained on curated data scores 70%+. This post implements the complete pipeline in 500 lines of Python — every line is necessary, and every stage has measurable quality impact.
Pipeline Architecture
The pipeline has seven stages, each reducing the data volume:
Stage 1: Download HTML (100 GB raw)
| ~ 100 million pages
v
Stage 2: Extract Text (30 GB text)
| Remove boilerplate, ads, navigation
v
Stage 3: Language Detection (28 GB English text)
| Keep target language(s)
v
Stage 4: Heuristic Filters (15 GB filtered)
| Remove low-quality text
v
Stage 5: Quality Classifier (8 GB high-quality)
| ML-based quality scoring
v
Stage 6: Deduplication (5 GB deduplicated)
| Remove near-duplicates
v
Stage 7: Tokenize + Package (3 GB tokens)
| BPE tokenization -> JSONL
v
Output: Training-ready dataset
Data Volume at Each Pipeline Stage (100M web pages)
(GB)Stage 1: HTML Download
"""
Stage 1: Download HTML pages from Common Crawl WARC files.
Common Crawl provides monthly web crawls in WARC format.
"""
import gzip
import io
from dataclasses import dataclass
@dataclass
class WebPage:
url: str
html: str
content_type: str
status_code: int
crawl_date: str
class WARCReader:
"""Read web pages from Common Crawl WARC files."""
def read_warc(self, warc_path):
"""Iterate over records in a WARC file."""
open_fn = gzip.open if warc_path.endswith('.gz') else open
with open_fn(warc_path, 'rb') as f:
while True:
record = self._read_record(f)
if record is None:
break
if record.get('type') == 'response':
yield self._parse_response(record)
def _read_record(self, f):
"""Read a single WARC record."""
# Read WARC header
header_line = f.readline()
if not header_line:
return None
headers = {}
while True:
line = f.readline().decode('utf-8', errors='replace').strip()
if not line:
break
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
# Read content
content_length = int(headers.get('Content-Length', 0))
content = f.read(content_length)
f.readline() # Read trailing newline
return {
'type': headers.get('WARC-Type', ''),
'url': headers.get('WARC-Target-URI', ''),
'date': headers.get('WARC-Date', ''),
'content': content,
}
def _parse_response(self, record):
"""Parse an HTTP response from a WARC record."""
content = record['content']
try:
# Split HTTP headers from body
header_end = content.find(b'\r\n\r\n')
if header_end == -1:
return None
http_headers = content[:header_end].decode('utf-8', errors='replace')
body = content[header_end + 4:]
# Extract status code
status_line = http_headers.split('\r\n')[0]
status_code = int(status_line.split()[1]) if len(status_line.split()) > 1 else 0
# Extract content type
content_type = ''
for line in http_headers.split('\r\n'):
if line.lower().startswith('content-type:'):
content_type = line.split(':', 1)[1].strip()
break
# Only process HTML
if 'text/html' not in content_type.lower():
return None
html = body.decode('utf-8', errors='replace')
return WebPage(
url=record['url'],
html=html,
content_type=content_type,
status_code=status_code,
crawl_date=record['date'],
)
except Exception:
return None
Stage 2: Text Extraction
"""
Stage 2: Extract clean text from HTML.
Uses trafilatura for main content extraction.
"""
import re
class TextExtractor:
"""Extract clean text from HTML, removing boilerplate."""
def extract(self, html, url=""):
"""Extract main content text from HTML."""
try:
import trafilatura
text = trafilatura.extract(
html,
include_comments=False,
include_tables=True,
no_fallback=False,
favor_recall=False,
)
if text:
return self._clean_text(text)
except Exception:
pass
# Fallback: basic HTML stripping
return self._basic_extract(html)
def _basic_extract(self, html):
"""Basic HTML to text conversion as fallback."""
# Remove script and style elements
html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.IGNORECASE)
# Remove HTML tags
text = re.sub(r'<[^>]+>', ' ', html)
# Decode HTML entities
import html as html_module
text = html_module.unescape(text)
return self._clean_text(text)
def _clean_text(self, text):
"""Clean extracted text."""
# Normalize whitespace
text = re.sub(r'\s+', ' ', text)
# Remove excessive newlines
text = re.sub(r'\n{3,}', '\n\n', text)
# Remove leading/trailing whitespace per line
lines = [line.strip() for line in text.split('\n')]
text = '\n'.join(lines)
return text.strip()
Stage 3: Language Detection
"""
Stage 3: Detect language and filter to target language(s).
"""
class LanguageDetector:
"""Detect the language of a text document."""
def __init__(self, target_languages=None):
self.target_languages = target_languages or ["en"]
try:
import fasttext
self.model = fasttext.load_model("lid.176.bin")
self.method = "fasttext"
except Exception:
self.method = "heuristic"
def detect(self, text):
"""Detect the language of a text."""
if self.method == "fasttext":
return self._detect_fasttext(text)
return self._detect_heuristic(text)
def _detect_fasttext(self, text):
"""Detect language using fastText."""
# fastText expects single line
clean = text.replace('\n', ' ')[:1000]
predictions = self.model.predict(clean, k=3)
labels = predictions[0]
probs = predictions[1]
results = []
for label, prob in zip(labels, probs):
lang = label.replace('__label__', '')
results.append((lang, float(prob)))
return results
def _detect_heuristic(self, text):
"""Simple heuristic language detection."""
# Count common English words
english_words = set([
"the", "is", "at", "which", "on", "a", "an", "and", "or",
"but", "in", "with", "to", "for", "of", "not", "this", "that",
])
words = text.lower().split()[:200]
english_count = sum(1 for w in words if w in english_words)
english_ratio = english_count / max(len(words), 1)
if english_ratio > 0.1:
return [("en", english_ratio)]
return [("unknown", 1.0 - english_ratio)]
def is_target_language(self, text, min_confidence=0.65):
"""Check if text is in a target language."""
results = self.detect(text)
if not results:
return False
top_lang, top_conf = results[0]
return top_lang in self.target_languages and top_conf >= min_confidence
Stage 4: Heuristic Filters
"""
Stage 4: Apply rule-based quality filters.
These are fast, deterministic filters that remove obviously low-quality text.
"""
from dataclasses import dataclass
@dataclass
class FilterResult:
passed: bool
reason: str = ""
class HeuristicFilters:
"""Rule-based quality filters for web text."""
def __init__(self):
self.min_words = 50
self.max_words = 100000
self.min_avg_word_length = 3
self.max_avg_word_length = 15
self.max_symbol_ratio = 0.1
self.max_digit_ratio = 0.2
self.max_uppercase_ratio = 0.3
self.min_alphabetic_ratio = 0.7
self.max_line_length_ratio = 0.3 # Fraction of lines over 1000 chars
self.max_short_line_ratio = 0.7 # Fraction of lines under 5 words
self.max_duplicate_line_ratio = 0.3
def apply_all(self, text):
"""Apply all filters. Returns FilterResult."""
filters = [
self._check_length,
self._check_word_length,
self._check_symbol_ratio,
self._check_alphabetic_ratio,
self._check_line_quality,
self._check_duplicate_lines,
self._check_boilerplate,
self._check_adult_content,
]
for filter_fn in filters:
result = filter_fn(text)
if not result.passed:
return result
return FilterResult(passed=True)
def _check_length(self, text):
words = text.split()
if len(words) < self.min_words:
return FilterResult(False, f"Too short: {len(words)} words")
if len(words) > self.max_words:
return FilterResult(False, f"Too long: {len(words)} words")
return FilterResult(True)
def _check_word_length(self, text):
words = text.split()
avg_len = sum(len(w) for w in words) / max(len(words), 1)
if avg_len < self.min_avg_word_length:
return FilterResult(False, f"Avg word length too short: {avg_len:.1f}")
if avg_len > self.max_avg_word_length:
return FilterResult(False, f"Avg word length too long: {avg_len:.1f}")
return FilterResult(True)
def _check_symbol_ratio(self, text):
symbols = sum(1 for c in text if not c.isalnum() and not c.isspace())
ratio = symbols / max(len(text), 1)
if ratio > self.max_symbol_ratio:
return FilterResult(False, f"Symbol ratio too high: {ratio:.2f}")
return FilterResult(True)
def _check_alphabetic_ratio(self, text):
alpha = sum(1 for c in text if c.isalpha())
ratio = alpha / max(len(text), 1)
if ratio < self.min_alphabetic_ratio:
return FilterResult(False, f"Alphabetic ratio too low: {ratio:.2f}")
return FilterResult(True)
def _check_line_quality(self, text):
lines = text.split('\n')
if not lines:
return FilterResult(True)
# Check for too many very long lines (code dumps, base64)
long_lines = sum(1 for line in lines if len(line) > 1000)
if long_lines / len(lines) > self.max_line_length_ratio:
return FilterResult(False, "Too many long lines")
# Check for too many very short lines (lists, menus)
short_lines = sum(1 for line in lines if len(line.split()) < 5 and line.strip())
non_empty = sum(1 for line in lines if line.strip())
if non_empty > 0 and short_lines / non_empty > self.max_short_line_ratio:
return FilterResult(False, "Too many short lines")
return FilterResult(True)
def _check_duplicate_lines(self, text):
lines = [line.strip() for line in text.split('\n') if line.strip()]
if not lines:
return FilterResult(True)
unique = set(lines)
dup_ratio = 1 - len(unique) / len(lines)
if dup_ratio > self.max_duplicate_line_ratio:
return FilterResult(False, f"Duplicate line ratio: {dup_ratio:.2f}")
return FilterResult(True)
def _check_boilerplate(self, text):
"""Check for common boilerplate patterns."""
boilerplate_phrases = [
"cookie policy", "terms of service", "privacy policy",
"subscribe to our newsletter", "click here to",
"all rights reserved", "powered by wordpress",
"loading...", "please enable javascript",
]
text_lower = text.lower()
boilerplate_count = sum(
1 for phrase in boilerplate_phrases
if phrase in text_lower
)
if boilerplate_count >= 3:
return FilterResult(False, f"Boilerplate detected: {boilerplate_count} phrases")
return FilterResult(True)
def _check_adult_content(self, text):
"""Basic adult content filter."""
# Simplified: check for high density of adult keywords
# Production systems use more sophisticated classifiers
adult_keywords = [
"xxx", "porn", "sex video", "adult content",
"18+", "nsfw", "explicit",
]
text_lower = text.lower()
adult_count = sum(1 for kw in adult_keywords if kw in text_lower)
if adult_count >= 2:
return FilterResult(False, "Adult content detected")
return FilterResult(True)
Filter Pass Rates by Stage (1M web pages)
| Filter | Pass Rate | Cumulative Retention | Primary Rejection Reason |
|---|---|---|---|
| Text extraction | 78% | 78% | Non-HTML content, empty pages |
| Language detection | 62% | 48% | Non-English content |
| Length filter | 85% | 41% | Too short (cookie banners, navigation) |
| Symbol/digit ratio | 92% | 38% | Code dumps, data tables |
| Line quality | 88% | 33% | Menu lists, URL lists |
| Boilerplate | 90% | 30% | Template pages |
| All heuristic filters | - | 30% | - |
Stage 5: Quality Classifier
"""
Stage 5: ML-based quality classification.
Trains a classifier to distinguish high-quality from low-quality text.
"""
import numpy as np
class QualityClassifier:
"""Classify text quality using a lightweight model."""
def __init__(self):
# Feature weights (pre-trained or learned from curated examples)
self.weights = None
self.threshold = 0.5
def extract_features(self, text):
"""Extract quality-indicative features from text."""
words = text.split()
lines = text.split('\n')
sentences = re.split(r'[.!?]+', text)
features = {
# Length features
"word_count": len(words),
"avg_word_length": np.mean([len(w) for w in words]) if words else 0,
"avg_sentence_length": np.mean([len(s.split()) for s in sentences if s.strip()]) if sentences else 0,
# Vocabulary features
"type_token_ratio": len(set(words)) / max(len(words), 1),
"hapax_ratio": sum(1 for w in set(words) if words.count(w) == 1) / max(len(set(words)), 1),
# Structure features
"paragraph_count": text.count('\n\n') + 1,
"avg_paragraph_length": len(words) / (text.count('\n\n') + 1),
"has_headings": 1 if re.search(r'^#+\s', text, re.MULTILINE) else 0,
# Punctuation features
"period_ratio": text.count('.') / max(len(words), 1),
"comma_ratio": text.count(',') / max(len(words), 1),
"question_ratio": text.count('?') / max(len(words), 1),
# Code features
"has_code_blocks": 1 if '```' in text or ' ' in text else 0,
"url_count": len(re.findall(r'https?://', text)),
# Readability (simplified Flesch-Kincaid)
"syllable_ratio": self._estimate_syllables(words) / max(len(words), 1),
}
return features
def _estimate_syllables(self, words):
"""Rough syllable count estimation."""
count = 0
for word in words[:200]: # Sample for speed
word = word.lower()
if len(word) <= 3:
count += 1
else:
# Count vowel groups
vowels = re.findall(r'[aeiouy]+', word)
count += max(1, len(vowels))
return count
def train(self, high_quality_texts, low_quality_texts):
"""Train the quality classifier."""
# Extract features
X_high = [list(self.extract_features(t).values()) for t in high_quality_texts]
X_low = [list(self.extract_features(t).values()) for t in low_quality_texts]
X = np.array(X_high + X_low)
y = np.array([1] * len(X_high) + [0] * len(X_low))
# Normalize features
self.feature_mean = X.mean(axis=0)
self.feature_std = X.std(axis=0) + 1e-8
X_norm = (X - self.feature_mean) / self.feature_std
# Simple logistic regression
# Using gradient descent (no sklearn dependency)
n_features = X_norm.shape[1]
self.weights = np.zeros(n_features)
bias = 0.0
lr = 0.01
for epoch in range(100):
logits = X_norm @ self.weights + bias
probs = 1 / (1 + np.exp(-logits))
grad_w = X_norm.T @ (probs - y) / len(y)
grad_b = (probs - y).mean()
self.weights -= lr * grad_w
bias -= lr * grad_b
self.bias = bias
def predict(self, text):
"""Predict quality score (0-1)."""
features = list(self.extract_features(text).values())
X = np.array(features)
X_norm = (X - self.feature_mean) / self.feature_std
logit = X_norm @ self.weights + self.bias
prob = 1 / (1 + np.exp(-logit))
return float(prob)
def is_high_quality(self, text):
return self.predict(text) >= self.threshold
Stage 6: Deduplication
"""
Stage 6: Near-duplicate detection using MinHash LSH.
"""
import hashlib
import struct
class MinHashDeduplicator:
"""Deduplicate documents using MinHash Locality-Sensitive Hashing."""
def __init__(self, num_hashes=128, num_bands=16, ngram_size=5):
self.num_hashes = num_hashes
self.num_bands = num_bands
self.rows_per_band = num_hashes // num_bands
self.ngram_size = ngram_size
# Generate random hash parameters
import random
random.seed(42)
self.hash_params = [
(random.randint(1, 2**31), random.randint(0, 2**31))
for _ in range(num_hashes)
]
# LSH buckets: band_id -> bucket_hash -> set of doc_ids
self.lsh_buckets = [{} for _ in range(num_bands)]
self.seen_docs = set()
def compute_minhash(self, text):
"""Compute MinHash signature for a document."""
# Extract character n-grams
text_lower = text.lower()
ngrams = set()
for i in range(len(text_lower) - self.ngram_size + 1):
ngrams.add(text_lower[i:i + self.ngram_size])
if not ngrams:
return [0] * self.num_hashes
# Compute minhash values
signature = []
for a, b in self.hash_params:
min_hash = float('inf')
for ngram in ngrams:
h = (a * hash(ngram) + b) % (2**31 - 1)
min_hash = min(min_hash, h)
signature.append(min_hash)
return signature
def is_duplicate(self, doc_id, text, threshold=0.8):
"""Check if a document is a near-duplicate of any seen document."""
signature = self.compute_minhash(text)
# Check LSH buckets for potential matches
is_dup = False
for band_idx in range(self.num_bands):
start = band_idx * self.rows_per_band
end = start + self.rows_per_band
band_signature = tuple(signature[start:end])
band_hash = hash(band_signature)
if band_hash in self.lsh_buckets[band_idx]:
# Potential match found -- verify with full signature comparison
for other_id, other_sig in self.lsh_buckets[band_idx][band_hash]:
similarity = self._jaccard_from_minhash(signature, other_sig)
if similarity >= threshold:
is_dup = True
break
# Add to bucket
if band_hash not in self.lsh_buckets[band_idx]:
self.lsh_buckets[band_idx][band_hash] = []
self.lsh_buckets[band_idx][band_hash].append((doc_id, signature))
if is_dup:
break
self.seen_docs.add(doc_id)
return is_dup
def _jaccard_from_minhash(self, sig1, sig2):
"""Estimate Jaccard similarity from MinHash signatures."""
matches = sum(1 for a, b in zip(sig1, sig2) if a == b)
return matches / len(sig1)
Stage 7: Tokenization and Output
"""
Stage 7: Tokenize and package into training-ready format.
"""
import json
class TokenizerStage:
"""Tokenize text and write to JSONL format."""
def __init__(self, tokenizer_name="meta-llama/Llama-3-8B"):
from transformers import AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
def tokenize_and_write(self, texts, output_path, max_seq_len=8192):
"""Tokenize texts and write to JSONL."""
with open(output_path, 'w') as f:
for i, text in enumerate(texts):
tokens = self.tokenizer.encode(text, add_special_tokens=False)
# Split into chunks of max_seq_len
for start in range(0, len(tokens), max_seq_len):
chunk = tokens[start:start + max_seq_len]
if len(chunk) < 64: # Skip very short chunks
continue
record = {
"tokens": chunk,
"length": len(chunk),
"source_id": i,
}
f.write(json.dumps(record) + '\n')
The Complete Pipeline
class DataPipeline:
"""
Complete data processing pipeline.
From raw HTML to training-ready tokens.
"""
def __init__(self, output_dir="./output"):
self.extractor = TextExtractor()
self.lang_detector = LanguageDetector(target_languages=["en"])
self.filters = HeuristicFilters()
self.classifier = QualityClassifier()
self.dedup = MinHashDeduplicator()
self.output_dir = output_dir
# Stats
self.stats = {
"total_pages": 0,
"extracted": 0,
"lang_passed": 0,
"filter_passed": 0,
"quality_passed": 0,
"dedup_passed": 0,
"total_tokens": 0,
}
def process_warc(self, warc_path):
"""Process a single WARC file through the full pipeline."""
reader = WARCReader()
results = []
for page in reader.read_warc(warc_path):
if page is None:
continue
self.stats["total_pages"] += 1
# Stage 2: Extract text
text = self.extractor.extract(page.html, page.url)
if not text or len(text) < 100:
continue
self.stats["extracted"] += 1
# Stage 3: Language detection
if not self.lang_detector.is_target_language(text):
continue
self.stats["lang_passed"] += 1
# Stage 4: Heuristic filters
filter_result = self.filters.apply_all(text)
if not filter_result.passed:
continue
self.stats["filter_passed"] += 1
# Stage 5: Quality classification
if not self.classifier.is_high_quality(text):
continue
self.stats["quality_passed"] += 1
# Stage 6: Deduplication
doc_id = hashlib.md5(page.url.encode()).hexdigest()
if self.dedup.is_duplicate(doc_id, text):
continue
self.stats["dedup_passed"] += 1
results.append({
"text": text,
"url": page.url,
"date": page.crawl_date,
})
return results
def process_and_tokenize(self, warc_paths, output_path):
"""Process multiple WARC files and produce tokenized output."""
all_texts = []
for warc_path in warc_paths:
results = self.process_warc(warc_path)
all_texts.extend([r["text"] for r in results])
# Stage 7: Tokenize
tokenizer = TokenizerStage()
tokenizer.tokenize_and_write(all_texts, output_path)
return self.get_report()
def get_report(self):
"""Generate pipeline processing report."""
total = max(self.stats["total_pages"], 1)
return {
"total_pages": self.stats["total_pages"],
"extraction_rate": self.stats["extracted"] / total,
"language_rate": self.stats["lang_passed"] / total,
"filter_rate": self.stats["filter_passed"] / total,
"quality_rate": self.stats["quality_passed"] / total,
"dedup_rate": self.stats["dedup_passed"] / total,
"overall_retention": self.stats["dedup_passed"] / total,
}
Cheap filters should run before expensive ones. Language detection (fast, rules-based) runs before quality classification (requires feature extraction). Deduplication (requires MinHash computation) runs last because it only needs to process the documents that passed all other filters.