Chapter 4 System Design

Streaming & Concurrent Tool Execution

How multiple tools run in parallel while results stay in order

src/services/tools/StreamingToolExecutor.ts · Lines 1–120
1
import type { ToolUseBlock } from '@anthropic-ai/sdk/resources/index.mjs'
2
import {
3
  createUserMessage,
4
  REJECT_MESSAGE,
5
  withMemoryCorrectionHint,
6
} from 'src/utils/messages.js'
7
import type { CanUseToolFn } from '../../hooks/useCanUseTool.js'
8
import { findToolByName, type Tools, type ToolUseContext } from '../../Tool.js'
9
import { BASH_TOOL_NAME } from '../../tools/BashTool/toolName.js'
10
import type { AssistantMessage, Message } from '../../types/message.js'
11
import { createChildAbortController } from '../../utils/abortController.js'
12
import { runToolUse } from './toolExecution.js'
13
 
14
type MessageUpdate = {
15
  message?: Message
16
  newContext?: ToolUseContext
17
}
18
 
19
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
20
 
21
type TrackedTool = {
22
  id: string
23
  block: ToolUseBlock
24
  assistantMessage: AssistantMessage
25
  status: ToolStatus
26
  isConcurrencySafe: boolean
27
  promise?: Promise<void>
28
  results?: Message[]
29
  // Progress messages are stored separately and yielded immediately
30
  pendingProgress: Message[]
31
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
32
}
33
 
34
/**
35
 * Executes tools as they stream in with concurrency control.
36
 * - Concurrent-safe tools can execute in parallel with other concurrent-safe tools
37
 * - Non-concurrent tools must execute alone (exclusive access)
38
 * - Results are buffered and emitted in the order tools were received
39
 */
40
export class StreamingToolExecutor {
41
  private tools: TrackedTool[] = []
42
  private toolUseContext: ToolUseContext
43
  private hasErrored = false
44
  private erroredToolDescription = ''
45
  // Child of toolUseContext.abortController. Fires when a Bash tool errors
46
  // so sibling subprocesses die immediately instead of running to completion.
47
  // Aborting this does NOT abort the parent — query.ts won't end the turn.
48
  private siblingAbortController: AbortController
49
  private discarded = false
50
  // Signal to wake up getRemainingResults when progress is available
51
  private progressAvailableResolve?: () => void
52
 
53
  constructor(
54
    private readonly toolDefinitions: Tools,
55
    private readonly canUseTool: CanUseToolFn,
56
    toolUseContext: ToolUseContext,
57
  ) {
58
    this.toolUseContext = toolUseContext
59
    this.siblingAbortController = createChildAbortController(
60
      toolUseContext.abortController,
61
    )
62
  }
63
 
64
  /**
65
   * Discards all pending and in-progress tools. Called when streaming fallback
66
   * occurs and results from the failed attempt should be abandoned.
67
   * Queued tools won't start, and in-progress tools will receive synthetic errors.
68
   */
69
  discard(): void {
70
    this.discarded = true
71
  }
72
 
73
  /**
74
   * Add a tool to the execution queue. Will start executing immediately if conditions allow.
75
   */
76
  addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
77
    const toolDefinition = findToolByName(this.toolDefinitions, block.name)
78
    if (!toolDefinition) {
79
      this.tools.push({
80
        id: block.id,
81
        block,
82
        assistantMessage,
83
        status: 'completed',
84
        isConcurrencySafe: true,
85
        pendingProgress: [],
86
        results: [
87
          createUserMessage({
88
            content: [
89
              {
90
                type: 'tool_result',
91
                content: `<tool_use_error>Error: No such tool available: ${block.name}</tool_use_error>`,
92
                is_error: true,
93
                tool_use_id: block.id,
94
              },
95
            ],
96
            toolUseResult: `Error: No such tool available: ${block.name}`,
97
            sourceToolAssistantUUID: assistantMessage.uuid,
98
          }),
99
        ],
100
      })
101
      return
102
    }
103
 
104
    const parsedInput = toolDefinition.inputSchema.safeParse(block.input)
105
    const isConcurrencySafe = parsedInput?.success
106
      ? (() => {
107
          try {
108
            return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data))
109
          } catch {
110
            return false
111
          }
112
        })()
113
      : false
114
    this.tools.push({
115
      id: block.id,
116
      block,
117
      assistantMessage,
118
      status: 'queued',
119
      isConcurrencySafe,
120
      pendingProgress: [],
Annotations (click the dots)

StreamingToolExecutor solves a hard problem: Claude can request several tool calls in a single response. Some are safe to run in parallel (reading two files at once); others are not (two file writes that could interfere with each other). This class decides which tools may run at the same time and which must run alone.

🔑Key Insight

The state machine is key: every tool moves through `queued → executing → completed → yielded`. Each tool's results are held in a buffer until every tool received before it has completed, so the conversation log keeps the order in which the tool calls were received from Claude, regardless of which tool finishes first.

When a Bash tool errors, the siblingAbortController is aborted. It is a child of the parent abort controller, and aborting a child does not abort its parent: sibling subprocesses are killed immediately instead of running to completion, but the overall query turn continues — Claude still gets the error result.

⚠️Warning

Progress messages (like "Running bash...") are yielded immediately to the UI, bypassing the result buffer. The result itself waits for ordering; the progress notification does not.

KEY TAKEAWAYS
  • Concurrent-safe tools (like multiple file reads) run in true parallel
  • Non-concurrent tools (like file writes) get exclusive access — no sibling tools run
  • Results are buffered and yielded in receive order, not completion order
  • Progress messages stream immediately without waiting for the result buffer
  • A sibling AbortController kills related subprocesses when one Bash tool errors
AI Assistant

Ask anything about Streaming & Concurrent Tool Execution

Powered by Groq · Enter to send, Shift+Enter for newline