feat(core): Send push messages after publication outbox record is processed (no-changelog) (#32483)

This commit is contained in:
Tomi Turtiainen
2026-06-17 17:21:20 +03:00
committed by GitHub
parent 4218e1b017
commit 640dc13117
2 changed files with 29 additions and 1 deletions
@@ -54,6 +54,10 @@ describe('PublicationStatusReporter', () => {
expect(outboxRepository.markCompleted).toHaveBeenCalledWith(1);
expect(activationErrorsService.deregister).toHaveBeenCalledWith('wf-1');
expect(outboxRepository.markFailed).not.toHaveBeenCalled();
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowActivated',
data: { workflowId: 'wf-1', activeVersionId: 'v-2' },
});
});
test.each([['workflow-not-found'], ['workflow-inactive']] as const)(
@@ -64,6 +68,7 @@ describe('PublicationStatusReporter', () => {
expect(outboxRepository.markCompleted).toHaveBeenCalledWith(1);
expect(activationErrorsService.deregister).toHaveBeenCalledWith('wf-1');
expect(outboxRepository.markFailed).not.toHaveBeenCalled();
expect(push.broadcast).not.toHaveBeenCalled();
},
);
@@ -73,6 +78,10 @@ describe('PublicationStatusReporter', () => {
expect(outboxRepository.markFailed).toHaveBeenCalledWith(1, 'Published version not found');
expect(errorReporter.error).not.toHaveBeenCalled();
expect(activationErrorsService.deregister).not.toHaveBeenCalled();
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowFailedToActivate',
data: { workflowId: 'wf-1', errorMessage: 'Published version not found' },
});
});
test('failed reports the error and marks the record failed with its message', async () => {
@@ -83,6 +92,10 @@ describe('PublicationStatusReporter', () => {
expect(errorReporter.error).toHaveBeenCalledWith(error, { shouldBeLogged: true });
expect(outboxRepository.markFailed).toHaveBeenCalledWith(1, 'registration failed');
expect(outboxRepository.markCompleted).not.toHaveBeenCalled();
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowFailedToActivate',
data: { workflowId: 'wf-1', errorMessage: 'registration failed' },
});
});
test('partial marks partial_success, registers per-node detail, and pushes the failures', async () => {
@@ -33,6 +33,10 @@ export class PublicationStatusReporter {
switch (result.type) {
case 'completed': {
await this.complete(record);
this.push.broadcast({
type: 'workflowActivated',
data: { workflowId: record.workflowId, activeVersionId: record.publishedVersionId },
});
return;
}
@@ -43,18 +47,21 @@ export class PublicationStatusReporter {
}
case 'version-missing': {
const errorMessage = 'Published version not found';
this.logger.warn('Published version not found, marking outbox record as failed', {
workflowId: record.workflowId,
publishedVersionId: record.publishedVersionId,
outboxId: record.id,
});
await this.outboxRepository.markFailed(record.id, 'Published version not found');
await this.outboxRepository.markFailed(record.id, errorMessage);
this.pushFailedToActivate(record.workflowId, errorMessage);
return;
}
case 'failed': {
this.errorReporter.error(result.error, { shouldBeLogged: true });
await this.outboxRepository.markFailed(record.id, result.error.message);
this.pushFailedToActivate(record.workflowId, result.error.message);
return;
}
@@ -113,6 +120,14 @@ export class PublicationStatusReporter {
return `Some triggers failed to activate: ${detail}`;
}
/** Broadcasts a failed-to-activate status to connected clients (leader-local). */
private pushFailedToActivate(workflowId: string, errorMessage: string): void {
this.push.broadcast({
type: 'workflowFailedToActivate',
data: { workflowId, errorMessage },
});
}
/** Marks the record completed and clears any activation errors for the workflow. */
private async complete(record: WorkflowPublicationOutbox): Promise<void> {
await this.outboxRepository.markCompleted(record.id);