mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-19 07:36:52 +00:00
feat(core): Trace workflow publication outbox consumption (no-changelog) (#32587)
This commit is contained in:
+4
-1
@@ -2,7 +2,7 @@ import type { Logger } from '@n8n/backend-common';
|
||||
import type { WorkflowsConfig } from '@n8n/config';
|
||||
import type { WorkflowPublicationOutbox, WorkflowPublicationOutboxRepository } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { ErrorReporter, InstanceSettings } from 'n8n-core';
|
||||
import type { ErrorReporter, InstanceSettings, Span, Tracing } from 'n8n-core';
|
||||
|
||||
import type { PublicationResult } from '@/workflows/publication/publication-result';
|
||||
import type { PublicationStatusReporter } from '@/workflows/publication/publication-status-reporter';
|
||||
@@ -18,6 +18,7 @@ describe('WorkflowPublicationOutboxConsumer', () => {
|
||||
const outboxRepository = mock<WorkflowPublicationOutboxRepository>();
|
||||
const applier = mock<WorkflowPublicationApplier>();
|
||||
const reporter = mock<PublicationStatusReporter>();
|
||||
const tracing = mock<Tracing>();
|
||||
|
||||
let consumer: WorkflowPublicationOutboxConsumer;
|
||||
|
||||
@@ -37,6 +38,7 @@ describe('WorkflowPublicationOutboxConsumer', () => {
|
||||
reporter,
|
||||
mock<InstanceSettings>({ isLeader }),
|
||||
new WorkflowPublicationLifecycleLock(),
|
||||
tracing,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -58,6 +60,7 @@ describe('WorkflowPublicationOutboxConsumer', () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.useFakeTimers();
|
||||
tracing.startSpan.mockImplementation(async (_opts, spanCb) => await spanCb(mock<Span>()));
|
||||
outboxRepository.claimNextPendingRecord.mockResolvedValue(null);
|
||||
applier.apply.mockResolvedValue({ type: 'completed' });
|
||||
reporter.report.mockResolvedValue(undefined);
|
||||
|
||||
@@ -3,7 +3,7 @@ import { WorkflowsConfig } from '@n8n/config';
|
||||
import { WorkflowPublicationOutbox, WorkflowPublicationOutboxRepository } from '@n8n/db';
|
||||
import { OnShutdown } from '@n8n/decorators';
|
||||
import { Service } from '@n8n/di';
|
||||
import { ErrorReporter, InstanceSettings } from 'n8n-core';
|
||||
import { ErrorReporter, InstanceSettings, SpanStatus, Tracing } from 'n8n-core';
|
||||
import { UnexpectedError, ensureError } from 'n8n-workflow';
|
||||
|
||||
import type { PublicationResult } from '@/workflows/publication/publication-result';
|
||||
@@ -39,6 +39,7 @@ export class WorkflowPublicationOutboxConsumer {
|
||||
private readonly reporter: PublicationStatusReporter,
|
||||
private readonly instanceSettings: InstanceSettings,
|
||||
private readonly lifecycleLock: WorkflowPublicationLifecycleLock,
|
||||
private readonly tracing: Tracing,
|
||||
) {
|
||||
this.logger = this.logger.scoped('workflow-publication');
|
||||
}
|
||||
@@ -126,16 +127,23 @@ export class WorkflowPublicationOutboxConsumer {
|
||||
}
|
||||
|
||||
private async runDrain(): Promise<number> {
|
||||
let processed = 0;
|
||||
while (this.shouldKeepPolling()) {
|
||||
const record = await this.outboxRepository.claimNextPendingRecord();
|
||||
if (!record) break;
|
||||
return await this.tracing.startSpan(
|
||||
{ name: 'Publication outbox drain', op: 'publication.outbox.drain' },
|
||||
async (span) => {
|
||||
let processed = 0;
|
||||
while (this.shouldKeepPolling()) {
|
||||
const record = await this.outboxRepository.claimNextPendingRecord();
|
||||
if (!record) break;
|
||||
|
||||
await this.processRecord(record);
|
||||
processed++;
|
||||
}
|
||||
await this.processRecord(record);
|
||||
processed++;
|
||||
}
|
||||
|
||||
return processed;
|
||||
span.setAttribute('n8n.publication.records_processed', processed);
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
return processed;
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private shouldKeepPolling() {
|
||||
@@ -156,48 +164,65 @@ export class WorkflowPublicationOutboxConsumer {
|
||||
* to the queue (so the new leader reprocesses it) and nothing is applied here.
|
||||
*/
|
||||
async processRecord(record: WorkflowPublicationOutbox): Promise<void> {
|
||||
await this.lifecycleLock.runExclusive(record.workflowId, async () => {
|
||||
// A record claimed while leader can reach here after stepdown (e.g. while
|
||||
// waiting on the lock during teardown). Activating triggers now would leave
|
||||
// them running on a demoted instance, so hand the record back to the queue.
|
||||
if (!this.instanceSettings.isLeader) {
|
||||
await this.outboxRepository.returnToPending(record.id);
|
||||
this.logger.debug('Returned publication outbox record to queue: no longer leader', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
await this.tracing.startSpan(
|
||||
{
|
||||
name: 'Publication outbox record',
|
||||
op: 'publication.outbox.process_record',
|
||||
attributes: {
|
||||
...this.tracing.pickWorkflowAttributes({ id: record.workflowId }),
|
||||
'n8n.publication.outbox_id': record.id,
|
||||
'n8n.publication.published_version_id': record.publishedVersionId,
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
await this.lifecycleLock.runExclusive(record.workflowId, async () => {
|
||||
// A record claimed while leader can reach here after stepdown (e.g. while
|
||||
// waiting on the lock during teardown). Activating triggers now would leave
|
||||
// them running on a demoted instance, so hand the record back to the queue.
|
||||
if (!this.instanceSettings.isLeader) {
|
||||
await this.outboxRepository.returnToPending(record.id);
|
||||
this.logger.debug('Returned publication outbox record to queue: no longer leader', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug('Started processing workflow publication outbox record', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
publishedVersionId: record.publishedVersionId,
|
||||
});
|
||||
|
||||
let result: PublicationResult;
|
||||
|
||||
try {
|
||||
result = await this.applier.apply(record);
|
||||
} catch (error) {
|
||||
const cause = ensureError(error);
|
||||
result = {
|
||||
type: 'failed',
|
||||
error: new UnexpectedError(`Unexpected: ${cause.message}`, { cause }),
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
await this.reporter.report(record, result);
|
||||
} catch (reportError) {
|
||||
this.errorReporter.error(reportError, { shouldBeLogged: true });
|
||||
}
|
||||
|
||||
this.logger.debug('Finished processing workflow publication outbox record', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
result: result.type,
|
||||
});
|
||||
|
||||
span.setAttribute('n8n.publication.result', result.type);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug('Started processing workflow publication outbox record', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
publishedVersionId: record.publishedVersionId,
|
||||
});
|
||||
|
||||
let result: PublicationResult;
|
||||
|
||||
try {
|
||||
result = await this.applier.apply(record);
|
||||
} catch (error) {
|
||||
const cause = ensureError(error);
|
||||
result = {
|
||||
type: 'failed',
|
||||
error: new UnexpectedError(`Unexpected: ${cause.message}`, { cause }),
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
await this.reporter.report(record, result);
|
||||
} catch (reportError) {
|
||||
this.errorReporter.error(reportError, { shouldBeLogged: true });
|
||||
}
|
||||
|
||||
this.logger.debug('Finished processing workflow publication outbox record', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
result: result.type,
|
||||
});
|
||||
});
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+18
-3
@@ -1,7 +1,7 @@
|
||||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
import type { WorkflowEntity } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { ActiveWorkflowTriggers } from 'n8n-core';
|
||||
import type { ActiveWorkflowTriggers, Span, Tracing } from 'n8n-core';
|
||||
import type { IWorkflowBase, IWorkflowExecuteAdditionalData } from 'n8n-workflow';
|
||||
|
||||
import { NonWebhookTriggerRegistrar } from '@/workflows/triggers/non-webhook-trigger-registrar';
|
||||
@@ -10,9 +10,12 @@ import type { TriggerExecutionContextFactory } from '@/workflows/triggers/trigge
|
||||
import { createWorkflow, logger, node } from './trigger-test-utils';
|
||||
|
||||
describe('NonWebhookTriggerRegistrar', () => {
|
||||
const tracing = mock<Tracing>();
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.restoreAllMocks();
|
||||
tracing.startSpan.mockImplementation(async (_opts, spanCb) => await spanCb(mock<Span>()));
|
||||
});
|
||||
|
||||
test('resolves trigger and poll node ids', () => {
|
||||
@@ -20,6 +23,7 @@ describe('NonWebhookTriggerRegistrar', () => {
|
||||
logger,
|
||||
mock<ActiveWorkflowTriggers>(),
|
||||
mock<TriggerExecutionContextFactory>(),
|
||||
tracing,
|
||||
);
|
||||
const workflow = createWorkflow([
|
||||
node('trigger-a', 'trigger'),
|
||||
@@ -37,7 +41,12 @@ describe('NonWebhookTriggerRegistrar', () => {
|
||||
const getPollFunctions = jest.fn();
|
||||
factory.getExecuteTriggerFunctions.mockReturnValue(getTriggerFunctions);
|
||||
factory.getExecutePollFunctions.mockReturnValue(getPollFunctions);
|
||||
const registrar = new NonWebhookTriggerRegistrar(logger, activeWorkflowTriggers, factory);
|
||||
const registrar = new NonWebhookTriggerRegistrar(
|
||||
logger,
|
||||
activeWorkflowTriggers,
|
||||
factory,
|
||||
tracing,
|
||||
);
|
||||
const workflow = createWorkflow([node('trigger-a', 'trigger'), node('poll-a', 'poll')]);
|
||||
const additionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||
const dbWorkflow = mock<WorkflowEntity>({ id: 'wf-1', name: 'Test workflow' });
|
||||
@@ -70,6 +79,7 @@ describe('NonWebhookTriggerRegistrar', () => {
|
||||
logger,
|
||||
activeWorkflowTriggers,
|
||||
mock<TriggerExecutionContextFactory>(),
|
||||
tracing,
|
||||
);
|
||||
|
||||
await registrar.deregister('wf-1', 'poll-a');
|
||||
@@ -82,7 +92,12 @@ describe('NonWebhookTriggerRegistrar', () => {
|
||||
const factory = mock<TriggerExecutionContextFactory>();
|
||||
factory.getExecuteTriggerFunctions.mockReturnValue(jest.fn());
|
||||
factory.getExecutePollFunctions.mockReturnValue(jest.fn());
|
||||
const registrar = new NonWebhookTriggerRegistrar(logger, activeWorkflowTriggers, factory);
|
||||
const registrar = new NonWebhookTriggerRegistrar(
|
||||
logger,
|
||||
activeWorkflowTriggers,
|
||||
factory,
|
||||
tracing,
|
||||
);
|
||||
const workflow = createWorkflow([node('trigger-a', 'trigger')]);
|
||||
const context = {
|
||||
activationMode: 'update' as const,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
import type { WebhookEntity } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { ErrorReporter } from 'n8n-core';
|
||||
import type { ErrorReporter, Span, Tracing } from 'n8n-core';
|
||||
import type { IWebhookData, IWorkflowExecuteAdditionalData } from 'n8n-workflow';
|
||||
import { WebhookPathTakenError } from 'n8n-workflow';
|
||||
|
||||
@@ -13,9 +13,12 @@ import type { WorkflowStaticDataService } from '@/workflows/workflow-static-data
|
||||
import { createWorkflow, logger, node } from './trigger-test-utils';
|
||||
|
||||
describe('WebhookTriggerRegistrar', () => {
|
||||
const tracing = mock<Tracing>();
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.restoreAllMocks();
|
||||
tracing.startSpan.mockImplementation(async (_opts, spanCb) => await spanCb(mock<Span>()));
|
||||
});
|
||||
|
||||
test('resolves workflow webhook definitions', () => {
|
||||
@@ -24,6 +27,7 @@ describe('WebhookTriggerRegistrar', () => {
|
||||
mock<ErrorReporter>(),
|
||||
mock<WebhookService>(),
|
||||
mock<WorkflowStaticDataService>(),
|
||||
tracing,
|
||||
);
|
||||
const additionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||
const webhookData = mock<IWebhookData>({ node: 'Webhook' });
|
||||
@@ -47,6 +51,7 @@ describe('WebhookTriggerRegistrar', () => {
|
||||
mock<ErrorReporter>(),
|
||||
webhookService,
|
||||
workflowStaticDataService,
|
||||
tracing,
|
||||
);
|
||||
const webhookEntity = { webhookPath: '/team/:id/', node: 'Webhook' } as WebhookEntity;
|
||||
webhookService.createWebhook.mockReturnValue(webhookEntity);
|
||||
@@ -91,6 +96,7 @@ describe('WebhookTriggerRegistrar', () => {
|
||||
mock<ErrorReporter>(),
|
||||
webhookService,
|
||||
workflowStaticDataService,
|
||||
tracing,
|
||||
);
|
||||
webhookService.createWebhook.mockImplementation(
|
||||
(data) => ({ webhookPath: data.webhookPath, node: data.node }) as WebhookEntity,
|
||||
@@ -136,6 +142,7 @@ describe('WebhookTriggerRegistrar', () => {
|
||||
mock<ErrorReporter>(),
|
||||
webhookService,
|
||||
workflowStaticDataService,
|
||||
tracing,
|
||||
);
|
||||
webhookService.createWebhook.mockImplementation(
|
||||
(data) => ({ webhookPath: data.webhookPath, node: data.node }) as WebhookEntity,
|
||||
@@ -181,6 +188,7 @@ describe('WebhookTriggerRegistrar', () => {
|
||||
mock<ErrorReporter>(),
|
||||
webhookService,
|
||||
workflowStaticDataService,
|
||||
tracing,
|
||||
);
|
||||
const webhookEntity = { webhookPath: 'taken', node: 'Webhook' } as WebhookEntity;
|
||||
webhookService.createWebhook.mockReturnValue(webhookEntity);
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
import type { WorkflowsConfig } from '@n8n/config';
|
||||
import type { IWorkflowDb, WorkflowEntity, WorkflowRepository } from '@n8n/db';
|
||||
import { mock, type MockProxy } from 'jest-mock-extended';
|
||||
import type { ErrorReporter } from 'n8n-core';
|
||||
import type { ErrorReporter, Span, Tracing } from 'n8n-core';
|
||||
import type { IWebhookData, IWorkflowExecuteAdditionalData } from 'n8n-workflow';
|
||||
import { WebhookPathTakenError, WorkflowActivationError, WorkflowExpression } from 'n8n-workflow';
|
||||
|
||||
@@ -30,6 +30,8 @@ const MAX_ATTEMPTS = TRIGGER_ACTIVATION_MAX_ATTEMPTS;
|
||||
|
||||
const flushPromises = async () => await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
const tracing = mock<Tracing>();
|
||||
|
||||
type ActivatorOverrides = {
|
||||
errorReporter?: ErrorReporter;
|
||||
nodeTypes?: ReturnType<typeof createNodeTypes>;
|
||||
@@ -41,6 +43,7 @@ type ActivatorOverrides = {
|
||||
nonWebhookTriggerRegistrar?: NonWebhookTriggerRegistrar;
|
||||
triggerCountService?: TriggerCountService;
|
||||
activationErrorsService?: ActivationErrorsService;
|
||||
tracing?: Tracing;
|
||||
};
|
||||
|
||||
function buildActivator(overrides: ActivatorOverrides = {}) {
|
||||
@@ -56,6 +59,7 @@ function buildActivator(overrides: ActivatorOverrides = {}) {
|
||||
overrides.nonWebhookTriggerRegistrar ?? mock<NonWebhookTriggerRegistrar>(),
|
||||
overrides.triggerCountService ?? mock<TriggerCountService>(),
|
||||
overrides.activationErrorsService ?? mock<ActivationErrorsService>(),
|
||||
overrides.tracing ?? tracing,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -63,6 +67,7 @@ describe('WorkflowTriggerActivator', () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.restoreAllMocks();
|
||||
tracing.startSpan.mockImplementation(async (_opts, spanCb) => await spanCb(mock<Span>()));
|
||||
});
|
||||
|
||||
test('requires workflow publication service to be enabled', () => {
|
||||
|
||||
@@ -3,6 +3,8 @@ import type { WorkflowEntity } from '@n8n/db';
|
||||
import { Service } from '@n8n/di';
|
||||
import {
|
||||
ActiveWorkflowTriggers,
|
||||
SpanStatus,
|
||||
Tracing,
|
||||
type IGetExecutePollFunctions,
|
||||
type IGetExecuteTriggerFunctions,
|
||||
} from 'n8n-core';
|
||||
@@ -44,6 +46,7 @@ export class NonWebhookTriggerRegistrar {
|
||||
private readonly logger: Logger,
|
||||
private readonly activeWorkflowTriggers: ActiveWorkflowTriggers,
|
||||
private readonly triggerExecutionContextFactory: TriggerExecutionContextFactory,
|
||||
private readonly tracing: Tracing,
|
||||
) {
|
||||
this.logger = this.logger.scoped('workflow-publication');
|
||||
}
|
||||
@@ -116,15 +119,31 @@ export class NonWebhookTriggerRegistrar {
|
||||
}: PreparedNonWebhookTriggerRegistration,
|
||||
nodeId: INode['id'],
|
||||
) {
|
||||
await this.activeWorkflowTriggers.addTriggers(
|
||||
workflow.id,
|
||||
workflow,
|
||||
[nodeId],
|
||||
additionalData,
|
||||
executionMode,
|
||||
activationMode,
|
||||
getTriggerFunctions,
|
||||
getPollFunctions,
|
||||
await this.tracing.startSpan(
|
||||
{
|
||||
name: 'Non-webhook trigger register',
|
||||
op: 'publication.non_webhook.register',
|
||||
attributes: {
|
||||
...this.tracing.pickWorkflowAttributes({ id: workflow.id, name: workflow.name }),
|
||||
...this.tracing.pickNodeAttributes({ id: nodeId }),
|
||||
'n8n.publication.activation_mode': activationMode,
|
||||
'n8n.publication.execution_mode': executionMode,
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
await this.activeWorkflowTriggers.addTriggers(
|
||||
workflow.id,
|
||||
workflow,
|
||||
[nodeId],
|
||||
additionalData,
|
||||
executionMode,
|
||||
activationMode,
|
||||
getTriggerFunctions,
|
||||
getPollFunctions,
|
||||
);
|
||||
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -132,6 +151,20 @@ export class NonWebhookTriggerRegistrar {
|
||||
* Deregister one active, poll, or schedule trigger node from memory.
|
||||
*/
|
||||
async deregister(workflowId: WorkflowId, nodeId: INode['id']) {
|
||||
await this.activeWorkflowTriggers.removeTriggers(workflowId, new Set([nodeId]));
|
||||
await this.tracing.startSpan(
|
||||
{
|
||||
name: 'Non-webhook trigger deregister',
|
||||
op: 'publication.non_webhook.deregister',
|
||||
attributes: {
|
||||
...this.tracing.pickWorkflowAttributes({ id: workflowId }),
|
||||
...this.tracing.pickNodeAttributes({ id: nodeId }),
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
await this.activeWorkflowTriggers.removeTriggers(workflowId, new Set([nodeId]));
|
||||
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import type {
|
||||
} from 'n8n-workflow';
|
||||
import { WebhookPathTakenError, Workflow, ensureError } from 'n8n-workflow';
|
||||
|
||||
import { ErrorReporter } from 'n8n-core';
|
||||
import { ErrorReporter, SpanStatus, Tracing } from 'n8n-core';
|
||||
|
||||
import * as WebhookHelpers from '@/webhooks/webhook-helpers';
|
||||
import { WebhookService } from '@/webhooks/webhook.service';
|
||||
@@ -48,6 +48,7 @@ export class WebhookTriggerRegistrar {
|
||||
private readonly errorReporter: ErrorReporter,
|
||||
private readonly webhookService: WebhookService,
|
||||
private readonly workflowStaticDataService: WorkflowStaticDataService,
|
||||
private readonly tracing: Tracing,
|
||||
) {
|
||||
this.logger = this.logger.scoped('workflow-publication');
|
||||
}
|
||||
@@ -63,65 +64,99 @@ export class WebhookTriggerRegistrar {
|
||||
* Register one workflow-defined webhook in storage and with third-party services.
|
||||
*/
|
||||
async register({ workflow, webhookData, mode, activation }: WebhookTriggerRegistrationOptions) {
|
||||
const node = workflow.getNode(webhookData.node) as INode;
|
||||
node.name = webhookData.node;
|
||||
await this.tracing.startSpan(
|
||||
{
|
||||
name: 'Webhook trigger register',
|
||||
op: 'publication.webhook.register',
|
||||
attributes: {
|
||||
...this.tracing.pickWorkflowAttributes({ id: workflow.id, name: workflow.name }),
|
||||
...this.tracing.pickNodeAttributes({ name: webhookData.node }),
|
||||
'n8n.webhook.path': webhookData.path,
|
||||
'n8n.webhook.method': webhookData.httpMethod,
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
const node = workflow.getNode(webhookData.node) as INode;
|
||||
node.name = webhookData.node;
|
||||
|
||||
const webhook = this.webhookService.createWebhook({
|
||||
workflowId: webhookData.workflowId,
|
||||
webhookPath: webhookData.path,
|
||||
node: node.name,
|
||||
method: webhookData.httpMethod,
|
||||
});
|
||||
const webhook = this.webhookService.createWebhook({
|
||||
workflowId: webhookData.workflowId,
|
||||
webhookPath: webhookData.path,
|
||||
node: node.name,
|
||||
method: webhookData.httpMethod,
|
||||
});
|
||||
|
||||
this.normalizeWebhookPath(webhook, node.webhookId);
|
||||
this.normalizeWebhookPath(webhook, node.webhookId);
|
||||
|
||||
let isStored = false;
|
||||
try {
|
||||
// `storeWebhook` registers the webhook atomically on the
|
||||
// (webhookPath, method) primary key and rejects a path already owned
|
||||
// by another workflow.
|
||||
await this.webhookService.storeWebhook(webhook);
|
||||
isStored = true;
|
||||
await this.webhookService.createWebhookIfNotExists(workflow, webhookData, mode, activation);
|
||||
} catch (error) {
|
||||
if (isStored) await this.clearRegisteredWebhook(workflow, webhookData);
|
||||
let isStored = false;
|
||||
try {
|
||||
// `storeWebhook` registers the webhook atomically on the
|
||||
// (webhookPath, method) primary key and rejects a path already owned
|
||||
// by another workflow.
|
||||
await this.webhookService.storeWebhook(webhook);
|
||||
isStored = true;
|
||||
await this.webhookService.createWebhookIfNotExists(
|
||||
workflow,
|
||||
webhookData,
|
||||
mode,
|
||||
activation,
|
||||
);
|
||||
} catch (error) {
|
||||
if (isStored) await this.clearRegisteredWebhook(workflow, webhookData);
|
||||
|
||||
// If it's a workflow from the insert.
|
||||
// TODO check if there is standard error code for duplicate key violation that works
|
||||
// with all databases.
|
||||
if (isQueryFailedError(error)) {
|
||||
throw new WebhookPathTakenError(webhook.node, error);
|
||||
}
|
||||
// If it's a workflow from the insert.
|
||||
// TODO check if there is standard error code for duplicate key violation that works
|
||||
// with all databases.
|
||||
if (isQueryFailedError(error)) {
|
||||
throw new WebhookPathTakenError(webhook.node, error);
|
||||
}
|
||||
|
||||
if (hasErrorDetail(error)) {
|
||||
// It's an error running the webhook methods (checkExists, create).
|
||||
error.message = error.detail;
|
||||
}
|
||||
if (hasErrorDetail(error)) {
|
||||
// It's an error running the webhook methods (checkExists, create).
|
||||
error.message = error.detail;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
this.logger.debug(`Added webhook "${webhookData.node}" for workflow "${workflow.name}"`, {
|
||||
workflowId: workflow.id,
|
||||
nodeName: webhookData.node,
|
||||
});
|
||||
this.logger.debug(`Added webhook "${webhookData.node}" for workflow "${workflow.name}"`, {
|
||||
workflowId: workflow.id,
|
||||
nodeName: webhookData.node,
|
||||
});
|
||||
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deregister one workflow-defined webhook from external services.
|
||||
*/
|
||||
async deregister({ workflow, webhookData }: WebhookTriggerDeregistrationOptions) {
|
||||
await this.webhookService.deleteWebhook(workflow, webhookData, 'internal', 'update');
|
||||
|
||||
this.logger.debug(
|
||||
`Deactivating webhook "${webhookData.node}" for workflow "${workflow.name}"`,
|
||||
return await this.tracing.startSpan(
|
||||
{
|
||||
workflow: { id: workflow.id, name: workflow.name },
|
||||
node: { name: webhookData.node, webhookId: webhookData.webhookId },
|
||||
name: 'Webhook trigger deregister',
|
||||
op: 'publication.webhook.deregister',
|
||||
attributes: {
|
||||
...this.tracing.pickWorkflowAttributes({ id: workflow.id, name: workflow.name }),
|
||||
...this.tracing.pickNodeAttributes({ name: webhookData.node }),
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
await this.webhookService.deleteWebhook(workflow, webhookData, 'internal', 'update');
|
||||
|
||||
this.logger.debug(
|
||||
`Deactivating webhook "${webhookData.node}" for workflow "${workflow.name}"`,
|
||||
{
|
||||
workflow: { id: workflow.id, name: workflow.name },
|
||||
node: { name: webhookData.node, webhookId: webhookData.webhookId },
|
||||
},
|
||||
);
|
||||
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
return webhookData.node;
|
||||
},
|
||||
);
|
||||
|
||||
return webhookData.node;
|
||||
}
|
||||
|
||||
async clearWorkflowWebhooksForNodes(workflowId: string, nodeNames: string[]) {
|
||||
|
||||
@@ -5,7 +5,7 @@ import { WorkflowsConfig } from '@n8n/config';
|
||||
import type { IWorkflowDb, WorkflowEntity } from '@n8n/db';
|
||||
import { WorkflowRepository } from '@n8n/db';
|
||||
import { Service } from '@n8n/di';
|
||||
import { ErrorReporter } from 'n8n-core';
|
||||
import { ErrorReporter, SpanStatus, Tracing } from 'n8n-core';
|
||||
import type {
|
||||
IConnections,
|
||||
INode,
|
||||
@@ -69,6 +69,7 @@ export class WorkflowTriggerActivator {
|
||||
private readonly nonWebhookTriggerRegistrar: NonWebhookTriggerRegistrar,
|
||||
private readonly triggerCountService: TriggerCountService,
|
||||
private readonly activationErrorsService: ActivationErrorsService,
|
||||
private readonly tracing: Tracing,
|
||||
) {
|
||||
assert(
|
||||
this.workflowsConfig.useWorkflowPublicationService,
|
||||
@@ -159,43 +160,62 @@ export class WorkflowTriggerActivator {
|
||||
version: WorkflowTriggerVersion,
|
||||
nodeIds: Set<INode['id']>,
|
||||
): Promise<TriggerActivationOutcome> {
|
||||
this.applyVersionToDbWorkflow(dbWorkflow, version);
|
||||
const workflow = this.createWorkflow(dbWorkflow);
|
||||
return await this.tracing.startSpan(
|
||||
{
|
||||
name: 'Trigger activation',
|
||||
op: 'publication.trigger.activate',
|
||||
attributes: {
|
||||
...this.tracing.pickWorkflowAttributes(dbWorkflow),
|
||||
'n8n.publication.nodes_requested': nodeIds.size,
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
this.applyVersionToDbWorkflow(dbWorkflow, version);
|
||||
const workflow = this.createWorkflow(dbWorkflow);
|
||||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
workflowSettings: dbWorkflow.settings,
|
||||
});
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
workflowSettings: dbWorkflow.settings,
|
||||
});
|
||||
|
||||
const outcome: TriggerActivationOutcome = { activated: [], failures: [] };
|
||||
const outcome: TriggerActivationOutcome = { activated: [], failures: [] };
|
||||
|
||||
let triggerCount = 0;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
await this.registerWebhookTriggers(workflow, additionalData, nodeIds, outcome);
|
||||
let triggerCount = 0;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
await this.registerWebhookTriggers(workflow, additionalData, nodeIds, outcome);
|
||||
|
||||
const resolveWorkflowData = this.createWorkflowDataResolver(dbWorkflow);
|
||||
const resolveWorkflowData = this.createWorkflowDataResolver(dbWorkflow);
|
||||
|
||||
await this.registerNonWebhookTriggers(
|
||||
dbWorkflow,
|
||||
workflow,
|
||||
additionalData,
|
||||
resolveWorkflowData,
|
||||
nodeIds,
|
||||
outcome,
|
||||
);
|
||||
await this.registerNonWebhookTriggers(
|
||||
dbWorkflow,
|
||||
workflow,
|
||||
additionalData,
|
||||
resolveWorkflowData,
|
||||
nodeIds,
|
||||
outcome,
|
||||
);
|
||||
|
||||
triggerCount = this.triggerCountService.count(workflow, additionalData);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
triggerCount = this.triggerCountService.count(workflow, additionalData);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
await Promise.all([
|
||||
this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount),
|
||||
this.workflowStaticDataService.saveStaticData(workflow),
|
||||
]);
|
||||
await Promise.all([
|
||||
this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount),
|
||||
this.workflowStaticDataService.saveStaticData(workflow),
|
||||
]);
|
||||
|
||||
return outcome;
|
||||
span.setAttributes({
|
||||
'n8n.publication.nodes_activated': outcome.activated.length,
|
||||
'n8n.publication.nodes_failed': outcome.failures.length,
|
||||
'n8n.publication.trigger_count': triggerCount,
|
||||
});
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
|
||||
return outcome;
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -210,25 +230,39 @@ export class WorkflowTriggerActivator {
|
||||
) {
|
||||
if (nodeIds.size === 0) return;
|
||||
|
||||
this.applyVersionToDbWorkflow(dbWorkflow, version);
|
||||
const workflow = this.createWorkflow(dbWorkflow);
|
||||
await this.tracing.startSpan(
|
||||
{
|
||||
name: 'Trigger deactivation',
|
||||
op: 'publication.trigger.deactivate',
|
||||
attributes: {
|
||||
...this.tracing.pickWorkflowAttributes(dbWorkflow),
|
||||
'n8n.publication.nodes_requested': nodeIds.size,
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
this.applyVersionToDbWorkflow(dbWorkflow, version);
|
||||
const workflow = this.createWorkflow(dbWorkflow);
|
||||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
workflowSettings: dbWorkflow.settings,
|
||||
});
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
workflowSettings: dbWorkflow.settings,
|
||||
});
|
||||
|
||||
const removedNodeNames = await this.deregisterWebhookTriggers(
|
||||
workflow,
|
||||
additionalData,
|
||||
nodeIds,
|
||||
const removedNodeNames = await this.deregisterWebhookTriggers(
|
||||
workflow,
|
||||
additionalData,
|
||||
nodeIds,
|
||||
);
|
||||
await this.webhookTriggerRegistrar.clearWorkflowWebhooksForNodes(
|
||||
dbWorkflow.id,
|
||||
removedNodeNames,
|
||||
);
|
||||
|
||||
await this.deregisterNonWebhookTriggers(dbWorkflow.id, workflow, nodeIds);
|
||||
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
},
|
||||
);
|
||||
await this.webhookTriggerRegistrar.clearWorkflowWebhooksForNodes(
|
||||
dbWorkflow.id,
|
||||
removedNodeNames,
|
||||
);
|
||||
|
||||
await this.deregisterNonWebhookTriggers(dbWorkflow.id, workflow, nodeIds);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -236,23 +270,35 @@ export class WorkflowTriggerActivator {
|
||||
* registering any triggers. Used when publication only removes triggers.
|
||||
*/
|
||||
async updateTriggerCount(dbWorkflow: WorkflowEntity, version: WorkflowTriggerVersion) {
|
||||
this.applyVersionToDbWorkflow(dbWorkflow, version);
|
||||
const workflow = this.createWorkflow(dbWorkflow);
|
||||
await this.tracing.startSpan(
|
||||
{
|
||||
name: 'Trigger count update',
|
||||
op: 'publication.trigger.update_count',
|
||||
attributes: this.tracing.pickWorkflowAttributes(dbWorkflow),
|
||||
},
|
||||
async (span) => {
|
||||
this.applyVersionToDbWorkflow(dbWorkflow, version);
|
||||
const workflow = this.createWorkflow(dbWorkflow);
|
||||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
workflowSettings: dbWorkflow.settings,
|
||||
});
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
workflowSettings: dbWorkflow.settings,
|
||||
});
|
||||
|
||||
let triggerCount = 0;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
triggerCount = this.triggerCountService.count(workflow, additionalData);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
let triggerCount = 0;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
triggerCount = this.triggerCountService.count(workflow, additionalData);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
||||
await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
||||
|
||||
span.setAttribute('n8n.publication.trigger_count', triggerCount);
|
||||
span.setStatus({ code: SpanStatus.ok });
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private applyVersionToDbWorkflow(dbWorkflow: WorkflowEntity, version: WorkflowTriggerVersion) {
|
||||
|
||||
Reference in New Issue
Block a user