Concurrency Concurrent Pool + State Machine See in Code Tour

Streaming & Concurrent Execution

Managing parallel tool execution with result ordering guarantees while streaming progress to the UI in real time.

Streaming & Concurrent Execution — Architecture Diagram
stateDiagram-v2
    [*] --> queued: addTool()
    queued --> executing: concurrency OK
    executing --> completed: result ready
    completed --> yielded: predecessors done
    yielded --> [*]

Mermaid diagram definition

Deep Dive

When Claude calls multiple tools in one response, StreamingToolExecutor manages parallel execution while guaranteeing that results are yielded in the order Claude intended — even if tool B finishes before tool A.

🔑Key Insight

The state machine is the core insight: each tool moves through `queued → executing → completed → yielded`. A tool can only be `yielded` once all tools before it in the queue are also complete.

⚠️Warning

Non-concurrent-safe tools (writes, destructive ops) run exclusively — no other tool executes while they run. This prevents race conditions when two tools might modify the same file.

KEY TAKEAWAYS
  • State machines enforce ordering in concurrent systems
  • Progress messages bypass the result buffer for instant UI feedback
  • Sibling abort controllers propagate failures without stopping the parent
  • Concurrency-safe classification is per-tool, not global

Source Code

TrackedTool type + StreamingToolExecutor class showing the state machine and concurrency control.

type MessageUpdate = {
  message?: Message
  newContext?: ToolUseContext
}

type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'

type TrackedTool = {
  id: string
  block: ToolUseBlock
  assistantMessage: AssistantMessage
  status: ToolStatus
  isConcurrencySafe: boolean
  promise?: Promise<void>
  results?: Message[]
  // Progress messages are stored separately and yielded immediately
  pendingProgress: Message[]
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}

/**
 * Executes tools as they stream in with concurrency control.
 * - Concurrent-safe tools can execute in parallel with other concurrent-safe tools
 * - Non-concurrent tools must execute alone (exclusive access)
 * - Results are buffered and emitted in the order tools were received
 */
export class StreamingToolExecutor {
  private tools: TrackedTool[] = []
  private toolUseContext: ToolUseContext
  private hasErrored = false
  private erroredToolDescription = ''
  // Child of toolUseContext.abortController. Fires when a Bash tool errors
  // so sibling subprocesses die immediately instead of running to completion.
  // Aborting this does NOT abort the parent — query.ts won't end the turn.
  private siblingAbortController: AbortController
  private discarded = false
  // Signal to wake up getRemainingResults when progress is available
  private progressAvailableResolve?: () => void

  constructor(
    private readonly toolDefinitions: Tools,
    private readonly canUseTool: CanUseToolFn,
    toolUseContext: ToolUseContext,
  ) {
    this.toolUseContext = toolUseContext
    this.siblingAbortController = createChildAbortController(
      toolUseContext.abortController,
    )
  }

  /**
   * Discards all pending and in-progress tools. Called when streaming fallback
   * occurs and results from the failed attempt should be abandoned.
   * Queued tools won't start, and in-progress tools will receive synthetic errors.
   */
  discard(): void {
    this.discarded = true
  }

  /**
   * Add a tool to the execution queue. Will start executing immediately if conditions allow.
   */
  addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
    const toolDefinition = findToolByName(this.toolDefinitions, block.name)
    if (!toolDefinition) {
      this.tools.push({
        id: block.id,
AI Assistant

Ask anything about Streaming & Concurrent Execution

Powered by Groq · Enter to send, Shift+Enter for newline