feat(Kafka Node): Add Schema Registry credential type (#32026)

This commit is contained in:
Csaba Tuncsik
2026-06-12 17:29:09 +02:00
committed by GitHub
parent bc90a4b8f2
commit 93d9387f3e
11 changed files with 1113 additions and 33 deletions
@@ -0,0 +1,106 @@
import type {
ICredentialDataDecryptedObject,
ICredentialTestRequest,
ICredentialType,
IHttpRequestOptions,
INodeProperties,
} from 'n8n-workflow';
export class SchemaRegistryApi implements ICredentialType {
name = 'schemaRegistryApi';
displayName = 'Schema Registry';
documentationUrl = 'schemaregistry';
properties: INodeProperties[] = [
{
displayName: 'URL',
name: 'url',
type: 'string',
default: '',
required: true,
placeholder: 'https://schema-registry-domain:8081',
description: 'URL of the schema registry',
},
{
displayName: 'Authentication',
name: 'authentication',
type: 'options',
options: [
{
name: 'Basic Auth',
value: 'basicAuth',
description:
'Username and password, e.g. a Confluent Cloud Schema Registry API key and secret',
},
{
name: 'None',
value: 'none',
},
],
default: 'basicAuth',
},
{
displayName: 'Username',
name: 'username',
type: 'string',
default: '',
required: true,
description: 'Username or Confluent Cloud API key',
displayOptions: {
show: {
authentication: ['basicAuth'],
},
},
},
{
displayName: 'Password',
name: 'password',
type: 'string',
typeOptions: {
password: true,
},
default: '',
required: true,
description: 'Password or Confluent Cloud API secret',
displayOptions: {
show: {
authentication: ['basicAuth'],
},
},
},
];
/**
* Backs the credential test request and generic HTTP reuse (e.g. the HTTP
* Request node) only. The Kafka nodes authenticate against the registry
* through the `SchemaRegistry` client constructor, not this function.
*/
async authenticate(
credentials: ICredentialDataDecryptedObject,
requestOptions: IHttpRequestOptions,
): Promise<IHttpRequestOptions> {
if (credentials.authentication === 'basicAuth') {
const { username, password } = credentials;
// Only send credentials when both fields are filled in; credentials
// created via the public API may have empty fields
if (
typeof username === 'string' &&
username !== '' &&
typeof password === 'string' &&
password !== ''
) {
requestOptions.auth = { username, password };
}
}
return requestOptions;
}
test: ICredentialTestRequest = {
request: {
baseURL: '={{$credentials.url}}',
url: '/subjects',
},
};
}
@@ -0,0 +1,106 @@
import type { ICredentialDataDecryptedObject, IHttpRequestOptions } from 'n8n-workflow';
import { SchemaRegistryApi } from '../SchemaRegistryApi.credentials';
describe('SchemaRegistryApi Credential', () => {
const credential = new SchemaRegistryApi();
it('should test the credential against the subjects endpoint', () => {
expect(credential.name).toBe('schemaRegistryApi');
expect(credential.displayName).toBe('Schema Registry');
expect(credential.test.request).toEqual({
baseURL: '={{$credentials.url}}',
url: '/subjects',
});
});
describe('authenticate', () => {
it('should set basic auth and preserve existing headers for basicAuth', async () => {
const requestOptions: IHttpRequestOptions = {
headers: {
'Content-Type': 'application/json',
},
url: '/subjects',
};
const result = await credential.authenticate(
{
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: 'registry-user',
password: 'registry-password',
} satisfies ICredentialDataDecryptedObject,
requestOptions,
);
expect(result.auth).toEqual({
username: 'registry-user',
password: 'registry-password',
});
expect(result.headers).toEqual({
'Content-Type': 'application/json',
});
});
it('should not set auth when the basicAuth username is empty', async () => {
const requestOptions: IHttpRequestOptions = {
url: '/subjects',
};
const result = await credential.authenticate(
{
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: '',
password: 'registry-password',
} satisfies ICredentialDataDecryptedObject,
requestOptions,
);
expect(result).toBe(requestOptions);
expect(result).not.toHaveProperty('auth');
});
it('should not set auth when the basicAuth password is empty', async () => {
const requestOptions: IHttpRequestOptions = {
url: '/subjects',
};
const result = await credential.authenticate(
{
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: 'registry-user',
password: '',
} satisfies ICredentialDataDecryptedObject,
requestOptions,
);
expect(result).toBe(requestOptions);
expect(result).not.toHaveProperty('auth');
});
it('should leave the request untouched when authentication is none', async () => {
const requestOptions: IHttpRequestOptions = {
headers: {
'Content-Type': 'application/json',
},
url: '/subjects',
};
const result = await credential.authenticate(
{
url: 'https://schema-registry.local:8081',
authentication: 'none',
} satisfies ICredentialDataDecryptedObject,
requestOptions,
);
expect(result).toBe(requestOptions);
expect(result).not.toHaveProperty('auth');
expect(result.headers).toEqual({
'Content-Type': 'application/json',
});
});
});
});
+41 -10
View File
@@ -1,4 +1,4 @@
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import type { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import type { KafkaConfig, SASLOptions, TopicMessages } from 'kafkajs';
import { CompressionTypes, Kafka as apacheKafka } from 'kafkajs';
import type {
@@ -15,6 +15,7 @@ import type {
import { ApplicationError, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import { generatePairedItemData } from '../../utils/utilities';
import { createSchemaRegistry } from './utils';
export class Kafka implements INodeType {
description: INodeTypeDescription = {
@@ -36,6 +37,10 @@ export class Kafka implements INodeType {
required: true,
testedBy: 'kafkaConnectionTest',
},
{
name: 'schemaRegistryApi',
required: false,
},
],
properties: [
{
@@ -82,7 +87,6 @@ export class Kafka implements INodeType {
displayName: 'Schema Registry URL',
name: 'schemaRegistryUrl',
type: 'string',
required: true,
displayOptions: {
show: {
useSchemaRegistry: [true],
@@ -90,7 +94,8 @@ export class Kafka implements INodeType {
},
placeholder: 'https://schema-registry-domain:8081',
default: '',
description: 'URL of the schema registry',
description:
'URL of the schema registry. Only used when no Schema Registry credential is selected.',
},
{
displayName: 'Use Key',
@@ -312,6 +317,27 @@ export class Kafka implements INodeType {
} as SASLOptions;
}
// Resolve the registry configuration once, before the producer is set
// up, so credential misconfiguration surfaces with its own error
// message and never leaks a connected producer. The registry client
// and schema ID are loop-invariant (`eventName` is read at index 0)
let schemaRegistry: { registry: SchemaRegistry; schemaId: number } | undefined;
if (useSchemaRegistry) {
const registry = await createSchemaRegistry(
this,
this.getNodeParameter('schemaRegistryUrl', 0) as string,
);
try {
const eventName = this.getNodeParameter('eventName', 0) as string;
const schemaId = await registry.getLatestSchemaId(eventName);
schemaRegistry = { registry, schemaId };
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration');
}
}
const kafka = new apacheKafka(config);
const producer = kafka.producer();
@@ -327,15 +353,20 @@ export class Kafka implements INodeType {
message = this.getNodeParameter('message', i) as string;
}
if (useSchemaRegistry) {
if (schemaRegistry) {
let parsedMessage: unknown;
try {
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
const eventName = this.getNodeParameter('eventName', 0) as string;
parsedMessage = JSON.parse(message);
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Message is not valid JSON', {
description:
'The Schema Registry encodes JSON messages. Provide a valid JSON message, or turn off "Use Schema Registry".',
itemIndex: i,
});
}
const registry = new SchemaRegistry({ host: schemaRegistryUrl });
const id = await registry.getLatestSchemaId(eventName);
message = await registry.encode(id, JSON.parse(message));
try {
message = await schemaRegistry.registry.encode(schemaRegistry.schemaId, parsedMessage);
} catch (exception) {
throw new NodeOperationError(
this.getNode(),
@@ -44,6 +44,10 @@ export class KafkaTrigger implements INodeType {
name: 'kafka',
required: true,
},
{
name: 'schemaRegistryApi',
required: false,
},
],
properties: [
{
@@ -157,7 +161,6 @@ export class KafkaTrigger implements INodeType {
displayName: 'Schema Registry URL',
name: 'schemaRegistryUrl',
type: 'string',
required: true,
displayOptions: {
show: {
useSchemaRegistry: [true],
@@ -165,7 +168,8 @@ export class KafkaTrigger implements INodeType {
},
placeholder: 'https://schema-registry-domain:8081',
default: '',
description: 'URL of the schema registry',
description:
'URL of the schema registry. Only used when no Schema Registry credential is selected.',
},
{
displayName: 'Options',
@@ -383,7 +387,7 @@ export class KafkaTrigger implements INodeType {
const config = await createConfig(this);
const kafka = new apacheKafka(config);
const registry = setSchemaRegistry(this);
const registry = await setSchemaRegistry(this);
const options = this.getNodeParameter('options', {}) as KafkaTriggerOptions;
if (options.keepBinaryData && nodeVersion < 1.2) {
@@ -3,10 +3,63 @@ import { NodeTestHarness } from '@nodes-testing/node-test-harness';
import { mock } from 'jest-mock-extended';
import type { Producer } from 'kafkajs';
import { Kafka as apacheKafka } from 'kafkajs';
import type { OnError, WorkflowTestData } from 'n8n-workflow';
import { NodeConnectionTypes } from 'n8n-workflow';
jest.mock('kafkajs');
jest.mock('@kafkajs/confluent-schema-registry');
const errorWorkflow = (
eventName: string,
message = '{"foo":"bar"}',
onError?: OnError,
): WorkflowTestData['input']['workflowData'] => ({
nodes: [
{
parameters: {},
id: 'b1dcfb89-3dda-4d18-bdd6-c12d8dee70d2',
name: 'When clicking Execute workflow',
type: 'n8n-nodes-base.manualTrigger',
typeVersion: 1,
position: [0, 0],
},
{
parameters: {
topic: 'error-test-topic',
sendInputData: false,
message,
useSchemaRegistry: true,
schemaRegistryUrl: '',
eventName,
options: {},
},
id: '49emc1d5-4d18-4f9b-a2cd-7e2f871a23ed',
name: 'Schema Registry Error',
type: 'n8n-nodes-base.kafka',
typeVersion: 1,
position: [220, 0],
...(onError ? { onError } : {}),
credentials: {
kafka: { id: 'JJBjHkOrIfcj91EX', name: 'Kafka account' },
schemaRegistryApi: { id: 'wW0eW1iZK9d3Yz2g', name: 'Schema Registry account' },
},
},
],
connections: {
'When clicking Execute workflow': {
main: [
[
{
node: 'Schema Registry Error',
type: NodeConnectionTypes.Main,
index: 0,
},
],
],
},
},
});
describe('Kafka Node', () => {
let mockProducer: jest.Mocked<Producer>;
let mockKafka: jest.Mocked<apacheKafka>;
@@ -15,6 +68,7 @@ describe('Kafka Node', () => {
let mockProducerSend: jest.Mock;
let mockProducerDisconnect: jest.Mock;
let mockRegistryEncode: jest.Mock;
let mockRegistryGetLatestSchemaId: jest.Mock;
beforeAll(() => {
mockProducerConnect = jest.fn();
@@ -33,18 +87,99 @@ describe('Kafka Node', () => {
});
mockRegistryEncode = jest.fn((_id, input) => Buffer.from(JSON.stringify(input)));
mockRegistryGetLatestSchemaId = jest.fn(async (eventName: string) => {
if (eventName === 'failing-event-name') {
throw new Error('Subject not found');
}
return 1;
});
mockRegistry = mock<SchemaRegistry>({
encode: mockRegistryEncode,
getLatestSchemaId: mockRegistryGetLatestSchemaId,
});
(apacheKafka as jest.Mock).mockReturnValue(mockKafka);
(SchemaRegistry as jest.Mock).mockReturnValue(mockRegistry);
});
new NodeTestHarness().setupTests();
const harness = new NodeTestHarness();
const schemaRegistryCredential = {
url: 'https://cred-kafka-registry.local',
authentication: 'basicAuth',
username: 'registry-user',
password: 'registry-password',
};
harness.setupTests({
credentials: { schemaRegistryApi: schemaRegistryCredential },
});
harness.setupTest({
description:
'should fail with the misconfiguration message when the credential is missing the password',
input: { workflowData: errorWorkflow('test-event-name') },
output: {
nodeData: {},
error: 'Username and password are required for Schema Registry Basic Auth',
},
credentials: {
schemaRegistryApi: { ...schemaRegistryCredential, password: '' },
},
});
harness.setupTest({
description: 'should fail with the generic message when the schema lookup fails',
input: { workflowData: errorWorkflow('failing-event-name') },
output: {
nodeData: {},
error: 'Verify your Schema Registry configuration',
},
credentials: {
schemaRegistryApi: schemaRegistryCredential,
},
});
harness.setupTest({
description: 'should report a malformed message distinctly from a registry config error',
input: { workflowData: errorWorkflow('test-event-name', 'not-json') },
output: {
nodeData: {},
error: 'Message is not valid JSON',
},
credentials: {
schemaRegistryApi: schemaRegistryCredential,
},
});
harness.setupTest({
description: 'should return the error as item data when the node continues on fail',
input: {
workflowData: errorWorkflow('test-event-name', '{"foo":"bar"}', 'continueRegularOutput'),
},
output: {
nodeData: {
'Schema Registry Error': [
[{ error: 'Username and password are required for Schema Registry Basic Auth' }],
],
},
},
credentials: {
schemaRegistryApi: { ...schemaRegistryCredential, password: '' },
},
});
test('should only connect the producer once the schema registry is resolved', async () => {
// Cumulative count across all the workflows above: 3 node executions from
// the two successful workflows, plus 1 from the encode-failure workflow
// (encoding fails inside the loop, after the producer has connected).
// The two registry-resolution error workflows (missing password, failing
// schema lookup) must NOT contribute: registry misconfiguration surfaces
// before `producer.connect()`, so no connected producer is ever leaked.
expect(mockProducerConnect).toHaveBeenCalledTimes(4);
});
test('should publish the correct kafka messages', async () => {
expect(mockProducerSend).toHaveBeenCalledTimes(2);
expect(mockProducerSend).toHaveBeenCalledTimes(3);
expect(mockProducerSend).toHaveBeenCalledWith({
acks: 1,
compression: 1,
@@ -99,4 +234,37 @@ describe('Kafka Node', () => {
],
});
});
test('should configure the schema registry from the selected credential', async () => {
expect(SchemaRegistry).toHaveBeenCalledWith({
host: 'https://cred-kafka-registry.local',
auth: { username: 'registry-user', password: 'registry-password' },
});
// The legacy URL-parameter path stays unauthenticated
expect(SchemaRegistry).toHaveBeenCalledWith({ host: 'https://test-kafka-registry.local' });
expect(mockProducerSend).toHaveBeenCalledWith(
expect.objectContaining({
topicMessages: [
{
messages: [
{
headers: { headerKey: 'headerValue' },
key: null,
value: Buffer.from(JSON.stringify({ foo: 'bar' })),
},
],
topic: 'cred-test-topic',
},
],
}),
);
});
test('should resolve the schema id from the configured event name and encode with it', async () => {
// Exercised by the credential success path (workflow.credentials.json):
// eventName 'test-event-name' resolves to schemaId 1, used to encode the payload.
expect(mockRegistryGetLatestSchemaId).toHaveBeenCalledWith('test-event-name');
expect(mockRegistryEncode).toHaveBeenCalledWith(1, { foo: 'bar' });
});
});
@@ -307,6 +307,178 @@ describe('KafkaTrigger Node', () => {
]);
});
it('should use the schema registry credential when selected', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
credentials: {
kafka: { id: '1', name: 'Kafka account' },
schemaRegistryApi: { id: '2', name: 'Schema Registry account' },
},
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: true,
schemaRegistryUrl: '',
options: { parallelProcessing: true },
},
},
credentials: {
kafka: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
schemaRegistryApi: {
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: 'registry-user',
password: 'registry-password',
},
},
});
await publishMessage({
value: Buffer.from('test-message'),
});
expect(SchemaRegistry).toHaveBeenCalledWith({
host: 'https://schema-registry.local:8081',
auth: { username: 'registry-user', password: 'registry-password' },
});
expect(mockRegistryDecode).toHaveBeenCalledWith(Buffer.from('test-message'));
expect(emit).toHaveBeenCalledWith([
[
{
json: {
message: { data: 'decoded-data' },
topic: 'test-topic',
},
},
],
]);
});
it('should use the schema registry credential without auth when authentication is none', async () => {
await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
credentials: {
kafka: { id: '1', name: 'Kafka account' },
schemaRegistryApi: { id: '2', name: 'Schema Registry account' },
},
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: true,
schemaRegistryUrl: '',
options: { parallelProcessing: true },
},
},
credentials: {
kafka: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
schemaRegistryApi: {
url: 'https://unauthenticated-registry.local:8081',
authentication: 'none',
},
},
});
expect(SchemaRegistry).toHaveBeenCalledWith({
host: 'https://unauthenticated-registry.local:8081',
});
});
it('should emit the original message and log a sanitized warning when decoding fails', async () => {
mockRegistryDecode.mockRejectedValue(
Object.assign(
new Error(
'request to https://registry-user:registry-password@schema-registry.local:8081/schemas/ids/1 failed',
),
{ status: 404 },
),
);
const { emit, logger } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: true,
schemaRegistryUrl: 'http://localhost:8081',
options: { parallelProcessing: true },
},
},
credential: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
});
await publishMessage({
value: Buffer.from('raw-message'),
});
expect(mockRegistryDecode).toHaveBeenCalledWith(Buffer.from('raw-message'));
// The raw message is still emitted
expect(emit).toHaveBeenCalledWith([
[{ json: { message: 'raw-message', topic: 'test-topic' } }],
]);
// The warning is sanitized: message with URL userinfo redacted plus status,
// never the raw error object
expect(logger.warn).toHaveBeenCalledWith(
'Could not decode message with Schema Registry, returning original message',
{
message: 'request to https://***@schema-registry.local:8081/schemas/ids/1 failed',
status: 404,
},
);
});
it('should fail activation when the schema registry credential is missing the password', async () => {
await expect(
testTriggerNode(KafkaTrigger, {
mode: 'trigger',
node: {
credentials: {
kafka: { id: '1', name: 'Kafka account' },
schemaRegistryApi: { id: '2', name: 'Schema Registry account' },
},
parameters: {
topic: 'test-topic',
groupId: 'test-group',
useSchemaRegistry: true,
schemaRegistryUrl: '',
options: { parallelProcessing: true },
},
},
credentials: {
kafka: {
brokers: 'localhost:9092',
clientId: 'n8n-kafka',
ssl: false,
authentication: false,
},
schemaRegistryApi: {
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: 'registry-user',
password: '',
},
},
}),
).rejects.toThrow('Username and password are required for Schema Registry Basic Auth');
});
it('should parse JSON message when jsonParseMessage is true', async () => {
const { emit } = await testTriggerNode(KafkaTrigger, {
mode: 'trigger',
@@ -1,9 +1,24 @@
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { mock } from 'jest-mock-extended';
import type { ITriggerFunctions, IRun, INode, Logger, IDeferredPromise } from 'n8n-workflow';
import type {
ITriggerFunctions,
IRun,
INode,
Logger,
IDeferredPromise,
ICredentialDataDecryptedObject,
} from 'n8n-workflow';
import { NodeOperationError, sleep } from 'n8n-workflow';
import { getAutoCommitSettings, configureDataEmitter, type KafkaTriggerOptions } from '../utils';
import {
getAutoCommitSettings,
configureDataEmitter,
getSchemaRegistryOptions,
setSchemaRegistry,
type KafkaTriggerOptions,
} from '../utils';
jest.mock('@kafkajs/confluent-schema-registry');
jest.mock('n8n-workflow', () => {
const actual = jest.requireActual('n8n-workflow');
return {
@@ -572,4 +587,296 @@ describe('Kafka Utils', () => {
});
});
});
describe('schema registry helpers', () => {
const registryNode: INode = {
id: 'test-node-id',
name: 'Test Kafka Trigger',
type: 'n8n-nodes-base.kafkaTrigger',
typeVersion: 1.3,
position: [0, 0],
parameters: {},
};
const createRegistryContext = ({
params = {},
nodeCredentials,
credentialData,
}: {
params?: Record<string, unknown>;
nodeCredentials?: INode['credentials'];
credentialData?: ICredentialDataDecryptedObject;
} = {}) => {
const ctx = mock<ITriggerFunctions>();
ctx.getNode.mockReturnValue({ ...registryNode, credentials: nodeCredentials });
ctx.getCredentials.mockResolvedValue(credentialData ?? {});
ctx.getNodeParameter.mockImplementation(
(name: string, fallback?: unknown) => (params[name] ?? fallback) as never,
);
ctx.logger = mock<Logger>();
return ctx;
};
const schemaRegistryNodeCredentials = {
schemaRegistryApi: { id: '1', name: 'Schema Registry account' },
};
beforeEach(() => {
jest.clearAllMocks();
});
describe('getSchemaRegistryOptions', () => {
it('should use the fallback URL when no credential is selected', async () => {
const ctx = createRegistryContext();
const result = await getSchemaRegistryOptions(ctx, 'https://fallback-registry.local');
expect(result).toEqual({ host: 'https://fallback-registry.local' });
expect(ctx.getCredentials).not.toHaveBeenCalled();
});
it('should trim the fallback URL', async () => {
const ctx = createRegistryContext();
const result = await getSchemaRegistryOptions(ctx, ' https://fallback-registry.local ');
expect(result).toEqual({ host: 'https://fallback-registry.local' });
});
it('should trim the credential URL', async () => {
const ctx = createRegistryContext({
nodeCredentials: schemaRegistryNodeCredentials,
credentialData: {
url: ' https://schema-registry.local:8081 ',
authentication: 'none',
},
});
const result = await getSchemaRegistryOptions(ctx, '');
expect(result).toEqual({ host: 'https://schema-registry.local:8081' });
});
it('should return host and auth for a basicAuth credential', async () => {
const ctx = createRegistryContext({
nodeCredentials: schemaRegistryNodeCredentials,
credentialData: {
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: 'registry-user',
password: 'registry-password',
},
});
const result = await getSchemaRegistryOptions(ctx, '');
expect(ctx.getCredentials).toHaveBeenCalledWith('schemaRegistryApi');
expect(result).toEqual({
host: 'https://schema-registry.local:8081',
auth: { username: 'registry-user', password: 'registry-password' },
});
});
it('should return only the host for a credential without authentication', async () => {
const ctx = createRegistryContext({
nodeCredentials: schemaRegistryNodeCredentials,
credentialData: {
url: 'https://schema-registry.local:8081',
authentication: 'none',
},
});
const result = await getSchemaRegistryOptions(ctx, '');
expect(result).toEqual({ host: 'https://schema-registry.local:8081' });
expect(result).not.toHaveProperty('auth');
});
it('should throw when basicAuth credential is missing the password', async () => {
const ctx = createRegistryContext({
nodeCredentials: schemaRegistryNodeCredentials,
credentialData: {
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: 'registry-user',
password: '',
},
});
await expect(getSchemaRegistryOptions(ctx, '')).rejects.toThrow(NodeOperationError);
await expect(getSchemaRegistryOptions(ctx, '')).rejects.toThrow(
'Username and password are required for Schema Registry Basic Auth',
);
});
it('should throw when no credential is selected and the fallback URL is blank', async () => {
const ctx = createRegistryContext();
await expect(getSchemaRegistryOptions(ctx, ' ')).rejects.toThrow(NodeOperationError);
await expect(getSchemaRegistryOptions(ctx, ' ')).rejects.toThrow(
'Select a Schema Registry credential or enter a Schema Registry URL',
);
expect(ctx.getCredentials).not.toHaveBeenCalled();
});
it('should throw when the selected credential has a blank URL', async () => {
const ctx = createRegistryContext({
nodeCredentials: schemaRegistryNodeCredentials,
credentialData: {
url: ' ',
authentication: 'none',
},
});
await expect(getSchemaRegistryOptions(ctx, '')).rejects.toThrow(NodeOperationError);
await expect(getSchemaRegistryOptions(ctx, '')).rejects.toThrow(
'Select a Schema Registry credential or enter a Schema Registry URL',
);
});
});
describe('setSchemaRegistry', () => {
it('should return undefined and not construct a registry when disabled', async () => {
const ctx = createRegistryContext({ params: { useSchemaRegistry: false } });
const result = await setSchemaRegistry(ctx);
expect(result).toBeUndefined();
expect(SchemaRegistry).not.toHaveBeenCalled();
});
it('should construct the registry with credential options', async () => {
const ctx = createRegistryContext({
params: { useSchemaRegistry: true, schemaRegistryUrl: '' },
nodeCredentials: schemaRegistryNodeCredentials,
credentialData: {
url: 'https://schema-registry.local:8081',
authentication: 'basicAuth',
username: 'registry-user',
password: 'registry-password',
},
});
const result = await setSchemaRegistry(ctx);
expect(SchemaRegistry).toHaveBeenCalledWith({
host: 'https://schema-registry.local:8081',
auth: { username: 'registry-user', password: 'registry-password' },
});
expect(result).toBeDefined();
});
it('should warn with a sanitized payload and continue on connection-type errors', async () => {
const ctx = createRegistryContext({
params: {
useSchemaRegistry: true,
schemaRegistryUrl: 'https://fallback-registry.local',
},
});
const connectionError = Object.assign(new Error('connect ECONNREFUSED'), {
status: 503,
});
(SchemaRegistry as jest.Mock).mockImplementationOnce(() => {
throw connectionError;
});
const result = await setSchemaRegistry(ctx);
expect(result).toBeUndefined();
expect(ctx.logger.warn).toHaveBeenCalledWith('Could not connect to Schema Registry', {
message: 'connect ECONNREFUSED',
status: 503,
});
});
it('should redact URL userinfo and omit status in the warn payload when absent', async () => {
const ctx = createRegistryContext({
params: {
useSchemaRegistry: true,
schemaRegistryUrl: 'https://fallback-registry.local',
},
});
(SchemaRegistry as jest.Mock).mockImplementationOnce(() => {
throw new Error(
'request to https://registry-user:registry-password@fallback-registry.local/subjects failed',
);
});
const result = await setSchemaRegistry(ctx);
expect(result).toBeUndefined();
const [logMessage, logPayload] = jest.mocked(ctx.logger.warn).mock.calls[0];
expect(logMessage).toBe('Could not connect to Schema Registry');
expect(logPayload).toStrictEqual({
message: 'request to https://***@fallback-registry.local/subjects failed',
});
});
it('should redact userinfo up to the last @ when the password contains an unencoded @', async () => {
const ctx = createRegistryContext({
params: {
useSchemaRegistry: true,
schemaRegistryUrl: 'https://fallback-registry.local',
},
});
(SchemaRegistry as jest.Mock).mockImplementationOnce(() => {
throw new Error(
'request to https://registry-user:p@ssw0rd@fallback-registry.local/subjects failed',
);
});
const result = await setSchemaRegistry(ctx);
expect(result).toBeUndefined();
const [, logPayload] = jest.mocked(ctx.logger.warn).mock.calls[0];
expect(logPayload).toStrictEqual({
message: 'request to https://***@fallback-registry.local/subjects failed',
});
});
it('should cap the logged message length for oversized registry errors', async () => {
const ctx = createRegistryContext({
params: {
useSchemaRegistry: true,
schemaRegistryUrl: 'https://fallback-registry.local',
},
});
(SchemaRegistry as jest.Mock).mockImplementationOnce(() => {
throw new Error('x'.repeat(2000));
});
const result = await setSchemaRegistry(ctx);
expect(result).toBeUndefined();
const [, logPayload] = jest.mocked(ctx.logger.warn).mock.calls[0];
const { message } = logPayload as { message: string };
expect(message).toHaveLength(503);
expect(message.endsWith('...')).toBe(true);
});
it('should rethrow misconfiguration errors instead of warning', async () => {
const ctx = createRegistryContext({
params: { useSchemaRegistry: true, schemaRegistryUrl: '' },
});
await expect(setSchemaRegistry(ctx)).rejects.toThrow(NodeOperationError);
await expect(setSchemaRegistry(ctx)).rejects.toThrow(
'Select a Schema Registry credential or enter a Schema Registry URL',
);
expect(ctx.logger.warn).not.toHaveBeenCalled();
});
it('should rethrow misconfiguration errors when the fallback URL is whitespace-only', async () => {
const ctx = createRegistryContext({
params: { useSchemaRegistry: true, schemaRegistryUrl: ' ' },
});
await expect(setSchemaRegistry(ctx)).rejects.toThrow(NodeOperationError);
await expect(setSchemaRegistry(ctx)).rejects.toThrow(
'Select a Schema Registry credential or enter a Schema Registry URL',
);
expect(ctx.logger.warn).not.toHaveBeenCalled();
});
});
});
});
@@ -0,0 +1,74 @@
{
"name": "Kafka schema registry credential test",
"nodes": [
{
"parameters": {},
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [0, 0],
"id": "5ee79205-4ad6-4357-b876-ee4b9a0c4c5e",
"name": "When clicking Execute workflow"
},
{
"parameters": {
"topic": "cred-test-topic",
"sendInputData": false,
"message": "={{ JSON.stringify({foo: 'bar'}) }}",
"jsonParameters": true,
"useSchemaRegistry": true,
"schemaRegistryUrl": "",
"eventName": "test-event-name",
"headerParametersJson": "{\n \"headerKey\": \"headerValue\"\n}",
"options": {}
},
"type": "n8n-nodes-base.kafka",
"typeVersion": 1,
"position": [220, 0],
"id": "b8455e29-9a21-4a4a-800a-2c2e10b53e3a",
"name": "Schema Registry Credential",
"credentials": {
"kafka": {
"id": "JJBjHkOrIfcj91EX",
"name": "Kafka account"
},
"schemaRegistryApi": {
"id": "wW0eW1iZK9d3Yz2g",
"name": "Schema Registry account"
}
}
}
],
"pinData": {
"Schema Registry Credential": [
{
"json": {
"success": true
}
}
]
},
"connections": {
"When clicking Execute workflow": {
"main": [
[
{
"node": "Schema Registry Credential",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
},
"versionId": "0d4cf8fd-9aa4-4ee5-9f33-8f4a02f2b18d",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "27cc9b56542ad45b38725555722c50a1c3fee1670bbb67980558314ee08517c4"
},
"id": "kQ4mZVcfhaGvCbgF",
"tags": []
}
+113 -6
View File
@@ -15,6 +15,7 @@ import type {
IRun,
IBinaryKeyData,
INodeExecutionData,
FunctionsBase,
} from 'n8n-workflow';
import { ensureError, jsonParse, NodeOperationError, sleep } from 'n8n-workflow';
@@ -55,6 +56,18 @@ interface KafkaCredentials {
saslMechanism?: 'plain' | 'scram-sha-256' | 'scram-sha-512';
}
interface SchemaRegistryCredentials {
url: string;
authentication: 'none' | 'basicAuth';
username?: string;
password?: string;
}
interface SchemaRegistryOptions {
host: string;
auth?: { username: string; password: string };
}
type ResolveOffsetMode = 'immediately' | 'onCompletion' | 'onSuccess' | 'onStatus';
/**
@@ -173,9 +186,10 @@ export function configureMessageParser(
try {
value = await registry.decode(message.value as Buffer);
} catch (error) {
logger.warn('Could not decode message with Schema Registry, returning original message', {
error,
});
logger.warn(
'Could not decode message with Schema Registry, returning original message',
sanitizeRegistryError(error),
);
}
}
@@ -274,20 +288,113 @@ export function disconnectEventListeners(
listeners.forEach((listener) => listener());
}
/**
* Builds a sanitized log payload from a schema registry error, so raw error
* payloads are never logged: only the message (with any URL userinfo redacted,
* since registry client errors embed full request URLs) and, when present, the
* status
* @param error - The caught error
* @returns Log metadata with the redacted error message and optional status
*/
// The registry client interpolates the upstream HTTP response body into its
// error message, which can be large, so the logged message is bounded.
const MAX_REGISTRY_ERROR_MESSAGE_LENGTH = 500;
function sanitizeRegistryError(error: unknown) {
const ensured = ensureError(error);
const redacted = ensured.message.replace(/\/\/[^/\s]+@/g, '//***@');
const message =
redacted.length > MAX_REGISTRY_ERROR_MESSAGE_LENGTH
? `${redacted.slice(0, MAX_REGISTRY_ERROR_MESSAGE_LENGTH)}...`
: redacted;
return {
message,
...('status' in ensured ? { status: ensured.status } : {}),
};
}
/**
* Resolves the Confluent Schema Registry connection options, preferring a
* selected `schemaRegistryApi` credential over the legacy URL node parameter
* @param ctx - The execution context (node or trigger)
* @param fallbackUrl - The `schemaRegistryUrl` node parameter, used when no credential is selected
* @returns Options for the `SchemaRegistry` constructor
*/
export async function getSchemaRegistryOptions(
ctx: Pick<FunctionsBase, 'getNode' | 'getCredentials'>,
fallbackUrl: string,
): Promise<SchemaRegistryOptions> {
const emptyConfigError = () =>
new NodeOperationError(
ctx.getNode(),
'Select a Schema Registry credential or enter a Schema Registry URL',
);
if (!ctx.getNode().credentials?.schemaRegistryApi) {
const host = fallbackUrl.trim();
if (!host) {
throw emptyConfigError();
}
return { host };
}
const credentials = await ctx.getCredentials<SchemaRegistryCredentials>('schemaRegistryApi');
const host = credentials.url?.trim();
if (!host) {
throw emptyConfigError();
}
const options: SchemaRegistryOptions = { host };
if (credentials.authentication === 'basicAuth') {
if (!(credentials.username && credentials.password)) {
throw new NodeOperationError(
ctx.getNode(),
'Username and password are required for Schema Registry Basic Auth',
);
}
options.auth = { username: credentials.username, password: credentials.password };
}
return options;
}
/**
* Constructs a Schema Registry client from the resolved connection options.
* Shared by the Kafka producer node and the Kafka Trigger; each caller layers
* its own behavior on top (the producer resolves a schema id, the trigger
* applies the warn-and-continue policy).
* @param ctx - The execution context (node or trigger)
* @param fallbackUrl - The `schemaRegistryUrl` node parameter, used when no credential is selected
*/
export async function createSchemaRegistry(
ctx: Pick<FunctionsBase, 'getNode' | 'getCredentials'>,
fallbackUrl: string,
): Promise<SchemaRegistry> {
const options = await getSchemaRegistryOptions(ctx, fallbackUrl);
return new SchemaRegistry(options);
}
/**
* Initializes Confluent Schema Registry if enabled in node parameters
* @param ctx - The trigger function context
* @returns Schema registry instance or undefined if not configured
*/
export function setSchemaRegistry(ctx: ITriggerFunctions) {
export async function setSchemaRegistry(ctx: ITriggerFunctions) {
const useSchemaRegistry = ctx.getNodeParameter('useSchemaRegistry', 0) as boolean;
if (useSchemaRegistry) {
try {
const schemaRegistryUrl = ctx.getNodeParameter('schemaRegistryUrl', 0) as string;
return new SchemaRegistry({ host: schemaRegistryUrl });
return await createSchemaRegistry(ctx, schemaRegistryUrl);
} catch (error) {
ctx.logger.warn('Could not connect to Schema Registry', { error });
// Credential/config misconfiguration must fail loudly at activation
if (error instanceof NodeOperationError) {
throw error;
}
// Connection-type failures keep the warn-and-continue behavior
ctx.logger.warn('Could not connect to Schema Registry', sanitizeRegistryError(error));
}
}
+1
View File
@@ -326,6 +326,7 @@
"dist/credentials/SalesforceJwtApi.credentials.js",
"dist/credentials/SalesforceOAuth2Api.credentials.js",
"dist/credentials/SalesmateApi.credentials.js",
"dist/credentials/SchemaRegistryApi.credentials.js",
"dist/credentials/SeaTableApi.credentials.js",
"dist/credentials/SecurityScorecardApi.credentials.js",
"dist/credentials/SegmentApi.credentials.js",
@@ -20,6 +20,7 @@ import {
type ITriggerFunctions,
type IWebhookFunctions,
type IWorkflowExecuteAdditionalData,
type Logger,
type NodeTypeAndVersion,
type VersionedNodeType,
type Workflow,
@@ -46,6 +47,7 @@ type TestTriggerNodeOptions = {
timezone?: string;
workflowStaticData?: IDataObject;
credential?: ICredentialDataDecryptedObject;
credentials?: Record<string, ICredentialDataDecryptedObject>;
helpers?: Partial<ITriggerFunctions['helpers']>;
workflow?: { id?: string; name?: string; active?: boolean };
};
@@ -111,20 +113,21 @@ export async function testTriggerNode(
name: options.workflow?.name,
active: options.workflow?.active ?? false,
};
const triggerLogger = mock<Logger>({
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
});
const triggerFunctions = mock<ITriggerFunctions>({
helpers,
emit,
logger: mock({
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
}),
logger: triggerLogger,
getTimezone: () => timezone,
getNode: () => node,
getWorkflow: () => workflowMetadata,
getCredentials: async <T extends object = ICredentialDataDecryptedObject>() =>
(options.credential ?? {}) as T,
getCredentials: async <T extends object = ICredentialDataDecryptedObject>(type: string) =>
(options.credentials?.[type] ?? options.credential ?? {}) as T,
getMode: () => options.mode ?? 'trigger',
getWorkflowStaticData: () => options.workflowStaticData ?? {},
getWorkflowSettings: () => ({}),
@@ -141,6 +144,7 @@ export async function testTriggerNode(
close: jest.fn(response?.closeFunction),
manualTriggerFunction: options.mode === 'manual' ? response?.manualTriggerFunction : undefined,
emit,
logger: triggerLogger,
};
}
@@ -220,8 +224,8 @@ export async function testWebhookTriggerNode(
getWorkflowSettings: () => ({}),
getNodeParameter: (parameterName, fallback) => get(node.parameters, parameterName) ?? fallback,
getChildNodes: () => options.childNodes ?? [],
getCredentials: async <T extends object = ICredentialDataDecryptedObject>() =>
(options.credential ?? {}) as T,
getCredentials: async <T extends object = ICredentialDataDecryptedObject>(type: string) =>
(options.credentials?.[type] ?? options.credential ?? {}) as T,
});
const responseData = await trigger.webhook?.call(webhookFunctions);