Reinforcement Learning from Human Feedback requires pairwise preference data: given a prompt and two responses, which response is better? InstructGPT used 33,000 comparison labels from 40 contractors. Llama 2 used 1.4 million binary comparisons. Anthropic’s Constitutional AI uses AI-generated preferences to reduce human annotation. The cost ranges from roughly $0.25 per comparison (crowd annotation) to $25 per comparison (expert domain evaluation).
Scaling from 10K to 1M+ preference pairs while maintaining quality is an engineering problem. This post covers the complete pipeline: annotation platform design, agreement metrics, AI-assisted labeling, active learning for cost-efficient sampling, quality assurance, and the economics of preference data at scale.
Anatomy of a Preference Label
What Gets Labeled
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class PreferenceLabel(Enum):
    """Five-point ordinal preference scale for a pairwise comparison.

    Values are the short verdict strings used throughout the pipeline
    (e.g. "A>>" means response A is much better than response B).
    """
    A_MUCH_BETTER = "A>>"
    A_BETTER = "A>"
    TIE = "="
    B_BETTER = "B>"
    B_MUCH_BETTER = "B>>"
@dataclass
class PreferenceExample:
    """A single pairwise comparison: one prompt and two candidate responses.

    `label` is filled in later by an annotator (human or AI); `metadata`
    carries arbitrary bookkeeping such as the pair-generation strategy.
    """
    prompt: str
    response_a: str
    response_b: str
    label: Optional[PreferenceLabel] = None
    annotator_id: str = ""
    annotation_time_seconds: float = 0.0
    # default_factory avoids the shared-mutable-default pitfall that the
    # previous `metadata: dict = None` pattern worked around by hand.
    metadata: dict = field(default_factory=dict)

    def __post_init__(self):
        # Kept for backward compatibility: callers that explicitly pass
        # metadata=None still get an empty dict.
        if self.metadata is None:
            self.metadata = {}
@dataclass
class AnnotationTask:
    """A complete annotation task with guidelines and examples."""
    task_id: str
    examples: list  # PreferenceExample objects to be annotated
    guidelines: str  # instructions shown to annotators
    rubric: dict  # dimension name -> {"weight": float, "description": str}

    @staticmethod
    def default_rubric():
        """Standard rubric for general-purpose preference annotation."""
        dimensions = [
            ("helpfulness", 0.35, "Does the response address the user's request?"),
            ("correctness", 0.30, "Is the information factually accurate?"),
            ("harmlessness", 0.20, "Does the response avoid harmful content?"),
            ("conciseness", 0.15, "Is the response appropriately concise?"),
        ]
        return {
            name: {"weight": weight, "description": description}
            for name, weight, description in dimensions
        }
