mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-19 07:36:52 +00:00
chore(core): Add debug logs to publication outbox processing (no-changelog) (#32519)
This commit is contained in:
@@ -677,7 +677,7 @@ describe('ActiveWorkflowManager', () => {
|
||||
const workflowData = mock<WorkflowEntity>({ id: 'wf-1', name: 'Test Workflow' });
|
||||
const additionalData = mock<IWorkflowExecuteAdditionalData>();
|
||||
const realActiveWorkflowTriggers = new ActiveWorkflowTriggers(
|
||||
mock(),
|
||||
mock<Logger>({ scoped: jest.fn().mockReturnValue(mock<Logger>()) }),
|
||||
scheduledTaskManager,
|
||||
{
|
||||
runPollFunction: async (wf: Workflow, node: INode, pollFunctions: IPollFunctions) =>
|
||||
@@ -754,7 +754,7 @@ describe('ActiveWorkflowManager', () => {
|
||||
mock(),
|
||||
);
|
||||
realActiveWorkflowTriggers = new ActiveWorkflowTriggers(
|
||||
mock(),
|
||||
mock<Logger>({ scoped: jest.fn().mockReturnValue(mock<Logger>()) }),
|
||||
realScheduledTaskManager,
|
||||
mock(),
|
||||
mock(),
|
||||
|
||||
@@ -1016,8 +1016,6 @@ export class ActiveWorkflowManager {
|
||||
getPollFunctions,
|
||||
);
|
||||
|
||||
this.logger.debug(`Added non-webhook triggers for workflow ${formatWorkflow(dbWorkflow)}`);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import type {
|
||||
WorkflowHistoryRepository,
|
||||
WorkflowRepository,
|
||||
} from '@n8n/db';
|
||||
import type { Logger } from '@n8n/backend-common';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { INode } from 'n8n-workflow';
|
||||
import { WebhookPathTakenError } from 'n8n-workflow';
|
||||
@@ -15,12 +16,15 @@ import { WorkflowPublicationApplier } from '@/workflows/publication/workflow-pub
|
||||
import type { WorkflowTriggerActivator } from '@/workflows/triggers/workflow-trigger-activator';
|
||||
|
||||
describe('WorkflowPublicationApplier', () => {
|
||||
const logger = mock<Logger>();
|
||||
logger.scoped.mockReturnValue(logger);
|
||||
const workflowRepository = mock<WorkflowRepository>();
|
||||
const workflowHistoryRepository = mock<WorkflowHistoryRepository>();
|
||||
const workflowPublishedVersionRepository = mock<WorkflowPublishedVersionRepository>();
|
||||
const workflowTriggerActivator = mock<WorkflowTriggerActivator>();
|
||||
|
||||
const applier = new WorkflowPublicationApplier(
|
||||
logger,
|
||||
workflowRepository,
|
||||
workflowHistoryRepository,
|
||||
workflowPublishedVersionRepository,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import {
|
||||
WorkflowEntity,
|
||||
WorkflowHistory,
|
||||
@@ -30,11 +31,14 @@ import {
|
||||
@Service()
|
||||
export class WorkflowPublicationApplier {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly workflowRepository: WorkflowRepository,
|
||||
private readonly workflowHistoryRepository: WorkflowHistoryRepository,
|
||||
private readonly workflowPublishedVersionRepository: WorkflowPublishedVersionRepository,
|
||||
private readonly workflowTriggerActivator: WorkflowTriggerActivator,
|
||||
) {}
|
||||
) {
|
||||
this.logger = this.logger.scoped('workflow-publication');
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies a single publication outbox record, dispatching to {@link publish}
|
||||
@@ -93,6 +97,16 @@ export class WorkflowPublicationApplier {
|
||||
|
||||
const { toAdd, toRemove } = computeTriggerDiff(oldTriggerNodes, desiredTriggerNodes);
|
||||
|
||||
this.logger.debug(
|
||||
`Calculated trigger diff for workflow publication: ${toAdd.size} to add, ${toRemove.size} to remove`,
|
||||
{
|
||||
workflowId: record.workflowId,
|
||||
publishedVersionId: record.publishedVersionId,
|
||||
toAdd: Array.from(toAdd),
|
||||
toRemove: Array.from(toRemove),
|
||||
},
|
||||
);
|
||||
|
||||
// We also register triggers that are in our desired state that aren't
|
||||
// present locally, even if they aren't in this version diff. This is
|
||||
// necessary for startup/retry/crash recovery.
|
||||
|
||||
@@ -92,8 +92,9 @@ export class WorkflowPublicationOutboxConsumer {
|
||||
private async pollCycle() {
|
||||
const processed = await this.drainPending();
|
||||
|
||||
if (processed > 0) {
|
||||
this.logger.debug(`Processed ${processed} workflow publication outbox record(s)`);
|
||||
// Only log if we processed more than 1 since we log each individual record
|
||||
if (processed > 1) {
|
||||
this.logger.debug(`Processed ${processed} workflow publication outbox records in this cycle`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,6 +130,12 @@ export class WorkflowPublicationOutboxConsumer {
|
||||
* for a later poll cycle to retry.
|
||||
*/
|
||||
async processRecord(record: WorkflowPublicationOutbox): Promise<void> {
|
||||
this.logger.debug('Started processing workflow publication outbox record', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
publishedVersionId: record.publishedVersionId,
|
||||
});
|
||||
|
||||
let result: PublicationResult;
|
||||
|
||||
try {
|
||||
@@ -146,5 +153,11 @@ export class WorkflowPublicationOutboxConsumer {
|
||||
} catch (reportError) {
|
||||
this.errorReporter.error(reportError, { shouldBeLogged: true });
|
||||
}
|
||||
|
||||
this.logger.debug('Finished processing workflow publication outbox record', {
|
||||
outboxId: record.id,
|
||||
workflowId: record.workflowId,
|
||||
result: result.type,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ import type {
|
||||
|
||||
import type { TriggerFailureHandler } from '@/workflows/triggers/trigger-execution-context.factory';
|
||||
import { TriggerExecutionContextFactory } from '@/workflows/triggers/trigger-execution-context.factory';
|
||||
import { formatWorkflow } from '@/workflows/workflow.formatter';
|
||||
|
||||
export interface NonWebhookTriggerRegistrationContext {
|
||||
activationMode: WorkflowActivateMode;
|
||||
@@ -29,7 +28,6 @@ export interface NonWebhookTriggerRegistrationContext {
|
||||
}
|
||||
|
||||
export interface PreparedNonWebhookTriggerRegistration {
|
||||
dbWorkflow: WorkflowEntity;
|
||||
activationMode: WorkflowActivateMode;
|
||||
executionMode: WorkflowExecuteMode;
|
||||
additionalData: IWorkflowExecuteAdditionalData;
|
||||
@@ -47,7 +45,7 @@ export class NonWebhookTriggerRegistrar {
|
||||
private readonly activeWorkflowTriggers: ActiveWorkflowTriggers,
|
||||
private readonly triggerExecutionContextFactory: TriggerExecutionContextFactory,
|
||||
) {
|
||||
this.logger = this.logger.scoped(['workflow-activation']);
|
||||
this.logger = this.logger.scoped('workflow-publication');
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -96,7 +94,6 @@ export class NonWebhookTriggerRegistrar {
|
||||
);
|
||||
|
||||
return {
|
||||
dbWorkflow,
|
||||
activationMode,
|
||||
executionMode,
|
||||
additionalData,
|
||||
@@ -111,7 +108,6 @@ export class NonWebhookTriggerRegistrar {
|
||||
async register(
|
||||
workflow: Workflow,
|
||||
{
|
||||
dbWorkflow,
|
||||
activationMode,
|
||||
executionMode,
|
||||
additionalData,
|
||||
@@ -130,10 +126,6 @@ export class NonWebhookTriggerRegistrar {
|
||||
getTriggerFunctions,
|
||||
getPollFunctions,
|
||||
);
|
||||
|
||||
this.logger.debug(
|
||||
`Added non-webhook trigger "${nodeId}" for workflow ${formatWorkflow(dbWorkflow)}`,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -49,7 +49,7 @@ export class WebhookTriggerRegistrar {
|
||||
private readonly webhookService: WebhookService,
|
||||
private readonly workflowStaticDataService: WorkflowStaticDataService,
|
||||
) {
|
||||
this.logger = this.logger.scoped(['workflow-activation']);
|
||||
this.logger = this.logger.scoped('workflow-publication');
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -113,6 +113,14 @@ export class WebhookTriggerRegistrar {
|
||||
async deregister({ workflow, webhookData }: WebhookTriggerDeregistrationOptions) {
|
||||
await this.webhookService.deleteWebhook(workflow, webhookData, 'internal', 'update');
|
||||
|
||||
this.logger.debug(
|
||||
`Deactivating webhook "${webhookData.node}" for workflow "${workflow.name}"`,
|
||||
{
|
||||
workflow: { id: workflow.id, name: workflow.name },
|
||||
node: { name: webhookData.node, webhookId: webhookData.webhookId },
|
||||
},
|
||||
);
|
||||
|
||||
return webhookData.node;
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ describe('ActiveWorkflowTriggers', () => {
|
||||
|
||||
LoggerProxy.init(mock());
|
||||
const logger = mock<Logger>();
|
||||
logger.scoped.mockReturnValue(logger);
|
||||
const scheduledTaskManager = mock<ScheduledTaskManager>();
|
||||
const triggersAndPollers = mock<TriggersAndPollers>();
|
||||
const errorReporter = mock<ErrorReporter>();
|
||||
@@ -857,6 +858,7 @@ describe('ActiveWorkflowTriggers', () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
realLogger = mock<Logger>();
|
||||
realLogger.scoped.mockReturnValue(realLogger);
|
||||
realScheduledTaskManager = new ScheduledTaskManager(
|
||||
mock<InstanceSettings>({ isLeader: true }),
|
||||
mock<Logger>({ scoped: vi.fn().mockReturnValue(mock<Logger>()) }),
|
||||
|
||||
@@ -41,7 +41,9 @@ export class ActiveWorkflowTriggers {
|
||||
private readonly triggersAndPollers: TriggersAndPollers,
|
||||
private readonly errorReporter: ErrorReporter,
|
||||
private readonly tracing: Tracing,
|
||||
) {}
|
||||
) {
|
||||
this.logger = logger.scoped('workflow-publication');
|
||||
}
|
||||
|
||||
private activeTriggersByWorkflowId = new Map<string, WorkflowActiveTriggersState>();
|
||||
|
||||
@@ -156,6 +158,8 @@ export class ActiveWorkflowTriggers {
|
||||
triggers.add(triggerNode.id, triggerResponse);
|
||||
triggersAddedDuringThisCall.add(triggerNode.id, triggerResponse);
|
||||
triggerNodeIdsAddedDuringThisCall.push(triggerNode.id);
|
||||
|
||||
this.logTriggerActivation(workflow, triggerNode);
|
||||
}
|
||||
} catch (e) {
|
||||
const error = ensureError(e);
|
||||
@@ -195,6 +199,8 @@ export class ActiveWorkflowTriggers {
|
||||
mode,
|
||||
activation,
|
||||
);
|
||||
|
||||
this.logTriggerActivation(workflow, pollNode);
|
||||
} catch (e) {
|
||||
if (!existing) {
|
||||
this.activeTriggersByWorkflowId.delete(workflowId);
|
||||
@@ -241,6 +247,8 @@ export class ActiveWorkflowTriggers {
|
||||
await this.closeTrigger(response, workflowId);
|
||||
}
|
||||
activeTriggers.delete(nodeId);
|
||||
|
||||
this.logTriggerDeactivation(workflowId, nodeId);
|
||||
}
|
||||
|
||||
if (activeTriggers.isEmpty && !this.scheduledTaskManager.hasCrons(workflowId)) {
|
||||
@@ -521,4 +529,28 @@ export class ActiveWorkflowTriggers {
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
private logTriggerActivation(workflow: Workflow, triggerNode: INode) {
|
||||
this.logger.debug(
|
||||
`Activated trigger node "${triggerNode.name}" for workflow "${workflow.name}"`,
|
||||
{
|
||||
workflow: {
|
||||
id: workflow.id,
|
||||
name: workflow.name,
|
||||
},
|
||||
node: {
|
||||
id: triggerNode.id,
|
||||
name: triggerNode.name,
|
||||
type: triggerNode.type,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private logTriggerDeactivation(workflowId: Workflow['id'], triggerNodeId: INode['id']) {
|
||||
this.logger.debug(`Deactivated trigger "${triggerNodeId}" for workflow "${workflowId}"`, {
|
||||
workflowId,
|
||||
nodeId: triggerNodeId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user