Managing parallel tool execution with result ordering guarantees while streaming progress to the UI in real time.
stateDiagram-v2
[*] --> queued: addTool()
queued --> executing: concurrency OK
executing --> completed: result ready
completed --> yielded: predecessors done
yielded --> [*]Mermaid diagram definition
When Claude calls multiple tools in one response, StreamingToolExecutor manages parallel execution while guaranteeing that results are yielded in the order Claude intended — even if tool B finishes before tool A.
The state machine is the core insight: each tool moves through `queued → executing → completed → yielded`. A tool can only be `yielded` once all tools before it in the queue are also complete.
Non-concurrent-safe tools (writes, destructive ops) run exclusively — no other tool executes while they run. This prevents race conditions when two tools might modify the same file.
TrackedTool type + StreamingToolExecutor class showing the state machine and concurrency control.
type MessageUpdate = {
message?: Message
newContext?: ToolUseContext
}
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
type TrackedTool = {
id: string
block: ToolUseBlock
assistantMessage: AssistantMessage
status: ToolStatus
isConcurrencySafe: boolean
promise?: Promise<void>
results?: Message[]
// Progress messages are stored separately and yielded immediately
pendingProgress: Message[]
contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}
/**
* Executes tools as they stream in with concurrency control.
* - Concurrent-safe tools can execute in parallel with other concurrent-safe tools
* - Non-concurrent tools must execute alone (exclusive access)
* - Results are buffered and emitted in the order tools were received
*/
export class StreamingToolExecutor {
private tools: TrackedTool[] = []
private toolUseContext: ToolUseContext
private hasErrored = false
private erroredToolDescription = ''
// Child of toolUseContext.abortController. Fires when a Bash tool errors
// so sibling subprocesses die immediately instead of running to completion.
// Aborting this does NOT abort the parent — query.ts won't end the turn.
private siblingAbortController: AbortController
private discarded = false
// Signal to wake up getRemainingResults when progress is available
private progressAvailableResolve?: () => void
constructor(
private readonly toolDefinitions: Tools,
private readonly canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
) {
this.toolUseContext = toolUseContext
this.siblingAbortController = createChildAbortController(
toolUseContext.abortController,
)
}
/**
* Discards all pending and in-progress tools. Called when streaming fallback
* occurs and results from the failed attempt should be abandoned.
* Queued tools won't start, and in-progress tools will receive synthetic errors.
*/
discard(): void {
this.discarded = true
}
/**
* Add a tool to the execution queue. Will start executing immediately if conditions allow.
*/
addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
const toolDefinition = findToolByName(this.toolDefinitions, block.name)
if (!toolDefinition) {
this.tools.push({
id: block.id,Ask anything about Streaming & Concurrent Execution
Powered by Groq · Enter to send, Shift+Enter for newline