Response Pair Generation
The quality of preference data depends on generating response pairs that are meaningfully different. If both responses are equally good or equally bad, the comparison provides no training signal.
import random
class ResponsePairGenerator:
    """
    Generate response pairs that maximize information gain.

    Good pairs have one clearly better response; pairs where both
    responses are equally good (or bad) carry no training signal.
    """

    def __init__(self, model_configs):
        """
        Args:
            model_configs: List of model configurations with different
                temperatures, system prompts, or model sizes.
        """
        self.model_configs = model_configs

    def generate_pairs(self, prompts, strategy="diverse_models"):
        """
        Generate one response pair per prompt for annotation.

        Strategies:
            - diverse_models: Two different models
            - temperature_diversity: Same model, different temperatures
            - best_of_n: Generate N, pair best vs. worst

        Raises:
            ValueError: For unsupported strategy names (including the
                planned but unimplemented "rejection_sampling"). The
                previous implementation silently returned an empty list
                for unknown strategies.
        """
        generators = {
            "diverse_models": self._pair_diverse_models,
            "temperature_diversity": self._pair_temperature_diversity,
            "best_of_n": self._pair_best_of_n,
        }
        if strategy not in generators:
            raise ValueError(
                f"Unknown pair-generation strategy: {strategy!r}. "
                f"Supported: {sorted(generators)}"
            )
        make_pair = generators[strategy]
        return [make_pair(prompt) for prompt in prompts]

    def _pair_diverse_models(self, prompt):
        """One pair from two distinct model configs (requires >= 2 configs)."""
        config_a, config_b = random.sample(self.model_configs, 2)
        return PreferenceExample(
            prompt=prompt,
            response_a=self._generate(prompt, config_a),
            response_b=self._generate(prompt, config_b),
        )

    def _pair_temperature_diversity(self, prompt):
        """One pair from a single config at low vs. high temperature."""
        config = random.choice(self.model_configs)
        return PreferenceExample(
            prompt=prompt,
            response_a=self._generate(prompt, config, temperature=0.3),
            response_b=self._generate(prompt, config, temperature=1.0),
        )

    def _pair_best_of_n(self, prompt, n=8):
        """Sample n responses, pair the highest- vs. lowest-scoring one."""
        config = random.choice(self.model_configs)
        responses = [
            self._generate(prompt, config, temperature=0.8) for _ in range(n)
        ]
        scores = self._score_responses(prompt, responses)
        best_idx = max(range(len(scores)), key=scores.__getitem__)
        worst_idx = min(range(len(scores)), key=scores.__getitem__)
        return PreferenceExample(
            prompt=prompt,
            response_a=responses[best_idx],
            response_b=responses[worst_idx],
            metadata={"strategy": "best_of_n", "n": n},
        )

    def _generate(self, prompt, config, temperature=0.7):
        """Generate a response. Placeholder for actual model inference."""
        return f"[Response from {config} at T={temperature}]"

    def _score_responses(self, prompt, responses):
        """Score responses with reward model. Placeholder."""
        return [random.random() for _ in responses]
Annotation Platform Design
Task Distribution Architecture
import time
import uuid
from collections import defaultdict
class AnnotationPlatform:
    """
    Platform for distributing annotation tasks to human annotators.

    Supports quality control via embedded gold-standard questions,
    per-annotator statistics, and batching.
    """

    def __init__(self, config):
        self.config = config
        self.tasks = {}  # task_id -> AnnotationTask
        self.annotations = {}  # example_id -> list of annotation dicts
        self.annotators = {}  # annotator_id -> running quality stats
        self.gold_standards = {}  # example_id -> known correct label

    def create_batch(self, examples, batch_size=50, gold_fraction=0.10):
        """
        Create an annotation batch with embedded gold standard questions.

        Gold questions have known correct answers and are used to:
        1. Measure annotator accuracy in real-time
        2. Remove unreliable annotators
        3. Calibrate annotation guidelines

        Returns a list of ('gold', example_id) / ('regular', example)
        tuples in randomized order. If fewer gold or regular items are
        available than requested, the batch is simply shorter.
        """
        num_gold = max(1, int(batch_size * gold_fraction))
        num_regular = batch_size - num_gold

        # Sample gold examples from the known-good pool.
        gold_examples = random.sample(
            list(self.gold_standards.keys()),
            min(num_gold, len(self.gold_standards)),
        )
        regular_examples = random.sample(
            examples, min(num_regular, len(examples))
        )

        # Interleave at random positions. Positions are drawn from the
        # ACTUAL item count, so a shortage of regular examples can no
        # longer truncate the batch and silently drop gold items (the
        # previous implementation broke out of the loop when the regular
        # iterator was exhausted).
        actual_size = len(gold_examples) + len(regular_examples)
        gold_positions = set(
            random.sample(range(actual_size), len(gold_examples))
        )
        gold_iter = iter(gold_examples)
        regular_iter = iter(regular_examples)
        batch = []
        for i in range(actual_size):
            if i in gold_positions:
                batch.append(('gold', next(gold_iter)))
            else:
                batch.append(('regular', next(regular_iter)))
        return batch

    def submit_annotation(self, example_id, annotator_id, label,
                          time_seconds, dimensions=None):
        """
        Record an annotation and update the annotator's running stats.

        If the example is a gold standard, accuracy is checked in real
        time; annotators with >= 5 gold answers and < 60% accuracy are
        flagged. Returns a status dict ('accepted' or 'flagged').
        """
        annotation = {
            'example_id': example_id,
            'annotator_id': annotator_id,
            'label': label,
            'time_seconds': time_seconds,
            'timestamp': time.time(),
            'dimensions': dimensions or {},
        }
        if example_id not in self.annotations:
            self.annotations[example_id] = []
        self.annotations[example_id].append(annotation)

        # Update annotator stats (lazily initialized on first submission).
        if annotator_id not in self.annotators:
            self.annotators[annotator_id] = {
                'total': 0, 'gold_correct': 0, 'gold_total': 0,
                'avg_time': 0.0, 'times': [],
            }
        stats = self.annotators[annotator_id]
        stats['total'] += 1
        stats['times'].append(time_seconds)
        stats['avg_time'] = sum(stats['times']) / len(stats['times'])

        # Check against gold standard; flag persistently inaccurate
        # annotators (the annotation itself is still recorded).
        if example_id in self.gold_standards:
            stats['gold_total'] += 1
            correct_label = self.gold_standards[example_id]
            if self._labels_agree(label, correct_label):
                stats['gold_correct'] += 1
            accuracy = stats['gold_correct'] / stats['gold_total']
            if stats['gold_total'] >= 5 and accuracy < 0.60:
                return {'status': 'flagged', 'reason': 'low_gold_accuracy',
                        'accuracy': accuracy}
        return {'status': 'accepted'}

    def _labels_agree(self, label1, label2):
        """Check if two labels agree, allowing adjacent-category agreement."""
        # Strict: exact match.
        if label1 == label2:
            return True
        # Relaxed: same direction (both prefer A or both prefer B).
        a_labels = {PreferenceLabel.A_MUCH_BETTER, PreferenceLabel.A_BETTER}
        b_labels = {PreferenceLabel.B_MUCH_BETTER, PreferenceLabel.B_BETTER}
        if (label1 in a_labels and label2 in a_labels) or \
           (label1 in b_labels and label2 in b_labels):
            return True
        return False
