fix(enrichment): extend failed-job dedup backoff to artist, track, and vibe queues

The podcast dedup-on-failure trap was live on three more queues. The artist
and mood-tags phases never cleaned their queues at all, so a failed job's
jobId marker blocked re-queue until BullMQ's 24h removeOnFail age expired --
far slower than the worker's documented intent to re-pick-up a failed track.
The admin vibe start/retry routes cleaned only completed jobs, so 'Retry
failed embeddings' silently dropped tracks with a lingering failed job.

Automatic phases now clean completed (grace 0, immediately reusable on
success) and failed (15-min grace, so a permanently-failing entity retries
on a backoff instead of every 5s cycle). The manual admin retry routes clean
failed immediately -- the user asked to retry now. Adds a 3-test regression
suite asserting the grace-0-completed / grace-positive-failed split.
This commit is contained in:
chevron7
2026-06-15 08:34:08 -05:00
parent ebb488aa85
commit 07031f315d
3 changed files with 164 additions and 4 deletions
+8 -2
View File
@@ -561,8 +561,11 @@ router.post("/vibe/start", requireAuth, requireAdmin, async (req, res) => {
});
}
// Clean completed jobs to prevent jobId dedup from silently dropping re-queued tracks
// Clean completed AND failed jobs so jobId dedup can't silently drop a
// re-queued track. This is a manual retry action, so failed jobs are
// cleared immediately (grace 0) -- the user asked to retry now.
await vibeQueue.clean(0, 0, "completed");
await vibeQueue.clean(0, 0, "failed");
// Queue tracks for CLAP embedding via BullMQ (jobId deduplication)
await vibeQueue.addBulk(
@@ -626,8 +629,11 @@ router.post("/vibe/retry", requireAuth, requireAdmin, async (req, res) => {
data: { vibeAnalysisStatus: null, vibeAnalysisRetryCount: 0, vibeAnalysisStatusUpdatedAt: null },
});
// Clean completed jobs to prevent jobId dedup from silently dropping re-queued tracks
// Clean completed AND failed jobs so jobId dedup can't silently drop a
// re-queued track. This is a manual retry action, so failed jobs are
// cleared immediately (grace 0) -- the user asked to retry now.
await vibeQueue.clean(0, 0, "completed");
await vibeQueue.clean(0, 0, "failed");
// Queue for retry via BullMQ (jobId deduplication)
await vibeQueue.addBulk(
@@ -0,0 +1,127 @@
/**
* Regression guard for the dedup-on-failure trap on the artist and track
* enrichment phases (the same class as the podcast wedge).
*
* BullMQ keeps a failed job's jobId marker, so a failed enrichment can't be
* re-queued until the marker is gone. Each automatic phase must clean both
* "completed" (immediately reusable on success) and "failed" (reusable after a
* backoff grace, so a permanently-failing entity doesn't re-add every cycle).
*/
const mockArtistAdd = jest.fn().mockResolvedValue({ id: "j" });
const mockArtistClean = jest.fn().mockResolvedValue([]);
const mockTrackAdd = jest.fn().mockResolvedValue({ id: "j" });
const mockTrackClean = jest.fn().mockResolvedValue([]);
jest.mock("../enrichmentQueues", () => ({
artistQueue: { add: mockArtistAdd, clean: mockArtistClean, resume: jest.fn() },
trackQueue: { add: mockTrackAdd, clean: mockTrackClean, resume: jest.fn() },
vibeQueue: { add: jest.fn(), clean: jest.fn(), resume: jest.fn() },
podcastQueue: { add: jest.fn(), clean: jest.fn(), resume: jest.fn() },
closeEnrichmentQueues: jest.fn().mockResolvedValue(undefined),
}));
jest.mock("../artistEnrichmentWorker", () => ({ startArtistEnrichmentWorker: jest.fn() }));
jest.mock("../trackEnrichmentWorker", () => ({ startTrackEnrichmentWorker: jest.fn() }));
jest.mock("../podcastEnrichmentWorker", () => ({ startPodcastEnrichmentWorker: jest.fn() }));
jest.mock("../audioCompletionSubscriber", () => ({
startAudioCompletionSubscriber: jest.fn(),
stopAudioCompletionSubscriber: jest.fn().mockResolvedValue(undefined),
haltVibeQueuing: jest.fn(),
resumeVibeQueuing: jest.fn(),
}));
jest.mock("../../services/enrichmentState", () => ({ enrichmentStateService: {} }));
jest.mock("../../services/enrichmentFailureService", () => ({ enrichmentFailureService: {} }));
jest.mock("../../services/lastfm", () => ({ lastFmService: {} }));
jest.mock("ioredis", () => jest.fn().mockImplementation(() => ({
on: jest.fn(), publish: jest.fn(), subscribe: jest.fn(), quit: jest.fn(),
})));
jest.mock("../../config", () => ({
config: { redis: { host: "localhost", port: 6379 }, music: { musicPath: "/music" }, nodeEnv: "test" },
}));
const mockArtistFindMany = jest.fn();
const mockArtistUpdateMany = jest.fn().mockResolvedValue({ count: 0 });
const mockArtistUpdate = jest.fn().mockResolvedValue({});
const mockTrackFindMany = jest.fn();
const mockTrackUpdateMany = jest.fn().mockResolvedValue({ count: 0 });
const mockTrackUpdate = jest.fn().mockResolvedValue({});
jest.mock("../../utils/db", () => ({
prisma: {
artist: {
findMany: mockArtistFindMany,
updateMany: mockArtistUpdateMany,
update: mockArtistUpdate,
},
track: {
findMany: mockTrackFindMany,
updateMany: mockTrackUpdateMany,
update: mockTrackUpdate,
},
},
}));
jest.mock("../../utils/logger", () => ({
logger: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() },
}));
import {
executeArtistsPhase,
executeMoodTagsPhase,
} from "../unifiedEnrichment";
// clean(grace, limit, type)
const cleanCallsFor = (mock: jest.Mock, type: string) =>
mock.mock.calls.filter((c) => c[2] === type);
describe("enrichment phases -- clean completed immediately, failed with backoff", () => {
beforeEach(() => jest.clearAllMocks());
it("artists phase cleans completed (grace 0) and failed (grace > 0) before queuing", async () => {
mockArtistFindMany.mockResolvedValue([{ id: "a1", name: "Artist One" }]);
await executeArtistsPhase();
const completed = cleanCallsFor(mockArtistClean, "completed");
const failed = cleanCallsFor(mockArtistClean, "failed");
expect(completed).toHaveLength(1);
expect(completed[0][0]).toBe(0); // grace 0 -- immediate reuse on success
expect(failed).toHaveLength(1);
expect(failed[0][0]).toBeGreaterThan(0); // backoff grace before retry
expect(mockArtistAdd).toHaveBeenCalledWith(
"enrich",
{ artistId: "a1", artistName: "Artist One" },
{ jobId: "artist-a1" },
);
});
it("mood-tags phase cleans completed (grace 0) and failed (grace > 0) before queuing", async () => {
mockTrackFindMany.mockResolvedValue([{ id: "t1", title: "Track One" }]);
await executeMoodTagsPhase();
const completed = cleanCallsFor(mockTrackClean, "completed");
const failed = cleanCallsFor(mockTrackClean, "failed");
expect(completed).toHaveLength(1);
expect(completed[0][0]).toBe(0);
expect(failed).toHaveLength(1);
expect(failed[0][0]).toBeGreaterThan(0);
expect(mockTrackAdd).toHaveBeenCalledWith(
"enrich",
{ trackId: "t1", trackTitle: "Track One" },
{ jobId: "track-t1" },
);
});
it("no work: no clean, no queue", async () => {
mockArtistFindMany.mockResolvedValue([]);
mockTrackFindMany.mockResolvedValue([]);
await executeArtistsPhase();
await executeMoodTagsPhase();
expect(mockArtistClean).not.toHaveBeenCalled();
expect(mockTrackClean).not.toHaveBeenCalled();
expect(mockArtistAdd).not.toHaveBeenCalled();
expect(mockTrackAdd).not.toHaveBeenCalled();
});
});
+29 -2
View File
@@ -48,6 +48,13 @@ import { precomputeProjection } from "../services/umapProjection";
const ARTIST_BATCH_SIZE = 10;
const TRACK_BATCH_SIZE = 20;
const ENRICHMENT_INTERVAL_MS = 5 * 1000; // 5 seconds - rate limiter handles API limits
// BullMQ retains a failed job's jobId dedup marker, so an entity that fails
// enrichment can't be re-queued until the marker is gone. Cleaning failed jobs
// older than this grace window frees the jobId for a retry while still backing
// off a genuinely-failing entity (without it, a permanent failure would re-add
// every 5s cycle). Completed jobs are cleaned with grace 0 -- a success should
// be immediately re-queueable if the entity legitimately needs reprocessing.
const FAILED_JOB_RETRY_GRACE_MS = 15 * 60 * 1000; // 15 minutes
const MAX_CONSECUTIVE_SYSTEM_FAILURES = 5; // Circuit breaker threshold
let isRunning = false;
@@ -1036,7 +1043,7 @@ async function runPhase(
return result;
}
async function executeArtistsPhase(): Promise<number> {
export async function executeArtistsPhase(): Promise<number> {
// Reset temp-MBID artists that have been unresolvable for >24h
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
await prisma.artist.updateMany({
@@ -1064,6 +1071,16 @@ async function executeArtistsPhase(): Promise<number> {
if (pendingArtists.length === 0) return 0;
// Free reusable jobIds: completed immediately, failed after a backoff grace.
// Without this a failed artist's `artist-<id>` marker blocks every re-add
// until BullMQ's removeOnFail age (24h) expires -- far slower than intended.
try {
await artistQueue.clean(0, 0, "completed");
await artistQueue.clean(FAILED_JOB_RETRY_GRACE_MS, 0, "failed");
} catch (err) {
logger.warn(`[Enrichment] artistQueue clean failed: ${(err as Error).message}`);
}
let queued = 0;
for (const artist of pendingArtists) {
try {
@@ -1090,7 +1107,7 @@ async function executeArtistsPhase(): Promise<number> {
return queued;
}
async function executeMoodTagsPhase(): Promise<number> {
export async function executeMoodTagsPhase(): Promise<number> {
const tracks = await prisma.track.findMany({
where: {
OR: [
@@ -1110,6 +1127,16 @@ async function executeMoodTagsPhase(): Promise<number> {
if (tracks.length === 0) return 0;
// Free reusable jobIds: completed immediately, failed after a backoff grace.
// The worker clears a failed track's tags to re-enable it, but the held
// `track-<id>` marker silently defeats that re-pickup until cleaned.
try {
await trackQueue.clean(0, 0, "completed");
await trackQueue.clean(FAILED_JOB_RETRY_GRACE_MS, 0, "failed");
} catch (err) {
logger.warn(`[Enrichment] trackQueue clean failed: ${(err as Error).message}`);
}
const queuedIds: string[] = [];
for (const track of tracks) {
try {