Multi-Agent Systems with LLMs: A Developer’s Guide (2026)

A multi-agent system is a group of AI agents that each handle a specific task and pass results to one another. Instead of prompting one model to do everything, you split the work — a researcher finds facts, a writer drafts content, an editor reviews it. Each agent does less, and the combined output is usually better than what one overloaded prompt produces.


Why Multi-Agent?

Single-agent LLM calls hit limits quickly:

  • Context window overflow — stuffing all source material, intermediate notes, and the draft into one context fills it fast and degrades recall
  • Quality degradation — asking one model to research AND write AND fact-check produces mediocre results at all three
  • No parallelism — one agent works through everything serially; independent agents can run at the same time
  • Hard to debug — when one big prompt fails, you don’t know which step went wrong

A multi-agent design addresses each of these by giving every agent a narrow, well-defined job.


Core Architecture Patterns

1. Orchestrator → Workers

A central agent (orchestrator) breaks down the task and delegates to specialized sub-agents. Workers report back; the orchestrator assembles the final result.

Orchestrator
├── Research Agent    → gathers data
├── Analysis Agent    → interprets data
└── Writer Agent      → produces output

2. Pipeline (Sequential)

Output of agent N becomes input to agent N+1. Simple to reason about, easy to debug.

Input → Researcher → Drafter → Editor → Output
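
In code, a sequential pipeline is just function composition: each agent is an ordinary function whose output feeds the next. A minimal sketch with stubbed agents (the LLM-backed versions are built below):

def researcher(topic: str) -> str:
    return f"facts about {topic}"        # stub; the real version calls the API

def drafter(facts: str) -> str:
    return f"draft built from: {facts}"  # stub

def editor(draft: str) -> str:
    return f"polished: {draft}"          # stub

# The whole pipeline is one composed call; swapping a stage means swapping a function
print(editor(drafter(researcher("vector databases"))))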

3. Parallel Fan-Out

Orchestrator dispatches multiple agents simultaneously, then merges their results. Great for research tasks where sources are independent.

         ┌─ Agent A ─┐
Input ───┼─ Agent B ─┼─── Merger ── Output
         └─ Agent C ─┘

Building a Simple Two-Agent Pipeline

A Researcher agent fetches raw information; a Writer agent turns it into a polished article section. Both use Claude:

import anthropic

client = anthropic.Anthropic()


def researcher(topic: str) -> str:
    """Finds key facts about a topic."""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        system=(
            "You are a technical researcher. Given a topic, return 5-7 "
            "concise bullet points with the most important facts, stats, "
            "and concepts. Be specific — no filler."
        ),
        messages=[{"role": "user", "content": f"Research topic: {topic}"}],
    )
    return response.content[0].text


def writer(topic: str, research: str) -> str:
    """Writes a clear explanation based on the research."""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=(
            "You are a technical writer for developers. "
            "Write clear, practical prose — no fluff, no marketing speak."
        ),
        messages=[
            {
                "role": "user",
                "content": (
                    f"Topic: {topic}\n\n"
                    f"Research notes:\n{research}\n\n"
                    "Write a 3-paragraph explanation for a developer audience."
                ),
            }
        ],
    )
    return response.content[0].text


# Run the pipeline
topic = "how transformer attention mechanisms work"
print("Step 1: Research...")
facts = researcher(topic)
print(facts)

print("\nStep 2: Write...")
article = writer(topic, facts)
print(article)

Orchestrator Pattern

The orchestrator decides which agents to call and in what order. The implementation below keeps the order fixed (research → write → review) so the control flow stays easy to follow; a sketch of a dynamic orchestrator, where the model picks the next agent itself, appears in the "When to Use" section:

import anthropic
from dataclasses import dataclass


@dataclass
class AgentResult:
    agent: str
    output: str


client = anthropic.Anthropic()


def run_agent(name: str, system: str, user: str, max_tokens: int = 1024) -> AgentResult:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        system=system,
        messages=[{"role": "user", "content": user}],
    )
    return AgentResult(agent=name, output=response.content[0].text)


def orchestrator(task: str) -> str:
    print(f"Orchestrator: starting task — {task[:60]}...")

    # Step 1: Research
    research = run_agent(
        name="Researcher",
        system="You are a technical researcher. Return bullet-point facts only.",
        user=f"Research: {task}",
    )
    print(f"  [{research.agent}] done ({len(research.output)} chars)")

    # Step 2: Draft
    draft = run_agent(
        name="Writer",
        system="You are a technical writer. Write clearly for developers.",
        user=f"Task: {task}\n\nResearch:\n{research.output}\n\nWrite a draft.",
        max_tokens=2048,
    )
    print(f"  [{draft.agent}] done ({len(draft.output)} chars)")

    # Step 3: Review
    final = run_agent(
        name="Editor",
        system=(
            "You are a senior technical editor. "
            "Fix clarity, remove redundancy, ensure accuracy. "
            "Return only the improved text."
        ),
        user=f"Edit this draft:\n\n{draft.output}",
        max_tokens=2048,
    )
    print(f"  [{final.agent}] done ({len(final.output)} chars)")

    return final.output


result = orchestrator("Explain how vector databases work and when to use them")
print("\n=== Final Output ===")
print(result)

Parallel Fan-Out with ThreadPoolExecutor

When agents are independent, run them simultaneously to cut latency:

import anthropic
from concurrent.futures import ThreadPoolExecutor, as_completed

client = anthropic.Anthropic()


def research_source(source_name: str, topic: str) -> tuple[str, str]:
    response = client.messages.create(
        model="claude-haiku-4-5",   # cheap model for parallel workers
        max_tokens=512,
        messages=[
            {
                "role": "user",
                "content": (
                    f"You are a {source_name} expert. "
                    f"Give 3 bullet points about: {topic}"
                ),
            }
        ],
    )
    return source_name, response.content[0].text


def parallel_research(topic: str, sources: list[str]) -> dict[str, str]:
    results = {}
    with ThreadPoolExecutor(max_workers=len(sources)) as pool:
        futures = {
            pool.submit(research_source, source, topic): source
            for source in sources
        }
        for future in as_completed(futures):
            name, output = future.result()
            results[name] = output
            print(f"  [{name}] completed")
    return results


def synthesize(topic: str, research: dict[str, str]) -> str:
    combined = "\n\n".join(f"## {k}\n{v}" for k, v in research.items())
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": (
                    f"Topic: {topic}\n\n"
                    f"Research from multiple sources:\n{combined}\n\n"
                    "Synthesize into one coherent summary."
                ),
            }
        ],
    )
    return response.content[0].text


# Run parallel research + synthesis
topic = "best practices for deploying LLMs in production"
sources = ["DevOps", "ML Engineering", "Security"]

print("Running parallel research...")
research = parallel_research(topic, sources)

print("Synthesizing...")
summary = synthesize(topic, research)
print(summary)

Agent Handoffs with State

Real pipelines need to pass structured state between agents, not just raw strings. Use a dataclass or dict to carry context:

from dataclasses import dataclass, field
import anthropic

client = anthropic.Anthropic()


@dataclass
class PipelineState:
    topic: str
    research: str = ""
    outline: str = ""
    draft: str = ""
    final: str = ""
    errors: list[str] = field(default_factory=list)


def research_agent(state: PipelineState) -> PipelineState:
    try:
        resp = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Research key facts about: {state.topic}"}],
        )
        state.research = resp.content[0].text
    except Exception as e:
        state.errors.append(f"research: {e}")
    return state


def outline_agent(state: PipelineState) -> PipelineState:
    if not state.research:
        state.errors.append("outline skipped: no research")
        return state
    resp = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=512,
        messages=[
            {
                "role": "user",
                "content": f"Create a 5-point article outline based on:\n{state.research}",
            }
        ],
    )
    state.outline = resp.content[0].text
    return state


def writer_agent(state: PipelineState) -> PipelineState:
    if not state.outline:
        state.errors.append("writer skipped: no outline")
        return state
    resp = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[
            {
                "role": "user",
                "content": (
                    f"Topic: {state.topic}\n"
                    f"Outline:\n{state.outline}\n"
                    f"Research:\n{state.research}\n\n"
                    "Write the full article."
                ),
            }
        ],
    )
    state.draft = resp.content[0].text
    return state


# Run the pipeline
state = PipelineState(topic="Getting started with LangChain")
for agent in [research_agent, outline_agent, writer_agent]:
    errors_before = len(state.errors)
    state = agent(state)
    # Report only errors added by this agent, not ones from earlier steps
    new_errors = state.errors[errors_before:]
    print(f"{agent.__name__}: {'OK' if not new_errors else new_errors[-1]}")

print("\nFinal draft length:", len(state.draft), "chars")
if state.errors:
    print("Errors:", state.errors)

When to Use Multi-Agent vs Single Agent

  • Use single agent when the task fits in one prompt, needs one skill, and latency matters more than quality
  • Use pipeline when the task has clear sequential steps and each step has different requirements
  • Use orchestrator when the task is complex, the number of steps is dynamic, or you need error recovery per step (a dynamic-routing sketch follows this list)
  • Use parallel fan-out when sub-tasks are independent and you need to reduce wall-clock time
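
For the dynamic case, one option is to let the orchestrator model choose the next agent itself. Here is a minimal sketch, not a production implementation: the three-agent roster, the DONE sentinel, and the five-step cap are all illustrative choices.

import anthropic

client = anthropic.Anthropic()

AGENTS = {
    "researcher": "You are a technical researcher. Return bullet-point facts only.",
    "writer": "You are a technical writer. Write clearly for developers.",
    "editor": "You are a senior technical editor. Return only the improved text.",
}


def dynamic_orchestrator(task: str, max_steps: int = 5) -> str:
    context = f"Task: {task}"
    last_output = ""
    for _ in range(max_steps):
        # Ask a cheap model which agent should run next, or DONE to stop
        decision = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=10,
            system=(
                "You are an orchestrator. Given the work so far, reply with "
                "exactly one word: researcher, writer, editor, or DONE."
            ),
            messages=[{"role": "user", "content": context}],
        ).content[0].text.strip().lower()
        if decision not in AGENTS:  # DONE, or an unexpected reply
            break
        # Run the chosen agent on everything accumulated so far
        last_output = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            system=AGENTS[decision],
            messages=[{"role": "user", "content": context}],
        ).content[0].text
        context += f"\n\n[{decision} output]\n{last_output}"
    return last_output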

Cost and Latency Considerations

  • Use cheaper models for worker agents: claude-haiku-4-5 for research/extraction; claude-sonnet-4-6 for final synthesis
  • Parallelize where possible: 5 parallel Haiku calls often beat 1 Sonnet call in both cost and speed
  • Cache repeated prompts: if multiple agents share the same system prompt or context, enable prompt caching to avoid reprocessing those tokens (a sketch follows this list)
  • Set per-agent token limits: researchers don’t need 4096 tokens; cap them at 512–1024
  • Fail fast: check state for errors after each agent; skip downstream agents if a critical step failed
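
For caching, the Anthropic API lets you mark a prefix of the prompt with cache_control so later calls that send an identical prefix read it from cache instead of reprocessing it. A minimal sketch; product_spec.md stands in for whatever large context your agents share:

import anthropic

client = anthropic.Anthropic()

# Hypothetical shared context that every worker agent needs
SHARED_CONTEXT = open("product_spec.md").read()


def cached_agent(agent_system: str, user: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        system=[
            # The cached block comes first so the prefix is byte-identical
            # across agents; prefixes shorter than the model's minimum
            # cacheable length are not cached
            {
                "type": "text",
                "text": SHARED_CONTEXT,
                "cache_control": {"type": "ephemeral"},
            },
            # Per-agent instructions go after the cached prefix
            {"type": "text", "text": agent_system},
        ],
        messages=[{"role": "user", "content": user}],
    )
    return response.content[0].text

Cache reads are billed at a fraction of the normal input-token price, so the savings compound with every worker that reuses the same prefix.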

Full Example: Content Pipeline

A complete pipeline that turns a topic into a publishable article:

import anthropic
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

client = anthropic.Anthropic()


@dataclass
class Article:
    topic: str
    facts: list[str] = field(default_factory=list)
    outline: str = ""
    draft: str = ""
    title: str = ""
    seo_description: str = ""


def call(system: str, user: str, model: str = "claude-haiku-4-5", max_tokens: int = 512) -> str:
    return client.messages.create(
        model=model,
        max_tokens=max_tokens,
        system=system,
        messages=[{"role": "user", "content": user}],
    ).content[0].text


def run_pipeline(topic: str) -> Article:
    art = Article(topic=topic)

    # Step 1: Parallel research (3 angles simultaneously)
    print("1. Parallel research...")
    angles = ["technical fundamentals", "practical use cases", "common pitfalls"]
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [
            pool.submit(call, "Return 3 concise bullet points.", f"{angle} of: {topic}")
            for angle in angles
        ]
        art.facts = [f.result() for f in futures]

    # Step 2: Outline
    print("2. Outlining...")
    combined = "\n\n".join(art.facts)
    art.outline = call(
        "Create a 5-section article outline.",
        f"Topic: {topic}\n\nResearch:\n{combined}",
    )

    # Step 3: Draft (Sonnet for quality)
    print("3. Writing draft...")
    art.draft = call(
        "Write a complete developer-focused article. Be practical, use code examples.",
        f"Topic: {topic}\nOutline:\n{art.outline}\nResearch:\n{combined}",
        model="claude-sonnet-4-6",
        max_tokens=3000,
    )

    # Step 4: Parallel meta generation (title + SEO simultaneously)
    print("4. Generating metadata...")
    with ThreadPoolExecutor(max_workers=2) as pool:
        title_f = pool.submit(
            call, "Return only a compelling article title, no quotes.",
            f"Generate SEO title for:\n{art.draft[:500]}"
        )
        seo_f = pool.submit(
            call, "Return only a 155-character meta description, no quotes.",
            f"Meta description for:\n{art.draft[:500]}"
        )
        art.title = title_f.result()
        art.seo_description = seo_f.result()

    return art


article = run_pipeline("How to build a RAG system with Python")
print(f"\nTitle: {article.title}")
print(f"SEO: {article.seo_description}")
print(f"Draft: {len(article.draft)} chars")

Summary

Multi-agent systems let you build AI pipelines that produce better output, run faster where work can be parallelized, and are easier to debug than single-prompt approaches:

  • Split work by skill: researcher, writer, editor, reviewer
  • Run independent agents in parallel with ThreadPoolExecutor
  • Pass structured state between agents to avoid lost context
  • Use cheap models for workers, powerful models for synthesis
  • Fail fast and log errors per agent — don’t let one bad step corrupt the whole pipeline

Related reading: How to Build an AI Agent with Python for single-agent fundamentals, or LangChain for Beginners for a framework-based approach to agent orchestration.