Specialized Researcher Subgraphs for LangGraph

Research workflows often need to evaluate sources from different domains: web pages, academic papers, and books. Each source type has fundamentally different quality signals. A web source should be evaluated for recency and domain authority; an academic paper for peer-review status and citation count; a book for author credentials and publisher reputation.

This pattern replaces a monolithic researcher with specialized subgraphs, each applying source-appropriate evaluation criteria while sharing common utilities.

The Problem

A single researcher node evaluating all source types faces conflicting criteria:

# Problematic: One prompt tries to handle everything
COMPRESS_PROMPT = """Evaluate sources based on:
- Recency (but books may be older...)
- Citations (but web sources don't have these...)
- Publisher reputation (but only for books...)
"""

This leads to:

  • Confused evaluation (applying web criteria to academic papers)
  • Inconsistent quality scoring across source types
  • Difficult prompt maintenance as criteria grow

The Solution

Create specialized researcher subgraphs, each with source-type-specific evaluation criteria:

Source-Specific Evaluation Criteria

from typing import Literal

SourceType = Literal["web", "academic", "book"]
 
COMPRESS_PROMPTS: dict[SourceType, str] = {
    "web": """You are compressing web research findings.
 
Evaluate sources based on:
1. **Recency**: Prefer recent content (within 1-2 years)
2. **Domain authority**: Trust .gov, .edu, established news > blogs, forums
3. **Factual accuracy**: Cross-reference claims, note contradictions
4. **Bias detection**: Identify opinion vs fact, commercial interests
 
Output JSON with: finding, sources, confidence (0-1), gaps""",
 
    "academic": """You are compressing academic research findings.
 
Evaluate sources based on:
1. **Peer-review status**: Prioritize peer-reviewed journals
2. **Citation count**: Higher citations suggest established findings
3. **Methodology quality**: Note study design, sample sizes, limitations
4. **Evidence hierarchy**: Meta-analysis > RCT > Observational > Case study
 
Output JSON with: finding, sources, confidence (0-1), gaps""",
 
    "book": """You are compressing book research findings.
 
Evaluate sources based on:
1. **Author credentials**: Academic position, publication history
2. **Publisher reputation**: Academic press > trade > self-published
3. **Edition currency**: Is this the latest edition?
4. **Book's contribution**: Seminal work, field synthesis, or contrarian view?
 
Output JSON with: finding, sources, confidence (0-1), gaps""",
}
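
The prompts all request the same JSON shape, and the rest of the pattern passes questions, findings, and researcher state around as dictionaries. Their definitions aren't shown in this pattern; the TypedDicts below are a sketch inferred from how the fields are used later (the exact types, especially for search_results, are assumptions):

from typing import TypedDict

class ResearchQuestion(TypedDict):
    question_id: str
    question: str
    context: str

class ResearchFinding(TypedDict):
    question_id: str
    finding: str
    sources: list[str]
    confidence: float  # 0-1, as requested in the prompts
    gaps: list[str]
    source_type: SourceType

class ResearcherState(TypedDict):
    question: ResearchQuestion
    search_queries: list[str]
    search_results: list[dict]  # raw hits; shape depends on the search backend
    scraped_content: list[tuple[str, str]]  # (url, content) pairs
    finding: ResearchFinding | None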

Shared Utilities

Common functionality lives in shared utilities, avoiding duplication:

import asyncio

from cachetools import TTLCache

# Module-level cache shared by all researchers
_scrape_cache: TTLCache[str, str] = TTLCache(maxsize=200, ttl=3600)

async def generate_queries(question: ResearchQuestion) -> list[str]:
    """Generate search queries from research question.

    Identical for all researcher types - only the search sources differ.
    """
    # LLM call to generate queries (elided)
    queries: list[str] = []
    return queries

async def scrape_urls_parallel(
    urls: list[str],
    max_scrapes: int = 10,
) -> list[tuple[str, str]]:
    """Scrape URLs in parallel using the shared cache, skipping failures."""
    async def scrape_one(url: str) -> tuple[str, str]:
        if url in _scrape_cache:
            return url, _scrape_cache[url]
        content = await scrape(url)
        _scrape_cache[url] = content
        return url, content

    tasks = [scrape_one(url) for url in urls[:max_scrapes]]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # return_exceptions=True puts exceptions in the result list; drop them
    # so the declared return type holds
    return [r for r in results if not isinstance(r, BaseException)]
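
The scrape call is left abstract above. A minimal placeholder, assuming httpx as the HTTP client (a production scraper would add HTML-to-text extraction, retries, and error handling):

import httpx

async def scrape(url: str) -> str:
    """Fetch the raw page body for a URL (placeholder scraper)."""
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.text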

Parameterized Compression

A single compression function, parameterized by source type:

from functools import partial
from typing import Any

from langgraph.graph import END, START, StateGraph
from langgraph.graph.state import CompiledStateGraph

async def compress_findings(
    state: ResearcherState,
    source_type: SourceType,
) -> dict[str, Any]:
    """Compress findings using source-specific evaluation criteria."""
    system_prompt = COMPRESS_PROMPTS[source_type]

    # LLM call with the source-specific prompt (see the sketch below);
    # it produces compressed_result, sources, confidence, and gaps
    finding = ResearchFinding(
        question_id=state["question"]["question_id"],
        finding=compressed_result,
        sources=sources,
        confidence=confidence,
        gaps=gaps,
        source_type=source_type,
    )
    return {"finding": finding}

# Create specialized subgraphs using partial
def create_researcher_subgraph(source_type: SourceType) -> CompiledStateGraph:
    builder = StateGraph(ResearcherState)

    # search_funcs maps each source type to its search node (defined elsewhere)
    builder.add_node("generate_queries", generate_queries)
    builder.add_node("search", search_funcs[source_type])

    # Partial application for source-specific compression
    compress_fn = partial(compress_findings, source_type=source_type)
    builder.add_node("compress", compress_fn)

    # Linear flow
    builder.add_edge(START, "generate_queries")
    builder.add_edge("generate_queries", "search")
    builder.add_edge("search", "compress")
    builder.add_edge("compress", END)

    return builder.compile()

# Pre-compiled subgraphs
web_researcher = create_researcher_subgraph("web")
academic_researcher = create_researcher_subgraph("academic")
book_researcher = create_researcher_subgraph("book")
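
The LLM call inside compress_findings is elided. One way it might look, assuming a LangChain chat model with structured output; the model name and the CompressedOutput schema are illustrative, not part of the original pattern:

from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

class CompressedOutput(BaseModel):
    """JSON shape requested by every COMPRESS_PROMPTS entry."""
    finding: str
    sources: list[str]
    confidence: float = Field(ge=0.0, le=1.0)
    gaps: list[str]

async def run_compression(source_type: SourceType, scraped_content: str) -> CompressedOutput:
    """Invoke the model with the source-specific system prompt."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    structured_llm = llm.with_structured_output(CompressedOutput)
    return await structured_llm.ainvoke([
        ("system", COMPRESS_PROMPTS[source_type]),
        ("human", scraped_content),
    ])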

Allocation-Based Routing

Distribute questions across researchers based on configurable allocation:

from collections.abc import Iterator
from itertools import islice

from langgraph.types import Send

def route_to_researchers(state: DeepResearchState) -> list[Send]:
    """Route questions to specialized researchers based on allocation.

    Questions are assigned in allocation order: the first allocation["web"]
    questions go to the web researcher, the next allocation["academic"] to
    the academic researcher, and so on. Questions beyond the total
    allocation are not dispatched.
    """
    pending = state.get("pending_questions", [])
    if not pending:
        return []
 
    allocation = state.get("allocation") or {"web": 1, "academic": 1, "book": 1}
 
    # Build dispatch config: [(node_name, count), ...]
    dispatch_config = [
        ("web_researcher", allocation.get("web", 1)),
        ("academic_researcher", allocation.get("academic", 1)),
        ("book_researcher", allocation.get("book", 1)),
    ]
 
    # Consume pending questions with a single iterator so each question
    # is dispatched to at most one researcher
    pending_iter: Iterator[ResearchQuestion] = iter(pending)
    sends: list[Send] = []
 
    for node_name, count in dispatch_config:
        for question in islice(pending_iter, count):
            sends.append(Send(node_name, ResearcherState(
                question=question,
                search_queries=[],
                search_results=[],
                scraped_content=[],
                finding=None,
            )))
 
    return sends
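
For example, with allocation {"web": 2, "academic": 2, "book": 1} and five pending questions, the first two questions go to web_researcher, the next two to academic_researcher, and the fifth to book_researcher; a sixth pending question would not be dispatched.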

Main Graph Construction

from operator import add
from typing import Annotated, TypedDict

class DeepResearchState(TypedDict):
    """State of the main research workflow."""
    pending_questions: list[ResearchQuestion]
    research_findings: Annotated[list[ResearchFinding], add]
    allocation: dict[str, int]
    current_status: str

def create_research_graph() -> CompiledStateGraph:
    """Create the main research workflow with specialized researchers."""
    builder = StateGraph(DeepResearchState)

    # supervisor and aggregate_findings are plain nodes assumed to be
    # defined elsewhere (a sketch follows this block)
    builder.add_node("supervisor", supervisor)
    builder.add_node("web_researcher", web_researcher)
    builder.add_node("academic_researcher", academic_researcher)
    builder.add_node("book_researcher", book_researcher)
    builder.add_node("aggregate_findings", aggregate_findings)

    builder.add_edge(START, "supervisor")
    builder.add_conditional_edges("supervisor", route_to_researchers)
    builder.add_edge("web_researcher", "aggregate_findings")
    builder.add_edge("academic_researcher", "aggregate_findings")
    builder.add_edge("book_researcher", "aggregate_findings")
    builder.add_edge("aggregate_findings", END)

    return builder.compile()
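
The supervisor and aggregate_findings nodes are not shown in this pattern. A minimal sketch of what they might do, assuming the supervisor only confirms the allocation and the aggregator only updates the status (real implementations would generate questions, iterate, and write a report):

from typing import Any

async def supervisor(state: DeepResearchState) -> dict[str, Any]:
    """Confirm (or compute) the allocation before routing."""
    return {
        "allocation": state.get("allocation") or {"web": 1, "academic": 1, "book": 1},
        "current_status": "researching",
    }

async def aggregate_findings(state: DeepResearchState) -> dict[str, Any]:
    """Findings accumulate via the `add` reducer; just record completion."""
    return {
        "current_status": f"completed with {len(state['research_findings'])} findings",
    }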

Usage Example

async def example_usage():
    graph = create_research_graph()
 
    initial_state = {
        "pending_questions": [
            ResearchQuestion(
                question_id="q1",
                question="What are the latest AI regulation developments?",
                context="Need current policy landscape",
            ),
            ResearchQuestion(
                question_id="q2",
                question="What academic research exists on AI governance?",
                context="Need peer-reviewed sources",
            ),
            ResearchQuestion(
                question_id="q3",
                question="What foundational books cover AI ethics?",
                context="Need authoritative long-form sources",
            ),
        ],
        "research_findings": [],
        "allocation": {"web": 1, "academic": 1, "book": 1},
        "current_status": "starting",
    }
 
    result = await graph.ainvoke(initial_state)
 
    # Group findings by source type
    by_type: dict[str, list[ResearchFinding]] = {}
    for finding in result["research_findings"]:
        source_type = finding["source_type"]
        by_type.setdefault(source_type, []).append(finding)
 
    print("Findings by source type:")
    for source_type, findings in by_type.items():
        print(f"  {source_type}: {len(findings)} findings")

Trade-offs

Benefits:

  • Each source type evaluated with appropriate criteria
  • Shared utilities reduce code duplication
  • Allocation configurable per research context
  • Findings tagged with source type for downstream processing

Costs:

  • Three subgraphs to maintain instead of one
  • Prompt updates must consider all source types
  • Allocation tuning required for optimal distribution

When to Use This Pattern

Good fit:

  • Research spanning multiple source types (web, academic, books)
  • Source types have fundamentally different quality signals
  • Need to track which source type contributed each finding
  • Want configurable allocation across researcher types

Poor fit:

  • Single source type research
  • All sources can be evaluated with same criteria
  • Simple research workflows without specialization needs