fix: Store messages even if stream is undrained (#32119)

This commit is contained in:
yehorkardash
2026-06-12 18:42:20 +03:00
committed by GitHub
parent ac197878d9
commit f2da1d14bb
11 changed files with 830 additions and 149 deletions
@@ -0,0 +1,133 @@
import { expect, it } from 'vitest';
import { z } from 'zod';
import { describeIf, getModel, collectStreamChunks, chunksOfType } from './helpers';
import { Agent, Memory, Tool } from '../../index';
const describe = describeIf('anthropic');
/**
* Integration tests for error resilience in processToolCall transforms.
*
* Two bugs under test:
*
* 1. toModelOutput throwing during a normal tool call — the agent should treat it
* as a tool error and let the LLM self-correct, not crash.
*
* 2. toModelOutput throwing during a RESUMED tool call — without the fix, this
* causes iteratePendingToolCallsConcurrent to propagate the rejection, which
* closes the stream via closeStreamWithError and skips memory persistence.
* With the fix, the error is captured and the stream continues normally.
*/
describe('tool transform error resilience', () => {
	/** Builds a `toModelOutput` callback that always throws an Error carrying `message`. */
	const throwingTransform = (message: string) => () => {
		throw new Error(message);
	};

	// generate(): the tool handler succeeds, the transform throws, and the run must still end with 'stop'.
	it('toModelOutput throwing during normal tool call — agent continues and finishes cleanly', async () => {
		const brokenFetchTool = new Tool('fetch_data')
			.description('Fetch some data. Always succeeds but transform throws.')
			.input(z.object({ id: z.string().describe('Resource ID') }))
			.handler(async ({ id }) => ({ id, value: 42 }))
			.toModelOutput(throwingTransform('toModelOutput failed intentionally'));

		const systemPrompt =
			'You are a data fetcher. Use fetch_data to get data. ' +
			'If the tool fails with an error, acknowledge it and say what happened. Be concise.';
		const agent = new Agent('transform-error-normal-test')
			.model(getModel('anthropic'))
			.instructions(systemPrompt)
			.tool(brokenFetchTool);

		const outcome = await agent.generate('Fetch data with id "test-1"');

		// Reaching this point means generate() did not throw; it must also have finished normally.
		expect(outcome.finishReason).toBe('stop');
		expect(outcome.messages.length).toBeGreaterThan(0);
	});

	// stream(): same failure mode, observed through the final 'finish' chunk.
	it('toModelOutput throwing during normal tool call in stream() — stream finishes with stop, not error', async () => {
		const brokenFetchTool = new Tool('fetch_data')
			.description('Fetch some data. Transform always throws.')
			.input(z.object({ id: z.string().describe('Resource ID') }))
			.handler(async ({ id }) => ({ id, value: 99 }))
			.toModelOutput(throwingTransform('toModelOutput failed in stream'));

		const systemPrompt =
			'You are a data fetcher. Use fetch_data to get data. ' +
			'If the tool errors, acknowledge it. Be concise.';
		const agent = new Agent('transform-error-stream-test')
			.model(getModel('anthropic'))
			.instructions(systemPrompt)
			.tool(brokenFetchTool);

		const { stream } = await agent.stream('Fetch data with id "stream-1"');
		const received = await collectStreamChunks(stream);

		const finishes = chunksOfType(received, 'finish');
		expect(finishes.length).toBeGreaterThan(0);
		const lastFinish = finishes[finishes.length - 1];
		expect(lastFinish.finishReason).not.toBe('error');
	});

	// Suspend/resume: the transform throws on the resumed call and the error must stay contained.
	// NOTE(review): the title mentions messages being saved to memory, but no assertion in this
	// test reads from memory — consider adding one.
	it('toModelOutput throwing during resumed tool call — stream does not close with error and messages are saved to memory', async () => {
		const threadMemory = new Memory().storage('memory');

		const approveTool = new Tool('approve_action')
			.description('Request approval for an action. Suspends for human review.')
			.input(z.object({ action: z.string().describe('Action to approve') }))
			.output(z.object({ approved: z.boolean(), action: z.string() }))
			.suspend(z.object({ question: z.string() }))
			.resume(z.object({ confirmed: z.boolean() }))
			.handler(async ({ action }, ctx) => {
				// Resumed invocation: report the reviewer's decision.
				if (ctx.resumeData) {
					return { approved: ctx.resumeData.confirmed, action };
				}
				// First invocation: pause and wait for a human answer.
				return await ctx.suspend({ question: `Approve "${action}"?` });
			})
			// Always throws — simulates a broken transform on the resumed call.
			.toModelOutput(throwingTransform('toModelOutput failed on resume intentionally'));

		const systemPrompt =
			'You are an approval manager. Use approve_action when asked to perform an action. ' +
			'If the tool errors, acknowledge it and say what happened. Be concise.';
		const agent = new Agent('transform-error-resume-test')
			.model(getModel('anthropic'))
			.instructions(systemPrompt)
			.tool(approveTool)
			.memory(threadMemory)
			.checkpoint('memory');

		const threadId = `test-transform-resume-${Date.now()}`;
		const runOptions = { persistence: { threadId, resourceId: 'test-user' } };

		// Turn 1: the agent is expected to call the tool, which suspends.
		const firstTurn = await agent.stream(
			'Please approve the action "deploy to production"',
			runOptions,
		);
		const firstTurnChunks = await collectStreamChunks(firstTurn.stream);
		const suspendChunk = chunksOfType(firstTurnChunks, 'tool-call-suspended')[0];
		expect(suspendChunk).toBeDefined();

		// Turn 2: resume. The handler returns a result, then toModelOutput throws.
		// Without the fix the stream closes with finishReason 'error'; with it, the failure is
		// reported as a tool error and the LLM gets to respond.
		const resumeTarget = { runId: suspendChunk.runId, toolCallId: suspendChunk.toolCallId };
		const secondTurn = await agent.resume('stream', { confirmed: true }, resumeTarget);
		const secondTurnChunks = await collectStreamChunks(secondTurn.stream);

		const finishes = chunksOfType(secondTurnChunks, 'finish');
		expect(finishes.length).toBeGreaterThan(0);
		// The transform error must be contained: the last finish chunk is not an error finish.
		const lastFinish = finishes[finishes.length - 1];
		expect(lastFinish.finishReason).not.toBe('error');

		// The resumed stream carries a tool-result for approve_action flagged as an error.
		const approveResult = chunksOfType(secondTurnChunks, 'tool-result').find(
			(chunk) => chunk.toolName === 'approve_action',
		);
		expect(approveResult).toBeDefined();
		expect(approveResult!.isError).toBe(true);
	});
});
@@ -649,6 +649,43 @@ describe('AgentRuntime.generate() — graceful error contract', () => {
});
});
// ---------------------------------------------------------------------------
// stream() — fallback error observability
// ---------------------------------------------------------------------------
describe('AgentRuntime.stream() — fallback error observability', () => {
	beforeEach(() => {
		vi.clearAllMocks();
	});

	// When saving to memory rejects after the loop, the failure must be visible three ways:
	// an error 'finish' chunk, one AgentEvent.Error on the bus, and a 'failed' runtime state.
	it('emits AgentEvent.Error and sets failed state when post-loop persistence throws', async () => {
		// Memory double: reads resolve to an empty history, every write rejects.
		const failingMemory = {
			getMessages: vi.fn().mockResolvedValue([]),
			saveMessages: vi.fn().mockRejectedValue(new Error('memory write failed')),
		} as unknown as InMemoryMemory;

		const { runtime, bus } = createRuntime();
		const capturedErrors: unknown[] = [];
		bus.on(AgentEvent.Error, (event) => capturedErrors.push(event));

		// Swap the runtime's configured memory for the failing double.
		const runtimeInternals = runtime as unknown as { config: { memory: unknown } };
		runtimeInternals.config.memory = failingMemory;
		streamText.mockReturnValue(makeStreamSuccess('done'));

		const persistence = { threadId: 'thread-1', resourceId: 'resource-1' };
		const { stream } = await runtime.stream('hello', { persistence });
		const received = await collectChunks(stream);

		const errorFinish = expect.objectContaining({
			type: 'finish',
			finishReason: 'error',
		});
		expect(received).toContainEqual(errorFinish);
		expect(capturedErrors).toHaveLength(1);
		expect(runtime.getState().status).toBe('failed');
	});
});
// ---------------------------------------------------------------------------
// stream() — graceful error contract
// ---------------------------------------------------------------------------
@@ -4831,3 +4868,203 @@ describe('AgentRuntime.resume() with createCancellation() — manual handling (h
expect(callCtx.resumeData).toBeUndefined();
});
});
// ---------------------------------------------------------------------------
// toModelOutput error resilience — processToolCall must never re-throw
// ---------------------------------------------------------------------------
describe('AgentRuntime — toModelOutput error resilience', () => {
	type CollectedChunks = Awaited<ReturnType<typeof collectChunks>>;
	type FinishChunk = StreamChunk & { type: 'finish' };
	type ToolResultChunk = StreamChunk & { type: 'tool-result' };
	type SuspendedChunk = StreamChunk & { type: 'tool-call-suspended' };

	beforeEach(() => {
		vi.clearAllMocks();
	});

	/** Returns the first 'finish' chunk, if any. */
	const findFinish = (chunks: CollectedChunks) =>
		chunks.find((c) => c.type === 'finish') as FinishChunk | undefined;

	/** Returns the first 'tool-result' chunk belonging to `toolCallId`, if any. */
	const findToolResult = (chunks: CollectedChunks, toolCallId: string) =>
		chunks.find((c) => c.type === 'tool-result' && c.toolCallId === toolCallId) as
			| ToolResultChunk
			| undefined;

	/** Tool whose handler succeeds but whose toModelOutput always throws. */
	function makeTransformErrorTool(name = 'transform_tool'): BuiltTool {
		const failTransform = () => {
			throw new Error('toModelOutput failed');
		};
		return {
			name,
			description: 'A tool whose toModelOutput always throws',
			inputSchema: z.object({ value: z.string().optional() }),
			handler: async () => await Promise.resolve({ raw: 'data' }),
			toModelOutput: failTransform,
		};
	}

	/** Tool that suspends on first call, returns on resume, and whose toModelOutput always throws. */
	function makeSuspendingTransformErrorTool(name = 'suspend_tool'): BuiltTool {
		const failTransform = () => {
			throw new Error('toModelOutput failed on resume');
		};
		return {
			name,
			description: 'A suspending tool whose toModelOutput throws on the resumed call',
			inputSchema: z.object({ value: z.string().optional() }),
			suspendSchema: z.object({ reason: z.string() }),
			resumeSchema: z.object({ approved: z.boolean() }),
			handler: async (_input, ctx) => {
				const interruptible = ctx as InterruptibleToolContext;
				if (interruptible.resumeData) {
					return { done: true };
				}
				return await interruptible.suspend({ reason: 'needs approval' });
			},
			toModelOutput: failTransform,
		};
	}

	it('generate(): toModelOutput throwing is treated as a tool error — loop continues', async () => {
		const { runtime } = createRuntimeWithTools([makeTransformErrorTool()], 1);

		const toolCallTurn = makeGenerateWithToolCalls([
			{ toolCallId: 'tc-1', toolName: 'transform_tool', args: {} },
		]);
		generateText
			.mockResolvedValueOnce(toolCallTurn)
			.mockResolvedValueOnce(makeGenerateSuccess('I see the tool errored'));

		const outcome = await runtime.generate('run the tool');

		// The loop survives the transform failure and the LLM finishes normally.
		expect(outcome.finishReason).toBe('stop');
		// Exactly two model calls: the tool-call turn and the follow-up after the error.
		expect(generateText).toHaveBeenCalledTimes(2);
	});

	it('stream(): toModelOutput throwing surfaces as isError tool-result — loop continues', async () => {
		const { runtime } = createRuntimeWithTools([makeTransformErrorTool()], 1);

		streamText
			.mockReturnValueOnce({
				fullStream: makeChunkStream([{ type: 'text-delta', textDelta: 'thinking...' }]),
				finishReason: Promise.resolve('tool-calls'),
				usage: Promise.resolve({ inputTokens: 10, outputTokens: 5, totalTokens: 15 }),
				response: Promise.resolve({
					messages: [
						{
							role: 'assistant',
							content: [
								{
									type: 'tool-call',
									toolCallId: 'tc-1',
									toolName: 'transform_tool',
									args: {},
								},
							],
						},
					],
				}),
				toolCalls: Promise.resolve([{ toolCallId: 'tc-1', toolName: 'transform_tool', input: {} }]),
			})
			.mockReturnValueOnce(makeStreamSuccess('I see the tool errored'));

		const { stream } = await runtime.stream('run the tool');
		const received = await collectChunks(stream);

		expect(findFinish(received)?.finishReason).toBe('stop');
		expect(findToolResult(received, 'tc-1')?.isError).toBe(true);
	});

	it('generate() resume: toModelOutput throwing in resumed tool call is captured — loop continues', async () => {
		const { runtime } = createRuntimeWithTools([makeSuspendingTransformErrorTool()], 1);

		const toolCallTurn = makeGenerateWithToolCalls([
			{ toolCallId: 'tc-1', toolName: 'suspend_tool', args: {} },
		]);
		generateText
			.mockResolvedValueOnce(toolCallTurn)
			.mockResolvedValueOnce(makeGenerateSuccess('Handled the resume error'));

		// Initial run: the tool suspends, so the run stops on 'tool-calls' with one pending suspend.
		const suspendedRun = await runtime.generate('run the tool');
		expect(suspendedRun.finishReason).toBe('tool-calls');
		expect(suspendedRun.pendingSuspend).toHaveLength(1);

		const { runId, toolCallId } = suspendedRun.pendingSuspend![0];

		// Resume: the handler returns a result but toModelOutput throws.
		// Without the fix the rejection escapes iteratePendingToolCallsConcurrent and generate()
		// comes back with finishReason 'error' instead of 'stop'.
		const resumedRun = await runtime.resume('generate', { approved: true }, { runId, toolCallId });

		expect(resumedRun.finishReason).toBe('stop');
		// Exactly two model calls: the initial one and the one after the resume error.
		expect(generateText).toHaveBeenCalledTimes(2);
	});

	it('stream() resume: toModelOutput throwing in resumed tool call does not close the stream with error', async () => {
		const { runtime } = createRuntimeWithTools([makeSuspendingTransformErrorTool()], 1);

		// First stream: the model asks for the tool, which suspends.
		streamText.mockReturnValueOnce({
			fullStream: makeChunkStream([{ type: 'text-delta', textDelta: 'thinking...' }]),
			finishReason: Promise.resolve('tool-calls'),
			usage: Promise.resolve({ inputTokens: 10, outputTokens: 5, totalTokens: 15 }),
			response: Promise.resolve({
				messages: [
					{
						role: 'assistant',
						content: [
							{
								type: 'tool-call',
								toolCallId: 'tc-1',
								toolName: 'suspend_tool',
								args: {},
							},
						],
					},
				],
			}),
			toolCalls: Promise.resolve([{ toolCallId: 'tc-1', toolName: 'suspend_tool', input: {} }]),
		});

		const initialRun = await runtime.stream('run the tool');
		const initialChunks = await collectChunks(initialRun.stream);
		const suspendChunk = initialChunks.find((c) => c.type === 'tool-call-suspended') as
			| SuspendedChunk
			| undefined;
		expect(suspendChunk).toBeDefined();

		// Resume: toModelOutput throws.
		// Without the fix the stream is closed with finishReason 'error' via closeStreamWithError.
		streamText.mockReturnValueOnce(makeStreamSuccess('Handled the resume error'));

		const resumedRun = await runtime.resume(
			'stream',
			{ approved: true },
			{ runId: suspendChunk!.runId, toolCallId: suspendChunk!.toolCallId },
		);
		const resumedChunks = await collectChunks(resumedRun.stream);

		const finishChunk = findFinish(resumedChunks);
		expect(finishChunk).toBeDefined();
		// The tool error must be contained rather than ending the stream with 'error'.
		expect(finishChunk!.finishReason).not.toBe('error');

		// The resumed stream reports the failure as a tool-result flagged isError.
		const toolResultChunk = findToolResult(resumedChunks, 'tc-1');
		expect(toolResultChunk).toBeDefined();
		expect(toolResultChunk!.isError).toBe(true);
	});
});
@@ -2,7 +2,7 @@ import type * as AiImport from 'ai';
import type { LanguageModel } from 'ai';
import type { BuiltTelemetry } from '../../types';
import { generateTitleFromMessage } from '../title-generation';
import { generateTitleAndEmojiFromMessage, generateTitleFromMessage } from '../title-generation';
type GenerateTextCall = {
system?: string;
@@ -207,3 +207,24 @@ describe('generateTitleFromMessage', () => {
expect(result).toBe('Scryfall random card workflow');
});
});
// Verifies the prompt that generateTitleAndEmojiFromMessage hands to generateText.
describe('generateTitleAndEmojiFromMessage', () => {
beforeEach(() => {
// Start every test from a clean shared generateText mock.
mockGenerateText.mockReset();
});
it('wraps the user message as conversation context so the model does not answer it', async () => {
// Stub the model reply with a JSON payload carrying a title and an emoji.
mockGenerateText.mockResolvedValue({ text: '{"title":"Berlin rain alert","emoji":"rain"}' });
await generateTitleAndEmojiFromMessage(fakeModel, 'Build a daily Berlin rain alert workflow');
// First argument of the first recorded generateText call.
const call = mockGenerateText.mock.calls[0][0];
// The expected content is compared exactly, so the template literal below is
// whitespace-sensitive (including its leading newline) — do not re-indent it.
expect(call.messages[0]).toEqual({
role: 'user',
content: `
Here's the conversation:
<conversation>
Build a daily Berlin rain alert workflow
</conversation>`,
});
});
});
@@ -1220,6 +1220,11 @@ export class AgentRuntime {
.catch(async (error: unknown) => {
await this.flushTelemetry(options);
await this.cleanupRun();
const isAbort = ctx.abortScope.isAborted;
this.updateState({ status: isAbort ? 'cancelled' : 'failed' });
if (!isAbort) {
this.eventBus.emit({ type: AgentEvent.Error, message: String(error), error });
}
try {
await writer.write({ type: 'error', error });
await writer.write({ type: 'finish', finishReason: 'error' });
@@ -1228,7 +1233,8 @@ export class AgentRuntime {
writer.abort(error).catch(() => {});
}
})
.finally(() => {
.finally(async () => {
await writer.close().catch(() => {});
this.eventBus.off(AgentEvent.ToolExecutionStart, onToolExecutionStart);
this.eventBus.off(AgentEvent.ToolExecutionEnd, onToolExecutionEnd);
this.eventBus.off(AgentEvent.SubAgentStarted, onSubAgentStarted);
@@ -1535,6 +1541,27 @@ export class AgentRuntime {
lastFinishReason = 'max-iterations';
}
await this.saveToMemory(list, options);
if (this.config.titleGeneration && options?.persistence && this.config.memory) {
const titlePromise = generateThreadTitle({
memory: this.config.memory,
threadId: options.persistence.threadId,
resourceId: options.persistence.resourceId,
titleConfig: this.config.titleGeneration,
agentModel: this.config.model,
turnDelta: list.turnDelta(),
executionCounter: options.executionCounter,
});
this.backgroundTasks.track(titlePromise);
if (this.config.titleGeneration.sync) {
await titlePromise;
}
}
await this.cleanupRun();
await this.flushTelemetry(options);
const costUsage = this.applyCost(totalUsage);
await writer.write({
type: 'finish',
@@ -1544,33 +1571,10 @@ export class AgentRuntime {
...(structuredOutput !== undefined && { structuredOutput }),
});
try {
await this.saveToMemory(list, options);
if (this.config.titleGeneration && options?.persistence && this.config.memory) {
const titlePromise = generateThreadTitle({
memory: this.config.memory,
threadId: options.persistence.threadId,
resourceId: options.persistence.resourceId,
titleConfig: this.config.titleGeneration,
agentModel: this.config.model,
turnDelta: list.turnDelta(),
executionCounter: options.executionCounter,
});
this.backgroundTasks.track(titlePromise);
if (this.config.titleGeneration.sync) {
await titlePromise;
}
}
await this.cleanupRun();
await this.flushTelemetry(options);
this.updateState({ status: 'success', messageList: list.serialize() });
this.eventBus.emit({ type: AgentEvent.AgentEnd, messages: list.responseDelta() });
} finally {
await writer.close();
}
this.updateState({ status: 'success', messageList: list.serialize() });
this.eventBus.emit({ type: AgentEvent.AgentEnd, messages: list.responseDelta() });
// on error writer.close() will be called in startStreamLoop
await writer.close();
}
/** Persist the current-turn delta to memory. */
@@ -2079,20 +2083,26 @@ export class AgentRuntime {
const pending: Record<string, PendingToolCall> = {};
// 1. Execute the resumed tool
const processResult = await this.processToolCall(
resumedEntry.toolCallId,
resumedToolName,
resumedEntry.input,
toolMap,
list,
runId,
persistence,
pendingResume.resumeData,
resolvedTelemetry,
executionCounter,
abortSignal,
false,
);
let processResult: ToolCallOutcome;
try {
processResult = await this.processToolCall(
resumedEntry.toolCallId,
resumedToolName,
resumedEntry.input,
toolMap,
list,
runId,
persistence,
pendingResume.resumeData,
resolvedTelemetry,
executionCounter,
abortSignal,
false,
);
} catch (error) {
processResult = { outcome: 'error', error };
list.setToolCallError(resumedEntry.toolCallId, error);
}
if (processResult.outcome === 'suspended') {
pending[resumedId] = {
@@ -2375,6 +2385,16 @@ export class AgentRuntime {
};
}
// Apply toModelOutput transform before emitting the success event.
// If the transform throws, treat it as a tool error so processToolCall
// never re-throws (preserving the "never re-throws" contract).
let modelResult: unknown;
try {
modelResult = builtTool.toModelOutput ? builtTool.toModelOutput(toolResult) : toolResult;
} catch (error) {
return makeToolError(error);
}
this.eventBus.emit({
type: AgentEvent.ToolExecutionEnd,
toolCallId,
@@ -2383,10 +2403,6 @@ export class AgentRuntime {
isError: false,
});
// Apply toModelOutput transform: the raw result goes to history/events,
// but the transformed version is what the LLM sees as the tool result.
const modelResult = builtTool.toModelOutput ? builtTool.toModelOutput(toolResult) : toolResult;
list.setToolCallResult(toolCallId, toJsonValue(modelResult));
const customMessage = builtTool?.toMessage?.(toolResult);
@@ -173,10 +173,16 @@ export async function generateTitleAndEmojiFromMessage(
return null;
}
const wrappedMessage = `
Here's the conversation:
<conversation>
${trimmed}
</conversation>`;
const result = await loadAi().generateText({
model,
system: opts?.instructions ?? DEFAULT_TITLE_AND_EMOJI_INSTRUCTIONS,
messages: [{ role: 'user', content: trimmed }],
messages: [{ role: 'user', content: wrappedMessage }],
});
incrementTokenCountFromUsage(opts?.executionCounter, result.usage);
@@ -1622,6 +1622,26 @@ describe('AgentsService', () => {
});
}
/**
 * Builds a minimal stream double for reader-failure tests.
 *
 * The first `read()` across all readers resolves with one `text-delta` chunk
 * ('partial answer'); every later `read()` rejects with `error`. `cancel` and
 * `releaseLock` are plain mocks.
 */
function makeFailingStream(error: Error): ReadableStream {
	// Shared across every reader handed out by this stream.
	let reads = 0;

	const readNext = async () => {
		reads += 1;
		if (reads !== 1) {
			throw error;
		}
		return {
			done: false,
			value: { type: 'text-delta', id: 't1', delta: 'partial answer' },
		};
	};

	const fakeStream = {
		getReader: () => ({
			read: jest.fn().mockImplementation(readNext),
			cancel: jest.fn(),
			releaseLock: jest.fn(),
		}),
	};
	return fakeStream as unknown as ReadableStream;
}
async function collectChunks(
config: object,
): Promise<Array<{ type: string; [k: string]: unknown }>> {
@@ -1684,6 +1704,78 @@ describe('AgentsService', () => {
expect(chunks.every((c) => c.type !== 'text-delta')).toBe(true);
});
// A consumer that stops reading right after the suspension chunk must still get the
// execution recorded, including the original user message and the 'suspended' status.
it('records the first user message when the stream is stopped after suspension', async () => {
	const agentInstance = {
		name: 'test',
		stream: jest.fn().mockResolvedValue({
			runId: 'run-1',
			stream: makeStream([
				{
					type: 'tool-call-suspended',
					toolCallId: 'tool-call-1',
					toolName: 'approve',
				},
			]),
		}),
	};

	const chatService = service as unknown as StreamChatResponse;
	const chatStream = chatService.streamChatResponse({
		agentInstance,
		toolRegistry: new Map(),
		agentId,
		message: 'hello',
		memory: { threadId: 'thread-1', resourceId: 'user-1' },
		projectId,
	});

	// Pull exactly one chunk, then end the generator early.
	const firstStep = await chatStream.next();
	expect(firstStep.value.type).toBe('tool-call-suspended');
	await chatStream.return(undefined);

	const expectedRecord = expect.objectContaining({
		threadId: 'thread-1',
		userMessage: 'hello',
		hitlStatus: 'suspended',
	});
	expect(agentExecutionService.recordMessage).toHaveBeenCalledWith(expectedRecord);
});
// A reader failure mid-stream must propagate to the caller and still be recorded as a
// failed execution that keeps the partial assistant text.
it('records a failed execution when the stream reader errors before finish', async () => {
	const failureMessage = 'reader failed while consuming stream';
	const agentInstance = {
		name: 'test',
		stream: jest.fn().mockResolvedValue({
			runId: 'run-1',
			stream: makeFailingStream(new Error(failureMessage)),
		}),
	};

	const consume = collectChunks({
		agentInstance,
		toolRegistry: new Map(),
		agentId,
		message: 'hello',
		memory: { threadId: 'thread-1', resourceId: 'user-1' },
		projectId,
	});
	await expect(consume).rejects.toThrow(failureMessage);

	const expectedFailure = expect.objectContaining({
		assistantResponse: 'partial answer',
		finishReason: 'error',
		error: failureMessage,
	});
	expect(agentExecutionService.recordMessage).toHaveBeenCalledWith(
		expect.objectContaining({
			threadId: 'thread-1',
			agentId,
			userMessage: 'hello',
			record: expectedFailure,
		}),
	);
});
});
describe('executeForWorkflow', () => {
const outputSchema: JSONSchema7 = {
@@ -684,6 +684,38 @@ describe('ExecutionRecorder — tool-result error normalization', () => {
expect(tc.success).toBe(true);
expect(tc.output).toEqual({ status: 'ok', data: [1, 2, 3] });
});
// A plain-object error (not an Error instance) must keep its message, code and nested
// details in the recorded error text.
it('preserves object-shaped stream errors in the message record', () => {
	const recorder = new ExecutionRecorder();
	const objectError = {
		message: 'Node tool validation failed',
		code: 'NODE_TOOL_VALIDATION',
		details: { nodeType: 'n8n-nodes-base.httpRequestTool' },
	};

	recorder.record({ type: 'error', error: objectError } as never);
	recorder.record({ type: 'finish', finishReason: 'error' } as StreamChunk);

	const { error: recordedError } = recorder.getMessageRecord();
	const expectedFragments = [
		'Node tool validation failed',
		'NODE_TOOL_VALIDATION',
		'n8n-nodes-base.httpRequestTool',
	];
	for (const fragment of expectedFragments) {
		expect(recordedError).toContain(fragment);
	}
});
// Secrets embedded in an Error's message must be redacted before they reach the record.
it('scrubs secrets from Error-shaped stream errors', () => {
	const recorder = new ExecutionRecorder();
	const leakyError = new Error('Request failed with apiKey=super-secret-token');

	recorder.record({ type: 'error', error: leakyError } as never);
	recorder.record({ type: 'finish', finishReason: 'error' } as StreamChunk);

	const messageRecord = recorder.getMessageRecord();
	expect(messageRecord.error).toBe('Request failed with [REDACTED]');
});
});
describe('ExecutionRecorder — node-tool {{$json.x}} resolution', () => {
+117 -93
View File
@@ -922,37 +922,44 @@ export class AgentsService {
const { agent: agentInstance, toolRegistry } = runtime;
const recorder = new ExecutionRecorder(toolRegistry);
const resultStream = await agentInstance.resume('stream', resumeData, {
runId,
toolCallId,
executionCounter: this.createAgentExecutionCounter({ agentId, userId }),
});
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
yield value;
}
// Always record resumed executions — even if they suspend again (chained HITL).
// Don't repeat the original user message — the pre-suspension execution already has it.
const messageRecord = recorder.getMessageRecord();
void this.agentExecutionService
.recordMessage({
threadId,
agentId,
agentName: agentInstance.name,
projectId,
userMessage: '',
record: messageRecord,
hitlStatus: 'resumed',
})
.catch((error) => {
this.logger.warn('Failed to record resumed agent execution', {
agentId,
threadId,
error: error instanceof Error ? error.message : String(error),
});
try {
const resultStream = await agentInstance.resume('stream', resumeData, {
runId,
toolCallId,
executionCounter: this.createAgentExecutionCounter({ agentId, userId }),
});
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
yield value;
}
} catch (error) {
recorder.record({ type: 'error', error });
recorder.record({ type: 'finish', finishReason: 'error' });
throw error;
} finally {
// Always record resumed executions — even if they suspend again (chained HITL)
// or fail while streaming. Don't repeat the original user message — the
// pre-suspension execution already has it.
const messageRecord = recorder.getMessageRecord();
void this.agentExecutionService
.recordMessage({
threadId,
agentId,
agentName: agentInstance.name,
projectId,
userMessage: '',
record: messageRecord,
hitlStatus: 'resumed',
})
.catch((error) => {
this.logger.warn('Failed to record resumed agent execution', {
agentId,
threadId,
error: error instanceof Error ? error.message : String(error),
});
});
}
}
/**
@@ -1295,51 +1302,57 @@ export class AgentsService {
const recorder = new ExecutionRecorder(toolRegistry);
const resultStream = await agentInstance.stream(message, {
persistence: { threadId, resourceId },
executionCounter: this.createAgentExecutionCounter({ agentId, userId }),
});
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
if (value.type === 'tool-call-suspended') {
this.logger.info('Chat: tool-call-suspended chunk received', {
agentId,
toolCallId: value.toolCallId,
toolName: value.toolName,
});
}
if (value.type === 'finish' && value.finishReason === 'max-iterations') {
for (const chunk of getMaxIterationsChunks()) {
yield chunk;
}
}
yield value;
}
// Always record — even if suspended, the pre-suspension response text
// and tool calls are valuable. Usage/model will be null for suspended runs.
const messageRecord = recorder.getMessageRecord();
void this.agentExecutionService
.recordMessage({
threadId,
agentId,
agentName: agentInstance.name,
projectId,
userMessage: message,
record: messageRecord,
hitlStatus: recorder.suspended ? 'suspended' : undefined,
source,
taskId,
taskVersionId,
})
.catch((error) => {
this.logger.warn('Failed to record agent execution', {
agentId,
threadId,
error: error instanceof Error ? error.message : String(error),
});
try {
const resultStream = await agentInstance.stream(message, {
persistence: { threadId, resourceId },
executionCounter: this.createAgentExecutionCounter({ agentId, userId }),
});
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
if (value.type === 'tool-call-suspended') {
this.logger.info('Chat: tool-call-suspended chunk received', {
agentId,
toolCallId: value.toolCallId,
toolName: value.toolName,
});
}
if (value.type === 'finish' && value.finishReason === 'max-iterations') {
for (const chunk of getMaxIterationsChunks()) {
yield chunk;
}
}
yield value;
}
} catch (error) {
recorder.record({ type: 'error', error });
recorder.record({ type: 'finish', finishReason: 'error' });
throw error;
} finally {
// Always record — even if suspended or failed, the pre-suspension/error
// response text and tool calls are valuable.
const messageRecord = recorder.getMessageRecord();
void this.agentExecutionService
.recordMessage({
threadId,
agentId,
agentName: agentInstance.name,
projectId,
userMessage: message,
record: messageRecord,
hitlStatus: recorder.suspended ? 'suspended' : undefined,
source,
taskId,
taskVersionId,
})
.catch((error) => {
this.logger.warn('Failed to record agent execution', {
agentId,
threadId,
error: error instanceof Error ? error.message : String(error),
});
});
}
}
/**
@@ -1438,31 +1451,38 @@ export class AgentsService {
// `structuredOutput` and `toolCalls` aren't surfaced by the recorder —
// pull them off the `finish` chunk and the discrete `tool-result` chunks
// directly so the workflow node receives the same shape as before.
let structuredOutput: unknown | null = null;
let structuredOutput: unknown = null;
const toolCalls: ExecuteAgentData['toolCalls'] = [];
const toolInputs = new Map<string, { toolName: string; input: unknown }>();
let streamError: Error | undefined;
const resultStream = await agentInstance.stream(message, {
persistence: { resourceId: executionId, threadId },
executionCounter: this.createAgentExecutionCounter({ agentId, userId: telemetryUserId }),
});
try {
const resultStream = await agentInstance.stream(message, {
persistence: { resourceId: executionId, threadId },
executionCounter: this.createAgentExecutionCounter({ agentId, userId: telemetryUserId }),
});
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
if (value.type === 'tool-call') {
toolInputs.set(value.toolCallId, { toolName: value.toolName, input: value.input });
} else if (value.type === 'tool-result') {
const pending = toolInputs.get(value.toolCallId);
toolCalls.push({
toolName: value.toolName,
input: pending?.input ?? null,
result: value.output,
});
toolInputs.delete(value.toolCallId);
} else if (value.type === 'finish' && value.structuredOutput !== undefined) {
structuredOutput = value.structuredOutput;
if (value.type === 'tool-call') {
toolInputs.set(value.toolCallId, { toolName: value.toolName, input: value.input });
} else if (value.type === 'tool-result') {
const pending = toolInputs.get(value.toolCallId);
toolCalls.push({
toolName: value.toolName,
input: pending?.input ?? null,
result: value.output,
});
toolInputs.delete(value.toolCallId);
} else if (value.type === 'finish' && value.structuredOutput !== undefined) {
structuredOutput = value.structuredOutput;
}
}
} catch (error) {
recorder.record({ type: 'error', error });
recorder.record({ type: 'finish', finishReason: 'error' });
streamError = error instanceof Error ? error : new Error(String(error));
}
const messageRecord = recorder.getMessageRecord();
@@ -1489,6 +1509,10 @@ export class AgentsService {
});
});
if (streamError !== undefined) {
throw streamError;
}
if (recorder.suspended) {
throw new OperationalError(
'Agent execution suspended waiting for tool approval. ' +
@@ -139,6 +139,20 @@ function normaliseToolErrorOutput(output: unknown): unknown {
return output;
}
/**
 * Converts an arbitrary stream error into a secret-scrubbed string for the message record.
 *
 * - `Error` instances contribute their message, falling back to the name and then to a
 *   generic label when both are empty.
 * - Strings are scrubbed and returned.
 * - Any other value is sanitized and JSON-serialized so structured details are kept.
 *
 * @param error - The value carried by an `error` stream chunk.
 * @returns A scrubbed, always-defined string description of the error.
 */
function normaliseStreamError(error: unknown): string {
	if (error instanceof Error) {
		return scrubSecretsInText(error.message || error.name || 'Agent execution failed');
	}
	if (typeof error === 'string') return scrubSecretsInText(error);

	const sanitized = sanitizeExecutionLogValue(error);
	let serialized: string | undefined;
	try {
		// JSON.stringify does not throw for `undefined`, functions or symbols — it returns
		// `undefined`, despite the `string` return type declared in the TypeScript lib.
		serialized = JSON.stringify(sanitized);
	} catch {
		// Serialization threw (e.g. circular structure or BigInt); use the fallback below.
		serialized = undefined;
	}
	// Fall back to String(error) so the scrubber never receives a non-string.
	return scrubSecretsInText(serialized ?? String(error));
}
const REDACTED_VALUE = '[REDACTED]';
const CIRCULAR_VALUE = '[Circular]';
@@ -323,8 +337,7 @@ export class ExecutionRecorder {
});
break;
case 'error': {
const errMsg = chunk.error instanceof Error ? chunk.error.message : String(chunk.error);
this.error = errMsg;
this.error = normaliseStreamError(chunk.error);
break;
}
}
@@ -10,6 +10,7 @@ import {
import {
applyOpenSuspensions,
buildDisplayGroups,
convertDbMessages,
rebuildInteractiveFromHistory,
isGroupable,
@@ -376,8 +377,7 @@ describe('isGroupable', () => {
});
describe('buildDisplayGroups — interactive payloads', () => {
it('collects interactive payloads from each grouped message into the toolRun group', async () => {
const { buildDisplayGroups } = await import('../composables/agentChatMessages');
it('collects interactive payloads from each grouped message into the toolRun group', () => {
const groups = buildDisplayGroups([
// First grouped turn: a resolved ask_llm card
{
@@ -433,6 +433,65 @@ describe('buildDisplayGroups — interactive payloads', () => {
expect(grouped.interactives[1].toolName).toBe(ASK_CREDENTIAL_TOOL_NAME);
expect(grouped.interactives[1].resolvedAt).toBeUndefined();
});
// Regression test: the same tool call persisted twice under one toolCallId (first without a
// result, then resolved) must be displayed as a single, merged tool call.
it('merges duplicate persisted tool calls by id and keeps the resolved one', () => {
const chat = convertDbMessages([
{
id: 'user-1',
role: 'user',
content: [{ type: 'text', text: 'Can you fetch this page?' }],
},
// First persisted copy of the call: carries the input and the earlier startTime,
// but has no state, output or endTime yet.
{
id: 'assistant-pending',
role: 'assistant',
content: [
{
type: 'tool-call',
toolName: 'notion_notion-fetch',
toolCallId: 'toolu_1',
input: { id: 'https://app.notion.com/p/example' },
startTime: 1_000,
},
],
},
// Second persisted copy with the same toolCallId: resolved with an output and an
// endTime, but without the input; followed by the assistant's text answer.
{
id: 'assistant-resolved',
role: 'assistant',
content: [
{
type: 'tool-call',
toolName: 'notion_notion-fetch',
toolCallId: 'toolu_1',
state: 'resolved',
output: { content: [{ type: 'text', text: 'Page contents' }] },
startTime: 2_000,
endTime: 2_000,
},
{ type: 'text', text: 'Here is the page I fetched.' },
],
},
]);
const groups = buildDisplayGroups(chat);
// Two groups are expected; the second one is asserted to be the tool run.
expect(groups).toHaveLength(2);
const toolRun = groups[1];
expect(toolRun.kind).toBe('toolRun');
// Narrows the union type for the assertions below; the expect above has already
// failed the test if the kind is anything else.
if (toolRun.kind !== 'toolRun') return;
// The two copies must collapse into one entry.
expect(toolRun.toolCalls).toHaveLength(1);
// Merged view: input and startTime come from the first copy, output and endTime
// from the resolved copy.
expect(toolRun.toolCalls[0]).toEqual(
expect.objectContaining({
tool: 'notion_notion-fetch',
toolCallId: 'toolu_1',
state: 'done',
input: { id: 'https://app.notion.com/p/example' },
output: { content: [{ type: 'text', text: 'Page contents' }] },
startTime: 1_000,
endTime: 2_000,
}),
);
// The text part of the resolved message is expected as the group's final message.
expect(toolRun.finalMessage?.content).toBe('Here is the page I fetched.');
});
});
describe('applyOpenSuspensions', () => {
@@ -189,17 +189,65 @@ export function isGroupable(msg: ChatMessage): boolean {
return msg.role === 'assistant' && !!msg.toolCalls?.length && !msg.content.trim();
}
/**
 * Combines two records of the same tool call into one.
 *
 * Fields of `next` override those of `previous`, except that:
 * - `input`, `endTime` and `canceled` fall back to `previous` when `next` has none,
 * - `startTime` prefers `previous` and only falls back to `next`.
 * The display summary is recomputed from the combined record.
 *
 * @param previous - The record seen first.
 * @param next - The record seen later for the same call.
 * @returns A new combined record; neither argument is modified.
 */
function mergeToolCall(previous: ToolCall, next: ToolCall): ToolCall {
	const combined: ToolCall = { ...previous, ...next };
	combined.input = next.input ?? previous.input;
	combined.startTime = previous.startTime ?? next.startTime;
	combined.endTime = next.endTime ?? previous.endTime;
	combined.canceled = next.canceled ?? previous.canceled;

	// Summarise only after every field is settled so the summary reflects the merged data.
	const displaySummary = summariseToolCall(combined.tool, combined.output, combined.input);
	return { ...combined, displaySummary };
}
/**
 * Returns a new list made of `existing` followed by `next`, where an incoming call whose
 * `toolCallId` is already present is merged into the earlier entry instead of being appended.
 *
 * Calls without a `toolCallId` are always appended. Neither input array is modified.
 *
 * @param existing - Tool calls collected so far.
 * @param next - Tool calls to add.
 * @returns The combined list.
 */
function appendToolCalls(existing: ToolCall[], next: ToolCall[]): ToolCall[] {
	const result = Array.from(existing);

	// Remember where each identified call lives so later duplicates can find it.
	const positionById = new Map<string, number>();
	for (let position = 0; position < result.length; position++) {
		const id = result[position].toolCallId;
		if (id) positionById.set(id, position);
	}

	for (const incoming of next) {
		const id = incoming.toolCallId;
		const position = id ? positionById.get(id) : undefined;

		if (position !== undefined) {
			// Same call seen again: fold it into the entry already in the list.
			result[position] = mergeToolCall(result[position], incoming);
		} else {
			if (id) positionById.set(id, result.length);
			result.push(incoming);
		}
	}

	return result;
}
/**
 * Adds an interactive payload to a list, replacing an earlier payload that belongs to the
 * same tool call.
 *
 * @param existing - Payloads collected so far; never modified.
 * @param next - Payload to add, or `undefined` when the message has none.
 * @returns `existing` itself when `next` is `undefined`; otherwise a new array in which
 *   `next` either replaces the payload with the same `toolCallId` (keeping its position)
 *   or is appended at the end.
 */
function appendInteractivePayloads(
	existing: InteractivePayload[],
	next: InteractivePayload | undefined,
): InteractivePayload[] {
	if (!next) return existing;

	// A payload without an id cannot be matched to an earlier one. Without this guard,
	// `undefined === undefined` would make it overwrite an unrelated id-less payload.
	// appendToolCalls treats id-less tool calls the same way (always append).
	if (!next.toolCallId) return [...existing, next];

	const index = existing.findIndex((payload) => payload.toolCallId === next.toolCallId);
	if (index === -1) return [...existing, next];
	return existing.map((payload, i) => (i === index ? next : payload));
}
export function buildDisplayGroups(messages: ChatMessage[]): DisplayGroup[] {
const groups: DisplayGroup[] = [];
for (const msg of messages) {
if (isGroupable(msg)) {
const last = groups[groups.length - 1];
if (last && last.kind === 'toolRun' && !last.finalMessage) {
last.toolCalls = [...last.toolCalls, ...(msg.toolCalls ?? [])];
last.toolCalls = appendToolCalls(last.toolCalls, msg.toolCalls ?? []);
if (msg.thinking) {
last.thinking = last.thinking ? `${last.thinking}\n\n${msg.thinking}` : msg.thinking;
}
if (msg.interactive) last.interactives.push(msg.interactive);
last.interactives = appendInteractivePayloads(last.interactives, msg.interactive);
continue;
}
groups.push({
@@ -221,9 +269,9 @@ export function buildDisplayGroups(messages: ChatMessage[]): DisplayGroup[] {
last.thinking = last.thinking ? `${last.thinking}\n\n${msg.thinking}` : msg.thinking;
}
if (msg.toolCalls?.length) {
last.toolCalls = [...last.toolCalls, ...msg.toolCalls];
last.toolCalls = appendToolCalls(last.toolCalls, msg.toolCalls);
}
if (msg.interactive) last.interactives.push(msg.interactive);
last.interactives = appendInteractivePayloads(last.interactives, msg.interactive);
continue;
}
}