mirror of
https://github.com/Chevron7Locked/kima-hub.git
synced 2026-06-19 07:37:17 +00:00
fix(audio-analyzer): pre-decode ffmpeg gate to stop Essentia segfault crash-loop (#204)
Adds an ffmpeg integrity probe before MonoLoader so corrupt files that SIGSEGV Essentia become a normal load failure (and flow into the existing retry/quarantine) instead of crash-looping the worker. By gossip31.
This commit is contained in:
@@ -59,6 +59,7 @@ import gc
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, Any, Optional, List, Tuple
|
||||
import traceback
|
||||
import subprocess
|
||||
import numpy as np
|
||||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
import multiprocessing
|
||||
@@ -219,6 +220,8 @@ ESSENTIA_VERSION = '2.1b6-enhanced-v2'
|
||||
|
||||
# Retry configuration
|
||||
MAX_RETRIES = int(os.getenv('MAX_RETRIES', '3')) # Max retry attempts per track
|
||||
# Seconds to allow the ffmpeg pre-decode integrity probe before giving up.
|
||||
PRE_DECODE_PROBE_TIMEOUT = int(os.getenv('PRE_DECODE_PROBE_TIMEOUT', '120'))
|
||||
STALE_PROCESSING_MINUTES = int(os.getenv('STALE_PROCESSING_MINUTES', '15')) # Reset tracks stuck in 'processing' (synchronized with backend)
|
||||
|
||||
# Queue names
|
||||
@@ -397,11 +400,56 @@ class AudioAnalyzer:
|
||||
traceback.print_exc()
|
||||
self.enhanced_mode = False
|
||||
|
||||
def _is_decodable(self, file_path: str) -> Tuple[bool, Optional[str]]:
    """Probe ``file_path`` with the system ffmpeg before Essentia loads it.

    Essentia's bundled libavcodec can SIGSEGV on corrupt bitstreams, and a
    native fault in es.MonoLoader cannot be caught: it takes the whole
    worker process down. The system ffmpeg tolerates (and cleanly reports)
    those files, so it is used as a gate and undecodable files are skipped
    instead of being handed to Essentia.

    Only the audio is decoded (``-vn``). Embedded cover art and video
    tracks are never read by the audio loader, so a damaged picture or a
    slow video decode must not cause a healthy track to be rejected. A file
    with no audio stream at all is reported as undecodable by ffmpeg.

    Outcome policy:
      * ffmpeg missing, or the probe itself raising -> fails OPEN
        (reported decodable) so a tooling gap never blocks analysis.
      * probe exceeding PRE_DECODE_PROBE_TIMEOUT seconds -> fails CLOSED
        (reported undecodable).
      * ffmpeg exiting non-zero -> undecodable; the last stderr line is
        used as the detail.

    Args:
        file_path: Path of the audio file to probe.

    Returns:
        ``(True, None)`` when the file may be passed to Essentia, otherwise
        ``(False, reason)`` with a short human-readable reason.
    """
    command = [
        'ffmpeg',
        '-nostdin',          # never read from stdin
        '-v', 'error',       # keep stderr limited to real errors
        '-xerror',           # exit non-zero on the first decode error
        '-i', file_path,
        '-vn',               # audio only: skip cover art / video streams
        '-f', 'null', '-',   # decode and discard the output
    ]
    try:
        proc = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            timeout=PRE_DECODE_PROBE_TIMEOUT,
        )
    except FileNotFoundError:
        # The ffmpeg executable is not installed -> never block analysis.
        return (True, None)
    except subprocess.TimeoutExpired:
        return (False, "pre-decode probe timed out (corrupt/unreadable)")
    except Exception as e:
        # Never let the gate itself reject an OK file.
        logger.warning(f"Pre-decode probe error for {file_path}: {e}")
        return (True, None)

    if proc.returncode != 0:
        lines = proc.stderr.decode('utf-8', 'replace').strip().splitlines()
        detail = lines[-1] if lines else f"ffmpeg exit {proc.returncode}"
        # Cap the detail so an oversized ffmpeg message cannot bloat the
        # stored error string.
        return (False, f"corrupt bitstream (pre-decode gate): {detail[:160]}")
    return (True, None)
|
||||
|
||||
def load_audio(self, file_path: str, max_duration: int = 90) -> Optional[Any]:
|
||||
"""Load up to max_duration seconds of audio at 44.1kHz as mono signal"""
|
||||
self._last_load_error = None
|
||||
if not ESSENTIA_AVAILABLE:
|
||||
return None
|
||||
|
||||
# Pre-decode gate: Essentia's MonoLoader takes an uncatchable native
|
||||
# SIGSEGV on certain corrupt files (e.g. AAC "channel element N is not
|
||||
# allocated", MP3 "Header missing"), killing the worker process and
|
||||
# dumping a ~0.5G core. Probe with the tolerant system ffmpeg first and
|
||||
# skip the file instead of crashing the pool. See _is_decodable.
|
||||
decodable, probe_err = self._is_decodable(file_path)
|
||||
if not decodable:
|
||||
logger.error(f"Pre-decode gate rejected {file_path}: {probe_err}")
|
||||
self._last_load_error = probe_err
|
||||
return None
|
||||
|
||||
try:
|
||||
loader = es.MonoLoader(filename=file_path, sampleRate=44100)
|
||||
audio = loader()
|
||||
@@ -411,6 +459,7 @@ class AudioAnalyzer:
|
||||
return audio
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load audio {file_path}: {e}")
|
||||
self._last_load_error = f"audio load failed: {e}"
|
||||
return None
|
||||
|
||||
def validate_audio(self, audio, file_path: str) -> Tuple[bool, Optional[str]]:
|
||||
@@ -504,6 +553,7 @@ class AudioAnalyzer:
|
||||
result['_error'] = 'MemoryError: audio file too large'
|
||||
return result
|
||||
if audio_44k is None:
|
||||
result['_error'] = getattr(self, '_last_load_error', None) or 'Audio failed to load (corrupt or unreadable)'
|
||||
return result
|
||||
|
||||
# Validate audio before analysis
|
||||
|
||||
Reference in New Issue
Block a user