Gold standard questions are the single most important quality control mechanism. Without them, annotators can achieve 50% accuracy on binary choices by random clicking. A 10% gold rate catches random annotators within 20 tasks. A 5% gold rate catches them within 40 tasks. Below 5%, bad annotators pollute the dataset before detection.
Inter-Annotator Agreement
Measuring Agreement
import numpy as np
from itertools import combinations
class AgreementMetrics:
    """
    Compute inter-annotator agreement metrics.

    These determine whether the annotation task is well-defined: low
    agreement means annotators interpret the guidelines differently.
    """

    def __init__(self, annotations):
        """
        Args:
            annotations: dict of example_id -> list of labels
        """
        self.annotations = annotations

    def pairwise_agreement(self):
        """
        Raw pairwise agreement: fraction of annotator pairs that agree.

        Does not correct for chance agreement, so a high value on a
        skewed label distribution can be misleading.
        """
        agreements = 0
        total_pairs = 0
        for labels in self.annotations.values():
            if len(labels) < 2:
                continue
            for l1, l2 in combinations(labels, 2):
                total_pairs += 1
                if l1 == l2:
                    agreements += 1
        return agreements / total_pairs if total_pairs > 0 else 0.0

    def cohens_kappa(self, annotator1_labels, annotator2_labels):
        """
        Cohen's Kappa: two-annotator agreement corrected for chance.

            kappa = (p_o - p_e) / (1 - p_e)

        p_o is observed agreement; p_e is agreement expected by chance
        from each annotator's marginal label distribution.

        Accepts any equal-length sequences of hashable labels (the
        previous version required lists because it concatenated the two
        arguments, and validated length with `assert`, which is stripped
        under -O).

        Raises:
            ValueError: If the two sequences differ in length.
        """
        if len(annotator1_labels) != len(annotator2_labels):
            raise ValueError("label sequences must have equal length")
        n = len(annotator1_labels)
        if n == 0:
            return 0.0
        all_labels = set(annotator1_labels) | set(annotator2_labels)
        # Observed agreement.
        p_o = sum(1 for a, b in zip(annotator1_labels, annotator2_labels)
                  if a == b) / n
        # Expected agreement by chance.
        p_e = 0.0
        for label in all_labels:
            p1 = sum(1 for l in annotator1_labels if l == label) / n
            p2 = sum(1 for l in annotator2_labels if l == label) / n
            p_e += p1 * p2
        if p_e == 1.0:
            return 1.0
        return (p_o - p_e) / (1 - p_e)

    def fleiss_kappa(self):
        """
        Fleiss' Kappa: multi-annotator agreement corrected for chance.

        The formula requires the same number of ratings per item, so
        examples are restricted to the most common rating count. (The
        previous version silently took the count from the first example
        and produced invalid values when counts varied.)
        """
        # Collect all unique label categories.
        all_labels = set()
        for labels in self.annotations.values():
            all_labels.update(labels)
        label_list = sorted(all_labels, key=str)
        k = len(label_list)
        label_to_idx = {l: i for i, l in enumerate(label_list)}

        counts = [len(labels) for labels in self.annotations.values()
                  if len(labels) >= 2]
        if not counts:
            return 0.0
        # Modal number of ratings per example; only matching examples
        # enter the computation.
        num_annotators = max(set(counts), key=counts.count)
        examples = [ex_id for ex_id, labels in self.annotations.items()
                    if len(labels) == num_annotators]
        n = len(examples)
        if n == 0:
            return 0.0

        # Rating matrix: n_examples x n_categories; each cell holds the
        # number of annotators who assigned that category.
        rating_matrix = np.zeros((n, k))
        for i, ex_id in enumerate(examples):
            for label in self.annotations[ex_id]:
                rating_matrix[i, label_to_idx[label]] += 1

        r = num_annotators
        # Per-example agreement P_i (vectorized over examples).
        P_i = (np.sum(rating_matrix ** 2, axis=1) - r) / (r * (r - 1))
        P_bar = np.mean(P_i)
        # Marginal category proportions p_j and chance agreement P_e.
        p_j = np.sum(rating_matrix, axis=0) / (n * r)
        P_e = np.sum(p_j ** 2)
        if P_e == 1.0:
            return 1.0
        return (P_bar - P_e) / (1 - P_e)

    def krippendorff_alpha(self):
        """
        Krippendorff's Alpha with a squared (interval/ordinal) distance
        over the five-point preference scale. Handles varying numbers of
        ratings per example.

        NOTE: labels outside the PreferenceLabel scale are mapped to the
        TIE midpoint (2) by the `.get(l, 2)` fallback.
        """
        # Map ordinal labels onto a numeric scale.
        label_to_numeric = {
            PreferenceLabel.A_MUCH_BETTER: 0,
            PreferenceLabel.A_BETTER: 1,
            PreferenceLabel.TIE: 2,
            PreferenceLabel.B_BETTER: 3,
            PreferenceLabel.B_MUCH_BETTER: 4,
        }
        # Observed disagreement: squared distance within each example.
        observed_disagreement = 0.0
        total_pairs = 0
        for labels in self.annotations.values():
            numeric = [label_to_numeric.get(l, 2) for l in labels]
            for v1, v2 in combinations(numeric, 2):
                observed_disagreement += (v1 - v2) ** 2
                total_pairs += 1
        if total_pairs == 0:
            return 0.0
        D_o = observed_disagreement / total_pairs

        # Expected disagreement from the pooled marginal distribution.
        all_values = []
        for labels in self.annotations.values():
            all_values.extend(label_to_numeric.get(l, 2) for l in labels)
        expected_disagreement = 0.0
        n_total = len(all_values)
        for v1, v2 in combinations(all_values, 2):
            expected_disagreement += (v1 - v2) ** 2
        D_e = expected_disagreement / (n_total * (n_total - 1) / 2)
        if D_e == 0:
            return 1.0
        return 1 - D_o / D_e
