mirror of
https://github.com/Chevron7Locked/kima-hub.git
synced 2026-06-19 07:37:17 +00:00
fix: audio analysis cleanup race condition
- Add embedding check before marking tracks as failed/pending
- Recovered tracks (with existing embeddings) are marked completed, not failed
- Circuit breaker now counts recovered tracks as success for proper state management
- Add migration SQL to fix 703 incorrectly failed tracks in production
This commit is contained in:
@@ -0,0 +1,35 @@
|
||||
-- Repair tracks that already have a row in track_embeddings but were left in
-- a non-completed analysis state (failed / pending / processing): mark them
-- completed and clear the error and start-timestamp columns.
UPDATE "Track"
SET
    "analysisStatus" = 'completed',
    "analysisError" = NULL,
    "analysisStartedAt" = NULL
WHERE "analysisStatus" IN ('failed', 'pending', 'processing')
  AND EXISTS (
      SELECT 1
      FROM track_embeddings te
      WHERE te.track_id = "Track".id
  );
|
||||
|
||||
-- Remove audio EnrichmentFailure rows whose referenced track is now in the
-- 'completed' analysis state.
DELETE FROM "EnrichmentFailure"
WHERE "entityType" = 'audio'
  AND "entityId" IN (
      SELECT t.id
      FROM "Track" t
      WHERE t."analysisStatus" = 'completed'
  );
|
||||
|
||||
-- Verify results: per-status track counts.
-- This query runs after the UPDATE/DELETE above, so the numbers it reports
-- are the post-fix state (the label previously read 'Before Fix Status').
SELECT
    'After Fix Status' as info,
    "analysisStatus",
    COUNT(*) as count
FROM "Track"
GROUP BY "analysisStatus";
|
||||
|
||||
-- Count the audio EnrichmentFailure rows still present after the cleanup.
SELECT 'Remaining EnrichmentFailures (audio)' AS info,
       COUNT(*) AS count
