refactor(core): Register multi-main handlers reactively (#32457)

This commit is contained in:
Tomi Turtiainen
2026-06-18 12:36:04 +03:00
committed by GitHub
parent 029905890d
commit 753c4f25fc
10 changed files with 174 additions and 101 deletions
@@ -0,0 +1,73 @@
import {
MultiMainMetadata,
LEADER_TAKEOVER_EVENT_NAME,
LEADER_STEPDOWN_EVENT_NAME,
type MultiMainEventHandler,
} from '../multi-main-metadata';
class FirstService {
onTakeover() {}
}
class SecondService {
onStepdown() {}
}
const takeoverHandler: MultiMainEventHandler = {
eventHandlerClass: FirstService as unknown as MultiMainEventHandler['eventHandlerClass'],
methodName: 'onTakeover',
eventName: LEADER_TAKEOVER_EVENT_NAME,
};
const stepdownHandler: MultiMainEventHandler = {
eventHandlerClass: SecondService as unknown as MultiMainEventHandler['eventHandlerClass'],
methodName: 'onStepdown',
eventName: LEADER_STEPDOWN_EVENT_NAME,
};
describe('MultiMainMetadata', () => {
let metadata: MultiMainMetadata;
beforeEach(() => {
metadata = new MultiMainMetadata();
});
it('should replay handlers registered before subscribe()', () => {
metadata.register(takeoverHandler);
const listener = vi.fn();
metadata.subscribe(listener);
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith(takeoverHandler);
});
it('should notify on handlers registered after subscribe()', () => {
const listener = vi.fn();
metadata.subscribe(listener);
metadata.register(stepdownHandler);
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith(stepdownHandler);
});
it('should replay existing handlers then notify on subsequent ones', () => {
metadata.register(takeoverHandler);
const listener = vi.fn();
metadata.subscribe(listener);
metadata.register(stepdownHandler);
expect(listener).toHaveBeenCalledTimes(2);
expect(listener).toHaveBeenNthCalledWith(1, takeoverHandler);
expect(listener).toHaveBeenNthCalledWith(2, stepdownHandler);
});
it('should not notify before subscribe()', () => {
const listener = vi.fn();
metadata.register(takeoverHandler);
expect(listener).not.toHaveBeenCalled();
});
});
@@ -11,14 +11,12 @@ import { OnLeaderStepdown, OnLeaderTakeover } from '../on-multi-main-event';
class MockMultiMainSetup extends EventEmitter {
registerEventHandlers() {
const handlers = Container.get(MultiMainMetadata).getHandlers();
for (const { eventHandlerClass, methodName, eventName } of handlers) {
const instance = Container.get(eventHandlerClass);
Container.get(MultiMainMetadata).subscribe(({ eventHandlerClass, methodName, eventName }) => {
this.on(eventName, async () => {
const instance = Container.get(eventHandlerClass);
return await instance[methodName].call(instance);
});
}
});
}
}
@@ -173,6 +171,28 @@ it('should register handlers from multiple service classes', async () => {
expect(secondService.handlerCalled).toBe(true);
});
it('should wire handlers registered after registerEventHandlers() runs', async () => {
// Subscribe before the decorated class is declared — mirrors a service whose
// module is loaded after MultiMainSetup has already started routing events.
multiMainSetup.registerEventHandlers();
@Service()
class LateService {
handlerCalled = false;
@OnLeaderTakeover()
async handleTakeover() {
this.handlerCalled = true;
}
}
const lateService = Container.get(LateService);
multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME);
expect(lateService.handlerCalled).toBe(true);
});
it('should handle async methods correctly', async () => {
@Service()
class TestService {
@@ -1,2 +1,3 @@
export { MultiMainMetadata } from './multi-main-metadata';
export type { MultiMainEventHandler } from './multi-main-metadata';
export { OnLeaderTakeover, OnLeaderStepdown } from './on-multi-main-event';
@@ -1,4 +1,5 @@
import { Service } from '@n8n/di';
import { UnexpectedError } from 'n8n-workflow';
import type { EventHandler } from '../types';
@@ -7,17 +8,31 @@ export const LEADER_STEPDOWN_EVENT_NAME = 'leader-stepdown';
export type MultiMainEvent = typeof LEADER_TAKEOVER_EVENT_NAME | typeof LEADER_STEPDOWN_EVENT_NAME;
type MultiMainEventHandler = EventHandler<MultiMainEvent>;
export type MultiMainEventHandler = EventHandler<MultiMainEvent>;
@Service()
export class MultiMainMetadata {
private readonly handlers: MultiMainEventHandler[] = [];
private onRegister?: (handler: MultiMainEventHandler) => void;
register(handler: MultiMainEventHandler) {
this.handlers.push(handler);
this.onRegister?.(handler);
}
getHandlers(): MultiMainEventHandler[] {
return this.handlers;
/**
* Subscribe to handler registrations. Immediately replays every handler
* registered so far, then notifies the listener on each subsequent
* registration. This lets listeners be wired regardless of when the
* decorated class's module is loaded.
*/
subscribe(listener: (handler: MultiMainEventHandler) => void) {
if (this.onRegister) {
throw new UnexpectedError('A listener is already subscribed to handler registrations');
}
this.onRegister = listener;
for (const handler of this.handlers) listener(handler);
}
}
+4 -22
View File
@@ -286,28 +286,6 @@ export class Start extends BaseCommand<z.infer<typeof flagsSchema>> {
await Container.get(AuthHandlerRegistry).init();
if (this.instanceSettings.isMultiMain) {
// we instantiate `PrometheusMetricsService` early to register its multi-main event handlers
if (this.globalConfig.endpoints.metrics.enable) {
const { PrometheusMetricsService } = await import('@/metrics/prometheus');
Container.get(PrometheusMetricsService);
}
// we instantiate the publication services early to register their multi-main event handlers
if (this.globalConfig.workflows.useWorkflowPublicationService) {
const { WorkflowPublicationOutboxConsumer } = await import(
'@/workflows/publication/workflow-publication-outbox-consumer'
);
const { PublishedWorkflowEnqueuer } = await import(
'@/workflows/publication/published-workflow-enqueuer'
);
const { PublishedWorkflowTriggerDeactivator } = await import(
'@/workflows/publication/published-workflow-trigger-deactivator'
);
Container.get(WorkflowPublicationOutboxConsumer);
Container.get(PublishedWorkflowEnqueuer);
Container.get(PublishedWorkflowTriggerDeactivator);
}
Container.get(MultiMainSetup).registerEventHandlers();
}
@@ -435,6 +413,10 @@ export class Start extends BaseCommand<z.infer<typeof flagsSchema>> {
'@/workflows/publication/workflow-publication-outbox-consumer'
);
// Import for its side effect: registering the trigger deactivator's
// @OnLeaderStepdown and @OnShutdown handlers. Nothing else loads this module.
await import('@/workflows/publication/published-workflow-trigger-deactivator');
// Enqueue needs to happen before outbox consumer init, so it can activate
// everything on the first drain
if (this.instanceSettings.isLeader) {
@@ -90,10 +90,6 @@ describe('PrometheusInstanceRoleMetricsService', () => {
expect(mockGaugeSet).toHaveBeenCalledWith(1);
});
it('should not throw when called before init', () => {
expect(() => service.updateOnLeaderTakeover()).not.toThrow();
});
});
describe('updateOnLeaderStepdown', () => {
@@ -105,60 +101,5 @@ describe('PrometheusInstanceRoleMetricsService', () => {
expect(mockGaugeSet).toHaveBeenCalledWith(0);
});
it('should not throw when called before init', () => {
expect(() => service.updateOnLeaderStepdown()).not.toThrow();
});
});
describe('multi-main early-instantiation sequence', () => {
// In multi-main mode start.ts does Container.get(PrometheusMetricsService) without
// calling init() so that the @OnLeaderTakeover/@OnLeaderStepdown decorators are
// registered before MultiMainSetup.registerEventHandlers() runs. Leader events can
// therefore fire before init() is called. init() must always recover to the correct
// state regardless of what happened in the window before it ran.
it('silently drops leader-takeover calls before init, then init sets gauge from current isLeader', () => {
Object.assign(instanceSettings, { isLeader: false });
// Fire a takeover before the gauge is created — must not throw
service.updateOnLeaderTakeover();
expect(mockGaugeSet).not.toHaveBeenCalled();
// init() reads the current isLeader value and sets the gauge accordingly
service.init();
expect(mockGaugeSet).toHaveBeenLastCalledWith(0);
});
it('silently drops leader-stepdown calls before init, then init sets gauge from current isLeader', () => {
Object.assign(instanceSettings, { isLeader: true });
// Fire a stepdown before the gauge is created — must not throw
service.updateOnLeaderStepdown();
expect(mockGaugeSet).not.toHaveBeenCalled();
// init() reads the current (leader) isLeader value and sets the gauge to 1
service.init();
expect(mockGaugeSet).toHaveBeenLastCalledWith(1);
});
it('correctly handles leader events after init() recovers from a pre-init takeover', () => {
Object.assign(instanceSettings, { isLeader: false });
// Pre-init takeover (dropped)
service.updateOnLeaderTakeover();
// init() sets gauge to 0 (isLeader is false)
service.init();
jest.clearAllMocks();
// Post-init takeover: gauge must update to 1
service.updateOnLeaderTakeover();
expect(mockGaugeSet).toHaveBeenCalledWith(1);
// And stepdown: gauge must update to 0
service.updateOnLeaderStepdown();
expect(mockGaugeSet).toHaveBeenLastCalledWith(0);
});
});
});
@@ -12,7 +12,7 @@ import type { PrometheusMetricsCollector } from './base';
*/
@Service()
export class PrometheusInstanceRoleMetricsService implements PrometheusMetricsCollector {
private gauge: Gauge | undefined;
private gauge!: Gauge;
constructor(
private readonly config: PrometheusMetricsConfig,
@@ -34,11 +34,11 @@ export class PrometheusInstanceRoleMetricsService implements PrometheusMetricsCo
@OnLeaderTakeover()
updateOnLeaderTakeover() {
this.gauge?.set(1);
this.gauge.set(1);
}
@OnLeaderStepdown()
updateOnLeaderStepdown() {
this.gauge?.set(0);
this.gauge.set(0);
}
}
@@ -42,9 +42,8 @@ export class AgentsModule implements ModuleInterface {
registry.register(Container.get(TelegramIntegration));
registry.register(Container.get(LinearIntegration));
// Register Chat and Task services. Importing the services here also
// registers any @OnLeaderTakeover/@OnLeaderStepdown decorators with
// MultiMainMetadata before start.ts:295 wires up the listeners.
// Reconnect Chat and Task services on startup so this main resumes its
// integrations and tasks for the role it currently holds.
//
// Chat integrations run on every main: webhook-driven platforms (Slack,
// Linear, Telegram in webhook mode) need to be connected on every main
@@ -1,6 +1,8 @@
import { mockLogger } from '@n8n/backend-test-utils';
import type { GlobalConfig } from '@n8n/config';
import { MultiMainMetadata } from '@n8n/decorators';
import type { MultiMainEventHandler } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import type { ErrorReporter, InstanceSettings } from 'n8n-core';
import { createResultOk, createResultError } from 'n8n-workflow';
@@ -96,6 +98,42 @@ describe('MultiMainSetup', () => {
});
});
describe('registerEventHandlers', () => {
@Service()
class EarlyHandler {
takeover = jest.fn();
}
@Service()
class LateHandler {
takeover = jest.fn();
}
const handlerFor = (eventHandlerClass: unknown): MultiMainEventHandler => ({
eventHandlerClass: eventHandlerClass as MultiMainEventHandler['eventHandlerClass'],
methodName: 'takeover',
eventName: 'leader-takeover',
});
it('wires handlers registered both before and after it runs', () => {
const early = Container.get(EarlyHandler);
const late = Container.get(LateHandler);
// Registered before routing starts — replayed on subscribe.
metadata.register(handlerFor(EarlyHandler));
multiMainSetup.registerEventHandlers();
// Registered after routing started — wired reactively.
metadata.register(handlerFor(LateHandler));
multiMainSetup.emit('leader-takeover');
expect(early.takeover).toHaveBeenCalledTimes(1);
expect(late.takeover).toHaveBeenCalledTimes(1);
});
});
describe('checkLeader (leader path)', () => {
beforeEach(async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
@@ -2,6 +2,7 @@ import { Logger, TypedEmitter } from '@n8n/backend-common';
import { GlobalConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import { MultiMainMetadata } from '@n8n/decorators';
import type { MultiMainEventHandler } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { ErrorReporter, InstanceSettings } from 'n8n-core';
import assert from 'node:assert';
@@ -86,14 +87,17 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
}
registerEventHandlers() {
const handlers = this.metadata.getHandlers();
this.metadata.subscribe((handler) => this.attachHandler(handler));
}
for (const { eventHandlerClass, methodName, eventName } of handlers) {
private attachHandler({ eventHandlerClass, methodName, eventName }: MultiMainEventHandler) {
// Resolve the instance lazily, when the event fires. A handler can register
// while its class is still being decorated (method decorators run before the
// `@Service` class decorator), so the class may not be DI-resolvable yet.
this.on(eventName, async () => {
const instance = Container.get(eventHandlerClass);
this.on(eventName, async () => {
return await instance[methodName].call(instance);
});
}
return await instance[methodName].call(instance);
});
}
private async checkLeader() {