fix(core): Suspend query acquisition during database connection recovery (#32394)

This commit is contained in:
Lorent Lempereur
2026-06-17 12:29:43 +02:00
committed by GitHub
parent 55168bd931
commit 7440fca5a3
6 changed files with 977 additions and 35 deletions
@@ -171,6 +171,65 @@ export class DatabaseConfig {
@Env('DB_PING_TIMEOUT_MS')
pingTimeoutMs: number = readLegacyPingTimeoutMs();
/**
* How many consecutive health-check ping failures must occur before the
* connection is considered lost and the DataSource is torn down and
* reinitialized (full pool recovery).
*
* Recovery is disruptive (it destroys and recreates the connection pool),
* so this guards against a single transient blip triggering it. With the
* default `DB_PING_INTERVAL_SECONDS=2`, the default of 3 means recovery
* fires only after roughly 6s of sustained ping failures.
*
* Raise this if you see recovery triggering on brief network hiccups
* (false positives); lower it to react faster to genuinely dead connections.
*
* Must be >= 1: recovery fires only after at least one failed ping.
*/
@Env('DB_PING_MAX_FAILURES_BEFORE_RECOVERY', z.coerce.number().int().gte(1))
pingMaxFailuresBeforeRecovery: number = 3;
/**
* Initial delay in milliseconds before retrying a failed recovery attempt.
*
* Recovery retries use exponential backoff: each failed attempt waits
* `min(minRecoveryBackoffMs * 2 ** (attempt - 1), maxRecoveryBackoffMs)`.
* This is the delay after the first failed attempt (the floor of the curve).
*
* Must be >= 1
*/
@Env('DB_RECOVERY_BACKOFF_MIN_MS', z.coerce.number().int().gte(1))
minRecoveryBackoffMs: number = 1 * Time.seconds.toMilliseconds;
/**
* Maximum delay in milliseconds between recovery attempts.
*
* Caps the exponential backoff so retries never wait longer than this,
* keeping recovery responsive once the database becomes reachable again.
* Must be greater than or equal to `DB_RECOVERY_BACKOFF_MIN_MS`.
*
* Must be >= 1
*/
@Env('DB_RECOVERY_BACKOFF_MAX_MS', z.coerce.number().int().gte(1))
maxRecoveryBackoffMs: number = 30 * Time.seconds.toMilliseconds;
/**
* Maximum time in milliseconds a query waits for an in-progress connection
* recovery before failing fast.
*
* While recovery rebuilds the pool, every query that needs a connection parks
* until recovery completes. During a short blip that wait is invisible and the
* query succeeds against the fresh pool. During a long outage it would
* otherwise pile up parked queries for the whole outage, so this bounds the
* wait: once it elapses the query rejects with an `OperationalError` instead of
* holding the request open indefinitely. The default (30s) comfortably covers
* common-case recoveries while staying within typical HTTP gateway timeouts.
*
* Set to `0` to wait indefinitely (no timeout).
*/
@Env('DB_CONNECTION_ACQUISITION_TIMEOUT_MS', z.coerce.number().int().gte(0))
connectionAcquisitionTimeoutMs: number = 30 * Time.seconds.toMilliseconds;
@Nested
logging: LoggingConfig;
+66
View File
@@ -117,6 +117,10 @@ describe('GlobalConfig', () => {
type: 'sqlite',
pingIntervalSeconds: 2,
pingTimeoutMs: 5_000,
pingMaxFailuresBeforeRecovery: 3,
minRecoveryBackoffMs: 1_000,
maxRecoveryBackoffMs: 30_000,
connectionAcquisitionTimeoutMs: 30_000,
} as DatabaseConfig,
credentials: {
defaultName: 'My credentials',
@@ -639,6 +643,10 @@ describe('GlobalConfig', () => {
type: 'sqlite',
pingIntervalSeconds: 2,
pingTimeoutMs: 5_000,
pingMaxFailuresBeforeRecovery: 3,
minRecoveryBackoffMs: 1_000,
maxRecoveryBackoffMs: 30_000,
connectionAcquisitionTimeoutMs: 30_000,
},
endpoints: {
...defaultConfig.endpoints,
@@ -725,6 +733,64 @@ describe('GlobalConfig', () => {
);
});
describe('database recovery config validation', () => {
it('should reject DB_PING_MAX_FAILURES_BEFORE_RECOVERY below 1 and fall back to the default', () => {
process.env = { DB_PING_MAX_FAILURES_BEFORE_RECOVERY: '0' };
const config = Container.get(GlobalConfig);
expect(config.database.pingMaxFailuresBeforeRecovery).toBe(3);
expect(consoleWarnMock).toHaveBeenCalledWith(
expect.stringContaining('DB_PING_MAX_FAILURES_BEFORE_RECOVERY'),
);
});
it('should reject an empty DB_PING_MAX_FAILURES_BEFORE_RECOVERY (coerces to 0, not NaN)', () => {
process.env = { DB_PING_MAX_FAILURES_BEFORE_RECOVERY: '' };
const config = Container.get(GlobalConfig);
expect(config.database.pingMaxFailuresBeforeRecovery).toBe(3);
});
it('should reject DB_RECOVERY_BACKOFF_MIN_MS of 0 and fall back to the default', () => {
process.env = { DB_RECOVERY_BACKOFF_MIN_MS: '0' };
const config = Container.get(GlobalConfig);
expect(config.database.minRecoveryBackoffMs).toBe(1000);
expect(consoleWarnMock).toHaveBeenCalledWith(
expect.stringContaining('DB_RECOVERY_BACKOFF_MIN_MS'),
);
});
it('should reject a negative DB_RECOVERY_BACKOFF_MAX_MS and fall back to the default', () => {
process.env = { DB_RECOVERY_BACKOFF_MAX_MS: '-5' };
const config = Container.get(GlobalConfig);
expect(config.database.maxRecoveryBackoffMs).toBe(30000);
});
it('should accept 0 for DB_CONNECTION_ACQUISITION_TIMEOUT_MS (wait indefinitely)', () => {
process.env = { DB_CONNECTION_ACQUISITION_TIMEOUT_MS: '0' };
const config = Container.get(GlobalConfig);
expect(config.database.connectionAcquisitionTimeoutMs).toBe(0);
});
it('should reject a negative DB_CONNECTION_ACQUISITION_TIMEOUT_MS and fall back to the default', () => {
process.env = { DB_CONNECTION_ACQUISITION_TIMEOUT_MS: '-1' };
const config = Container.get(GlobalConfig);
expect(config.database.connectionAcquisitionTimeoutMs).toBe(30000);
});
it('should accept valid recovery overrides', () => {
process.env = {
DB_PING_MAX_FAILURES_BEFORE_RECOVERY: '5',
DB_RECOVERY_BACKOFF_MIN_MS: '500',
DB_RECOVERY_BACKOFF_MAX_MS: '60000',
DB_CONNECTION_ACQUISITION_TIMEOUT_MS: '10000',
};
const config = Container.get(GlobalConfig);
expect(config.database.pingMaxFailuresBeforeRecovery).toBe(5);
expect(config.database.minRecoveryBackoffMs).toBe(500);
expect(config.database.maxRecoveryBackoffMs).toBe(60000);
expect(config.database.connectionAcquisitionTimeoutMs).toBe(10000);
});
});
it('should clamp password min length to valid range', () => {
process.env = { N8N_PASSWORD_MIN_LENGTH: '100' };
const config = Container.get(GlobalConfig);
@@ -0,0 +1,219 @@
import type { Logger } from '@n8n/backend-common';
import type { DatabaseConfig } from '@n8n/config';
import { DataSource } from '@n8n/typeorm';
import { PostgreSqlContainer, type StartedPostgreSqlContainer } from '@testcontainers/postgresql';
import type { ErrorReporter } from 'n8n-core';
import { getContainerRuntimeClient } from 'testcontainers';
import { mock } from 'vitest-mock-extended';
import { DbConnectionMonitor } from '../db-connection-monitor';
/**
* Integration coverage for the connection-acquisition suspension (CAT-3455).
*
* Unlike the unit suite (which mocks the DataSource), this exercises a REAL
* Postgres pool through a REAL recovery (`destroy()` + `initialize()`) and proves
* that a real TypeORM query issued mid-recovery is held and then completes against
* the rebuilt pool, instead of throwing `Cannot use a pool after calling end on
* the pool` / `Driver not Connected`.
*
* The acquisition wrapper only exists for Postgres: it hooks `driver.obtainMasterConnection`,
* which is the genuine connection chokepoint for the Postgres driver.
* The sqlite driver's `obtainMasterConnection` is a no-op,
* so there is nothing to suspend there and no in-process substitute for this test.
*
* Connection source, in priority order:
* 1. The standard `DB_POSTGRESDB_*` environment variables, when set — the same
* convention the rest of the suite uses (see
* packages/cli/test/setup-testcontainers.js). This is how you run the suite
* locally: point them at any running Postgres.
* 2. Otherwise a throwaway Postgres container, when a container runtime is
* available. This is what lets the suite run on CI, whose unit lane has no
* external database but does have a usable container runtime.
*
* When neither is available the suite is skipped, so `pnpm test` still passes on
* a machine with no Postgres and no usable container runtime.
*
* To run it locally, boot a Postgres and export the host, e.g.:
* docker run --rm -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:18-alpine
* DB_POSTGRESDB_HOST=localhost DB_POSTGRESDB_PASSWORD=password \
* pnpm test db-connection-monitor.recovery.postgres
*/
const POSTGRES_IMAGE = 'postgres:18-alpine';
const CONTAINER_TIMEOUT_MS = 180_000;
const TEST_TIMEOUT_MS = 60_000;
// Prefer an externally-provided Postgres (local dev, cli-style integration lanes).
const externalHost = process.env.DB_POSTGRESDB_HOST ?? '';
const useExternalPostgres = externalHost.length > 0;
// The recovery trigger is private; expose it for the test without loosening prod typing.
type MonitorInternals = { recoverDataSource: () => Promise<void> };
type PgConnection = {
host: string;
port: number;
username: string;
password: string;
database: string;
};
const buildDatabaseConfig = () =>
mock<DatabaseConfig>({
pingTimeoutMs: 5_000,
pingMaxFailuresBeforeRecovery: 3,
minRecoveryBackoffMs: 1_000,
maxRecoveryBackoffMs: 30_000,
connectionAcquisitionTimeoutMs: 30_000,
});
const newDataSource = (conn: PgConnection) =>
new DataSource({
type: 'postgres',
host: conn.host,
port: conn.port,
username: conn.username,
password: conn.password,
database: conn.database,
// No entities/migrations — the test only runs trivial SELECTs through the driver.
synchronize: false,
});
describe('DbConnectionMonitor recovery against real Postgres', () => {
let container: StartedPostgreSqlContainer | undefined;
// Left undefined when no Postgres is reachable; the tests skip themselves in
// that case (see the `ctx.skip()` guards below).
let connection: PgConnection | undefined;
beforeAll(async () => {
if (useExternalPostgres) {
connection = {
host: externalHost,
port: Number(process.env.DB_POSTGRESDB_PORT ?? '5432'),
username: process.env.DB_POSTGRESDB_USER ?? 'postgres',
password: process.env.DB_POSTGRESDB_PASSWORD ?? 'postgres',
database: process.env.DB_POSTGRESDB_DATABASE ?? 'postgres',
};
return;
}
// No external Postgres: fall back to a throwaway container, but only where a
// runtime is usable. Probe via testcontainers' own detection rather than
// `docker info` — the CLI can succeed while testcontainers fails to find a
// working runtime strategy, which would make `start()` throw instead of
// leaving the tests to skip.
const runtimeAvailable = await getContainerRuntimeClient().then(
() => true,
() => false,
);
if (!runtimeAvailable) {
return;
}
container = await new PostgreSqlContainer(POSTGRES_IMAGE).start();
connection = {
host: container.getHost(),
port: container.getMappedPort(5432),
username: container.getUsername(),
password: container.getPassword(),
database: container.getDatabase(),
};
}, CONTAINER_TIMEOUT_MS);
afterAll(async () => {
await container?.stop();
}, CONTAINER_TIMEOUT_MS);
it(
'control: acquiring a connection after the pool is torn down rejects',
async (ctx) => {
if (!connection) {
ctx.skip();
return;
}
// Establishes that this environment reproduces the failure the wrapper guards
// against: with no monitor/wrapper installed, acquiring a connection after the
// pool is ended rejects with the exact error seen in production.
const dataSource = newDataSource(connection);
await dataSource.initialize();
const driver = dataSource.driver as unknown as MonitorInternals &
Record<'obtainMasterConnection', () => Promise<unknown>>;
await dataSource.destroy();
await expect(driver.obtainMasterConnection()).rejects.toThrow(
/Cannot use a pool after calling end on the pool|Driver not Connected/,
);
},
TEST_TIMEOUT_MS,
);
it(
'suspends a real query during recovery and runs it on the rebuilt pool',
async (ctx) => {
if (!connection) {
ctx.skip();
return;
}
const dataSource = newDataSource(connection);
await dataSource.initialize();
const monitor = new DbConnectionMonitor(
dataSource,
() => {},
buildDatabaseConfig(),
mock<Logger>(),
mock<ErrorReporter>(),
);
// start() installs the obtainMasterConnection wrapper on the live driver.
// Under vitest (NODE_ENV=test) it does not schedule background pings, so the
// only recovery is the one we trigger explicitly below.
monitor.start();
try {
// Sanity: queries work before any recovery.
expect(await dataSource.query('SELECT 1 AS ok')).toEqual([{ ok: 1 }]);
// A query runner bound to the *current* (soon-to-be-destroyed) driver.
const queryRunner = dataSource.createQueryRunner();
// Kick off a real recovery (destroy + reinitialize the live pool). The
// pending-recovery promise is set synchronously, before recovery's first
// await, so the query below is guaranteed to land inside the window.
let recoveryDone = false;
const recovery = (monitor as unknown as MonitorInternals).recoverDataSource().then(() => {
recoveryDone = true;
});
// PostgresQueryRunner.query -> connect() -> driver.obtainMasterConnection,
// i.e. the exact chokepoint. Without suspension this rejects with the
// pool-after-end error; with it, the call waits out recovery and retries against
// the rebuilt pool.
const rows = (
await Promise.all([queryRunner.query('SELECT 42 AS answer'), recovery])
)[0] as Array<{ answer: number }>;
// The real proof of suspension is that the query produced its result at all:
// without the wrapper it would have rejected with the pool-after-end error
// ("Cannot use a pool after calling end on the pool") when it
// hit the torn-down pool, failing the `Promise.all` above before we reached
// this line. `recoveryDone` is necessarily true here (the `Promise.all` awaits
// `recovery`); we assert it only to pin that recovery actually ran.
expect(recoveryDone).toBe(true);
expect(rows).toEqual([{ answer: 42 }]);
await queryRunner.release();
// The pool is healthy afterwards.
expect(await dataSource.query('SELECT 1 AS ok')).toEqual([{ ok: 1 }]);
} finally {
await monitor.stop();
if (dataSource.isInitialized) {
await dataSource.destroy();
}
}
},
TEST_TIMEOUT_MS,
);
});
@@ -25,7 +25,13 @@ describe('DbConnectionMonitor', () => {
let monitor: DbConnectionMonitor;
let onConnectedChange: MockedFunction<(connected: boolean) => void>;
const errorReporter = mock<ErrorReporter>();
const databaseConfig = mock<DatabaseConfig>({ pingTimeoutMs: 5_000 });
const databaseConfig = mock<DatabaseConfig>({
pingTimeoutMs: 5_000,
pingMaxFailuresBeforeRecovery: 3,
minRecoveryBackoffMs: 1_000,
maxRecoveryBackoffMs: 30_000,
connectionAcquisitionTimeoutMs: 30_000,
});
const logger = mock<Logger>();
const dataSource = mockDeep<DataSource>({ options: { type: 'postgres' } });
@@ -132,7 +138,7 @@ describe('DbConnectionMonitor', () => {
it('should not query if monitor is stopped', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
monitor.stop();
void monitor.stop();
// @ts-expect-error private property
await monitor.ping();
@@ -270,7 +276,7 @@ describe('DbConnectionMonitor', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const pingSpy = vi.spyOn(scheduledMonitor as any, 'ping');
scheduledMonitor.stop();
void scheduledMonitor.stop();
// @ts-expect-error private property
scheduledMonitor.scheduleNextPing();
vi.advanceTimersByTime(1000);
@@ -315,7 +321,7 @@ describe('DbConnectionMonitor', () => {
});
it('should be a no-op when monitor is stopped', async () => {
monitor.stop();
void monitor.stop();
// @ts-expect-error private property
await monitor.recoverDataSource();
@@ -414,6 +420,50 @@ describe('DbConnectionMonitor', () => {
);
});
it('should not drop below the floor when max backoff is misconfigured below min', async () => {
// A misconfiguration (max < min) must degrade to a constant `min` delay rather
// than collapsing every retry onto the smaller max value, which would defeat the floor.
const misconfigured = mock<DatabaseConfig>({
pingTimeoutMs: 5_000,
pingMaxFailuresBeforeRecovery: 3,
minRecoveryBackoffMs: 1_000,
maxRecoveryBackoffMs: 100,
});
const misconfiguredMonitor = new DbConnectionMonitor(
dataSource,
onConnectedChange,
misconfigured,
logger,
errorReporter,
);
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
dataSource.initialize
.mockRejectedValueOnce(new Error('down'))
.mockRejectedValueOnce(new Error('down'))
.mockResolvedValueOnce(dataSource);
mockedSetTimeoutP.mockResolvedValue(undefined);
// @ts-expect-error private property
await misconfiguredMonitor.recoverDataSource();
// Both backoffs clamp to the floor (1s) instead of the bogus 100ms max.
expect(mockedSetTimeoutP).toHaveBeenNthCalledWith(
1,
1_000,
undefined,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
expect(mockedSetTimeoutP).toHaveBeenNthCalledWith(
2,
1_000,
undefined,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
});
it('should exit the recovery loop when stop() is called during backoff', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
@@ -433,7 +483,7 @@ describe('DbConnectionMonitor', () => {
await flushMicrotasks();
expect(dataSource.initialize).toHaveBeenCalledTimes(1);
monitor.stop();
void monitor.stop();
resolveBackoff();
await recoveryPromise;
@@ -471,7 +521,7 @@ describe('DbConnectionMonitor', () => {
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
monitor.stop();
void monitor.stop();
await recoveryPromise;
// AbortError is swallowed; the loop exits on the next iteration without retrying.
@@ -489,8 +539,14 @@ describe('DbConnectionMonitor', () => {
dataSource.destroy.mockResolvedValue();
dataSource.initialize.mockResolvedValue(dataSource);
const on = vi.fn();
(dataSource as unknown as { driver: { master: { on: Mock } } }).driver = {
(
dataSource as unknown as {
driver: { master: { on: Mock }; obtainMasterConnection: () => Promise<unknown> };
}
).driver = {
master: { on },
// start()/recoverDataSource() wrap obtainMasterConnection unconditionally.
obtainMasterConnection: vi.fn().mockResolvedValue(undefined),
};
// @ts-expect-error private property
@@ -511,13 +567,90 @@ describe('DbConnectionMonitor', () => {
const recoveryPromise = monitor.recoverDataSource();
// Stop while destroy is still pending.
monitor.stop();
void monitor.stop();
await recoveryPromise;
expect(dataSource.destroy).toHaveBeenCalled();
expect(dataSource.initialize).not.toHaveBeenCalled();
});
it('should await the in-flight recovery so teardown is ordered after it', async () => {
// stop() must not resolve until the fire-and-forget recovery launched by ping()
// has unwound, so the owner can destroy the DataSource without racing the loop.
// @ts-expect-error readonly property
dataSource.isInitialized = true;
let resolveDestroy: () => void = () => {};
let destroyResolved = false;
dataSource.destroy.mockImplementation(
async () =>
await new Promise<void>((resolve) => {
resolveDestroy = () => {
destroyResolved = true;
resolve();
};
}),
);
// Launch recovery exactly as a failed ping() does (`this.recoveryPromise =
// this.recoverDataSource()`), without driving the ping loop itself: the mock
// config has no pingIntervalSeconds, so scheduleNextPing() would spin a
// runaway ~1ms setTimeout while recovery is parked, which starves the event
// loop and times the test out under CI load.
// @ts-expect-error private property
monitor.recoveryPromise = monitor.recoverDataSource();
await flushMicrotasks();
// Recovery is parked inside the slow destroy().
expect(dataSource.destroy).toHaveBeenCalled();
const stopPromise = monitor.stop();
let stopResolved = false;
void stopPromise.then(() => (stopResolved = true));
// stop() cannot resolve while recovery is still draining destroy().
await flushMicrotasks();
expect(stopResolved).toBe(false);
resolveDestroy();
await stopPromise;
expect(destroyResolved).toBe(true);
expect(stopResolved).toBe(true);
// The post-destroy `if (this.stopped) break;` prevents reinitialization.
expect(dataSource.initialize).not.toHaveBeenCalled();
});
});
describe('start', () => {
it('should warn once when max recovery backoff is configured below min', () => {
const misconfigured = mock<DatabaseConfig>({
pingTimeoutMs: 5_000,
pingMaxFailuresBeforeRecovery: 3,
minRecoveryBackoffMs: 1_000,
maxRecoveryBackoffMs: 100,
});
const misconfiguredMonitor = new DbConnectionMonitor(
dataSource,
onConnectedChange,
misconfigured,
logger,
errorReporter,
);
misconfiguredMonitor.start();
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining('DB_RECOVERY_BACKOFF_MAX_MS'),
);
});
it('should not warn when backoff bounds are valid', () => {
monitor.start();
expect(logger.warn).not.toHaveBeenCalledWith(
expect.stringContaining('DB_RECOVERY_BACKOFF_MAX_MS'),
);
});
});
describe('attachPoolErrorHandler', () => {
@@ -526,9 +659,15 @@ describe('DbConnectionMonitor', () => {
// and matches the unsafe cast the production code uses to reach driver.master.
type DriverShape = {
master?: { on?: (event: string, handler: (cause: unknown) => void) => void };
obtainMasterConnection?: () => Promise<unknown>;
};
// start() wraps obtainMasterConnection unconditionally, so every driver needs it present
// or wrapConnectionAcquisition throws. Default it; tests override what they assert on.
const setDriver = (driver: DriverShape) => {
(dataSource as unknown as { driver: DriverShape }).driver = driver;
(dataSource as unknown as { driver: DriverShape }).driver = {
obtainMasterConnection: vi.fn().mockResolvedValue(undefined),
...driver,
};
};
it('should attach an error listener to the Postgres driver pool', () => {
@@ -560,6 +699,35 @@ describe('DbConnectionMonitor', () => {
);
});
it('should ignore errors from a pool that recovery has since replaced', () => {
// Recovery swaps in a fresh driver+pool but the old pool can still emit a late
// 'error' while tearing down. Acting on it would mark the just-recovered instance
// unhealthy, so the stale pool's handler must be inert.
let staleHandler: ((cause: unknown) => void) | undefined;
const on = vi.fn((_event: string, h: (cause: unknown) => void) => {
staleHandler = h;
});
const stalePool = { on };
setDriver({ master: stalePool });
// @ts-expect-error private property
monitor.connected = true;
monitor.start();
expect(staleHandler).toBeDefined();
// A recovery replaced the driver's master with a brand-new pool.
(dataSource as unknown as { driver: DriverShape }).driver.master = {
on: vi.fn(),
};
staleHandler?.(new Error('terminating connection due to administrator command'));
expect(onConnectedChange).not.toHaveBeenCalledWith(false);
expect(logger.debug).toHaveBeenCalledWith(
expect.stringContaining('Ignoring Postgres pool error'),
);
});
it('should skip attaching when the driver is not Postgres', () => {
const sqliteDataSource = mockDeep<DataSource>({ options: { type: 'sqlite-pooled' } });
const sqliteMonitor = new DbConnectionMonitor(
@@ -589,13 +757,240 @@ describe('DbConnectionMonitor', () => {
});
});
describe('connection acquisition during recovery', () => {
// Minimal driver shape, same rationale as the attachPoolErrorHandler block:
// avoids TS2590 on the full driver union and mirrors the production cast.
type AcquisitionDriverShape = {
master?: { on?: (event: string, handler: (cause: unknown) => void) => void };
obtainMasterConnection?: () => Promise<unknown>;
};
const setDriver = (driver: AcquisitionDriverShape) => {
(dataSource as unknown as { driver: AcquisitionDriverShape }).driver = driver;
};
// The private recovery helpers, reached the same way the rest of this file
// pokes at internals.
const internals = () =>
monitor as unknown as {
acquireConnection: (original: () => Promise<unknown>) => Promise<unknown>;
markRecoveryPending: () => void;
clearPendingRecovery: () => void;
recovering: boolean;
liveObtainMasterConnection: (() => Promise<unknown>) | undefined;
};
const POOL_ENDED = 'Cannot use a pool after calling end on the pool';
it('should wrap driver.obtainMasterConnection on start for Postgres', async () => {
const original = vi.fn().mockResolvedValue('connection');
setDriver({ obtainMasterConnection: original });
monitor.start();
const driver = (dataSource as unknown as { driver: AcquisitionDriverShape }).driver;
// The instance method is now a wrapper, not the original.
expect(driver.obtainMasterConnection).not.toBe(original);
await expect(driver.obtainMasterConnection?.()).resolves.toBe('connection');
expect(original).toHaveBeenCalledTimes(1);
});
it('should pass connection acquisition straight through when idle', async () => {
const original = vi.fn().mockResolvedValue('connection');
await expect(internals().acquireConnection(original)).resolves.toBe('connection');
expect(original).toHaveBeenCalledTimes(1);
});
it('should hold connection acquisition while a recovery is in progress', async () => {
const original = vi.fn().mockResolvedValue('connection');
internals().markRecoveryPending();
let settled = false;
const pending = internals()
.acquireConnection(original)
.then((result) => {
settled = true;
return result;
});
await flushMicrotasks();
// Recovery is pending: acquisition must not have run yet.
expect(settled).toBe(false);
expect(original).not.toHaveBeenCalled();
internals().clearPendingRecovery();
await expect(pending).resolves.toBe('connection');
expect(original).toHaveBeenCalledTimes(1);
});
it('should retry against the live driver when acquisition loses the destroy race', async () => {
// A query holding the previous (destroyed) driver hits the ended pool; once
// recovery has swapped in a new driver, the retry must reach the live pool.
const stale = vi.fn().mockRejectedValue(new Error(POOL_ENDED));
const live = vi.fn().mockResolvedValue('fresh-connection');
internals().recovering = true;
internals().liveObtainMasterConnection = live;
await expect(internals().acquireConnection(stale)).resolves.toBe('fresh-connection');
expect(stale).toHaveBeenCalledTimes(1);
expect(live).toHaveBeenCalledTimes(1);
});
it('should retry stale acquisition errors after recovery has completed', async () => {
const stale = vi.fn().mockRejectedValue(new Error(POOL_ENDED));
const live = vi.fn().mockResolvedValue('fresh-connection');
internals().recovering = false;
internals().liveObtainMasterConnection = live;
await expect(internals().acquireConnection(stale)).resolves.toBe('fresh-connection');
expect(stale).toHaveBeenCalledTimes(1);
expect(live).toHaveBeenCalledTimes(1);
});
it('should retry on a "Driver not Connected" error during recovery', async () => {
const stale = vi.fn().mockRejectedValue(new Error('Driver not Connected'));
const live = vi.fn().mockResolvedValue('fresh-connection');
internals().recovering = true;
internals().liveObtainMasterConnection = live;
await expect(internals().acquireConnection(stale)).resolves.toBe('fresh-connection');
expect(live).toHaveBeenCalledTimes(1);
});
it('should surface a pool error when no recovery is in progress', async () => {
// Outside a recovery window the pool is genuinely unavailable; masking it
// with a retry would hide a real outage.
const original = vi.fn().mockRejectedValue(new Error(POOL_ENDED));
internals().recovering = false;
internals().liveObtainMasterConnection = original;
await expect(internals().acquireConnection(original)).rejects.toThrow(POOL_ENDED);
expect(original).toHaveBeenCalledTimes(1);
});
it('should not retry an unrelated error even during recovery', async () => {
const original = vi.fn().mockRejectedValue(new Error('syntax error at or near "FROM"'));
const live = vi.fn().mockResolvedValue('fresh-connection');
internals().recovering = true;
internals().liveObtainMasterConnection = live;
await expect(internals().acquireConnection(original)).rejects.toThrow('syntax error');
expect(live).not.toHaveBeenCalled();
});
it('should fail fast with an OperationalError when recovery exceeds the acquisition timeout', async () => {
// A long outage parks every query here; without the bound they would pile up for
// the whole outage. Once the timeout elapses the query must reject instead.
const original = vi.fn().mockResolvedValue('connection');
internals().markRecoveryPending();
// Fire the timeout side of the race immediately.
mockedSetTimeoutP.mockResolvedValueOnce(undefined);
await expect(internals().acquireConnection(original)).rejects.toThrow(
'Timed out after 30000ms waiting for database connection recovery',
);
// The query never reached the pool.
expect(original).not.toHaveBeenCalled();
});
it('should acquire normally when recovery completes before the acquisition timeout', async () => {
// The timeout must not fire when recovery wins the race.
const original = vi.fn().mockResolvedValue('connection');
internals().markRecoveryPending();
const pending = internals().acquireConnection(original);
await flushMicrotasks();
expect(original).not.toHaveBeenCalled();
// Recovery completes before the (never-resolving) timeout.
internals().clearPendingRecovery();
await expect(pending).resolves.toBe('connection');
expect(original).toHaveBeenCalledTimes(1);
});
it('should wait indefinitely when the acquisition timeout is 0', async () => {
const noTimeoutMonitor = new DbConnectionMonitor(
dataSource,
onConnectedChange,
mock<DatabaseConfig>({
pingTimeoutMs: 5_000,
pingMaxFailuresBeforeRecovery: 3,
minRecoveryBackoffMs: 1_000,
maxRecoveryBackoffMs: 30_000,
connectionAcquisitionTimeoutMs: 0,
}),
logger,
errorReporter,
);
const noTimeoutInternals = noTimeoutMonitor as unknown as {
acquireConnection: (original: () => Promise<unknown>) => Promise<unknown>;
markRecoveryPending: () => void;
clearPendingRecovery: () => void;
};
const original = vi.fn().mockResolvedValue('connection');
noTimeoutInternals.markRecoveryPending();
// Behavioural proof that no timeout is armed: if `awaitRecovery` ever raced a timeout here it would reject.
// Asserting on the shared `setTimeoutP` call count is unreliable because other tests leave a fire-and-forget ping loop calling it.
mockedSetTimeoutP.mockImplementation(async () => {
await Promise.resolve();
throw new Error('timeout should not be armed when the acquisition timeout is 0');
});
let settled = false;
const pending = noTimeoutInternals.acquireConnection(original).then((result) => {
settled = true;
return result;
});
await flushMicrotasks();
// The acquisition stays parked on recovery rather than failing fast.
expect(settled).toBe(false);
noTimeoutInternals.clearPendingRecovery();
// Resolves cleanly once recovery completes; never rejects on a timeout.
await expect(pending).resolves.toBe('connection');
});
it('should release queued acquisitions when stop() is called', async () => {
const original = vi.fn().mockResolvedValue('connection');
internals().markRecoveryPending();
const pending = internals().acquireConnection(original);
await flushMicrotasks();
expect(original).not.toHaveBeenCalled();
void monitor.stop();
await expect(pending).resolves.toBe('connection');
});
it('should re-wrap obtainMasterConnection after a successful recovery', async () => {
// initialize() builds a fresh driver instance, so the wrapper installed at
// start() is gone; without re-wrapping, the new pool would not wait during recovery.
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
dataSource.initialize.mockResolvedValue(dataSource);
const original = vi.fn().mockResolvedValue('connection');
setDriver({ obtainMasterConnection: original });
// @ts-expect-error private property
await monitor.recoverDataSource();
const driver = (dataSource as unknown as { driver: AcquisitionDriverShape }).driver;
expect(driver.obtainMasterConnection).not.toBe(original);
});
});
describe('stop', () => {
it('should clear the ping timer', () => {
const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout');
// @ts-expect-error private property
monitor.pingTimer = setTimeout(() => {}, 1000);
monitor.stop();
void monitor.stop();
expect(clearTimeoutSpy).toHaveBeenCalled();
// @ts-expect-error private property
@@ -603,7 +998,7 @@ describe('DbConnectionMonitor', () => {
});
it('should latch `stopped` so future scheduling is skipped', () => {
monitor.stop();
void monitor.stop();
// @ts-expect-error private property
expect(monitor.stopped).toBe(true);
@@ -2,21 +2,38 @@ import { inTest, type Logger } from '@n8n/backend-common';
import type { DatabaseConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import type { DataSource } from '@n8n/typeorm';
import type { PostgresDriver } from '@n8n/typeorm/driver/postgres/PostgresDriver';
import type { ErrorReporter } from 'n8n-core';
import { ensureError, OperationalError } from 'n8n-workflow';
import { setTimeout as setTimeoutP } from 'timers/promises';
const MAX_PING_FAILURES_BEFORE_RECOVERY = 3;
const MIN_RECOVERY_BACKOFF_MS = 1_000;
const MAX_RECOVERY_BACKOFF_MS = 30_000;
/** The chokepoint every TypeORM query funnels through to acquire a master connection. */
type ObtainMasterConnection = PostgresDriver['obtainMasterConnection'];
/**
* Error messages a connection acquisition throws when it reaches a pool that recovery
* has already torn down. These are matched by text because neither pg-pool nor TypeORM
* surfaces a stable error code for them; the strings are pinned by the integration test
* against a real driver, so a driver upgrade that renamed them would fail loudly there.
*/
const POOL_TORN_DOWN_MESSAGE = 'Cannot use a pool after calling end on the pool';
const DRIVER_NOT_CONNECTED_MESSAGE = 'Driver not Connected';
/**
* Watches a DataSource and recovers it when the connection goes bad.
* - Pings on `databaseConfig.pingIntervalSeconds`, races against `databaseConfig.pingTimeoutMs`.
* - After `MAX_PING_FAILURES_BEFORE_RECOVERY` consecutive failures, destroys
* and reinitializes the DataSource with exponential backoff.
* - After `databaseConfig.pingMaxFailuresBeforeRecovery` consecutive failures, destroys
* and reinitializes the DataSource with exponential backoff
* (`databaseConfig.minRecoveryBackoffMs` .. `databaseConfig.maxRecoveryBackoffMs`).
* - Attaches an error listener to the pg pool (Postgres only) so terminated
* idle clients are caught instead of crashing the process.
* - Suspends connection acquisition during recovery so in-flight queries wait
* for the new pool instead of hitting the torn-down one.
* Recovery destroys and recreates the shared pool.
* Without this, any query acquiring a connection in that window throws
* `Cannot use a pool after calling end on the pool` / `Driver not Connected`.
* The wait is applied at `driver.obtainMasterConnection`
* (the single chokepoint every TypeORM query funnels through).
*
* Notifies the owner of connection transitions via `onConnectedChange`.
* The owner is responsible for supplying the initial state via `initialConnected`,
@@ -30,6 +47,29 @@ export class DbConnectionMonitor {
private stopped = false; // Latched on stop()
private stopAbortController = new AbortController();
/**
* Pending while a recovery is in progress.
* Queued connection acquisitions await it.
* `undefined` when no recovery is running, so queries pass straight through with no added latency.
*/
private pendingRecovery: Promise<void> | undefined;
private resolvePendingRecovery: (() => void) | undefined;
/**
* Handle on the in-flight recovery loop launched fire-and-forget from `ping()`.
* `stop()` awaits it after aborting so the owner's teardown is ordered
* after the loop's `destroy()`/`initialize()` rather than racing it.
* recoverDataSource` owns its own try/catch and never rejects.
*/
private recoveryPromise: Promise<void> | undefined;
/**
* The current driver's *unwrapped* `obtainMasterConnection`, refreshed on every (re)initialize.
* `initialize()` builds a brand-new driver instance,
* so a query still holding the previous driver retries against this reference.
*/
private liveObtainMasterConnection: ObtainMasterConnection | undefined;
constructor(
private readonly dataSource: DataSource,
private readonly onConnectedChange: (connected: boolean) => void,
@@ -43,22 +83,30 @@ export class DbConnectionMonitor {
start() {
this.attachPoolErrorHandler();
this.wrapConnectionAcquisition();
if (this.databaseConfig.maxRecoveryBackoffMs < this.databaseConfig.minRecoveryBackoffMs) {
this.logger.warn(
`DB_RECOVERY_BACKOFF_MAX_MS (${this.databaseConfig.maxRecoveryBackoffMs}) is below DB_RECOVERY_BACKOFF_MIN_MS (${this.databaseConfig.minRecoveryBackoffMs}); recovery will retry at a constant ${this.databaseConfig.minRecoveryBackoffMs}ms instead of backing off. Set max >= min.`,
);
}
this.logger.debug(
`Database connection monitor started (pingIntervalSeconds=${this.databaseConfig.pingIntervalSeconds}, pingTimeoutMs=${this.databaseConfig.pingTimeoutMs}, recoveryThreshold=${MAX_PING_FAILURES_BEFORE_RECOVERY})`,
`Database connection monitor started (pingIntervalSeconds=${this.databaseConfig.pingIntervalSeconds}, pingTimeoutMs=${this.databaseConfig.pingTimeoutMs}, recoveryThreshold=${this.databaseConfig.pingMaxFailuresBeforeRecovery})`,
);
if (!inTest) {
this.scheduleNextPing();
}
}
stop() {
async stop() {
this.stopped = true;
this.recovering = false;
this.stopRecovery();
this.stopAbortController.abort();
if (this.pingTimer) {
clearTimeout(this.pingTimer);
this.pingTimer = undefined;
}
// Await any in-flight recovery so it has fully unwound (its `!this.stopped` guard exits the loop) before the caller tears down the DataSource.
await this.recoveryPromise;
this.logger.debug('Database connection monitor stopped');
}
@@ -104,18 +152,18 @@ export class DbConnectionMonitor {
this.setConnected(false);
this.consecutiveFailures += 1;
this.logger.warn(
`Database ping failed (${this.consecutiveFailures}/${MAX_PING_FAILURES_BEFORE_RECOVERY}): ${ensureError(error).message}`,
`Database ping failed (${this.consecutiveFailures}/${this.databaseConfig.pingMaxFailuresBeforeRecovery}): ${ensureError(error).message}`,
);
if (!(error instanceof OperationalError)) {
this.errorReporter.error(error);
}
if (this.consecutiveFailures >= MAX_PING_FAILURES_BEFORE_RECOVERY) {
if (this.consecutiveFailures >= this.databaseConfig.pingMaxFailuresBeforeRecovery) {
this.logger.warn(
`Triggering database connection recovery after ${this.consecutiveFailures} consecutive ping failures`,
);
// Fire-and-forget; recoverDataSource owns its own try/catch/finally and never rejects.
void this.recoverDataSource();
this.recoveryPromise = this.recoverDataSource();
}
} finally {
abortController.abort();
@@ -127,7 +175,7 @@ export class DbConnectionMonitor {
if (this.recovering || this.stopped) {
return;
}
this.recovering = true;
this.startRecovery();
try {
const recoveryStart = Date.now();
@@ -140,6 +188,12 @@ export class DbConnectionMonitor {
try {
if (this.dataSource.isInitialized) {
// We deliberately don't bound this drain with a forced teardown.
// `pool.end()` does not interrupt in-flight queries:
// already-acquired clients keep running until their query finishes,
// and only *new* acquisitions are refused
// (those are handled by the acquisition wait in `wrapConnectionAcquisition`).
// So awaiting `destroy()` lets healthy queries drain on their own without us touching pg-pool internals.
await this.dataSource.destroy();
}
@@ -148,16 +202,14 @@ export class DbConnectionMonitor {
}
await this.dataSource.initialize();
this.attachPoolErrorHandler();
this.wrapConnectionAcquisition();
this.setConnected(true);
this.consecutiveFailures = 0;
recovered = true;
} catch (error) {
const wrapped = ensureError(error);
this.errorReporter.error(wrapped);
const backoff = Math.min(
MIN_RECOVERY_BACKOFF_MS * 2 ** (attempt - 1),
MAX_RECOVERY_BACKOFF_MS,
);
const backoff = this.computeBackoff(attempt);
this.logger.warn(
`Recovery attempt ${attempt} failed: ${wrapped.message}. Retrying in ${backoff}ms`,
);
@@ -183,23 +235,48 @@ export class DbConnectionMonitor {
} catch (error) {
this.errorReporter.error(ensureError(error));
} finally {
this.recovering = false;
this.stopRecovery();
}
}
/**
* Exponential backoff for the given (1-based) recovery attempt, ramping from
* `minRecoveryBackoffMs` and capped at `maxRecoveryBackoffMs`.
*
* The cap is clamped to never fall below the floor, so a misconfiguration
* (`maxRecoveryBackoffMs < minRecoveryBackoffMs`) degrades to a constant
* `minRecoveryBackoffMs` delay rather than silently collapsing every retry
* onto the smaller max value (which would defeat the floor). The
* misconfiguration is warned about once at `start()`.
*/
private computeBackoff(attempt: number) {
const { minRecoveryBackoffMs, maxRecoveryBackoffMs } = this.databaseConfig;
const ceiling = Math.max(minRecoveryBackoffMs, maxRecoveryBackoffMs);
return Math.min(minRecoveryBackoffMs * 2 ** (attempt - 1), ceiling);
}
private get isPostgres(): boolean {
return this.dataSource.options.type === 'postgres';
}
/**
* Single cast site to the Postgres driver. Only called once `isPostgres`
* is confirmed. Returns a view of the *current* driver, which
* `initialize()` swaps on every recovery.
*/
private get postgresDriver(): PostgresDriver {
return this.dataSource.driver as PostgresDriver;
}
// pg-pool emits 'error' for idle clients that fail (e.g. server-side pg_terminate_backend or RDS failover).
// Without a listener Node treats these as unhandled and crashes the process.
private attachPoolErrorHandler() {
// Postgres-only: the other supported driver (sqlite-pooled) does not expose a pool-level error stream.
if (this.dataSource.options.type !== 'postgres') {
if (!this.isPostgres) {
return;
}
const driver = this.dataSource.driver as unknown as {
master?: { on?: (event: string, handler: (cause: unknown) => void) => void };
};
const pool = driver.master;
const pool = this.postgresDriver.master;
if (!pool || typeof pool.on !== 'function') {
// Defensive: TypeORM may have renamed `driver.master` in a future release.
this.logger.warn(
@@ -209,12 +286,138 @@ export class DbConnectionMonitor {
}
pool.on('error', (cause: unknown) => {
if (!this.isPostgres || this.postgresDriver.master !== pool) {
this.logger.debug('Ignoring Postgres pool error from a pool replaced by recovery');
return;
}
this.setConnected(false);
this.logger.warn(`Postgres pool client error: ${ensureError(cause).message}`);
});
this.logger.debug('Attached pool error listener to Postgres driver');
}
/**
* Wraps the driver's `obtainMasterConnection` so connection acquisition waits
* out an in-progress recovery instead of racing the torn-down pool.
* Re-run on every (re)initialize because `initialize()` swaps in a fresh driver instance.
*
* Only master connections are guarded. TypeORM read-replica reads go through a
* separate `obtainSlaveConnection` chokepoint that we don't wrap, because n8n does
* not configure TypeORM replication.
* If it ever does, replica reads would need the same treatment.
*/
private wrapConnectionAcquisition() {
if (!this.isPostgres) {
return;
}
const driver = this.postgresDriver;
// Capture the unwrapped acquisition fn before we replace it below (bind keeps `this`).
// `bind` widens to `any` here, so we reassert TypeORM's own signature.
const original = driver.obtainMasterConnection.bind(driver) as ObtainMasterConnection;
this.liveObtainMasterConnection = original;
driver.obtainMasterConnection = async () => await this.acquireConnection(original);
}
/**
* Awaits any in-progress recovery, then acquires a connection.
* If acquisition still loses a race with `destroy()` (recovery began just after this call passed it),
* retry once against the live driver once recovery completes.
*/
private async acquireConnection(original: ObtainMasterConnection) {
await this.awaitRecovery();
try {
return await original();
} catch (error) {
if (!this.isRecoverableConnectionError(error)) {
throw error;
}
const liveObtainMasterConnection = this.liveObtainMasterConnection;
const isRecoveryRace =
this.recovering ||
this.pendingRecovery !== undefined ||
(liveObtainMasterConnection !== undefined && liveObtainMasterConnection !== original);
if (!isRecoveryRace) {
throw error;
}
await this.awaitRecovery();
// `original` may be bound to the previous (destroyed) driver.
// Prefer the live one refreshed by the latest wrapConnectionAcquisition().
return await (this.liveObtainMasterConnection ?? original)();
}
}
/**
* Waits out an in-progress recovery before a connection is acquired, bounded by `connectionAcquisitionTimeoutMs`.
* Resolves immediately when no recovery is pending.
*
* The bound matters under load: during a long outage every query parks here, so an
* unbounded wait would pile up parked acquirers for the whole outage. Once the timeout
* elapses the query rejects with an `OperationalError` (fail fast) instead of holding
* the request open. `0` disables the timeout (wait indefinitely).
*
* `stop()` resolves `pendingRecovery`, so a parked acquirer is also released by teardown.
*/
private async awaitRecovery() {
const pending = this.pendingRecovery;
if (!pending) {
return;
}
const timeoutMs = this.databaseConfig.connectionAcquisitionTimeoutMs;
if (timeoutMs <= 0) {
await pending;
return;
}
// AbortController clears the timeout timer once recovery (or stop) wins the race
const abortController = new AbortController();
try {
await Promise.race([
pending,
setTimeoutP(timeoutMs, undefined, { signal: abortController.signal }).then(() => {
throw new OperationalError(
`Timed out after ${timeoutMs}ms waiting for database connection recovery`,
);
}),
]);
} finally {
abortController.abort();
}
}
private startRecovery() {
this.recovering = true;
this.markRecoveryPending();
}
private markRecoveryPending() {
this.pendingRecovery ??= new Promise<void>(
(resolve) => (this.resolvePendingRecovery = resolve),
);
}
private stopRecovery() {
this.recovering = false;
this.clearPendingRecovery();
}
private clearPendingRecovery() {
this.resolvePendingRecovery?.();
this.resolvePendingRecovery = undefined;
this.pendingRecovery = undefined;
}
private isRecoverableConnectionError(error: unknown): boolean {
const { message } = ensureError(error);
return (
message.includes(POOL_TORN_DOWN_MESSAGE) || message.includes(DRIVER_NOT_CONNECTED_MESSAGE)
);
}
private setConnected(connected: boolean) {
if (this.connected === connected) {
return;
@@ -89,7 +89,7 @@ export class DbConnection {
}
async close() {
this.monitor?.stop();
await this.monitor?.stop();
this.monitor = undefined;
if (this.dataSource.isInitialized) {