Specialized Researcher Subgraphs for LangGraph
Research workflows often need to evaluate sources from different domains: web pages, academic papers, and books. Each source type has fundamentally different quality signals. A web source should be evaluated for recency and domain authority; an academic paper for peer-review status and citation count; a book for author credentials and publisher reputation.
This pattern replaces a monolithic researcher with specialized subgraphs, each applying source-appropriate evaluation criteria while sharing common utilities.
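The code below refers to a few shared state types (ResearchQuestion, ResearchFinding, ResearcherState) without defining them. A minimal sketch, consistent with how their fields are used later in this pattern, might look like this (the shapes of search_results and scraped_content are assumptions):
from typing import Literal, TypedDict

class ResearchQuestion(TypedDict):
    question_id: str
    question: str
    context: str

class ResearchFinding(TypedDict):
    question_id: str
    finding: str                # compressed summary text
    sources: list[str]          # URLs or citations backing the finding
    confidence: float           # 0-1
    gaps: list[str]             # open questions the sources left unanswered
    source_type: Literal["web", "academic", "book"]

class ResearcherState(TypedDict):
    question: ResearchQuestion
    search_queries: list[str]
    search_results: list[dict]              # assumed: raw search hits
    scraped_content: list[tuple[str, str]]  # assumed: (url, text) pairs
    finding: ResearchFinding | None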
The Problem
A single researcher node evaluating all source types faces conflicting criteria:
# Problematic: One prompt tries to handle everything
COMPRESS_PROMPT = """Evaluate sources based on:
- Recency (but books may be older...)
- Citations (but web sources don't have these...)
- Publisher reputation (but only for books...)
"""This leads to:
- Confused evaluation (applying web criteria to academic papers)
- Inconsistent quality scoring across source types
- Difficult prompt maintenance as criteria grow
The Solution
Create specialized researcher subgraphs with source-type specific evaluation criteria:
Source-Specific Evaluation Criteria
SourceType = Literal["web", "academic", "book"]
COMPRESS_PROMPTS: dict[SourceType, str] = {
"web": """You are compressing web research findings.
Evaluate sources based on:
1. **Recency**: Prefer recent content (within 1-2 years)
2. **Domain authority**: Trust .gov, .edu, established news > blogs, forums
3. **Factual accuracy**: Cross-reference claims, note contradictions
4. **Bias detection**: Identify opinion vs fact, commercial interests
Output JSON with: finding, sources, confidence (0-1), gaps""",
"academic": """You are compressing academic research findings.
Evaluate sources based on:
1. **Peer-review status**: Prioritize peer-reviewed journals
2. **Citation count**: Higher citations suggest established findings
3. **Methodology quality**: Note study design, sample sizes, limitations
4. **Evidence hierarchy**: Meta-analysis > RCT > Observational > Case study
Output JSON with: finding, sources, confidence (0-1), gaps""",
"book": """You are compressing book research findings.
Evaluate sources based on:
1. **Author credentials**: Academic position, publication history
2. **Publisher reputation**: Academic press > trade > self-published
3. **Edition currency**: Is this the latest edition?
4. **Book's contribution**: Seminal work, field synthesis, or contrarian view?
Output JSON with: finding, sources, confidence (0-1), gaps""",
}
Shared Utilities
Common functionality lives in shared utilities, avoiding duplication:
import asyncio

from cachetools import TTLCache
# Module-level cache shared by all researchers
_scrape_cache: TTLCache[str, str] = TTLCache(maxsize=200, ttl=3600)
async def generate_queries(question: ResearchQuestion) -> list[str]:
"""Generate search queries from research question.
Identical for all researcher types - the search sources differ.
"""
# LLM call to generate queries
return queries
async def scrape_urls_parallel(
urls: list[str],
max_scrapes: int = 10,
) -> list[tuple[str, str]]:
"""Scrape URLs in parallel using shared cache."""
async def scrape_one(url: str) -> tuple[str, str]:
if url in _scrape_cache:
return url, _scrape_cache[url]
content = await scrape(url)
_scrape_cache[url] = content
return url, content
    tasks = [scrape_one(url) for url in urls[:max_scrapes]]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Drop failed scrapes so the return type stays list[tuple[str, str]]
    return [r for r in results if not isinstance(r, BaseException)]
Parameterized Compression
A single compression function, parameterized by source type:
from functools import partial

from langgraph.graph import END, START, StateGraph
from langgraph.graph.state import CompiledStateGraph
async def compress_findings(
state: ResearcherState,
source_type: SourceType,
) -> dict[str, Any]:
"""Compress findings using source-specific evaluation criteria."""
system_prompt = COMPRESS_PROMPTS[source_type]
    # LLM call with the source-specific prompt (elided); produces
    # compressed_result, sources, confidence, and gaps
finding = ResearchFinding(
question_id=state["question"]["question_id"],
finding=compressed_result,
sources=sources,
confidence=confidence,
gaps=gaps,
source_type=source_type,
)
return {"finding": finding}
# Create specialized subgraphs using partial
def create_researcher_subgraph(source_type: SourceType) -> CompiledStateGraph:
builder = StateGraph(ResearcherState)
builder.add_node("generate_queries", gen_queries)
builder.add_node("search", search_funcs[source_type])
# Partial application for source-specific compression
compress_fn = partial(compress_findings, source_type=source_type)
builder.add_node("compress", compress_fn)
# Linear flow
builder.add_edge(START, "generate_queries")
builder.add_edge("generate_queries", "search")
builder.add_edge("search", "compress")
builder.add_edge("compress", END)
return builder.compile()
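# Each compiled subgraph can also be invoked on its own, which is useful for
# exercising a single source type in isolation, e.g. (hypothetical):
#   await create_researcher_subgraph("web").ainvoke({"question": some_question,
#       "search_queries": [], "search_results": [], "scraped_content": [],
#       "finding": None})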
# Pre-compiled subgraphs
web_researcher = create_researcher_subgraph("web")
academic_researcher = create_researcher_subgraph("academic")
book_researcher = create_researcher_subgraph("book")
Allocation-Based Routing
Distribute questions across researchers based on configurable allocation:
from collections.abc import Iterator
from itertools import islice

from langgraph.types import Send
def route_to_researchers(state: DeepResearchState) -> list[Send]:
    """Route questions to specialized researchers based on allocation.

    Questions are consumed in allocation order: the first allocation["web"]
    pending questions go to the web researcher, the next batch to the
    academic researcher, and the rest of the allocation to the book researcher.
    """
pending = state.get("pending_questions", [])
if not pending:
return []
allocation = state.get("allocation") or {"web": 1, "academic": 1, "book": 1}
# Build dispatch config: [(node_name, count), ...]
dispatch_config = [
("web_researcher", allocation.get("web", 1)),
("academic_researcher", allocation.get("academic", 1)),
("book_researcher", allocation.get("book", 1)),
]
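    # Example: with allocation {"web": 2, "academic": 1, "book": 1} and five
    # pending questions, questions 0-1 go to web_researcher, question 2 to
    # academic_researcher, question 3 to book_researcher, and question 4 is
    # not dispatched in this pass.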
    # Consume pending questions in allocation order with a shared iterator
pending_iter: Iterator[ResearchQuestion] = iter(pending)
sends: list[Send] = []
for node_name, count in dispatch_config:
for question in islice(pending_iter, count):
sends.append(Send(node_name, ResearcherState(
question=question,
search_queries=[],
search_results=[],
scraped_content=[],
finding=None,
)))
    return sends
Main Graph Construction
def create_research_graph() -> CompiledStateGraph:
    """Create the main research workflow with specialized researchers."""
    from operator import add
    from typing import Annotated, TypedDict

    class DeepResearchState(TypedDict):
pending_questions: list[ResearchQuestion]
research_findings: Annotated[list[ResearchFinding], add]
allocation: dict[str, int]
current_status: str
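    # supervisor and aggregate_findings are assumed to be defined elsewhere;
    # minimal hypothetical stand-ins (so this sketch runs end to end):
    async def supervisor(state: DeepResearchState) -> dict[str, str]:
        return {"current_status": "researching"}

    async def aggregate_findings(state: DeepResearchState) -> dict[str, str]:
        return {"current_status": "complete"}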
builder = StateGraph(DeepResearchState)
builder.add_node("supervisor", supervisor)
builder.add_node("web_researcher", web_researcher)
builder.add_node("academic_researcher", academic_researcher)
builder.add_node("book_researcher", book_researcher)
builder.add_node("aggregate_findings", aggregate_findings)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route_to_researchers)
builder.add_edge("web_researcher", "aggregate_findings")
builder.add_edge("academic_researcher", "aggregate_findings")
builder.add_edge("book_researcher", "aggregate_findings")
builder.add_edge("aggregate_findings", END)
    return builder.compile()
Usage Example
async def example_usage():
graph = create_research_graph()
initial_state = {
"pending_questions": [
ResearchQuestion(
question_id="q1",
question="What are the latest AI regulation developments?",
context="Need current policy landscape",
),
ResearchQuestion(
question_id="q2",
question="What academic research exists on AI governance?",
context="Need peer-reviewed sources",
),
ResearchQuestion(
question_id="q3",
question="What foundational books cover AI ethics?",
context="Need authoritative long-form sources",
),
],
"research_findings": [],
"allocation": {"web": 1, "academic": 1, "book": 1},
"current_status": "starting",
}
result = await graph.ainvoke(initial_state)
# Group findings by source type
by_type: dict[str, list[ResearchFinding]] = {}
for finding in result["research_findings"]:
source_type = finding["source_type"]
by_type.setdefault(source_type, []).append(finding)
print("Findings by source type:")
for source_type, findings in by_type.items():
print(f" {source_type}: {len(findings)} findings")Trade-offs
Benefits:
- Each source type evaluated with appropriate criteria
- Shared utilities reduce code duplication
- Allocation configurable per research context
- Findings tagged with source type for downstream processing
Costs:
- Three subgraphs to maintain instead of one
- Prompt updates must consider all source types
- Allocation tuning required for optimal distribution
When to Use This Pattern
Good fit:
- Research spanning multiple source types (web, academic, books)
- Source types have fundamentally different quality signals
- Need to track which source type contributed each finding
- Want configurable allocation across researcher types
Poor fit:
- Single source type research
- All sources can be evaluated with same criteria
- Simple research workflows without specialization needs