Agreement Thresholds and Interpretation
| Kappa Range | Interpretation | Action Required |
|---|---|---|
| 0.81 - 1.00 | Almost perfect agreement | Ready for training |
| 0.61 - 0.80 | Substantial agreement | Acceptable with majority voting |
| 0.41 - 0.60 | Moderate agreement | Revise guidelines, add training examples |
| 0.21 - 0.40 | Fair agreement | Task may be too subjective; simplify rubric |
| 0.00 - 0.20 | Slight agreement | Task is broken; redesign completely |
AI-Assisted Labeling
Constitutional AI: Model as Annotator
class AIAssistedLabeler:
    """
    Use an LLM to generate preference labels, reducing human annotation cost.

    Based on Anthropic's Constitutional AI approach: the model judges a
    response pair against a fixed list of principles and emits a verdict.
    """

    def __init__(self, model, principles):
        """
        Args:
            model: LLM used to produce preference judgments.
            principles: List of principle strings the judge must follow.
        """
        self.model = model
        self.principles = principles

    def label_preference(self, example):
        """Return an AI preference judgment: label, explanation, confidence."""
        judgment = self.model.generate(
            self._build_comparison_prompt(example),
            temperature=0.1,
            max_tokens=500,
        )
        label, explanation = self._parse_response(judgment)
        return {
            'label': label,
            'explanation': explanation,
            'confidence': self._estimate_confidence(judgment),
        }

    def _build_comparison_prompt(self, example):
        """Assemble the judge prompt from the principles and the pair."""
        principles_text = "\n".join("- " + principle
                                    for principle in self.principles)
        return f"""Compare the following two responses to a user query.
Principles to evaluate by:
{principles_text}
User query: {example.prompt}
Response A:
{example.response_a}
Response B:
{example.response_b}
Which response is better according to the principles above?
Provide your reasoning, then conclude with exactly one of:
VERDICT: A>>B (A much better)
VERDICT: A>B (A somewhat better)
VERDICT: A=B (tie)
VERDICT: B>A (B somewhat better)
VERDICT: B>>A (B much better)"""

    def _parse_response(self, response):
        """Extract (label, explanation) from the judge's raw output."""
        verdicts = (
            ("A>>B", PreferenceLabel.A_MUCH_BETTER),
            ("A>B", PreferenceLabel.A_BETTER),
            ("A=B", PreferenceLabel.TIE),
            ("B>A", PreferenceLabel.B_BETTER),
            ("B>>A", PreferenceLabel.B_MUCH_BETTER),
        )
        # First matching verdict wins; TIE when no verdict is present.
        label = next(
            (lab for tag, lab in verdicts if f"VERDICT: {tag}" in response),
            PreferenceLabel.TIE,
        )
        # Everything before the verdict marker is the explanation.
        explanation = response.split("VERDICT:")[0].strip()
        return label, explanation

    def _estimate_confidence(self, response):
        """Heuristic confidence: more hedging language = less confident."""
        hedging_words = ['might', 'perhaps', 'arguably', 'somewhat',
                         'debatable', 'hard to say', 'close call']
        lowered = response.lower()
        num_hedges = sum(word in lowered for word in hedging_words)
        # Each distinct hedge costs 0.1 confidence, floored at 0.3.
        return max(0.3, 1.0 - num_hedges * 0.1)

    def batch_label(self, examples, human_agreement_threshold=0.7):
        """
        Label a batch, routing low-confidence examples to human review.

        Returns (ai_labels, needs_human): ai_labels is a list of
        (example, label) pairs; needs_human is a list of examples.
        """
        ai_labels, needs_human = [], []
        for example in examples:
            result = self.label_preference(example)
            if result['confidence'] < human_agreement_threshold:
                needs_human.append(example)
            else:
                ai_labels.append((example, result['label']))
        return ai_labels, needs_human
