refactor(editor): Unify instance AI agent run state with the rendered tree (no-changelog) (#32124)

This commit is contained in:
Raúl Gómez Morales
2026-06-18 12:14:17 +02:00
committed by GitHub
parent ebbd629a6a
commit 553d046bbd
12 changed files with 988 additions and 601 deletions
+2 -1
View File
@@ -422,9 +422,10 @@ export {
reduceEvent,
findAgent,
toAgentTree,
stateFromAgentTree,
} from './schemas/agent-run-reducer';
export type { AgentRunState, AgentNode } from './schemas/agent-run-reducer';
export type { AgentRunState } from './schemas/agent-run-reducer';
export {
formatDebugJson,
@@ -1,6 +1,19 @@
import { createInitialState, reduceEvent, findAgent, toAgentTree } from '../agent-run-reducer';
import { deepCopy } from 'n8n-workflow';
import {
createInitialState,
reduceEvent,
findAgent,
toAgentTree,
stateFromAgentTree,
} from '../agent-run-reducer';
import type { AgentRunState } from '../agent-run-reducer';
import type { InstanceAiEvent } from '../instance-ai.schema';
import type {
InstanceAiAgentNode,
InstanceAiEvent,
InstanceAiTimelineEntry,
InstanceAiToolCallState,
} from '../instance-ai.schema';
// ---------------------------------------------------------------------------
// Factory helpers
@@ -133,10 +146,7 @@ function stateWithRun(runId: string, agentId: string): AgentRunState {
function expectStateMapsNotPolluted(state: AgentRunState): void {
expect(Object.getPrototypeOf(state.agentsById)).toBe(Object.prototype);
expect(Object.getPrototypeOf(state.parentByAgentId)).toBe(Object.prototype);
expect(Object.getPrototypeOf(state.childrenByAgentId)).toBe(Object.prototype);
expect(Object.getPrototypeOf(state.timelineByAgentId)).toBe(Object.prototype);
expect(Object.getPrototypeOf(state.toolCallsById)).toBe(Object.prototype);
expect(Object.getPrototypeOf(state.toolCallIdsByAgentId)).toBe(Object.prototype);
}
// ---------------------------------------------------------------------------
@@ -254,6 +264,35 @@ describe('agent-run-reducer', () => {
expect(state.toolCallsById['tc-1'].isLoading).toBe(true);
});
it('follow-up run-start preserves a reasoning-only tree', () => {
const state = stateWithRun('run-1', 'root');
reduceEvent(state, makeReasoningDelta('run-1', 'root', 'deep thoughts'));
// Turn produced reasoning only — no text, tools, or children.
reduceEvent(state, makeRunFinish('run-1', 'root', 'completed'));
reduceEvent(state, makeRunStart('run-2', 'root'));
expect(state.agentsById['root'].reasoning).toBe('deep thoughts');
expect(state.status).toBe('active');
expect(state.agentsById['root'].status).toBe('active');
});
it('follow-up run-start preserves root-only status/result/error', () => {
const state = stateWithRun('run-1', 'root');
const root = state.agentsById['root'];
root.statusMessage = 'Recalling conversation...';
root.result = 'done';
root.error = 'boom';
reduceEvent(state, makeRunFinish('run-1', 'root', 'completed'));
reduceEvent(state, makeRunStart('run-2', 'root'));
expect(state.agentsById['root'].statusMessage).toBe('Recalling conversation...');
expect(state.agentsById['root'].result).toBe('done');
expect(state.agentsById['root'].error).toBe('boom');
expect(state.agentsById['root'].status).toBe('active');
});
it('run-start with unsafe agentId is ignored', () => {
const state = createInitialState();
@@ -273,8 +312,8 @@ describe('agent-run-reducer', () => {
expect(state.agentsById['root'].textContent).toBe('Hello world');
// Consecutive text should merge into one timeline entry
expect(state.timelineByAgentId['root']).toHaveLength(1);
expect(state.timelineByAgentId['root'][0]).toEqual({
expect(state.agentsById['root'].timeline).toHaveLength(1);
expect(state.agentsById['root'].timeline[0]).toEqual({
type: 'text',
content: 'Hello world',
});
@@ -325,8 +364,8 @@ describe('agent-run-reducer', () => {
expect(tc.isLoading).toBe(true);
expect(tc.renderHint).toBe('tasks');
expect(state.toolCallIdsByAgentId['root']).toContain('tc-1');
expect(state.timelineByAgentId['root']).toContainEqual({
expect(state.agentsById['root'].toolCalls.map((t) => t.toolCallId)).toContain('tc-1');
expect(state.agentsById['root'].timeline).toContainEqual({
type: 'tool-call',
toolCallId: 'tc-1',
});
@@ -399,8 +438,8 @@ describe('agent-run-reducer', () => {
expect(state.agentsById['sub-1'].role).toBe('sub-agent');
expect(state.agentsById['sub-1'].status).toBe('active');
expect(state.parentByAgentId['sub-1']).toBe('root');
expect(state.childrenByAgentId['root']).toContain('sub-1');
expect(state.timelineByAgentId['root']).toContainEqual({
expect(state.agentsById['root'].children.map((c) => c.agentId)).toContain('sub-1');
expect(state.agentsById['root'].timeline).toContainEqual({
type: 'child',
agentId: 'sub-1',
});
@@ -578,7 +617,7 @@ describe('agent-run-reducer', () => {
expect(state.agentsById['grandchild'].textContent).toBe('deep text');
expect(state.agentsById['grandchild'].status).toBe('completed');
expect(state.parentByAgentId['grandchild']).toBe('child');
expect(state.childrenByAgentId['child']).toContain('grandchild');
expect(state.agentsById['child'].children.map((c) => c.agentId)).toContain('grandchild');
});
});
@@ -591,7 +630,7 @@ describe('agent-run-reducer', () => {
reduceEvent(state, makeToolResult('run-1', 'sub-1', 'tc-1', 'found'));
reduceEvent(state, makeTextDelta('run-1', 'sub-1', 'after tool'));
const timeline = state.timelineByAgentId['sub-1'];
const timeline = state.agentsById['sub-1'].timeline;
expect(timeline).toHaveLength(3);
expect(timeline[0]).toEqual({ type: 'text', content: 'before tool' });
expect(timeline[1]).toEqual({ type: 'tool-call', toolCallId: 'tc-1' });
@@ -680,7 +719,7 @@ describe('agent-run-reducer', () => {
// builder-1 from run A should still exist
expect(findAgent(state, 'builder-1')).toBeDefined();
expect(state.toolCallsById['tc-1']).toBeDefined();
expect(state.childrenByAgentId['root']).toContain('builder-1');
expect(state.agentsById['root'].children.map((c) => c.agentId)).toContain('builder-1');
const tree = toAgentTree(state);
expect(tree.children).toHaveLength(1);
@@ -729,7 +768,10 @@ describe('agent-run-reducer', () => {
makeAgentSpawned('run-1', 'builder-2', 'root', 'workflow-builder', ['build']),
);
expect(state.childrenByAgentId['root']).toEqual(['builder-1', 'builder-2']);
expect(state.agentsById['root'].children.map((c) => c.agentId)).toEqual([
'builder-1',
'builder-2',
]);
expect(findAgent(state, 'builder-1')).toBeDefined();
expect(findAgent(state, 'builder-2')).toBeDefined();
@@ -737,8 +779,8 @@ describe('agent-run-reducer', () => {
reduceEvent(state, makeToolCall('run-1', 'builder-1', 'tc-1', 'search-nodes'));
reduceEvent(state, makeToolCall('run-1', 'builder-2', 'tc-2', 'search-nodes'));
expect(state.toolCallIdsByAgentId['builder-1']).toEqual(['tc-1']);
expect(state.toolCallIdsByAgentId['builder-2']).toEqual(['tc-2']);
expect(state.agentsById['builder-1'].toolCalls.map((t) => t.toolCallId)).toEqual(['tc-1']);
expect(state.agentsById['builder-2'].toolCalls.map((t) => t.toolCallId)).toEqual(['tc-2']);
const tree = toAgentTree(state);
expect(tree.children).toHaveLength(2);
@@ -746,4 +788,216 @@ describe('agent-run-reducer', () => {
expect(tree.children[1].toolCalls).toHaveLength(1);
});
});
describe('live tree view', () => {
it('toAgentTree returns the state root node itself (stable identity)', () => {
const state = stateWithRun('run-1', 'root');
const tree = toAgentTree(state);
expect(tree).toBe(state.agentsById['root']);
expect(toAgentTree(state)).toBe(tree);
});
it('mutations after toAgentTree are visible through the returned tree', () => {
const state = stateWithRun('run-1', 'root');
const tree = toAgentTree(state);
reduceEvent(state, makeTextDelta('run-1', 'root', 'streamed'));
reduceEvent(state, makeAgentSpawned('run-1', 'sub-1', 'root'));
reduceEvent(state, makeToolCall('run-1', 'sub-1', 'tc-1', 'search'));
expect(tree.textContent).toBe('streamed');
expect(tree.children).toHaveLength(1);
expect(tree.children[0]).toBe(state.agentsById['sub-1']);
expect(tree.children[0].toolCalls[0]).toBe(state.toolCallsById['tc-1']);
});
it('duplicate agent-spawned for an existing agent is ignored', () => {
const state = stateWithRun('run-1', 'root');
reduceEvent(state, makeAgentSpawned('run-1', 'sub-1', 'root'));
reduceEvent(state, makeTextDelta('run-1', 'sub-1', 'kept'));
reduceEvent(state, makeAgentSpawned('run-1', 'sub-1', 'root'));
expect(state.agentsById['root'].children).toHaveLength(1);
expect(state.agentsById['sub-1'].textContent).toBe('kept');
});
});
describe('stateFromAgentTree', () => {
function buildSnapshotTree() {
const state = stateWithRun('run-1', 'root');
reduceEvent(state, makeTextDelta('run-1', 'root', 'hello'));
reduceEvent(state, makeAgentSpawned('run-1', 'sub-1', 'root', 'builder', ['build']));
reduceEvent(state, makeToolCall('run-1', 'sub-1', 'tc-1', 'build-workflow'));
reduceEvent(state, makeToolResult('run-1', 'sub-1', 'tc-1', 'ok'));
reduceEvent(state, makeAgentCompleted('run-1', 'sub-1', 'built'));
reduceEvent(state, makeRunFinish('run-1', 'root', 'completed'));
// Deep copy to simulate a backend snapshot (fresh objects).
return deepCopy(toAgentTree(state));
}
it('round-trips a snapshot tree back into an equivalent state', () => {
const tree = buildSnapshotTree();
const state = stateFromAgentTree(tree);
expect(state).toBeDefined();
expect(state!.rootAgentId).toBe('root');
expect(state!.status).toBe('completed');
expect(state!.parentByAgentId['sub-1']).toBe('root');
expect(state!.toolCallsById['tc-1'].result).toBe('ok');
expect(toAgentTree(state!)).toEqual(tree);
});
it('adopts the given nodes instead of copying them', () => {
const tree = buildSnapshotTree();
const state = stateFromAgentTree(tree);
expect(toAgentTree(state!)).toBe(tree);
expect(state!.agentsById['sub-1']).toBe(tree.children[0]);
expect(state!.toolCallsById['tc-1']).toBe(tree.children[0].toolCalls[0]);
// Live continuation: reducing into the adopted state mutates the original tree
reduceEvent(state!, makeTextDelta('run-2', 'root', ' again'));
expect(tree.textContent).toBe('hello again');
});
it('preserves an active run status', () => {
const tree = buildSnapshotTree();
tree.status = 'active';
const state = stateFromAgentTree(tree);
expect(state!.status).toBe('active');
});
it('returns undefined for an unsafe root agentId', () => {
const tree = buildSnapshotTree();
tree.agentId = '__proto__';
expect(stateFromAgentTree(tree)).toBeUndefined();
});
it('drops children and tool calls with unsafe ids', () => {
const tree = buildSnapshotTree();
tree.children.push({
...tree.children[0],
agentId: '__proto__',
});
tree.toolCalls.push({
toolCallId: '__proto__',
toolName: 'evil',
args: {},
isLoading: false,
});
tree.timeline.push({ type: 'child', agentId: '__proto__' });
tree.timeline.push({ type: 'tool-call', toolCallId: '__proto__' });
const state = stateFromAgentTree(tree);
expect(findAgent(state!, '__proto__')).toBeUndefined();
expect(tree.children).toHaveLength(1);
expect(tree.toolCalls).toHaveLength(0);
expect(tree.timeline.some((e) => e.type === 'child' && e.agentId === '__proto__')).toBe(
false,
);
expect(
tree.timeline.some((e) => e.type === 'tool-call' && e.toolCallId === '__proto__'),
).toBe(false);
expectStateMapsNotPolluted(state!);
});
it('normalizes missing or non-array collections instead of throwing', () => {
// Simulates a truncated/malformed snapshot — run-sync frames and
// hydrated messages are not schema-validated.
const child = {
agentId: 'sub-1',
role: 'builder',
status: 'completed',
textContent: '',
reasoning: '',
// children / toolCalls / timeline missing entirely
} as unknown as InstanceAiAgentNode;
const tree = {
agentId: 'root',
role: 'orchestrator',
status: 'active',
textContent: 'hi',
reasoning: '',
children: [child],
toolCalls: 'junk',
timeline: undefined,
} as unknown as InstanceAiAgentNode;
const state = stateFromAgentTree(tree);
expect(state).toBeDefined();
expect(tree.toolCalls).toEqual([]);
expect(tree.timeline).toEqual([]);
expect(child.children).toEqual([]);
expect(child.toolCalls).toEqual([]);
expect(child.timeline).toEqual([]);
// The repaired node is reducible: appendTimelineText reads timeline.at()
reduceEvent(state!, makeTextDelta('run-1', 'sub-1', 'live'));
expect(child.textContent).toBe('live');
expect(child.timeline).toEqual([{ type: 'text', content: 'live' }]);
});
it('drops junk entries inside snapshot collections', () => {
const tree = buildSnapshotTree();
tree.children.push(null as unknown as InstanceAiAgentNode);
tree.children.push({ role: 'no-id' } as unknown as InstanceAiAgentNode);
tree.toolCalls.push(null as unknown as InstanceAiToolCallState);
tree.timeline.push(null as unknown as InstanceAiTimelineEntry);
const state = stateFromAgentTree(tree);
expect(state).toBeDefined();
expect(tree.children).toHaveLength(1);
expect(tree.toolCalls).toHaveLength(0);
expect(tree.timeline.every((entry) => entry !== null)).toBe(true);
expect(findAgent(state!, 'undefined')).toBeUndefined();
});
it('returns undefined when the root agentId is missing', () => {
const tree = buildSnapshotTree();
Reflect.deleteProperty(tree, 'agentId');
expect(stateFromAgentTree(tree)).toBeUndefined();
});
it('drops unsafe ids on nested descendants, not just the root level', () => {
const tree = buildSnapshotTree();
const child = tree.children[0];
child.children.push({
agentId: '__proto__',
role: 'evil',
status: 'active',
textContent: '',
reasoning: '',
toolCalls: [],
children: [],
timeline: [],
});
child.toolCalls.push({
toolCallId: 'constructor',
toolName: 'evil',
args: {},
isLoading: true,
});
child.timeline.push({ type: 'child', agentId: '__proto__' });
const state = stateFromAgentTree(tree);
expect(findAgent(state!, '__proto__')).toBeUndefined();
// Plain-object lookup would hit Object.prototype.constructor — assert own keys.
expect(Object.prototype.hasOwnProperty.call(state!.toolCallsById, 'constructor')).toBe(false);
expect(child.children).toHaveLength(0);
expect(child.toolCalls.some((tc) => tc.toolCallId === 'constructor')).toBe(false);
expect(child.timeline.some((e) => e.type === 'child' && e.agentId === '__proto__')).toBe(
false,
);
expectStateMapsNotPolluted(state!);
});
});
});
@@ -2,63 +2,50 @@
* Shared event reducer for Instance AI agent runs.
*
* Used by both the frontend (live SSE updates) and the backend (snapshot building).
* All state is plain objects/arrays — no Map/Set — so it's serializable, Pinia-safe,
* and easy to inspect in tests.
* All state is plain objects/arrays — no Map/Set — so it's Pinia-safe and easy
* to inspect in tests.
*
* The state is node-centric: `agentsById` holds the actual `InstanceAiAgentNode`
* objects, each embedding its own `toolCalls`, `timeline`, and `children`. The
* agent tree and the flat lookups share the same objects — the tree rooted at
* `rootAgentId` IS the state, viewed hierarchically. Events mutate nodes in
* place, so consumers holding a node (or the tree root) always observe current
* data without any synchronization layer.
*
* Nodes never reference their parent (parent links live in `parentByAgentId`
* as plain ids), so the tree stays acyclic and `toAgentTree(state)` is
* JSON-serializable. Don't JSON round-trip the state itself: `agentsById`
* aliases the tree nodes, so stringify duplicates every shared subtree and
* parse yields a state whose index and tree no longer share objects.
*/
import { getRenderHint, isSafeObjectKey } from './instance-ai.schema';
import type {
InstanceAiEvent,
InstanceAiAgentNode,
InstanceAiAgentKind,
InstanceAiAgentStatus,
InstanceAiToolCallState,
InstanceAiTimelineEntry,
InstanceAiTargetResource,
PlannedTaskArg,
TaskList,
InstanceAiToolCallState,
} from './instance-ai.schema';
// ---------------------------------------------------------------------------
// State types
// ---------------------------------------------------------------------------
export interface AgentNode {
agentId: string;
role: string;
tools?: string[];
taskId?: string;
// Display metadata (from enriched agent-spawned events)
kind?: InstanceAiAgentKind;
title?: string;
subtitle?: string;
goal?: string;
targetResource?: InstanceAiTargetResource;
/** Transient status message (e.g. "Recalling conversation..."). Cleared when empty. */
statusMessage?: string;
status: InstanceAiAgentStatus;
textContent: string;
reasoning: string;
tasks?: TaskList;
planItems?: PlannedTaskArg[];
result?: string;
error?: string;
}
export interface AgentRunState {
rootAgentId: string;
/** Flat agent lookup — supports any nesting depth. */
agentsById: Record<string, AgentNode>;
/**
* Flat agent lookup — supports any nesting depth. Values are the live tree
* nodes: `agentsById[id].children[n]` is the same object as
* `agentsById[childId]`.
*/
agentsById: Record<string, InstanceAiAgentNode>;
/** Maps child agentId → parent agentId. Root agent has no entry. */
parentByAgentId: Record<string, string>;
/** Ordered list of children per agent. */
childrenByAgentId: Record<string, string[]>;
/** Chronological timeline per agent. */
timelineByAgentId: Record<string, InstanceAiTimelineEntry[]>;
/** Flat tool-call lookup. */
/**
* Flat tool-call lookup — same objects as in the owning node's `toolCalls`
* array, so result/error updates are visible from both sides.
*/
toolCallsById: Record<string, InstanceAiToolCallState>;
/** Ordered tool-call IDs per agent (preserves insertion order). */
toolCallIdsByAgentId: Record<string, string[]>;
/** Run status — tracks the overall run lifecycle. */
status: 'active' | 'completed' | 'cancelled' | 'error';
}
@@ -67,24 +54,28 @@ export interface AgentRunState {
// Factory
// ---------------------------------------------------------------------------
function createNode(agentId: string, role: string): InstanceAiAgentNode {
return {
agentId,
role,
status: 'active',
textContent: '',
reasoning: '',
toolCalls: [],
children: [],
timeline: [],
};
}
export function createInitialState(rootAgentId = 'agent-001'): AgentRunState {
const safeRootAgentId = isSafeObjectKey(rootAgentId) ? rootAgentId : 'agent-001';
return {
rootAgentId: safeRootAgentId,
agentsById: {
[safeRootAgentId]: {
agentId: safeRootAgentId,
role: 'orchestrator',
status: 'active',
textContent: '',
reasoning: '',
},
[safeRootAgentId]: createNode(safeRootAgentId, 'orchestrator'),
},
parentByAgentId: {},
childrenByAgentId: { [safeRootAgentId]: [] },
timelineByAgentId: { [safeRootAgentId]: [] },
toolCallsById: {},
toolCallIdsByAgentId: { [safeRootAgentId]: [] },
status: 'active',
};
}
@@ -93,7 +84,7 @@ export function createInitialState(rootAgentId = 'agent-001'): AgentRunState {
// Lookup
// ---------------------------------------------------------------------------
export function findAgent(state: AgentRunState, agentId: string): AgentNode | undefined {
export function findAgent(state: AgentRunState, agentId: string): InstanceAiAgentNode | undefined {
if (!isSafeObjectKey(agentId)) return undefined;
return state.agentsById[agentId];
}
@@ -102,41 +93,11 @@ export function findAgent(state: AgentRunState, agentId: string): AgentNode | un
// Internal helpers
// ---------------------------------------------------------------------------
function ensureAgent(state: AgentRunState, agentId: string): AgentNode | undefined {
function ensureAgent(state: AgentRunState, agentId: string): InstanceAiAgentNode | undefined {
if (!isSafeObjectKey(agentId)) return undefined;
return state.agentsById[agentId];
}
function ensureTimeline(state: AgentRunState, agentId: string): InstanceAiTimelineEntry[] {
if (!isSafeObjectKey(agentId)) return [];
let tl = state.timelineByAgentId[agentId];
if (!tl) {
tl = [];
state.timelineByAgentId[agentId] = tl;
}
return tl;
}
function ensureToolCallIds(state: AgentRunState, agentId: string): string[] {
if (!isSafeObjectKey(agentId)) return [];
let ids = state.toolCallIdsByAgentId[agentId];
if (!ids) {
ids = [];
state.toolCallIdsByAgentId[agentId] = ids;
}
return ids;
}
function ensureChildren(state: AgentRunState, agentId: string): string[] {
if (!isSafeObjectKey(agentId)) return [];
let children = state.childrenByAgentId[agentId];
if (!children) {
children = [];
state.childrenByAgentId[agentId] = children;
}
return children;
}
/** Append text to timeline — merges consecutive text entries within the same responseId. */
function appendTimelineText(
timeline: InstanceAiTimelineEntry[],
@@ -151,6 +112,30 @@ function appendTimelineText(
}
}
/**
* Whether a node carries any content worth preserving across a follow-up
* `run-start`. Covers every renderable field a turn can populate — not just
* text/tools/children — so a reasoning-, status-, result-, or error-only tree
* is not wiped when the next run in the group starts. Optional-chained because
* adopted run-sync trees are not schema-validated, so a malformed node must not
* throw here.
*/
function nodeHasContent(node: InstanceAiAgentNode | undefined): boolean {
if (!node) return false;
return (
(node.textContent?.length ?? 0) > 0 ||
(node.reasoning?.length ?? 0) > 0 ||
(node.timeline?.length ?? 0) > 0 ||
(node.toolCalls?.length ?? 0) > 0 ||
(node.children?.length ?? 0) > 0 ||
(node.planItems?.length ?? 0) > 0 ||
!!node.statusMessage ||
!!node.result ||
!!node.error ||
!!node.tasks
);
}
// ---------------------------------------------------------------------------
// Reducer
// ---------------------------------------------------------------------------
@@ -164,35 +149,20 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
case 'run-start': {
const rootId = event.agentId;
if (!isSafeObjectKey(rootId)) break;
const hasExistingAgents =
Object.keys(state.agentsById).length > 1 ||
(state.agentsById[state.rootAgentId]?.textContent?.length ?? 0) > 0 ||
(state.childrenByAgentId[state.rootAgentId]?.length ?? 0) > 0 ||
(state.toolCallIdsByAgentId[state.rootAgentId]?.length ?? 0) > 0;
const root = state.agentsById[state.rootAgentId];
const hasExistingAgents = Object.keys(state.agentsById).length > 1 || nodeHasContent(root);
if (hasExistingAgents) {
// Follow-up run in a merged group: preserve existing agent tree,
// just re-activate the root orchestrator for the new run's events.
state.status = 'active';
const root = state.agentsById[state.rootAgentId];
if (root) root.status = 'active';
} else {
// First run: initialize from scratch.
state.rootAgentId = rootId;
state.agentsById = {
[rootId]: {
agentId: rootId,
role: 'orchestrator',
status: 'active',
textContent: '',
reasoning: '',
},
};
state.agentsById = { [rootId]: createNode(rootId, 'orchestrator') };
state.parentByAgentId = {};
state.childrenByAgentId = { [rootId]: [] };
state.timelineByAgentId = { [rootId]: [] };
state.toolCallsById = {};
state.toolCallIdsByAgentId = { [rootId]: [] };
state.status = 'active';
}
break;
@@ -202,11 +172,7 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
const agent = ensureAgent(state, event.agentId);
if (agent) {
agent.textContent += event.payload.text;
appendTimelineText(
ensureTimeline(state, event.agentId),
event.payload.text,
event.responseId,
);
appendTimelineText(agent.timeline, event.payload.text, event.responseId);
}
break;
}
@@ -232,8 +198,8 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
startedAt: new Date().toISOString(),
};
state.toolCallsById[event.payload.toolCallId] = tc;
ensureToolCallIds(state, event.agentId).push(event.payload.toolCallId);
ensureTimeline(state, event.agentId).push({
agent.toolCalls.push(tc);
agent.timeline.push({
type: 'tool-call',
toolCallId: event.payload.toolCallId,
...(event.responseId ? { responseId: event.responseId } : {}),
@@ -266,11 +232,13 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
case 'agent-spawned': {
if (!isSafeObjectKey(event.agentId) || !isSafeObjectKey(event.payload.parentId)) break;
// Idempotency guard: a replayed agent-spawned for an existing agent
// must not create a second node for the same id.
if (state.agentsById[event.agentId]) break;
const parentAgent = ensureAgent(state, event.payload.parentId);
if (parentAgent) {
state.agentsById[event.agentId] = {
agentId: event.agentId,
role: event.payload.role,
const child: InstanceAiAgentNode = {
...createNode(event.agentId, event.payload.role),
tools: event.payload.tools,
taskId: event.payload.taskId,
kind: event.payload.kind,
@@ -278,20 +246,14 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
subtitle: event.payload.subtitle,
goal: event.payload.goal,
targetResource: event.payload.targetResource,
status: 'active',
textContent: '',
reasoning: '',
};
state.agentsById[event.agentId] = child;
state.parentByAgentId[event.agentId] = event.payload.parentId;
ensureChildren(state, event.payload.parentId).push(event.agentId);
ensureChildren(state, event.agentId); // init empty
ensureTimeline(state, event.agentId); // init empty
ensureToolCallIds(state, event.agentId); // init empty
const parentTimeline = ensureTimeline(state, event.payload.parentId);
parentAgent.children.push(child);
// Inherit responseId from the parent's last entry when not set on the event
// (agent-spawned events are emitted from tool code, not the stream executor).
const inheritedResponseId = event.responseId ?? parentTimeline.at(-1)?.responseId;
parentTimeline.push({
const inheritedResponseId = event.responseId ?? parentAgent.timeline.at(-1)?.responseId;
parentAgent.timeline.push({
type: 'child',
agentId: event.agentId,
...(inheritedResponseId ? { responseId: inheritedResponseId } : {}),
@@ -306,15 +268,10 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
agent.status = event.payload.error ? 'error' : 'completed';
agent.result = event.payload.result;
agent.error = event.payload.error;
}
// A completed/errored agent can't have tool calls still in-flight.
// Clear isLoading so persisted snapshots don't show stale confirmations.
if (!isSafeObjectKey(event.agentId)) break;
const agentToolCallIds = state.toolCallIdsByAgentId[event.agentId];
if (agentToolCallIds) {
for (const tcId of agentToolCallIds) {
const tc = state.toolCallsById[tcId];
if (tc?.isLoading) {
// A completed/errored agent can't have tool calls still in-flight.
// Clear isLoading so persisted snapshots don't show stale confirmations.
for (const tc of agent.toolCalls) {
if (tc.isLoading) {
tc.isLoading = false;
}
}
@@ -370,17 +327,10 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
case 'error': {
const errorText = '\n\n*Error: ' + event.payload.content + '*';
const agent = ensureAgent(state, event.agentId);
const agent = ensureAgent(state, event.agentId) ?? state.agentsById[state.rootAgentId];
if (agent) {
agent.textContent += errorText;
appendTimelineText(ensureTimeline(state, event.agentId), errorText, event.responseId);
} else {
// Fall back to root agent
const root = state.agentsById[state.rootAgentId];
if (root) {
root.textContent += errorText;
appendTimelineText(ensureTimeline(state, state.rootAgentId), errorText, event.responseId);
}
appendTimelineText(agent.timeline, errorText, event.responseId);
}
break;
}
@@ -416,72 +366,82 @@ export function reduceEvent(state: AgentRunState, event: InstanceAiEvent): Agent
}
// ---------------------------------------------------------------------------
// Tree reconstruction (for rendering)
// Tree view (for rendering and snapshot serialization)
// ---------------------------------------------------------------------------
/**
* Derives the nested `InstanceAiAgentNode` tree from the flat state.
* This is what components receive for rendering.
* Returns the agent tree rooted at `rootAgentId`. This is a live view — the
* root node and everything below it are the state's own objects, kept current
* by `reduceEvent` mutations. JSON-serializable (no cycles); callers that need
* an immutable snapshot must clone it themselves.
*/
export function toAgentTree(state: AgentRunState): InstanceAiAgentNode {
return buildNodeRecursive(state, state.rootAgentId);
return state.agentsById[state.rootAgentId] ?? createNode(state.rootAgentId, 'unknown');
}
function buildNodeRecursive(state: AgentRunState, agentId: string): InstanceAiAgentNode {
if (!isSafeObjectKey(agentId)) {
return {
agentId,
role: 'unknown',
status: 'active',
textContent: '',
reasoning: '',
toolCalls: [],
children: [],
timeline: [],
};
}
/**
* Adopted snapshots are not schema-validated (run-sync frames are plain
* `JSON.parse`, hydrated messages a cast REST response), so an id must be
* checked for presence and type before it is trusted as an object key.
* Note `isSafeObjectKey(undefined)` would return true — `Set.has` on a
* missing key is just false — so the string check is load-bearing.
*/
function isAdoptableId(id: unknown): id is string {
return typeof id === 'string' && isSafeObjectKey(id);
}
const agent = state.agentsById[agentId];
const childIds = (state.childrenByAgentId[agentId] ?? []).filter((childId) =>
isSafeObjectKey(childId),
);
const toolCallIds = (state.toolCallIdsByAgentId[agentId] ?? []).filter((toolCallId) =>
isSafeObjectKey(toolCallId),
);
const timeline = (state.timelineByAgentId[agentId] ?? []).filter((entry) => {
if (entry.type === 'child') return isSafeObjectKey(entry.agentId);
if (entry.type === 'tool-call') return isSafeObjectKey(entry.toolCallId);
return true;
});
/**
* Inverse of `toAgentTree`: index an existing snapshot tree (e.g. from session
* restore or a `run-sync` frame) into an `AgentRunState`.
*
* The nodes are adopted, not copied — the returned state's `agentsById` points
* at the given tree's own objects, so subsequent `reduceEvent` calls mutate the
* tree the caller already holds. Snapshots are not schema-validated: entries
* with unsafe or missing ids are dropped and missing/junk collections are
* normalized to empty arrays, in place — adoption never throws, and adopted
* nodes are always safe to reduce into and render.
*/
export function stateFromAgentTree(tree: InstanceAiAgentNode): AgentRunState | undefined {
if (!isAdoptableId(tree.agentId)) return undefined;
const toolCalls: InstanceAiToolCallState[] = toolCallIds
.map((id) => state.toolCallsById[id])
.filter((tc): tc is InstanceAiToolCallState => tc !== undefined);
const children: InstanceAiAgentNode[] = childIds.map((childId) =>
buildNodeRecursive(state, childId),
);
return {
agentId: agent?.agentId ?? agentId,
role: agent?.role ?? 'unknown',
tools: agent?.tools,
taskId: agent?.taskId,
kind: agent?.kind,
title: agent?.title,
subtitle: agent?.subtitle,
goal: agent?.goal,
targetResource: agent?.targetResource,
statusMessage: agent?.statusMessage,
status: agent?.status ?? 'active',
textContent: agent?.textContent ?? '',
reasoning: agent?.reasoning ?? '',
toolCalls,
children,
timeline: [...timeline],
tasks: agent?.tasks,
planItems: agent?.planItems,
result: agent?.result,
error: agent?.error,
const state: AgentRunState = {
rootAgentId: tree.agentId,
agentsById: {},
parentByAgentId: {},
toolCallsById: {},
status: tree.status === 'active' ? 'active' : tree.status,
};
adoptNode(state, tree, undefined);
return state;
}
function adoptNode(
state: AgentRunState,
node: InstanceAiAgentNode,
parentId: string | undefined,
): void {
if (!isAdoptableId(node.agentId)) return;
state.agentsById[node.agentId] = node;
if (parentId && isSafeObjectKey(parentId)) state.parentByAgentId[node.agentId] = parentId;
node.children = Array.isArray(node.children)
? node.children.filter((child) => isAdoptableId(child?.agentId))
: [];
node.toolCalls = Array.isArray(node.toolCalls)
? node.toolCalls.filter((toolCall) => isAdoptableId(toolCall?.toolCallId))
: [];
node.timeline = Array.isArray(node.timeline)
? node.timeline.filter((entry) => {
if (entry?.type === 'child') return isAdoptableId(entry.agentId);
if (entry?.type === 'tool-call') return isAdoptableId(entry.toolCallId);
return Boolean(entry);
})
: [];
for (const tc of node.toolCalls) {
state.toolCallsById[tc.toolCallId] = tc;
}
for (const child of node.children) {
adoptNode(state, child, node.agentId);
}
}
@@ -6,6 +6,7 @@ import {
onUnmounted,
provide,
ref,
shallowReactive,
useTemplateRef,
watch,
} from 'vue';
@@ -82,7 +83,19 @@ const builderAgents = computed(() => collectActiveBuilderAgents(thread.messages)
// Assistant messages whose only content has been extracted to the bottom
// builder section (or which haven't produced anything renderable yet) would
// otherwise leave an empty wrapper in the list — filter them out.
const displayedMessages = computed(() => thread.messages.filter(messageHasVisibleContent));
// Reconciled in place: spliced only when membership changes, so streamed
// tokens don't re-render the list.
const displayedMessages = shallowReactive<typeof thread.messages>([]);
watch(
() => thread.messages.filter(messageHasVisibleContent),
(next) => {
const unchanged =
next.length === displayedMessages.length &&
next.every((msg, i) => msg === displayedMessages[i]);
if (!unchanged) displayedMessages.splice(0, displayedMessages.length, ...next);
},
{ immediate: true },
);
// True when at least one pending confirmation should occupy the chat-input
// slot (generic approvals + domain/web-search access). Drives the swap
@@ -1,10 +1,11 @@
import { describe, test, expect } from 'vitest';
import { isReactive } from 'vue';
import {
handleEvent,
findMessageByRunId,
findAgentNode,
getRenderHint,
rebuildRunStateFromTree,
createRunStateFromTree,
} from '../instanceAi.reducer';
import type { InstanceAiReducerState } from '../instanceAi.reducer';
import type { InstanceAiEvent } from '@n8n/api-types';
@@ -17,8 +18,8 @@ function makeState(overrides?: Partial<InstanceAiReducerState>): InstanceAiReduc
return {
messages: [],
activeRunId: null,
runStateByGroupId: {},
groupIdByRunId: {},
runStateByGroupId: new Map(),
groupIdByRunId: new Map(),
...overrides,
};
}
@@ -185,8 +186,10 @@ function stateWithRun(runId: string, agentId: string): InstanceAiReducerState {
}
function expectReducerMapsNotPolluted(state: InstanceAiReducerState): void {
expect(Object.getPrototypeOf(state.runStateByGroupId)).toBe(Object.prototype);
expect(Object.getPrototypeOf(state.groupIdByRunId)).toBe(Object.prototype);
// Maps can't be prototype-polluted; assert the guards skipped the unsafe id
// rather than storing it as an entry.
expect(state.runStateByGroupId.has('__proto__')).toBe(false);
expect(state.groupIdByRunId.has('__proto__')).toBe(false);
}
// ---------------------------------------------------------------------------
@@ -333,7 +336,9 @@ describe('instanceAi.reducer', () => {
handleEvent(state, makeToolResultEvent('run-1', 'agent-root', 'tc-1', { ok: true }));
const tc = state.messages[0].agentTree!.toolCalls[0];
expect(tc).not.toBe(pendingToolCall);
// In-place update: the rendered tool call keeps its identity (reactivity
// tracks the mutated properties, not object replacement).
expect(tc).toBe(pendingToolCall);
expect(tc.isLoading).toBe(false);
expect(tc.result).toEqual({ ok: true });
});
@@ -521,7 +526,7 @@ describe('instanceAi.reducer', () => {
});
expect(state.messages).toHaveLength(0);
expect(state.groupIdByRunId['run-safe']).toBeUndefined();
expect(state.groupIdByRunId.get('run-safe')).toBeUndefined();
expectReducerMapsNotPolluted(state);
});
@@ -553,8 +558,8 @@ describe('instanceAi.reducer', () => {
expectReducerMapsNotPolluted(state);
});
test('rebuildRunStateFromTree skips unsafe roots', () => {
const runState = rebuildRunStateFromTree({
test('createRunStateFromTree skips unsafe roots', () => {
const runState = createRunStateFromTree({
agentId: '__proto__',
role: 'orchestrator',
status: 'completed',
@@ -568,8 +573,8 @@ describe('instanceAi.reducer', () => {
expect(runState).toBeUndefined();
});
test('rebuildRunStateFromTree preserves planItems', () => {
const runState = rebuildRunStateFromTree({
test('createRunStateFromTree preserves planItems', () => {
const runState = createRunStateFromTree({
agentId: 'agent-root',
role: 'orchestrator',
status: 'completed',
@@ -855,4 +860,58 @@ describe('instanceAi.reducer', () => {
expect(node!.textContent).toBe('deep');
});
});
// -----------------------------------------------------------------------
// Live tree contract: msg.agentTree IS the run state's root node
// -----------------------------------------------------------------------
describe('live tree contract', () => {
test('msg.agentTree is the run state root and is reactive', () => {
const state = stateWithRun('run-1', 'agent-root');
const msg = state.messages[0];
const runState = state.runStateByGroupId.get('run-1');
expect(runState).toBeDefined();
expect(msg.agentTree).toBe(runState!.agentsById['agent-root']);
// The run state is wrapped in reactive() so in-place reducer mutations
// trigger Vue updates on the rendered tree.
expect(isReactive(msg.agentTree)).toBe(true);
});
test('events keep mutating an adopted snapshot tree (session restore continuation)', () => {
const tree = {
agentId: 'agent-root',
role: 'orchestrator',
status: 'active' as const,
textContent: 'restored',
reasoning: '',
toolCalls: [],
children: [],
timeline: [{ type: 'text' as const, content: 'restored' }],
};
const runState = createRunStateFromTree(tree)!;
const state = makeState({
messages: [
{
id: 'mg-1',
runId: 'run-1',
messageGroupId: 'mg-1',
role: 'assistant',
createdAt: new Date().toISOString(),
content: 'restored',
reasoning: '',
isStreaming: true,
agentTree: tree,
},
],
runStateByGroupId: new Map([['mg-1', runState]]),
groupIdByRunId: new Map([['run-1', 'mg-1']]),
});
handleEvent(state, makeTextDeltaEvent('run-1', 'agent-root', ' and continued'));
// The adopted tree — the exact object the message renders — was updated.
expect(tree.textContent).toBe('restored and continued');
expect(state.messages[0].content).toBe('restored and continued');
});
});
});
@@ -677,13 +677,13 @@ describe('createThreadRuntime - SSE and hydration', () => {
isStreaming: false,
},
];
mockFetchThreadMessages.mockResolvedValueOnce({
threadId: activeThreadId,
messages: [],
nextEventId: 10,
});
await expect(activeRuntime(registry).loadHistoricalMessages()).resolves.toBe('skipped');
// The skip happens before the fetch — fetchThreadMessages must not be
// called (and must not be mocked here: vi.clearAllMocks() does not drain
// once-queues, so an unconsumed mockResolvedValueOnce leaks into the next
// test's hydration).
expect(mockFetchThreadMessages).not.toHaveBeenCalled();
expect(activeRuntime(registry).messages).toHaveLength(1);
expect(activeRuntime(registry).lastEventId).toBeUndefined();
});
@@ -734,10 +734,111 @@ describe('createThreadRuntime - SSE and hydration', () => {
}),
);
// The unsafe group id got no routing entry (and thus no run state), so
// the event must not reduce anywhere: the message is still found via the
// runId fallback (no phantom), but stays untouched.
expect(activeRuntime(registry).messages).toHaveLength(1);
expect(activeRuntime(registry).messages[0].messageGroupId).toBe('safe-run');
expect(activeRuntime(registry).messages[0].agentTree?.agentId).toBe('fresh-root');
expect(activeRuntime(registry).messages[0].content).toBe('restored safely');
const hydratedMsg = activeRuntime(registry).messages[0];
expect(hydratedMsg.messageGroupId).toBe('__proto__');
expect(hydratedMsg.agentTree?.agentId).toBe('agent-root');
expect(hydratedMsg.agentTree?.textContent).toBe('');
expect(hydratedMsg.content).toBe('');
});
test('hydration adopts message trees so live events mutate the rendered tree', async () => {
mockFetchThreadMessages.mockResolvedValueOnce({
threadId: activeThreadId,
messages: [
{
id: 'msg-restored',
runId: 'run-h',
messageGroupId: 'group-h',
role: 'assistant',
createdAt: new Date().toISOString(),
content: 'restored',
reasoning: '',
isStreaming: false,
agentTree: {
agentId: 'agent-root',
role: 'orchestrator',
status: 'active',
textContent: 'restored',
reasoning: '',
toolCalls: [],
children: [],
timeline: [{ type: 'text', content: 'restored' }],
},
},
],
nextEventId: 11,
});
await activeRuntime(registry).loadHistoricalMessages();
capturedOnMessage!(
makeSSEEvent({
type: 'text-delta',
runId: 'run-h',
agentId: 'agent-root',
payload: { text: ' + live' },
}),
);
// Identity contract: the hydrated run state ADOPTS msg.agentTree's nodes,
// so the live event must mutate the very tree the message renders. A
// defensive copy anywhere on this path would freeze the rendered tree at
// the snapshot while events mutate an orphaned state.
expect(activeRuntime(registry).messages).toHaveLength(1);
const msg = activeRuntime(registry).messages[0];
expect(msg.agentTree?.textContent).toBe('restored + live');
expect(msg.agentTree?.timeline).toEqual([{ type: 'text', content: 'restored + live' }]);
expect(msg.content).toBe('restored + live');
});
test('run-sync adopts the snapshot tree so subsequent live events mutate the rendered tree', () => {
capturedOnMessage!(
makeSSEEvent({
type: 'run-start',
runId: 'run-s',
agentId: 'agent-root',
payload: { messageId: 'msg-1', messageGroupId: 'group-s' },
}),
);
capturedInstance!.dispatchNamedEvent('run-sync', {
runId: 'run-s',
messageGroupId: 'group-s',
runIds: ['run-s'],
agentTree: {
agentId: 'agent-root',
role: 'orchestrator',
status: 'active',
textContent: 'synced',
reasoning: '',
toolCalls: [],
children: [],
timeline: [{ type: 'text', content: 'synced' }],
},
status: 'active',
backgroundTasks: [],
});
capturedOnMessage!(
makeSSEEvent({
type: 'text-delta',
runId: 'run-s',
agentId: 'agent-root',
payload: { text: ' + live' },
}),
);
// Identity contract: run-sync rebuilds the run state by ADOPTING the
// snapshot tree it assigns to msg.agentTree, so post-sync live events
// must mutate the rendered tree (not an orphaned copy).
expect(activeRuntime(registry).messages).toHaveLength(1);
const msg = activeRuntime(registry).messages[0];
expect(msg.agentTree?.textContent).toBe('synced + live');
expect(msg.agentTree?.timeline).toEqual([{ type: 'text', content: 'synced + live' }]);
});
test('run-sync skips unsafe group identifiers instead of registering them', () => {
@@ -1,5 +1,5 @@
import { describe, test, expect } from 'vitest';
import { ref, nextTick } from 'vue';
import { ref, nextTick, watchEffect } from 'vue';
import type {
InstanceAiMessage,
InstanceAiAgentNode,
@@ -78,10 +78,10 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-1')).toEqual(
expect(producedArtifacts.get('wf-1')).toEqual(
expect.objectContaining({ type: 'workflow', id: 'wf-1', name: 'My Workflow' }),
);
expect(resourceNameIndex.value.get('my workflow')?.id).toBe('wf-1');
expect(resourceNameIndex.get('my workflow')?.id).toBe('wf-1');
});
test('falls back to args.name when result has no workflowName', async () => {
@@ -102,7 +102,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-2')).toEqual(
expect(producedArtifacts.get('wf-2')).toEqual(
expect.objectContaining({ type: 'workflow', id: 'wf-2', name: 'From Args' }),
);
});
@@ -125,7 +125,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-3')).toEqual(
expect(producedArtifacts.get('wf-3')).toEqual(
expect.objectContaining({ type: 'workflow', id: 'wf-3', name: 'Untitled' }),
);
});
@@ -153,8 +153,8 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-a')?.id).toBe('wf-a');
expect(producedArtifacts.value.get('wf-b')?.id).toBe('wf-b');
expect(producedArtifacts.get('wf-a')?.id).toBe('wf-a');
expect(producedArtifacts.get('wf-b')?.id).toBe('wf-b');
});
});
@@ -179,7 +179,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-edit')).toEqual(
expect(producedArtifacts.get('wf-edit')).toEqual(
expect.objectContaining({ type: 'workflow', id: 'wf-edit', name: 'Existing WF' }),
);
});
@@ -204,7 +204,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(0);
expect(producedArtifacts.size).toBe(0);
});
test('ignores credential targetResource (not surfaced in the artifacts panel)', async () => {
@@ -227,7 +227,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(0);
expect(producedArtifacts.size).toBe(0);
});
test('falls back to Untitled when targetResource has no name', async () => {
@@ -250,7 +250,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-edit')?.name).toBe('Untitled');
expect(producedArtifacts.get('wf-edit')?.name).toBe('Untitled');
});
test('later build-workflow result overwrites the placeholder name', async () => {
@@ -279,8 +279,8 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(1);
expect(producedArtifacts.value.get('wf-edit')?.name).toBe('Renamed');
expect(producedArtifacts.size).toBe(1);
expect(producedArtifacts.get('wf-edit')?.name).toBe('Renamed');
});
});
@@ -309,8 +309,8 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(1);
expect(producedArtifacts.value.get('wf-1')?.name).toBe('Renamed');
expect(producedArtifacts.size).toBe(1);
expect(producedArtifacts.get('wf-1')?.name).toBe('Renamed');
});
test('patch call without a name preserves the existing name (no regression to Untitled)', async () => {
@@ -338,7 +338,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-1')?.name).toBe('Keep Me');
expect(producedArtifacts.get('wf-1')?.name).toBe('Keep Me');
});
test('mutation result enriches an existing data-table entry with projectId', async () => {
@@ -368,8 +368,8 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(1);
expect(producedArtifacts.value.get('dt-1')).toEqual(
expect(producedArtifacts.size).toBe(1);
expect(producedArtifacts.get('dt-1')).toEqual(
expect.objectContaining({
type: 'data-table',
id: 'dt-1',
@@ -404,9 +404,9 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(0);
expect(resourceNameIndex.value.get('workspace workflow 1')?.id).toBe('wf-list-1');
expect(resourceNameIndex.value.get('workspace workflow 2')?.id).toBe('wf-list-2');
expect(producedArtifacts.size).toBe(0);
expect(resourceNameIndex.get('workspace workflow 1')?.id).toBe('wf-list-1');
expect(resourceNameIndex.get('workspace workflow 2')?.id).toBe('wf-list-2');
});
test('data-tables action=list result is indexed by name only', async () => {
@@ -429,8 +429,8 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(0);
expect(resourceNameIndex.value.get('existing table')?.id).toBe('dt-a');
expect(producedArtifacts.size).toBe(0);
expect(resourceNameIndex.get('existing table')?.id).toBe('dt-a');
});
test('a later write promotes a previously-listed resource into producedArtifacts', async () => {
@@ -456,12 +456,12 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.size).toBe(1);
expect(producedArtifacts.size).toBe(1);
// Produced entry keeps the 'Existing' name via fallback-to-untitled
// avoidance — the patch call has no name of its own.
expect(producedArtifacts.value.get('wf-1')?.name).toBe('Untitled');
expect(producedArtifacts.get('wf-1')?.name).toBe('Untitled');
// Name index still resolves 'existing'
expect(resourceNameIndex.value.get('existing')?.id).toBe('wf-1');
expect(resourceNameIndex.get('existing')?.id).toBe('wf-1');
});
});
@@ -485,9 +485,9 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-3')?.name).toBe('Insert Random City Data');
expect(resourceNameIndex.value.get('insert random city data')?.id).toBe('wf-3');
expect(resourceNameIndex.value.get('untitled')).toBeUndefined();
expect(producedArtifacts.get('wf-3')?.name).toBe('Insert Random City Data');
expect(resourceNameIndex.get('insert random city data')?.id).toBe('wf-3');
expect(resourceNameIndex.get('untitled')).toBeUndefined();
});
test('keeps original name when lookup returns undefined', async () => {
@@ -508,7 +508,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('wf-4')?.name).toBe('Original Name');
expect(producedArtifacts.get('wf-4')?.name).toBe('Original Name');
});
test('does not enrich data-table entries', async () => {
@@ -530,7 +530,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('dt-1')?.name).toBe('cities1');
expect(producedArtifacts.get('dt-1')?.name).toBe('cities1');
});
});
@@ -558,7 +558,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
const entry = producedArtifacts.value.get('dt-mut-1') as ResourceEntry;
const entry = producedArtifacts.get('dt-mut-1') as ResourceEntry;
expect(entry).toEqual({
type: 'data-table',
id: 'dt-mut-1',
@@ -589,7 +589,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('dt-no-name')).toEqual({
expect(producedArtifacts.get('dt-no-name')).toEqual({
type: 'data-table',
id: 'dt-no-name',
name: 'dt-no-name',
@@ -622,7 +622,7 @@ describe('useResourceRegistry', () => {
];
await nextTick();
expect(producedArtifacts.value.get('dt-signups')).toEqual({
expect(producedArtifacts.get('dt-signups')).toEqual({
type: 'data-table',
id: 'dt-signups',
name: 'Signups',
@@ -631,4 +631,112 @@ describe('useResourceRegistry', () => {
},
);
});
describe('in-place reactivity contract', () => {
function setupWithArtifact() {
const result = setup();
result.messages.value = [
makeMessage({
agentTree: makeAgentNode({
toolCalls: [
makeToolCall({
toolName: 'build-workflow',
result: { workflowId: 'wf-1', workflowName: 'Pipeline' },
}),
],
}),
}),
];
return result;
}
test('entry objects keep their identity across rebuilds', async () => {
const { messages, producedArtifacts } = setupWithArtifact();
await nextTick();
const entryBefore = producedArtifacts.get('wf-1');
expect(entryBefore).toBeDefined();
messages.value[0].agentTree!.toolCalls.push(
makeToolCall({ toolCallId: 'tc-2', toolName: 'search' }),
);
await nextTick();
expect(producedArtifacts.get('wf-1')).toBe(entryBefore);
});
test('rebuilds that change nothing do not notify subscribers', async () => {
const { messages, resourceNameIndex } = setupWithArtifact();
let runs = 0;
const stop = watchEffect(() => {
void [...resourceNameIndex.values()];
runs++;
});
await nextTick();
const runsAfterSetup = runs;
// Structural event that does not change the registry: the derivation
// re-runs, the reconcile produces zero writes, nobody is notified.
messages.value[0].agentTree!.toolCalls.push(
makeToolCall({ toolCallId: 'tc-2', toolName: 'search' }),
);
await nextTick();
expect(runs).toBe(runsAfterSetup);
// A real registry change notifies.
messages.value[0].agentTree!.toolCalls.push(
makeToolCall({
toolCallId: 'tc-3',
toolName: 'build-workflow',
result: { workflowId: 'wf-2', workflowName: 'Second Pipeline' },
}),
);
await nextTick();
expect(runs).toBe(runsAfterSetup + 1);
expect(resourceNameIndex.get('second pipeline')?.id).toBe('wf-2');
stop();
});
test('field-level changes notify field readers, and removed fields are swept', async () => {
const messages = ref<InstanceAiMessage[]>([]);
const archived = ref<ReadonlySet<string>>(new Set());
const { producedArtifacts } = useResourceRegistry(
() => messages.value,
undefined,
() => archived.value,
);
messages.value = [
makeMessage({
agentTree: makeAgentNode({
toolCalls: [
makeToolCall({
toolName: 'build-workflow',
result: { workflowId: 'wf-1', workflowName: 'Pipeline' },
}),
],
}),
}),
];
let observed: boolean | undefined;
const stop = watchEffect(() => {
observed = producedArtifacts.get('wf-1')?.archived;
});
await nextTick();
expect(observed).toBeUndefined();
archived.value = new Set(['wf-1']);
await nextTick();
expect(observed).toBe(true);
// Un-archiving rebuilds the entry WITHOUT the field — the reconcile's
// removed-field sweep must delete it, not leave a stale true behind.
archived.value = new Set();
await nextTick();
expect(observed).toBeUndefined();
expect(Object.keys(producedArtifacts.get('wf-1') ?? {})).not.toContain('archived');
stop();
});
});
});
@@ -5,7 +5,6 @@ import type {
InstanceAiToolCallState,
TaskList,
} from '@n8n/api-types';
import { N8nText } from '@n8n/design-system';
import { useI18n } from '@n8n/i18n';
import { computed } from 'vue';
import {
@@ -23,9 +22,9 @@ import AgentSection from './AgentSection.vue';
import AnsweredQuestions from './AnsweredQuestions.vue';
import ArtifactCard from './ArtifactCard.vue';
import DelegateCard from './DelegateCard.vue';
import InstanceAiMarkdown from './InstanceAiMarkdown.vue';
import PlanReviewPanel, { type PlannedTaskArg, type PlanReviewStatus } from './PlanReviewPanel.vue';
import TaskChecklist from './TaskChecklist.vue';
import TimelineTextSegment from './TimelineTextSegment.vue';
import ToolCallStep from './ToolCallStep.vue';
const i18n = useI18n();
@@ -98,10 +97,6 @@ const props = withDefaults(
const timelineEntries = computed(() => props.visibleEntries ?? props.agentNode.timeline);
defineSlots<{
'after-tool-call'?: (props: { toolCall: InstanceAiToolCallState }) => unknown;
}>();
/** Index tool calls by ID for O(1) lookup and proper reactivity tracking. */
const toolCallsById = computed(() => {
const map: Record<string, InstanceAiToolCallState> = {};
@@ -258,18 +253,14 @@ function mapTaskItemsToPlannedTasks(tasks?: TaskList): PlannedTaskArg[] | undefi
<template>
<div v-if="hasVisibleEntries" :class="$style.timeline">
<template v-for="(entry, idx) in timelineEntries" :key="idx">
<!-- Text segment -->
<N8nText
<!-- Text segment (leaf keeps the per-token content read out of this render) -->
<TimelineTextSegment
v-if="entry.type === 'text'"
size="large"
:entry="entry"
:compact="props.compact"
:streaming="isStreamingTimelineEntry(props.agentNode, entry)"
:class="$style.timelineItem"
>
<InstanceAiMarkdown
:content="entry.content"
:streaming="isStreamingTimelineEntry(props.agentNode, entry)"
/>
</N8nText>
/>
<!-- Tool call (skip internal tools like updateWorkingMemory) -->
<template
@@ -322,9 +313,9 @@ function mapTaskItemsToPlannedTasks(tasks?: TaskList): PlannedTaskArg[] | undefi
toolCallsById[entry.toolCallId].isLoading
"
/>
<ToolCallStep v-else :tool-call="toolCallsById[entry.toolCallId]" :show-connector="true">
<slot name="after-tool-call" :tool-call="toolCallsById[entry.toolCallId]" />
</ToolCallStep>
<!-- Keep slot-free: slot children disable the props bailout and would
re-render every step on each timeline render -->
<ToolCallStep v-else :tool-call="toolCallsById[entry.toolCallId]" :show-connector="true" />
</template>
<!-- Child agent flat section. Running builder sub-agents are
@@ -0,0 +1,25 @@
<script lang="ts" setup>
import type { InstanceAiTimelineEntry } from '@n8n/api-types';
import { N8nText } from '@n8n/design-system';
import InstanceAiMarkdown from './InstanceAiMarkdown.vue';
/**
* Renders one text timeline entry. Keeps the per-token `entry.content` read
* out of AgentTimeline's render, so streamed tokens re-render only this segment.
*/
const props = withDefaults(
defineProps<{
entry: Extract<InstanceAiTimelineEntry, { type: 'text' }>;
compact?: boolean;
/** Forwarded to InstanceAiMarkdown to defer resource decoration mid-stream. */
streaming?: boolean;
}>(),
{ compact: false, streaming: false },
);
</script>
<template>
<N8nText size="large" :compact="props.compact">
<InstanceAiMarkdown :content="props.entry.content" :streaming="props.streaming" />
</N8nText>
</template>
@@ -1,9 +1,11 @@
import { reactive } from 'vue';
import {
getRenderHint,
createInitialState,
reduceEvent as reduceRunEvent,
toAgentTree,
findAgent,
stateFromAgentTree,
isSafeObjectKey,
} from '@n8n/api-types';
import type {
@@ -13,6 +15,19 @@ import type {
AgentRunState,
} from '@n8n/api-types';
/**
* Per-thread reducer state.
*
* The run states are node-centric (see `agent-run-reducer.ts`): the agent tree
* a message renders (`msg.agentTree`) IS the run state's root node the same
* live objects, mutated in place by the shared reducer. There is no separate
* denormalized copy to keep in sync.
*
* Run states are wrapped in Vue's `reactive()` (see `createRunState`), so those
* in-place mutations flow through the same canonical proxies the components
* render a text-delta invalidates exactly the component reading that node's
* text, nothing else.
*/
export interface InstanceAiReducerState {
messages: InstanceAiMessage[];
activeRunId: string | null;
@@ -20,21 +35,24 @@ export interface InstanceAiReducerState {
* One normalized AgentRunState per messageGroupId.
* All runs within the same user turn share one state.
* Falls back to runId as key when no messageGroupId exists.
*
* A Map (not a plain object): this is FE-only runtime state, never
* serialized no prototype-pollution footgun, clean add/delete/clear.
*/
runStateByGroupId: Record<string, AgentRunState>;
runStateByGroupId: Map<string, AgentRunState>;
/**
* Maps any runId to its messageGroupId (or itself when no group exists).
* This is the primary routing table every incoming event resolves its
* runId through this map to find the correct state and message.
* Survives follow-up chains: runA mg_1, runB mg_1, runC mg_1.
*/
groupIdByRunId: Record<string, string>;
groupIdByRunId: Map<string, string>;
}
/** Resolve a runId to its group key. */
function resolveGroupId(state: InstanceAiReducerState, runId: string): string {
if (!isSafeObjectKey(runId)) return runId;
const groupId = state.groupIdByRunId[runId];
const groupId = state.groupIdByRunId.get(runId);
return groupId && isSafeObjectKey(groupId) ? groupId : runId;
}
@@ -57,7 +75,7 @@ export function findMessageByRunId(
}
/**
* Find an agent node in the tree by agentId.
* Find an agent node in a message's tree by agentId.
* Searches recursively (no depth limit).
*/
export function findAgentNode(
@@ -83,6 +101,26 @@ function findNodeRecursive(
// Re-export for backward compat
export { getRenderHint };
/**
* Create a reactive run state. Reactivity must live on the run state itself:
* the shared reducer mutates it in place, and components render the very same
* node objects via `msg.agentTree` wrapping here makes those mutations
* observable everywhere without any synchronization layer.
*/
function createRunState(rootAgentId?: string): AgentRunState {
return reactive(createInitialState(rootAgentId));
}
/**
* Index a snapshot tree (session restore / run-sync) into a reactive run state.
* The tree's nodes are adopted, not copied live events keep mutating the
* exact objects the message already renders.
*/
export function createRunStateFromTree(tree: InstanceAiAgentNode): AgentRunState | undefined {
const runState = stateFromAgentTree(tree);
return runState ? reactive(runState) : undefined;
}
/**
* Get or create the AgentRunState for a group.
*/
@@ -92,12 +130,12 @@ function getOrCreateGroupState(
rootAgentId?: string,
): AgentRunState {
if (!isSafeObjectKey(groupId)) {
return createInitialState(rootAgentId);
return createRunState(rootAgentId);
}
let runState = state.runStateByGroupId[groupId];
let runState = state.runStateByGroupId.get(groupId);
if (!runState) {
runState = createInitialState(rootAgentId);
state.runStateByGroupId[groupId] = runState;
runState = createRunState(rootAgentId);
state.runStateByGroupId.set(groupId, runState);
}
return runState;
}
@@ -105,7 +143,7 @@ function getOrCreateGroupState(
/** Register a runId → groupId mapping. */
function registerRunId(state: InstanceAiReducerState, runId: string, groupId: string): void {
if (!isSafeObjectKey(runId) || !isSafeObjectKey(groupId)) return;
state.groupIdByRunId[runId] = groupId;
state.groupIdByRunId.set(runId, groupId);
}
function hasSafeEventKeys(event: InstanceAiEvent): boolean {
@@ -126,217 +164,6 @@ function hasSafeEventKeys(event: InstanceAiEvent): boolean {
}
}
/**
* Sync the agentTree on a message from the normalized run state.
* Uses in-place patching when the tree structure hasn't changed.
*/
function syncAgentTree(msg: InstanceAiMessage, runState: AgentRunState): void {
if (!msg.agentTree) {
msg.agentTree = toAgentTree(runState);
return;
}
if (!patchNodeInPlace(msg.agentTree, runState)) {
msg.agentTree = toAgentTree(runState);
}
}
function patchNodeInPlace(node: InstanceAiAgentNode, state: AgentRunState): boolean {
const agent = state.agentsById[node.agentId];
if (!agent) return false;
const childIds = state.childrenByAgentId[node.agentId] ?? [];
const toolCallIds = state.toolCallIdsByAgentId[node.agentId] ?? [];
const timeline = state.timelineByAgentId[node.agentId] ?? [];
if (node.children.length !== childIds.length || node.toolCalls.length !== toolCallIds.length) {
return false;
}
for (let i = 0; i < childIds.length; i++) {
if (node.children[i].agentId !== childIds[i]) return false;
}
node.status = agent.status;
node.textContent = agent.textContent;
node.reasoning = agent.reasoning;
node.result = agent.result;
node.error = agent.error;
node.tasks = agent.tasks;
node.planItems = agent.planItems;
node.kind = agent.kind;
node.title = agent.title;
node.subtitle = agent.subtitle;
node.goal = agent.goal;
node.targetResource = agent.targetResource;
for (let i = 0; i < toolCallIds.length; i++) {
if (!isSafeObjectKey(toolCallIds[i])) return false;
const tc = state.toolCallsById[toolCallIds[i]];
const existing = node.toolCalls[i];
if (!tc || !existing || existing.toolCallId !== tc.toolCallId) return false;
node.toolCalls[i] = { ...tc };
}
if (node.timeline.length !== timeline.length) return false;
for (let i = 0; i < timeline.length; i++) {
const existing = node.timeline[i];
const updated = timeline[i];
if (existing.type !== updated.type) return false;
if (existing.type === 'text' && updated.type === 'text') {
existing.content = updated.content;
}
}
for (let i = 0; i < childIds.length; i++) {
if (!patchNodeInPlace(node.children[i], state)) return false;
}
return true;
}
function patchStreamingTextTimeline(
node: InstanceAiAgentNode,
timeline: AgentRunState['timelineByAgentId'][string],
): boolean {
if (node.timeline.length === timeline.length) {
if (timeline.length === 0) return true;
for (let i = 0; i < timeline.length; i++) {
if (node.timeline[i].type !== timeline[i].type) return false;
}
const existingLast = node.timeline.at(-1);
const updatedLast = timeline.at(-1);
if (existingLast?.type !== 'text' || updatedLast?.type !== 'text') return false;
existingLast.content = updatedLast.content;
return true;
}
if (node.timeline.length + 1 !== timeline.length) return false;
for (let i = 0; i < node.timeline.length; i++) {
if (node.timeline[i].type !== timeline[i].type) return false;
}
const updatedLast = timeline.at(-1);
if (updatedLast?.type !== 'text') return false;
node.timeline.push({
type: 'text',
content: updatedLast.content,
...(updatedLast.responseId ? { responseId: updatedLast.responseId } : {}),
});
return true;
}
function syncStreamingTextNode(
msg: InstanceAiMessage,
runState: AgentRunState,
agentId: string,
): void {
if (!msg.agentTree) {
msg.agentTree = toAgentTree(runState);
return;
}
const renderedNode = findAgentNode(msg, agentId);
const stateNode = findAgent(runState, agentId);
if (!renderedNode || !stateNode) {
syncAgentTree(msg, runState);
return;
}
renderedNode.status = stateNode.status;
renderedNode.textContent = stateNode.textContent;
renderedNode.result = stateNode.result;
renderedNode.error = stateNode.error;
const timeline = runState.timelineByAgentId[agentId] ?? [];
if (!patchStreamingTextTimeline(renderedNode, timeline)) {
syncAgentTree(msg, runState);
return;
}
if (agentId === runState.rootAgentId) {
msg.content = stateNode.textContent;
}
}
function syncStreamingReasoningNode(
msg: InstanceAiMessage,
runState: AgentRunState,
agentId: string,
): void {
if (!msg.agentTree) {
msg.agentTree = toAgentTree(runState);
return;
}
const renderedNode = findAgentNode(msg, agentId);
const stateNode = findAgent(runState, agentId);
if (!renderedNode || !stateNode) {
syncAgentTree(msg, runState);
return;
}
renderedNode.status = stateNode.status;
renderedNode.reasoning = stateNode.reasoning;
renderedNode.result = stateNode.result;
renderedNode.error = stateNode.error;
if (agentId === runState.rootAgentId) {
msg.reasoning = stateNode.reasoning;
}
}
/**
* Rebuild an AgentRunState from a snapshot tree (for run-sync).
*/
export function rebuildRunStateFromTree(tree: InstanceAiAgentNode): AgentRunState | undefined {
if (!isSafeObjectKey(tree.agentId)) return undefined;
const runState = createInitialState(tree.agentId);
populateRunStateFromNode(runState, tree, undefined);
runState.status = tree.status === 'active' ? 'active' : tree.status;
return runState;
}
function populateRunStateFromNode(
state: AgentRunState,
node: InstanceAiAgentNode,
parentId: string | undefined,
): void {
if (!isSafeObjectKey(node.agentId)) return;
state.agentsById[node.agentId] = {
agentId: node.agentId,
role: node.role,
tools: node.tools,
taskId: node.taskId,
kind: node.kind,
title: node.title,
subtitle: node.subtitle,
goal: node.goal,
targetResource: node.targetResource,
status: node.status,
textContent: node.textContent,
reasoning: node.reasoning,
tasks: node.tasks,
planItems: node.planItems,
result: node.result,
error: node.error,
};
if (parentId && isSafeObjectKey(parentId)) state.parentByAgentId[node.agentId] = parentId;
const safeChildren = node.children.filter((child) => isSafeObjectKey(child.agentId));
state.childrenByAgentId[node.agentId] = safeChildren.map((child) => child.agentId);
state.timelineByAgentId[node.agentId] = node.timeline.filter((entry) => {
if (entry.type === 'child') return isSafeObjectKey(entry.agentId);
if (entry.type === 'tool-call') return isSafeObjectKey(entry.toolCallId);
return true;
});
const safeToolCalls = node.toolCalls.filter((toolCall) => isSafeObjectKey(toolCall.toolCallId));
state.toolCallIdsByAgentId[node.agentId] = safeToolCalls.map((toolCall) => toolCall.toolCallId);
for (const tc of safeToolCalls) {
state.toolCallsById[tc.toolCallId] = { ...tc };
}
for (const child of safeChildren) {
populateRunStateFromNode(state, child, node.agentId);
}
}
// ---------------------------------------------------------------------------
// Event routing helper — resolves runId → groupId → (message, runState)
// ---------------------------------------------------------------------------
@@ -351,7 +178,7 @@ function resolveTarget(state: InstanceAiReducerState, runId: string): ResolvedTa
const groupId = resolveGroupId(state, runId);
return {
msg: findMessageByGroupId(state, groupId),
runState: state.runStateByGroupId[groupId],
runState: state.runStateByGroupId.get(groupId),
groupId,
};
}
@@ -363,8 +190,8 @@ function resolveTarget(state: InstanceAiReducerState, runId: string): ResolvedTa
/** Mutates state.messages in-place. Returns the new activeRunId (may differ from input). */
export function handleEvent(state: InstanceAiReducerState, event: InstanceAiEvent): string | null {
// Ensure maps exist (backward compat)
if (!state.groupIdByRunId) state.groupIdByRunId = {};
if (!state.runStateByGroupId) state.runStateByGroupId = {};
if (!state.groupIdByRunId) state.groupIdByRunId = new Map();
if (!state.runStateByGroupId) state.runStateByGroupId = new Map();
if (!hasSafeEventKeys(event)) return state.activeRunId;
// Mid-run replay guard: if we receive events for a runId that has no
@@ -398,17 +225,16 @@ export function handleEvent(state: InstanceAiReducerState, event: InstanceAiEven
// Auto-follow-up merging: if this group already has a message, merge.
const existingMsg = findMessageByGroupId(state, messageGroupId);
if (existingMsg) {
const runState = state.runStateByGroupId[messageGroupId];
const runState = state.runStateByGroupId.get(messageGroupId);
if (runState) {
// Re-activate root orchestrator — do NOT call reduceRunEvent
// which would wipe the agent tree.
runState.status = 'active';
const root = findAgent(runState, runState.rootAgentId);
if (root) root.status = 'active';
// The shared reducer preserves the existing agent tree for
// follow-up runs and re-activates the root orchestrator.
reduceRunEvent(runState, event);
// Re-point in case run-start re-initialized the root node.
existingMsg.agentTree = toAgentTree(runState);
}
existingMsg.runId = event.runId;
existingMsg.isStreaming = true;
if (runState) syncAgentTree(existingMsg, runState);
return event.runId;
}
@@ -433,7 +259,11 @@ export function handleEvent(state: InstanceAiReducerState, event: InstanceAiEven
const { msg, runState } = resolveTarget(state, event.runId);
if (runState) {
reduceRunEvent(runState, event);
if (msg) syncStreamingTextNode(msg, runState, event.agentId);
// Mirror the root agent's text onto the message for consumers that
// read `msg.content` (previews, fallback rendering, feedback copy).
if (msg && event.agentId === runState.rootAgentId) {
msg.content = findAgent(runState, event.agentId)?.textContent ?? msg.content;
}
}
return state.activeRunId;
}
@@ -442,7 +272,9 @@ export function handleEvent(state: InstanceAiReducerState, event: InstanceAiEven
const { msg, runState } = resolveTarget(state, event.runId);
if (runState) {
reduceRunEvent(runState, event);
if (msg) syncStreamingReasoningNode(msg, runState, event.agentId);
if (msg && event.agentId === runState.rootAgentId) {
msg.reasoning = findAgent(runState, event.agentId)?.reasoning ?? msg.reasoning;
}
}
return state.activeRunId;
}
@@ -453,11 +285,11 @@ export function handleEvent(state: InstanceAiReducerState, event: InstanceAiEven
case 'agent-spawned':
case 'agent-completed':
case 'confirmation-request':
case 'tasks-update': {
const { msg, runState } = resolveTarget(state, event.runId);
case 'tasks-update':
case 'status': {
const { runState } = resolveTarget(state, event.runId);
if (runState) {
reduceRunEvent(runState, event);
if (msg) syncAgentTree(msg, runState);
}
return state.activeRunId;
}
@@ -466,25 +298,23 @@ export function handleEvent(state: InstanceAiReducerState, event: InstanceAiEven
const { msg, runState } = resolveTarget(state, event.runId);
if (runState) {
reduceRunEvent(runState, event);
if (msg) {
const targetAgentId = findAgent(runState, event.agentId)
? event.agentId
: runState.rootAgentId;
syncStreamingTextNode(msg, runState, targetAgentId);
// Enrich the rendered node with structured error details from HEAD
const target = findAgentNode(msg, targetAgentId) ?? msg.agentTree;
if (target) {
target.status = 'error';
target.error = event.payload.content;
target.errorDetails = {
...(event.payload.statusCode !== undefined
? { statusCode: event.payload.statusCode }
: {}),
...(event.payload.provider ? { provider: event.payload.provider } : {}),
...(event.payload.technicalDetails
? { technicalDetails: event.payload.technicalDetails }
: {}),
};
// Enrich the affected agent with structured error details.
const target =
findAgent(runState, event.agentId) ?? findAgent(runState, runState.rootAgentId);
if (target) {
target.status = 'error';
target.error = event.payload.content;
target.errorDetails = {
...(event.payload.statusCode !== undefined
? { statusCode: event.payload.statusCode }
: {}),
...(event.payload.provider ? { provider: event.payload.provider } : {}),
...(event.payload.technicalDetails
? { technicalDetails: event.payload.technicalDetails }
: {}),
};
if (msg && target.agentId === runState.rootAgentId) {
msg.content = target.textContent;
}
}
} else if (msg) {
@@ -501,18 +331,14 @@ export function handleEvent(state: InstanceAiReducerState, event: InstanceAiEven
const { msg, runState } = resolveTarget(state, event.runId);
if (runState) {
reduceRunEvent(runState, event);
if (msg) {
msg.isStreaming = false;
syncAgentTree(msg, runState);
// Preserve error status set by a prior 'error' event (don't downgrade)
const { status, reason } = event.payload;
if (msg.agentTree && msg.agentTree.status !== 'error' && status === 'error') {
msg.agentTree.status = 'error';
}
if (msg.agentTree && status === 'error' && reason && !msg.agentTree.error) {
msg.agentTree.error = reason;
}
const { status, reason } = event.payload;
const root = findAgent(runState, runState.rootAgentId);
// Surface the failure reason on the root agent (the shared reducer
// only sets the status).
if (root && status === 'error' && reason && !root.error) {
root.error = reason;
}
if (msg) msg.isStreaming = false;
} else if (msg) {
msg.isStreaming = false;
const { status, reason } = event.payload;
@@ -31,7 +31,7 @@ import {
fetchThreadMessages as fetchThreadMessagesApi,
fetchThreadStatus as fetchThreadStatusApi,
} from './instanceAi.memory.api';
import { handleEvent as reduceEvent, rebuildRunStateFromTree } from './instanceAi.reducer';
import { handleEvent as reduceEvent, createRunStateFromTree } from './instanceAi.reducer';
import { getLatestBuildResult } from './canvasPreview.utils';
import { useResourceRegistry } from './useResourceRegistry';
import { useResponseFeedback } from './useResponseFeedback';
@@ -198,34 +198,35 @@ export function collapseDeltaEvents(events: DebugEventEntry[]): DebugEventEntry[
/**
* Walk historical messages and build the reducer routing maps that SSE replay
* events need to reduce into existing run state. Pure: returns fresh maps the
* caller can `Object.assign` onto its own state.
* events need to reduce into existing run state. Each message's agent tree is
* adopted (not copied) into its run state, so replayed/live events mutate the
* exact nodes the message renders.
*
* - `runStateByGroupId`: snapshot of run state keyed by message group id
* - `runStateByGroupId`: run state per message group id, adopting `msg.agentTree`
* - `groupIdByRunId`: every runId in the group its group id, so late events
* from older runs in a merged ABC chain still route to the right message
*/
export function buildRoutingFromMessages(messages: InstanceAiMessage[]): {
runStateByGroupId: Record<string, AgentRunState>;
groupIdByRunId: Record<string, string>;
runStateByGroupId: Map<string, AgentRunState>;
groupIdByRunId: Map<string, string>;
} {
const runStateByGroupId: Record<string, AgentRunState> = {};
const groupIdByRunId: Record<string, string> = {};
const runStateByGroupId = new Map<string, AgentRunState>();
const groupIdByRunId = new Map<string, string>();
for (const msg of messages) {
if (msg.role !== 'assistant' || !msg.agentTree) continue;
const groupId = msg.messageGroupId ?? msg.runId;
if (!groupId || !isSafeObjectKey(groupId)) continue;
const rebuiltRunState = rebuildRunStateFromTree(msg.agentTree);
const rebuiltRunState = createRunStateFromTree(msg.agentTree);
if (!rebuiltRunState) continue;
runStateByGroupId[groupId] = rebuiltRunState;
runStateByGroupId.set(groupId, rebuiltRunState);
if (msg.runIds) {
for (const rid of msg.runIds) {
if (!isSafeObjectKey(rid)) continue;
groupIdByRunId[rid] = groupId;
groupIdByRunId.set(rid, groupId);
}
}
if (msg.runId && isSafeObjectKey(msg.runId)) groupIdByRunId[msg.runId] = groupId;
if (msg.runId && isSafeObjectKey(msg.runId)) groupIdByRunId.set(msg.runId, groupId);
}
return { runStateByGroupId, groupIdByRunId };
@@ -265,9 +266,13 @@ export function createThreadRuntime(
const activePlanEdit = ref<PlanEditContext | null>(null);
const updatingPlanRequestIds = reactive(new Set<string>());
// --- Non-reactive runtime state ---
let runStateByGroupId: Record<string, AgentRunState> = {};
let groupIdByRunId: Record<string, string> = {};
// --- Reducer routing state ---
// Plain Maps: the routing tables themselves are never rendered. The run
// STATES they hold are reactive (created via `createRunState*` in the
// reducer) — that's where rendering reactivity lives, since `msg.agentTree`
// is the run state's own root node.
const runStateByGroupId = new Map<string, AgentRunState>();
const groupIdByRunId = new Map<string, string>();
let eventSource: EventSource | null = null;
let sseGeneration = 0;
let hydrationGeneration = 0;
@@ -551,7 +556,9 @@ export function createThreadRuntime(
const groupId = data.messageGroupId ?? data.runId;
if (!isSafeObjectKey(data.runId) || !isSafeObjectKey(groupId)) return;
const rebuiltRunState = rebuildRunStateFromTree(data.agentTree);
// Adopts the snapshot tree's nodes — `msg.agentTree` below points at the
// same objects, so subsequent live events mutate what's rendered.
const rebuiltRunState = createRunStateFromTree(data.agentTree);
if (!rebuiltRunState) return;
// Find the message to update — by messageGroupId first, then runId
@@ -601,7 +608,7 @@ export function createThreadRuntime(
}
// Rebuild normalized run state keyed by groupId
runStateByGroupId[groupId] = rebuiltRunState;
runStateByGroupId.set(groupId, rebuiltRunState);
// Restore runId → groupId mappings for ALL runs in the group.
// This ensures late events from older follow-up runs still route
@@ -609,11 +616,11 @@ export function createThreadRuntime(
if (data.runIds) {
for (const rid of data.runIds) {
if (!isSafeObjectKey(rid)) continue;
groupIdByRunId[rid] = groupId;
groupIdByRunId.set(rid, groupId);
}
}
// Always register the current runId
groupIdByRunId[data.runId] = groupId;
groupIdByRunId.set(data.runId, groupId);
} catch {
// Malformed run-sync — skip
}
@@ -687,8 +694,8 @@ export function createThreadRuntime(
resetFeedback();
resolvedConfirmationIds.clear();
sessionAlwaysAllowKeys.value = new Set();
runStateByGroupId = {};
groupIdByRunId = {};
runStateByGroupId.clear();
groupIdByRunId.clear();
lastEventId.value = undefined;
}
@@ -722,9 +729,9 @@ export function createThreadRuntime(
// Rebuild reducer routing state from historical messages so SSE
// replay events (which arrive before run-sync) can reduce into
// existing run states instead of being dropped or creating phantoms.
const routing = buildRoutingFromMessages(result.messages);
Object.assign(runStateByGroupId, routing.runStateByGroupId);
Object.assign(groupIdByRunId, routing.groupIdByRunId);
const routing = buildRoutingFromMessages(messages.value);
routing.runStateByGroupId.forEach((value, key) => runStateByGroupId.set(key, value));
routing.groupIdByRunId.forEach((value, key) => groupIdByRunId.set(key, value));
}
// Set SSE cursor to skip past events already covered by historical messages.
// This prevents duplicate messages when SSE replays in-memory events.
@@ -1,11 +1,11 @@
import { computed } from 'vue';
import { reactive, watch } from 'vue';
import type {
InstanceAiMessage,
InstanceAiAgentNode,
InstanceAiToolCallState,
} from '@n8n/api-types';
export interface ResourceEntry {
export type ResourceEntry = {
type: 'workflow' | 'credential' | 'data-table';
id: string;
name: string;
@@ -19,7 +19,7 @@ export interface ResourceEntry {
* "Archived" label.
*/
archived?: boolean;
}
};
// ---------------------------------------------------------------------------
// Internal helpers (defined before use to satisfy no-use-before-define)
@@ -264,35 +264,77 @@ export function useResourceRegistry(
workflowNameLookup?: (id: string) => string | undefined,
archivedWorkflowIds?: () => ReadonlySet<string>,
) {
const collections = computed((): Collections => {
const col: Collections = {
produced: new Map<string, ResourceEntry>(),
byName: new Map<string, ResourceEntry>(),
};
// Long-lived reactive maps, reconciled in place: rebuilds that change
// nothing trigger nothing.
const producedArtifacts = reactive(new Map<string, ResourceEntry>());
const resourceNameIndex = reactive(new Map<string, ResourceEntry>());
for (const msg of messages()) {
if (!msg.agentTree) continue;
collectFromAgentNode(msg.agentTree, col);
}
// Derived from `messages` so every state-arrival path (hydration, run-sync
// replacement, rollback, reset) self-heals on the next derivation. Must
// stay a watch: the handler reads the target maps, so a watchEffect would
// re-trigger itself.
watch(
(): Collections => {
const col: Collections = {
produced: new Map<string, ResourceEntry>(),
byName: new Map<string, ResourceEntry>(),
};
if (workflowNameLookup) {
enrichWorkflowNames(col, workflowNameLookup);
}
for (const msg of messages()) {
if (!msg.agentTree) continue;
collectFromAgentNode(msg.agentTree, col);
}
const archived = archivedWorkflowIds?.();
if (archived && archived.size > 0) {
for (const entry of col.produced.values()) {
if (entry.type === 'workflow' && archived.has(entry.id)) {
entry.archived = true;
if (workflowNameLookup) {
enrichWorkflowNames(col, workflowNameLookup);
}
const archived = archivedWorkflowIds?.();
if (archived && archived.size > 0) {
for (const entry of col.produced.values()) {
if (entry.type === 'workflow' && archived.has(entry.id)) {
entry.archived = true;
}
}
}
}
return col;
});
return col;
},
(col) => {
reconcileMap(producedArtifacts, col.produced);
reconcileMap(resourceNameIndex, col.byName);
},
{ immediate: true },
);
return {
producedArtifacts: computed(() => collections.value.produced),
resourceNameIndex: computed(() => collections.value.byName),
};
return { producedArtifacts, resourceNameIndex };
}
/** Sync `target` to `next` with minimal writes — unchanged entries trigger no subscribers. */
function reconcileMap(target: Map<string, ResourceEntry>, next: Map<string, ResourceEntry>): void {
for (const key of [...target.keys()]) {
if (!next.has(key)) target.delete(key);
}
for (const [key, entry] of next) {
const existing = target.get(key);
if (existing) {
reconcileEntryFields(existing, entry);
} else {
target.set(key, entry);
}
}
}
/**
* Per-field sync: `Object.assign` writes through the proxy (equal values
* trigger nothing), the sweep deletes fields the new entry no longer carries.
*/
function reconcileEntryFields(
existing: Record<string, unknown>,
next: Record<string, unknown>,
): void {
for (const key of Object.keys(existing)) {
if (!(key in next)) Reflect.deleteProperty(existing, key);
}
Object.assign(existing, next);
}