feat(Phantombuster Node): Add Launch Sync operation (#31101)

Co-authored-by: yehorkardash <yehor.kardash@n8n.io>
This commit is contained in:
Mohamed Nachit
2026-06-17 10:05:23 +02:00
committed by GitHub
parent 6e1ef71193
commit 2e76cf3b87
7 changed files with 407 additions and 18 deletions
@@ -42,6 +42,12 @@ export const agentOperations: INodeProperties[] = [
description: 'Add an agent to the launch queue',
action: 'Add an agent to the launch queue',
},
{
name: 'Launch Sync',
value: 'launchSync',
description: 'Launch an agent and stream results',
action: 'Launch an agent and stream results',
},
],
default: 'launch',
},
@@ -224,7 +230,7 @@ export const agentFields: INodeProperties[] = [
},
/* -------------------------------------------------------------------------- */
/* agent:launch */
/* agent:launch / launchSync */
/* -------------------------------------------------------------------------- */
{
displayName: 'Agent Name or ID',
@@ -238,7 +244,7 @@ export const agentFields: INodeProperties[] = [
required: true,
displayOptions: {
show: {
operation: ['launch'],
operation: ['launch', 'launchSync'],
resource: ['agent'],
},
},
@@ -266,7 +272,7 @@ export const agentFields: INodeProperties[] = [
default: false,
displayOptions: {
show: {
operation: ['launch'],
operation: ['launch', 'launchSync'],
resource: ['agent'],
},
},
@@ -279,7 +285,7 @@ export const agentFields: INodeProperties[] = [
displayOptions: {
show: {
resource: ['agent'],
operation: ['launch'],
operation: ['launch', 'launchSync'],
},
},
default: {},
@@ -1,12 +1,15 @@
import type {
JsonObject,
IDataObject,
IExecuteFunctions,
ILoadOptionsFunctions,
IHttpRequestMethods,
IHttpRequestOptions,
ILoadOptionsFunctions,
IRequestOptions,
JsonObject,
} from 'n8n-workflow';
import { NodeApiError, NodeOperationError } from 'n8n-workflow';
import { StringDecoder } from 'node:string_decoder';
import type { Readable } from 'stream';
export async function phantombusterApiRequest(
this: IExecuteFunctions | ILoadOptionsFunctions,
@@ -44,3 +47,163 @@ export function validateJSON(self: IExecuteFunctions, json: string | undefined,
}
return result;
}
/**
* Phantombuster streams have ndjson format
* fhe first message of the stream has this format { type: "start", data: unknown}
* the last message of the stream has this format { type: "summary", data: unknown}
* if there is an error, the last message has this format { type: "error", data: string }
* if the stream ends without a summary or an error, we assume there has been a disconnnection
*/
export async function phantombusterStreamingRequest(
this: IExecuteFunctions | ILoadOptionsFunctions,
options: {
method: IHttpRequestMethods;
path: string;
body?: IDataObject;
qs?: IDataObject;
},
onMessage?: (message: { type?: string; data?: unknown }) => void | Promise<void>,
): Promise<IDataObject | null> {
let stream: AsyncIterable<string>;
try {
stream = await streamingRequest.call(this, 'phantombusterApi', {
method: options.method,
url: `https://api.phantombuster.com/api/v2${options.path}`,
body: options.body,
qs: options.qs,
timeout: 30_000, // socket inactivity — server heartbeats every 10s keep this reset
});
} catch (error) {
// Transport-level failure (auth, routing, 5xx) before any data arrived —
// terminal, surface immediately rather than asking the caller to retry.
if (error instanceof NodeApiError) throw error;
throw new NodeApiError(this.getNode(), error as JsonObject);
}
try {
for await (const line of stream) {
if (line.length === 0) continue;
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
throw new NodeApiError(this.getNode(), {
message: 'Phantombuster sent a malformed NDJSON line',
description: line.slice(0, 200),
});
}
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
throw new NodeApiError(this.getNode(), {
message: 'Phantombuster sent a non-object NDJSON record',
description: line.slice(0, 200),
});
}
const message = parsed as { type?: string; data?: unknown };
if (message.type === 'error') {
const reason =
typeof message.data === 'string' ? message.data : JSON.stringify(message.data);
throw new NodeApiError(
this.getNode(),
{ message: reason },
{ message: reason, description: 'Phantombuster agent reported an error' },
);
}
if (message.type === 'summary') {
return (message.data ?? {}) as IDataObject;
}
await onMessage?.(message);
}
} catch (error) {
// A mid-stream transport drop is recoverable: return null so the caller can
// reconnect
const code = (error as { code?: string }).code;
const DISCONNECT_CODES = [
'ECONNRESET',
'ECONNABORTED',
'EPIPE',
'ETIMEDOUT',
'ERR_STREAM_PREMATURE_CLOSE',
'ERR_HTTP2_STREAM_ERROR',
];
if (code && DISCONNECT_CODES.includes(code)) return null;
if (error instanceof NodeApiError) throw error;
throw new NodeApiError(this.getNode(), error as JsonObject);
}
return null;
}
/**
* Generic streaming function.
* Yields line by line
*/
async function streamingRequest(
this: IExecuteFunctions | ILoadOptionsFunctions,
credentialsType: string,
options: {
method: IHttpRequestMethods;
url: string;
body?: IHttpRequestOptions['body'];
qs?: IDataObject;
timeout?: number;
},
): Promise<AsyncIterable<string>> {
const MAX_BUFFER_SIZE = 1_048_576; // 1 Mb
const node = this.getNode();
const abortSignal =
'getExecutionCancelSignal' in this ? this.getExecutionCancelSignal() : undefined;
const stream = (await this.helpers.httpRequestWithAuthentication.call(this, credentialsType, {
...options,
abortSignal,
encoding: 'stream',
})) as Readable;
async function* iterate(): AsyncGenerator<string> {
// StringDecoder buffers partial multi-byte UTF-8 sequences across chunk
// boundaries so codepoints split mid-byte aren't mangled into U+FFFD.
const decoder = new StringDecoder('utf8');
let buffer = '';
try {
for await (const chunk of stream) {
buffer += Buffer.isBuffer(chunk) ? decoder.write(chunk) : String(chunk);
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';
// Guard against an unbounded unterminated line. Checked against the
// residual (post-split) so a chunk carrying many small complete
// records doesn't trip it — only the still-open trailing line matters.
if (buffer.length > MAX_BUFFER_SIZE) {
throw new NodeApiError(node, {
message: `Streaming line exceeded ${MAX_BUFFER_SIZE} bytes without a newline terminator`,
});
}
for (const rawLine of lines) {
yield rawLine.replace(/\r$/, '');
}
}
// Flush any final bytes still held by the decoder and any trailing data
// that didn't end with a newline.
buffer += decoder.end();
const tail = buffer.replace(/\r$/, '');
if (tail.length > 0) {
yield tail;
}
} finally {
stream.destroy();
}
}
return iterate();
}
@@ -1,16 +1,21 @@
import {
type IExecuteFunctions,
type IDataObject,
type IExecuteFunctions,
type ILoadOptionsFunctions,
type INodeExecutionData,
type INodePropertyOptions,
type INodeType,
type INodeTypeDescription,
NodeApiError,
NodeConnectionTypes,
} from 'n8n-workflow';
import { agentFields, agentOperations } from './AgentDescription';
import { phantombusterApiRequest, validateJSON } from './GenericFunctions';
import {
phantombusterApiRequest,
phantombusterStreamingRequest,
validateJSON,
} from './GenericFunctions';
// import {
// sentenceCase,
@@ -184,13 +189,12 @@ export class Phantombuster implements INodeType {
}
}
//https://hub.phantombuster.com/reference#post_agents-launch-1
if (operation === 'launch') {
// https://hub.phantombuster.com/reference/post_agents-launch-sync
if (operation === 'launch' || operation === 'launchSync') {
const agentId = this.getNodeParameter('agentId', i) as string;
const jsonParameters = this.getNodeParameter('jsonParameters', i);
const resolveData = this.getNodeParameter('resolveData', i);
const additionalFields = this.getNodeParameter('additionalFields', i);
const body: IDataObject = {
@@ -249,16 +253,60 @@ export class Phantombuster implements INodeType {
Object.assign(body, additionalFields);
responseData = await phantombusterApiRequest.call(this, 'POST', '/agents/launch', body);
if (resolveData) {
if (operation === 'launch') {
responseData = await phantombusterApiRequest.call(
this,
'GET',
'/containers/fetch',
{},
{ id: responseData.containerId },
'POST',
'/agents/launch',
body,
);
const resolveData = this.getNodeParameter('resolveData', i);
if (resolveData) {
responseData = await phantombusterApiRequest.call(
this,
'GET',
'/containers/fetch',
{},
{ id: responseData.containerId },
);
}
} else {
let containerId: string | undefined;
responseData = await phantombusterStreamingRequest.call(
this,
{ method: 'POST', path: '/agents/launch-sync', body },
(message) => {
if (
message.type === 'start' &&
typeof message.data === 'object' &&
message.data !== null &&
'containerId' in message.data
) {
containerId = message.data?.containerId as string;
}
},
);
// If the streaming call disconnects without an error ( no response ) reconnect using the attach endpoint
const MAX_RECONNECTIONS = 5;
const RECONNECTION_DELAY_MS = 1_000;
for (
let attempt = 0;
attempt < MAX_RECONNECTIONS && !responseData && containerId;
attempt += 1
) {
await new Promise((resolve) => setTimeout(resolve, RECONNECTION_DELAY_MS));
responseData = await phantombusterStreamingRequest.call(this, {
method: 'GET',
path: '/containers/attach',
qs: { id: containerId },
});
}
if (!responseData) {
throw new NodeApiError(this.getNode(), {
message: 'Stream did not provide a response',
});
}
}
}
}
@@ -0,0 +1,19 @@
{
"type": "object",
"properties": {
"containerId": {
"type": "string"
},
"executionTime": {
"type": "integer"
},
"exitCode": {
"type": "integer"
},
"resultObject": {},
"output": {
"type": ["string", "null"]
}
},
"version": 1
}
@@ -92,4 +92,38 @@ describe('Phantombuster Node', () => {
workflowFiles: ['launch-with-json-arguments.workflow.json'],
});
});
describe('Launch Sync Agent', () => {
beforeAll(() => {
const ndjson =
'{"type":"start","data":{"containerId":"container-456"}}\n' +
'{"type":"summary","data":{"containerId":"container-456","executionTime":1234,"exitCode":0,"resultObject":{"foo":"bar"},"output":"agent finished"}}\n';
nock('https://api.phantombuster.com')
.post('/api/v2/agents/launch-sync', (body) => body.id === 'test-agent-123')
.reply(200, ndjson, { 'Content-Type': 'application/x-ndjson' });
});
new NodeTestHarness().setupTests({
credentials,
workflowFiles: ['launch-sync-success.workflow.json'],
});
});
describe('Launch Sync Agent Reports Server Error', () => {
beforeAll(() => {
const ndjson =
'{"type":"start","data":{"containerId":"container-456"}}\n' +
'{"type":"error","data":"agent script crashed"}\n';
nock('https://api.phantombuster.com')
.post('/api/v2/agents/launch-sync', (body) => body.id === 'test-agent-123')
.reply(200, ndjson, { 'Content-Type': 'application/x-ndjson' });
});
new NodeTestHarness().setupTests({
credentials,
workflowFiles: ['launch-sync-server-error.workflow.json'],
});
});
});
@@ -0,0 +1,57 @@
{
"name": "Phantombuster Launch Sync Server Error Test",
"nodes": [
{
"parameters": {},
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"name": "When clicking 'Execute Workflow'",
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [0, 0]
},
{
"parameters": {
"resource": "agent",
"operation": "launchSync",
"agentId": "test-agent-123",
"jsonParameters": false
},
"id": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
"name": "Phantombuster",
"type": "n8n-nodes-base.phantombuster",
"typeVersion": 1,
"position": [220, 0],
"onError": "continueRegularOutput",
"credentials": {
"phantombusterApi": {
"id": "test-cred-id",
"name": "Phantombuster API"
}
}
}
],
"pinData": {
"Phantombuster": [
{
"error": "agent script crashed"
}
]
},
"connections": {
"When clicking 'Execute Workflow'": {
"main": [
[
{
"node": "Phantombuster",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
}
}
@@ -0,0 +1,62 @@
{
"name": "Phantombuster Launch Sync Success Test",
"nodes": [
{
"parameters": {},
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"name": "When clicking 'Execute Workflow'",
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [0, 0]
},
{
"parameters": {
"resource": "agent",
"operation": "launchSync",
"agentId": "test-agent-123",
"jsonParameters": false
},
"id": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
"name": "Phantombuster",
"type": "n8n-nodes-base.phantombuster",
"typeVersion": 1,
"position": [220, 0],
"credentials": {
"phantombusterApi": {
"id": "test-cred-id",
"name": "Phantombuster API"
}
}
}
],
"pinData": {
"Phantombuster": [
{
"json": {
"containerId": "container-456",
"executionTime": 1234,
"exitCode": 0,
"resultObject": { "foo": "bar" },
"output": "agent finished"
}
}
]
},
"connections": {
"When clicking 'Execute Workflow'": {
"main": [
[
{
"node": "Phantombuster",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
}
}