mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-19 07:36:52 +00:00
perf(core): Ungate minimize execution data fetching (#30969)
This commit is contained in:
@@ -638,37 +638,14 @@ describe('workflow timeout with startedAt', () => {
|
||||
});
|
||||
|
||||
describe('needsFullExecutionData', () => {
|
||||
const originalEnv = process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING;
|
||||
|
||||
afterEach(() => {
|
||||
if (originalEnv === undefined) {
|
||||
delete process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING;
|
||||
} else {
|
||||
process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING = originalEnv;
|
||||
}
|
||||
});
|
||||
|
||||
it('should return true when forceFullExecutionData is true even with N8N_MINIMIZE_EXECUTION_DATA_FETCHING set', () => {
|
||||
process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING = 'true';
|
||||
|
||||
it('should return true when forceFullExecutionData is true', () => {
|
||||
// @ts-expect-error Private method
|
||||
const result = runner.needsFullExecutionData('evaluation', 'exec-id', true);
|
||||
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
|
||||
it('should return true when env var is not set and forceFullExecutionData is undefined', () => {
|
||||
delete process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING;
|
||||
|
||||
// @ts-expect-error Private method
|
||||
const result = runner.needsFullExecutionData('webhook', 'exec-id', undefined);
|
||||
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
|
||||
it('should return false when env var is set, forceFullExecutionData is undefined, and mode is not integrated', () => {
|
||||
process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING = 'true';
|
||||
|
||||
it('should return false when forceFullExecutionData is undefined and mode is not integrated', () => {
|
||||
const activeExecutions = Container.get(ActiveExecutions);
|
||||
jest.spyOn(activeExecutions, 'getResponseMode').mockReturnValue('responseNode');
|
||||
|
||||
@@ -678,9 +655,7 @@ describe('needsFullExecutionData', () => {
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
it('should return true when env var is set and mode is integrated', () => {
|
||||
process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING = 'true';
|
||||
|
||||
it('should return true when mode is integrated', () => {
|
||||
// @ts-expect-error Private method
|
||||
const result = runner.needsFullExecutionData('integrated', 'exec-id', undefined);
|
||||
|
||||
|
||||
@@ -71,7 +71,38 @@ const executionsConfig = mock<ExecutionsConfig>({
|
||||
maxTimeout: 3600,
|
||||
});
|
||||
|
||||
const successRun = (): IRun =>
|
||||
mock<IRun>({
|
||||
status: 'success',
|
||||
stoppedAt: new Date(),
|
||||
data: mock<IRunExecutionData>({
|
||||
resultData: { runData: {}, error: undefined },
|
||||
executionData: undefined,
|
||||
}),
|
||||
});
|
||||
|
||||
const errorRun = (error: ExecutionError): IRun =>
|
||||
mock<IRun>({
|
||||
status: 'error',
|
||||
stoppedAt: new Date(),
|
||||
data: mock<IRunExecutionData>({
|
||||
resultData: { runData: {}, error },
|
||||
executionData: undefined,
|
||||
}),
|
||||
});
|
||||
|
||||
const createManualExecutionServiceMock = (run: IRun = successRun()): ManualExecutionService => {
|
||||
const svc = mock<ManualExecutionService>();
|
||||
svc.runManually.mockReturnValue(Promise.resolve(run) as ReturnType<typeof svc.runManually>);
|
||||
return svc;
|
||||
};
|
||||
|
||||
describe('JobProcessor', () => {
|
||||
beforeEach(() => {
|
||||
processRunExecutionDataMock.mockReset();
|
||||
processRunExecutionDataMock.mockResolvedValue(successRun());
|
||||
});
|
||||
|
||||
it('should refrain from processing a crashed execution', async () => {
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
const executionPersistence = mock<ExecutionPersistence>();
|
||||
@@ -110,7 +141,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
@@ -141,7 +172,6 @@ describe('JobProcessor', () => {
|
||||
it('should send job-finished with success=false when execution has errors', async () => {
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
const executionPersistence = mock<ExecutionPersistence>();
|
||||
// First call: initial execution fetch (no error yet)
|
||||
executionPersistence.findSingleExecution.mockResolvedValueOnce(
|
||||
mock<IExecutionResponse>({
|
||||
mode: 'manual',
|
||||
@@ -151,19 +181,10 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
}),
|
||||
);
|
||||
// Second call: after execution completes, fetch again to check for errors
|
||||
executionPersistence.findSingleExecution.mockResolvedValueOnce(
|
||||
mock<IExecutionResponse>({
|
||||
status: 'error',
|
||||
data: {
|
||||
resultData: {
|
||||
error: mock<ExecutionError>(),
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock(
|
||||
errorRun(mock<ExecutionError>()),
|
||||
);
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
@@ -211,7 +232,7 @@ describe('JobProcessor', () => {
|
||||
const additionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(additionalData);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
@@ -260,7 +281,7 @@ describe('JobProcessor', () => {
|
||||
const additionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(additionalData);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
@@ -315,7 +336,7 @@ describe('JobProcessor', () => {
|
||||
const additionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue(additionalData);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
@@ -358,7 +379,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const mcpInstanceSettings = {
|
||||
hostId: 'worker-host-123',
|
||||
} as unknown as InstanceSettings;
|
||||
@@ -421,7 +442,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
@@ -464,21 +485,10 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
}),
|
||||
);
|
||||
// Second call shows error
|
||||
executionPersistence.findSingleExecution.mockResolvedValueOnce(
|
||||
mock<IExecutionResponse>({
|
||||
status: 'error',
|
||||
workflowData: { id: 'wf-1', nodes: [], staticData: {} },
|
||||
data: mock<IRunExecutionData>({
|
||||
resultData: {
|
||||
runData: {},
|
||||
error: { message: 'Test error' } as ExecutionError,
|
||||
},
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock(
|
||||
errorRun({ message: 'Test error' } as ExecutionError),
|
||||
);
|
||||
const mcpInstanceSettings = {
|
||||
hostId: 'worker-host-123',
|
||||
} as unknown as InstanceSettings;
|
||||
@@ -539,7 +549,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const mcpInstanceSettings = {
|
||||
hostId: 'worker-host-123',
|
||||
} as unknown as InstanceSettings;
|
||||
@@ -609,7 +619,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const mcpInstanceSettings = {
|
||||
hostId: 'worker-host-123',
|
||||
} as unknown as InstanceSettings;
|
||||
@@ -700,7 +710,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const mcpInstanceSettings = {
|
||||
hostId: 'worker-host-123',
|
||||
} as unknown as InstanceSettings;
|
||||
@@ -788,7 +798,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const mcpInstanceSettings = {
|
||||
hostId: 'worker-host-123',
|
||||
} as unknown as InstanceSettings;
|
||||
@@ -889,7 +899,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const mcpInstanceSettings = {
|
||||
hostId: 'worker-host-123',
|
||||
} as unknown as InstanceSettings;
|
||||
@@ -985,48 +995,14 @@ describe('JobProcessor', () => {
|
||||
expect(props.status).toBe('waiting');
|
||||
});
|
||||
|
||||
it('carries waitTill on JobFinishedProps from the persisted execution (DB-fetch path)', async () => {
|
||||
const waitTill = new Date(Date.now() + 60_000);
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
const executionPersistence = mock<ExecutionPersistence>();
|
||||
const persisted = mock<IExecutionResponse>({
|
||||
status: 'waiting',
|
||||
stoppedAt: new Date(),
|
||||
data: mock<IRunExecutionData>({
|
||||
resultData: { runData: {}, error: undefined },
|
||||
executionData: undefined,
|
||||
}),
|
||||
});
|
||||
persisted.waitTill = waitTill;
|
||||
executionPersistence.findSingleExecution.mockResolvedValueOnce(persisted);
|
||||
it('defaults waitTill to null on JobFinishedProps when the run is not waiting', () => {
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
executionPersistence,
|
||||
mock<ExecutionRepository>(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
executionsConfig,
|
||||
mock(),
|
||||
);
|
||||
|
||||
const props = await jobProcessor['fetchJobFinishedResult']('exec-1');
|
||||
|
||||
expect(props.waitTill).toBe(waitTill);
|
||||
expect(props.status).toBe('waiting');
|
||||
});
|
||||
|
||||
it('defaults waitTill to null on JobFinishedProps when the run is not waiting', async () => {
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
const executionPersistence = mock<ExecutionPersistence>();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
executionPersistence,
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
executionsConfig,
|
||||
mock(),
|
||||
@@ -1041,18 +1017,6 @@ describe('JobProcessor', () => {
|
||||
});
|
||||
run.waitTill = undefined;
|
||||
expect(jobProcessor['deriveJobFinishedProps'](run, new Date()).waitTill).toBeNull();
|
||||
|
||||
const persisted = mock<IExecutionResponse>({
|
||||
status: 'success',
|
||||
stoppedAt: new Date(),
|
||||
data: mock<IRunExecutionData>({
|
||||
resultData: { runData: {}, error: undefined },
|
||||
executionData: undefined,
|
||||
}),
|
||||
});
|
||||
persisted.waitTill = null;
|
||||
executionPersistence.findSingleExecution.mockResolvedValueOnce(persisted);
|
||||
expect((await jobProcessor['fetchJobFinishedResult']('exec-1')).waitTill).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1080,7 +1044,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
@@ -1146,7 +1110,7 @@ describe('JobProcessor', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
const manualExecutionService = mock<ManualExecutionService>();
|
||||
const manualExecutionService = createManualExecutionServiceMock();
|
||||
const jobProcessor = new JobProcessor(
|
||||
logger,
|
||||
executionRepository,
|
||||
|
||||
@@ -304,9 +304,7 @@ export class JobProcessor {
|
||||
throw new ManualExecutionCancelledError(executionId);
|
||||
}
|
||||
|
||||
const props = process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING
|
||||
? this.deriveJobFinishedProps(run, startedAt)
|
||||
: await this.fetchJobFinishedResult(executionId);
|
||||
const props = this.deriveJobFinishedProps(run, startedAt);
|
||||
|
||||
this.logger.info(`Worker finished execution ${executionId} (job ${job.id})`, {
|
||||
executionId,
|
||||
@@ -403,31 +401,6 @@ export class JobProcessor {
|
||||
};
|
||||
}
|
||||
|
||||
private async fetchJobFinishedResult(executionId: string): Promise<JobFinishedProps> {
|
||||
const execution = await this.executionPersistence.findSingleExecution(executionId, {
|
||||
includeData: true,
|
||||
unflattenData: true,
|
||||
});
|
||||
|
||||
if (!execution) {
|
||||
throw new UnexpectedError(
|
||||
`Worker failed to find execution ${executionId} immediately after workflow completed`,
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
success: execution.status !== 'error' && execution.data?.resultData?.error === undefined,
|
||||
status: execution.status,
|
||||
error: execution.data?.resultData?.error,
|
||||
startedAt: execution.startedAt,
|
||||
stoppedAt: execution.stoppedAt!,
|
||||
lastNodeExecuted: execution.data?.resultData?.lastNodeExecuted,
|
||||
usedDynamicCredentials: !!execution.data?.executionData?.runtimeData?.credentials,
|
||||
metadata: execution.data?.resultData?.metadata,
|
||||
waitTill: execution.waitTill ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
stopJob(jobId: JobId) {
|
||||
const runningJob = this.runningJobs[jobId];
|
||||
if (!runningJob) return;
|
||||
|
||||
@@ -685,7 +685,6 @@ export class WorkflowRunner {
|
||||
forceFullExecutionData?: boolean,
|
||||
): boolean {
|
||||
if (forceFullExecutionData) return true;
|
||||
if (!process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING) return true;
|
||||
|
||||
return (
|
||||
executionMode === 'integrated' ||
|
||||
|
||||
@@ -3175,8 +3175,7 @@ export interface IWorkflowExecutionDataProcess {
|
||||
restartExecutionId?: string;
|
||||
executionMode: WorkflowExecuteMode;
|
||||
/**
|
||||
* When true, forces the execution data to be present in the run data
|
||||
* ignores N8N_MINIMIZE_EXECUTION_DATA_FETCHING environment variable if set
|
||||
* When true, forces the execution data to be present in the run data.
|
||||
*/
|
||||
forceFullExecutionData?: boolean;
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user