refactor(core): Migrate log-streaming webhook destination to OutboundHttp (#32407)

This commit is contained in:
Lorent Lempereur
2026-06-18 17:33:17 +02:00
committed by GitHub
parent 038b623a1d
commit fc47494df6
11 changed files with 341 additions and 104 deletions
@@ -255,6 +255,31 @@ describe('convertN8nRequestToAxios', () => {
expect(axiosConfig.maxRedirects).toBe(0);
});
test('should honor a configured maxRedirects limit', () => {
const requestOptions: IHttpRequestOptions = {
method: 'GET',
url: 'https://example.com',
maxRedirects: 3,
};
const axiosConfig = convertN8nRequestToAxios(requestOptions);
expect(axiosConfig.maxRedirects).toBe(3);
});
test('should let disableFollowRedirect win over maxRedirects', () => {
const requestOptions: IHttpRequestOptions = {
method: 'GET',
url: 'https://example.com',
disableFollowRedirect: true,
maxRedirects: 3,
};
const axiosConfig = convertN8nRequestToAxios(requestOptions);
expect(axiosConfig.maxRedirects).toBe(0);
});
test('should handle SSL certificate validation', () => {
const requestOptions: IHttpRequestOptions = {
method: 'GET',
@@ -76,6 +76,8 @@ export function convertN8nRequestToAxios(
if (n8nRequest.disableFollowRedirect === true) {
axiosRequest.maxRedirects = 0;
} else if (n8nRequest.maxRedirects !== undefined) {
axiosRequest.maxRedirects = n8nRequest.maxRedirects;
}
if (n8nRequest.encoding !== undefined) {
@@ -1,4 +1,5 @@
import { Logger } from '@n8n/backend-common';
import type { OutboundHttp } from '@n8n/backend-network';
import { mockInstance } from '@n8n/backend-test-utils';
import { mock } from 'jest-mock-extended';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
@@ -23,6 +24,7 @@ describe('LogStreamingDestinationService', () => {
confirmMessageDelivered: jest.fn(),
} as unknown as MessageEventBus;
const publisher = mock<Publisher>();
const outboundHttp = mock<OutboundHttp>();
let service: LogStreamingDestinationService;
@@ -33,6 +35,7 @@ describe('LogStreamingDestinationService', () => {
eventDestinationsRepository,
eventBus,
publisher,
outboundHttp,
);
});
@@ -97,6 +100,7 @@ describe('LogStreamingDestinationService', () => {
expect(messageEventBusDestinationFromDb).toHaveBeenCalledWith(
eventBus,
createDbEntity('webhook-1'),
outboundHttp,
);
});
@@ -1,3 +1,4 @@
import type { OutboundHttp } from '@n8n/backend-network';
import type { InstanceSettingsLoaderConfig } from '@n8n/config';
import type { AuthenticatedRequest } from '@n8n/db';
import { mock } from 'jest-mock-extended';
@@ -17,13 +18,19 @@ describe('EventBusController', () => {
const instanceSettingsLoaderConfig = mock<InstanceSettingsLoaderConfig>({
logStreamingManagedByEnv: false,
});
const outboundHttp = mock<OutboundHttp>();
let controller: EventBusController;
beforeEach(() => {
jest.clearAllMocks();
instanceSettingsLoaderConfig.logStreamingManagedByEnv = false;
controller = new EventBusController(eventBus, destinationService, instanceSettingsLoaderConfig);
controller = new EventBusController(
eventBus,
destinationService,
instanceSettingsLoaderConfig,
outboundHttp,
);
});
describe('getDestination', () => {
@@ -1,12 +1,63 @@
import type { HttpRequestClient, OutboundHttp } from '@n8n/backend-network';
import { mockInstance } from '@n8n/backend-test-utils';
import { mock } from 'jest-mock-extended';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import type { MessageEventBusDestinationWebhookOptions } from 'n8n-workflow';
import type { IHttpRequestOptions, MessageEventBusDestinationWebhookOptions } from 'n8n-workflow';
import { CredentialsHelper } from '@/credentials-helper';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { MessageEventBusDestinationWebhook } from '../message-event-bus-destination-webhook.ee';
const mockEventBus = {} as MessageEventBus;
const createMessage = () =>
({
ts: { toISO: () => '2020-01-01T00:00:00.000Z' },
payload: { foo: 'bar' },
anonymize: () => ({ anonymized: true }),
eventName: 'n8n.workflow.success',
}) as any;
/**
* Build an `OutboundHttp` mock whose `request()` resolves to the given response
* and whose `requests()` is a spy, so tests can assert the SSRF option and the
* mapped request options.
*/
const mockOutboundHttp = (
response: { statusCode: number; body: unknown } = { statusCode: 200, body: {} },
) => {
const request = jest.fn().mockResolvedValue(response);
const requests = jest.fn().mockReturnValue(mock<HttpRequestClient>({ request }));
const outboundHttp = mock<OutboundHttp>({ requests });
return { outboundHttp, requests, request };
};
/**
* Drive a single `receiveFromEventBus` so the destination builds and sends its
* request through the injected facade, returning the captured request options.
*/
const sendThroughDestination = async (options: MessageEventBusDestinationWebhookOptions) => {
const { outboundHttp, requests, request } = mockOutboundHttp();
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options, outboundHttp);
await destination.receiveFromEventBus({
msg: createMessage(),
confirmCallback: jest.fn(),
} as any);
return {
destination,
requests,
sentOptions: request.mock.calls[0][0] as IHttpRequestOptions,
};
};
beforeEach(() => {
jest.clearAllMocks();
mockInstance(CredentialsHelper);
});
describe('MessageEventBusDestinationWebhook', () => {
describe('isMessageEventBusDestinationWebhookOptions', () => {
it('should identify valid webhook options', () => {
@@ -49,15 +100,148 @@ describe('MessageEventBusDestinationWebhook', () => {
const destination = MessageEventBusDestinationWebhook.deserialize(
null as any,
invalidOptions,
mock<OutboundHttp>(),
);
expect(destination).toBeNull();
});
});
describe('proxy config (buildAxiosSetting)', () => {
it('should unwrap nested proxy from fixedCollection shape (options.proxy.proxy)', () => {
const options: MessageEventBusDestinationWebhookOptions = {
describe('receiveFromEventBus', () => {
it('should send through OutboundHttp with SSRF disabled', async () => {
const { requests, sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
});
expect(requests).toHaveBeenCalledWith({ ssrf: 'disabled' });
expect(sentOptions).toMatchObject({
url: 'https://example.com/webhook',
method: 'POST',
returnFullResponse: true,
json: true,
});
});
it('should map the message payload to the JSON body', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
});
expect(sentOptions.json).toBe(true);
expect(sentOptions.body).toMatchObject({
payload: { foo: 'bar' },
ts: '2020-01-01T00:00:00.000Z',
});
});
it('should confirm only when the response status matches the expected code', async () => {
const confirmCallback = jest.fn();
const { outboundHttp } = mockOutboundHttp({ statusCode: 418, body: {} });
const destination = new MessageEventBusDestinationWebhook(
mockEventBus,
{
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
responseCodeMustMatch: true,
expectedStatusCode: 200,
},
outboundHttp,
);
const result = await destination.receiveFromEventBus({
msg: createMessage(),
confirmCallback,
} as any);
expect(result).toBe(false);
expect(confirmCallback).not.toHaveBeenCalled();
});
});
describe('TLS option mapping', () => {
it('should map allowUnauthorizedCerts to skipSslCertificateValidation', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
options: { allowUnauthorizedCerts: true },
});
expect(sentOptions.skipSslCertificateValidation).toBe(true);
});
it('should not set skipSslCertificateValidation when certs are validated', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
});
expect(sentOptions.skipSslCertificateValidation).toBeUndefined();
});
});
describe('redirect option mapping', () => {
it('should disable following redirects by default', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
});
expect(sentOptions.disableFollowRedirect).toBe(true);
});
it('should follow redirects when the destination opts in', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
options: { redirect: { redirect: { followRedirects: true, maxRedirects: 10 } } },
});
expect(sentOptions.disableFollowRedirect).toBe(false);
});
it('should forward the configured redirect limit when following redirects', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
options: { redirect: { redirect: { followRedirects: true, maxRedirects: 10 } } },
});
expect(sentOptions.maxRedirects).toBe(10);
});
});
describe('agent option mapping', () => {
it('should map socket pool settings to agentOptions', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
options: { socket: { keepAlive: false, maxSockets: 10, maxFreeSockets: 2 } },
});
expect(sentOptions.agentOptions).toMatchObject({
keepAlive: false,
maxSockets: 10,
maxFreeSockets: 2,
});
});
it('should map the socket timeout to the top-level timeout', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
options: { timeout: 1234 },
});
expect(sentOptions.timeout).toBe(1234);
});
});
describe('proxy option mapping', () => {
it('should unwrap nested proxy from fixedCollection shape (options.proxy.proxy)', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
options: {
@@ -69,19 +253,17 @@ describe('MessageEventBusDestinationWebhook', () => {
},
},
},
};
});
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options);
expect(destination.axiosInstance.defaults.proxy).toEqual({
expect(sentOptions.proxy).toEqual({
protocol: 'http',
host: '127.0.0.1',
port: 3128,
});
});
it('should handle flat proxy from legacy DB data', () => {
const options: MessageEventBusDestinationWebhookOptions = {
it('should handle flat proxy from legacy DB data', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
options: {
@@ -91,26 +273,22 @@ describe('MessageEventBusDestinationWebhook', () => {
port: 3128,
},
} as any,
};
});
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options);
expect(destination.axiosInstance.defaults.proxy).toEqual({
expect(sentOptions.proxy).toEqual({
protocol: 'http',
host: '127.0.0.1',
port: 3128,
});
});
it('should set proxy to false when options.proxy is absent', () => {
const options: MessageEventBusDestinationWebhookOptions = {
it('should leave proxy unset when options.proxy is absent (env is used)', async () => {
const { sentOptions } = await sendThroughDestination({
__type: MessageEventBusDestinationTypeNames.webhook,
url: 'https://example.com/webhook',
};
});
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options);
expect(destination.axiosInstance.defaults.proxy).toBe(false);
expect(sentOptions.proxy).toBeUndefined();
});
});
@@ -130,7 +308,11 @@ describe('MessageEventBusDestinationWebhook', () => {
},
};
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options);
const destination = new MessageEventBusDestinationWebhook(
mockEventBus,
options,
mock<OutboundHttp>(),
);
const serialized = destination.serialize();
expect(serialized.options?.proxy).toEqual({
@@ -155,7 +337,11 @@ describe('MessageEventBusDestinationWebhook', () => {
} as any,
};
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options);
const destination = new MessageEventBusDestinationWebhook(
mockEventBus,
options,
mock<OutboundHttp>(),
);
const serialized = destination.serialize();
expect(serialized.options?.proxy).toEqual({
@@ -179,7 +365,11 @@ describe('MessageEventBusDestinationWebhook', () => {
} as any,
};
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options);
const destination = new MessageEventBusDestinationWebhook(
mockEventBus,
options,
mock<OutboundHttp>(),
);
const serialized = destination.serialize();
expect(serialized.options?.redirect).toEqual({
@@ -204,7 +394,11 @@ describe('MessageEventBusDestinationWebhook', () => {
},
};
const destination = new MessageEventBusDestinationWebhook(mockEventBus, options);
const destination = new MessageEventBusDestinationWebhook(
mockEventBus,
options,
mock<OutboundHttp>(),
);
const serialized = destination.serialize();
expect(serialized.options?.redirect).toEqual({
@@ -1,4 +1,5 @@
import { Logger } from '@n8n/backend-common';
import type { OutboundHttp } from '@n8n/backend-network';
import { Container } from '@n8n/di';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
@@ -13,6 +14,7 @@ import type { EventDestinations } from '../database/entities';
export function messageEventBusDestinationFromDb(
eventBusInstance: MessageEventBus,
dbData: EventDestinations,
outboundHttp: OutboundHttp,
): MessageEventBusDestination | null {
const destinationData = dbData.destination;
if ('__type' in destinationData) {
@@ -22,7 +24,11 @@ export function messageEventBusDestinationFromDb(
case MessageEventBusDestinationTypeNames.syslog:
return MessageEventBusDestinationSyslog.deserialize(eventBusInstance, destinationData);
case MessageEventBusDestinationTypeNames.webhook:
return MessageEventBusDestinationWebhook.deserialize(eventBusInstance, destinationData);
return MessageEventBusDestinationWebhook.deserialize(
eventBusInstance,
destinationData,
outboundHttp,
);
default:
Container.get(Logger).debug('MessageEventBusDestination __type unknown');
}
@@ -1,5 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { OutboundHttp } from '@n8n/backend-network';
import {
LOGSTREAMING_DEFAULT_MAX_FREE_SOCKETS,
LOGSTREAMING_DEFAULT_MAX_SOCKETS,
@@ -7,10 +8,6 @@ import {
LOGSTREAMING_DEFAULT_SOCKET_TIMEOUT_MS,
} from '@n8n/constants';
import { Container } from '@n8n/di';
import axios from 'axios';
import type { AxiosInstance, AxiosRequestConfig, CreateAxiosDefaults, Method } from 'axios';
import { Agent as HTTPAgent, type AgentOptions as HTTPAgentOptions } from 'http';
import { Agent as HTTPSAgent, type AgentOptions as HTTPSAgentOptions } from 'https';
import { ExternalSecretsProxy } from 'n8n-core';
import { jsonParse, MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import type {
@@ -19,6 +16,8 @@ import type {
MessageEventBusDestinationWebhookParameterOptions,
IWorkflowExecuteAdditionalData,
MessageEventBusDestinationWebhookOptions,
IHttpRequestMethods,
IHttpRequestOptions,
} from 'n8n-workflow';
import { CredentialsHelper } from '@/credentials-helper';
@@ -77,13 +76,12 @@ export class MessageEventBusDestinationWebhook
credentialsHelper?: CredentialsHelper;
axiosRequestOptions?: AxiosRequestConfig;
axiosInstance: AxiosInstance;
requestOptions?: IHttpRequestOptions;
constructor(
eventBusInstance: MessageEventBus,
options: MessageEventBusDestinationWebhookOptions,
private readonly outboundHttp: OutboundHttp,
) {
super(eventBusInstance, options);
this.url = options.url;
@@ -107,10 +105,6 @@ export class MessageEventBusDestinationWebhook
if (options.sendPayload) this.sendPayload = options.sendPayload;
if (options.options) this.options = options.options;
const axiosSetting = this.buildAxiosSetting(options);
this.axiosInstance = axios.create(axiosSetting);
this.logger.debug(`MessageEventBusDestinationWebhook with id ${this.getId()} initialized`);
}
@@ -125,7 +119,7 @@ export class MessageEventBusDestinationWebhook
}
/**
* Axios expects proxy as { protocol, host, port }, so unwrap the nested
* The request proxy is a flat { protocol, host, port }, so unwrap the nested
* fixedCollection shape options.proxy = { proxy: { ... } } when present.
*/
private resolveProxy(options: MessageEventBusDestinationWebhookParameterOptions | undefined) {
@@ -135,54 +129,42 @@ export class MessageEventBusDestinationWebhook
private buildAgentOptions(
options: MessageEventBusDestinationWebhookParameterOptions | undefined,
): HTTPAgentOptions {
): IHttpRequestOptions['agentOptions'] {
return {
// keepAlive to keep TCP connections alive for reuse
keepAlive: options?.socket?.keepAlive ?? true,
maxSockets: options?.socket?.maxSockets ?? LOGSTREAMING_DEFAULT_MAX_SOCKETS,
maxFreeSockets: options?.socket?.maxFreeSockets ?? LOGSTREAMING_DEFAULT_MAX_FREE_SOCKETS,
maxTotalSockets: options?.socket?.maxSockets ?? LOGSTREAMING_DEFAULT_MAX_TOTAL_SOCKETS,
// Socket timeout in milliseconds defaults to 5 seconds
timeout: options?.timeout ?? LOGSTREAMING_DEFAULT_SOCKET_TIMEOUT_MS,
};
}
private buildAxiosSetting(
axiosParameters: MessageEventBusDestinationWebhookOptions,
): CreateAxiosDefaults {
const axiosSetting: CreateAxiosDefaults = {
private buildConnectionOptions(): IHttpRequestOptions {
const requestOptions: IHttpRequestOptions = {
url: this.url,
method: this.method as IHttpRequestMethods,
headers: {},
method: axiosParameters.method as Method,
url: axiosParameters.url,
maxRedirects: 0,
} as AxiosRequestConfig;
disableFollowRedirect: true,
agentOptions: this.buildAgentOptions(this.options),
timeout: this.options?.timeout ?? LOGSTREAMING_DEFAULT_SOCKET_TIMEOUT_MS,
};
const redirectInner = this.resolveRedirect(axiosParameters.options);
const redirectInner = this.resolveRedirect(this.options);
if (redirectInner?.followRedirects) {
axiosSetting.maxRedirects = redirectInner.maxRedirects;
requestOptions.disableFollowRedirect = false;
requestOptions.maxRedirects = redirectInner.maxRedirects;
}
const proxy = this.resolveProxy(axiosParameters.options);
const proxy = this.resolveProxy(this.options);
if (proxy) {
axiosSetting.proxy = proxy;
requestOptions.proxy = proxy;
}
axiosSetting.timeout =
axiosParameters.options?.timeout ?? LOGSTREAMING_DEFAULT_SOCKET_TIMEOUT_MS;
const agentOptions = this.buildAgentOptions(axiosParameters.options);
if (new URL(axiosParameters.url).protocol === 'https:') {
const httpsAgentOptions: HTTPSAgentOptions = { ...agentOptions };
if (axiosParameters.options?.allowUnauthorizedCerts) {
httpsAgentOptions.rejectUnauthorized = false;
}
axiosSetting.httpsAgent = new HTTPSAgent(httpsAgentOptions);
} else {
axiosSetting.httpAgent = new HTTPAgent(agentOptions);
if (this.options?.allowUnauthorizedCerts) {
requestOptions.skipSslCertificateValidation = true;
}
return axiosSetting;
return requestOptions;
}
async matchDecryptedCredentialType(credentialType: string) {
@@ -203,17 +185,12 @@ export class MessageEventBusDestinationWebhook
return null;
}
async generateAxiosOptions() {
if (this.axiosRequestOptions) {
async generateRequestOptions() {
if (this.requestOptions) {
return;
}
this.axiosRequestOptions = {
headers: {},
method: this.method as Method,
url: this.url,
maxRedirects: 0,
};
const requestOptions = this.buildConnectionOptions();
this.credentialsHelper ??= Container.get(CredentialsHelper);
@@ -223,9 +200,7 @@ export class MessageEventBusDestinationWebhook
const specifyHeaders = this.specifyHeaders;
if (this.sendQuery && this.options?.queryParameterArrays) {
Object.assign(this.axiosRequestOptions, {
qsStringifyOptions: { arrayFormat: this.options.queryParameterArrays },
});
requestOptions.arrayFormat = this.options.queryParameterArrays;
}
const parametersToKeyValue = async (
@@ -245,7 +220,7 @@ export class MessageEventBusDestinationWebhook
// Get parameters defined in the UI
if (sendQuery && this.queryParameters.parameters) {
if (specifyQuery === 'keypair') {
this.axiosRequestOptions.params = this.queryParameters.parameters.reduce(
requestOptions.qs = await this.queryParameters.parameters.reduce(
parametersToKeyValue,
Promise.resolve({}),
);
@@ -256,14 +231,14 @@ export class MessageEventBusDestinationWebhook
} catch {
this.logger.error('JSON parameter needs to be valid JSON');
}
this.axiosRequestOptions.params = jsonParse(this.jsonQuery);
requestOptions.qs = jsonParse(this.jsonQuery);
}
}
// Get parameters defined in the UI
if (sendHeaders && this.headerParameters.parameters) {
if (specifyHeaders === 'keypair') {
this.axiosRequestOptions.headers = await this.headerParameters.parameters.reduce(
requestOptions.headers = await this.headerParameters.parameters.reduce(
parametersToKeyValue,
Promise.resolve({}),
);
@@ -274,15 +249,17 @@ export class MessageEventBusDestinationWebhook
} catch {
this.logger.error('JSON parameter needs to be valid JSON');
}
this.axiosRequestOptions.headers = jsonParse(this.jsonHeaders);
requestOptions.headers = jsonParse(this.jsonHeaders);
}
}
// default for bodyContentType.raw
if (this.axiosRequestOptions.headers === undefined) {
this.axiosRequestOptions.headers = {};
if (requestOptions.headers === undefined) {
requestOptions.headers = {};
}
this.axiosRequestOptions.headers['Content-Type'] = 'application/json';
requestOptions.headers['Content-Type'] = 'application/json';
this.requestOptions = requestOptions;
}
serialize(): MessageEventBusDestinationWebhookOptions {
@@ -322,13 +299,14 @@ export class MessageEventBusDestinationWebhook
static deserialize(
eventBusInstance: MessageEventBus,
data: MessageEventBusDestinationOptions,
outboundHttp: OutboundHttp,
): MessageEventBusDestinationWebhook | null {
if (
'__type' in data &&
data.__type === MessageEventBusDestinationTypeNames.webhook &&
isMessageEventBusDestinationWebhookOptions(data)
) {
return new MessageEventBusDestinationWebhook(eventBusInstance, data);
return new MessageEventBusDestinationWebhook(eventBusInstance, data, outboundHttp);
}
return null;
}
@@ -339,27 +317,29 @@ export class MessageEventBusDestinationWebhook
let sendResult = false;
// at first run, build this.requestOptions with the destination settings
await this.generateAxiosOptions();
await this.generateRequestOptions();
// we need to make a copy of the request here, because to access the credentials
// later on we are awaiting and therefore yielding to the event loop
// therefore a race condition can occur for multiple events being processed simultaneously
const request: AxiosRequestConfig = {
...(this.axiosRequestOptions ?? {}),
const request: IHttpRequestOptions & { returnFullResponse: true } = {
...(this.requestOptions ?? { url: this.url }),
returnFullResponse: true,
};
const payload = this.anonymizeAuditMessages ? msg.anonymize() : msg.payload;
if (['PATCH', 'POST', 'PUT', 'GET'].includes(this.method.toUpperCase())) {
request.json = true;
if (this.sendPayload) {
request.data = {
request.body = {
...msg,
__type: undefined,
payload,
ts: msg.ts.toISO(),
};
} else {
request.data = {
request.body = {
...msg,
__type: undefined,
payload: undefined,
@@ -406,8 +386,8 @@ export class MessageEventBusDestinationWebhook
[httpHeaderAuth.name as string]: httpHeaderAuth.value as string,
};
} else if (httpQueryAuth) {
request.params = {
...request.params,
request.qs = {
...request.qs,
[httpQueryAuth.name as string]: httpQueryAuth.value as string,
};
} else if (httpDigestAuth) {
@@ -418,10 +398,12 @@ export class MessageEventBusDestinationWebhook
}
try {
const requestResponse = await this.axiosInstance.request(request);
const requestResponse = await this.outboundHttp
.requests({ ssrf: 'disabled' }) // The destination URL is admin-configured, so SSRF protection is disabled.
.request(request);
if (requestResponse) {
if (this.responseCodeMustMatch) {
if (requestResponse.status === this.expectedStatusCode) {
if (requestResponse.statusCode === this.expectedStatusCode) {
confirmCallback(msg, { id: this.id, name: this.label });
sendResult = true;
} else {
@@ -1,4 +1,5 @@
import { Logger } from '@n8n/backend-common';
import { OutboundHttp } from '@n8n/backend-network';
import { OnPubSubEvent } from '@n8n/decorators';
import { Service } from '@n8n/di';
import type { DeleteResult } from '@n8n/typeorm';
@@ -39,6 +40,7 @@ export class LogStreamingDestinationService {
private readonly eventDestinationsRepository: EventDestinationsRepository,
private readonly eventBus: MessageEventBus,
private readonly publisher: Publisher,
private readonly outboundHttp: OutboundHttp,
) {
this.messageHandler = this.handleMessage.bind(this);
}
@@ -51,7 +53,11 @@ export class LogStreamingDestinationService {
if (savedEventDestinations.length > 0) {
for (const destinationData of savedEventDestinations) {
try {
const destination = messageEventBusDestinationFromDb(this.eventBus, destinationData);
const destination = messageEventBusDestinationFromDb(
this.eventBus,
destinationData,
this.outboundHttp,
);
if (destination) {
this.destinations[destination.getId()] = destination;
this.logger.debug(`Loaded destination ${destination.getId()} from database`);
@@ -4,6 +4,7 @@ import {
GetDestinationQueryDto,
TestDestinationQueryDto,
} from '@n8n/api-types';
import { OutboundHttp } from '@n8n/backend-network';
import { InstanceSettingsLoaderConfig } from '@n8n/config';
import type { AuthenticatedRequest } from '@n8n/db';
import { Delete, Get, GlobalScope, Licensed, Post, Query, RestController } from '@n8n/decorators';
@@ -32,6 +33,7 @@ export class EventBusController {
private readonly eventBus: MessageEventBus,
private readonly destinationService: LogStreamingDestinationService,
private readonly instanceSettingsLoaderConfig: InstanceSettingsLoaderConfig,
private readonly outboundHttp: OutboundHttp,
) {}
private assertNotManagedByEnv() {
@@ -80,6 +82,7 @@ export class EventBusController {
new MessageEventBusDestinationWebhook(
this.eventBus,
body as MessageEventBusDestinationWebhookOptions,
this.outboundHttp,
),
);
break;
@@ -1,7 +1,7 @@
import { OutboundHttp, type HttpRequestClient } from '@n8n/backend-network';
import { mockInstance } from '@n8n/backend-test-utils';
import { GLOBAL_OWNER_ROLE, type User } from '@n8n/db';
import { Container } from '@n8n/di';
import axios from 'axios';
import { mock } from 'jest-mock-extended';
import type {
MessageEventBusDestinationSentryOptions,
@@ -31,11 +31,12 @@ import type { SuperAgentTest } from './shared/types';
import * as utils from './shared/utils';
jest.unmock('@/eventbus/message-event-bus/message-event-bus');
jest.mock('axios');
const mockAxiosInstance = mock<ReturnType<typeof axios.create>>();
const mockedAxios = axios as jest.Mocked<typeof axios>;
mockedAxios.create.mockReturnValue(mockAxiosInstance);
// The webhook destination sends through the OutboundHttp facade; capture the
// request it performs so we can assert the end-to-end delivery path.
const webhookRequest = jest.fn();
const outboundHttp = mockInstance(OutboundHttp);
outboundHttp.requests.mockReturnValue(mock<HttpRequestClient>({ request: webhookRequest }));
mockInstance(Publisher);
@@ -308,8 +309,7 @@ test('should send message to webhook ', async () => {
webhookDestination.enabled = true;
mockAxiosInstance.post.mockResolvedValue({ status: 200, data: { msg: 'OK' } });
mockAxiosInstance.request.mockResolvedValue({ status: 200, data: { msg: 'OK' } });
webhookRequest.mockResolvedValue({ statusCode: 200, body: { msg: 'OK' } });
await eventBus.send(testMessage);
await new Promise((resolve) => {
@@ -320,7 +320,14 @@ test('should send message to webhook ', async () => {
await confirmIdInAll(testMessage.id);
} else if (msg.command === 'confirmMessageSent') {
await confirmIdSent(testMessage.id);
expect(mockAxiosInstance.request).toHaveBeenCalled();
expect(outboundHttp.requests).toHaveBeenCalledWith({ ssrf: 'disabled' });
expect(webhookRequest).toHaveBeenCalledWith(
expect.objectContaining({
url: testWebhookDestination.url,
method: 'POST',
returnFullResponse: true,
}),
);
webhookDestination.enabled = false;
eventBus.logWriter.worker?.removeListener('message', handler003);
resolve(true);
+1
View File
@@ -523,6 +523,7 @@ export interface IHttpRequestOptions {
sendImmediately?: boolean;
};
disableFollowRedirect?: boolean;
maxRedirects?: number; // Ignored when `disableFollowRedirect` is `true`.
encoding?: 'arraybuffer' | 'blob' | 'document' | 'json' | 'text' | 'stream';
skipSslCertificateValidation?: boolean;
returnFullResponse?: boolean;