Hybrid Human-AI Pipeline
class HybridAnnotationPipeline:
    """
    Combine AI and human annotation for cost-efficient labeling.

    Strategy:
    1. AI labels everything
    2. Humans review low-confidence AI labels
    3. Humans spot-check high-confidence AI labels (5% sample)
    4. Disagreements go to expert review
    """

    def __init__(self, ai_labeler, human_platform, expert_pool):
        self.ai = ai_labeler
        self.human = human_platform
        self.experts = expert_pool
        self.stats = defaultdict(int)

    def process_batch(self, examples, ai_confidence_threshold=0.75,
                      human_spot_check_rate=0.05):
        """Run a batch through the pipeline; returns id(example) -> label."""
        final_labels = {}
        for example in examples:
            key = id(example)
            ai_result = self.ai.label_preference(example)

            if ai_result['confidence'] < ai_confidence_threshold:
                # Low confidence: always defer to a human annotator.
                final_labels[key] = self._get_human_label(example)
                self.stats['human_labeled'] += 1
                continue

            if random.random() >= human_spot_check_rate:
                # High confidence, spot-check not drawn: accept AI label.
                final_labels[key] = ai_result['label']
                self.stats['ai_accepted'] += 1
                continue

            # Spot-check against a human; escalate disagreements.
            human_label = self._get_human_label(example)
            if self._labels_agree(ai_result['label'], human_label):
                final_labels[key] = ai_result['label']
                self.stats['ai_confirmed'] += 1
            else:
                final_labels[key] = self._get_expert_label(example)
                self.stats['expert_resolved'] += 1
        return final_labels

    def _get_human_label(self, example):
        """Get human annotation (placeholder)."""
        return PreferenceLabel.TIE

    def _get_expert_label(self, example):
        """Get expert annotation (placeholder)."""
        return PreferenceLabel.TIE

    def _labels_agree(self, l1, l2):
        """Directional agreement: same preferred side, or both ties."""
        a_side = {PreferenceLabel.A_MUCH_BETTER, PreferenceLabel.A_BETTER}
        b_side = {PreferenceLabel.B_MUCH_BETTER, PreferenceLabel.B_BETTER}
        if l1 in a_side:
            return l2 in a_side
        if l1 in b_side:
            return l2 in b_side
        return l1 == PreferenceLabel.TIE and l2 == PreferenceLabel.TIE
