Building Claude Code from Scratch: Unveiling the 12-Layer Architecture Evolution of an AI Coding Agent

This project is more than a codebase: it reads like a training path for Agent architects.

The post is structured as "Theoretical Model → 12-Level Evolution Ladder → Quick Hands-On."

https://github.com/shareAI-lab/learn-claude-code

Content Outline

1. Opening: From "Black Box" to "White Box"

Pain point introduction: Today's Agent frameworks (like LangChain, CrewAI) are powerful but bloated.

Project motivation: Emulate the minimalism and efficiency of Claude Code, building from 0 to 1 in plain Python to take apart every core component of an intelligent agent.

Core philosophy: "One Loop to Rule Them All." No matter how complex an Agent is, at its core it's just a while loop that continuously processes tool_use.

2. Four Evolution Phases (Learning Path Visualization)

Each phase pairs its technical points with a one-line insight that captures its essence:

Phase 1: The Core Loop (The Loop)

  • Technical points: Handling stop_reason, building a dispatch map.
  • Insight: An Agent's vitality lies in its ability to repeatedly perceive and respond to its environment.

Phase 2: Cognitive Upgrade (Planning & Knowledge)

  • Technical points: Todo management, Context Compact.
  • Insight: Memory isn't infinite — a smart Agent knows how to "forget" and "plan."

Phase 3: Engineering Persistence (Persistence)

  • Technical points: File-system-driven task graphs, background daemons.
  • Insight: The ability to handle long-running tasks is the dividing line between a toy and a tool.

Phase 4: Team Operations (Teams)

  • Technical points: JSONL mailbox protocol, autonomous task claiming, isolated workspaces.
  • Insight: Multi-agent isn't simply parallelism — it's orderly collaboration and resource isolation.

3. Code Showcase: The Elegance of the Core Loop

The concise def agent_loop(query): from s01 belongs front and center. It is the soul of the entire project, demonstrating how a simple if response.stop_reason != "tool_use": return controls the entire agent's lifecycle.

4. Quick Start & Interactive Experience

Readers can get started quickly via git clone.

Highlight: the Web Platform (a visualization layer built with Next.js). For learners, being able to watch the Agent's thought process through step-by-step debugging and topology graphs is extremely appealing.


s01: The Agent Loop

[ s01 ] s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"One loop & Bash is all you need" — one tool + one loop = one agent.

Language models can reason about code, but they can't touch the real world — they can't read files, run tests, or see error messages. Without a loop, every time a tool is called you have to manually paste the result back in. You yourself become the loop.

Solution

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> |  Tool   |
| prompt |      |       |      | execute |
+--------+      +---+---+      +----+----+
                    ^                |
                    |   tool_result  |
                    +----------------+
                    (loop until stop_reason != "tool_use")        

A single exit condition controls the entire flow. The loop keeps running until the model stops calling tools.

How It Works

1. User prompt as the first message.

messages.append({"role": "user", "content": query})        

2. Send messages and tool definitions to the LLM.

response = client.messages.create(
    model=MODEL, system=SYSTEM, messages=messages,
    tools=TOOLS, max_tokens=8000,
)        

3. Append the assistant response. Check stop_reason — if the model didn't call a tool, we're done.

messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
    return        

4. Execute each tool call, collect results, and append as a user message. Go back to step 2.

results = []
for block in response.content:
    if block.type == "tool_use":
        output = run_bash(block.input["command"])
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": output,
        })
messages.append({"role": "user", "content": results})        

Assembled into a complete function:

def agent_loop(query):
    messages = [{"role": "user", "content": query}]
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = run_bash(block.input["command"])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})        

In under 30 lines, this is the entire agent. The next 11 chapters all stack mechanisms on top of this loop — the loop itself never changes.

Changes

| Component    | Before | After                      |
|--------------|--------|----------------------------|
| Agent loop   | (none) | while True + stop_reason   |
| Tools        | (none) | bash (single tool)         |
| Messages     | (none) | Accumulating message list  |
| Control flow | (none) | stop_reason != "tool_use"  |


s02: Tool Use

s01 > [ s02 ] s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"Add a tool, just add a handler" — no need to touch the loop; just register the new tool in the dispatch map.

With only bash, all operations go through the shell. cat truncates unpredictably, sed breaks on special characters, and every bash call is an unconstrained security surface. Dedicated tools (read_file, write_file) enable path sandboxing at the tool level.

Key insight: Adding a tool doesn't require changing the loop.

Solution

+--------+      +-------+      +--------------------+
|  User  | ---> |  LLM  | ---> | Tool Dispatch      |
| prompt |      |       |      | {                  |
+--------+      +---+---+      |   bash:  run_bash  |
                    ^          |   read:  run_read  |
                    |          |   write: run_write |
                    +----------+   edit:  run_edit  |
                    tool_result| }                  |
                               +--------------------+

The dispatch map is a dict: {tool_name: handler_function}. One lookup replaces any if/elif chain.

How It Works

1. Each tool has a handler function. Path sandboxing prevents escaping the workspace.

def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_read(path: str, limit: int = None) -> str:
    text = safe_path(path).read_text()
    lines = text.splitlines()
    if limit and limit < len(lines):
        lines = lines[:limit]
    return "\n".join(lines)[:50000]        

2. The dispatch map maps tool names to handler functions.

TOOL_HANDLERS = {
    "bash":       lambda **kw: run_bash(kw["command"]),
    "read_file":  lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":  lambda **kw: run_edit(kw["path"], kw["old_text"],
                                        kw["new_text"]),
}        

3. Look up the handler by name in the loop. The loop body itself is identical to s01.

for block in response.content:
    if block.type == "tool_use":
        handler = TOOL_HANDLERS.get(block.name)
        output = handler(**block.input) if handler \
            else f"Unknown tool: {block.name}"
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": output,
        })        

Adding a tool = adding a handler + adding a schema. The loop never changes.
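
For instance, registering a hypothetical list_dir tool touches only two places, the schema list and the dispatch map. A sketch, assuming TOOLS is the list of tool definitions passed to the API:

# Hypothetical example: a list_dir tool added without touching the loop.
def run_list_dir(path: str = ".") -> str:
    return "\n".join(sorted(p.name for p in safe_path(path).iterdir()))

TOOLS.append({
    "name": "list_dir",
    "description": "List files in a directory inside the workspace.",
    "input_schema": {
        "type": "object",
        "properties": {"path": {"type": "string"}},
        "required": [],
    },
})

TOOL_HANDLERS["list_dir"] = lambda **kw: run_list_dir(kw.get("path", "."))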

Changes from s01

| Component   | Before (s01)        | After (s02)                 |
|-------------|---------------------|-----------------------------|
| Tools       | 1 (bash only)       | 4 (bash, read, write, edit) |
| Dispatch    | Hardcoded bash call | TOOL_HANDLERS dict          |
| Path safety | None                | safe_path() sandbox         |
| Agent loop  | Unchanged           | Unchanged                   |


s03: TodoWrite

s01 > s02 > [ s03 ] s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"An agent without a plan just wanders" — list the steps first, then act, and completion rates double.

In multi-step tasks, the model loses track of progress — repeating finished work, skipping steps, going off track. The longer the conversation, the worse it gets: tool results keep filling the context, and the system prompt's influence gradually gets diluted. A 10-step refactoring might finish steps 1–3 and then start improvising, because steps 4–10 have been pushed out of attention.

Solution

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> | Tools   |
| prompt |      |       |      | + todo  |
+--------+      +---+---+      +----+----+
                    ^                |
                    |   tool_result  |
                    +----------------+
                          |
              +-----------+-----------+
              | TodoManager state     |
              | [ ] task A            |
              | [>] task B  <- doing  |
              | [x] task C            |
              +-----------------------+
                          |
              if rounds_since_todo >= 3:
                inject <reminder> into tool_result        

How It Works

1. TodoManager stores items with statuses. Only one in_progress is allowed at a time.

class TodoManager:
    def update(self, items: list) -> str:
        validated, in_progress_count = [], 0
        for item in items:
            status = item.get("status", "pending")
            if status == "in_progress":
                in_progress_count += 1
            validated.append({"id": item["id"], "text": item["text"],
                              "status": status})
        if in_progress_count > 1:
            raise ValueError("Only one task can be in_progress")
        self.items = validated
        return self.render()        
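
update() returns self.render(); the render method isn't shown in the excerpt above, but a minimal sketch that draws the checklist the way the diagram does could look like this:

    # Sketch only: render the list with the [ ] / [>] / [x] markers from the diagram.
    def render(self) -> str:
        marks = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}
        return "\n".join(
            f"{marks.get(i['status'], '[ ]')} {i['text']}" for i in self.items
        ) or "(todo list is empty)"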

2. The todo tool is added to the dispatch map just like any other tool.

TOOL_HANDLERS = {
    # ...base tools...
    "todo": lambda **kw: TODO.update(kw["items"]),
}        

3. Nag reminder: Inject a reminder when the model hasn't called todo for 3+ consecutive rounds.

if rounds_since_todo >= 3 and messages:
    last = messages[-1]
    if last["role"] == "user" and isinstance(last.get("content"), list):
        last["content"].insert(0, {
            "type": "text",
            "text": "<reminder>Update your todos.</reminder>",
        })        

"Only one in_progress at a time" enforces sequential focus. The nag reminder creates accountability pressure — if you don't update the plan, the system keeps asking.

Changes from s02

| Component     | Before (s02)    | After (s03)                        |
|---------------|-----------------|------------------------------------|
| Tools         | 4               | 5 (+todo)                          |
| Planning      | None            | Stateful TodoManager               |
| Nag injection | None            | <reminder> injected after 3 rounds |
| Agent loop    | Simple dispatch | + rounds_since_todo counter        |


s04: Subagents

s01 > s02 > s03 > [ s04 ] s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"Break big tasks into smaller ones, each with a clean context" — Subagents use independent messages[], keeping the parent conversation clean.

The longer an agent works, the fatter the messages array gets. Every file read and command output permanently stays in the context. "What test framework does this project use?" might require reading 5 files, but the parent agent only needs one word: "pytest."

Solution

Parent agent                     Subagent
+------------------+             +------------------+
| messages=[...]   |             | messages=[]      | <-- fresh
|                  |  dispatch   |                  |
| tool: task       | ----------> | while tool_use:  |
|   prompt="..."   |             |   call tools     |
|                  |  summary    |   append results |
|   result = "..." | <---------- | return last text |
+------------------+             +------------------+

Parent context stays clean. Subagent context is discarded.        

How It Works

1. The parent agent has a task tool. The subagent has all base tools except task (no recursive spawning).

PARENT_TOOLS = CHILD_TOOLS + [
    {"name": "task",
     "description": "Spawn a subagent with fresh context.",
     "input_schema": {
         "type": "object",
         "properties": {"prompt": {"type": "string"}},
         "required": ["prompt"],
     }},
]        

2. The subagent starts with messages=[] and runs its own loop. Only the final text is returned to the parent.

def run_subagent(prompt: str) -> str:
    sub_messages = [{"role": "user", "content": prompt}]
    for _ in range(30):  # safety limit
        response = client.messages.create(
            model=MODEL, system=SUBAGENT_SYSTEM,
            messages=sub_messages,
            tools=CHILD_TOOLS, max_tokens=8000,
        )
        sub_messages.append({"role": "assistant",
                             "content": response.content})
        if response.stop_reason != "tool_use":
            break
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input)
                results.append({"type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output)[:50000]})
        sub_messages.append({"role": "user", "content": results})
    return "".join(
        b.text for b in response.content if hasattr(b, "text")
    ) or "(no summary)"        

The subagent might have run 30+ tool calls, but the entire message history is discarded. All the parent receives is a summary text, returned as an ordinary tool_result.
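
On the parent side, task is dispatched like any other tool; assuming the s02-style dispatch map is still in play, the wiring is a single entry:

# The subagent's summary comes back to the parent as an ordinary tool_result.
TOOL_HANDLERS["task"] = lambda **kw: run_subagent(kw["prompt"])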

Changes from s03

| Component    | Before (s03)  | After (s04)                   |
|--------------|---------------|-------------------------------|
| Tools        | 5             | 5 (base) + task (parent only) |
| Context      | Single shared | Parent + child isolated       |
| Subagent     | None          | run_subagent() function       |
| Return value | N/A           | Summary text only             |


s05: Skills (Skill Loading)

s01 > s02 > s03 > s04 > [ s05 ] s06 | s07 > s08 > s09 > s10 > s11 > s12

"Load knowledge on demand, only when needed" — injected via tool_result, not stuffed into the system prompt.

You want the agent to follow specific domain workflows: git conventions, testing patterns, code review checklists. Cramming everything into the system prompt is wasteful — 10 skills at 2,000 tokens each means 20,000 tokens, most of which are irrelevant to the current task.

Solution

System prompt (Layer 1 -- always present):
+--------------------------------------+
| You are a coding agent.              |
| Skills available:                    |
|   - git: Git workflow helpers        |  ~100 tokens/skill
|   - test: Testing best practices     |
+--------------------------------------+

When model calls load_skill("git"):
+--------------------------------------+
| tool_result (Layer 2 -- on demand):  |
| <skill name="git">                   |
|   Full git workflow instructions...  |  ~2000 tokens
|   Step 1: ...                        |
| </skill>                             |
+--------------------------------------+        

Layer 1: Skill names in the system prompt (low cost). Layer 2: Full content in tool_result on demand.

How It Works

Each skill is a directory containing a SKILL.md file with YAML frontmatter.

skills/
  pdf/
    SKILL.md       # ---\n name: pdf\n description: Process PDF files\n ---\n ...
  code-review/
    SKILL.md       # ---\n name: code-review\n description: Review code\n ---\n ...        

SkillLoader recursively scans SKILL.md files, using the directory name as the skill identifier.

class SkillLoader:
    def __init__(self, skills_dir: Path):
        self.skills = {}
        for f in sorted(skills_dir.rglob("SKILL.md")):
            text = f.read_text()
            meta, body = self._parse_frontmatter(text)
            name = meta.get("name", f.parent.name)
            self.skills[name] = {"meta": meta, "body": body}

    def get_descriptions(self) -> str:
        lines = []
        for name, skill in self.skills.items():
            desc = skill["meta"].get("description", "")
            lines.append(f"  - {name}: {desc}")
        return "\n".join(lines)

    def get_content(self, name: str) -> str:
        skill = self.skills.get(name)
        if not skill:
            return f"Error: Unknown skill '{name}'."
        return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"        

Layer 1 is written into the system prompt. Layer 2 is just another tool in the dispatch map.

SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""

TOOL_HANDLERS = {
    # ...base tools...
    "load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}        

The model knows what skills are available (cheap), and loads full content only when needed (expensive).
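
_parse_frontmatter() isn't shown above; a minimal sketch that splits the --- header from the body, assuming simple key: value lines and no external YAML dependency:

    # Sketch only: parse "---\nkey: value\n---\nbody" without a YAML library.
    def _parse_frontmatter(self, text: str) -> tuple[dict, str]:
        if not text.startswith("---"):
            return {}, text
        try:
            _, header, body = text.split("---", 2)
        except ValueError:
            return {}, text
        meta = {}
        for line in header.strip().splitlines():
            if ":" in line:
                key, _, value = line.partition(":")
                meta[key.strip()] = value.strip()
        return meta, body.strip()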

Changes from s04

| Component        | Before (s04)    | After (s05)                         |
|------------------|-----------------|-------------------------------------|
| Tools            | 5 (base + task) | 5 (base + load_skill)               |
| System prompt    | Static string   | + skill description list            |
| Knowledge base   | None            | skills/*/SKILL.md files             |
| Injection method | None            | Two layers (system prompt + result) |


s06: Context Compact

s01 > s02 > s03 > s04 > s05 > [ s06 ] | s07 > s08 > s09 > s10 > s11 > s12

"Context always fills up — you need a way to make room" — a three-layer compression strategy in exchange for unlimited sessions.

The context window is finite. Reading a 1,000-line file consumes ~4,000 tokens; read 30 files and run 20 commands, and you easily blow past 100k tokens. Without compression, the agent simply can't work on large projects.

Solution

Three layers of compression, with increasing aggressiveness:

Every turn:
+------------------+
| Tool call result |
+------------------+
        |
        v
[Layer 1: micro_compact]        (silent, every turn)
  Replace tool_result > 3 turns old
  with "[Previous: used {tool_name}]"
        |
        v
[Check: tokens > 50000?]
   |               |
   no              yes
   |               |
   v               v
continue    [Layer 2: auto_compact]
              Save transcript to .transcripts/
              LLM summarizes conversation.
              Replace all messages with [summary].
                    |
                    v
            [Layer 3: compact tool]
              Model calls compact explicitly.
              Same summarization as auto_compact.        

How It Works

Layer 1 — micro_compact: Before each LLM call, replace old tool results with placeholders.

def micro_compact(messages: list) -> list:
    # Map tool_use_id -> tool name so the placeholder can say which tool ran.
    tool_names = {}
    for msg in messages:
        if msg["role"] == "assistant" and isinstance(msg.get("content"), list):
            for part in msg["content"]:
                if getattr(part, "type", None) == "tool_use":
                    tool_names[part.id] = part.name
    tool_results = []
    for i, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for j, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((i, j, part))
    if len(tool_results) <= KEEP_RECENT:
        return messages
    for _, _, part in tool_results[:-KEEP_RECENT]:
        if len(str(part.get("content", ""))) > 100:
            name = tool_names.get(part.get("tool_use_id"), "a tool")
            part["content"] = f"[Previous: used {name}]"
    return messages

Layer 2 — auto_compact: When tokens exceed the threshold, save the full conversation to disk and have the LLM summarize.

def auto_compact(messages: list) -> list:
    # Save transcript for recovery
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w") as f:
        for msg in messages:
            f.write(json.dumps(msg, default=str) + "\n")
    # LLM summarizes
    response = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content":
            "Summarize this conversation for continuity..."
            + json.dumps(messages, default=str)[:80000]}],
        max_tokens=2000,
    )
    return [
        {"role": "user", "content": f"[Compressed]\n\n{response.content[0].text}"},
        {"role": "assistant", "content": "Understood. Continuing."},
    ]        

Layer 3 — manual compact: The compact tool triggers the same summarization mechanism on demand.

Loop integration of all three layers:

def agent_loop(messages: list):
    while True:
        micro_compact(messages)                        # Layer 1
        if estimate_tokens(messages) > THRESHOLD:
            messages[:] = auto_compact(messages)       # Layer 2
        response = client.messages.create(...)
        # ... tool execution; calling the compact tool sets manual_compact = True ...
        if manual_compact:
            messages[:] = auto_compact(messages)       # Layer 3
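
estimate_tokens() only needs to be a rough trigger; a common sketch is the roughly-four-characters-per-token heuristic (an approximation, not an exact count):

def estimate_tokens(messages: list) -> int:
    # Heuristic: ~4 characters per token is accurate enough for a compaction trigger.
    return len(json.dumps(messages, default=str)) // 4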

The full history is saved to disk via transcripts. Information isn't truly lost — it's just moved out of the active context.

Changes from s05

| Component          | Before (s05) | After (s06)                |
|--------------------|--------------|----------------------------|
| Tools              | 5            | 5 (base + compact)         |
| Context management | None         | Three-layer compression    |
| Micro-compact      | None         | Old results → placeholders |
| Auto-compact       | None         | Token threshold trigger    |
| Transcripts        | None         | Saved to .transcripts/     |


s07: Task System

s01 > s02 > s03 > s04 > s05 > s06 | [ s07 ] s08 > s09 > s10 > s11 > s12

"Big goals need to be broken into small tasks, ordered, and saved to disk" — a file-persisted task graph, laying the foundation for multi-agent collaboration.

s03's TodoManager was just a flat list in memory: no ordering, no dependencies, just done-or-not-done status. Real goals have structure — task B depends on task A, tasks C and D can run in parallel, task E must wait for both C and D to complete.

Without explicit relationships, the agent can't tell what's actionable, what's blocked, and what can run concurrently. And the list only lives in memory — once context compression (s06) runs, it's gone.

Solution

Upgrade the flat list to a disk-persisted task graph. Each task is a JSON file with status, predecessors (blockedBy), and successors (blocks). The task graph always answers three questions:

  • What's actionable? — Tasks with pending status and empty blockedBy.
  • What's blocked? — Tasks waiting for predecessors to complete.
  • What's done? — Tasks with completed status, which automatically unblock successors upon completion.

.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

Task Graph (DAG):
                 +----------+
            +--> | task 2   | --+
            |    | pending  |   |
+----------+     +----------+    +--> +----------+
| task 1   |                          | task 4   |
| completed| --> +----------+    +--> | blocked  |
+----------+     | task 3   | --+     +----------+
                 | pending  |
                 +----------+

Ordering:  task 1 must complete before 2 and 3 can start
Parallel:  tasks 2 and 3 can execute simultaneously
Dependency: task 4 waits for both 2 and 3 to complete
States:    pending -> in_progress -> completed        

This task graph is the coordination backbone for all mechanisms from s07 onward: background execution (s08), multi-agent teams (s09+), and worktree isolation (s12) all read from and write to this same structure.

How It Works

TaskManager: One JSON file per task, CRUD + dependency graph.

class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1

    def create(self, subject, description=""):
        task = {"id": self._next_id, "subject": subject,
                "status": "pending", "blockedBy": [],
                "blocks": [], "owner": ""}
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)        

Dependency clearing: When a task completes, automatically remove its ID from other tasks' blockedBy, unblocking successors.

def _clear_dependency(self, completed_id):
    for f in self.dir.glob("task_*.json"):
        task = json.loads(f.read_text())
        if completed_id in task.get("blockedBy", []):
            task["blockedBy"].remove(completed_id)
            self._save(task)        

State transitions + dependency linking: update handles state changes and dependency edges.

def update(self, task_id, status=None,
           add_blocked_by=None, add_blocks=None):
    task = self._load(task_id)
    if status:
        task["status"] = status
        if status == "completed":
            self._clear_dependency(task_id)
    if add_blocked_by is not None:        # dependency edge: this task waits on another
        task["blockedBy"].append(add_blocked_by)
    if add_blocks is not None:            # reverse edge: this task blocks another
        task["blocks"].append(add_blocks)
    self._save(task)
    return json.dumps(task, indent=2)

Four task tools added to the dispatch map.

TOOL_HANDLERS = {
    # ...base tools...
    "task_create": lambda **kw: TASKS.create(kw["subject"]),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
    "task_list":   lambda **kw: TASKS.list_all(),
    "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
}        
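
list_all() and get() aren't shown above; a minimal sketch of the read side, rendering one line per task so the model can see status and blockers at a glance:

    # Sketch only: the read-side helpers referenced by task_list / task_get.
    def get(self, task_id) -> str:
        return json.dumps(self._load(task_id), indent=2)

    def list_all(self) -> str:
        lines = []
        for f in sorted(self.dir.glob("task_*.json")):
            t = json.loads(f.read_text())
            blocked = f" blockedBy={t['blockedBy']}" if t.get("blockedBy") else ""
            lines.append(f"#{t['id']} [{t['status']}] {t['subject']}{blocked}")
        return "\n".join(lines) or "(no tasks)"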

Starting from s07, the task graph is the default choice for multi-step work. s03's Todo can still be used for quick checklists within a single session.

Changes from s06

| Component      | Before (s06)            | After (s07)                         |
|----------------|-------------------------|-------------------------------------|
| Tools          | 5                       | 8 (task_create/update/list/get)     |
| Planning model | Flat list (memory only) | Task graph with dependencies (disk) |
| Relationships  | None                    | blockedBy + blocks edges            |
| State tracking | Done or not done        | pending → in_progress → completed   |
| Persistence    | Lost after compression  | Survives compression and restarts   |


s08: Background Tasks

s01 > s02 > s03 > s04 > s05 > s06 | s07 > [ s08 ] s09 > s10 > s11 > s12

"Throw slow operations to the background, let the agent keep thinking" — background threads run commands and inject notifications when done.

Some commands take several minutes: npm install, pytest, docker build. In a blocking loop, the model can only sit and wait. The user says "install dependencies and also create a config file," but the agent can only do them one at a time.

Solution

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call]  <---+--------+ enqueue(result) |
|   ^ drain queue |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+        

How It Works

BackgroundManager tracks tasks with a thread-safe notification queue.

class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()        

run() starts a daemon thread and returns immediately.

def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {"status": "running", "command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True)
    thread.start()
    return f"Background task {task_id} started"        

When the subprocess completes, results enter the notification queue.

def _execute(self, task_id, command):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id, "result": output[:500]})        

Drain the notification queue before each LLM call.

def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({"role": "user",
                "content": f"<background-results>\n{notif_text}\n"
                           f"</background-results>"})
            messages.append({"role": "assistant",
                "content": "Noted background results."})
        response = client.messages.create(...)        

The loop stays single-threaded. Only subprocess I/O is parallelized.

Changes from s07

| Component              | Before (s07)  | After (s08)                       |
|------------------------|---------------|-----------------------------------|
| Tools                  | 8             | 6 (base + background_run + check) |
| Execution mode         | Blocking only | Blocking + background threads     |
| Notification mechanism | None          | Queue drained each turn           |
| Concurrency            | None          | Daemon threads                    |


s09: Agent Teams

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > [ s09 ] s10 > s11 > s12

"Tasks too big for one agent — delegate to teammates" — persistent teammates + JSONL mailboxes.

Subagents (s04) are ephemeral: spawn, do work, return summary, die. No identity, no memory across invocations. Background tasks (s08) can run shell commands but can't make LLM-guided decisions.

Real team collaboration requires three things: (1) persistent agents that survive across multiple conversation turns, (2) identity and lifecycle management, (3) communication channels between agents.

Solution

Teammate lifecycle:
  spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN

Communication:
  .team/
    config.json           <- team roster + statuses
    inbox/
      alice.jsonl         <- append-only, drain-on-read
      bob.jsonl
      lead.jsonl

              +--------+    send("alice","bob","...")    +--------+
              | alice  | -----------------------------> |  bob   |
              | loop   |    bob.jsonl << {json_line}    |  loop  |
              +--------+                                +--------+
                   ^                                         |
                   |        BUS.read_inbox("alice")          |
                   +---- alice.jsonl -> read + drain ---------+        

How It Works

TeammateManager maintains a team roster via config.json.

class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}        

spawn() creates a teammate and starts an agent loop in a thread.

def spawn(self, name: str, role: str, prompt: str) -> str:
    member = {"name": name, "role": role, "status": "working"}
    self.config["members"].append(member)
    self._save_config()
    thread = threading.Thread(
        target=self._teammate_loop,
        args=(name, role, prompt), daemon=True)
    thread.start()
    return f"Spawned teammate '{name}' (role: {role})"        

MessageBus: append-only JSONL inboxes. send() appends a line; read_inbox() reads all and clears.

class MessageBus:
    def send(self, sender, to, content, msg_type="message", extra=None):
        msg = {"type": msg_type, "from": sender,
               "content": content, "timestamp": time.time()}
        if extra:
            msg.update(extra)
        with open(self.dir / f"{to}.jsonl", "a") as f:
            f.write(json.dumps(msg) + "\n")

    def read_inbox(self, name):
        path = self.dir / f"{name}.jsonl"
        if not path.exists(): return "[]"
        msgs = [json.loads(l) for l in path.read_text().strip().splitlines() if l]
        path.write_text("")  # drain
        return json.dumps(msgs, indent=2)        

Each teammate checks their inbox before every LLM call, injecting messages into context.

def _teammate_loop(self, name, role, prompt):
    messages = [{"role": "user", "content": prompt}]
    for _ in range(50):
        inbox = BUS.read_inbox(name)
        if inbox != "[]":
            messages.append({"role": "user",
                "content": f"<inbox>{inbox}</inbox>"})
            messages.append({"role": "assistant",
                "content": "Noted inbox messages."})
        response = client.messages.create(...)
        if response.stop_reason != "tool_use":
            break
        # execute tools, append results...
    self._find_member(name)["status"] = "idle"        
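
Wiring the team tools into the lead's dispatch map follows the same pattern as every previous chapter; a sketch, with instance and parameter names assumed rather than taken from the project:

# Sketch: assumes TEAM = TeammateManager(...) and BUS = MessageBus(...).
TOOL_HANDLERS = {
    # ...base tools...
    "spawn":      lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "send":       lambda **kw: BUS.send("lead", kw["to"], kw["content"]),
    "read_inbox": lambda **kw: BUS.read_inbox("lead"),
}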

Changes from s08

| Component     | Before (s08)        | After (s09)                 |
|---------------|---------------------|-----------------------------|
| Tools         | 6                   | 9 (+spawn/send/read_inbox)  |
| Agent count   | Single              | Lead + N teammates          |
| Persistence   | None                | config.json + JSONL inboxes |
| Threads       | Background commands | Full agent loop per thread  |
| Lifecycle     | Ephemeral           | idle → working → idle       |
| Communication | None                | message + broadcast         |


s10: Team Protocols

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > [ s10 ] s11 > s12

"Teammates need unified communication rules" — a single request-response pattern drives all negotiations.

In s09 teammates can work and communicate, but lack structured coordination:

Shutdown: Killing a thread directly leaves half-written files and stale config.json. A handshake is needed — the lead requests, the teammate approves (finishes up and exits) or rejects (keeps working).

Plan approval: The lead says "refactor the auth module," and the teammate starts immediately. High-risk changes should go through review first.

Both have the same structure: one party sends a request with a unique ID, the other responds referencing the same ID.

Solution

Shutdown Protocol            Plan Approval Protocol
==================           ======================

Lead             Teammate    Teammate           Lead
  |                 |           |                 |
  |--shutdown_req-->|           |--plan_req------>|
  | {req_id:"abc"}  |           | {req_id:"xyz"}  |
  |                 |           |                 |
  |<--shutdown_resp-|           |<--plan_resp-----|
  | {req_id:"abc",  |           | {req_id:"xyz",  |
  |  approve:true}  |           |  approve:true}  |

Shared FSM:
  [pending] --approve--> [approved]
  [pending] --reject---> [rejected]

Trackers:
  shutdown_requests = {req_id: {target, status}}
  plan_requests     = {req_id: {from, plan, status}}        

How It Works

The lead generates a request_id and sends a shutdown request via inbox.

shutdown_requests = {}

def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send("lead", teammate, "Please shut down gracefully.",
             "shutdown_request", {"request_id": req_id})
    return f"Shutdown request {req_id} sent (status: pending)"        

The teammate receives the request and responds with approve/reject.

if tool_name == "shutdown_response":
    req_id = args["request_id"]
    approve = args["approve"]
    shutdown_requests[req_id]["status"] = "approved" if approve else "rejected"
    BUS.send(sender, "lead", args.get("reason", ""),
             "shutdown_response",
             {"request_id": req_id, "approve": approve})        

Plan approval follows the exact same pattern. The teammate submits a plan (generates request_id), the lead reviews (references the same request_id).

plan_requests = {}

def handle_plan_review(request_id, approve, feedback=""):
    req = plan_requests[request_id]
    req["status"] = "approved" if approve else "rejected"
    BUS.send("lead", req["from"], feedback,
             "plan_approval_response",
             {"request_id": request_id, "approve": approve})        

One FSM, two uses. The same pending → approved | rejected state machine can be applied to any request-response protocol.
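
Because both trackers move through the same pending → approved | rejected machine, they can be backed by one tiny helper; a sketch (the class name is mine, not from the project):

import uuid

class RequestTracker:
    """Shared FSM: every request starts pending, then ends approved or rejected."""
    def __init__(self):
        self.requests = {}

    def open(self, **fields) -> str:
        req_id = str(uuid.uuid4())[:8]
        self.requests[req_id] = {"status": "pending", **fields}
        return req_id

    def resolve(self, req_id: str, approve: bool) -> dict:
        req = self.requests[req_id]
        req["status"] = "approved" if approve else "rejected"
        return req

# The same shape serves both protocols:
#   shutdown: open(target="alice")           -> resolve(req_id, approve=True)
#   plan:     open(sender="bob", plan="...") -> resolve(req_id, approve=False)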

Changes from s09

| Component   | Before (s09)      | After (s10)                   |
|-------------|-------------------|-------------------------------|
| Tools       | 9                 | 12 (+shutdown_req/resp +plan) |
| Shutdown    | Natural exit only | Request-response handshake    |
| Plan gating | None              | Submit/review and approval    |
| Correlation | None              | One request_id per request    |
| FSM         | None              | pending → approved/rejected   |


s11: Autonomous Agents

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > [ s11 ] s12

"Teammates check the board themselves and claim available work" — no need for the lead to assign one by one; self-organizing.

In s09–s10, teammates only act when explicitly assigned. The lead has to write a prompt for each teammate, and 10 unclaimed tasks on the board must be manually distributed. This doesn't scale.

True autonomy: teammates scan the task board themselves, claim unassigned tasks, finish them, and look for the next one.

One detail: after context compression (s06), an agent might forget who it is. Identity re-injection solves this.

Solution

Teammate lifecycle with idle cycle:

+-------+
| spawn |
+---+---+
    |
    v
+-------+   tool_use     +-------+
| WORK  | <------------- |  LLM  |
+---+---+                +-------+
    |
    | stop_reason != tool_use (or idle tool called)
    v
+--------+
|  IDLE  |  poll every 5s for up to 60s
+---+----+
    |
    +---> check inbox --> message? ----------> WORK
    |
    +---> scan .tasks/ --> unclaimed? -------> claim -> WORK
    |
    +---> 60s timeout ----------------------> SHUTDOWN

Identity re-injection after compression:
if len(messages) <= 3:
    messages.insert(0, identity_block)        

How It Works

The teammate loop has two phases: WORK and IDLE. When the LLM stops calling tools (or calls idle), it enters IDLE.

def _loop(self, name, role, prompt):
    while True:
        # -- WORK PHASE --
        messages = [{"role": "user", "content": prompt}]
        for _ in range(50):
            response = client.messages.create(...)
            if response.stop_reason != "tool_use":
                break
            # execute tools; calling the idle tool sets idle_requested = True
            if idle_requested:
                break

        # -- IDLE PHASE --
        self._set_status(name, "idle")
        resume = self._idle_poll(name, messages)
        if not resume:
            self._set_status(name, "shutdown")
            return
        self._set_status(name, "working")        

The idle phase polls the inbox and task board in a loop.

def _idle_poll(self, name, messages):
    for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):  # 60s / 5s = 12
        time.sleep(POLL_INTERVAL)
        inbox = BUS.read_inbox(name)
        if inbox != "[]":  # read_inbox returns "[]" when the inbox is empty
            messages.append({"role": "user",
                "content": f"<inbox>{inbox}</inbox>"})
            return True
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            claim_task(unclaimed[0]["id"], name)
            messages.append({"role": "user",
                "content": f"<auto-claimed>Task #{unclaimed[0]['id']}: "
                           f"{unclaimed[0]['subject']}</auto-claimed>"})
            return True
    return False  # timeout -> shutdown        

Task board scanning: Find tasks that are pending, have no owner, and aren't blocked.

def scan_unclaimed_tasks() -> list:
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed        
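
claim_task() isn't shown above; a minimal sketch that writes the claimer into the task file (first write wins; a real implementation would want file locking):

def claim_task(task_id: int, owner: str) -> None:
    # Mark the task as owned and in progress so other idle teammates skip it.
    path = TASKS_DIR / f"task_{task_id}.json"
    task = json.loads(path.read_text())
    task["owner"] = owner
    task["status"] = "in_progress"
    path.write_text(json.dumps(task, indent=2))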

Identity re-injection: When context is too short (indicating compression occurred), insert an identity block at the beginning.

if len(messages) <= 3:
    messages.insert(0, {"role": "user",
        "content": f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>"})
    messages.insert(1, {"role": "assistant",
        "content": f"I am {name}. Continuing."})        

Changes from s10

| Component     | Before (s10)  | After (s11)                      |
|---------------|---------------|----------------------------------|
| Tools         | 12            | 14 (+idle, +claim_task)          |
| Autonomy      | Lead-assigned | Self-organizing                  |
| Idle phase    | None          | Poll inbox + task board          |
| Task claiming | Manual only   | Auto-claim unassigned tasks      |
| Identity      | System prompt | + re-injection after compression |
| Timeout       | None          | 60s idle → auto-shutdown         |


s12: Worktree + Task Isolation

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > [ s12 ]

"Each works in their own directory, no interference" — tasks manage the goal, worktrees manage the directory, bound by ID.

By s11, agents can autonomously claim and complete tasks. But all tasks share a single directory. Two agents simultaneously refactoring different modules — A edits config.py, B also edits config.py — uncommitted changes pollute each other, and neither can cleanly roll back. The task board manages "what to do" but not "where to do it." The solution: give each task an independent git worktree directory, linking the two sides by task ID.

Solution

Control plane (.tasks/)               Execution plane (.worktrees/)
+----------------------------+        +---------------------------+
| task_1.json                |        | auth-refactor/            |
|   status: in_progress      | <----> |   branch: wt/auth-refactor|
|   worktree: "auth-refactor"|        |   task_id: 1              |
+----------------------------+        +---------------------------+
| task_2.json                |        | ui-login/                 |
|   status: pending          | <----> |   branch: wt/ui-login     |
|   worktree: "ui-login"     |        |   task_id: 2              |
+----------------------------+        +---------------------------+
                                      index.json   (worktree registry)
                                      events.jsonl (lifecycle log)

State machines:
  Task:     pending -> in_progress -> completed
  Worktree: absent -> active -> removed | kept

How It Works

Create a task. Persist the goal first.

TASKS.create("Implement auth refactor")
# -> .tasks/task_1.json  status=pending  worktree=""        

Create a worktree and bind it to a task. Passing task_id automatically advances the task to in_progress.

WORKTREES.create("auth-refactor", task_id=1)
# -> git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
# -> index.json gets new entry, task_1.json gets worktree="auth-refactor"        

Binding writes state to both sides:

def bind_worktree(self, task_id, worktree):
    task = self._load(task_id)
    task["worktree"] = worktree
    if task["status"] == "pending":
        task["status"] = "in_progress"
    self._save(task)        

Execute commands in the worktree. cwd points to the isolated directory.

subprocess.run(command, shell=True, cwd=worktree_path,
               capture_output=True, text=True, timeout=300)        

Teardown. Two options:

  • worktree_keep(name) — keep the directory for later use.
  • worktree_remove(name, complete_task=True) — delete the directory, complete the bound task, and emit an event. One call handles both teardown and completion.

def remove(self, name, force=False, complete_task=False):
    wt = self.index[name]  # registry entry (assumed shape: path, branch, task_id)
    self._run_git(["worktree", "remove", wt["path"]])
    if complete_task and wt.get("task_id") is not None:
        self.tasks.update(wt["task_id"], status="completed")
        self.tasks.unbind_worktree(wt["task_id"])
        self.events.emit("task.completed", ...)

Event stream. Each lifecycle step writes to .worktrees/events.jsonl:

{
  "event": "worktree.remove.after",
  "task": {"id": 1, "status": "completed"},
  "worktree": {"name": "auth-refactor", "status": "removed"},
  "ts": 1730000000
}        

Event types: worktree.create.before/after/failed, worktree.remove.before/after/failed, worktree.keep, task.completed.

After a crash, the scene is reconstructed from .tasks/ + .worktrees/index.json. Session memory is volatile; disk state is persistent.
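
A recovery sketch under those assumptions (the function name and report format are mine; index.json is assumed to be keyed by worktree name):

import json
from pathlib import Path

def recover_state(tasks_dir: Path, worktrees_dir: Path) -> list[str]:
    # Rebuild the task <-> worktree picture purely from disk after a crash.
    index = json.loads((worktrees_dir / "index.json").read_text())
    report = []
    for f in sorted(tasks_dir.glob("task_*.json")):
        task = json.loads(f.read_text())
        wt = task.get("worktree", "")
        if wt and wt not in index:
            report.append(f"task {task['id']}: bound worktree '{wt}' is missing")
        else:
            suffix = f" (worktree: {wt})" if wt else ""
            report.append(f"task {task['id']}: {task['status']}{suffix}")
    return report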

Changes from s11

| Component            | Before (s11)              | After (s12)                                   |
|----------------------|---------------------------|-----------------------------------------------|
| Coordination         | Task board (owner/status) | Task board + explicit worktree binding        |
| Execution scope      | Shared directory          | Independent directory per task                |
| Recoverability       | Task state only           | Task state + worktree index                   |
| Teardown             | Task completion           | Task completion + explicit keep/remove        |
| Lifecycle visibility | Implicit logs             | .worktrees/events.jsonl explicit event stream |
