How multiple tools run in parallel while results stay in order
import type { ToolUseBlock } from '@anthropic-ai/sdk/resources/index.mjs'import {createUserMessage,
REJECT_MESSAGE,
withMemoryCorrectionHint,
} from 'src/utils/messages.js'
import type { CanUseToolFn } from '../../hooks/useCanUseTool.js'import { findToolByName, type Tools, type ToolUseContext } from '../../Tool.js'import { BASH_TOOL_NAME } from '../../tools/BashTool/toolName.js'import type { AssistantMessage, Message } from '../../types/message.js'import { createChildAbortController } from '../../utils/abortController.js'import { runToolUse } from './toolExecution.js'type MessageUpdate = {message?: Message
newContext?: ToolUseContext
}
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
type TrackedTool = {id: string
block: ToolUseBlock
assistantMessage: AssistantMessage
status: ToolStatus
isConcurrencySafe: boolean
promise?: Promise<void>
results?: Message[]
// Progress messages are stored separately and yielded immediately
pendingProgress: Message[]
contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}
/**
* Executes tools as they stream in with concurrency control.
* - Concurrent-safe tools can execute in parallel with other concurrent-safe tools
* - Non-concurrent tools must execute alone (exclusive access)
* - Results are buffered and emitted in the order tools were received
*/
export class StreamingToolExecutor {private tools: TrackedTool[] = []
private toolUseContext: ToolUseContext
private hasErrored = false
private erroredToolDescription = ''
// Child of toolUseContext.abortController. Fires when a Bash tool errors
// so sibling subprocesses die immediately instead of running to completion.
// Aborting this does NOT abort the parent — query.ts won't end the turn.
private siblingAbortController: AbortController
private discarded = false
// Signal to wake up getRemainingResults when progress is available
private progressAvailableResolve?: () => void
constructor(
private readonly toolDefinitions: Tools,
private readonly canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
) {this.toolUseContext = toolUseContext
this.siblingAbortController = createChildAbortController(
toolUseContext.abortController,
)
}
/**
* Discards all pending and in-progress tools. Called when streaming fallback
* occurs and results from the failed attempt should be abandoned.
* Queued tools won't start, and in-progress tools will receive synthetic errors.
*/
discard(): void {this.discarded = true
}
/**
* Add a tool to the execution queue. Will start executing immediately if conditions allow.
*/
addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {const toolDefinition = findToolByName(this.toolDefinitions, block.name)
if (!toolDefinition) { this.tools.push({id: block.id,
block,
assistantMessage,
status: 'completed',
isConcurrencySafe: true,
pendingProgress: [],
results: [
createUserMessage({content: [
{type: 'tool_result',
content: `<tool_use_error>Error: No such tool available: ${block.name}</tool_use_error>`,is_error: true,
tool_use_id: block.id,
},
],
toolUseResult: `Error: No such tool available: ${block.name}`,sourceToolAssistantUUID: assistantMessage.uuid,
}),
],
})
return
}
const parsedInput = toolDefinition.inputSchema.safeParse(block.input)
const isConcurrencySafe = parsedInput?.success
? (() => { try {return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data))
} catch {return false
}
})()
: false
this.tools.push({id: block.id,
block,
assistantMessage,
status: 'queued',
isConcurrencySafe,
pendingProgress: [],
StreamingToolExecutor solves a hard problem: Claude can invoke multiple tools simultaneously in a single response. Some are safe to parallelize (read two files at once); others are not (writing a file while also writing another). This class manages that.
The state machine is key: every tool moves through `queued → executing → completed → yielded`. Results buffer until all preceding tools complete, ensuring the conversation log stays in the same order Claude intended.
When a Bash tool errors, the siblingAbortController is fired. This is a child of the parent abort controller, so sibling processes get killed but the overall query turn continues — Claude still gets the error result.
Progress messages (like "Running bash...") are yielded immediately to the UI, bypassing the result buffer. The result itself waits for ordering; the progress notification does not.
Ask anything about Streaming & Concurrent Tool Execution
Powered by Groq · Enter to send, Shift+Enter for newline