Cost Comparison: Human-Only vs Hybrid Annotation
| Metric | 10K | 50K | 100K | 500K | 1M |
|---|---|---|---|---|---|
| Human Only ($1.50/pair) | $15K | $75K | $150K | $750K | $1.5M |
| Hybrid AI+Human ($0.25/pair) | $2.5K | $12.5K | $25K | $125K | $250K |
| AI Only ($0.02/pair) | $200 | $1K | $2K | $10K | $20K |
Active Learning for Efficient Annotation
Selecting the Most Informative Examples
Active learning selects examples that provide the most training signal per annotation dollar. Instead of labeling random examples, prioritize examples where the reward model is most uncertain.
class ActiveLearningSelector:
    """
    Select examples for annotation that maximize reward model improvement.

    All selectors clamp the budget to the candidate pool size, so a
    budget larger than the pool no longer causes wasted iterations in
    `hybrid_sampling` or an sklearn error in `diversity_sampling`.
    """

    def __init__(self, reward_model):
        # reward_model must expose .score(prompt, response) -> float.
        self.reward_model = reward_model

    def _pair_uncertainty(self, example):
        """Uncertainty of one pair: high when the reward gap is small."""
        score_a = self.reward_model.score(example.prompt, example.response_a)
        score_b = self.reward_model.score(example.prompt, example.response_b)
        return 1.0 / (1.0 + abs(score_a - score_b))

    def uncertainty_sampling(self, candidates, budget):
        """
        Select the `budget` examples where the reward model is most
        uncertain (smallest score gap between response A and B).
        """
        budget = min(budget, len(candidates))
        ranked = sorted(candidates, key=self._pair_uncertainty, reverse=True)
        return ranked[:budget]

    def diversity_sampling(self, candidates, budget, embedding_fn):
        """
        Select diverse examples via k-means over prompt embeddings,
        taking the example nearest each cluster centroid. Ensures
        coverage of different prompt types.
        """
        budget = min(budget, len(candidates))
        embeddings = np.array([embedding_fn(ex.prompt) for ex in candidates])
        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=budget, random_state=42)
        kmeans.fit(embeddings)
        selected = []
        for cluster_id in range(budget):
            # NOTE(review): assumes every cluster is non-empty, which
            # sklearn KMeans normally guarantees.
            member_indices = np.where(kmeans.labels_ == cluster_id)[0]
            center = kmeans.cluster_centers_[cluster_id]
            distances = np.linalg.norm(embeddings[member_indices] - center,
                                       axis=1)
            selected.append(candidates[member_indices[np.argmin(distances)]])
        return selected

    def hybrid_sampling(self, candidates, budget, embedding_fn,
                        uncertainty_weight=0.6, diversity_weight=0.4):
        """
        Greedy selection combining uncertainty with a diversity bonus:
        the normalized distance to the nearest already-selected example.
        """
        budget = min(budget, len(candidates))
        base_scores = np.array(
            [self._pair_uncertainty(ex) for ex in candidates]
        ) * uncertainty_weight
        embeddings = np.array([embedding_fn(ex.prompt) for ex in candidates])

        selected_indices = []
        for _ in range(budget):
            if selected_indices:
                chosen = embeddings[selected_indices]
                # Distance from every candidate to its nearest selection.
                nearest = np.min(
                    np.linalg.norm(
                        embeddings[:, None, :] - chosen[None, :, :],
                        axis=2,
                    ),
                    axis=1,
                )
                if nearest.max() > 0:
                    nearest = nearest / nearest.max()
                combined = base_scores + diversity_weight * nearest
            else:
                combined = base_scores
            # Highest-scoring example not yet selected.
            for idx in np.argsort(-combined):
                if idx not in selected_indices:
                    selected_indices.append(int(idx))
                    break
        return [candidates[i] for i in selected_indices]
