fix(core): Persist run data for Instance AI trigger executions in queue mode (#32498)

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mutasem Aldmour
2026-06-18 14:34:11 +02:00
committed by GitHub
parent 314324e285
commit db1e21fecf
4 changed files with 81 additions and 0 deletions
@@ -2670,4 +2670,31 @@ describe('createExecutionAdapter run()', () => {
}),
);
});
it('populates executionData for a trigger run with no input so it survives queue persistence', async () => {
const { adapter, mockWorkflowRunner } = createRunAdapterForTests({
id: 'wf-1',
nodes: [
{
id: 'node-1',
name: 'Schedule Trigger',
type: 'n8n-nodes-base.scheduleTrigger',
typeVersion: 1,
parameters: {},
position: [0, 0],
},
],
});
await adapter.run('wf-1');
const runData = mockWorkflowRunner.run.mock.calls[0][0];
expect(runData.executionMode).toBe('trigger');
// In queue mode a trigger execution is offloaded to a worker, which reads
// `execution.data` back from storage. An undefined `executionData` persists
// as an empty payload and deserializes to `undefined`, crashing the worker.
expect(runData.executionData).toBeDefined();
expect(runData.executionData?.manualData?.userId).toBe('user-1');
expect(runData.executionData?.manualData?.triggerToStartFrom?.name).toBe('Schedule Trigger');
});
});
@@ -994,6 +994,19 @@ export class InstanceAiAdapterService {
if (Object.keys(basePinData).length > 0) {
runData.pinData = basePinData;
}
// In queue mode this execution is offloaded to a worker, which reads
// `execution.data` back from storage. Persist a valid run-data object
// (the worker reconstructs the run and starts from the trigger) so an
// undefined payload doesn't deserialize to `undefined` and crash the worker.
runData.executionData = createRunExecutionData({
startData: {},
resultData: { pinData: runData.pinData, runData: null },
manualData: {
userId: user.id,
triggerToStartFrom: runData.triggerToStartFrom,
},
executionData: null,
});
} else if (Object.keys(basePinData).length > 0) {
runData.pinData = basePinData;
}
@@ -126,6 +126,37 @@ describe('JobProcessor', () => {
expect(result).toEqual({ success: false });
});
it('should throw a descriptive error when the execution has no run data', async () => {
const executionRepository = mock<ExecutionRepository>();
const executionPersistence = mock<ExecutionPersistence>();
executionPersistence.findSingleExecution.mockResolvedValue(
mock<IExecutionResponse>({
id: 'execution-id',
mode: 'trigger',
workflowData: { nodes: [] },
data: undefined,
}),
);
const manualExecutionService = createManualExecutionServiceMock();
const jobProcessor = new JobProcessor(
logger,
executionRepository,
executionPersistence,
mock(),
mock(),
mock(),
manualExecutionService,
executionsConfig,
mock(),
);
const job = mock<Job>({ data: { executionId: 'execution-id', loadStaticData: false } });
await expect(jobProcessor.processJob(job)).rejects.toThrow(/without run data/);
expect(manualExecutionService.runManually).not.toHaveBeenCalled();
});
it.each(['manual', 'evaluation'] satisfies WorkflowExecuteMode[])(
'should use manualExecutionService to process a job in %p mode',
async (mode) => {
+10
View File
@@ -96,6 +96,16 @@ export class JobProcessor {
*/
if (execution.status === 'crashed') return { success: false };
// A correctly enqueued execution always carries a run-data payload. A missing
// one means the producer persisted no data, which would otherwise surface as an
// opaque `Cannot read properties of undefined` deref further down. Fail with a
// clear, attributable error instead.
if (!execution.data) {
throw new UnexpectedError(
`Worker received execution ${executionId} without run data (job ${job.id})`,
);
}
const workflowId = execution.workflowData.id;
this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, {