feat(core): Add Azure storage mode for execution data (#32368)

This commit is contained in:
Iván Ovejero
2026-06-18 13:33:04 +02:00
committed by GitHub
parent 051a20e9f9
commit 1d6415cd59
32 changed files with 964 additions and 68 deletions
@@ -27,6 +27,7 @@
| Name | Type | Definition |
| ---- | ---- | ---------- |
| CHK_execution_entity_storedAt | CHECK | CHECK ((("storedAt")::text = ANY ((ARRAY['db'::character varying, 'fs'::character varying, 's3'::character varying, 'az'::character varying])::text[]))) |
| execution_entity_binaryDataSizeBytes_not_null | n | NOT NULL "binaryDataSizeBytes" |
| execution_entity_createdAt_not_null | n | NOT NULL "createdAt" |
| execution_entity_finished_not_null | n | NOT NULL finished |
@@ -34,7 +35,6 @@
| execution_entity_jsonSizeBytes_not_null | n | NOT NULL "jsonSizeBytes" |
| execution_entity_mode_not_null | n | NOT NULL mode |
| execution_entity_status_not_null | n | NOT NULL status |
| execution_entity_storedAt_check | CHECK | CHECK ((("storedAt")::text = ANY ((ARRAY['db'::character varying, 'fs'::character varying, 's3'::character varying])::text[]))) |
| execution_entity_storedAt_not_null | n | NOT NULL "storedAt" |
| execution_entity_workflowId_not_null | n | NOT NULL "workflowId" |
| fk_execution_entity_workflow_id | FOREIGN KEY | FOREIGN KEY ("workflowId") REFERENCES workflow_entity(id) ON DELETE CASCADE |
+3 -3
View File
@@ -720,13 +720,13 @@ erDiagram
VARCHAR_36_ workflowVersionId
}
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -737,7 +737,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
"execution_metadata" {
INTEGER executionId FK
@@ -102,13 +102,13 @@ erDiagram
datetime_3_ updatedAt
}
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -119,7 +119,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
"chat_hub_sessions" {
varchar_36_ agentId FK
@@ -52,13 +52,13 @@ erDiagram
varchar_6_ vote
}
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -69,7 +69,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
"execution_annotation_tags" {
INTEGER annotationId PK
@@ -53,13 +53,13 @@ erDiagram
VARCHAR_36_ workflowVersionId
}
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -70,7 +70,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
```
@@ -6,7 +6,7 @@
<summary><strong>Table Definition</strong></summary>
```sql
CREATE TABLE "execution_entity" ("id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "workflowId" varchar(36) NOT NULL, "finished" boolean NOT NULL, "mode" varchar NOT NULL, "retryOf" varchar, "retrySuccessId" varchar, "startedAt" datetime, "stoppedAt" datetime, "waitTill" datetime, "status" varchar NOT NULL, "deletedAt" datetime(3), "createdAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), "storedAt" varchar(2) NOT NULL DEFAULT ('db'), "tracingContext" text, "deduplicationKey" varchar(255), "jsonSizeBytes" BIGINT NOT NULL DEFAULT 0, "workflowVersionId" VARCHAR(36) DEFAULT NULL, "binaryDataSizeBytes" BIGINT NOT NULL DEFAULT 0, CONSTRAINT "FK_c4d999a5e90784e8caccf5589de" FOREIGN KEY ("workflowId") REFERENCES "workflow_entity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION)
CREATE TABLE "execution_entity" ("id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "workflowId" varchar(36) NOT NULL, "finished" boolean NOT NULL, "mode" varchar NOT NULL, "retryOf" varchar, "retrySuccessId" varchar, "startedAt" datetime, "stoppedAt" datetime, "waitTill" datetime, "status" varchar NOT NULL, "deletedAt" datetime(3), "createdAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), "storedAt" varchar(2) NOT NULL DEFAULT ('db'), "tracingContext" text, "deduplicationKey" varchar(255), "jsonSizeBytes" bigint NOT NULL DEFAULT (0), "workflowVersionId" varchar(36) DEFAULT (NULL), "binaryDataSizeBytes" bigint NOT NULL DEFAULT (0), CONSTRAINT "CHK_execution_entity_storedAt" CHECK ("storedAt" IN ('db', 'fs', 's3', 'az')), CONSTRAINT "FK_c4d999a5e90784e8caccf5589de" FOREIGN KEY ("workflowId") REFERENCES "workflow_entity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION)
```
</details>
@@ -15,13 +15,13 @@ CREATE TABLE "execution_entity" ("id" integer PRIMARY KEY AUTOINCREMENT NOT NULL
| Name | Type | Default | Nullable | Children | Parents | Comment |
| ---- | ---- | ------- | -------- | -------- | ------- | ------- |
| binaryDataSizeBytes | BIGINT | 0 | false | | | |
| binaryDataSizeBytes | bigint | 0 | false | | | |
| createdAt | datetime(3) | STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') | false | | | |
| deduplicationKey | varchar(255) | | true | | | |
| deletedAt | datetime(3) | | true | | | |
| finished | boolean | | false | | | |
| id | INTEGER | | false | [chat_hub_messages](chat_hub_messages.md) [execution_annotations](execution_annotations.md) [execution_data](execution_data.md) [execution_metadata](execution_metadata.md) [test_case_execution](test_case_execution.md) | | |
| jsonSizeBytes | BIGINT | 0 | false | | | |
| jsonSizeBytes | bigint | 0 | false | | | |
| mode | varchar | | false | | | |
| retryOf | varchar | | true | | | |
| retrySuccessId | varchar | | true | | | |
@@ -32,12 +32,13 @@ CREATE TABLE "execution_entity" ("id" integer PRIMARY KEY AUTOINCREMENT NOT NULL
| tracingContext | TEXT | | true | | | |
| waitTill | datetime | | true | | | |
| workflowId | varchar(36) | | false | | [workflow_entity](workflow_entity.md) | |
| workflowVersionId | VARCHAR(36) | NULL | true | | | |
| workflowVersionId | varchar(36) | NULL | true | | | |
## Constraints
| Name | Type | Definition |
| ---- | ---- | ---------- |
| - | CHECK | CHECK ("storedAt" IN ('db', 'fs', 's3', 'az')) |
| - (Foreign key ID: 0) | FOREIGN KEY | FOREIGN KEY (workflowId) REFERENCES workflow_entity (id) ON UPDATE NO ACTION ON DELETE CASCADE MATCH NONE |
| id | PRIMARY KEY | PRIMARY KEY (id) |
@@ -64,13 +65,13 @@ erDiagram
"execution_entity" }o--|| "workflow_entity" : "FOREIGN KEY (workflowId) REFERENCES workflow_entity (id) ON UPDATE NO ACTION ON DELETE CASCADE MATCH NONE"
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -81,7 +82,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
"chat_hub_messages" {
varchar_36_ agentId FK
@@ -47,13 +47,13 @@ erDiagram
TEXT value
}
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -64,7 +64,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
```
@@ -79,13 +79,13 @@ erDiagram
datetime_3_ updatedAt
}
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -96,7 +96,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
"test_run" {
boolean cancelRequested
@@ -185,13 +185,13 @@ erDiagram
varchar_36_ workflowId FK
}
"execution_entity" {
BIGINT binaryDataSizeBytes
bigint binaryDataSizeBytes
datetime_3_ createdAt
varchar_255_ deduplicationKey
datetime_3_ deletedAt
boolean finished
INTEGER id
BIGINT jsonSizeBytes
bigint jsonSizeBytes
varchar mode
varchar retryOf
varchar retrySuccessId
@@ -202,7 +202,7 @@ erDiagram
TEXT tracingContext
datetime waitTill
varchar_36_ workflowId FK
VARCHAR_36_ workflowVersionId
varchar_36_ workflowVersionId
}
"insights_metadata" {
INTEGER metaId
@@ -130,6 +130,10 @@ export class LicenseState {
return this.isLicensed('feat:executionDataS3');
}
isExecutionDataAzureLicensed() {
return this.isLicensed('feat:executionDataAz');
}
isMultiMainLicensed() {
return this.isLicensed('feat:multipleMainInstances');
}
+1
View File
@@ -21,6 +21,7 @@ export const LICENSE_FEATURES = {
DEBUG_IN_EDITOR: 'feat:debugInEditor',
BINARY_DATA_S3: 'feat:binaryDataS3',
EXECUTION_DATA_S3: 'feat:executionDataS3',
EXECUTION_DATA_AZURE: 'feat:executionDataAz',
MULTIPLE_MAIN_INSTANCES: 'feat:multipleMainInstances',
WORKER_VIEW: 'feat:workerView',
ADVANCED_PERMISSIONS: 'feat:advancedPermissions',
@@ -20,7 +20,7 @@ import type { ExecutionMetadata } from './execution-metadata';
import { WorkflowEntity } from './workflow-entity';
import { bigintStringToNumber, idStringifier } from '../utils/transformers';
export type ExecutionDataStorageLocation = 'db' | 'fs' | 's3';
export type ExecutionDataStorageLocation = 'db' | 'fs' | 's3' | 'az';
@Entity()
@Index(['workflowId', 'id'])
@@ -0,0 +1,25 @@
import type { IrreversibleMigration, MigrationContext } from '../migration-types';
const EXECUTION_ENTITY = 'execution_entity';
const STORED_AT = 'storedAt';
const STORED_AT_VALUES = ['db', 'fs', 's3', 'az'];
export class AllowAzureStoredAt1784000000034 implements IrreversibleMigration {
async up({ queryRunner, tablePrefix, schemaBuilder }: MigrationContext) {
const table = await queryRunner.getTable(`${tablePrefix}${EXECUTION_ENTITY}`);
const storedAtCheck = table?.checks.find(
(c) =>
(c.columnNames?.includes(STORED_AT) ?? false) ||
(c.expression?.includes(STORED_AT) ?? false),
);
if (table && storedAtCheck) {
await queryRunner.dropCheckConstraint(table, storedAtCheck);
}
await schemaBuilder.addEnumCheck(EXECUTION_ENTITY, STORED_AT, STORED_AT_VALUES, {
recreatesOnSqlite: true,
});
}
}
@@ -207,6 +207,7 @@ import { CreateWorkflowPublicationOutboxTable1784000000027 } from '../common/178
import { AddJsonSizeBytesAndWorkflowVersionIdToExecutionEntity1784000000029 } from '../common/1784000000029-AddJsonSizeBytesAndWorkflowVersionIdToExecutionEntity';
import { CreateAgentChatSubscriptions1784000000030 } from '../common/1784000000030-CreateAgentChatSubscriptions';
import { AddBinaryDataSizeBytesToExecutionEntity1784000000033 } from '../common/1784000000033-AddBinaryDataSizeBytesToExecutionEntity';
import { AllowAzureStoredAt1784000000034 } from '../common/1784000000034-AllowAzureStoredAt';
import type { Migration } from '../migration-types';
export const postgresMigrations: Migration[] = [
@@ -419,4 +420,5 @@ export const postgresMigrations: Migration[] = [
CreateAgentChatSubscriptions1784000000030,
AddExecutionEntityWorkflowStatusIndex1784000000031,
AddBinaryDataSizeBytesToExecutionEntity1784000000033,
AllowAzureStoredAt1784000000034,
];
@@ -0,0 +1,10 @@
import { AllowAzureStoredAt1784000000034 as BaseMigration } from '../common/1784000000034-AllowAzureStoredAt';
/**
* SQLite variant: widening the `storedAt` CHECK recreates the table, and
* `execution_entity` has incoming CASCADE FKs, so we run with foreign keys
* disabled to avoid cascading deletes of execution data/metadata/annotations.
*/
export class AllowAzureStoredAt1784000000034 extends BaseMigration {
withFKsDisabled = true as const;
}
@@ -55,6 +55,7 @@ import { LimitWorkflowVersionTriggerToContent1784000000003 } from './17840000000
import { CreateAgentHistoryTable1784000000011 } from './1784000000011-CreateAgentHistoryTable';
import { AddScopeColumnToOAuthTables1784000000026 } from './1784000000026-AddScopeColumnToOAuthTables';
import { AddProjectIdToInstanceAiThread1784000000028 } from './1784000000028-AddProjectIdToInstanceAiThread';
import { AllowAzureStoredAt1784000000034 } from './1784000000034-AllowAzureStoredAt';
import { UniqueWorkflowNames1620821879465 } from '../common/1620821879465-UniqueWorkflowNames';
import { UpdateWorkflowCredentials1630330987096 } from '../common/1630330987096-UpdateWorkflowCredentials';
import { AddNodeIds1658930531669 } from '../common/1658930531669-AddNodeIds';
@@ -403,6 +404,7 @@ const sqliteMigrations: Migration[] = [
AddJsonSizeBytesAndWorkflowVersionIdToExecutionEntity1784000000029,
CreateAgentChatSubscriptions1784000000030,
AddBinaryDataSizeBytesToExecutionEntity1784000000033,
AllowAzureStoredAt1784000000034,
];
export { sqliteMigrations };
+10 -1
View File
@@ -31,8 +31,8 @@ export class SecurityAudit extends BaseCommand<z.infer<typeof flagsSchema>> {
async init() {
await super.init();
// risk reporters read execution data, which may be stored on S3 or Azure Blob
try {
// risk reporters read execution data, which may be stored on S3
await this.initObjectStoreIfConfigured();
} catch (error) {
this.logger.warn(
@@ -40,6 +40,15 @@ export class SecurityAudit extends BaseCommand<z.infer<typeof flagsSchema>> {
{ error: ensureError(error).message },
);
}
try {
await this.initAzureStoreIfConfigured();
} catch (error) {
this.logger.warn(
'Failed to initialize Azure Blob storage. The audit will fail if any executions have data stored on Azure Blob.',
{ error: ensureError(error).message },
);
}
}
async run() {
+45 -1
View File
@@ -21,6 +21,7 @@ import {
StorageConfig,
} from 'n8n-core';
import { ObjectStoreConfig } from 'n8n-core/dist/binary-data/object-store/object-store.config';
import { AzureBlobConfig } from 'n8n-core/dist/binary-data/azure-blob/azure-blob.config';
import { ensureError, Expression, sleep, UnexpectedError } from 'n8n-workflow';
import type { AbstractServer } from '@/abstract-server';
@@ -264,9 +265,13 @@ export abstract class BaseCommand<F = never> {
}
}
const executionDataMode = Container.get(StorageConfig).mode;
const isS3Configured = Container.get(ObjectStoreConfig).bucket.name !== '';
const isExecutionDataS3Mode = Container.get(StorageConfig).mode === 's3';
const isAzureConfigured = Container.get(AzureBlobConfig).containerName !== '';
const isExecutionDataS3Mode = executionDataMode === 's3';
const isExecutionDataAzureMode = executionDataMode === 'azure';
const isExecutionDataS3Licensed = Container.get(LicenseState).isExecutionDataS3Licensed();
const isExecutionDataAzureLicensed = Container.get(LicenseState).isExecutionDataAzureLicensed();
if (isExecutionDataS3Mode) {
if (!isExecutionDataS3Licensed) {
@@ -283,6 +288,21 @@ export abstract class BaseCommand<F = never> {
}
}
if (isExecutionDataAzureMode) {
if (!isExecutionDataAzureLicensed) {
this.logger.error(
'Azure Blob execution data storage requires a valid license. Either set `N8N_EXECUTION_DATA_STORAGE_MODE` to something else, or upgrade to a license that supports this feature.',
);
process.exit(1);
}
if (!isAzureConfigured) {
this.logger.error(
'Azure Blob execution data storage requires `N8N_EXTERNAL_STORAGE_AZURE_CONTAINER_NAME` to be set.',
);
process.exit(1);
}
}
try {
const objectStoreService = await this.initObjectStoreIfConfigured();
if (objectStoreService) {
@@ -298,6 +318,17 @@ export abstract class BaseCommand<F = never> {
}
}
try {
await this.initAzureStoreIfConfigured();
} catch {
if (isExecutionDataAzureMode) {
this.logger.error(
'Failed to connect to Azure Blob storage. Please check your Azure configuration.',
);
process.exit(1);
}
}
await binaryDataService.init();
}
@@ -316,6 +347,19 @@ export abstract class BaseCommand<F = never> {
return objectStoreService;
}
protected async initAzureStoreIfConfigured() {
if (Container.get(AzureBlobConfig).containerName === '') return;
const { AzureBlobService } = await import(
'n8n-core/dist/binary-data/azure-blob/azure-blob.service.ee'
);
const azureBlobService = Container.get(AzureBlobService);
await azureBlobService.init();
const { AzureStore } = await import('@/executions/execution-data/azure-store.ee');
Container.get(ExecutionPersistence).setAzStore(Container.get(AzureStore));
}
protected async initDataDeduplicationService() {
const dataDeduplicationService = getDataDeduplicationService();
await DataDeduplicationService.init(dataDeduplicationService);
@@ -107,6 +107,7 @@ export class E2EController {
[LICENSE_FEATURES.DEBUG_IN_EDITOR]: false,
[LICENSE_FEATURES.BINARY_DATA_S3]: false,
[LICENSE_FEATURES.EXECUTION_DATA_S3]: false,
[LICENSE_FEATURES.EXECUTION_DATA_AZURE]: false,
[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES]: false,
[LICENSE_FEATURES.WORKER_VIEW]: false,
[LICENSE_FEATURES.ADVANCED_PERMISSIONS]: false,
@@ -59,6 +59,28 @@ describe('ExecutionPersistence', () => {
});
});
describe('storedAt CHECK constraint (AllowAzureStoredAt migration)', () => {
it("allows persisting an execution with storedAt 'az' (legacy 3-value check was widened, not AND-ed)", async () => {
const executionRepo = Container.get(ExecutionRepository);
const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } });
// Inserting 'az' would throw a CHECK violation if the original
// `IN ('db','fs','s3')` constraint were still present.
const result = await executionRepo.insert({
workflowId: workflow.id,
status: 'new',
finished: false,
mode: 'manual',
createdAt: new Date(),
storedAt: 'az',
});
const id = String(result.identifiers[0].id);
const found = await executionRepo.findOneBy({ id });
expect(found?.storedAt).toEqual('az');
});
});
describe('updateExistingExecution (db overwrite path)', () => {
it('should preserve the original workflowVersionId when overwriting data and workflowData', async () => {
const executionPersistence = Container.get(ExecutionPersistence);
@@ -77,7 +77,7 @@ describe('ExecutionPersistence', () => {
jest.fn().mockImplementation(async <T>(cb: (em: EntityManager) => Promise<T>) => await cb(tx));
const createPersistenceService = (
modeTag: 'db' | 'fs' | 's3',
modeTag: 'db' | 'fs' | 's3' | 'az',
dbType: DatabaseConfig['type'] = 'postgresdb',
) =>
new ExecutionPersistence(
@@ -1733,6 +1733,50 @@ describe('ExecutionPersistence', () => {
executionIds: ['exec-1'],
});
});
it('should delete execution, binary data, and az data when storedAt is az', async () => {
const azStore = mock<ExecutionDataStore>();
executionPersistence.setAzStore(azStore);
const target = { ...baseTarget, storedAt: 'az' as const };
await executionPersistence.hardDelete(target);
expect(executionRepository.deleteByIds).toHaveBeenCalledWith(['exec-1']);
expect(binaryDataService.deleteMany).toHaveBeenCalledWith([{ type: 'execution', ...target }]);
expect(azStore.delete).toHaveBeenCalledWith([target]);
expect(fsStore.delete).not.toHaveBeenCalled();
});
it('should route mixed targets including az to their respective stores', async () => {
const s3Store = mock<ExecutionDataStore>();
const azStore = mock<ExecutionDataStore>();
executionPersistence.setS3Store(s3Store);
executionPersistence.setAzStore(azStore);
const targets = [
{ workflowId: 'wf-1', executionId: 'exec-1', storedAt: 'fs' as const },
{ workflowId: 'wf-2', executionId: 'exec-2', storedAt: 's3' as const },
{ workflowId: 'wf-3', executionId: 'exec-3', storedAt: 'az' as const },
{ workflowId: 'wf-4', executionId: 'exec-4', storedAt: 'db' as const },
];
await executionPersistence.hardDelete(targets);
expect(fsStore.delete).toHaveBeenCalledWith([targets[0]]);
expect(s3Store.delete).toHaveBeenCalledWith([targets[1]]);
expect(azStore.delete).toHaveBeenCalledWith([targets[2]]);
});
it('should warn and still delete entities when az data exists but no az store is set', async () => {
const executionPersistenceWithoutAz = createPersistenceService('db');
const target = { ...baseTarget, storedAt: 'az' as const };
await executionPersistenceWithoutAz.hardDelete(target);
expect(executionRepository.deleteByIds).toHaveBeenCalledWith(['exec-1']);
expect(logger.warn).toHaveBeenCalledWith(expect.any(String), {
executionIds: ['exec-1'],
});
});
});
describe('hardDeleteBy', () => {
@@ -1782,6 +1826,38 @@ describe('ExecutionPersistence', () => {
executionIds: ['exec-1'],
});
});
it('should delete fs, s3, and az data per the refs returned by the repository', async () => {
const s3Store = mock<ExecutionDataStore>();
const azStore = mock<ExecutionDataStore>();
executionPersistence.setS3Store(s3Store);
executionPersistence.setAzStore(azStore);
const refs = [
{ workflowId: 'wf-1', executionId: 'exec-1', storedAt: 'fs' as const },
{ workflowId: 'wf-2', executionId: 'exec-2', storedAt: 's3' as const },
{ workflowId: 'wf-3', executionId: 'exec-3', storedAt: 'az' as const },
{ workflowId: 'wf-4', executionId: 'exec-4', storedAt: 'db' as const },
];
executionRepository.deleteExecutionsByFilter.mockResolvedValue(refs);
await executionPersistence.hardDeleteBy(criteria);
expect(fsStore.delete).toHaveBeenCalledWith([refs[0]]);
expect(s3Store.delete).toHaveBeenCalledWith([refs[1]]);
expect(azStore.delete).toHaveBeenCalledWith([refs[2]]);
});
it('should warn and skip az data deletion when no az store is set', async () => {
const executionPersistenceWithoutAz = createPersistenceService('db');
const refs = [{ workflowId: 'wf-1', executionId: 'exec-1', storedAt: 'az' as const }];
executionRepository.deleteExecutionsByFilter.mockResolvedValue(refs);
await expect(executionPersistenceWithoutAz.hardDeleteBy(criteria)).resolves.not.toThrow();
expect(logger.warn).toHaveBeenCalledWith(expect.any(String), {
executionIds: ['exec-1'],
});
});
});
describe('deleteUnsaved', () => {
@@ -2198,4 +2274,114 @@ describe('ExecutionPersistence', () => {
expect(fsStore.readMany).not.toHaveBeenCalled();
});
});
describe('azure mode', () => {
const azStore = mock<ExecutionDataStore>();
const createPayload: CreateExecutionPayload = {
data: runData,
workflowData,
mode: 'manual',
finished: false,
status: 'new',
workflowId: 'workflow-123',
};
const bundle = {
data: '[{"resultData":"1"},{}]',
workflowData: { id: 'wf-1', name: 's', nodes: [], connections: {}, settings: undefined },
workflowVersionId: 'v-1',
version: 1 as const,
};
const azEntity = (id = 'exec-1') =>
({
id,
workflowId: 'wf-1',
storedAt: 'az',
metadata: [],
annotation: undefined,
status: 'success',
}) as unknown as ExecutionEntity;
it('throws when an execution routes to az but no Azure store is registered', async () => {
const executionPersistence = createPersistenceService('az');
executionRepository.manager.transaction = createMockTx(createMockTransaction());
await expect(executionPersistence.create(createPayload)).rejects.toThrow(UnexpectedError);
});
it('writes via the registered Azure store on create with `storedAt: az`', async () => {
const executionPersistence = createPersistenceService('az');
executionPersistence.setAzStore(azStore);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
const executionId = await executionPersistence.create(createPayload);
expect(executionId).toBe('exec-1');
expect(mockTx.insert).toHaveBeenCalledWith(
ExecutionEntity,
expect.objectContaining({ storedAt: 'az' }),
);
expect(azStore.write).toHaveBeenCalledWith(
{ workflowId: 'workflow-123', executionId: 'exec-1' },
expect.objectContaining({ workflowVersionId: 'version-abc' }),
mockTx,
);
expect(dbStore.write).not.toHaveBeenCalled();
expect(fsStore.write).not.toHaveBeenCalled();
});
it('reads via the registered Azure store on findSingleExecution', async () => {
const executionPersistence = createPersistenceService('az');
executionPersistence.setAzStore(azStore);
executionRepository.findOne.mockResolvedValue(azEntity());
azStore.read.mockResolvedValue(bundle);
const result = await executionPersistence.findSingleExecution('exec-1', {
includeData: true,
});
expect(azStore.read).toHaveBeenCalledWith({ workflowId: 'wf-1', executionId: 'exec-1' });
expect(result).toMatchObject({ data: bundle.data, workflowData: bundle.workflowData });
expect(dbStore.read).not.toHaveBeenCalled();
expect(fsStore.read).not.toHaveBeenCalled();
});
it('hard-fails a missing az bundle like fs (throw), unlike db (report + undefined)', async () => {
const executionPersistence = createPersistenceService('az');
executionPersistence.setAzStore(azStore);
const entity = azEntity();
executionRepository.findOne.mockResolvedValue(entity);
azStore.read.mockResolvedValue(null);
await expect(
executionPersistence.findSingleExecution('exec-1', { includeData: true }),
).rejects.toBeInstanceOf(MissingExecutionDataError);
expect(executionRepository.reportInvalidExecutions).not.toHaveBeenCalled();
});
it('partitions a multi-read to the Azure store', async () => {
const executionPersistence = createPersistenceService('az');
executionPersistence.setAzStore(azStore);
executionRepository.find.mockResolvedValue([azEntity('a'), azEntity('b')]);
azStore.readMany.mockResolvedValue(
new Map([
['a', bundle],
['b', bundle],
]),
);
const result = await executionPersistence.findMultipleExecutions({}, { includeData: true });
expect(azStore.readMany).toHaveBeenCalledWith([
{ workflowId: 'wf-1', executionId: 'a' },
{ workflowId: 'wf-1', executionId: 'b' },
]);
expect(result).toHaveLength(2);
expect(dbStore.readMany).not.toHaveBeenCalled();
expect(fsStore.readMany).not.toHaveBeenCalled();
});
});
});
@@ -0,0 +1,172 @@
/* eslint-disable n8n-local-rules/no-uncaught-json-parse */
/* eslint-disable @typescript-eslint/unbound-method */
import { mock } from 'jest-mock-extended';
import type { ErrorReporter } from 'n8n-core';
import type { AzureBlobService } from 'n8n-core/dist/binary-data/azure-blob/azure-blob.service.ee';
import { AzureStore } from '../azure-store.ee';
import { CorruptedExecutionDataError } from '../corrupted-execution-data.error';
import { ExecutionDataWriteError } from '../execution-data-write.error';
import { createExecutionRef } from '../types';
import { executionId, payload, ref, workflowId } from './mocks';
const keyFor = (execId: string) =>
`workflows/${workflowId}/executions/${execId}/execution_data/bundle.json`;
/** An Azure SDK error as surfaced by `AzureBlobService` (wrapped, original on `cause`). */
const wrappedAzureError = (cause: unknown) =>
new Error('Request to Azure Blob storage failed', { cause });
const blobNotFound = () =>
wrappedAzureError(Object.assign(new Error('missing'), { code: 'BlobNotFound', statusCode: 404 }));
const bundle = { ...payload, version: 1 as const };
describe('AzureStore', () => {
let azureBlob: ReturnType<typeof mock<AzureBlobService>>;
let errorReporter: ReturnType<typeof mock<ErrorReporter>>;
let azureStore: AzureStore;
beforeEach(() => {
azureBlob = mock<AzureBlobService>();
errorReporter = mock<ErrorReporter>();
azureStore = new AzureStore(azureBlob, errorReporter);
});
describe('write', () => {
it('should upload the versioned bundle as JSON at the execution key', async () => {
await azureStore.write(ref, payload);
expect(azureBlob.put).toHaveBeenCalledTimes(1);
const [key, body] = azureBlob.put.mock.calls[0];
expect(key).toBe(keyFor(executionId));
expect(JSON.parse(body.toString('utf-8'))).toMatchObject(bundle);
});
it('should wrap a put failure in `ExecutionDataWriteError`', async () => {
azureBlob.put.mockRejectedValueOnce(new Error('access denied'));
await expect(azureStore.write(ref, payload)).rejects.toThrow(ExecutionDataWriteError);
});
});
describe('read', () => {
it('should retrieve and parse the stored bundle', async () => {
azureBlob.get.mockResolvedValueOnce(Buffer.from(JSON.stringify(bundle)));
const result = await azureStore.read(ref);
expect(result).toEqual(bundle);
expect(azureBlob.get).toHaveBeenCalledWith(keyFor(executionId));
});
it('should return `null` when the blob is missing (BlobNotFound)', async () => {
azureBlob.get.mockRejectedValueOnce(blobNotFound());
expect(await azureStore.read(ref)).toBeNull();
});
it('should not treat a missing container (ContainerNotFound) as a missing blob', async () => {
// ContainerNotFound is also HTTP 404, but is an infra/config problem that must surface.
const containerNotFound = wrappedAzureError(
Object.assign(new Error('no container'), {
code: 'ContainerNotFound',
statusCode: 404,
}),
);
azureBlob.get.mockRejectedValueOnce(containerNotFound);
await expect(azureStore.read(ref)).rejects.toBe(containerNotFound);
});
it('should throw `CorruptedExecutionDataError` for non-parseable content', async () => {
azureBlob.get.mockResolvedValueOnce(Buffer.from('invalid json{{{'));
await expect(azureStore.read(ref)).rejects.toThrow(CorruptedExecutionDataError);
});
it('should rethrow a systemic error (not a missing blob)', async () => {
const throttled = wrappedAzureError(
Object.assign(new Error('slow down'), { code: 'ServerBusy' }),
);
azureBlob.get.mockRejectedValueOnce(throttled);
await expect(azureStore.read(ref)).rejects.toBe(throttled);
});
});
describe('readMany', () => {
it('should return a map keyed by executionId, omitting missing blobs', async () => {
const present = createExecutionRef(workflowId, 'exec-1');
const missing = createExecutionRef(workflowId, 'exec-2');
azureBlob.get
.mockResolvedValueOnce(Buffer.from(JSON.stringify(bundle)))
.mockRejectedValueOnce(blobNotFound());
const bundles = await azureStore.readMany([present, missing]);
expect(bundles.size).toBe(1);
expect(bundles.get('exec-1')).toEqual(bundle);
expect(bundles.has('exec-2')).toBe(false);
});
it('should report and drop a corrupted bundle instead of rejecting the whole read', async () => {
const good = createExecutionRef(workflowId, 'good');
const bad = createExecutionRef(workflowId, 'bad');
azureBlob.get
.mockResolvedValueOnce(Buffer.from(JSON.stringify(bundle)))
.mockResolvedValueOnce(Buffer.from('invalid json{{{'));
const bundles = await azureStore.readMany([good, bad]);
expect(bundles.has('good')).toBe(true);
expect(bundles.has('bad')).toBe(false);
expect(errorReporter.error).toHaveBeenCalledWith(expect.any(CorruptedExecutionDataError));
});
it('should rethrow a systemic read error instead of swallowing it', async () => {
const target = createExecutionRef(workflowId, 'exec-1');
const throttled = wrappedAzureError(
Object.assign(new Error('slow down'), { code: 'ServerBusy' }),
);
azureBlob.get.mockRejectedValueOnce(throttled);
await expect(azureStore.readMany([target])).rejects.toBe(throttled);
expect(errorReporter.error).not.toHaveBeenCalled();
});
it('should return an empty map for an empty array', async () => {
const bundles = await azureStore.readMany([]);
expect(bundles.size).toBe(0);
expect(azureBlob.get).not.toHaveBeenCalled();
});
});
describe('delete', () => {
it('should delete the blob at the execution key for a single ref', async () => {
await azureStore.delete(ref);
expect(azureBlob.delete).toHaveBeenCalledTimes(1);
expect(azureBlob.delete).toHaveBeenCalledWith(keyFor(executionId));
});
it('should delete the blob for each ref in an array', async () => {
const first = createExecutionRef(workflowId, 'exec-1');
const second = createExecutionRef(workflowId, 'exec-2');
await azureStore.delete([first, second]);
expect(azureBlob.delete).toHaveBeenCalledTimes(2);
expect(azureBlob.delete).toHaveBeenCalledWith(keyFor('exec-1'));
expect(azureBlob.delete).toHaveBeenCalledWith(keyFor('exec-2'));
});
it('should be a no-op for an empty array', async () => {
await azureStore.delete([]);
expect(azureBlob.delete).not.toHaveBeenCalled();
});
});
});
@@ -0,0 +1,114 @@
import { Service } from '@n8n/di';
import chunk from 'lodash/chunk';
import { ErrorReporter } from 'n8n-core';
import { AzureBlobService } from 'n8n-core/dist/binary-data/azure-blob/azure-blob.service.ee';
import { ensureError, jsonParse, jsonStringify } from 'n8n-workflow';
import { EXECUTION_DATA_BUNDLE_FILENAME, EXECUTION_DATA_BUNDLE_VERSION } from './constants';
import { CorruptedExecutionDataError } from './corrupted-execution-data.error';
import { ExecutionDataWriteError } from './execution-data-write.error';
import type {
ExecutionDataStore,
ExecutionRef,
ExecutionDataPayload,
ExecutionDataBundle,
} from './types';
const MAX_READ_CONCURRENCY = 50;
const MAX_DELETE_CONCURRENCY = 50;
@Service()
export class AzureStore implements ExecutionDataStore {
constructor(
private readonly azureBlob: AzureBlobService,
private readonly reporter: ErrorReporter,
) {}
async write(ref: ExecutionRef, payload: ExecutionDataPayload): Promise<number> {
const body = Buffer.from(
jsonStringify({ ...payload, version: EXECUTION_DATA_BUNDLE_VERSION }),
'utf-8',
);
try {
await this.azureBlob.put(this.key(ref), body);
} catch (error) {
throw new ExecutionDataWriteError(ref, error);
}
return body.length;
}
async read(ref: ExecutionRef): Promise<ExecutionDataBundle | null> {
let content: string;
try {
const buffer = await this.azureBlob.get(this.key(ref));
content = buffer.toString('utf-8');
} catch (error) {
if (this.isNotFound(error)) return null;
throw error;
}
try {
return jsonParse<ExecutionDataBundle>(content);
} catch (error) {
throw new CorruptedExecutionDataError(ref, error);
}
}
async readMany(refs: ExecutionRef[]) {
const bundles = new Map<string, ExecutionDataBundle>();
if (refs.length === 0) return bundles;
for (const batch of chunk(refs, MAX_READ_CONCURRENCY)) {
const bundlesInBatch = await Promise.all(batch.map(async (ref) => await this.tryRead(ref)));
for (const [idx, bundle] of bundlesInBatch.entries()) {
if (bundle) bundles.set(batch[idx].executionId, bundle);
}
}
return bundles;
}
async delete(ref: ExecutionRef | ExecutionRef[]) {
const refs = Array.isArray(ref) ? ref : [ref];
if (refs.length === 0) return;
for (const batch of chunk(refs, MAX_DELETE_CONCURRENCY)) {
// eslint-disable-next-line @typescript-eslint/promise-function-async
await Promise.all(batch.map((r) => this.azureBlob.delete(this.key(r))));
}
}
private key({ workflowId, executionId }: ExecutionRef) {
return [
'workflows',
workflowId,
'executions',
executionId,
'execution_data',
EXECUTION_DATA_BUNDLE_FILENAME,
].join('/');
}
private async tryRead(ref: ExecutionRef): Promise<ExecutionDataBundle | null> {
try {
return await this.read(ref);
} catch (error) {
if (error instanceof CorruptedExecutionDataError) {
this.reporter.error(error);
return null;
}
throw error;
}
}
private isNotFound(error: unknown): boolean {
const original = ensureError(error).cause ?? error;
if (typeof original !== 'object' || original === null) return false;
const code = 'code' in original ? original.code : undefined;
return code === 'BlobNotFound';
}
}
@@ -58,6 +58,8 @@ type UpdatableEntityColumns = Omit<
export class ExecutionPersistence {
private s3Store: ExecutionDataStore | undefined;
private azStore: ExecutionDataStore | undefined;
constructor(
private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService,
@@ -75,6 +77,10 @@ export class ExecutionPersistence {
this.s3Store = store;
}
setAzStore(store: ExecutionDataStore) {
this.azStore = store;
}
/**
* Create an execution entity and persist its data to the configured storage.
* - In `db` mode, we write both entity and data to the DB in a transaction.
@@ -466,23 +472,27 @@ export class ExecutionPersistence {
const targets = Array.isArray(target) ? target : [target];
if (targets.length === 0) return;
const fsTargets = targets.filter((t) => t.storedAt === 'fs');
await Promise.all([
this.executionRepository.deleteByIds(targets.map((t) => t.executionId)),
this.binaryDataService.deleteMany(targets.map((t) => ({ type: 'execution' as const, ...t }))),
fsTargets.length > 0 ? this.fsStore.delete(fsTargets) : Promise.resolve(),
this.deleteFsData(targets.filter((t) => t.storedAt === 'fs')),
this.deleteS3Data(targets.filter((t) => t.storedAt === 's3')),
this.deleteAzData(targets.filter((t) => t.storedAt === 'az')),
]);
}
async hardDeleteBy(criteria: ExecutionDeletionCriteria) {
const refs = await this.executionRepository.deleteExecutionsByFilter(criteria);
const fsRefs = refs.filter((r) => r.storedAt === 'fs');
if (fsRefs.length > 0) await this.fsStore.delete(fsRefs);
await this.deleteFsData(refs.filter((r) => r.storedAt === 'fs'));
await this.deleteS3Data(refs.filter((r) => r.storedAt === 's3'));
await this.deleteAzData(refs.filter((r) => r.storedAt === 'az'));
}
private async deleteFsData(refs: ExecutionRef[]) {
if (refs.length === 0) return;
await this.fsStore.delete(refs);
}
/**
@@ -503,6 +513,19 @@ export class ExecutionPersistence {
await this.s3Store.delete(refs);
}
private async deleteAzData(refs: ExecutionRef[]) {
if (refs.length === 0) return;
if (!this.azStore) {
this.logger.warn('Skipped deleting Azure execution data - Azure store is not initialized', {
executionIds: refs.map((r) => r.executionId),
});
return;
}
await this.azStore.delete(refs);
}
private async updateEntityOnly(
executionId: string,
execution: Partial<IExecutionResponse>,
@@ -716,6 +739,13 @@ export class ExecutionPersistence {
);
}
return this.s3Store;
case 'az':
if (!this.azStore) {
throw new UnexpectedError(
'Execution data is stored on Azure Blob Storage but the Azure store is not initialized. Check that Azure is configured.',
);
}
return this.azStore;
}
const _exhaustive: never = location;
throw new Error(`Unknown storage location: ${String(_exhaustive)}`);
@@ -43,7 +43,7 @@ export class PrometheusExecutionDataMetricsService implements PrometheusMetricsC
labelNames: ['mode'],
});
for (const mode of ['db', 'fs', 's3'] as const) {
for (const mode of ['db', 'fs', 's3', 'az'] as const) {
for (const result of ['success', 'failure'] as const) {
readsTotal.inc({ mode, result }, 0);
writesTotal.inc({ mode, result }, 0);
@@ -80,6 +80,7 @@ export class PrometheusExecutionDataMetricsService implements PrometheusMetricsC
storageModeGauge.set({ mode: 'db' }, 0);
storageModeGauge.set({ mode: 'fs' }, 0);
storageModeGauge.set({ mode: 's3' }, 0);
storageModeGauge.set({ mode: 'az' }, 0);
storageModeGauge.set({ mode: this.storageConfig.modeTag }, 1);
this.eventService.on(
+2
View File
@@ -53,6 +53,8 @@
},
"dependencies": {
"@aws-sdk/client-s3": "3.808.0",
"@azure/identity": "catalog:",
"@azure/storage-blob": "catalog:",
"@langchain/core": "catalog:",
"@n8n/backend-common": "workspace:*",
"@n8n/backend-network": "workspace:*",
@@ -0,0 +1,34 @@
import { Config, Env } from '@n8n/config';
@Config
export class AzureBlobConfig {
/**
* Connection string for Azure Blob storage. Takes precedence over account
* name/key when set (and is the simplest option for Azurite local testing).
*/
@Env('N8N_EXTERNAL_STORAGE_AZURE_CONNECTION_STRING')
connectionString: string = '';
/** Storage account name, used with an account key or managed identity. */
@Env('N8N_EXTERNAL_STORAGE_AZURE_ACCOUNT_NAME')
accountName: string = '';
/** Storage account key, used with the account name. */
@Env('N8N_EXTERNAL_STORAGE_AZURE_ACCOUNT_KEY')
accountKey: string = '';
/** Name of the blob container to store execution data in. */
@Env('N8N_EXTERNAL_STORAGE_AZURE_CONTAINER_NAME')
containerName: string = '';
/** Custom blob endpoint, e.g. for Azurite or sovereign clouds. */
@Env('N8N_EXTERNAL_STORAGE_AZURE_ENDPOINT')
endpoint: string = '';
/**
* Authenticate via `DefaultAzureCredential` (managed identity, env, Azure CLI)
* instead of an account key. Ignores `accountKey` when enabled.
*/
@Env('N8N_EXTERNAL_STORAGE_AZURE_AUTH_AUTO_DETECT')
authAutoDetect: boolean = false;
}
@@ -0,0 +1,102 @@
import { DefaultAzureCredential } from '@azure/identity';
import type { ContainerClient } from '@azure/storage-blob';
import { BlobServiceClient, StorageSharedKeyCredential } from '@azure/storage-blob';
import { Logger } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import { ensureError, UnexpectedError, UserError } from 'n8n-workflow';
import { AzureBlobConfig } from './azure-blob.config';
@Service()
export class AzureBlobService {
private readonly containerClient: ContainerClient;
private isReady = false;
constructor(
private readonly logger: Logger,
private readonly config: AzureBlobConfig,
) {
if (config.containerName === '') {
throw new UserError(
'Azure Blob container name not configured. Please set `N8N_EXTERNAL_STORAGE_AZURE_CONTAINER_NAME`.',
);
}
this.containerClient = this.buildContainerClient();
}
private buildContainerClient(): ContainerClient {
const { connectionString, accountName, accountKey, containerName, endpoint, authAutoDetect } =
this.config;
if (connectionString) {
return BlobServiceClient.fromConnectionString(connectionString).getContainerClient(
containerName,
);
}
const url = endpoint || `https://${accountName}.blob.core.windows.net`;
const serviceClient = authAutoDetect
? new BlobServiceClient(url, new DefaultAzureCredential())
: new BlobServiceClient(url, new StorageSharedKeyCredential(accountName, accountKey));
return serviceClient.getContainerClient(containerName);
}
async init() {
await this.checkConnection();
this.isReady = true;
}
async checkConnection() {
if (this.isReady) return;
try {
this.logger.debug('Checking connection to Azure Blob container', {
container: this.config.containerName,
});
const exists = await this.containerClient.exists();
if (!exists) {
throw new UserError(
`Azure Blob container "${this.config.containerName}" does not exist or is not accessible.`,
);
}
} catch (e) {
if (e instanceof UserError) throw e;
this.handleError(e);
}
}
async put(blobName: string, body: Buffer) {
try {
await this.containerClient.getBlockBlobClient(blobName).uploadData(body, {
blobHTTPHeaders: { blobContentType: 'application/json' },
});
} catch (e) {
this.handleError(e);
}
}
async get(blobName: string): Promise<Buffer> {
try {
return await this.containerClient.getBlockBlobClient(blobName).downloadToBuffer();
} catch (e) {
this.handleError(e);
}
}
async delete(blobName: string) {
try {
await this.containerClient.getBlockBlobClient(blobName).deleteIfExists();
} catch (e) {
this.handleError(e);
}
}
private handleError(e: unknown): never {
const error = ensureError(e);
throw new UnexpectedError(`Request to Azure Blob storage failed: ${error.message}`, {
cause: error,
});
}
}
+4 -4
View File
@@ -7,19 +7,19 @@ import { z } from 'zod';
import { InstanceSettings } from '@/instance-settings';
import { StoragePathError } from '@/storage-path-conflict.error';
export const EXECUTION_DATA_STORAGE_MODES = ['database', 'filesystem', 's3'] as const;
export const EXECUTION_DATA_STORAGE_MODES = ['database', 'filesystem', 's3', 'azure'] as const;
const modeSchema = z.enum(EXECUTION_DATA_STORAGE_MODES);
const MODE_TAGS = { database: 'db', filesystem: 'fs', s3: 's3' } as const;
const MODE_TAGS = { database: 'db', filesystem: 'fs', s3: 's3', azure: 'az' } as const;
@Config
export class StorageConfig {
/** Mode for storing execution data: 'database' (default), 'filesystem', or 's3'. */
/** Mode for storing execution data: 'database' (default), 'filesystem', 's3', or 'azure'. */
@Env('N8N_EXECUTION_DATA_STORAGE_MODE', modeSchema)
mode: z.infer<typeof modeSchema> = 'database';
get modeTag(): 'db' | 'fs' | 's3' {
get modeTag(): 'db' | 'fs' | 's3' | 'az' {
return MODE_TAGS[this.mode];
}
+1 -1
View File
@@ -108,7 +108,7 @@ export type ExecutionError =
| NodeOperationError
| NodeApiError;
export type ExecutionStorageLocation = 'db' | 'fs' | 's3';
export type ExecutionStorageLocation = 'db' | 'fs' | 's3' | 'az';
// Get used to gives nodes access to credentials
export interface IGetCredentials {
+154 -21
View File
@@ -45,6 +45,9 @@ catalogs:
'@ai-sdk/xai':
specifier: ^3.0.93
version: 3.0.93
'@azure/storage-blob':
specifier: ^12.32.0
version: 12.32.0
'@chat-adapter/linear':
specifier: ^4.28.1
version: 4.28.1
@@ -897,7 +900,7 @@ importers:
version: 1.0.27(@langchain/core@1.1.41(@opentelemetry/api@1.9.0)(@opentelemetry/exporter-trace-otlp-proto@0.217.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.7.1(@opentelemetry/api@1.9.0))(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10)))(@opentelemetry/api@1.9.0)(@opentelemetry/exporter-trace-otlp-proto@0.217.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.7.1(@opentelemetry/api@1.9.0))(cheerio@1.0.0)(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))
'@langchain/community':
specifier: 'catalog:'
version: 1.1.27(1f6ae79d0415a171031ec3fa8aff176f)
version: 1.1.27(3f523e2bcc201c8262a8c15a9a626dc9)
'@langchain/core':
specifier: 'catalog:'
version: 1.1.41(@opentelemetry/api@1.9.0)(@opentelemetry/exporter-trace-otlp-proto@0.217.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.7.1(@opentelemetry/api@1.9.0))(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))
@@ -2593,7 +2596,7 @@ importers:
version: 1.0.1(@langchain/core@1.1.41(@opentelemetry/api@1.9.0)(@opentelemetry/exporter-trace-otlp-proto@0.217.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.7.1(@opentelemetry/api@1.9.0))(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10)))(encoding@0.1.13)
'@langchain/community':
specifier: 'catalog:'
version: 1.1.27(c14be831c9c3e34e46c4d20db95aa4fe)
version: 1.1.27(bf7da34b9f8c01ea3ac8b22fbd8af0a6)
'@langchain/core':
specifier: 'catalog:'
version: 1.1.41(@opentelemetry/api@1.9.0)(@opentelemetry/exporter-trace-otlp-proto@0.217.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.7.1(@opentelemetry/api@1.9.0))(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))
@@ -3746,6 +3749,12 @@ importers:
'@aws-sdk/client-s3':
specifier: 3.808.0
version: 3.808.0
'@azure/identity':
specifier: 4.13.0
version: 4.13.0
'@azure/storage-blob':
specifier: 'catalog:'
version: 12.32.0
'@langchain/core':
specifier: 'catalog:'
version: 1.1.41(@opentelemetry/api@1.9.0)(@opentelemetry/exporter-trace-otlp-proto@0.217.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.7.1(@opentelemetry/api@1.9.0))(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))
@@ -6199,6 +6208,10 @@ packages:
resolution: {integrity: sha512-ykRMW8PjVAn+RS6ww5cmK9U2CyH9p4Q88YJwvUslfuMmN98w/2rdGRLPqJYObapBCdzBVeDgYWdJnFPFb7qzpg==}
engines: {node: '>=20.0.0'}
'@azure/core-client@1.10.2':
resolution: {integrity: sha512-1D2LpsU7y9xrqKjdIbsB7PlrRePw0xsVV8p+AKTlzITrWmscajryfJCdDJB/oGwvDI5HmRo04eMMADB67uwAwQ==}
engines: {node: '>=20.0.0'}
'@azure/core-client@1.9.2':
resolution: {integrity: sha512-kRdry/rav3fUKHl/aDLd/pDLcB+4pOFwPPTVEExuMyaI5r+JBbMWqRbCY1pn5BniDaU3lRxO9eaQ1AmSMehl/w==}
engines: {node: '>=18.0.0'}
@@ -6211,6 +6224,13 @@ packages:
resolution: {integrity: sha512-5MnV1yqzZwgNLLjlizsU3QqOeQChkIXw781Fwh1xdAqJR5AA32IUaq6xv1BICJvfbHoa+JYcaij2HFkhLbNTJQ==}
engines: {node: '>=18.0.0'}
'@azure/core-http-compat@2.4.0':
resolution: {integrity: sha512-f1P96IB399YiN2ARYHP7EpZi3Bf3wH4SN2lGzrw7JVwm7bbsVYtf2iKSBwTywD2P62NOPZGHFSZi+6jjb75JuA==}
engines: {node: '>=20.0.0'}
peerDependencies:
'@azure/core-client': ^1.10.0
'@azure/core-rest-pipeline': ^1.22.0
'@azure/core-lro@2.4.0':
resolution: {integrity: sha512-F65+rYkll1dpw3RGm8/SSiSj+/QkMeYDanzS/QKlM1dmuneVyXbO46C88V1MRHluLGdMP6qfD3vDRYALn0z0tQ==}
engines: {node: '>=12.0.0'}
@@ -6219,14 +6239,26 @@ packages:
resolution: {integrity: sha512-H6Tg9eBm0brHqLy0OSAGzxIh1t4UL8eZVrSUMJ60Ra9cwq2pOskFqVpz2pYoHDsBY1jZ4V/P8LRGb5D5pmC6rg==}
engines: {node: '>=12.0.0'}
'@azure/core-paging@1.6.2':
resolution: {integrity: sha512-YKWi9YuCU04B55h25cnOYZHxXYtEvQEbKST5vqRga7hWY9ydd3FZHdeQF8pyh+acWZvppw13M/LMGx0LABUVMA==}
engines: {node: '>=18.0.0'}
'@azure/core-rest-pipeline@1.20.0':
resolution: {integrity: sha512-ASoP8uqZBS3H/8N8at/XwFr6vYrRP3syTK0EUjDXQy0Y1/AUS+QeIRThKmTNJO2RggvBBxaXDPM7YoIwDGeA0g==}
engines: {node: '>=18.0.0'}
'@azure/core-rest-pipeline@1.24.0':
resolution: {integrity: sha512-PpLsoDQ3AMmKZ0VU+0GrmqMxgp/sExjlVm4R+nLWngeoEGAzOIPVifaxKGU5gMv+nWELUoHfvrolWD+ZS/nFJg==}
engines: {node: '>=20.0.0'}
'@azure/core-tracing@1.2.0':
resolution: {integrity: sha512-UKTiEJPkWcESPYJz3X5uKRYyOcJD+4nYph+KpfdPRnQJVrZfk0KJgdnaAWKfhsBBtAf/D58Az4AvCJEmWgIBAg==}
engines: {node: '>=18.0.0'}
'@azure/core-tracing@1.3.1':
resolution: {integrity: sha512-9MWKevR7Hz8kNzzPLfX4EAtGM2b8mr50HPDBvio96bURP/9C+HjdH3sBlLSNNrvRAr5/k/svoH457gB5IKpmwQ==}
engines: {node: '>=20.0.0'}
'@azure/core-util@1.13.1':
resolution: {integrity: sha512-XPArKLzsvl0Hf0CaGyKHUyVgF7oDnhKoP85Xv6M4StF/1AhfORhZudHtOyf2s+FcbuQ9dPRAjB8J2KvRRMUK2A==}
engines: {node: '>=20.0.0'}
@@ -6251,6 +6283,10 @@ packages:
resolution: {integrity: sha512-aK4s3Xxjrx3daZr3VylxejK3vG5ExXck5WOHDJ8in/k9AqlfIyFMMT1uG7u8mNjX+QRILTIn0/Xgschfh/dQ9g==}
engines: {node: '>=12.0.0'}
'@azure/logger@1.3.0':
resolution: {integrity: sha512-fCqPIfOcLE+CGqGPd66c8bZpwAji98tZ4JI9i/mlTNTlsIWslCfpg48s/ypyLxZTump5sypjrKn2/kY7q8oAbA==}
engines: {node: '>=20.0.0'}
'@azure/monitor-opentelemetry-exporter@1.0.0-beta.32':
resolution: {integrity: sha512-Tk5Tv8KwHhKCQlXET/7ZLtjBv1Zi4lmPTadKTQ9KCURRJWdt+6hu5ze52Tlp2pVeg3mg+MRQ9vhWvVNXMZAp/A==}
engines: {node: '>=18.0.0'}
@@ -6275,6 +6311,14 @@ packages:
resolution: {integrity: sha512-SriLPKezypIsiZ+TtlFfE46uuBIap2HeaQVS78e1P7rz5OSbq0rsd52WE1mC5f7vAeLiXqv7I7oRhL3WFZEw3Q==}
engines: {node: '>=18.0.0'}
'@azure/storage-blob@12.32.0':
resolution: {integrity: sha512-80LzSNnFQye2LCCBFghAJS6jJQJ7N4bfgZ6qDMgVGRtugZ7TLDKQZ2hczMigmZH3jAcMRdma/IygsC5+0gT7Tw==}
engines: {node: '>=20.0.0'}
'@azure/storage-common@12.4.0':
resolution: {integrity: sha512-kNhJKMxQb374KOVt63CZnGIpDcrKNzJeyANLJymxE9mCJSdRGzb+Iv9oSIiCj6tNMLypr530b9ObOiA/5OvwOg==}
engines: {node: '>=20.0.0'}
'@babel/code-frame@7.29.7':
resolution: {integrity: sha512-Aup7aUOfpbAUg2ROOJN6Iw5f9DMBlzu0mIkm/malLQFN/YQgO48wCj0Kxa3sEHJvPVFg7siR+qRInwXd2qhQKw==}
engines: {node: '>=6.9.0'}
@@ -11676,8 +11720,8 @@ packages:
resolution: {integrity: sha512-Gz/Sm64+Sq/vklJu1tt9t+4R2lvnud8NbTD/ZfpZtMiUX7YeVpCA8j6NSW8ptwcoLL+NmYANwqP8DV0q/bwl2w==}
engines: {node: '>=18.0.0'}
'@typespec/ts-http-runtime@0.3.2':
resolution: {integrity: sha512-IlqQ/Gv22xUC1r/WQm4StLkYQmaaTsXAhUVsNE0+xiyf0yRFiH5++q78U3bw6bLKDCTmh0uqKB9eG9+Bt75Dkg==}
'@typespec/ts-http-runtime@0.3.6':
resolution: {integrity: sha512-jIXhD0eWQ1JA6ln/5Dltyx22UxWNrw0hZmhy2rlv6m6KgF7kplHx3g0fzi09lNmTJQRR91OlemYp3xFnvDK9og==}
engines: {node: '>=20.0.0'}
'@unrs/resolver-binding-android-arm-eabi@1.9.2':
@@ -22931,6 +22975,18 @@ snapshots:
transitivePeerDependencies:
- supports-color
'@azure/core-client@1.10.2':
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-auth': 1.10.1
'@azure/core-rest-pipeline': 1.24.0
'@azure/core-tracing': 1.3.1
'@azure/core-util': 1.13.1
'@azure/logger': 1.3.0
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
'@azure/core-client@1.9.2':
dependencies:
'@azure/abort-controller': 2.1.2
@@ -22946,8 +23002,8 @@ snapshots:
'@azure/core-http-compat@1.3.0':
dependencies:
'@azure/abort-controller': 1.1.0
'@azure/core-client': 1.9.2
'@azure/core-rest-pipeline': 1.20.0
'@azure/core-client': 1.10.2
'@azure/core-rest-pipeline': 1.24.0
transitivePeerDependencies:
- supports-color
@@ -22959,16 +23015,34 @@ snapshots:
transitivePeerDependencies:
- supports-color
'@azure/core-http-compat@2.4.0(@azure/core-client@1.10.2)(@azure/core-rest-pipeline@1.20.0)':
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-client': 1.10.2
'@azure/core-rest-pipeline': 1.20.0
'@azure/core-http-compat@2.4.0(@azure/core-client@1.10.2)(@azure/core-rest-pipeline@1.24.0)':
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-client': 1.10.2
'@azure/core-rest-pipeline': 1.24.0
'@azure/core-lro@2.4.0':
dependencies:
'@azure/abort-controller': 1.1.0
'@azure/logger': 1.0.3
'@azure/logger': 1.3.0
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
'@azure/core-paging@1.3.0':
dependencies:
tslib: 2.8.1
'@azure/core-paging@1.6.2':
dependencies:
tslib: 2.8.1
'@azure/core-rest-pipeline@1.20.0':
dependencies:
'@azure/abort-controller': 2.1.2
@@ -22981,14 +23055,30 @@ snapshots:
transitivePeerDependencies:
- supports-color
'@azure/core-rest-pipeline@1.24.0':
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-auth': 1.10.1
'@azure/core-tracing': 1.3.1
'@azure/core-util': 1.13.1
'@azure/logger': 1.3.0
'@typespec/ts-http-runtime': 0.3.6
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
'@azure/core-tracing@1.2.0':
dependencies:
tslib: 2.8.1
'@azure/core-tracing@1.3.1':
dependencies:
tslib: 2.8.1
'@azure/core-util@1.13.1':
dependencies:
'@azure/abort-controller': 2.1.2
'@typespec/ts-http-runtime': 0.3.2
'@typespec/ts-http-runtime': 0.3.6
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
@@ -23002,11 +23092,11 @@ snapshots:
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-auth': 1.10.1
'@azure/core-client': 1.9.2
'@azure/core-rest-pipeline': 1.20.0
'@azure/core-tracing': 1.2.0
'@azure/core-client': 1.10.2
'@azure/core-rest-pipeline': 1.24.0
'@azure/core-tracing': 1.3.1
'@azure/core-util': 1.13.1
'@azure/logger': 1.0.3
'@azure/logger': 1.3.0
'@azure/msal-browser': 4.27.0
'@azure/msal-node': 3.8.4
open: 10.2.0
@@ -23018,14 +23108,14 @@ snapshots:
dependencies:
'@azure/abort-controller': 1.1.0
'@azure/core-auth': 1.10.1
'@azure/core-client': 1.9.2
'@azure/core-client': 1.10.2
'@azure/core-http-compat': 1.3.0
'@azure/core-lro': 2.4.0
'@azure/core-paging': 1.3.0
'@azure/core-rest-pipeline': 1.20.0
'@azure/core-tracing': 1.2.0
'@azure/core-rest-pipeline': 1.24.0
'@azure/core-tracing': 1.3.1
'@azure/core-util': 1.13.1
'@azure/logger': 1.0.3
'@azure/logger': 1.3.0
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
@@ -23050,6 +23140,13 @@ snapshots:
dependencies:
tslib: 2.8.1
'@azure/logger@1.3.0':
dependencies:
'@typespec/ts-http-runtime': 0.3.6
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
'@azure/monitor-opentelemetry-exporter@1.0.0-beta.32':
dependencies:
'@azure/core-auth': 1.10.1
@@ -23098,20 +23195,54 @@ snapshots:
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-auth': 1.10.1
'@azure/core-client': 1.9.2
'@azure/core-client': 1.10.2
'@azure/core-http-compat': 2.1.2
'@azure/core-lro': 2.4.0
'@azure/core-paging': 1.3.0
'@azure/core-rest-pipeline': 1.24.0
'@azure/core-tracing': 1.3.1
'@azure/core-util': 1.13.1
'@azure/core-xml': 1.4.5
'@azure/logger': 1.3.0
events: 3.3.0
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
'@azure/storage-blob@12.32.0':
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-auth': 1.10.1
'@azure/core-client': 1.10.2
'@azure/core-http-compat': 2.4.0(@azure/core-client@1.10.2)(@azure/core-rest-pipeline@1.20.0)
'@azure/core-lro': 2.4.0
'@azure/core-paging': 1.6.2
'@azure/core-rest-pipeline': 1.20.0
'@azure/core-tracing': 1.2.0
'@azure/core-util': 1.13.1
'@azure/core-xml': 1.4.5
'@azure/logger': 1.0.3
'@azure/logger': 1.3.0
'@azure/storage-common': 12.4.0(@azure/core-client@1.10.2)
events: 3.3.0
tslib: 2.8.1
transitivePeerDependencies:
- supports-color
'@azure/storage-common@12.4.0(@azure/core-client@1.10.2)':
dependencies:
'@azure/abort-controller': 2.1.2
'@azure/core-auth': 1.10.1
'@azure/core-http-compat': 2.4.0(@azure/core-client@1.10.2)(@azure/core-rest-pipeline@1.24.0)
'@azure/core-rest-pipeline': 1.24.0
'@azure/core-tracing': 1.3.1
'@azure/core-util': 1.13.1
'@azure/logger': 1.3.0
events: 3.3.0
tslib: 2.8.1
transitivePeerDependencies:
- '@azure/core-client'
- supports-color
'@babel/code-frame@7.29.7':
dependencies:
'@babel/helper-validator-identifier': 7.29.7
@@ -25232,7 +25363,7 @@ snapshots:
- aws-crt
- encoding
'@langchain/community@1.1.27(1f6ae79d0415a171031ec3fa8aff176f)':
'@langchain/community@1.1.27(3f523e2bcc201c8262a8c15a9a626dc9)':
dependencies:
'@browserbasehq/stagehand': 1.14.0(@playwright/test@1.60.0)(bufferutil@4.0.9)(deepmerge@4.3.1)(dotenv@17.4.2)(encoding@0.1.13)(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(utf-8-validate@5.0.10)(zod@3.25.67)
'@ibm-cloud/watsonx-ai': 1.1.2
@@ -25251,6 +25382,7 @@ snapshots:
optionalDependencies:
'@aws-crypto/sha256-js': 5.2.0
'@aws-sdk/credential-provider-node': 3.936.0
'@azure/storage-blob': 12.32.0
'@browserbasehq/sdk': 2.12.0(encoding@0.1.13)
'@getzep/zep-cloud': 1.0.6(3678d81c9d030c2de36449a260273f9b)
'@google-cloud/storage': 7.12.1(encoding@0.1.13)
@@ -25283,7 +25415,7 @@ snapshots:
- '@opentelemetry/sdk-trace-base'
- peggy
'@langchain/community@1.1.27(c14be831c9c3e34e46c4d20db95aa4fe)':
'@langchain/community@1.1.27(bf7da34b9f8c01ea3ac8b22fbd8af0a6)':
dependencies:
'@browserbasehq/stagehand': 1.14.0(@playwright/test@1.60.0)(bufferutil@4.0.9)(deepmerge@4.3.1)(dotenv@17.4.2)(encoding@0.1.13)(openai@6.34.0(ws@8.21.0(bufferutil@4.0.9)(utf-8-validate@5.0.10))(zod@3.25.67))(utf-8-validate@5.0.10)(zod@3.25.67)
'@ibm-cloud/watsonx-ai': 1.1.2
@@ -25303,6 +25435,7 @@ snapshots:
'@aws-crypto/sha256-js': 5.2.0
'@aws-sdk/credential-provider-node': 3.936.0
'@azure/search-documents': 12.1.0
'@azure/storage-blob': 12.32.0
'@browserbasehq/sdk': 2.12.0(encoding@0.1.13)
'@getzep/zep-cloud': 1.0.6(3678d81c9d030c2de36449a260273f9b)
'@getzep/zep-js': 0.9.0
@@ -29252,7 +29385,7 @@ snapshots:
transitivePeerDependencies:
- supports-color
'@typespec/ts-http-runtime@0.3.2':
'@typespec/ts-http-runtime@0.3.6':
dependencies:
http-proxy-agent: 7.0.2
https-proxy-agent: 7.0.6
+3 -2
View File
@@ -20,6 +20,7 @@ catalog:
'@ai-sdk/provider-utils': ^4.0.27
'@ai-sdk/xai': ^3.0.93
'@azure/identity': 4.13.0
'@azure/storage-blob': ^12.32.0
'@chat-adapter/linear': ^4.28.1
'@chat-adapter/slack': ^4.28.1
'@chat-adapter/state-memory': ^4.28.1
@@ -56,6 +57,8 @@ catalog:
'@openrouter/ai-sdk-provider': ^2.8.0
'@rudderstack/rudder-sdk-node': 3.0.5
'@sentry/node': ^10.55.0
'@stryker-mutator/core': 9.6.1
'@stryker-mutator/vitest-runner': 9.6.1
'@supabase/supabase-js': 2.50.0
'@testcontainers/k3s': ^11.13.0
'@testcontainers/kafka': ^11.13.0
@@ -85,8 +88,6 @@ catalog:
'@types/sanitize-html': ^2.11.0
'@types/uuid': ^11.0.0
'@types/xml2js': ^0.4.14
'@stryker-mutator/core': 9.6.1
'@stryker-mutator/vitest-runner': 9.6.1
'@vitest/coverage-v8': 4.1.1
agent-browser: 0.26.0
axios: 1.16.1