FROM "EnrichmentFailure"
WHERE "entityType" = 'audio';
|
||||
@@ -2,10 +2,10 @@ import { prisma } from "../utils/db";
|
||||
import { logger } from "../utils/logger";
|
||||
import { enrichmentFailureService } from "./enrichmentFailureService";
|
||||
|
||||
const STALE_THRESHOLD_MINUTES = 15; // Synchronized with Python analyzer (STALE_PROCESSING_MINUTES)
|
||||
const STALE_THRESHOLD_MINUTES = 15;
|
||||
const MAX_RETRIES = 3;
|
||||
const CIRCUIT_BREAKER_THRESHOLD = 30; // Increased from 10 to handle batch operations
|
||||
const CIRCUIT_BREAKER_WINDOW_MS = 5 * 60 * 1000; // 5 minutes
|
||||
const CIRCUIT_BREAKER_THRESHOLD = 30;
|
||||
const CIRCUIT_BREAKER_WINDOW_MS = 5 * 60 * 1000;
|
||||
|
||||
type CircuitState = "closed" | "open" | "half-open";
|
||||
|
||||
@@ -14,18 +14,12 @@ class AudioAnalysisCleanupService {
|
||||
private failureCount = 0;
|
||||
private lastFailureTime: Date | null = null;
|
||||
|
||||
/**
|
||||
* Check if we should attempt to transition from open to half-open
|
||||
*/
|
||||
private shouldAttemptReset(): boolean {
|
||||
if (!this.lastFailureTime) return false;
|
||||
const timeSinceFailure = Date.now() - this.lastFailureTime.getTime();
|
||||
return timeSinceFailure >= CIRCUIT_BREAKER_WINDOW_MS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle successful operation - close circuit if in half-open state
|
||||
*/
|
||||
private onSuccess(): void {
|
||||
if (this.state === "half-open") {
|
||||
logger.info(
|
||||
@@ -35,7 +29,6 @@ class AudioAnalysisCleanupService {
|
||||
this.failureCount = 0;
|
||||
this.lastFailureTime = null;
|
||||
} else if (this.state === "closed" && this.failureCount > 0) {
|
||||
// Reset failure counter on success while closed
|
||||
logger.debug(
|
||||
"[AudioAnalysisCleanup] Resetting failure counter on success"
|
||||
);
|
||||
@@ -44,9 +37,6 @@ class AudioAnalysisCleanupService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle failed operation - update state and counts
|
||||
*/
|
||||
private onFailure(
|
||||
resetCount: number,
|
||||
permanentlyFailedCount: number
|
||||
@@ -56,13 +46,11 @@ class AudioAnalysisCleanupService {
|
||||
this.lastFailureTime = new Date();
|
||||
|
||||
if (this.state === "half-open") {
|
||||
// Failed during half-open - reopen circuit
|
||||
this.state = "open";
|
||||
logger.warn(
|
||||
`[AudioAnalysisCleanup] Circuit breaker REOPENED - recovery attempt failed (${this.failureCount} total failures)`
|
||||
);
|
||||
} else if (this.failureCount >= CIRCUIT_BREAKER_THRESHOLD) {
|
||||
// Exceeded threshold - open circuit
|
||||
this.state = "open";
|
||||
logger.warn(
|
||||
`[AudioAnalysisCleanup] Circuit breaker OPEN - ${this.failureCount} failures in window. ` +
|
||||
@@ -71,10 +59,6 @@ class AudioAnalysisCleanupService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if circuit breaker is open (too many consecutive failures)
|
||||
* Automatically transitions to half-open after cooldown period
|
||||
*/
|
||||
isCircuitOpen(): boolean {
|
||||
if (this.state === "open" && this.shouldAttemptReset()) {
|
||||
this.state = "half-open";
|
||||
@@ -87,26 +71,19 @@ class AudioAnalysisCleanupService {
|
||||
return this.state === "open";
|
||||
}
|
||||
|
||||
/**
|
||||
* Record success for external callers (maintains backward compatibility)
|
||||
*/
|
||||
recordSuccess(): void {
|
||||
this.onSuccess();
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up tracks stuck in "processing" state
|
||||
* Returns number of tracks reset and permanently failed
|
||||
*/
|
||||
async cleanupStaleProcessing(): Promise<{
|
||||
reset: number;
|
||||
permanentlyFailed: number;
|
||||
recovered: number;
|
||||
}> {
|
||||
const cutoff = new Date(
|
||||
Date.now() - STALE_THRESHOLD_MINUTES * 60 * 1000
|
||||
);
|
||||
|
||||
// Find tracks stuck in processing
|
||||
const staleTracks = await prisma.track.findMany({
|
||||
where: {
|
||||
analysisStatus: "processing",
|
||||
@@ -128,7 +105,7 @@ class AudioAnalysisCleanupService {
|
||||
});
|
||||
|
||||
if (staleTracks.length === 0) {
|
||||
return { reset: 0, permanentlyFailed: 0 };
|
||||
return { reset: 0, permanentlyFailed: 0, recovered: 0 };
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
@@ -137,13 +114,35 @@ class AudioAnalysisCleanupService {
|
||||
|
||||
let resetCount = 0;
|
||||
let permanentlyFailedCount = 0;
|
||||
let recoveredCount = 0;
|
||||
|
||||
for (const track of staleTracks) {
|
||||
const newRetryCount = (track.analysisRetryCount || 0) + 1;
|
||||
const trackName = `${track.album.artist.name} - ${track.title}`;
|
||||
|
||||
const existingEmbedding = await prisma.$queryRaw<{ count: bigint }[]>`
|
||||
SELECT COUNT(*) as count FROM track_embeddings WHERE track_id = ${track.id}
|
||||
`;
|
||||
|
||||
if (Number(existingEmbedding[0]?.count) > 0) {
|
||||
await prisma.track.update({
|
||||
where: { id: track.id },
|
||||
data: {
|
||||
analysisStatus: "completed",
|
||||
analysisError: null,
|
||||
analysisStartedAt: null,
|
||||
},
|
||||
});
|
||||
|
||||
logger.info(
|
||||
`[AudioAnalysisCleanup] Recovered stale track with existing embedding: ${trackName}`
|
||||
);
|
||||
|
||||
recoveredCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (newRetryCount >= MAX_RETRIES) {
|
||||
// Permanently failed - mark as failed and record
|
||||
await prisma.track.update({
|
||||
where: { id: track.id },
|
||||
data: {
|
||||
@@ -154,7 +153,6 @@ class AudioAnalysisCleanupService {
|
||||
},
|
||||
});
|
||||
|
||||
// Record in EnrichmentFailure for user visibility
|
||||
await enrichmentFailureService.recordFailure({
|
||||
entityType: "audio",
|
||||
entityId: track.id,
|
||||
@@ -172,7 +170,6 @@ class AudioAnalysisCleanupService {
|
||||
);
|
||||
permanentlyFailedCount++;
|
||||
} else {
|
||||
// Reset to pending for retry
|
||||
await prisma.track.update({
|
||||
where: { id: track.id },
|
||||
data: {
|
||||
@@ -190,20 +187,21 @@ class AudioAnalysisCleanupService {
|
||||
}
|
||||
}
|
||||
|
||||
// Update circuit breaker state
|
||||
if (resetCount > 0 || permanentlyFailedCount > 0) {
|
||||
this.onFailure(resetCount, permanentlyFailedCount);
|
||||
logger.debug(
|
||||
`[AudioAnalysisCleanup] Cleanup complete: ${resetCount} reset, ${permanentlyFailedCount} permanently failed`
|
||||
);
|
||||
}
|
||||
|
||||
return { reset: resetCount, permanentlyFailed: permanentlyFailedCount };
|
||||
if (recoveredCount > 0) {
|
||||
this.onSuccess();
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
`[AudioAnalysisCleanup] Cleanup complete: ${resetCount} reset, ${permanentlyFailedCount} permanently failed, ${recoveredCount} recovered`
|
||||
);
|
||||
|
||||
return { reset: resetCount, permanentlyFailed: permanentlyFailedCount, recovered: recoveredCount };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current analysis statistics
|
||||
*/
|
||||
async getStats(): Promise<{
|
||||
pending: number;
|
||||
processing: number;
|
||||
|
||||
@@ -15,7 +15,7 @@ export interface CleanupResult {
|
||||
downloadJobs: { cleaned: number; ids: string[] };
|
||||
spotifyImportJobs: { cleaned: number; ids: string[] };
|
||||
bullQueues: { cleaned: number; queues: string[] };
|
||||
audioAnalysis: { reset: number; permanentlyFailed: number };
|
||||
audioAnalysis: { reset: number; permanentlyFailed: number; recovered: number };
|
||||
totalCleaned: number;
|
||||
}
|
||||
|
||||
|
||||
@@ -488,7 +488,7 @@ async function runEnrichmentCycle(fullMode: boolean): Promise<{
|
||||
await audioAnalysisCleanupService.cleanupStaleProcessing();
|
||||
if (cleanupResult.reset > 0 || cleanupResult.permanentlyFailed > 0) {
|
||||
logger.debug(
|
||||
`[Enrichment] Audio analysis cleanup: ${cleanupResult.reset} reset, ${cleanupResult.permanentlyFailed} permanently failed`,
|
||||
`[Enrichment] Audio analysis cleanup: ${cleanupResult.reset} reset, ${cleanupResult.permanentlyFailed} permanently failed, ${cleanupResult.recovered} recovered`,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user