GPT-4's RLHF training reportedly used around 1.4 million preference labels; collecting a dataset at that scale from contractors runs on the order of $8 million, while a managed vendor like Scale AI quotes closer to $500K. The cost and quality of human annotation determine whether your RLHF run produces a usable assistant or a model that refuses too often or too rarely.
This post covers the engineering of annotation systems: platform architectures, quality control mechanisms, cost modeling, and the implementation of a custom annotation pipeline.
Platform Architecture Overview
Core Components
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import hashlib
import json
class TaskType(Enum):
PREFERENCE_RANKING = "preference_ranking"
LIKERT_SCALE = "likert_scale"
CLASSIFICATION = "classification"
SPAN_ANNOTATION = "span_annotation"
FREE_TEXT = "free_text"
PAIRWISE_COMPARISON = "pairwise_comparison"
MULTI_TURN_DIALOG = "multi_turn_dialog"
class AnnotatorTier(Enum):
CROWD = "crowd"
TRAINED = "trained"
EXPERT = "expert"
DOMAIN_SPECIALIST = "domain_specialist"
@dataclass
class AnnotationTask:
"""A single annotation task."""
task_id: str
task_type: TaskType
prompt: str
content_to_annotate: dict
instructions: str
required_tier: AnnotatorTier
estimated_time_s: int
payment_usd: float
created_at: datetime = field(
default_factory=datetime.now
)
priority: int = 0
@dataclass
class Annotation:
"""A single annotation response."""
annotation_id: str
task_id: str
annotator_id: str
label: dict
confidence: float
time_spent_s: int
submitted_at: datetime = field(
default_factory=datetime.now
)
quality_score: float = 0.0
@dataclass
class Annotator:
"""Annotator profile with quality history."""
annotator_id: str
tier: AnnotatorTier
specializations: list = field(default_factory=list)
total_tasks: int = 0
agreement_rate: float = 0.0
avg_speed_s: float = 0.0
quality_score: float = 0.5
active: bool = True
class AnnotationPlatform:
"""
Core annotation platform with task routing,
quality control, and annotator management.
Architecture:
1. Task Queue: prioritized queue of pending tasks
2. Router: assigns tasks to qualified annotators
3. Quality Engine: validates annotations in real-time
4. Aggregator: combines multiple annotations per task
5. Payment System: tracks time and pays annotators
"""
def __init__(self, config):
self.tasks = {}
self.annotations = {}
self.annotators = {}
self.redundancy = config.get("redundancy", 3)
self.min_agreement = config.get(
"min_agreement", 0.6
)
self.gold_ratio = config.get("gold_ratio", 0.05)
self.quality_engine = QualityEngine(config)
self.router = TaskRouter(config)
def submit_task(self, task):
"""Add a task to the annotation queue."""
self.tasks[task.task_id] = task
return task.task_id
def submit_batch(self, tasks):
"""Submit a batch of tasks."""
task_ids = []
for task in tasks:
self.submit_task(task)
task_ids.append(task.task_id)
return task_ids
def get_next_task(self, annotator_id):
"""
Get the next task for an annotator.
Routing considers:
1. Annotator tier vs task required tier
2. Annotator specializations
3. Task priority
4. Gold standard tasks (quality checks)
"""
annotator = self.annotators.get(annotator_id)
if annotator is None:
return None
return self.router.route(annotator, self.tasks)
def submit_annotation(self, annotation):
"""
Submit an annotation and run quality checks.
"""
task_id = annotation.task_id
# Quality check
quality_result = (
self.quality_engine.check_annotation(
annotation, self.tasks.get(task_id)
)
)
annotation.quality_score = quality_result["score"]
if task_id not in self.annotations:
self.annotations[task_id] = []
self.annotations[task_id].append(annotation)
# Check if enough annotations for aggregation
if (
len(self.annotations[task_id]) >= self.redundancy
):
self._aggregate(task_id)
return quality_result
def _aggregate(self, task_id):
"""
Aggregate multiple annotations into a final label.
"""
annotations = self.annotations[task_id]
task = self.tasks[task_id]
if task.task_type == TaskType.PAIRWISE_COMPARISON:
return self._aggregate_pairwise(annotations)
elif task.task_type == TaskType.LIKERT_SCALE:
return self._aggregate_likert(annotations)
else:
return self._aggregate_majority(annotations)
def _aggregate_pairwise(self, annotations):
"""Aggregate pairwise comparison annotations."""
votes = {}
for ann in annotations:
choice = ann.label.get("preferred", "")
weight = ann.quality_score
votes[choice] = votes.get(choice, 0) + weight
if not votes:
return None
winner = max(votes, key=votes.get)
total = sum(votes.values())
confidence = votes[winner] / total if total else 0
return {
"winner": winner,
"confidence": confidence,
"vote_distribution": votes,
}
def _aggregate_likert(self, annotations):
"""Aggregate Likert scale annotations."""
import numpy as np
scores = [
ann.label.get("score", 0) for ann in annotations
]
weights = [ann.quality_score for ann in annotations]
weighted_mean = np.average(scores, weights=weights)
std = np.std(scores)
return {
"score": float(weighted_mean),
"std": float(std),
"n_annotators": len(annotations),
}
def _aggregate_majority(self, annotations):
"""Aggregate by weighted majority vote."""
votes = {}
for ann in annotations:
label = json.dumps(ann.label, sort_keys=True)
weight = ann.quality_score
votes[label] = votes.get(label, 0) + weight
if not votes:
return None
winner = max(votes, key=votes.get)
return {"label": json.loads(winner)}
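To make the aggregation step concrete, here is the weighted-vote core of `_aggregate_pairwise` as a self-contained sketch. The annotator quality scores below are made up for illustration:

```python
# Weighted pairwise aggregation: each annotation's vote is weighted by
# the annotator's quality score, so a low-quality annotator (e.g. one
# flagged by a speed check) can be outvoted by a single reliable one.

def aggregate_pairwise(annotations):
    """annotations: list of (preferred_choice, quality_score) pairs."""
    votes = {}
    for choice, weight in annotations:
        votes[choice] = votes.get(choice, 0.0) + weight
    if not votes:
        return None
    winner = max(votes, key=votes.get)
    total = sum(votes.values())
    return {
        "winner": winner,
        "confidence": votes[winner] / total if total else 0.0,
        "vote_distribution": votes,
    }

# Two annotators prefer A (quality 0.9 and 0.4), one prefers B (0.8).
result = aggregate_pairwise([("A", 0.9), ("A", 0.4), ("B", 0.8)])
print(result["winner"])                # A
print(round(result["confidence"], 3))  # 1.3 / 2.1 -> 0.619
```

Note that under unweighted majority voting the outcome here would be the same, but the confidence would be 2/3; quality weighting pulls it toward the more trusted annotators.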
Annotation Platform Comparison
| Platform | Annotator Pool | Quality Control | Min Task Price | Typical IAA | Best For |
|---|---|---|---|---|---|
| Scale AI | 100K+ managed | ML-assisted + gold tasks | $0.10 | 0.70-0.85 | High-volume structured tasks |
| Surge AI | 5K linguists | Expert review + calibration | $0.50 | 0.80-0.92 | Nuanced language tasks |
| Amazon MTurk | 500K+ crowd | Requestor-managed | $0.01 | 0.40-0.65 | Simple classification |
| Labelbox | Self-managed | Consensus + review | Self-hosted | Varies | Custom workflows |
| Custom pipeline | Your team | Full control | Variable | 0.75-0.95 | Specialized domains |
Quality Control Mechanisms
Gold Tasks, Honeypots, and Agreement Metrics
import numpy as np
from collections import defaultdict
class QualityEngine:
"""
Real-time quality control for annotations.
Mechanisms:
1. Gold standard tasks: pre-labeled tasks mixed in
to measure annotator accuracy
2. Honeypot tasks: deliberately tricky tasks that
test attention
3. Speed checks: flag impossibly fast completions
4. Consistency checks: compare annotator's labels
across similar tasks
5. Inter-annotator agreement: measure consensus
"""
def __init__(self, config):
self.gold_tasks = {}
self.annotator_stats = defaultdict(
lambda: {
"gold_correct": 0,
"gold_total": 0,
"avg_time": 0.0,
"total_annotations": 0,
}
)
self.min_time_s = config.get("min_time_s", 5)
self.max_time_s = config.get("max_time_s", 600)
self.gold_accuracy_threshold = config.get(
"gold_threshold", 0.7
)
def check_annotation(self, annotation, task):
"""
Run all quality checks on an annotation.
Returns a quality score and any flags.
"""
flags = []
score = 1.0
# Speed check
if annotation.time_spent_s < self.min_time_s:
flags.append("too_fast")
score *= 0.3
elif annotation.time_spent_s > self.max_time_s:
flags.append("too_slow")
score *= 0.8
# Gold standard check
if task and task.task_id in self.gold_tasks:
gold_label = self.gold_tasks[task.task_id]
is_correct = self._check_gold(
annotation.label, gold_label
)
stats = self.annotator_stats[
annotation.annotator_id
]
stats["gold_total"] += 1
if is_correct:
stats["gold_correct"] += 1
else:
flags.append("gold_incorrect")
score *= 0.5
# Check cumulative gold accuracy
gold_acc = (
stats["gold_correct"] / stats["gold_total"]
)
if (
stats["gold_total"] >= 10
and gold_acc < self.gold_accuracy_threshold
):
flags.append("below_gold_threshold")
score *= 0.3
# Update running stats
stats = self.annotator_stats[
annotation.annotator_id
]
stats["total_annotations"] += 1
n = stats["total_annotations"]
stats["avg_time"] = (
stats["avg_time"] * (n - 1)
+ annotation.time_spent_s
) / n
return {
"score": score,
"flags": flags,
"annotator_gold_accuracy": (
stats["gold_correct"] / max(
stats["gold_total"], 1
)
),
}
def _check_gold(self, submitted, gold):
"""Check if submitted label matches gold standard."""
if isinstance(gold, dict):
return submitted == gold
return str(submitted) == str(gold)
def compute_inter_annotator_agreement(
self, task_annotations
):
"""
Compute Cohen's Kappa and Krippendorff's Alpha
for a set of annotations.
Cohen's Kappa: pairwise agreement corrected for chance.
Krippendorff's Alpha: multi-annotator agreement metric
that handles missing data.
"""
# Collect all annotations per task
task_labels = defaultdict(dict)
all_labels = set()
for task_id, annotations in task_annotations.items():
for ann in annotations:
task_labels[task_id][
ann.annotator_id
] = json.dumps(ann.label, sort_keys=True)
all_labels.add(
json.dumps(ann.label, sort_keys=True)
)
# Cohen's Kappa (pairwise)
annotator_ids = set()
for annotations in task_annotations.values():
for ann in annotations:
annotator_ids.add(ann.annotator_id)
annotator_ids = list(annotator_ids)
kappas = []
for i in range(len(annotator_ids)):
for j in range(i + 1, len(annotator_ids)):
a1, a2 = annotator_ids[i], annotator_ids[j]
kappa = self._cohens_kappa(
task_labels, a1, a2
)
if kappa is not None:
kappas.append(kappa)
avg_kappa = (
float(np.mean(kappas)) if kappas else 0.0
)
# Krippendorff's Alpha
alpha = self._krippendorffs_alpha(
task_labels, annotator_ids
)
return {
"cohens_kappa_avg": avg_kappa,
"krippendorffs_alpha": alpha,
"n_annotators": len(annotator_ids),
"n_tasks": len(task_labels),
}
def _cohens_kappa(self, task_labels, a1, a2):
"""Compute Cohen's Kappa between two annotators."""
shared_tasks = [
tid for tid in task_labels
if a1 in task_labels[tid] and a2 in task_labels[tid]
]
if len(shared_tasks) < 10:
return None
agree = 0
for tid in shared_tasks:
if task_labels[tid][a1] == task_labels[tid][a2]:
agree += 1
p_o = agree / len(shared_tasks)
# Expected agreement by chance
a1_labels = [task_labels[tid][a1] for tid in shared_tasks]
a2_labels = [task_labels[tid][a2] for tid in shared_tasks]
label_set = set(a1_labels + a2_labels)
p_e = 0.0
n = len(shared_tasks)
for label in label_set:
p1 = a1_labels.count(label) / n
p2 = a2_labels.count(label) / n
p_e += p1 * p2
if p_e == 1.0:
return 1.0
kappa = (p_o - p_e) / (1.0 - p_e)
return kappa
def _krippendorffs_alpha(self, task_labels,
annotator_ids):
"""
Compute Krippendorff's Alpha for multiple annotators.
Simplified version for nominal data.
"""
# Build reliability matrix
all_pairs = []
for tid in task_labels:
labels = list(task_labels[tid].values())
for i in range(len(labels)):
for j in range(i + 1, len(labels)):
all_pairs.append(
(labels[i], labels[j])
)
if not all_pairs:
return 0.0
# Observed disagreement
d_o = sum(
1 for a, b in all_pairs if a != b
) / len(all_pairs)
# Expected disagreement
all_labels_flat = []
for tid in task_labels:
all_labels_flat.extend(task_labels[tid].values())
label_counts = defaultdict(int)
for label in all_labels_flat:
label_counts[label] += 1
n_total = len(all_labels_flat)
d_e = 1.0 - sum(
(c * (c - 1))
for c in label_counts.values()
) / (n_total * (n_total - 1))
if d_e == 0:
return 1.0
alpha = 1.0 - d_o / d_e
return float(alpha)
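The kappa computation is easy to sanity-check by hand. A minimal standalone version of the same formula as `_cohens_kappa` (without the shared-task bookkeeping), run on labels constructed so observed agreement is 0.9 and chance agreement is 0.5:

```python
# Cohen's Kappa for two annotators over the same items:
# kappa = (p_o - p_e) / (1 - p_e), where p_o is observed agreement and
# p_e is the agreement expected by chance from each annotator's label
# frequencies.

def cohens_kappa(labels_a, labels_b):
    n = len(labels_a)
    p_o = sum(a == b for a, b in zip(labels_a, labels_b)) / n
    p_e = sum(
        (labels_a.count(label) / n) * (labels_b.count(label) / n)
        for label in set(labels_a) | set(labels_b)
    )
    return 1.0 if p_e == 1.0 else (p_o - p_e) / (1.0 - p_e)

# 9/10 items agree (p_o = 0.9); label frequencies give p_e = 0.5.
labels_a = ["x", "x", "x", "x", "x", "x", "y", "y", "y", "y"]
labels_b = ["x", "x", "x", "x", "y", "x", "y", "y", "y", "y"]
kappa = cohens_kappa(labels_a, labels_b)
print(round(kappa, 2))  # (0.9 - 0.5) / 0.5 = 0.8
```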
Inter-annotator agreement (IAA) below 0.6 (Cohen's Kappa) indicates the task definition is ambiguous. Do not blame annotators; fix the instructions. Common fixes: provide more examples, narrow the label set, add explicit edge-case guidelines. IAA above 0.8 is achievable for well-defined tasks, but may also indicate the task is so easy it could be automated.
Task Router
Intelligent Assignment
class TaskRouter:
"""
Route tasks to the most appropriate annotators.
Factors:
1. Tier match: task required tier vs annotator tier
2. Specialization match: domain expertise
3. Quality history: prioritize high-quality annotators
4. Load balancing: distribute work evenly
5. Gold injection: insert gold tasks for quality checks
"""
def __init__(self, config):
self.gold_ratio = config.get("gold_ratio", 0.05)
self.gold_tasks = []
def route(self, annotator, available_tasks):
"""
Select the best task for this annotator.
"""
# Inject gold task periodically
if self._should_inject_gold(annotator):
gold = self._select_gold_task(annotator)
if gold:
return gold
# Filter by tier
eligible = [
t for t in available_tasks.values()
if self._tier_matches(
annotator.tier, t.required_tier
)
]
if not eligible:
return None
# Score each task for this annotator
scored = []
for task in eligible:
score = self._compute_match_score(
annotator, task
)
scored.append((task, score))
scored.sort(key=lambda x: x[1], reverse=True)
return scored[0][0] if scored else None
def _tier_matches(self, annotator_tier, required_tier):
"""Check if annotator tier meets requirement."""
tier_order = {
AnnotatorTier.CROWD: 0,
AnnotatorTier.TRAINED: 1,
AnnotatorTier.EXPERT: 2,
AnnotatorTier.DOMAIN_SPECIALIST: 3,
}
return (
tier_order.get(annotator_tier, 0)
>= tier_order.get(required_tier, 0)
)
def _compute_match_score(self, annotator, task):
"""
Score how well an annotator matches a task.
Higher is better.
"""
score = 0.0
# Quality history (higher quality = more trusted)
score += annotator.quality_score * 2.0
# Task priority
score += task.priority * 0.5
# Specialization match
task_domain = task.content_to_annotate.get(
"domain", ""
)
if task_domain in annotator.specializations:
score += 3.0
return score
def _should_inject_gold(self, annotator):
"""Determine if this annotator should get a gold task."""
import random
return random.random() < self.gold_ratio
def _select_gold_task(self, annotator):
"""Select a gold task appropriate for annotator."""
if not self.gold_tasks:
return None
import random
return random.choice(self.gold_tasks)
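The routing rule reduces to two steps: a hard tier gate, then a weighted score. A minimal sketch with the same weights as `_compute_match_score` (the tier names and example domains are illustrative):

```python
# Tier gating first (an annotator can only take tasks at or below their
# tier), then a score from quality history (x2.0), task priority (x0.5),
# and a flat +3.0 bonus for a domain specialization match.

TIER_ORDER = {"crowd": 0, "trained": 1, "expert": 2, "domain_specialist": 3}

def eligible(annotator_tier, required_tier):
    return TIER_ORDER[annotator_tier] >= TIER_ORDER[required_tier]

def match_score(quality, priority, task_domain, specializations):
    score = quality * 2.0 + priority * 0.5
    if task_domain in specializations:
        score += 3.0
    return score

# A trained annotator cannot take expert-tier tasks:
print(eligible("trained", "expert"))  # False
# A domain match (+3.0) outweighs a moderate priority difference (+1.0):
s_match = match_score(0.8, 0, "medical", ["medical"])  # 1.6 + 3.0 = 4.6
s_prio = match_score(0.8, 2, "legal", ["medical"])     # 1.6 + 1.0 = 2.6
print(s_match > s_prio)  # True
```

The weight choices encode a policy decision: specialization dominates priority unless the priority gap exceeds 6 levels, which keeps domain tasks with domain experts.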
Active Learning for Annotation Efficiency
Prioritizing What to Label
class ActiveLearningSelector:
"""
Select which samples to annotate next using
active learning strategies.
Annotating everything is expensive. Active learning
selects the most informative samples first:
- Uncertainty sampling: samples where the model is
most uncertain
- Diversity sampling: samples that cover different
regions of the input space
- Expected model change: samples that would most
change the model if added
"""
def __init__(self, model, embedding_model):
self.model = model
self.embedding_model = embedding_model
def uncertainty_sampling(self, unlabeled_pool, n_select):
"""
Select samples where the model is most uncertain.
For classification: highest entropy in softmax output.
For preference: closest to 50/50 between options.
"""
uncertainties = []
for sample in unlabeled_pool:
# Get model's prediction distribution
logits = self.model.predict_proba(
sample["input"]
)
# Entropy
entropy = -np.sum(
logits * np.log(logits + 1e-10)
)
uncertainties.append(
(sample, entropy)
)
# Sort by uncertainty (highest first)
uncertainties.sort(
key=lambda x: x[1], reverse=True
)
return [s for s, _ in uncertainties[:n_select]]
def diversity_sampling(self, unlabeled_pool,
already_labeled, n_select):
"""
Select samples that are most different from
already-labeled data.
Uses k-means++ style selection in embedding space.
"""
# Embed all samples
unlabeled_embeddings = self.embedding_model.encode([
s["input"] for s in unlabeled_pool
])
labeled_embeddings = None
if already_labeled:
labeled_embeddings = self.embedding_model.encode([
s["input"] for s in already_labeled
])
selected_indices = []
for _ in range(n_select):
if not selected_indices and labeled_embeddings is None:
# First selection: random
idx = np.random.randint(
len(unlabeled_embeddings)
)
else:
# Select point farthest from all
# already-selected points
reference = []
if labeled_embeddings is not None:
reference.append(labeled_embeddings)
if selected_indices:
reference.append(
unlabeled_embeddings[selected_indices]
)
ref_matrix = np.vstack(reference)
distances = np.min(
np.linalg.norm(
unlabeled_embeddings[:, np.newaxis]
- ref_matrix[np.newaxis],
axis=-1,
),
axis=1,
)
# Zero out already-selected
for si in selected_indices:
distances[si] = -1
idx = int(np.argmax(distances))
selected_indices.append(idx)
return [unlabeled_pool[i] for i in selected_indices]
def combined_sampling(self, unlabeled_pool,
already_labeled, n_select,
uncertainty_weight=0.6):
"""
Combined uncertainty + diversity sampling.
Balances informative samples (high uncertainty)
with representative samples (high diversity).
"""
# Get uncertainty scores
uncertainties = {}
for i, sample in enumerate(unlabeled_pool):
logits = self.model.predict_proba(
sample["input"]
)
entropy = -np.sum(
logits * np.log(logits + 1e-10)
)
uncertainties[i] = entropy
# Normalize to [0, 1]
max_ent = max(uncertainties.values()) or 1.0
for k in uncertainties:
uncertainties[k] /= max_ent
# Get diversity scores (distance from labeled set)
unlabeled_embeddings = self.embedding_model.encode([
s["input"] for s in unlabeled_pool
])
if already_labeled:
labeled_embeddings = self.embedding_model.encode([
s["input"] for s in already_labeled
])
distances = np.min(
np.linalg.norm(
unlabeled_embeddings[:, np.newaxis]
- labeled_embeddings[np.newaxis],
axis=-1,
),
axis=1,
)
max_dist = np.max(distances) or 1.0
distances /= max_dist
else:
distances = np.ones(len(unlabeled_pool))
# Combined score
scores = []
for i in range(len(unlabeled_pool)):
score = (
uncertainty_weight * uncertainties[i]
+ (1 - uncertainty_weight) * distances[i]
)
scores.append((i, score))
scores.sort(key=lambda x: x[1], reverse=True)
selected_indices = [s[0] for s in scores[:n_select]]
return [unlabeled_pool[i] for i in selected_indices]
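The uncertainty-sampling step can be isolated into a few lines: compute predictive entropy per sample and take the top-n. A standalone sketch with made-up probabilities:

```python
import numpy as np

# Entropy-based uncertainty sampling: given predicted class probabilities
# for an unlabeled pool, select the n samples with the highest predictive
# entropy (closest to 50/50 for binary tasks).

def select_most_uncertain(probs, n_select):
    """probs: (n_samples, n_classes) array of predicted probabilities."""
    entropy = -np.sum(probs * np.log(probs + 1e-10), axis=1)
    return np.argsort(entropy)[::-1][:n_select]

probs = np.array([
    [0.98, 0.02],  # confident -> low entropy (~0.10 nats)
    [0.55, 0.45],  # near 50/50 -> high entropy (~0.69 nats)
    [0.80, 0.20],  # middling (~0.50 nats)
])
idx = select_most_uncertain(probs, 2)
print(idx.tolist())  # [1, 2]: the 55/45 sample first, then 80/20
```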
Table: Active learning vs random sampling, showing model accuracy at annotation budgets from 100 to 10,000 labels for combined active learning, uncertainty-only sampling, and random sampling.
Cost Modeling
Annotation Economics
class AnnotationCostModel:
"""
Model annotation costs for budget planning.
Cost factors:
1. Annotator hourly rate (varies by tier and region)
2. Task complexity (time per annotation)
3. Redundancy (annotations per task)
4. Quality overhead (gold tasks, review, calibration)
5. Platform fees (if using a vendor)
"""
HOURLY_RATES = {
AnnotatorTier.CROWD: 12.0,
AnnotatorTier.TRAINED: 25.0,
AnnotatorTier.EXPERT: 50.0,
AnnotatorTier.DOMAIN_SPECIALIST: 100.0,
}
PLATFORM_FEES = {
"scale_ai": 0.40, # 40% markup
"surge_ai": 0.30, # 30% markup
"mturk": 0.20, # 20% markup
"self_hosted": 0.05, # Infrastructure only
}
def estimate_cost(self, n_tasks, task_type, tier,
redundancy=3, platform="self_hosted"):
"""
Estimate total annotation cost.
"""
# Base time per task (seconds)
time_per_task = self._estimate_time(task_type)
# Hourly rate
hourly_rate = self.HOURLY_RATES[tier]
# Cost per single annotation
cost_per_annotation = (
hourly_rate * time_per_task / 3600.0
)
# Total annotations (tasks * redundancy)
total_annotations = n_tasks * redundancy
# Gold tasks (5% overhead)
gold_overhead = 0.05
total_annotations *= (1 + gold_overhead)
# Base cost
base_cost = total_annotations * cost_per_annotation
# Platform fee
platform_fee = self.PLATFORM_FEES.get(
platform, 0.1
)
total_cost = base_cost * (1 + platform_fee)
# Quality review overhead (10% of tasks need review)
review_cost = (
n_tasks * 0.10 * cost_per_annotation * 2.0
)
total_cost += review_cost
return {
"n_tasks": n_tasks,
"redundancy": redundancy,
"total_annotations": int(total_annotations),
"cost_per_annotation": round(
cost_per_annotation, 3
),
"base_cost": round(base_cost, 2),
"platform_fee": round(
base_cost * platform_fee, 2
),
"review_cost": round(review_cost, 2),
"total_cost": round(total_cost, 2),
"cost_per_task": round(
total_cost / n_tasks, 3
),
}
def _estimate_time(self, task_type):
"""Estimate time in seconds per annotation."""
times = {
TaskType.PAIRWISE_COMPARISON: 45,
TaskType.LIKERT_SCALE: 30,
TaskType.CLASSIFICATION: 15,
TaskType.SPAN_ANNOTATION: 90,
TaskType.FREE_TEXT: 120,
TaskType.PREFERENCE_RANKING: 60,
TaskType.MULTI_TURN_DIALOG: 180,
}
return times.get(task_type, 60)
def compare_platforms(self, n_tasks, task_type, tier,
redundancy=3):
"""Compare costs across platforms."""
platforms = [
"scale_ai", "surge_ai", "mturk", "self_hosted"
]
results = {}
for platform in platforms:
results[platform] = self.estimate_cost(
n_tasks, task_type, tier,
redundancy, platform,
)
return results
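Walking through the arithmetic once makes the formula above concrete. A standalone version of the same cost model, evaluated for 10,000 pairwise tasks at the trained tier ($25/hr, ~45 s per comparison), redundancy 3, self-hosted (5% platform overhead):

```python
# Mirrors AnnotationCostModel.estimate_cost: labor cost per annotation,
# scaled by redundancy and a 5% gold-task overhead, plus platform fee
# and a review pass (~2x annotation cost on 10% of tasks).

def estimate_cost(n_tasks, time_s, hourly_rate, redundancy,
                  platform_fee, gold_overhead=0.05, review_frac=0.10):
    cost_per_annotation = hourly_rate * time_s / 3600.0
    total_annotations = n_tasks * redundancy * (1 + gold_overhead)
    base = total_annotations * cost_per_annotation
    review = n_tasks * review_frac * cost_per_annotation * 2.0
    return base * (1 + platform_fee) + review

# 31,500 annotations at $0.3125 each = $9,843.75 base;
# +5% platform fee and $625 review -> ~$10,960.94 total.
total = estimate_cost(10_000, 45, 25.0, 3, 0.05)
print(round(total, 2))
```

Note how redundancy dominates: tripling the labels triples the base cost, while the gold and review overheads together add only about 12%.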
Cost Per 10K Preference Annotations by Platform and Tier
| Platform | Crowd Tier | Trained Tier | Expert Tier | Specialist Tier | Typical IAA |
|---|---|---|---|---|---|
| Scale AI | $2,100 | $5,250 | $10,500 | $21,000 | 0.72-0.82 |
| Surge AI | $1,950 | $4,875 | $9,750 | $19,500 | 0.80-0.90 |
| Amazon MTurk | $1,800 | N/A | N/A | N/A | 0.45-0.60 |
| Self-hosted | $945 | $2,363 | $4,725 | $9,450 | 0.75-0.92 |
| LLM-as-Judge | $50 | $50 | $50 | $50 | 0.65-0.80 |
LLM-as-Judge annotation (using GPT-4 or Claude as annotators) costs 20-100x less than human annotation and achieves 0.65-0.80 agreement with human labels. However, LLM judges have systematic biases: preference for longer responses, for responses that mimic their own style, and for responses that hedge. Using LLM judges for bootstrapping followed by human annotation for calibration is the current best practice for cost-constrained projects.
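Before trusting an LLM judge, score it against a small human-labeled calibration set and check for position bias (a known failure mode where the judge over-picks whichever response is shown first). A minimal sketch with illustrative labels:

```python
# Judge calibration: raw agreement with human labels, plus the rate at
# which the judge picks option A. An unbiased judge on shuffled pairs
# should pick A close to 50% of the time; a large skew suggests
# position bias rather than genuine preference.

def calibrate_judge(judge_labels, human_labels):
    n = len(judge_labels)
    agreement = sum(j == h for j, h in zip(judge_labels, human_labels)) / n
    a_rate = judge_labels.count("A") / n
    return {"agreement": agreement, "judge_picks_A": a_rate}

judge = ["A", "A", "B", "A", "A", "B", "A", "A", "A", "B"]
human = ["A", "B", "B", "A", "A", "B", "B", "A", "A", "A"]
report = calibrate_judge(judge, human)
print(report["agreement"])      # 0.7: within the typical LLM-judge range
print(report["judge_picks_A"])  # 0.7: skewed toward A, worth investigating
```

In practice the bias check requires that response order was randomized when building the judge prompts; otherwise a skew toward A may reflect the data rather than the judge.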
Building Your Own Pipeline
Custom Annotation System
class CustomAnnotationPipeline:
"""
Build a custom annotation pipeline for LLM training data.
Components:
1. Task creation UI (Streamlit/Gradio)
2. Annotation interface
3. Quality monitoring dashboard
4. Data export in training-ready formats
"""
def __init__(self, config):
self.platform = AnnotationPlatform(config)
self.quality_engine = QualityEngine(config)
self.active_learner = ActiveLearningSelector(
model=config.get("model"),
embedding_model=config.get("embedding_model"),
)
self.cost_model = AnnotationCostModel()
def create_preference_batch(self, prompts, responses_a,
responses_b, batch_size=500):
"""
Create a batch of preference comparison tasks.
"""
tasks = []
for i in range(min(batch_size, len(prompts))):
task = AnnotationTask(
task_id=f"pref_{hashlib.md5(prompts[i].encode()).hexdigest()[:8]}_{i}",
task_type=TaskType.PAIRWISE_COMPARISON,
prompt=prompts[i],
content_to_annotate={
"response_a": responses_a[i],
"response_b": responses_b[i],
},
instructions=(
"Compare the two responses to the prompt. "
"Select which response is better based on: "
"1) Accuracy, 2) Helpfulness, "
"3) Harmlessness, 4) Conciseness. "
"If both are equal, select 'Tie'."
),
required_tier=AnnotatorTier.TRAINED,
estimated_time_s=45,
payment_usd=0.15,
)
tasks.append(task)
task_ids = self.platform.submit_batch(tasks)
return {
"batch_id": hashlib.md5(
str(task_ids).encode()
).hexdigest()[:12],
"n_tasks": len(tasks),
"estimated_cost": self.cost_model.estimate_cost(
n_tasks=len(tasks),
task_type=TaskType.PAIRWISE_COMPARISON,
tier=AnnotatorTier.TRAINED,
),
}
def export_for_dpo(self, completed_tasks):
"""
Export completed annotations in DPO training format.
Output: list of (prompt, chosen, rejected) triples
with confidence scores.
"""
dpo_data = []
for task_id, annotations in completed_tasks.items():
task = self.platform.tasks.get(task_id)
if task is None:
continue
aggregated = self.platform._aggregate(task_id)
if aggregated is None:
continue
winner = aggregated.get("winner", "")
confidence = aggregated.get("confidence", 0)
if confidence < 0.6:
continue # Skip low-confidence labels
content = task.content_to_annotate
if winner == "A":
chosen = content["response_a"]
rejected = content["response_b"]
elif winner == "B":
chosen = content["response_b"]
rejected = content["response_a"]
else:
continue # Skip ties
dpo_data.append({
"prompt": task.prompt,
"chosen": chosen,
"rejected": rejected,
"confidence": confidence,
"n_annotators": len(annotations),
})
return dpo_data
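The export logic stands alone as a small filter over aggregated results: drop ties and low-confidence labels, then emit (prompt, chosen, rejected) triples. A self-contained sketch with illustrative records:

```python
# DPO export: turn aggregated pairwise results into preference triples.
# Ties and labels below the confidence threshold are skipped, matching
# the filtering in export_for_dpo above.

def to_dpo(records, min_confidence=0.6):
    out = []
    for r in records:
        if r["winner"] not in ("A", "B"):
            continue  # skip ties
        if r["confidence"] < min_confidence:
            continue  # skip noisy labels
        chosen = r["response_a"] if r["winner"] == "A" else r["response_b"]
        rejected = r["response_b"] if r["winner"] == "A" else r["response_a"]
        out.append({"prompt": r["prompt"], "chosen": chosen,
                    "rejected": rejected, "confidence": r["confidence"]})
    return out

records = [
    {"prompt": "p1", "response_a": "good", "response_b": "bad",
     "winner": "A", "confidence": 0.9},
    {"prompt": "p2", "response_a": "x", "response_b": "y",
     "winner": "Tie", "confidence": 0.5},
]
pairs = to_dpo(records)
print(len(pairs))          # 1: the tie is dropped
print(pairs[0]["chosen"])  # good
```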
Key Takeaways
Data labeling determines the ceiling for RLHF model quality. The annotation platform must solve task routing, quality control, and cost management simultaneously.
The critical decisions:
- Redundancy of 3-5 is the sweet spot: Single-annotator labels have 15-25% error rates for preference tasks. Three annotators with majority voting reduce errors to 5-10%. Five annotators provide marginal improvement (3-7% error) at 67% higher cost.
- Gold standard tasks are non-negotiable: Injecting pre-labeled gold tasks at 5% of volume catches annotators who are guessing or rushing. Without gold tasks, quality degrades silently over time. Annotators who fall below 70% gold accuracy should be recalibrated or removed.
- Active learning reduces costs by 30-50%: Selecting the most informative samples to annotate first (uncertainty + diversity sampling) achieves the same model accuracy with 30-50% fewer annotations than random sampling.
- Inter-annotator agreement below 0.6 means the task is broken: Low IAA is a signal to improve instructions, not to add more annotators. Worked examples, explicit edge-case guidelines, and calibration sessions bring IAA above 0.8 for most task types.
- Self-hosted pipelines cost 50-60% less than vendors: The annotation labor costs the same regardless of platform; self-hosting saves the 20-40% platform markup at the cost of building quality-control infrastructure. For teams annotating more than 50K tasks per month, that engineering investment pays off within 2-3 months.