Managing parallel tool execution with result ordering guarantees while streaming progress to the UI in real time.
When Claude calls multiple tools in one response, StreamingToolExecutor manages parallel execution while guaranteeing that results are yielded in the order Claude intended — even if tool B finishes before tool A.
The state machine is the core insight: each tool moves through `queued → executing → completed → yielded`. A tool can only become `yielded` once it has completed and every tool ahead of it in the queue has completed as well.
Tools that are not concurrency-safe (writes, destructive ops) run exclusively: no other tool executes while one of them is running. This prevents race conditions when two tools might modify the same file.
The listing below shows the `TrackedTool` type and the `StreamingToolExecutor` class, which together implement the state machine and the concurrency control.
/**
* An update record with two optional parts: a message and a new tool-use context.
* Both fields are optional, so an update may carry either, both, or neither.
* NOTE(review): presumably the unit the executor hands back to its consumer —
* the producing code is not visible in this chunk; confirm.
*/
type MessageUpdate = {
// Message carried by this update, if any.
message?: Message
// Tool-use context carried by this update, if any.
newContext?: ToolUseContext
}
/**
* Lifecycle states of a tracked tool, listed in the order a tool passes
* through them: queued → executing → completed → yielded.
*/
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
/**
* Bookkeeping record for a single tool call tracked by the executor.
* Pairs the originating tool-use block with its lifecycle status and any
* output collected for it.
*/
type TrackedTool = {
// Identifier of the tool call (populated from the tool-use block's id).
id: string
// The tool-use block describing the call.
block: ToolUseBlock
// Assistant message supplied alongside the block when the tool was added.
assistantMessage: AssistantMessage
// Current lifecycle state; see ToolStatus.
status: ToolStatus
// True when the tool may run in parallel with other concurrency-safe tools;
// false when it must execute alone.
isConcurrencySafe: boolean
// NOTE(review): presumably the in-flight execution promise, set once the
// tool starts running — confirm against the code that assigns it.
promise?: Promise<void>
// NOTE(review): presumably the result messages buffered until it is this
// tool's turn to be emitted — confirm against the code that assigns it.
results?: Message[]
// Progress messages are stored separately and yielded immediately
pendingProgress: Message[]
// Functions that each take a ToolUseContext and return a ToolUseContext.
// NOTE(review): when and in what order they are applied is not visible here.
contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}
/**
* Executes tools as they stream in with concurrency control.
* - Concurrent-safe tools can execute in parallel with other concurrent-safe tools
* - Non-concurrent tools must execute alone (exclusive access)
* - Results are buffered and emitted in the order tools were received
*/
export class StreamingToolExecutor {
// Tracked tools; entries are appended with push(), so array order is the
// order in which tools were added.
private tools: TrackedTool[] = []
// Tool-use context supplied to the constructor. Deliberately not readonly.
// NOTE(review): presumably replaced as tools modify the context — confirm.
private toolUseContext: ToolUseContext
// NOTE(review): presumably set when a tool fails, with
// erroredToolDescription describing the failing tool — the code that
// writes these two fields is not visible in this chunk; confirm.
private hasErrored = false
private erroredToolDescription = ''
// Child of toolUseContext.abortController. Fires when a Bash tool errors
// so sibling subprocesses die immediately instead of running to completion.
// Aborting this does NOT abort the parent — query.ts won't end the turn.
private siblingAbortController: AbortController
// Set to true by discard(); starts out false.
private discarded = false
// Signal to wake up getRemainingResults when progress is available
private progressAvailableResolve?: () => void
/**
* Creates an executor bound to a set of tool definitions, a permission
* callback, and the initial tool-use context.
*
* A child abort controller is derived from the context's abort controller
* and stored as the sibling abort controller.
*
* @param toolDefinitions Tool definitions kept as a readonly private property.
* @param canUseTool Permission callback kept as a readonly private property.
* @param toolUseContext Initial tool-use context; stored in a mutable field.
*/
constructor(
private readonly toolDefinitions: Tools,
private readonly canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
) {
const { abortController: parentAbortController } = toolUseContext
this.siblingAbortController = createChildAbortController(parentAbortController)
this.toolUseContext = toolUseContext
}
/**
* Discards all pending and in-progress tools. Called when streaming fallback
* occurs and results from the failed attempt should be abandoned.
* Queued tools won't start, and in-progress tools will receive synthetic errors.
*
* NOTE(review): this method itself only sets the `discarded` flag. The
* effects described above have to come from code that reads the flag, which
* is outside this chunk — confirm. Nothing here aborts
* siblingAbortController or touches the tracked tools directly.
*/
discard(): void {
// Only flips the flag; no tool state is modified here.
this.discarded = true
}
/**
* Add a tool to the execution queue. Will start executing immediately if conditions allow.
*/
addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
const toolDefinition = findToolByName(this.toolDefinitions, block.name)
if (!toolDefinition) {
this.tools.push({
id: block.id,Ask anything about Streaming & Concurrent Execution
Powered by Groq · Enter to send, Shift+Enter for newline