GPT-4 scores 86% on the USMLE (medical licensing exam) but still hallucinates drug interactions in 12% of clinical queries. The gap is data: general pretraining includes medical Wikipedia and PubMed abstracts, but it lacks the clinical notes, specialist textbooks, and case-based reasoning patterns that separate accurate medical advice from plausible-sounding nonsense. Domain-specific training requires expert-validated data ($300+/hour for expert annotation), regulatory compliance (HIPAA for medical data, SOX for financial data), and coverage of terminology that appears 1,000x less frequently in web crawls than in domain corpora.
Domain Data Requirements
Quality Thresholds by Domain
from dataclasses import dataclass, field
from enum import Enum
class DomainType(Enum):
MEDICAL = "medical"
LEGAL = "legal"
FINANCIAL = "financial"
GENERAL = "general"
@dataclass
class DomainQualityRequirements:
domain: DomainType
accuracy_threshold: float
source_authority_required: bool
expert_review_required: bool
privacy_regulations: list
recency_requirement: str
annotation_cost_per_hour: float
min_annotator_qualification: str
error_tolerance: str
DOMAIN_REQUIREMENTS = [
DomainQualityRequirements(
domain=DomainType.MEDICAL,
accuracy_threshold=0.99,
source_authority_required=True,
expert_review_required=True,
privacy_regulations=["HIPAA", "GDPR (health data)", "HITECH"],
recency_requirement="Guidelines change yearly; data older "
"than 3 years needs review",
annotation_cost_per_hour=350.0,
min_annotator_qualification="Licensed MD or equivalent",
error_tolerance="Near zero for treatment recommendations. "
"A wrong drug interaction is potentially fatal.",
),
DomainQualityRequirements(
domain=DomainType.LEGAL,
accuracy_threshold=0.98,
source_authority_required=True,
expert_review_required=True,
privacy_regulations=["Attorney-client privilege",
"Court sealing orders", "GDPR"],
recency_requirement="Laws change with each legislative session. "
"Case law: precedent can be overruled.",
annotation_cost_per_hour=400.0,
min_annotator_qualification="JD with bar admission",
error_tolerance="Low. Incorrect legal advice can lead to "
"liability and harm.",
),
DomainQualityRequirements(
domain=DomainType.FINANCIAL,
accuracy_threshold=0.97,
source_authority_required=True,
expert_review_required=True,
privacy_regulations=["SEC regulations", "FINRA rules",
"SOX compliance", "GDPR"],
recency_requirement="Market data stale within hours. "
"Regulatory filings: quarterly cycle.",
annotation_cost_per_hour=300.0,
min_annotator_qualification="CFA or equivalent financial "
"credential",
error_tolerance="Low for regulatory compliance. "
"Moderate for market analysis.",
),
]
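These requirement records compose: when a corpus mixes domains, the strictest constraint should win. A minimal sketch of that rule (the `strictest_requirements` helper and the inline rate/threshold tables are illustrative copies of the values above, not part of the pipeline):

```python
# Illustrative copies of the thresholds and rates from DOMAIN_REQUIREMENTS.
ACCURACY = {"medical": 0.99, "legal": 0.98, "financial": 0.97}
RATE_PER_HOUR = {"medical": 350.0, "legal": 400.0, "financial": 300.0}

def strictest_requirements(domains):
    """A mixed-domain corpus inherits the tightest accuracy threshold
    and should be budgeted at the highest annotation rate involved."""
    return {
        "accuracy_threshold": max(ACCURACY[d] for d in domains),
        "annotation_cost_per_hour": max(RATE_PER_HOUR[d] for d in domains),
    }

print(strictest_requirements(["medical", "financial"]))
# {'accuracy_threshold': 0.99, 'annotation_cost_per_hour': 350.0}
```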
Domain Data Source Comparison
| Source | Domain | Size | Quality | Access | Cost |
|---|---|---|---|---|---|
| PubMed abstracts | Medical | 36M articles | High (peer-reviewed) | Free API | Free |
| PubMed Central full text | Medical | 8M articles | High | Free (OA subset) | Free |
| Clinical notes (MIMIC-IV) | Medical | 300K+ notes | High (real) | Credentialed access | Free |
| FDA drug labels | Medical | ~100K labels | High (regulatory) | Free download | Free |
| Court opinions (CourtListener) | Legal | 7M+ opinions | High (primary source) | Free API | Free |
| SEC EDGAR filings | Financial | 20M+ filings | High (regulatory) | Free API | Free |
| Earnings call transcripts | Financial | ~500K calls | High (primary source) | Paid ($5K-50K/yr) | Paid |
| UpToDate medical content | Medical | 12K+ topics | Highest (expert) | Licensed ($500/yr) | Paid |
Medical Data Pipeline
PubMed Abstract Extraction
PubMed contains 36 million biomedical abstracts. The Entrez API provides programmatic access. The challenge is not access but quality filtering: not all papers are equally authoritative, and abstracts alone lack the detailed methodology that makes medical knowledge actionable.
import time
import xml.etree.ElementTree as ET
from typing import Optional
from datetime import datetime
@dataclass
class MedicalArticle:
pmid: str
title: str
abstract: str
authors: list
journal: str
publication_date: str
mesh_terms: list
publication_type: str
doi: Optional[str] = None
is_review: bool = False
is_clinical_trial: bool = False
is_meta_analysis: bool = False
citation_count: int = 0
class PubMedExtractor:
"""
Extract and filter medical literature from PubMed.
Focuses on high-quality, authoritative sources.
"""
BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
def __init__(self, api_key=None):
self.api_key = api_key
# Rate limit: 10/sec with key, 3/sec without
self.delay = 0.1 if api_key else 0.35
def search(self, query, max_results=10000):
"""
Search PubMed and return PMIDs.
Uses Entrez ESearch API.
"""
params = {
"db": "pubmed",
"term": query,
"retmax": min(max_results, 10000),
"retmode": "json",
"sort": "relevance",
}
if self.api_key:
params["api_key"] = self.api_key
# In production: requests.get(f"{self.BASE_URL}/esearch.fcgi",
# params=params)
# Returns {"esearchresult": {"idlist": ["12345", ...]}}
return []
def fetch_articles(self, pmids):
"""
Fetch full article metadata for a list of PMIDs.
Uses Entrez EFetch API in batches of 200.
"""
articles = []
batch_size = 200
for i in range(0, len(pmids), batch_size):
batch = pmids[i:i + batch_size]
batch_articles = self._fetch_batch(batch)
articles.extend(batch_articles)
time.sleep(self.delay)
return articles
def _fetch_batch(self, pmids):
"""Fetch a batch of articles."""
params = {
"db": "pubmed",
"id": ",".join(pmids),
"retmode": "xml",
"rettype": "abstract",
}
if self.api_key:
params["api_key"] = self.api_key
# In production: response = requests.get(
# f"{self.BASE_URL}/efetch.fcgi", params=params
# )
# Parse XML response
# return self._parse_pubmed_xml(response.text)
return []
def _parse_pubmed_xml(self, xml_text):
"""Parse PubMed XML into MedicalArticle objects."""
root = ET.fromstring(xml_text)
articles = []
for article_elem in root.findall(".//PubmedArticle"):
medline = article_elem.find("MedlineCitation")
if medline is None:
continue
pmid = medline.findtext("PMID", "")
article = medline.find("Article")
if article is None:
continue
title = article.findtext("ArticleTitle", "")
# Extract abstract
abstract_elem = article.find("Abstract")
abstract_parts = []
if abstract_elem is not None:
for text in abstract_elem.findall("AbstractText"):
label = text.get("Label", "")
content = text.text or ""
if label:
abstract_parts.append(f"{label}: {content}")
else:
abstract_parts.append(content)
abstract = " ".join(abstract_parts)
# Extract MeSH terms
mesh_terms = []
mesh_list = medline.find("MeshHeadingList")
if mesh_list is not None:
for heading in mesh_list.findall("MeshHeading"):
descriptor = heading.find("DescriptorName")
if descriptor is not None and descriptor.text:
mesh_terms.append(descriptor.text)
# Extract publication types
pub_types = []
for pt in article.findall(".//PublicationType"):
if pt.text:
pub_types.append(pt.text)
articles.append(MedicalArticle(
pmid=pmid,
title=title,
abstract=abstract,
authors=[],
journal=article.findtext(".//Title", ""),
publication_date="",
mesh_terms=mesh_terms,
publication_type=", ".join(pub_types),
is_review="Review" in pub_types,
is_clinical_trial=any("Clinical Trial" in pt for pt in pub_types),
is_meta_analysis="Meta-Analysis" in pub_types,
))
return articles
def filter_high_quality(self, articles):
"""
Filter articles to high-quality subset.
Priority order:
1. Meta-analyses and systematic reviews (highest evidence)
2. Randomized controlled trials
3. Clinical guidelines
4. Peer-reviewed research
"""
scored = []
for article in articles:
score = self._quality_score(article)
if score >= 0.5:
scored.append((score, article))
scored.sort(key=lambda x: x[0], reverse=True)
return [article for _, article in scored]
def _quality_score(self, article):
"""Score article quality based on multiple signals."""
score = 0.0
# Evidence level
if article.is_meta_analysis:
score += 0.4
elif article.is_clinical_trial:
score += 0.3
elif article.is_review:
score += 0.25
else:
score += 0.1
# Has structured abstract (BACKGROUND/METHODS/RESULTS)
if "METHODS:" in article.abstract.upper():
score += 0.1
if "RESULTS:" in article.abstract.upper():
score += 0.1
# Has MeSH terms (indicates proper indexing)
if len(article.mesh_terms) >= 3:
score += 0.1
# Abstract length (too short = low information)
if len(article.abstract.split()) >= 150:
score += 0.1
elif len(article.abstract.split()) >= 100:
score += 0.05
# Citation count (proxy for importance)
if article.citation_count >= 100:
score += 0.2
elif article.citation_count >= 20:
score += 0.1
return min(score, 1.0)
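The evidence-level heuristic can be exercised standalone. Below is a self-contained mirror of `_quality_score` (simplified: the citation-count bonus is dropped, since EFetch metadata alone does not include citation counts):

```python
def quality_score(pub_types, abstract, mesh_terms):
    """Evidence-level heuristic mirroring PubMedExtractor._quality_score."""
    # Evidence hierarchy: meta-analysis > trial > review > other
    if "Meta-Analysis" in pub_types:
        score = 0.4
    elif any("Clinical Trial" in pt for pt in pub_types):
        score = 0.3
    elif "Review" in pub_types:
        score = 0.25
    else:
        score = 0.1
    upper = abstract.upper()
    # Structured abstracts signal proper reporting
    score += 0.1 if "METHODS:" in upper else 0.0
    score += 0.1 if "RESULTS:" in upper else 0.0
    # MeSH indexing and abstract length
    score += 0.1 if len(mesh_terms) >= 3 else 0.0
    words = len(abstract.split())
    score += 0.1 if words >= 150 else 0.05 if words >= 100 else 0.0
    return min(score, 1.0)

rct_abstract = "BACKGROUND: ... METHODS: randomized, double-blind. RESULTS: ..."
score = quality_score(["Randomized Controlled Trial", "Clinical Trial"],
                      rct_abstract, ["Humans", "Aspirin", "Stroke"])
```

A short RCT abstract with METHODS/RESULTS labels and three MeSH terms lands at 0.6, comfortably above the 0.5 filter threshold.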
De-identification for Clinical Notes
Clinical notes contain the richest medical knowledge but also the most sensitive data. HIPAA requires removing 18 categories of Protected Health Information (PHI) before any use.
import re
class PHIDeidentifier:
"""
Remove Protected Health Information (PHI) from clinical text.
Implements HIPAA Safe Harbor method: remove all 18 PHI identifiers.
The 18 HIPAA identifiers:
1. Names
2. Geographic data (smaller than state)
3. Dates (except year); all ages over 89
4. Phone numbers
5. Fax numbers
6. Email addresses
7. Social Security numbers
8. Medical record numbers
9. Health plan beneficiary numbers
10. Account numbers
11. Certificate/license numbers
12. Vehicle identifiers
13. Device identifiers
14. Web URLs
15. IP addresses
16. Biometric identifiers
17. Full-face photographs
18. Any other unique identifying number
"""
def __init__(self):
self.patterns = self._build_patterns()
def _build_patterns(self):
"""Build regex patterns for PHI detection."""
return {
"phone": re.compile(
r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
),
"ssn": re.compile(
r'\b\d{3}-\d{2}-\d{4}\b'
),
"email": re.compile(
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.'
r'[A-Za-z]{2,}\b'
),
"mrn": re.compile(
r'\bMRN[:#\s]*\d{6,10}\b', re.IGNORECASE
),
"date_full": re.compile(
r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'
),
"date_written": re.compile(
r'\b(?:January|February|March|April|May|June|'
r'July|August|September|October|November|'
r'December)\s+\d{1,2},?\s+\d{4}\b',
re.IGNORECASE,
),
"ip_address": re.compile(
r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
),
"url": re.compile(
r'https?://[^\s]+'
),
"zip_code": re.compile(
r'\b\d{5}(-\d{4})?\b'
),
"age_over_89": re.compile(
r'\b(9[0-9]|1[0-9]{2})\s*(?:year|yr|y\.?o\.?)\b',
re.IGNORECASE,
),
}
def deidentify(self, text):
"""
Remove all detected PHI from clinical text.
Replaces PHI with category-specific placeholders.
"""
result = text
replacements = []
for phi_type, pattern in self.patterns.items():
matches = list(pattern.finditer(result))
for match in reversed(matches):
placeholder = f"[{phi_type.upper()}]"
start, end = match.span()
replacements.append({
"type": phi_type,
"original_span": (start, end),
"original_length": end - start,
})
result = result[:start] + placeholder + result[end:]
# Name detection (requires NER model in production)
result = self._deidentify_names(result)
return {
"deidentified_text": result,
"phi_count": len(replacements),
"phi_types_found": list(set(
r["type"] for r in replacements
)),
}
def _deidentify_names(self, text):
"""
Remove person names from text.
In production, this uses a medical NER model
(e.g., BioBERT fine-tuned for PHI detection).
Regex-based heuristics as fallback:
- "Dr. LastName" patterns
- "Patient: FirstName LastName" patterns
"""
# Dr. patterns
text = re.sub(
r'\bDr\.?\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?',
'[PHYSICIAN_NAME]', text
)
# Patient name patterns
text = re.sub(
r'(?:Patient|Pt)[:\s]+[A-Z][a-z]+\s+[A-Z][a-z]+',
'[PATIENT_NAME]', text
)
return text
def validate_deidentification(self, original, deidentified):
"""
Validate that deidentification was complete.
Run the detector again on the output -- if PHI is
still found, deidentification failed.
"""
result = self.deidentify(deidentified)
remaining_phi = result["phi_count"]
# Also check that no names leaked through
# (names in brackets are okay -- they are placeholders)
return {
"remaining_phi": remaining_phi,
"passed": remaining_phi == 0,
}
De-identification is not a solved problem. Regex-based methods miss 5-15% of PHI instances. Production systems use ensemble methods: regex + NER model + dictionary lookup + manual review. Even with all methods combined, there is residual risk. Always assume some PHI will leak and implement additional safeguards (access controls, audit logging, data use agreements).
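The ensemble idea can be sketched as a union of flagged spans: any single detector flagging a span is sufficient to redact it, which trades precision for the recall that PHI removal demands. The two detectors below are toy stand-ins (a single SSN regex and a tiny name dictionary), not production models:

```python
import re

def regex_detector(text):
    """Toy structured-PHI detector (SSN pattern only)."""
    return {(m.start(), m.end()) for m in
            re.finditer(r'\b\d{3}-\d{2}-\d{4}\b', text)}

def dictionary_detector(text, known_names=frozenset({"Smith", "Garcia"})):
    """Toy name-dictionary detector."""
    return {(m.start(), m.end()) for m in re.finditer(r'[A-Z][a-z]+', text)
            if m.group(0) in known_names}

def ensemble_redact(text, detectors):
    """Union of all detector spans; redact right-to-left so earlier
    offsets stay valid as the string shrinks."""
    spans = sorted(set().union(*(d(text) for d in detectors)), reverse=True)
    for start, end in spans:
        text = text[:start] + "[PHI]" + text[end:]
    return text

note = "Pt Smith, SSN 123-45-6789, seen today."
print(ensemble_redact(note, [regex_detector, dictionary_detector]))
# Pt [PHI], SSN [PHI], seen today.
```

Overlapping spans from different detectors would need merging before redaction; the union shown here assumes disjoint spans for brevity.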
Legal Data Pipeline
Case Law Extraction
Legal text has unique structural properties: citations follow specific formats (e.g., “Brown v. Board of Education, 347 U.S. 483 (1954)”), reasoning follows IRAC (Issue, Rule, Application, Conclusion) structure, and precedent relationships create a citation graph that defines the authority of each case.
@dataclass
class LegalCase:
case_id: str
case_name: str
court: str
date_decided: str
citation: str
full_text: str
syllabus: str
holding: str
judges: list
cited_cases: list
jurisdiction: str
practice_area: str
disposition: str
class LegalDataExtractor:
"""
Extract and structure legal case data.
Primary source: CourtListener (Free Law Project).
"""
def __init__(self):
self.citation_pattern = re.compile(
r'(\d+)\s+(U\.S\.|S\.Ct\.|F\.\d+d|F\.Supp\.\d*d?)\s+(\d+)'
)
self.court_hierarchy = {
"scotus": 10, # Supreme Court
"ca1": 8, "ca2": 8, "ca3": 8, # Circuit Courts
"ca4": 8, "ca5": 8, "ca6": 8,
"ca7": 8, "ca8": 8, "ca9": 8,
"ca10": 8, "ca11": 8, "cadc": 8, "cafc": 8,
"district": 5, # District Courts
"state_supreme": 7, # State Supreme Courts
"state_appellate": 4,
}
def extract_cases(self, raw_opinions):
"""
Process raw court opinions into structured cases.
"""
cases = []
for raw in raw_opinions:
case = self._parse_opinion(raw)
if case and self._passes_quality_check(case):
cases.append(case)
return cases
def _parse_opinion(self, raw):
"""Parse a raw opinion into a structured LegalCase."""
# Extract citation references
citations = self.citation_pattern.findall(raw.get("text", ""))
cited_cases = [
f"{vol} {reporter} {page}"
for vol, reporter, page in citations
]
# Extract the holding (core legal rule)
holding = self._extract_holding(raw.get("text", ""))
return LegalCase(
case_id=raw.get("id", ""),
case_name=raw.get("case_name", ""),
court=raw.get("court", ""),
date_decided=raw.get("date_filed", ""),
citation=raw.get("citation", ""),
full_text=raw.get("text", ""),
syllabus=raw.get("syllabus", ""),
holding=holding,
judges=raw.get("judges", "").split(", "),
cited_cases=cited_cases,
jurisdiction=raw.get("jurisdiction", ""),
practice_area=self._classify_practice_area(
raw.get("text", "")
),
disposition=raw.get("disposition", ""),
)
def _extract_holding(self, text):
"""
Extract the holding from an opinion.
Holdings are typically in the last few paragraphs
and contain phrases like "we hold that", "we conclude",
"the judgment is affirmed/reversed".
"""
holding_indicators = [
"we hold that", "we conclude that",
"we therefore hold", "it is ordered",
"the judgment of", "we affirm", "we reverse",
"accordingly, we",
]
paragraphs = text.split("\n\n")
holding_paragraphs = []
for para in paragraphs:
para_lower = para.lower()
if any(ind in para_lower for ind in holding_indicators):
holding_paragraphs.append(para.strip())
return " ".join(holding_paragraphs) if holding_paragraphs else ""
def _classify_practice_area(self, text):
"""Classify the practice area of a case."""
area_keywords = {
"constitutional": ["constitution", "amendment",
"due process", "equal protection"],
"criminal": ["defendant", "guilty", "sentence",
"indictment", "prosecution"],
"civil_rights": ["discrimination", "title vii",
"civil rights", "section 1983"],
"contract": ["contract", "breach", "agreement",
"consideration", "damages"],
"tort": ["negligence", "liability", "injury",
"duty of care", "proximate cause"],
"intellectual_property": ["patent", "copyright",
"trademark", "infringement"],
"tax": ["tax", "irs", "revenue", "deduction",
"internal revenue code"],
}
text_lower = text.lower()
scores = {}
for area, keywords in area_keywords.items():
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[area] = score
if scores:
return max(scores, key=scores.get)
return "general"
def _passes_quality_check(self, case):
"""Check if a case meets quality requirements."""
# Must have substantial text
if len(case.full_text.split()) < 500:
return False
# Must have a case name
if not case.case_name:
return False
# Must have a court
if not case.court:
return False
return True
def build_authority_graph(self, cases):
"""
Build a citation graph showing which cases cite which.
Higher-authority cases (more cited by higher courts)
should be weighted more heavily in training.
"""
# Build adjacency list
graph = {}
case_by_citation = {}
for case in cases:
graph[case.case_id] = {
"cited_by": [],
"cites": case.cited_cases,
"court_weight": self.court_hierarchy.get(
case.court, 1
),
}
case_by_citation[case.citation] = case.case_id
# Build reverse edges (who cites this case)
for case in cases:
for cited in case.cited_cases:
if cited in case_by_citation:
cited_id = case_by_citation[cited]
graph[cited_id]["cited_by"].append(case.case_id)
# Compute authority score (PageRank-like)
for case_id, info in graph.items():
citation_count = len(info["cited_by"])
court_weight = info["court_weight"]
info["authority_score"] = (
citation_count * 0.7 + court_weight * 0.3
)
return graph
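The citation regex and the authority weighting are easy to sanity-check on a known citation. A standalone sketch (the pattern duplicates `citation_pattern` above; the 0.7/0.3 blend matches `build_authority_graph`):

```python
import re

CITATION = re.compile(
    r'(\d+)\s+(U\.S\.|S\.Ct\.|F\.\d+d|F\.Supp\.\d*d?)\s+(\d+)'
)

def authority_score(cited_by_count, court_weight):
    """Linear blend of citation popularity and court rank."""
    return cited_by_count * 0.7 + court_weight * 0.3

m = CITATION.search("Brown v. Board of Education, 347 U.S. 483 (1954)")
print(m.groups())  # ('347', 'U.S.', '483')

# A heavily cited Supreme Court case (court weight 10)
print(authority_score(120, 10))  # 87.0
```

Note the raw citation count dominates the score; a production system would normalize counts (e.g. log-scale) so one viral case does not swamp the court-hierarchy signal.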
Financial Data Pipeline
SEC EDGAR Filing Extraction
SEC EDGAR contains every public company filing since 1996. 10-K (annual reports), 10-Q (quarterly reports), 8-K (material events), and proxy statements contain structured financial data alongside narrative management discussion.
@dataclass
class SECFiling:
cik: str # Central Index Key
company_name: str
filing_type: str # 10-K, 10-Q, 8-K, DEF 14A
filing_date: str
accession_number: str
full_text: str
sections: dict # Section name -> text
financial_tables: list # Extracted numerical tables
risk_factors: str
mda: str # Management Discussion & Analysis
class EDGARExtractor:
"""
Extract and structure SEC EDGAR filings.
Handles HTML/SGML parsing, table extraction,
and section identification.
"""
BASE_URL = "https://www.sec.gov/cgi-bin/browse-edgar"
FULL_TEXT_URL = "https://efts.sec.gov/LATEST/search-index"
# 10-K sections defined by Regulation S-K
SECTION_HEADERS_10K = {
"item_1": "Business",
"item_1a": "Risk Factors",
"item_1b": "Unresolved Staff Comments",
"item_2": "Properties",
"item_3": "Legal Proceedings",
"item_4": "Mine Safety Disclosures",
"item_5": "Market for Registrant's Common Equity",
"item_6": "Reserved",
"item_7": "Management's Discussion and Analysis",
"item_7a": "Quantitative and Qualitative Disclosures "
"About Market Risk",
"item_8": "Financial Statements",
"item_9": "Changes in and Disagreements With Accountants",
"item_9a": "Controls and Procedures",
"item_10": "Directors, Executive Officers and "
"Corporate Governance",
"item_11": "Executive Compensation",
}
def extract_filing(self, filing_html):
"""
Extract structured content from a raw SEC filing.
Filings are HTML/SGML with embedded tables.
"""
# Clean HTML
text = self._clean_html(filing_html)
# Extract sections
sections = self._extract_sections(text)
# Extract financial tables
tables = self._extract_tables(filing_html)
# Extract specific high-value sections
risk_factors = sections.get("item_1a", "")
mda = sections.get("item_7", "")
return SECFiling(
cik="",
company_name="",
filing_type="10-K",
filing_date="",
accession_number="",
full_text=text,
sections=sections,
financial_tables=tables,
risk_factors=risk_factors,
mda=mda,
)
def _clean_html(self, html):
"""
Clean SEC filing HTML to plain text.
SEC filings use non-standard HTML from the 1990s-2000s
that modern parsers struggle with.
"""
# Remove HTML tags but preserve structure
text = re.sub(r'<br\s*/?>', '\n', html, flags=re.IGNORECASE)
text = re.sub(r'<p[^>]*>', '\n\n', text, flags=re.IGNORECASE)
text = re.sub(r'</p>', '', text, flags=re.IGNORECASE)
text = re.sub(r'<[^>]+>', '', text)
# Clean up entities
text = text.replace('&amp;', '&')
text = text.replace('&nbsp;', ' ')
text = text.replace('&#160;', ' ')
# Normalize whitespace
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r' {2,}', ' ', text)
return text.strip()
def _extract_sections(self, text):
"""
Extract named sections from a 10-K filing.
Sections are identified by "Item N" headers.
"""
sections = {}
# Pattern: "Item 1." or "ITEM 1." or "Item 1A."
section_pattern = re.compile(
r'(?:^|\n)\s*ITEM\s+(\d+[A-B]?)[\.\s:]+([^\n]+)',
re.IGNORECASE
)
matches = list(section_pattern.finditer(text))
for i, match in enumerate(matches):
item_num = match.group(1).lower()
section_key = f"item_{item_num}"
start = match.end()
end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
section_text = text[start:end].strip()
sections[section_key] = section_text
return sections
def _extract_tables(self, html):
"""
Extract financial tables from HTML.
SEC filings embed XBRL-tagged financial data in tables.
"""
tables = []
# Simple table extraction -- in production use
# a proper HTML parser (lxml/BeautifulSoup)
table_pattern = re.compile(
r'<table[^>]*>(.*?)</table>',
re.DOTALL | re.IGNORECASE
)
for match in table_pattern.finditer(html):
table_html = match.group(1)
rows = self._parse_table_rows(table_html)
if rows and len(rows) >= 2:
tables.append({
"headers": rows[0] if rows else [],
"data": rows[1:] if len(rows) > 1 else [],
"row_count": len(rows) - 1,
})
return tables
def _parse_table_rows(self, table_html):
"""Parse table rows from HTML."""
rows = []
row_pattern = re.compile(
r'<tr[^>]*>(.*?)</tr>',
re.DOTALL | re.IGNORECASE
)
cell_pattern = re.compile(
r'<t[dh][^>]*>(.*?)</t[dh]>',
re.DOTALL | re.IGNORECASE
)
for row_match in row_pattern.finditer(table_html):
cells = []
for cell_match in cell_pattern.finditer(row_match.group(1)):
cell_text = re.sub(r'<[^>]+>', '', cell_match.group(1))
cells.append(cell_text.strip())
if cells:
rows.append(cells)
return rows
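The section splitter can be verified on a toy filing. Below is a self-contained run of the same `Item N` pattern used in `_extract_sections` (the two-item filing text is made up):

```python
import re

SECTION = re.compile(
    r'(?:^|\n)\s*ITEM\s+(\d+[A-B]?)[\.\s:]+([^\n]+)',
    re.IGNORECASE,
)

def extract_sections(text):
    """Split filing text on 'Item N' headers, as in EDGARExtractor."""
    matches = list(SECTION.finditer(text))
    sections = {}
    for i, m in enumerate(matches):
        # Each section runs from the end of its header line
        # to the start of the next header (or end of text)
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        sections[f"item_{m.group(1).lower()}"] = text[m.end():end].strip()
    return sections

filing = (
    "\nITEM 1. Business\nWe make widgets.\n"
    "\nITEM 1A. Risk Factors\nWidgets may fail.\n"
)
print(extract_sections(filing))
# {'item_1': 'We make widgets.', 'item_1a': 'Widgets may fail.'}
```

Real filings also contain a table of contents whose "Item 1A ..." entries match this pattern; production code typically keeps only the last occurrence of each item key.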
Expert Annotation Framework
Annotation Protocol for Domain Data
Domain-specific annotation requires experts, not crowd workers. The annotation protocol must specify exactly what counts as correct, what counts as incorrect, and how ambiguous cases are handled.
@dataclass
class AnnotationTask:
task_id: str
domain: DomainType
text: str
task_type: str # "accuracy", "completeness", "safety"
instructions: str
options: list
required_qualifications: list
estimated_time_minutes: int
@dataclass
class Annotation:
task_id: str
annotator_id: str
annotator_qualifications: list
label: str
confidence: float
rationale: str
time_spent_seconds: int
timestamp: str
class ExpertAnnotationManager:
"""
Manage expert annotation for domain-specific data.
Handles task distribution, quality control,
inter-annotator agreement, and adjudication.
"""
def __init__(self, domain, annotators_per_task=3):
self.domain = domain
self.annotators_per_task = annotators_per_task
self.tasks = []
self.annotations = []
def create_accuracy_task(self, text, claim):
"""
Create a factual accuracy annotation task.
Expert must verify whether the claim in the text
is factually correct.
"""
if self.domain == DomainType.MEDICAL:
instructions = (
"Evaluate whether the following medical claim is "
"factually accurate based on current clinical "
"evidence. Consider:\n"
"1. Is the mechanism of action correct?\n"
"2. Are the dosages/frequencies appropriate?\n"
"3. Are the contraindications mentioned?\n"
"4. Is this consistent with current guidelines?\n\n"
"Rate as: ACCURATE, PARTIALLY_ACCURATE, INACCURATE, "
"or OUTDATED (was accurate but guidelines changed)."
)
elif self.domain == DomainType.LEGAL:
instructions = (
"Evaluate whether the following legal statement is "
"accurate. Consider:\n"
"1. Is the legal rule correctly stated?\n"
"2. Is the cited authority valid and current?\n"
"3. Has the precedent been overruled?\n"
"4. Is the jurisdiction-specific context correct?\n\n"
"Rate as: ACCURATE, PARTIALLY_ACCURATE, INACCURATE, "
"or SUPERSEDED (was accurate but law changed)."
)
else:
instructions = "Evaluate factual accuracy."
task = AnnotationTask(
task_id=f"acc_{len(self.tasks):06d}",
domain=self.domain,
text=f"Claim: {claim}\n\nContext: {text}",
task_type="accuracy",
instructions=instructions,
options=["ACCURATE", "PARTIALLY_ACCURATE",
"INACCURATE", "OUTDATED"],
required_qualifications=self._get_required_quals(),
estimated_time_minutes=5,
)
self.tasks.append(task)
return task
def compute_agreement(self, task_id):
"""
Compute inter-annotator agreement for a task.
Uses majority-vote agreement; production systems often
also report Fleiss' kappa.
"""
task_annotations = [
a for a in self.annotations if a.task_id == task_id
]
if len(task_annotations) < 2:
return {"majority_label": None, "agreement": None,
"label_distribution": {}, "needs_adjudication": True}
labels = [a.label for a in task_annotations]
# Simple agreement: fraction of annotators who agree
# with the majority label
from collections import Counter
counts = Counter(labels)
majority_label = counts.most_common(1)[0][0]
majority_count = counts[majority_label]
agreement = majority_count / len(labels)
return {
"majority_label": majority_label,
"agreement": agreement,
"label_distribution": dict(counts),
"needs_adjudication": agreement < 0.67,
}
def adjudicate(self, task_id, senior_annotator_id):
"""
When annotators disagree, a senior expert makes
the final decision.
"""
agreement = self.compute_agreement(task_id)
if not agreement["needs_adjudication"]:
return agreement["majority_label"]
# In production: present the task and all annotations
# to the senior expert for final adjudication
return None # Placeholder
def compute_annotator_quality(self, annotator_id):
"""
Compute quality metrics for an annotator.
Uses agreement with majority and agreement with
gold standard (pre-adjudicated) tasks.
"""
annotator_tasks = [
a for a in self.annotations
if a.annotator_id == annotator_id
]
agreements = 0
total = 0
for annotation in annotator_tasks:
agreement = self.compute_agreement(annotation.task_id)
if agreement["majority_label"] is not None:
total += 1
if annotation.label == agreement["majority_label"]:
agreements += 1
return {
"annotator_id": annotator_id,
"total_tasks": len(annotator_tasks),
"agreement_rate": agreements / max(total, 1),
"avg_confidence": (
sum(a.confidence for a in annotator_tasks)
/ max(len(annotator_tasks), 1)
),
"avg_time": (
sum(a.time_spent_seconds for a in annotator_tasks)
/ max(len(annotator_tasks), 1)
),
}
def _get_required_quals(self):
"""Get required qualifications for the domain."""
quals = {
DomainType.MEDICAL: ["MD", "DO", "PharmD", "NP"],
DomainType.LEGAL: ["JD with bar admission"],
DomainType.FINANCIAL: ["CFA", "CPA", "Series 7"],
}
return quals.get(self.domain, [])
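The agreement logic reduces to a Counter over labels. A standalone version of the majority-vote computation (same 0.67 adjudication threshold as the manager above):

```python
from collections import Counter

def majority_agreement(labels, threshold=0.67):
    """Fraction of annotators agreeing with the majority label."""
    counts = Counter(labels)
    label, n = counts.most_common(1)[0]
    frac = n / len(labels)
    return {"majority_label": label, "agreement": frac,
            "needs_adjudication": frac < threshold}

result = majority_agreement(["ACCURATE", "ACCURATE", "INACCURATE"])
print(result["majority_label"])      # ACCURATE
print(result["needs_adjudication"])  # True (2/3 < 0.67)
```

The 0.67 threshold means a 2-of-3 split still goes to adjudication, which is the intended behavior: for medical and legal data, a lone dissenting expert is signal, not noise. Ties are broken arbitrarily by `most_common`, so exact ties should also route to adjudication.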
Annotation Cost by Domain and Task Type
| Metric | Accuracy check | Completeness review | Safety audit | Full QA |
|---|---|---|---|---|
| Medical (MD annotators) | | | | |
| Legal (JD annotators) | | | | |
| Financial (CFA annotators) | | | | |
Quality Assurance Pipeline
Automated Quality Gates
Before expert annotation (which is expensive), automated quality gates filter out obviously bad data.
class DomainQualityGate:
"""
Automated quality gates for domain-specific data.
Each gate is a fast, cheap check that filters out
low-quality data before expensive expert review.
"""
def __init__(self, domain):
self.domain = domain
self.gates = self._build_gates()
def _build_gates(self):
"""Build domain-specific quality gates."""
common_gates = [
("length", self._check_length),
("language", self._check_language),
("encoding", self._check_encoding),
("duplication", self._check_duplication),
]
domain_gates = {
DomainType.MEDICAL: [
("medical_terminology", self._check_medical_terms),
("structured_content", self._check_structured_medical),
("recency", self._check_medical_recency),
],
DomainType.LEGAL: [
("legal_structure", self._check_legal_structure),
("citation_presence", self._check_citations),
("jurisdiction_identified", self._check_jurisdiction),
],
DomainType.FINANCIAL: [
("numerical_content", self._check_financial_numbers),
("filing_structure", self._check_filing_structure),
("ticker_identified", self._check_ticker),
],
}
return common_gates + domain_gates.get(self.domain, [])
def evaluate(self, document):
"""Run all quality gates on a document."""
results = []
passed_all = True
for gate_name, gate_fn in self.gates:
passed, detail = gate_fn(document)
results.append({
"gate": gate_name,
"passed": passed,
"detail": detail,
})
if not passed:
passed_all = False
return {
"passed": passed_all,
"gates_passed": sum(1 for r in results if r["passed"]),
"gates_total": len(results),
"results": results,
}
def _check_length(self, doc):
"""Minimum length check."""
min_words = {"medical": 100, "legal": 200, "financial": 150}
threshold = min_words.get(self.domain.value, 100)
word_count = len(doc.get("text", "").split())
return (word_count >= threshold,
f"{word_count} words (min: {threshold})")
def _check_language(self, doc):
"""Check document is in English."""
text = doc.get("text", "")
# Simple heuristic: check for common English words
english_markers = ["the", "and", "is", "of", "in", "to"]
words = text.lower().split()[:100]
marker_count = sum(1 for w in words if w in english_markers)
is_english = marker_count >= 5
return (is_english, f"English markers: {marker_count}")
def _check_encoding(self, doc):
"""Check for encoding issues."""
text = doc.get("text", "")
bad_chars = sum(1 for c in text if ord(c) > 65535)
return (bad_chars == 0, f"Bad characters: {bad_chars}")
def _check_duplication(self, doc):
"""Check for excessive repetition."""
text = doc.get("text", "")
sentences = text.split(".")
if len(sentences) < 3:
return (True, "Too short to check")
unique = len(set(s.strip() for s in sentences if s.strip()))
ratio = unique / len(sentences)
return (ratio > 0.5, f"Unique ratio: {ratio:.2f}")
def _check_medical_terms(self, doc):
"""Check for medical terminology presence."""
text = doc.get("text", "").lower()
medical_terms = [
"patient", "diagnosis", "treatment", "clinical",
"symptoms", "dosage", "mg", "disease", "therapy",
"prognosis", "etiology", "pathology",
]
found = sum(1 for t in medical_terms if t in text)
return (found >= 3, f"Medical terms found: {found}")
def _check_structured_medical(self, doc):
"""Check for structured medical content."""
text = doc.get("text", "").upper()
sections = ["BACKGROUND", "METHODS", "RESULTS",
"CONCLUSION", "OBJECTIVE", "FINDINGS"]
found = sum(1 for s in sections if s in text)
return (found >= 1, f"Structured sections: {found}")
def _check_medical_recency(self, doc):
"""Check publication recency for medical content."""
year = doc.get("year", 0)
current_year = datetime.now().year
is_recent = (current_year - year) <= 5
return (is_recent or year == 0,
f"Published: {year}, age: {current_year - year}y")
def _check_legal_structure(self, doc):
"""Check for legal document structure."""
text = doc.get("text", "")
legal_indicators = [
"court", "plaintiff", "defendant", "held",
"opinion", "judgment", "statute", "section",
]
found = sum(
1 for ind in legal_indicators if ind in text.lower()
)
return (found >= 3, f"Legal indicators: {found}")
def _check_citations(self, doc):
"""Check for legal citation presence."""
text = doc.get("text", "")
citation_count = len(re.findall(
r'\d+\s+(?:U\.S\.|F\.\d+d|S\.Ct\.)\s+\d+', text
))
return (citation_count >= 1, f"Citations: {citation_count}")
def _check_jurisdiction(self, doc):
"""Check that jurisdiction is identified."""
has_jurisdiction = bool(doc.get("jurisdiction"))
return (has_jurisdiction,
f"Jurisdiction: {doc.get('jurisdiction', 'missing')}")
def _check_financial_numbers(self, doc):
"""Check for numerical financial content."""
text = doc.get("text", "")
number_pattern = re.compile(r'\$[\d,.]+|\d+%|\d{1,3}(?:,\d{3})+')
matches = number_pattern.findall(text)
return (len(matches) >= 5,
f"Financial numbers: {len(matches)}")
def _check_filing_structure(self, doc):
"""Check for SEC filing structure."""
text = doc.get("text", "").upper()
items = re.findall(r'ITEM\s+\d+[A-B]?', text)
return (len(items) >= 2, f"Item headers: {len(items)}")
def _check_ticker(self, doc):
"""Check that company ticker is identified."""
has_ticker = bool(doc.get("ticker"))
return (has_ticker,
f"Ticker: {doc.get('ticker', 'missing')}")
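Each checker returns a `(passed, message)` tuple, so checks compose into a simple pass/fail decision. A minimal standalone sketch of two of the legal checks, using the same regexes as the methods above (the sample document is invented for illustration):

```python
import re

def check_citations(doc):
    """Standalone version of _check_citations."""
    text = doc.get("text", "")
    count = len(re.findall(r'\d+\s+(?:U\.S\.|F\.\d+d|S\.Ct\.)\s+\d+', text))
    return (count >= 1, f"Citations: {count}")

def check_jurisdiction(doc):
    """Standalone version of _check_jurisdiction."""
    return (bool(doc.get("jurisdiction")),
            f"Jurisdiction: {doc.get('jurisdiction', 'missing')}")

doc = {
    "text": "In Miranda v. Arizona, 384 U.S. 436, the Court held...",
    "jurisdiction": "US Federal",
}
results = [check_citations(doc), check_jurisdiction(doc)]
passed = all(ok for ok, _ in results)  # True: one citation matched, jurisdiction set
```

A document fails the gate as soon as any required check returns `False`, and the messages give a per-check audit trail for the failure log.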
Complete Medical Data Pipeline
End-to-End Implementation
import time as time_module
from pathlib import Path
import json

class MedicalDataPipeline:
    """
    Complete pipeline for building medical training data.

    Stages:
    1. Extract from PubMed (abstracts + full text)
    2. Extract from clinical notes (with de-identification)
    3. Extract from FDA drug labels
    4. Quality gate filtering
    5. Expert annotation (accuracy verification)
    6. Format for training
    """

    def __init__(self, config):
        self.pubmed = PubMedExtractor(api_key=config.get("pubmed_key"))
        self.deidentifier = PHIDeidentifier()
        self.quality_gate = DomainQualityGate(DomainType.MEDICAL)
        self.annotation_mgr = ExpertAnnotationManager(DomainType.MEDICAL)
    def run(self, output_dir):
        """Run the complete medical data pipeline."""
        start = time_module.time()
        stats = {
            "stage_results": {},
            "final_counts": {},
        }

        # Stage 1: PubMed extraction
        print("Stage 1: Extracting PubMed articles...")
        pubmed_queries = [
            "clinical trial[pt] AND treatment[tiab]",
            "meta-analysis[pt]",
            "systematic review[pt] AND guideline[tiab]",
            "drug interaction[mesh] AND adverse effects[mesh]",
        ]
        articles = []
        for query in pubmed_queries:
            pmids = self.pubmed.search(query, max_results=5000)
            batch = self.pubmed.fetch_articles(pmids)
            articles.extend(batch)
        high_quality = self.pubmed.filter_high_quality(articles)
        stats["stage_results"]["pubmed"] = {
            "total_fetched": len(articles),
            "high_quality": len(high_quality),
        }

        # Stage 2: Clinical notes (de-identified)
        print("Stage 2: Processing clinical notes...")
        clinical_docs = self._process_clinical_notes()
        stats["stage_results"]["clinical"] = {
            "total_notes": len(clinical_docs),
        }

        # Stage 3: Combine and quality gate
        print("Stage 3: Quality filtering...")
        all_docs = (
            [self._article_to_doc(a) for a in high_quality]
            + clinical_docs
        )
        passed_docs = []
        failed_docs = []
        for doc in all_docs:
            result = self.quality_gate.evaluate(doc)
            if result["passed"]:
                passed_docs.append(doc)
            else:
                failed_docs.append((doc, result))
        stats["stage_results"]["quality_gate"] = {
            "input": len(all_docs),
            "passed": len(passed_docs),
            "failed": len(failed_docs),
            "pass_rate": len(passed_docs) / max(len(all_docs), 1),
        }

        # Stage 4: Format for training
        print("Stage 4: Formatting training examples...")
        training_examples = self._format_for_training(passed_docs)
        stats["final_counts"]["training_examples"] = len(training_examples)

        # Save
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        with open(output_path / "medical_train.jsonl", "w") as f:
            for ex in training_examples:
                f.write(json.dumps(ex) + "\n")

        elapsed = time_module.time() - start
        stats["elapsed_seconds"] = elapsed
        print(f"Pipeline complete in {elapsed:.1f}s")
        return stats
    def _process_clinical_notes(self):
        """Process and de-identify clinical notes."""
        # In production: load from MIMIC-IV or internal EHR
        raw_notes = []  # Placeholder
        deidentified = []
        for note in raw_notes:
            result = self.deidentifier.deidentify(note["text"])
            # Keep every note once de-identification has run, recording
            # the PHI count for auditing. (A `phi_count >= 0` guard
            # would be vacuously true.)
            deidentified.append({
                "text": result["deidentified_text"],
                "source": "clinical_notes",
                "phi_removed": result["phi_count"],
            })
        return deidentified
    def _article_to_doc(self, article):
        """Convert a PubMed article to a document dict."""
        return {
            "text": f"{article.title}\n\n{article.abstract}",
            "source": "pubmed",
            "pmid": article.pmid,
            # Use the article's own publication year rather than a
            # hardcoded value; 0 means unknown, which the recency
            # check passes through instead of rejecting.
            "year": getattr(article, "year", 0),
            "mesh_terms": article.mesh_terms,
        }
    def _format_for_training(self, docs):
        """Format documents as training examples."""
        examples = []
        for doc in docs:
            # Format as an instruction-response pair: the model learns
            # to answer medical questions based on authoritative sources.
            examples.append({
                "messages": [
                    {"role": "system",
                     "content": "You are a medical knowledge assistant. "
                                "Provide accurate, evidence-based "
                                "information."},
                    {"role": "user",
                     "content": self._generate_question(doc)},
                    {"role": "assistant",
                     "content": doc["text"]},
                ],
                "source": doc["source"],
                "domain": "medical",
            })
        return examples

    def _generate_question(self, doc):
        """Generate a question that the document answers."""
        # In production: use an LLM to generate natural questions
        text = doc["text"][:200]
        return f"Summarize the key findings: {text}..."
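To make Stage 4's output concrete, this sketch applies the same formatting logic to a single invented document; `format_example` mirrors what `_format_for_training` and `_generate_question` do per document, and the result is one line of `medical_train.jsonl`:

```python
import json

def format_example(doc):
    """Single-document mirror of _format_for_training above."""
    return {
        "messages": [
            {"role": "system",
             "content": "You are a medical knowledge assistant. "
                        "Provide accurate, evidence-based information."},
            {"role": "user",
             "content": f"Summarize the key findings: {doc['text'][:200]}..."},
            {"role": "assistant", "content": doc["text"]},
        ],
        "source": doc["source"],
        "domain": "medical",
    }

doc = {
    "text": "Low-dose aspirin reduced recurrent MI risk in the trial arm.",
    "source": "pubmed",
}
jsonl_line = json.dumps(format_example(doc))  # one record in medical_train.jsonl
```

Each record is a three-turn chat (system, user, assistant) plus provenance fields, which downstream fine-tuning code can filter or weight by `source`.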
Medical Data Pipeline Output Statistics
| Source | Raw Count | After Quality Gate | After Expert Review | Final |
|---|---|---|---|---|
| PubMed (meta-analyses) | 45,000 | 38,000 (84%) | 35,000 (92%) | 35,000 |
| PubMed (clinical trials) | 120,000 | 85,000 (71%) | 72,000 (85%) | 72,000 |
| PubMed (reviews) | 90,000 | 65,000 (72%) | 55,000 (85%) | 55,000 |
| Clinical notes (MIMIC) | 300,000 | 250,000 (83%) | N/A (auto-scored) | 250,000 |
| FDA drug labels | 100,000 | 95,000 (95%) | N/A (regulatory) | 95,000 |
| Total | 655,000 | 533,000 (81%) | - | 507,000 |
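The table's totals and the quality-gate pass rate can be checked directly (the per-source counts below are copied from the rows above):

```python
# (raw, after quality gate, final) per source, copied from the table
rows = {
    "pubmed_meta_analyses": (45_000, 38_000, 35_000),
    "pubmed_trials":        (120_000, 85_000, 72_000),
    "pubmed_reviews":       (90_000, 65_000, 55_000),
    "clinical_notes_mimic": (300_000, 250_000, 250_000),
    "fda_drug_labels":      (100_000, 95_000, 95_000),
}
raw = sum(r[0] for r in rows.values())    # 655,000
gated = sum(r[1] for r in rows.values())  # 533,000
final = sum(r[2] for r in rows.values())  # 507,000
gate_pass_rate = gated / raw              # ~0.81
```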
Key Takeaways
Domain-specific data is harder to build than general data because the cost of errors is higher, the sources are more restricted, and quality verification requires expensive experts.
Core principles:
- Authority hierarchy: Not all sources are equal. A meta-analysis outweighs a case report. A Supreme Court opinion outweighs a district court opinion. An SEC filing outweighs a news article. Weight training data by source authority.
- Privacy-first for medical data: De-identification is mandatory, imperfect, and must be layered. Regex catches patterns, NER catches names, manual review catches what automated methods miss. Budget 2-5% of pipeline cost for privacy QA.
- Structured extraction preserves value: A 10-K filing is not just text: it has sections (Risk Factors, MD&A), tables (financial statements), and metadata (CIK, filing date). Preserving this structure in training data teaches the model domain-specific document structure.
- Expert annotation is the bottleneck: At $300-500/hour, domain experts are the scarcest resource. Maximize their impact: use automated quality gates to filter out obviously bad data first, then use experts only for accuracy verification on the remaining high-quality subset.
- Recency matters: Medical guidelines change yearly. Case law is overruled. Regulations are amended. Domain data has a shelf life. Build pipelines that can refresh data on a schedule matched to the domain's update cadence.
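The first principle, weighting training data by source authority, can be sketched as weighted sampling. The multipliers below are illustrative assumptions, not values from the text:

```python
import random

# Illustrative authority multipliers (assumed, not from the text)
AUTHORITY_WEIGHTS = {"meta_analysis": 3.0, "clinical_trial": 2.0,
                     "case_report": 1.0}

def sample_by_authority(docs, k, seed=0):
    """Draw k training documents, oversampling high-authority sources."""
    rng = random.Random(seed)
    weights = [AUTHORITY_WEIGHTS.get(d["source_type"], 1.0) for d in docs]
    return rng.choices(docs, weights=weights, k=k)

docs = ([{"source_type": "meta_analysis"}] * 50
        + [{"source_type": "case_report"}] * 50)
sample = sample_by_authority(docs, k=1000)
meta_share = sum(d["source_type"] == "meta_analysis" for d in sample) / 1000
# meta-analyses dominate the sample (expected share ~0.75 at 3:1 weights)
```

Whether to weight by sampling frequency, loss weighting, or curriculum ordering is a design choice; sampling is the simplest to retrofit onto an existing pipeline.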
The cost equation: for a 500K-example medical training dataset, expect approximately $150K-$300K in expert annotation (a 10% sample rate at $300-500/hour) and roughly $20K in pipeline engineering, with the remainder going to privacy QA and data acquisition. Total: $220K-$370K. This is a small fraction of the training compute cost but determines whether the model is trustworthy in the domain.