Active learning reduces annotation cost by 2-5x compared to random sampling. In practice, 100K actively-selected preference pairs provide equivalent reward model quality to 300K-500K randomly-selected pairs. The overhead of running the reward model for selection is negligible compared to annotation cost.
Quality Assurance Pipeline
Detecting and Removing Bad Annotations
class QualityAssurance:
    """
    Detect and remove low-quality annotations.

    Works from two signals: accuracy on gold-standard examples and
    agreement with the per-example majority label, plus a speed check
    that penalizes suspiciously fast annotators.
    """

    def __init__(self, annotations, gold_standards):
        """
        Args:
            annotations: dict of example_id -> list of annotation dicts
                (each with 'annotator_id', 'label', 'time_seconds').
            gold_standards: dict of example_id -> known correct label.
        """
        self.annotations = annotations
        self.gold_standards = gold_standards

    @staticmethod
    def _majority_label(annots):
        """Most frequent label in a list of annotation dicts (ties arbitrary)."""
        label_counts = defaultdict(int)
        for a in annots:
            label_counts[a['label']] += 1
        return max(label_counts, key=label_counts.get)

    def annotator_reliability_scores(self):
        """
        Compute a composite reliability score for each annotator.

        Combines gold accuracy (weight 0.4), majority agreement (0.3),
        and a speed penalty (0.3) for sub-5-second average times.
        Annotators with no gold/majority data default to 0.5 on that
        component.
        """
        annotator_stats = defaultdict(lambda: {
            'gold_correct': 0, 'gold_total': 0,
            'majority_agree': 0, 'majority_total': 0,
            'times': [], 'total': 0,
        })
        for example_id, annots in self.annotations.items():
            majority_label = self._majority_label(annots)
            for annot in annots:
                stats = annotator_stats[annot['annotator_id']]
                stats['total'] += 1
                stats['times'].append(annot['time_seconds'])
                # Gold accuracy.
                if example_id in self.gold_standards:
                    stats['gold_total'] += 1
                    if annot['label'] == self.gold_standards[example_id]:
                        stats['gold_correct'] += 1
                # Majority agreement.
                stats['majority_total'] += 1
                if annot['label'] == majority_label:
                    stats['majority_agree'] += 1

        # Composite reliability score per annotator.
        reliability = {}
        for aid, stats in annotator_stats.items():
            gold_acc = (stats['gold_correct'] / stats['gold_total']
                        if stats['gold_total'] > 0 else 0.5)
            majority_rate = (stats['majority_agree'] / stats['majority_total']
                             if stats['majority_total'] > 0 else 0.5)
            avg_time = np.mean(stats['times']) if stats['times'] else 30.0
            # Flag suspiciously fast annotations (under 5 seconds average).
            speed_penalty = 1.0 if avg_time >= 5.0 else avg_time / 5.0
            reliability[aid] = {
                'gold_accuracy': gold_acc,
                'majority_agreement': majority_rate,
                'avg_time_seconds': avg_time,
                'speed_penalty': speed_penalty,
                'composite_score': (
                    0.4 * gold_acc +
                    0.3 * majority_rate +
                    0.3 * speed_penalty
                ),
                'total_annotations': stats['total'],
            }
        return reliability

    def filter_annotations(self, min_reliability=0.6):
        """
        Drop annotations from annotators below `min_reliability`.

        Returns (filtered_annotations, removed_count) where removed_count
        is the number of examples left with no usable annotations.
        """
        reliability = self.annotator_reliability_scores()
        filtered = {}
        removed_count = 0
        for example_id, annots in self.annotations.items():
            good_annots = [
                a for a in annots
                if reliability.get(a['annotator_id'], {}).get(
                    'composite_score', 0
                ) >= min_reliability
            ]
            if good_annots:
                filtered[example_id] = good_annots
            else:
                removed_count += 1
        return filtered, removed_count

    def compute_final_labels(self, filtered_annotations, method="majority"):
        """
        Aggregate per-example annotations into a final label.

        Methods:
            - majority: simple majority vote
            - weighted: vote weighted by annotator composite reliability

        Raises:
            ValueError: For unsupported methods (including "dawid_skene",
                which is planned but not implemented). The previous
                version silently returned an empty dict in that case.
        """
        final_labels = {}
        if method == "majority":
            for example_id, annots in filtered_annotations.items():
                final_labels[example_id] = self._majority_label(annots)
        elif method == "weighted":
            reliability = self.annotator_reliability_scores()
            for example_id, annots in filtered_annotations.items():
                label_scores = defaultdict(float)
                for a in annots:
                    weight = reliability.get(
                        a['annotator_id'], {}
                    ).get('composite_score', 0.5)
                    label_scores[a['label']] += weight
                final_labels[example_id] = max(label_scores,
                                               key=label_scores.get)
        else:
            raise ValueError(f"Unsupported aggregation method: {method!r}")
        return final_labels
