mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-19 07:36:52 +00:00
feat(core): Reclaim stale publication outbox leases (no-changelog) (#32536)
This commit is contained in:
@@ -32,6 +32,11 @@ export class WorkflowsConfig {
|
||||
@Env('N8N_WORKFLOW_PUBLICATION_OUTBOX_POLL_INTERVAL_MS')
|
||||
publicationOutboxPollIntervalMs: number = 15 * Time.seconds.toMilliseconds;
|
||||
|
||||
/** Seconds after which an `in_progress` workflow publication outbox record
|
||||
* is considered stale (its leader likely died) and may be reclaimed by a poll cycle. */
|
||||
@Env('N8N_WORKFLOW_PUBLICATION_OUTBOX_LEASE_SECONDS')
|
||||
publicationOutboxLeaseSeconds: number = 2 * Time.minutes.toSeconds;
|
||||
|
||||
/** Whether to disable automatic workflow saving in the editor */
|
||||
@Env('N8N_WORKFLOWS_AUTOSAVE_DISABLED')
|
||||
autosaveDisabled: boolean = false;
|
||||
|
||||
@@ -218,6 +218,7 @@ describe('GlobalConfig', () => {
|
||||
indexingBatchSize: 10,
|
||||
useWorkflowPublicationService: false,
|
||||
publicationOutboxPollIntervalMs: 15_000,
|
||||
publicationOutboxLeaseSeconds: 120,
|
||||
autosaveDisabled: false,
|
||||
},
|
||||
endpoints: {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { Service } from '@n8n/di';
|
||||
import { DataSource, Repository } from '@n8n/typeorm';
|
||||
import { Brackets, DataSource, Repository } from '@n8n/typeorm';
|
||||
import type { EntityManager } from '@n8n/typeorm';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
|
||||
@@ -139,27 +139,39 @@ export class WorkflowPublicationOutboxRepository extends Repository<WorkflowPubl
|
||||
|
||||
private async claimWithPostgresLocking(): Promise<WorkflowPublicationOutbox | null> {
|
||||
const tableName = this.getTableName('workflow_publication_outbox');
|
||||
const leaseSeconds = this.globalConfig.workflows.publicationOutboxLeaseSeconds;
|
||||
|
||||
// TypeORM's Postgres driver returns `[rows, affectedCount]` from a raw
|
||||
// UPDATE ... RETURNING (unlike INSERT, which returns the rows directly).
|
||||
const [rows]: [WorkflowPublicationOutbox[], number] = await this.query(
|
||||
// Claim the oldest pending row whose workflow has no in-progress row,
|
||||
// so a workflow is never published concurrently. Ordering by id gives
|
||||
// or re-lease a stale in-progress row whose leader likely died (no
|
||||
// progress for longer than the lease). Reprocessing is idempotent via
|
||||
// the reconciliation diff, so re-leasing is safe. Ordering by id gives
|
||||
// FIFO: ids are monotonically assigned, so the oldest is processed first.
|
||||
`UPDATE ${tableName}
|
||||
SET "status" = '${Status.InProgress}', "updatedAt" = CURRENT_TIMESTAMP(3)
|
||||
WHERE "id" = (
|
||||
SELECT o."id" FROM ${tableName} o
|
||||
WHERE o."status" = '${Status.Pending}'
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM ${tableName} ip
|
||||
WHERE ip."workflowId" = o."workflowId" AND ip."status" = '${Status.InProgress}'
|
||||
WHERE (
|
||||
o."status" = '${Status.Pending}'
|
||||
-- skip workflows that are already being processed
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM ${tableName} ip
|
||||
WHERE ip."workflowId" = o."workflowId" AND ip."status" = '${Status.InProgress}'
|
||||
)
|
||||
)
|
||||
OR (
|
||||
-- reclaim expired leases
|
||||
o."status" = '${Status.InProgress}'
|
||||
AND o."updatedAt" < CURRENT_TIMESTAMP(3) - make_interval(secs => $1)
|
||||
)
|
||||
ORDER BY o."id" ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING *`,
|
||||
[leaseSeconds],
|
||||
);
|
||||
|
||||
return rows[0] ?? null;
|
||||
@@ -168,34 +180,49 @@ export class WorkflowPublicationOutboxRepository extends Repository<WorkflowPubl
|
||||
// Two statements rather than one because `update` doesn't return the claimed
|
||||
// row. The `BEGIN IMMEDIATE` transaction serializes claimers.
|
||||
private async claimWithSqliteTransaction(): Promise<WorkflowPublicationOutbox | null> {
|
||||
const leaseSeconds = Math.round(this.globalConfig.workflows.publicationOutboxLeaseSeconds);
|
||||
|
||||
return await this.manager.transaction(async (tx) => {
|
||||
const queryBuilder = tx.createQueryBuilder(WorkflowPublicationOutbox, 'o');
|
||||
|
||||
const noInProgressSubquery = queryBuilder
|
||||
.subQuery()
|
||||
.select('1')
|
||||
.from(WorkflowPublicationOutbox, 'ip')
|
||||
.where('ip.workflowId = o.workflowId')
|
||||
.andWhere('ip.status = :inProgress')
|
||||
.getQuery();
|
||||
|
||||
// Claim the oldest pending row whose workflow has no in-progress row,
|
||||
// so a workflow is never published concurrently. Ordering by id gives
|
||||
// or re-lease a stale in-progress row whose leader likely died (no
|
||||
// progress for longer than the lease). Reprocessing is idempotent via
|
||||
// the reconciliation diff, so re-leasing is safe. Ordering by id gives
|
||||
// FIFO: ids are monotonically assigned, so the oldest is processed first.
|
||||
const record = await tx
|
||||
.createQueryBuilder(WorkflowPublicationOutbox, 'o')
|
||||
.where('o.status = :pending', { pending: Status.Pending })
|
||||
.andWhere((qb) => {
|
||||
const sub = qb
|
||||
.subQuery()
|
||||
.select('1')
|
||||
.from(WorkflowPublicationOutbox, 'ip')
|
||||
.where('ip.workflowId = o.workflowId')
|
||||
.andWhere('ip.status = :inProgress')
|
||||
.getQuery();
|
||||
return `NOT EXISTS ${sub}`;
|
||||
})
|
||||
const record = await queryBuilder
|
||||
.where(
|
||||
new Brackets((qb) => {
|
||||
qb.where('o.status = :pending', { pending: Status.Pending }).andWhere(
|
||||
`NOT EXISTS ${noInProgressSubquery}`,
|
||||
);
|
||||
}),
|
||||
)
|
||||
.orWhere(
|
||||
new Brackets((qb) => {
|
||||
qb.where('o.status = :inProgress').andWhere(
|
||||
"o.updatedAt < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', :leaseModifier)",
|
||||
{ leaseModifier: `-${leaseSeconds} seconds` },
|
||||
);
|
||||
}),
|
||||
)
|
||||
.setParameter('inProgress', Status.InProgress)
|
||||
.orderBy('o.id', 'ASC')
|
||||
.getOne();
|
||||
|
||||
if (!record) return null;
|
||||
|
||||
await tx.update(
|
||||
WorkflowPublicationOutbox,
|
||||
{ id: record.id, status: Status.Pending },
|
||||
{ status: Status.InProgress },
|
||||
);
|
||||
// `{ id }` (not `{ id, status: Pending }`) so a reclaimed in-progress
|
||||
// row is re-leased too. TypeORM bumps `updatedAt` on `.update()`.
|
||||
await tx.update(WorkflowPublicationOutbox, { id: record.id }, { status: Status.InProgress });
|
||||
record.status = Status.InProgress;
|
||||
return record;
|
||||
});
|
||||
|
||||
+76
@@ -1,4 +1,5 @@
|
||||
import { createActiveWorkflow, createWorkflow, testDb } from '@n8n/backend-test-utils';
|
||||
import { WorkflowsConfig } from '@n8n/config';
|
||||
import { WorkflowPublicationOutboxRepository, WorkflowRepository } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import assert from 'node:assert';
|
||||
@@ -146,6 +147,81 @@ describe('WorkflowPublicationOutboxRepository', () => {
|
||||
await expect(repository.markFailed(claimed.id, 'boom')).rejects.toThrow();
|
||||
});
|
||||
|
||||
describe('stale in_progress lease reclaim', () => {
|
||||
let workflowsConfig: WorkflowsConfig;
|
||||
let originalLeaseSeconds: number;
|
||||
|
||||
// Backdate `updatedAt` via raw SQL: `.update()` would re-stamp it.
|
||||
const backdateUpdatedAt = async (id: number) => {
|
||||
await repository.query(
|
||||
`UPDATE ${repository.metadata.tableName} SET "updatedAt" = '2020-01-01 00:00:00.000' WHERE "id" = ${id}`,
|
||||
);
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
workflowsConfig = Container.get(WorkflowsConfig);
|
||||
originalLeaseSeconds = workflowsConfig.publicationOutboxLeaseSeconds;
|
||||
workflowsConfig.publicationOutboxLeaseSeconds = 60;
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
workflowsConfig.publicationOutboxLeaseSeconds = originalLeaseSeconds;
|
||||
});
|
||||
|
||||
it('reclaims a stale in_progress record', async () => {
|
||||
await repository.enqueue('wf-1', 'v-1');
|
||||
const claimed = await repository.claimNextPendingRecord();
|
||||
assert(claimed);
|
||||
await backdateUpdatedAt(claimed.id);
|
||||
|
||||
const reclaimed = await repository.claimNextPendingRecord();
|
||||
|
||||
expect(reclaimed?.id).toBe(claimed.id);
|
||||
expect(reclaimed?.status).toBe('in_progress');
|
||||
});
|
||||
|
||||
it('does not reclaim a fresh in_progress record', async () => {
|
||||
await repository.enqueue('wf-1', 'v-1');
|
||||
const claimed = await repository.claimNextPendingRecord();
|
||||
assert(claimed);
|
||||
|
||||
// Just claimed, so it is within the lease window.
|
||||
expect(await repository.claimNextPendingRecord()).toBeNull();
|
||||
});
|
||||
|
||||
it('bumps updatedAt on reclaim so it is not immediately reclaimable again', async () => {
|
||||
await repository.enqueue('wf-1', 'v-1');
|
||||
const claimed = await repository.claimNextPendingRecord();
|
||||
assert(claimed);
|
||||
await backdateUpdatedAt(claimed.id);
|
||||
|
||||
const reclaimed = await repository.claimNextPendingRecord();
|
||||
assert(reclaimed);
|
||||
|
||||
// Reclaim refreshed updatedAt, so it is fresh again.
|
||||
expect(await repository.claimNextPendingRecord()).toBeNull();
|
||||
});
|
||||
|
||||
it('reclaims the stale in_progress only, leaving a newer pending record untouched', async () => {
|
||||
await repository.enqueue('wf-1', 'v-1');
|
||||
const claimed = await repository.claimNextPendingRecord();
|
||||
assert(claimed);
|
||||
await repository.enqueue('wf-1', 'v-2');
|
||||
await backdateUpdatedAt(claimed.id);
|
||||
|
||||
const reclaimed = await repository.claimNextPendingRecord();
|
||||
expect(reclaimed?.id).toBe(claimed.id);
|
||||
expect(reclaimed?.publishedVersionId).toBe('v-1');
|
||||
|
||||
// No second in_progress row was created; the pending record is untouched.
|
||||
const inProgress = await repository.find({ where: { status: 'in_progress' } });
|
||||
expect(inProgress).toHaveLength(1);
|
||||
const pending = await repository.find({ where: { status: 'pending' } });
|
||||
expect(pending).toHaveLength(1);
|
||||
expect(pending[0].publishedVersionId).toBe('v-2');
|
||||
});
|
||||
});
|
||||
|
||||
// TODO: cover Postgres `FOR UPDATE SKIP LOCKED` concurrency control under
|
||||
// parallel claimers in a follow-up.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user