Scaling Economics
Cost Model
# Representative cost/quality figures for preference-data collection at
# increasing scale. Keys are target dataset sizes; `quality_kappa` is
# inter-annotator agreement on the Cohen/Fleiss scale. Figures are
# illustrative estimates, not measurements.
SCALING_ECONOMICS = {
    "10K_pairs": {
        "method": "Human only",
        "annotators": 5,
        "cost_per_pair": 2.00,
        "total_cost": 20_000,
        "calendar_time_weeks": 2,
        "quality_kappa": 0.72,
    },
    "100K_pairs": {
        "method": "Human + gold QA",
        "annotators": 20,
        "cost_per_pair": 1.50,
        "total_cost": 150_000,
        "calendar_time_weeks": 4,
        "quality_kappa": 0.68,
    },
    "500K_pairs": {
        "method": "Hybrid AI + human review",
        "annotators": 30,
        "cost_per_pair": 0.40,
        "total_cost": 200_000,
        "calendar_time_weeks": 6,
        "quality_kappa": 0.65,
    },
    "1M_pairs": {
        "method": "AI primary + human spot-check + active learning",
        "annotators": 15,
        "cost_per_pair": 0.25,
        "total_cost": 250_000,
        "calendar_time_weeks": 8,
        "quality_kappa": 0.62,
    },
}
Scaling Strategy Comparison
| Scale | Method | Cost/Pair | Total Cost | Kappa |
|---|---|---|---|---|
| 10K | Human only | $2.00 | $20K | 0.72 |
| 100K | Human + gold QA | $1.50 | $150K | 0.68 |
| 500K | Hybrid AI + human | $0.40 | $200K | 0.65 |
| 1M+ | AI + spot-check + AL | $0.25 | $250K | 0.62 |
The fundamental tension in RLHF data collection is quality versus scale. Every technique for reducing cost — AI-assisted labeling, larger annotator pools, faster task design — introduces noise. The reward model trained on this data inherits that noise. The optimal strategy is not to minimize cost per label but to maximize reward model quality per dollar spent, which often means spending more per label on a smaller, actively-selected, carefully-verified dataset.