release: v1.5.7

BullMQ enrichment migration
- Rewrote enrichment pipeline on BullMQ v5: artist, track, podcast workers with
  pause/resume/stop support and Bull Board visibility
- Essentia publishes audio:analysis:complete events; CLAP subscribes reactively
  instead of polling (eliminates scan delay between phases)
- Thread-safe DB pool in CLAP worker, all DB calls in run_in_executor
- Fixed psycopg2.pool submodule import crash on BullMQ vibe worker startup
- Silenced EnrichmentStateService disconnect error on already-closed Redis conn

Enrichment fixes
- lastfmTags NULL caused mood-tags phase to silently skip all tracks; migration
  backfills NULL -> '{}' and sets column default
- Cover art fetch errors for temp-MBID albums (temp-* passed to Cover Art Archive)
- VIBE-VOCAB vocabulary JSON not copied to Docker image (TypeScript omits .json)
- Wired cleanupOldResolved() to run daily; added missing enrichment status indexes
- Fixed circuit breaker reset, orphan cleanup, podcast entityType, hang detection

Soulseek
- Track search-page downloads in activity tracker
- Use async fs.promises.access instead of synchronous existsSync
- Verify file exists on disk before emitting download:complete (#110)

Docker image size: 28.4 GB -> 12.2 GB
- Removed all CUDA/NVIDIA dependencies; switched to CPU-only PyTorch/TensorFlow
- tensorflow-cpu + essentia-tensorflow --no-deps (avoids GPU TF transitive dep)
- Fixed .dockerignore: **/node_modules and **/.next now excluded from build context

PWA / mobile
- Background audio session loss on iOS/Android: SilenceKeepalive singleton,
  tryResume() in MediaSession play handler, direct track load on 'ended' event,
  visibilitychange/pageshow foreground recovery
- Lock orientation to portrait for Android device lock (#117)

Discovery
- Retry All re-importing albums already in library: apply same three-level filter
  as GET /current before creating download jobs; delete stale UnavailableAlbum
  records for already-present albums. Closes #34

CI / release
- linux/arm64 added to release and nightly Docker builds (#87)
- Isolated release CI from nightly GHA cache (cache-from/cache-to removed from
  docker-publish.yml to guarantee clean release builds)
- Redis vm.overcommit_memory=1 sysctl added to prod and server compose files

Other fixes
- Cross-artist album fallback by title+year prevents library splitting (#50)
- Retry temp-MBID artists after 24h not 7 days; hide temp MBIDs from API (#112)
- 3-attempt ECONNRESET retry on all Deezer getPlaylist call sites (#119)
- check response.ok on health probe — fetch does not throw on 5xx (#104)
- Z-index stacking hierarchy established (MiniPlayer through OverlayPlayer)
- API token display overflow on iPhone (min-w-0/overflow-hidden on flex container)
This commit is contained in:
Your Name
2026-02-23 17:12:09 -06:00
parent 9ffad66317
commit d67c8f49ef
62 changed files with 1723 additions and 978 deletions
+23
View File
@@ -0,0 +1,23 @@
.git
.worktrees
.claude
.serena
.aider-desk
.aider.tags.cache.v4
.roo
.ruff_cache
.vscode
context_portal
logs
docs
scripts
*.md
*.log
**/node_modules
**/.next
.env*
coverage
*.test.ts
__tests__
android
ios
+4 -2
View File
@@ -25,6 +25,9 @@ jobs:
sudo rm -rf /usr/local/share/boost
sudo rm -rf "$AGENT_TOOLSDIRECTORY"
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
@@ -52,5 +55,4 @@ jobs:
org.opencontainers.image.version=nightly-${{ steps.sha.outputs.short }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ARM64 disabled due to QEMU emulation issues with npm packages
platforms: linux/amd64
platforms: linux/amd64,linux/arm64
+1 -3
View File
@@ -60,9 +60,7 @@ jobs:
${{ env.IMAGE_NAME }}:latest
cache-from: type=gha
cache-to: type=gha,mode=max
# Note: ARM64 removed due to QEMU emulation issues with npm packages
# Can be re-added when using native ARM64 runners
platforms: linux/amd64
platforms: linux/amd64,linux/arm64
create-release:
needs: [build]
+17 -1
View File
@@ -5,12 +5,28 @@ All notable changes to Kima will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.5.6] - 2026-02-22
## [1.5.7] - 2026-02-23
### Added
- **BullMQ enrichment infrastructure**: Rewrote the entire enrichment pipeline on top of BullMQ v5, replacing the custom BLPOP/Redis queue loops. Artist, track, and podcast enrichment all run as proper BullMQ Worker instances with job-level pause, resume, and stop support. All queues are visible in the Bull Board admin dashboard. The orchestrator pushes jobs into BullMQ and uses a sentinel pattern to track when all jobs in a phase have completed before advancing.
- **Reactive vibe queuing**: The Essentia audio analyzer now publishes an `audio:analysis:complete` event to Redis when each track finishes. The CLAP service subscribes and immediately queues a vibe embedding job for that track — eliminating the previous polling-based approach where CLAP scanned the database on a fixed interval looking for newly-completed Essentia tracks.
### Fixed
- **PWA background audio session lost on iOS and Android**: Pausing from lock-screen / notification controls while the app was backgrounded caused iOS to reclaim the audio session, blocking any subsequent `audio.play()` call until the app was foregrounded. Fixes two related symptoms: (1) resuming from lock-screen controls appeared to do nothing until the app was opened, (2) music stopped after extended background playback during track transitions. Fixed by: calling `audioEngine.tryResume()` synchronously inside the MediaSession `play` handler (within the user-activation window iOS grants to MediaSession callbacks); adding a silent looping audio keepalive (`silence-keepalive.ts`) that holds the OS audio session while user audio is paused and the app is backgrounded; loading the next track directly from the `ended` event handler to eliminate the inter-track silence gap that triggered session reclaim; and adding `visibilitychange` / `pageshow` foreground recovery to retry playback if the engine is paused when the app returns to the foreground.
- **Discovery "Retry All" importing entire albums already in library**: The `POST /discover/retry-unavailable` endpoint fetched all raw `UnavailableAlbum` records for the week without applying the same three-level filter the `GET /current` endpoint uses before displaying them. As a result, clicking "Retry All" triggered full re-downloads of albums that were already present in the library (matched by discovery MBID, library MBID, or fuzzy title+artist). The retry handler now applies all three filters before creating download jobs, and deletes stale `UnavailableAlbum` records for albums already in the library so they do not reappear. Closes #34.
- **Mood-tags phase silently skipping all tracks**: `lastfmTags` was `NULL` for tracks that had been enriched before the column was added. The mood-tags enrichment phase queries `WHERE lastfmTags != '{}'`, which never matches `NULL` — so every track was silently skipped every cycle. Migration backfills all `NULL` values to `'{}'` and sets the column default, so newly enriched tracks are never NULL.
- **Docker image size (28.4 GB → 12.2 GB)**: Removed all CUDA and NVIDIA dependencies from the Docker image. The `audio-analyzer` and `audio-analyzer-clap` services now run on CPU-only PyTorch and TensorFlow. Changed pip installs to use the CPU-only PyTorch wheel index (`--index-url https://download.pytorch.org/whl/cpu`), replaced `tensorflow` with `tensorflow-cpu`, and installed `essentia-tensorflow --no-deps` to prevent pip from pulling the GPU TensorFlow variant as a transitive dependency. Removed `nvidia-cudnn-cu12`, `torchvision` (not imported), the `/opt/cudnn8` CUDA layer, and all NVIDIA library paths from the supervisor `LD_LIBRARY_PATH`. No regressions: TensorFlow confirmed running on CPU, all 9 MusiCNN classification heads load normally.
- **Docker build context bloat**: `frontend/node_modules/` (598 MB) and `frontend/.next/` (313 MB) were not excluded from the Docker build context. The `.dockerignore` `node_modules` pattern only matched root-level; changed to `**/node_modules`. Added `**/.next`. Combined these reduced the `COPY frontend/ ./` layer from 946 MB to ~50 MB.
- **Cover art fetch errors for temp-MBID albums**: Albums with temporary MBIDs (temp-*) were being passed to the Cover Art Archive API, causing 400 errors. Added validation to skip temp-MBIDs in artist enrichment and data cache.
- **VIBE-VOCAB vocabulary file missing**: The vocabulary JSON file wasn't being copied to the Docker image because TypeScript doesn't copy .json files automatically. Added explicit import to force tsc to copy it.
- **Redis memory overcommit warning**: Added `vm.overcommit_memory=1` sysctl to docker-compose.prod.yml and docker-compose.server.yml.
- **Z-index stacking order**: MiniPlayer was z-50 (same tier as modals), causing it to appear above open dialogs due to DOM ordering. Established a consistent stacking hierarchy: MiniPlayer z-[45] → TopBar z-50 → VibeOverlay/toasts z-[55] → MobileSidebar backdrop z-[60] / drawer z-[70] → all modals z-[80] → nested confirm z-[85] → toast z-[100] → OverlayPlayer z-[9999]. MobileSidebar was also using non-standard `z-100` which is not a valid Tailwind class.
- **API token display overflowing viewport on iPhone**: The newly-generated token `<code>` block extended beyond the screen on narrow viewports due to missing `min-w-0` / `overflow-hidden` on its flex container; added both.
- **CLAP BullMQ worker crash on startup**: `import psycopg2` does not implicitly import `psycopg2.pool`; the BullMQ vibe worker was crashing immediately because `psycopg2.pool.ThreadedConnectionPool` was referenced without the submodule being imported. Added explicit `import psycopg2.pool`.
- **EnrichmentStateService Redis disconnect error**: Calling `disconnect()` on an already-closed Redis connection raised an unhandled error. The disconnect is now silenced when the connection is already in a closed state.
- **CLAP worker thread-safety**: All PostgreSQL calls in the CLAP BullMQ worker are now wrapped in `run_in_executor` so they execute on a thread-pool thread rather than blocking the asyncio event loop. Connection pool is initialized once per process and shared safely across concurrent jobs.
## [1.5.5] - 2026-02-21
+33 -38
View File
@@ -42,25 +42,38 @@ RUN mkdir -p /app/backend /app/frontend /app/audio-analyzer /app/models \
# ============================================
WORKDIR /app/audio-analyzer
# Install Python dependencies for audio analysis
# Note: TensorFlow must be installed explicitly for Python 3.11+ compatibility
# Install all Python dependencies in a single layer to minimize image size
# CPU-only torch/torchaudio: install first via the CPU index so downstream
# packages (laion-clap, transformers) reuse the already-installed CPU wheels.
# tensorflow-cpu replaces tensorflow to avoid pulling in CUDA runtime libs.
# essentia-tensorflow declares a dependency on `tensorflow` (not tensorflow-cpu)
# so we install it with --no-deps after tensorflow-cpu is already present.
RUN pip3 install --no-cache-dir --break-system-packages \
'tensorflow>=2.13.0,<2.16.0' \
torch torchaudio torchvision \
--index-url https://download.pytorch.org/whl/cpu \
&& pip3 install --no-cache-dir --break-system-packages \
'tensorflow-cpu>=2.13.0,<2.14.0' \
&& pip3 install --no-cache-dir --break-system-packages --no-deps \
essentia-tensorflow \
&& pip3 install --no-cache-dir --break-system-packages \
redis \
psycopg2-binary
psycopg2-binary \
'laion-clap>=1.1.4' \
'librosa>=0.10.0' \
'transformers>=4.30.0' \
'pgvector>=0.2.0' \
'python-dotenv>=1.0.0' \
'requests>=2.31.0' \
'bullmq==2.19.5' \
&& pip cache purge \
&& find /usr -name "*.pyc" -delete \
&& find /usr -name "__pycache__" -type d -exec rm -rf {} + 2>/dev/null || true
# Install cuDNN 8 for TensorFlow GPU (separate from PyTorch's cuDNN 9)
# TF 2.15 needs cuDNN 8, PyTorch needs cuDNN 9 -- installed to isolated path to avoid conflicts
RUN pip3 install --no-cache-dir --break-system-packages --target=/opt/cudnn8 'nvidia-cudnn-cu12==8.9.7.29'
# Download Essentia ML models (~200MB total) - these enable Enhanced vibe matching
# Download all ML models in a single layer (~800MB total)
# IMPORTANT: Using MusiCNN models to match analyzer.py expectations
RUN echo "Downloading Essentia ML models for Enhanced vibe matching..." && \
# Base MusiCNN embedding model (required for all predictions)
RUN echo "Downloading ML models..." && \
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/msd-musicnn-1.pb \
"https://essentia.upf.edu/models/autotagging/msd/msd-musicnn-1.pb" && \
# Mood classification heads (using MusiCNN architecture)
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/mood_happy-msd-musicnn-1.pb \
"https://essentia.upf.edu/models/classification-heads/mood_happy/mood_happy-msd-musicnn-1.pb" && \
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/mood_sad-msd-musicnn-1.pb \
@@ -75,15 +88,16 @@ RUN echo "Downloading Essentia ML models for Enhanced vibe matching..." && \
"https://essentia.upf.edu/models/classification-heads/mood_acoustic/mood_acoustic-msd-musicnn-1.pb" && \
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/mood_electronic-msd-musicnn-1.pb \
"https://essentia.upf.edu/models/classification-heads/mood_electronic/mood_electronic-msd-musicnn-1.pb" && \
# Other classification heads
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/danceability-msd-musicnn-1.pb \
"https://essentia.upf.edu/models/classification-heads/danceability/danceability-msd-musicnn-1.pb" && \
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/voice_instrumental-msd-musicnn-1.pb \
"https://essentia.upf.edu/models/classification-heads/voice_instrumental/voice_instrumental-msd-musicnn-1.pb" && \
echo "ML models downloaded successfully" && \
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/music_audioset_epoch_15_esc_90.14.pt \
"https://huggingface.co/lukewys/laion_clap/resolve/main/music_audioset_epoch_15_esc_90.14.pt" && \
echo "All ML models downloaded" && \
ls -lh /app/models/
# Copy audio analyzer script
# Copy audio analyzer scripts
COPY services/audio-analyzer/analyzer.py /app/audio-analyzer/
# ============================================
@@ -91,30 +105,9 @@ COPY services/audio-analyzer/analyzer.py /app/audio-analyzer/
# ============================================
WORKDIR /app/audio-analyzer-clap
# Install CLAP Python dependencies
# Note: torch is large (~2GB) but required for CLAP embeddings
RUN pip3 install --no-cache-dir --break-system-packages \
'laion-clap>=1.1.4' \
'torch>=2.0.0' \
'torchaudio>=2.0.0' \
'torchvision>=0.15.0' \
'librosa>=0.10.0' \
'transformers>=4.30.0' \
'pgvector>=0.2.0' \
'python-dotenv>=1.0.0' \
'requests>=2.31.0'
# Copy CLAP analyzer script
COPY services/audio-analyzer-clap/analyzer.py /app/audio-analyzer-clap/
# Pre-download CLAP model (~600MB) during build to avoid runtime download
# The analyzer expects the model at /app/models/music_audioset_epoch_15_esc_90.14.pt
RUN echo "Downloading CLAP model for vibe similarity..." && \
curl -L --retry 3 --retry-delay 5 --connect-timeout 30 --max-time 300 -o /app/models/music_audioset_epoch_15_esc_90.14.pt \
"https://huggingface.co/lukewys/laion_clap/resolve/main/music_audioset_epoch_15_esc_90.14.pt" && \
echo "CLAP model downloaded successfully" && \
ls -lh /app/models/music_audioset_epoch_15_esc_90.14.pt
# Create database readiness check script
RUN cat > /app/wait-for-db.sh << 'EOF'
#!/bin/bash
@@ -166,7 +159,9 @@ RUN npx prisma generate
# Copy backend source and build
COPY backend/src ./src
COPY backend/tsconfig.json ./
RUN npm run build
RUN npm run build && \
npm prune --production && \
rm -rf src tests __tests__ tsconfig*.json
COPY backend/docker-entrypoint.sh ./
COPY backend/healthcheck.js ./healthcheck-backend.js
@@ -275,7 +270,7 @@ stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
environment=DATABASE_URL="postgresql://kima:kima@localhost:5432/kima",REDIS_URL="redis://localhost:6379",MUSIC_PATH="/music",BATCH_SIZE="10",SLEEP_INTERVAL="5",MAX_ANALYZE_SECONDS="90",BRPOP_TIMEOUT="30",MODEL_IDLE_TIMEOUT="300",NUM_WORKERS="2",THREADS_PER_WORKER="1",LD_LIBRARY_PATH="/opt/cudnn8/nvidia/cudnn/lib:/usr/local/lib/python3.11/dist-packages/nvidia/cublas/lib:/usr/local/lib/python3.11/dist-packages/nvidia/cufft/lib:/usr/local/lib/python3.11/dist-packages/nvidia/cuda_runtime/lib:/usr/local/lib/python3.11/dist-packages/nvidia/cuda_nvrtc/lib:/usr/local/lib/python3.11/dist-packages/nvidia/cusolver/lib:/usr/local/lib/python3.11/dist-packages/nvidia/cusparse/lib:/usr/local/lib/python3.11/dist-packages/nvidia/nccl/lib"
environment=DATABASE_URL="postgresql://kima:kima@localhost:5432/kima",REDIS_URL="redis://localhost:6379",MUSIC_PATH="/music",BATCH_SIZE="10",SLEEP_INTERVAL="5",MAX_ANALYZE_SECONDS="90",BRPOP_TIMEOUT="30",MODEL_IDLE_TIMEOUT="300",NUM_WORKERS="2",THREADS_PER_WORKER="1"
priority=50
[program:audio-analyzer-clap]
+89 -76
View File
@@ -1,19 +1,18 @@
{
"name": "kima-backend",
"version": "1.5.6",
"version": "1.5.7",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "kima-backend",
"version": "1.5.6",
"version": "1.5.7",
"license": "GPL-3.0",
"dependencies": {
"@bull-board/api": "^6.14.2",
"@bull-board/express": "^6.14.2",
"@ffmpeg-installer/ffmpeg": "^1.1.0",
"@prisma/client": "^5.22.0",
"@types/bull": "^3.15.9",
"@types/fluent-ffmpeg": "^2.1.28",
"@types/node-cron": "^3.0.11",
"@types/qrcode": "^1.5.6",
@@ -22,7 +21,7 @@
"@types/swagger-ui-express": "^4.1.8",
"axios": "^1.6.2",
"bcrypt": "^6.0.0",
"bull": "^4.16.5",
"bullmq": "^5.70.0",
"connect-redis": "^7.1.0",
"cors": "^2.8.5",
"date-fns": "^4.1.0",
@@ -35,6 +34,7 @@
"fluent-ffmpeg": "^2.1.3",
"fuzzball": "^2.2.3",
"helmet": "^7.1.0",
"ioredis": "^5.9.3",
"jsonwebtoken": "^9.0.2",
"music-metadata": "^11.10.0",
"node-cron": "^4.2.1",
@@ -2996,16 +2996,6 @@
"@types/node": "*"
}
},
"node_modules/@types/bull": {
"version": "3.15.9",
"resolved": "https://registry.npmjs.org/@types/bull/-/bull-3.15.9.tgz",
"integrity": "sha512-MPUcyPPQauAmynoO3ezHAmCOhbB0pWmYyijr/5ctaCqhbKWsjW0YCod38ZcLzUBprosfZ9dPqfYIcfdKjk7RNQ==",
"license": "MIT",
"dependencies": {
"@types/ioredis": "*",
"@types/redis": "^2.8.0"
}
},
"node_modules/@types/cacheable-request": {
"version": "6.0.3",
"resolved": "https://registry.npmjs.org/@types/cacheable-request/-/cacheable-request-6.0.3.tgz",
@@ -3092,15 +3082,6 @@
"integrity": "sha512-r8Tayk8HJnX0FztbZN7oVqGccWgw98T/0neJphO91KkmOzug1KkofZURD4UaD5uH8AqcFLfdPErnBod0u71/qg==",
"license": "MIT"
},
"node_modules/@types/ioredis": {
"version": "4.28.10",
"resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.28.10.tgz",
"integrity": "sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==",
"license": "MIT",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/istanbul-lib-coverage": {
"version": "2.0.6",
"resolved": "https://registry.npmjs.org/@types/istanbul-lib-coverage/-/istanbul-lib-coverage-2.0.6.tgz",
@@ -3214,15 +3195,6 @@
"integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ==",
"license": "MIT"
},
"node_modules/@types/redis": {
"version": "2.8.32",
"resolved": "https://registry.npmjs.org/@types/redis/-/redis-2.8.32.tgz",
"integrity": "sha512-7jkMKxcGq9p242exlbsVzuJb57KqHRhNl4dHoQu2Y5v9bCAbtIXXH0R3HleSQW4CTOqpHIYUW3t6tpUj4BVQ+w==",
"license": "MIT",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/responselike": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/@types/responselike/-/responselike-1.0.3.tgz",
@@ -3987,22 +3959,88 @@
"integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==",
"license": "MIT"
},
"node_modules/bull": {
"version": "4.16.5",
"resolved": "https://registry.npmjs.org/bull/-/bull-4.16.5.tgz",
"integrity": "sha512-lDsx2BzkKe7gkCYiT5Acj02DpTwDznl/VNN7Psn7M3USPG7Vs/BaClZJJTAG+ufAR9++N1/NiUTdaFBWDIl5TQ==",
"node_modules/bullmq": {
"version": "5.70.0",
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.70.0.tgz",
"integrity": "sha512-HlBSEJqG7MJ97+d/N/8rtGOcpisjGP3WD/zaXZia0hsmckJqAPTVWN6Yfw32FVfVSUVVInZQ2nUgMd2zCRghKg==",
"license": "MIT",
"dependencies": {
"cron-parser": "^4.9.0",
"get-port": "^5.1.1",
"ioredis": "^5.3.2",
"lodash": "^4.17.21",
"msgpackr": "^1.11.2",
"semver": "^7.5.2",
"uuid": "^8.3.0"
"cron-parser": "4.9.0",
"ioredis": "5.9.2",
"msgpackr": "1.11.5",
"node-abort-controller": "3.1.1",
"semver": "7.7.4",
"tslib": "2.8.1",
"uuid": "11.1.0"
}
},
"node_modules/bullmq/node_modules/debug": {
"version": "4.4.3",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz",
"integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==",
"license": "MIT",
"dependencies": {
"ms": "^2.1.3"
},
"engines": {
"node": ">=12"
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
},
"node_modules/bullmq/node_modules/ioredis": {
"version": "5.9.2",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.9.2.tgz",
"integrity": "sha512-tAAg/72/VxOUW7RQSX1pIxJVucYKcjFjfvj60L57jrZpYCHC3XN0WCQ3sNYL4Gmvv+7GPvTAjc+KSdeNuE8oWQ==",
"license": "MIT",
"dependencies": {
"@ioredis/commands": "1.5.0",
"cluster-key-slot": "^1.1.0",
"debug": "^4.3.4",
"denque": "^2.1.0",
"lodash.defaults": "^4.2.0",
"lodash.isarguments": "^3.1.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0",
"standard-as-callback": "^2.1.0"
},
"engines": {
"node": ">=12.22.0"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/ioredis"
}
},
"node_modules/bullmq/node_modules/ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==",
"license": "MIT"
},
"node_modules/bullmq/node_modules/msgpackr": {
"version": "1.11.5",
"resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.11.5.tgz",
"integrity": "sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==",
"license": "MIT",
"optionalDependencies": {
"msgpackr-extract": "^3.0.2"
}
},
"node_modules/bullmq/node_modules/uuid": {
"version": "11.1.0",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz",
"integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==",
"funding": [
"https://github.com/sponsors/broofa",
"https://github.com/sponsors/ctavan"
],
"license": "MIT",
"bin": {
"uuid": "dist/esm/bin/uuid"
}
},
"node_modules/bytes": {
@@ -5384,18 +5422,6 @@
"node": ">=8.0.0"
}
},
"node_modules/get-port": {
"version": "5.1.1",
"resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz",
"integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==",
"license": "MIT",
"engines": {
"node": ">=8"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/get-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/get-proto/-/get-proto-1.0.1.tgz",
@@ -7172,15 +7198,6 @@
"integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==",
"license": "MIT"
},
"node_modules/msgpackr": {
"version": "1.11.8",
"resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.11.8.tgz",
"integrity": "sha512-bC4UGzHhVvgDNS7kn9tV8fAucIYUBuGojcaLiz7v+P63Lmtm0Xeji8B/8tYKddALXxJLpwIeBmUN3u64C4YkRA==",
"license": "MIT",
"optionalDependencies": {
"msgpackr-extract": "^3.0.2"
}
},
"node_modules/msgpackr-extract": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz",
@@ -7296,6 +7313,12 @@
"dev": true,
"license": "MIT"
},
"node_modules/node-abort-controller": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz",
"integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==",
"license": "MIT"
},
"node_modules/node-addon-api": {
"version": "8.5.0",
"resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-8.5.0.tgz",
@@ -9078,8 +9101,7 @@
"version": "2.8.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz",
"integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==",
"license": "0BSD",
"optional": true
"license": "0BSD"
},
"node_modules/tsx": {
"version": "4.21.0",
@@ -9309,15 +9331,6 @@
"node": ">= 0.4.0"
}
},
"node_modules/uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==",
"license": "MIT",
"bin": {
"uuid": "dist/bin/uuid"
}
},
"node_modules/v8-to-istanbul": {
"version": "9.3.0",
"resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.3.0.tgz",
+3 -3
View File
@@ -1,6 +1,6 @@
{
"name": "kima-backend",
"version": "1.5.6",
"version": "1.5.7",
"description": "Kima backend API server",
"license": "GPL-3.0",
"repository": {
@@ -25,7 +25,6 @@
"@bull-board/express": "^6.14.2",
"@ffmpeg-installer/ffmpeg": "^1.1.0",
"@prisma/client": "^5.22.0",
"@types/bull": "^3.15.9",
"@types/fluent-ffmpeg": "^2.1.28",
"@types/node-cron": "^3.0.11",
"@types/qrcode": "^1.5.6",
@@ -34,7 +33,7 @@
"@types/swagger-ui-express": "^4.1.8",
"axios": "^1.6.2",
"bcrypt": "^6.0.0",
"bull": "^4.16.5",
"bullmq": "^5.70.0",
"connect-redis": "^7.1.0",
"cors": "^2.8.5",
"date-fns": "^4.1.0",
@@ -47,6 +46,7 @@
"fluent-ffmpeg": "^2.1.3",
"fuzzball": "^2.2.3",
"helmet": "^7.1.0",
"ioredis": "^5.9.3",
"jsonwebtoken": "^9.0.2",
"music-metadata": "^11.10.0",
"node-cron": "^4.2.1",
@@ -0,0 +1,2 @@
-- CreateIndex
CREATE INDEX IF NOT EXISTS "Album_title_idx" ON "Album"("title");
@@ -0,0 +1,2 @@
-- AlterTable: allow targetMbid to be null for direct search downloads
ALTER TABLE "DownloadJob" ALTER COLUMN "targetMbid" DROP NOT NULL;
@@ -0,0 +1,11 @@
-- CreateIndex
CREATE INDEX IF NOT EXISTS "Artist_enrichmentStatus_idx" ON "Artist"("enrichmentStatus");
-- CreateIndex
CREATE INDEX IF NOT EXISTS "Track_analysisStatus_idx" ON "Track"("analysisStatus");
-- CreateIndex
CREATE INDEX IF NOT EXISTS "Track_vibeAnalysisStatus_idx" ON "Track"("vibeAnalysisStatus");
-- CreateIndex
CREATE INDEX IF NOT EXISTS "Track_analysisStatus_vibeAnalysisStatus_idx" ON "Track"("analysisStatus", "vibeAnalysisStatus");
@@ -0,0 +1,9 @@
-- Fix NULL lastfmTags so the enrichment orchestrator can find them.
-- New tracks were created with NULL because no default was set on the column.
-- The mood-tags phase queries `= '{}'` which never matches NULL in PostgreSQL.
-- Fix existing rows
UPDATE "Track" SET "lastfmTags" = '{}' WHERE "lastfmTags" IS NULL;
-- Prevent future NULLs
ALTER TABLE "Track" ALTER COLUMN "lastfmTags" SET DEFAULT '{}';
+10 -2
View File
@@ -28,6 +28,8 @@ model Album {
originalYear Int?
artist Artist @relation(fields: [artistId], references: [id], onDelete: Cascade)
tracks Track[]
@@index([title])
}
model ApiKey {
@@ -66,6 +68,8 @@ model Artist {
ownedAlbums OwnedAlbum[]
similarFrom SimilarArtist[] @relation("FromArtist")
similarTo SimilarArtist[] @relation("ToArtist")
@@index([enrichmentStatus])
}
model Audiobook {
@@ -224,7 +228,7 @@ model DownloadJob {
userId String
subject String
type String
targetMbid String
targetMbid String?
status String
error String?
lidarrRef String?
@@ -603,7 +607,7 @@ model Track {
danceabilityMl Float?
moodTags String[]
essentiaGenres String[]
lastfmTags String[]
lastfmTags String[] @default([])
analysisStatus String @default("pending")
analysisVersion String?
analysisMode String?
@@ -629,6 +633,10 @@ model Track {
trackGenres TrackGenre[]
transcodedFiles TranscodedFile[]
trackEmbedding TrackEmbedding?
@@index([analysisStatus])
@@index([vibeAnalysisStatus])
@@index([analysisStatus, vibeAnalysisStatus])
}
model TrackGenre {
+11 -4
View File
@@ -308,24 +308,31 @@ app.listen(config.port, "0.0.0.0", async () => {
const { initializeMusicConfig } = await import("./config");
await initializeMusicConfig();
// Initialize Bull queue workers
// Initialize BullMQ workers
await import("./workers");
// Set up Bull Board dashboard
const { createBullBoard } = await import("@bull-board/api");
const { BullAdapter } = await import("@bull-board/api/bullAdapter");
const { BullMQAdapter } = await import("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = await import("@bull-board/express");
const { scanQueue, discoverQueue } = await import(
"./workers/queues"
);
const { artistQueue, trackQueue, vibeQueue, podcastQueue } = await import(
"./workers/enrichmentQueues"
);
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/api/admin/queues");
createBullBoard({
queues: [
new BullAdapter(scanQueue),
new BullAdapter(discoverQueue),
new BullMQAdapter(scanQueue),
new BullMQAdapter(discoverQueue),
new BullMQAdapter(artistQueue),
new BullMQAdapter(trackQueue),
new BullMQAdapter(vibeQueue),
new BullMQAdapter(podcastQueue),
],
serverAdapter,
});
+5 -3
View File
@@ -2,6 +2,8 @@ import crypto from 'crypto'
import EventEmitter from 'events'
import net from 'net'
import stream from 'stream'
import { logger } from '../../utils/logger'
import type TypedEventEmitter from 'typed-emitter'
import type { Address } from './common'
@@ -224,7 +226,7 @@ export class SlskClient extends (EventEmitter as new () => TypedEventEmitter<Sls
d.username === msg.username && downloadHasToken(d) && d.token === token
)
if (!download_) {
console.error('No download found for', msg)
logger.error(`[Soulseek] No download found for ${JSON.stringify(msg)}`)
conn.end()
return
}
@@ -323,7 +325,7 @@ export class SlskClient extends (EventEmitter as new () => TypedEventEmitter<Sls
)
if (existingDownloadIndex === -1) {
console.error('No download found for', msg)
logger.error(`[Soulseek] No download found for ${JSON.stringify(msg)}`)
return
}
@@ -348,7 +350,7 @@ export class SlskClient extends (EventEmitter as new () => TypedEventEmitter<Sls
)
if (existingDownloadIndex === -1) {
console.error('No download found for', msg)
logger.error(`[Soulseek] No download found for ${JSON.stringify(msg)}`)
return
}
+3 -1
View File
@@ -3,6 +3,8 @@ import type { Server, Socket } from 'net'
import net from 'net'
import type TypedEventEmitter from 'typed-emitter'
import { logger } from '../../utils/logger'
import type { Address } from './common'
import type { FromPeerInitMessage } from './messages/from/peer-init'
import { fromPeerInitMessageParser } from './messages/from/peer-init'
@@ -39,7 +41,7 @@ export class SlskListen extends (EventEmitter as new () => TypedEventEmitter<Sls
this.emit('message', data, { host, port }, c)
}
} catch (error) {
console.error('Failed to parse peer init message', error)
logger.error(`[Soulseek] Failed to parse peer init message: ${error}`)
}
})
})
+2 -1
View File
@@ -3,6 +3,7 @@ import type { Socket } from 'net'
import net from 'net'
import type TypedEventEmitter from 'typed-emitter'
import { logger } from '../../utils/logger'
import type { Address } from './common'
import type { FromPeerMessage } from './messages/from/peer'
import { fromPeerMessageParser } from './messages/from/peer'
@@ -53,7 +54,7 @@ export class SlskPeer extends (EventEmitter as new () => TypedEventEmitter<SlskP
this.emit('message', data)
}
} catch (error) {
console.error('Failed to parse peer message', error)
logger.error(`[Soulseek] Failed to parse peer message: ${error}`)
}
})
}
+3 -1
View File
@@ -3,6 +3,8 @@ import type { Socket } from 'net'
import net from 'net'
import type TypedEventEmitter from 'typed-emitter'
import { logger } from '../../utils/logger'
import type { Address } from './common'
import type { FromServerMessage } from './messages/from/server'
import { fromServerMessageParser } from './messages/from/server'
@@ -45,7 +47,7 @@ export class SlskServer extends (EventEmitter as new () => TypedEventEmitter<Sls
this.emit('message', data)
}
} catch (error) {
console.error('Failed to parse server message', error)
logger.error(`[Soulseek] Failed to parse server message: ${error}`)
}
})
}
+24 -24
View File
@@ -5,13 +5,13 @@ import { redisClient } from "../utils/redis";
import { requireAuth, requireAdmin } from "../middleware/auth";
import { getSystemSettings, invalidateSystemSettingsCache } from "../utils/systemSettings";
import { enrichmentFailureService } from "../services/enrichmentFailureService";
import { vibeQueue } from "../workers/enrichmentQueues";
import os from "os";
const router = Router();
// Redis queue key for audio analysis
// Redis queue key for audio analysis (Essentia uses raw Redis BRPOP, not BullMQ)
const ANALYSIS_QUEUE = "audio:analysis:queue";
const VIBE_QUEUE = "audio:clap:queue";
/**
* GET /api/analysis/status
@@ -130,6 +130,7 @@ router.post("/retry-failed", requireAuth, requireAdmin, async (req, res) => {
data: {
analysisStatus: "pending",
analysisError: null,
analysisRetryCount: 0,
},
});
@@ -504,13 +505,16 @@ router.post("/vibe/start", requireAuth, requireAdmin, async (req, res) => {
logger.info("Cleared all vibe embeddings for re-generation");
}
// Find tracks without vibe embeddings (all tracks if force was used)
// Find analyzed tracks without vibe embeddings (all tracks if force was used).
// Requires analysisStatus = "completed" to match the reactive pub/sub path
// in audioCompletionSubscriber.ts.
const tracks = await prisma.$queryRaw<{ id: string; filePath: string; duration: number; title: string }[]>`
SELECT t.id, t."filePath", t.duration, t.title
FROM "Track" t
LEFT JOIN track_embeddings te ON t.id = te.track_id
WHERE te.track_id IS NULL
AND t."filePath" IS NOT NULL
AND t."analysisStatus" = 'completed'
ORDER BY t."fileModified" DESC
LIMIT ${limit}
`;
@@ -522,16 +526,14 @@ router.post("/vibe/start", requireAuth, requireAdmin, async (req, res) => {
});
}
// Queue tracks for CLAP embedding
const pipeline = redisClient.multi();
for (const track of tracks) {
pipeline.rPush(VIBE_QUEUE, JSON.stringify({
trackId: track.id,
filePath: track.filePath,
duration: track.duration,
}));
}
await pipeline.exec();
// Queue tracks for CLAP embedding via BullMQ (jobId deduplication)
await vibeQueue.addBulk(
tracks.map((track) => ({
name: "embed",
data: { trackId: track.id, filePath: track.filePath, duration: track.duration },
opts: { jobId: `vibe-${track.id}` },
})),
);
// Clear any existing vibe failures for these tracks
for (const track of tracks) {
@@ -577,22 +579,20 @@ router.post("/vibe/retry", requireAuth, requireAdmin, async (req, res) => {
select: { id: true, filePath: true, duration: true, title: true },
});
// Reset Track-level retry counts so queueVibeEmbeddings can pick them up again
// Reset Track-level retry counts so the vibe queue can pick them up again
await prisma.track.updateMany({
where: { id: { in: trackIds } },
data: { vibeAnalysisStatus: null, vibeAnalysisRetryCount: 0, vibeAnalysisStatusUpdatedAt: null },
});
// Queue for retry
const pipeline = redisClient.multi();
for (const track of tracks) {
pipeline.rPush(VIBE_QUEUE, JSON.stringify({
trackId: track.id,
filePath: track.filePath,
duration: track.duration,
}));
}
await pipeline.exec();
// Queue for retry via BullMQ (jobId deduplication)
await vibeQueue.addBulk(
tracks.map((track) => ({
name: "embed",
data: { trackId: track.id, filePath: track.filePath, duration: track.duration },
opts: { jobId: `vibe-${track.id}` },
})),
);
// Reset EnrichmentFailure retry counts
await enrichmentFailureService.resetRetryCount(failures.map(f => f.id));
+7 -2
View File
@@ -1,4 +1,5 @@
import { Router } from "express";
import { withRetry } from "../utils/async";
import { logger } from "../utils/logger";
import { requireAuthOrToken } from "../middleware/auth";
import { spotifyService } from "../services/spotify";
@@ -117,7 +118,7 @@ router.get("/playlists/search", async (req, res) => {
router.get("/playlists/:id", async (req, res) => {
try {
const { id } = req.params;
const playlist = await deezerService.getPlaylist(id);
const playlist = await withRetry(() => deezerService.getPlaylist(id));
if (!playlist) {
return res.status(404).json({ error: "Playlist not found" });
@@ -130,7 +131,11 @@ router.get("/playlists/:id", async (req, res) => {
});
} catch (error: any) {
logger.error("Playlist fetch error:", error);
res.status(500).json({ error: error.message || "Failed to fetch playlist" });
const isNetworkError = ["ECONNRESET", "ETIMEDOUT", "ECONNREFUSED"].includes(error.code);
const userMessage = isNetworkError
? "Deezer API is temporarily unavailable. Please try again in a moment."
: (error.message || "Failed to fetch playlist");
res.status(500).json({ error: userMessage });
}
});
+83 -12
View File
@@ -149,12 +149,12 @@ router.delete("/batch", async (req, res) => {
logger.info(`[Discover Cancel] Marked batch ${batch.id} as completed`);
}
// Find and cancel any active Bull queue jobs for this user
// Find and cancel any active BullMQ jobs for this user
const activeJobs = await discoverQueue.getActive();
for (const job of activeJobs) {
if (job.data.userId === userId) {
try {
await job.moveToFailed({ message: "Cancelled by user" }, true);
await job.moveToFailed(new Error("Cancelled by user"), "");
jobsCancelled++;
logger.info(`[Discover Cancel] Cancelled Bull job ${job.id}`);
} catch (jobError) {
@@ -193,7 +193,7 @@ router.delete("/batch", async (req, res) => {
}
});
// POST /discover/generate - Generate new Discover Weekly playlist (using Bull queue)
// POST /discover/generate - Generate new Discover Weekly playlist
router.post("/generate", async (req, res) => {
try {
const userId = req.user!.id;
@@ -297,7 +297,7 @@ router.get("/generate/status/:jobId", async (req, res) => {
}
const state = await job.getState();
const progress = job.progress();
const progress = job.progress;
const result = job.returnvalue;
res.json({
@@ -827,6 +827,77 @@ router.post("/retry-unavailable", async (req, res) => {
return res.json({ success: true, queued: 0, message: "No unavailable albums to retry" });
}
// Apply the same filtering used by GET /current to exclude albums that are
// already in the library. Without this, the retry would re-download albums
// the user already has, importing duplicate tracks.
// Filter 1: albums with a successful DiscoveryAlbum record this week
const successfulDiscovery = await prisma.discoveryAlbum.findMany({
where: {
userId,
weekStartDate: weekStart,
status: { in: ["ACTIVE", "LIKED"] },
},
select: { rgMbid: true },
});
const successfulMbids = new Set(successfulDiscovery.map((da) => da.rgMbid));
// Filter 2: albums already present in the library by MBID
const allMbids = unavailableAlbums.map((a) => a.albumMbid).filter(Boolean);
const existingByMbid = new Set(
(await prisma.album.findMany({
where: { rgMbid: { in: allMbids } },
select: { rgMbid: true },
})).map((a) => a.rgMbid)
);
// Filter 3: fuzzy title+artist match against the full library
const libraryAlbums = await prisma.album.findMany({
select: { title: true, artist: { select: { name: true } } },
});
const libraryIndex = libraryAlbums.map((a) => ({
title: a.title.toLowerCase().trim(),
artist: a.artist.name.toLowerCase().trim(),
}));
const staleIds: string[] = [];
const retriableAlbums = unavailableAlbums.filter((album) => {
if (successfulMbids.has(album.albumMbid)) {
staleIds.push(album.id);
return false;
}
if (existingByMbid.has(album.albumMbid)) {
staleIds.push(album.id);
return false;
}
const normalizedArtist = album.artistName.toLowerCase().trim();
const normalizedTitle = album.albumTitle
.toLowerCase()
.replace(/\(.*?\)/g, "")
.replace(/\[.*?\]/g, "")
.trim();
const existsInLibrary = libraryIndex.some(
(lib) =>
lib.title.includes(normalizedTitle) &&
lib.artist.includes(normalizedArtist)
);
if (existsInLibrary) {
staleIds.push(album.id);
return false;
}
return true;
});
// Delete stale UnavailableAlbum records so they don't reappear next retry
if (staleIds.length > 0) {
await prisma.unavailableAlbum.deleteMany({ where: { id: { in: staleIds } } });
logger.info(`[Discover Retry] Cleared ${staleIds.length} stale unavailable records (already in library)`);
}
if (retriableAlbums.length === 0) {
return res.json({ success: true, queued: 0, message: "All unavailable albums are already in your library" });
}
const settings = await getSystemSettings();
if (!settings?.musicPath) {
return res.status(400).json({ error: "Music path not configured" });
@@ -838,22 +909,22 @@ router.post("/retry-unavailable", async (req, res) => {
data: {
userId,
weekStart,
targetSongCount: unavailableAlbums.length,
targetSongCount: retriableAlbums.length,
status: "downloading",
totalAlbums: unavailableAlbums.length,
totalAlbums: retriableAlbums.length,
completedAlbums: 0,
failedAlbums: 0,
logs: [
{
timestamp: new Date().toISOString(),
level: "info",
message: `Retry: ${unavailableAlbums.length} unavailable albums`,
message: `Retry: ${retriableAlbums.length} unavailable albums`,
},
] as any,
},
});
for (const album of unavailableAlbums) {
for (const album of retriableAlbums) {
await tx.downloadJob.create({
data: {
userId,
@@ -888,13 +959,13 @@ router.post("/retry-unavailable", async (req, res) => {
return newBatch;
});
logger.info(`[Discover Retry] Created batch ${batch.id} with ${unavailableAlbums.length} albums`);
logger.info(`[Discover Retry] Created batch ${batch.id} with ${retriableAlbums.length} albums`);
res.json({
success: true,
queued: unavailableAlbums.length,
queued: retriableAlbums.length,
batchId: batch.id,
message: `Retrying ${unavailableAlbums.length} unavailable albums`,
message: `Retrying ${retriableAlbums.length} unavailable albums`,
});
// Process downloads in background using acquisition service
@@ -1529,7 +1600,7 @@ router.delete("/clear", async (req, res) => {
// Find completed jobs that didn't make the playlist AND aren't from liked artists
const extraJobs = completedJobs.filter((job) => {
// If MBID matches a discovery album, not an "extra"
if (discoveryMbids.has(job.targetMbid)) return false;
if (discoveryMbids.has(job.targetMbid!)) return false;
// If this job's artist has any LIKED albums, don't clean it up
const metadata = job.metadata as any;
+3 -2
View File
@@ -257,7 +257,7 @@ router.get("/scan/status/:jobId", async (req, res) => {
}
const state = await job.getState();
const progress = job.progress();
const progress = job.progress;
const result = job.returnvalue;
res.json({
@@ -650,7 +650,7 @@ router.get("/artists", async (req, res) => {
return {
id: artist.id,
mbid: artist.mbid,
mbid: artist.mbid?.startsWith("temp-") ? null : artist.mbid,
name: artist.name,
heroUrl: coverArt,
coverArt, // Alias for frontend consistency
@@ -1430,6 +1430,7 @@ router.get("/artists/:id", async (req, res) => {
res.json({
...artist,
mbid: artist.mbid?.startsWith("temp-") ? null : artist.mbid,
coverArt: heroUrl, // Use fetched hero image (falls back to artist.heroUrl)
bio: getArtistDisplaySummary(artist),
genres: getMergedGenres(artist),
+1 -1
View File
@@ -155,7 +155,7 @@ router.post("/register", async (req, res) => {
.status(400)
.json({ error: "Invalid request", details: err.errors });
}
logger.error("Registration error:", err);
logger.error("[ONBOARDING] Registration error:", { message: err.message, code: err.code, stack: err.stack });
res.status(500).json({ error: "Failed to create account" });
}
});
+48 -4
View File
@@ -11,6 +11,8 @@ import { soulseekService, SearchResult } from "../services/soulseek";
import { getSystemSettings } from "../utils/systemSettings";
import { randomUUID } from "crypto";
import { eventBus } from "../services/eventBus";
import { prisma } from "../utils/db";
import fs from "fs";
import path from "path";
import type { FileSearchResponse } from "../lib/soulseek/messages/from/peer";
import { FileAttribute } from "../lib/soulseek/messages/common";
@@ -353,6 +355,7 @@ router.post(
requireAuth,
requireSoulseekConfigured,
async (req, res) => {
let jobId: string | null = null;
try {
const { username, filepath, artist, title, album, filename } = req.body;
@@ -410,18 +413,47 @@ router.post(
score: 0,
};
const userId = req.user!.id;
const subject = `${resolvedArtist} - ${resolvedTitle}`;
// Create a DownloadJob so the activity tracker shows this download in progress
const job = await prisma.downloadJob.create({
data: {
userId,
subject,
type: "soulseek-search",
targetMbid: null,
status: "processing",
startedAt: new Date(),
},
});
jobId = job.id;
const result = await soulseekService.downloadTrack(match, destPath);
if (result.success) {
const userId = (req as any).user?.id || null;
try {
await fs.promises.access(destPath);
} catch {
logger.error(`[Soulseek] Download reported success but file missing: ${destPath}`);
await prisma.downloadJob.update({
where: { id: job.id },
data: { status: "failed", error: "File not written to disk", completedAt: new Date() },
});
return res.status(500).json({ success: false, error: "Download failed: file not written to disk" });
}
await prisma.downloadJob.update({
where: { id: job.id },
data: { status: "completed", completedAt: new Date() },
});
// Notify activity feed
eventBus.emit({
type: "download:complete",
userId,
payload: {
jobId: randomUUID(),
subject: `${resolvedArtist} - ${resolvedTitle}`,
jobId: job.id,
subject,
},
});
@@ -444,12 +476,22 @@ router.post(
message: "Download complete, scanning library...",
});
} else {
await prisma.downloadJob.update({
where: { id: job.id },
data: { status: "failed", error: result.error || "Download failed", completedAt: new Date() },
});
res.status(500).json({
success: false,
error: result.error || "Download failed",
});
}
} catch (error: any) {
if (jobId) {
await prisma.downloadJob.update({
where: { id: jobId },
data: { status: "failed", error: error.message, completedAt: new Date() },
}).catch(() => {});
}
logger.error("Soulseek download error:", error.message);
res.status(500).json({
error: "Download failed",
@@ -459,6 +501,8 @@ router.post(
},
);
/**
* POST /soulseek/disconnect
* Disconnect from Soulseek network
+7 -4
View File
@@ -1,4 +1,5 @@
import { Router } from "express";
import { withRetry } from "../utils/async";
import { logger } from "../utils/logger";
import { requireAuthOrToken } from "../middleware/auth";
import { z } from "zod";
@@ -149,7 +150,7 @@ router.post("/import", async (req, res) => {
.json({ error: "Invalid Deezer playlist URL" });
}
const playlistId = deezerMatch[1];
const deezerPlaylist = await deezerService.getPlaylist(playlistId);
const deezerPlaylist = await withRetry(() => deezerService.getPlaylist(playlistId));
if (!deezerPlaylist) {
return res
.status(404)
@@ -186,9 +187,11 @@ router.post("/import", async (req, res) => {
if (error.name === "ZodError") {
return res.status(400).json({ error: "Invalid request body" });
}
res.status(500).json({
error: error.message || "Failed to start import",
});
const isNetworkError = ["ECONNRESET", "ETIMEDOUT", "ECONNREFUSED"].includes(error.code);
const userMessage = isNetworkError
? "Deezer API is temporarily unavailable. Please try again in a moment."
: (error.message || "Failed to start import");
res.status(500).json({ error: userMessage });
}
});
+5
View File
@@ -88,6 +88,11 @@ class DataCacheService {
albumId: string,
rgMbid: string
): Promise<string | null> {
// Skip temp MBIDs - no cover art available for unscanned albums
if (!rgMbid || rgMbid.startsWith("temp-")) {
return null;
}
const cacheKey = `album-cover:${albumId}`;
// 1. Check DB first
+3 -3
View File
@@ -749,12 +749,12 @@ export class DiscoverWeeklyService {
userId_weekStartDate_albumMbid: {
userId: batch.userId,
weekStartDate: batch.weekStart,
albumMbid: job.targetMbid,
albumMbid: job.targetMbid!,
},
},
create: {
userId: batch.userId,
albumMbid: job.targetMbid,
albumMbid: job.targetMbid!,
artistName: metadata?.artistName || "Unknown",
albumTitle: metadata?.albumTitle || "Unknown",
similarity: metadata?.similarity || 0.5,
@@ -2072,7 +2072,7 @@ export class DiscoverWeeklyService {
where: { discoveryBatchId: batch.id },
});
for (const job of batchJobs) {
attemptedMbids.add(job.targetMbid);
if (job.targetMbid) attemptedMbids.add(job.targetMbid);
const jobMeta = job.metadata as any;
if (jobMeta?.artistMbid) {
attemptedArtistMbids.add(jobMeta.artistMbid);
+1 -1
View File
@@ -640,7 +640,7 @@ class DownloadQueueManager {
this.activeDownloads.set(job.lidarrRef || job.id, {
downloadId: job.lidarrRef || job.id,
albumTitle: job.subject,
albumMbid: job.targetMbid,
albumMbid: job.targetMbid ?? "",
artistName: metadata.artistName || "Unknown",
artistMbid: metadata.artistMbid,
albumId: metadata.lidarrAlbumId,
@@ -10,7 +10,7 @@ import { prisma } from "../utils/db";
export interface EnrichmentFailure {
id: string;
entityType: "artist" | "track" | "audio" | "vibe";
entityType: "artist" | "track" | "audio" | "vibe" | "podcast";
entityId: string;
entityName: string | null;
errorMessage: string | null;
@@ -27,7 +27,7 @@ export interface EnrichmentFailure {
}
export interface RecordFailureInput {
entityType: "artist" | "track" | "audio" | "vibe";
entityType: "artist" | "track" | "audio" | "vibe" | "podcast";
entityId: string;
entityName?: string;
errorMessage: string;
@@ -36,7 +36,7 @@ export interface RecordFailureInput {
}
export interface GetFailuresOptions {
entityType?: "artist" | "track" | "audio" | "vibe";
entityType?: "artist" | "track" | "audio" | "vibe" | "podcast";
includeSkipped?: boolean;
includeResolved?: boolean;
limit?: number;
@@ -252,7 +252,7 @@ class EnrichmentFailureService {
/**
* Clear all unresolved failures (optionally filtered by type)
*/
async clearAllFailures(entityType?: "artist" | "track" | "audio" | "vibe"): Promise<number> {
async clearAllFailures(entityType?: "artist" | "track" | "audio" | "vibe" | "podcast"): Promise<number> {
const where: any = {
resolved: false,
skipped: false,
@@ -357,6 +357,15 @@ class EnrichmentFailureService {
select: { id: true },
});
exists = !!track;
} else if (failure.entityType === "podcast") {
const podcast = await prisma.podcast.findUnique({
where: { id: failure.entityId },
select: { id: true },
});
exists = !!podcast;
} else {
// Unknown entity type — treat as existing to avoid silent deletion
exists = true;
}
if (!exists) {
+2 -2
View File
@@ -258,8 +258,8 @@ class EnrichmentStateService {
* Cleanup connections
*/
async disconnect(): Promise<void> {
await this.redis.quit();
await this.publisher.quit();
await this.redis.quit().catch(() => {});
await this.publisher.quit().catch(() => {});
}
}
-2
View File
@@ -242,7 +242,6 @@ class LidarrService {
): Promise<LidarrArtist[]> {
await this.ensureInitialized();
// DEBUG: Log exact parameters received
logger.debug(
`[LIDARR_SEARCH_ARTIST] artistName="${artistName}", mbid="${mbid}"`
);
@@ -392,7 +391,6 @@ class LidarrService {
): Promise<LidarrArtist | null> {
await this.ensureInitialized();
// DEBUG: Log exact parameters received
logger.debug(
`[LIDARR_ADD_ARTIST] artistName="${artistName}", mbid="${mbid}"`
);
+19
View File
@@ -759,6 +759,25 @@ export class MusicScannerService {
});
}
// Cross-artist fallback: if an album with the same title and year already
// exists under any artist in the library, reuse it to prevent splitting
// multi-artist / VA albums when albumartist tags are inconsistent.
// Guards: title must not be generic, year must be known (non-null).
if (!album && albumTitle !== "Unknown Album" && albumTitle !== "Unknown" && year !== null) {
album = await prisma.album.findFirst({
where: {
title: albumTitle,
year: year,
location: "LIBRARY",
},
});
if (album) {
logger.debug(
`[Scanner] Cross-artist album match: "${albumTitle}" (${year}) -> album ${album.id} (artist ${album.artistId})`,
);
}
}
if (!album) {
// Create new album (use a temporary MBID for now)
const rgMbid =
+3 -1
View File
@@ -251,7 +251,9 @@ async function performDownload(
if (episode?.fileSize && episode.fileSize > 0) {
expectedBytes = episode.fileSize;
}
} catch {}
} catch (err) {
logger.warn(`[PODCAST-DL] Could not fetch episode fileSize: ${(err as Error).message}`);
}
}
logger.debug(
+8 -3
View File
@@ -1,4 +1,5 @@
import { randomUUID } from "crypto";
import { withRetry } from "../utils/async";
import pLimit from "p-limit";
import { spotifyService, SpotifyTrack, SpotifyPlaylist } from "./spotify";
import { logger } from "../utils/logger";
@@ -3082,7 +3083,7 @@ class SpotifyImportService {
if (url.includes("deezer.com")) {
const deezerMatch = url.match(/playlist[\/:](\d+)/);
if (!deezerMatch) throw new Error("Invalid Deezer playlist URL");
const deezerPlaylist = await deezerService.getPlaylist(deezerMatch[1]);
const deezerPlaylist = await withRetry(() => deezerService.getPlaylist(deezerMatch[1]));
if (!deezerPlaylist) throw new Error("Deezer playlist not found");
eventBus.emit({
@@ -3115,15 +3116,19 @@ class SpotifyImportService {
});
} catch (error: any) {
logger?.error("[Preview Job] Failed:", error);
const isNetworkError = ["ECONNRESET", "ETIMEDOUT", "ECONNREFUSED"].includes(error.code);
const userMessage = isNetworkError
? "Deezer API is temporarily unavailable. Please try again in a moment."
: (error.message || "Import failed");
await redisClient.setEx(
PREVIEW_JOB_KEY(jobId),
PREVIEW_JOB_TTL,
JSON.stringify({ status: "failed", error: error.message, userId }),
JSON.stringify({ status: "failed", error: userMessage, userId }),
).catch((e) => logger.error("[Preview Job] Failed to persist error state to Redis:", e));
eventBus.emit({
type: "preview:complete",
userId,
payload: { jobId, error: error.message },
payload: { jobId, error: userMessage },
});
}
})().catch((e) => logger?.error("[Preview Job] Unhandled:", e));
+2 -1
View File
@@ -184,9 +184,10 @@ class StaleJobCleanupService {
// Clean completed jobs older than retention period
const completedCleaned = await queue.clean(
retentionMs,
1000,
"completed"
);
const failedCleaned = await queue.clean(retentionMs, "failed");
const failedCleaned = await queue.clean(retentionMs, 1000, "failed");
const queueCleaned =
completedCleaned.length + failedCleaned.length;
-146
View File
@@ -1,146 +0,0 @@
import { prisma } from "../utils/db";
import { logger } from "../utils/logger";
const STALE_THRESHOLD_MINUTES = 30; // Longer than audio analysis due to CLAP processing time
export const VIBE_MAX_RETRIES = 3;
class VibeAnalysisCleanupService {
/**
* Clean up tracks stuck in "processing" state for vibe embeddings
* Returns number of tracks reset
*/
async cleanupStaleProcessing(): Promise<{ reset: number }> {
const cutoff = new Date(Date.now() - STALE_THRESHOLD_MINUTES * 60 * 1000);
// Find tracks stuck in processing
const staleTracks = await prisma.track.findMany({
where: {
vibeAnalysisStatus: "processing",
OR: [
{ vibeAnalysisStatusUpdatedAt: { lt: cutoff } },
{
vibeAnalysisStatusUpdatedAt: null,
updatedAt: { lt: cutoff },
},
],
},
include: {
album: {
include: {
artist: { select: { name: true } },
},
},
},
});
if (staleTracks.length === 0) {
return { reset: 0 };
}
logger.debug(
`[VibeAnalysisCleanup] Found ${staleTracks.length} stale vibe tracks (processing > ${STALE_THRESHOLD_MINUTES} min)`
);
let resetCount: number = 0;
for (const track of staleTracks) {
const trackName = `${track.album.artist.name} - ${track.title}`;
const newRetryCount = (track.vibeAnalysisRetryCount ?? 0) + 1;
if (newRetryCount > VIBE_MAX_RETRIES) {
await prisma.track.update({
where: { id: track.id },
data: {
vibeAnalysisStatus: "failed",
vibeAnalysisError: `Exceeded ${VIBE_MAX_RETRIES} retry attempts`,
vibeAnalysisRetryCount: newRetryCount,
vibeAnalysisStatusUpdatedAt: new Date(),
},
});
logger.warn(`[VibeAnalysisCleanup] Permanently failed after ${VIBE_MAX_RETRIES} retries: ${trackName}`);
} else {
await prisma.track.update({
where: { id: track.id },
data: {
vibeAnalysisStatus: null,
vibeAnalysisRetryCount: newRetryCount,
vibeAnalysisStatusUpdatedAt: null,
},
});
logger.debug(`[VibeAnalysisCleanup] Reset for retry (${newRetryCount}/${VIBE_MAX_RETRIES}): ${trackName}`);
}
resetCount++;
}
return { reset: resetCount };
}
/**
* Clean up tracks where vibeAnalysisStatus='completed' but no embedding exists
* This fixes CLAP embedding stalls where tracks appear complete but have no embedding
*/
async cleanupOrphanedCompleted(options?: {
dryRun?: boolean;
batchSize?: number;
offset?: number;
}): Promise<{ reset: number; skipped: number; totalOrphaned: number }> {
const batchSize = options?.batchSize ?? 100;
const offset = options?.offset ?? 0;
const dryRun = options?.dryRun ?? false;
const RECENT_THRESHOLD_MS = 5 * 60 * 1000;
const recentCutoff = new Date(Date.now() - RECENT_THRESHOLD_MS);
try {
const orphanedTracks = await prisma.$queryRaw<{ id: string }[]>`
SELECT t.id
FROM "Track" t
LEFT JOIN "track_embeddings" te ON t.id = te.track_id
WHERE t."vibeAnalysisStatus" = 'completed'
AND te.track_id IS NULL
AND t."vibeAnalysisStatusUpdatedAt" < ${recentCutoff}
LIMIT ${batchSize} OFFSET ${offset}
`;
const totalOrphaned = orphanedTracks.length;
if (totalOrphaned === 0) {
return { reset: 0, skipped: 0, totalOrphaned: 0 };
}
logger.info(
`[VibeAnalysisCleanup] Found ${totalOrphaned} orphaned completed tracks (batchSize=${batchSize}, offset=${offset}, dryRun=${dryRun})`
);
if (dryRun) {
return { reset: 0, skipped: totalOrphaned, totalOrphaned };
}
let resetCount = 0;
await prisma.$transaction(async (tx) => {
const result = await tx.track.updateMany({
where: {
id: { in: orphanedTracks.map(t => t.id) },
},
data: {
vibeAnalysisStatus: null,
vibeAnalysisRetryCount: { increment: 1 },
vibeAnalysisStatusUpdatedAt: null,
vibeAnalysisError: "Orphaned: completed but no embedding found",
},
});
resetCount = result.count;
});
logger.info(`[VibeAnalysisCleanup] Reset ${resetCount} orphaned completed tracks`);
return { reset: resetCount, skipped: 0, totalOrphaned };
} catch (error) {
logger.error(`[VibeAnalysisCleanup] Error cleaning up orphaned completed: ${error}`);
return { reset: 0, skipped: 0, totalOrphaned: 0 };
}
}
}
export const vibeAnalysisCleanupService = new VibeAnalysisCleanupService();
+4
View File
@@ -5,6 +5,10 @@ import { join } from "path";
import { logger } from "../utils/logger";
import { VOCAB_DEFINITIONS, FeatureProfile, TermType } from "../data/featureProfiles";
// Force TypeScript to copy JSON to dist (without actually using it here - runtime reads the file directly)
import _vocabData from "../data/vibe-vocabulary.json";
void(_vocabData); // Prevent unused variable warning
export interface VocabTerm {
name: string;
type: TermType;
+17
View File
@@ -22,6 +22,23 @@ export function yieldToEventLoop(): Promise<void> {
return new Promise((resolve) => setImmediate(resolve));
}
/**
* Retry a function on transient network errors (ECONNRESET, ETIMEDOUT, AbortError)
* with linear backoff (2s, 4s, 6s). Non-retryable errors are re-thrown immediately.
*/
export async function withRetry<T>(fn: () => Promise<T>, attempts = 3, delayMs = 2000): Promise<T> {
for (let i = 0; i < attempts; i++) {
try {
return await fn();
} catch (err: any) {
const isRetryable = err.code === "ECONNRESET" || err.code === "ETIMEDOUT" || err.name === "AbortError";
if (!isRetryable || i === attempts - 1) throw err;
await new Promise(r => setTimeout(r, delayMs * (i + 1)));
}
}
throw new Error("unreachable");
}
/**
* Process items in batches with yielding between batches
* Checks abort signal to support early termination
+1 -1
View File
@@ -449,7 +449,7 @@ async function enrichAlbumCovers(
await Promise.all(
batch.map(async (album) => {
if (!album.rgMbid) return;
if (!album.rgMbid || album.rgMbid.startsWith("temp-")) return;
try {
const coverUrl = await coverArtService.getCoverArt(
@@ -0,0 +1,76 @@
import { Worker, Job } from "bullmq";
import { createWorkerConnection, QUEUE_NAMES } from "./enrichmentQueues";
import { enrichSimilarArtist } from "./artistEnrichment";
import { enrichmentFailureService } from "../services/enrichmentFailureService";
import { prisma } from "../utils/db";
import { logger } from "../utils/logger";
import { getSystemSettings } from "../utils/systemSettings";
export interface ArtistJobData {
artistId: string;
artistName: string;
}
export async function startArtistEnrichmentWorker(): Promise<Worker> {
const settings = await getSystemSettings();
const concurrency = settings?.enrichmentConcurrency ?? 3;
const worker = new Worker<ArtistJobData>(
QUEUE_NAMES.ARTISTS,
async (job: Job<ArtistJobData>) => {
const { artistId, artistName } = job.data;
logger.debug(`[ArtistWorker] Processing ${artistId} (${artistName})`);
const artist = await prisma.artist.findUnique({ where: { id: artistId } });
if (!artist) {
const err = new Error(`ENTITY_NOT_FOUND: Artist ${artistId} deleted`);
(err as any).entityNotFound = true;
throw err;
}
// enrichSimilarArtist handles its own final status update
// (sets "completed", "unresolvable", or "failed" internally)
await enrichSimilarArtist(artist);
},
{
connection: createWorkerConnection(),
concurrency,
lockDuration: 120000,
stalledInterval: 30000,
maxStalledCount: 2,
},
);
worker.on("failed", async (job, err) => {
if (!job) return;
const { artistId } = job.data;
const isEntityGone = (err as any).entityNotFound === true;
if (isEntityGone) {
logger.info(`[ArtistWorker] Artist ${artistId} no longer exists, resolving silently`);
return;
}
logger.error(`[ArtistWorker] Artist ${artistId} failed (attempt ${job.attemptsMade}): ${err.message}`);
const isFinalAttempt = job.attemptsMade >= (job.opts.attempts ?? 3);
if (isFinalAttempt) {
await prisma.artist
.update({
where: { id: artistId },
data: { enrichmentStatus: "pending" },
})
.catch(() => {});
await enrichmentFailureService.recordFailure({
entityType: "artist",
entityId: artistId,
errorMessage: err.message,
});
}
});
worker.on("error", (err) => logger.error(`[ArtistWorker] Error: ${err.message}`));
logger.info(`[ArtistWorker] Started (concurrency: ${concurrency})`);
return worker;
}
@@ -0,0 +1,132 @@
import Redis from "ioredis";
import { vibeQueue } from "./enrichmentQueues";
import { prisma } from "../utils/db";
import { config } from "../config";
import { logger } from "../utils/logger";
const CHANNEL = "audio:analysis:complete";
// Resume vibe embeddings after this many ms of Essentia silence.
// Keeps both ML models from loading simultaneously on low-RAM hosts.
const ESSENTIA_QUIET_MS = 30_000;
interface AudioCompletionEvent {
trackId: string;
filePath: string;
status: string;
}
let subscriber: Redis | null = null;
let quietTimer: ReturnType<typeof setTimeout> | null = null;
let vibePaused = false;
function pauseVibe(): void {
if (!vibePaused) {
vibePaused = true;
vibeQueue.pause().catch((err: Error) => {
logger.warn(`[AudioSub] Failed to pause vibe queue: ${err.message}`);
});
logger.debug("[AudioSub] Vibe queue paused (Essentia active)");
}
}
function scheduleVibeResume(): void {
if (quietTimer) clearTimeout(quietTimer);
quietTimer = setTimeout(() => {
quietTimer = null;
vibePaused = false;
vibeQueue.resume().catch((err: Error) => {
logger.warn(`[AudioSub] Failed to resume vibe queue: ${err.message}`);
});
logger.info("[AudioSub] Essentia idle — vibe queue resumed");
}, ESSENTIA_QUIET_MS);
}
export function startAudioCompletionSubscriber(): void {
// Resume in case a previous crash left the queue paused in Redis
vibeQueue.resume().catch(() => {});
subscriber = new Redis(config.redisUrl);
subscriber.subscribe(CHANNEL, (err) => {
if (err) {
logger.error(`[AudioSub] Subscribe failed: ${err.message}`);
return;
}
logger.info(`[AudioSub] Subscribed to ${CHANNEL}`);
});
subscriber.on("message", async (_channel, message) => {
let event: AudioCompletionEvent;
try {
event = JSON.parse(message);
} catch {
logger.warn(`[AudioSub] Invalid message: ${message}`);
return;
}
if (event.status !== "complete" || !event.trackId) return;
// Gate CLAP behind Essentia: pause vibe queue while Essentia is active,
// resume after ESSENTIA_QUIET_MS of silence. Prevents both ML models
// from loading simultaneously on low-RAM hosts.
pauseVibe();
scheduleVibeResume();
// Defensive guard: verify track analysisStatus in DB before queuing vibe.
// Protects against pub/sub messages from failed Essentia runs.
const track = await prisma.track
.findUnique({
where: { id: event.trackId },
select: { analysisStatus: true, vibeAnalysisStatus: true },
})
.catch(() => null);
if (!track) {
logger.warn(`[AudioSub] Track ${event.trackId} not found, skipping vibe queue`);
return;
}
if (track.analysisStatus !== "completed") {
logger.warn(
`[AudioSub] Track ${event.trackId} analysisStatus=${track.analysisStatus}, skipping vibe`,
);
return;
}
if (track.vibeAnalysisStatus === "completed") {
return; // Already has vibe embedding
}
try {
await vibeQueue.add(
"embed",
{ trackId: event.trackId, filePath: event.filePath },
{ jobId: `vibe-${event.trackId}` }, // dedup — no-op if already queued
);
logger.debug(`[AudioSub] Queued vibe job for track ${event.trackId}`);
} catch (err) {
logger.error(`[AudioSub] Failed to queue vibe job: ${(err as Error).message}`);
}
});
subscriber.on("error", (err) => {
logger.error(`[AudioSub] Redis error: ${err.message}`);
});
}
export async function stopAudioCompletionSubscriber(): Promise<void> {
if (quietTimer) {
clearTimeout(quietTimer);
quietTimer = null;
}
if (subscriber) {
await subscriber.unsubscribe(CHANNEL).catch(() => {});
await subscriber.quit().catch(() => {});
subscriber = null;
}
// Don't leave the vibe queue paused in Redis across restarts
if (vibePaused) {
vibePaused = false;
await vibeQueue.resume().catch(() => {});
}
}
+64
View File
@@ -0,0 +1,64 @@
import { Queue } from "bullmq";
import type { ConnectionOptions } from "bullmq";
import { config } from "../config";
function getConnectionOptions(): ConnectionOptions {
const url = new URL(config.redisUrl);
return {
host: url.hostname,
port: parseInt(url.port, 10) || 6379,
password: url.password || undefined,
maxRetriesPerRequest: null, // Required by BullMQ
enableReadyCheck: false,
};
}
// Queue names — BullMQ v5 forbids colons; use hyphens instead
export const QUEUE_NAMES = {
ARTISTS: "enrichment-artists",
TRACKS: "enrichment-tracks",
VIBE: "enrichment-vibe",
PODCASTS: "enrichment-podcasts",
} as const;
const DEFAULT_JOB_OPTIONS = {
attempts: 3,
backoff: { type: "exponential" as const, delay: 5000 },
removeOnComplete: { count: 100, age: 3600 },
removeOnFail: { count: 500, age: 86400 },
};
export const artistQueue = new Queue(QUEUE_NAMES.ARTISTS, {
connection: getConnectionOptions(),
defaultJobOptions: DEFAULT_JOB_OPTIONS,
});
export const trackQueue = new Queue(QUEUE_NAMES.TRACKS, {
connection: getConnectionOptions(),
defaultJobOptions: DEFAULT_JOB_OPTIONS,
});
// Vibe queue — consumed by the CLAP Python bullmq-python Worker (Phase 3)
export const vibeQueue = new Queue(QUEUE_NAMES.VIBE, {
connection: getConnectionOptions(),
defaultJobOptions: { ...DEFAULT_JOB_OPTIONS, attempts: 2 },
});
export const podcastQueue = new Queue(QUEUE_NAMES.PODCASTS, {
connection: getConnectionOptions(),
defaultJobOptions: DEFAULT_JOB_OPTIONS,
});
// Factory for Worker connection options — each BullMQ Worker must have its own connection
export function createWorkerConnection(): ConnectionOptions {
return getConnectionOptions();
}
export async function closeEnrichmentQueues(): Promise<void> {
await Promise.all([
artistQueue.close(),
trackQueue.close(),
vibeQueue.close(),
podcastQueue.close(),
]);
}
+38 -25
View File
@@ -1,3 +1,4 @@
import { Worker } from "bullmq";
import { logger } from "../utils/logger";
import {
scanQueue,
@@ -5,6 +6,7 @@ import {
} from "./queues";
import { processScan } from "./processors/scanProcessor";
import { processDiscoverWeekly } from "./processors/discoverProcessor";
import { createWorkerConnection } from "./enrichmentQueues";
import {
startUnifiedEnrichmentWorker,
stopUnifiedEnrichmentWorker,
@@ -31,9 +33,18 @@ import { enrichmentStateService } from "../services/enrichmentState";
// Track timeouts for cleanup
const timeouts: NodeJS.Timeout[] = [];
// Register processors with named job types
scanQueue.process("scan", processScan);
discoverQueue.process("discover-weekly", processDiscoverWeekly);
// BullMQ workers for scan and discover queues
const scanWorker = new Worker("library-scan-v2", processScan, {
connection: createWorkerConnection(),
concurrency: 1,
lockDuration: 300000, // 5 minutes — scans can be slow
});
const discoverWorker = new Worker("discover-weekly-v2", processDiscoverWeekly, {
connection: createWorkerConnection(),
concurrency: 1,
lockDuration: 120000,
});
// Register download queue callback for unavailable albums
downloadQueueManager.onUnavailableAlbum(async (info) => {
@@ -93,23 +104,27 @@ startMoodBucketWorker().catch((err) => {
logger.error("Failed to start mood bucket worker:", err);
});
// Event handlers for scan queue
scanQueue.on("completed", (job, result) => {
// Event handlers for scan worker
scanWorker.on("completed", (job, result) => {
logger.debug(
`Scan job ${job.id} completed: +${result.tracksAdded} ~${result.tracksUpdated} -${result.tracksRemoved}`
);
});
scanQueue.on("failed", (job, err) => {
logger.error(`Scan job ${job.id} failed:`, err.message);
scanWorker.on("failed", (job, err) => {
logger.error(`Scan job ${job?.id} failed:`, err.message);
});
scanQueue.on("active", (job) => {
scanWorker.on("active", (job) => {
logger.debug(` Scan job ${job.id} started`);
});
// Event handlers for discover queue
discoverQueue.on("completed", (job, result) => {
scanWorker.on("error", (err) => {
logger.error("Scan worker error:", err.message);
});
// Event handlers for discover worker
discoverWorker.on("completed", (job, result) => {
if (result.success) {
logger.debug(
`Discover job ${job.id} completed: ${result.playlistName} (${result.songCount} songs)`
@@ -119,15 +134,19 @@ discoverQueue.on("completed", (job, result) => {
}
});
discoverQueue.on("failed", (job, err) => {
logger.error(`Discover job ${job.id} failed:`, err.message);
discoverWorker.on("failed", (job, err) => {
logger.error(`Discover job ${job?.id} failed:`, err.message);
});
discoverQueue.on("active", (job) => {
discoverWorker.on("active", (job) => {
logger.debug(` Discover job ${job.id} started for user ${job.data.userId}`);
});
logger.debug("Worker processors registered and event handlers attached");
discoverWorker.on("error", (err) => {
logger.error("Discover worker error:", err.message);
});
logger.debug("BullMQ workers registered and event handlers attached");
// Start Discovery Weekly cron scheduler (Sundays at 8 PM)
startDiscoverWeeklyCron();
@@ -320,8 +339,8 @@ timeouts.push(
export async function shutdownWorkers(): Promise<void> {
logger.debug("Shutting down workers...");
// Stop unified enrichment worker
stopUnifiedEnrichmentWorker();
// Stop unified enrichment worker (async — closes BullMQ workers and queues)
await stopUnifiedEnrichmentWorker();
// Disconnect enrichment state service Redis connections (2 connections)
try {
@@ -349,15 +368,9 @@ export async function shutdownWorkers(): Promise<void> {
}
timeouts.length = 0;
// Remove all event listeners to prevent memory leaks
scanQueue.removeAllListeners();
discoverQueue.removeAllListeners();
// Close all queues gracefully
await Promise.all([
scanQueue.close(),
discoverQueue.close(),
]);
// Close workers first so in-flight jobs complete, then close queues
await Promise.all([scanWorker.close(), discoverWorker.close()]);
await Promise.all([scanQueue.close(), discoverQueue.close()]);
logger.debug("Workers shutdown complete");
}
@@ -0,0 +1,56 @@
import { Worker, Job } from "bullmq";
import { createWorkerConnection, QUEUE_NAMES } from "./enrichmentQueues";
import { refreshPodcastFeed } from "../routes/podcasts";
import { enrichmentFailureService } from "../services/enrichmentFailureService";
import { logger } from "../utils/logger";
export interface PodcastJobData {
podcastId: string;
podcastTitle: string;
}
export function startPodcastEnrichmentWorker(): Worker {
const worker = new Worker<PodcastJobData>(
QUEUE_NAMES.PODCASTS,
async (job: Job<PodcastJobData>) => {
const { podcastId, podcastTitle } = job.data;
logger.debug(`[PodcastWorker] Processing ${podcastId} (${podcastTitle})`);
await refreshPodcastFeed(podcastId);
},
{
connection: createWorkerConnection(),
concurrency: 2,
lockDuration: 60000,
stalledInterval: 30000,
maxStalledCount: 2,
},
);
worker.on("failed", async (job, err) => {
if (!job) return;
const { podcastId, podcastTitle } = job.data;
logger.error(`[PodcastWorker] Podcast ${podcastId} failed (attempt ${job.attemptsMade}): ${err.message}`);
const isEntityGone = (err as any).entityNotFound === true;
if (isEntityGone) {
logger.info(`[PodcastWorker] Podcast ${podcastId} no longer exists, skipping failure record`);
return;
}
const isFinalAttempt = job.attemptsMade >= (job.opts.attempts ?? 3);
if (isFinalAttempt) {
await enrichmentFailureService
.recordFailure({
entityType: "podcast",
entityId: podcastId,
entityName: podcastTitle,
errorMessage: err.message,
})
.catch(() => {});
}
});
worker.on("error", (err) => logger.error(`[PodcastWorker] Error: ${err.message}`));
logger.info("[PodcastWorker] Started");
return worker;
}
@@ -1,4 +1,4 @@
import { Job } from "bull";
import type { Job } from "bullmq";
import { logger } from "../../utils/logger";
import { discoverWeeklyService } from "../../services/discoverWeekly";
import { eventBus } from "../../services/eventBus";
@@ -24,12 +24,12 @@ export async function processDiscoverWeekly(
`[DiscoverJob ${job.id}] Generating Discover Weekly for user ${userId}`
);
await job.progress(10);
await job.updateProgress(10);
try {
// Note: The discoverWeeklyService.generatePlaylist doesn't have progress callback yet
// For now, we'll just report progress at key stages
await job.progress(20); // Starting generation
await job.updateProgress(20); // Starting generation
logger.debug(
`[DiscoverJob ${job.id}] Calling discoverWeeklyService.generatePlaylist...`
@@ -51,7 +51,7 @@ export async function processDiscoverWeekly(
});
}
await job.progress(100); // Complete
await job.updateProgress(100); // Complete
logger.debug(
`[DiscoverJob ${job.id}] Generation complete: SUCCESS`
@@ -70,7 +70,7 @@ export async function processDiscoverWeekly(
);
logger.error(`[DiscoverJob ${job.id}] Stack trace:`, error.stack);
// Re-throw so Bull can track the failure and trigger retries
// Re-throw so BullMQ can track the failure and trigger retries
throw error;
}
}
@@ -1,4 +1,4 @@
import { Job } from "bull";
import type { Job } from "bullmq";
import { logger } from "../../utils/logger";
import { MusicScannerService } from "../../services/musicScanner";
import { config } from "../../config";
@@ -183,7 +183,7 @@ export async function processScan(
logger.debug(`═══════════════════════════════════════════════`);
// Report progress
await job.progress(0);
await job.updateProgress(0);
// Prepare cover cache path (store alongside transcode cache)
const coverCachePath = path.join(
@@ -197,7 +197,7 @@ export async function processScan(
const percent = Math.floor(
(progress.filesScanned / progress.filesTotal) * 100
);
job.progress(percent).catch((err) =>
job.updateProgress(percent).catch((err) =>
logger.error(`Failed to update job progress:`, err)
);
// Emit SSE every 2% to avoid flooding
@@ -219,7 +219,7 @@ export async function processScan(
try {
const result = await scanner.scanLibrary(scanPath);
await job.progress(100);
await job.updateProgress(100);
if (userId) {
eventBus.emit({
+21 -35
View File
@@ -1,51 +1,37 @@
import Bull from "bull";
import { Queue } from "bullmq";
import type { ConnectionOptions } from "bullmq";
import { logger } from "../utils/logger";
import { config } from "../config";
const redisUrl = new URL(config.redisUrl);
const redisConfig = {
host: redisUrl.hostname,
port: parseInt(redisUrl.port),
};
function getConnectionOptions(): ConnectionOptions {
const url = new URL(config.redisUrl);
return {
host: url.hostname,
port: parseInt(url.port, 10) || 6379,
password: url.password || undefined,
maxRetriesPerRequest: null,
enableReadyCheck: false,
};
}
const defaultQueueSettings: Bull.QueueOptions["settings"] = {
stalledInterval: 30000,
lockDuration: 30000,
maxStalledCount: 1,
};
const defaultJobOptions: Bull.JobOptions = {
const defaultJobOptions = {
removeOnComplete: 100,
removeOnFail: 50,
attempts: 3,
backoff: { type: "exponential" as const, delay: 5000 },
};
export const scanQueue = new Bull("library-scan", {
redis: redisConfig,
settings: defaultQueueSettings,
// v2 suffix avoids key conflict with old Bull v4 data still in Redis
export const scanQueue = new Queue("library-scan-v2", {
connection: getConnectionOptions(),
defaultJobOptions,
});
export const discoverQueue = new Bull("discover-weekly", {
redis: redisConfig,
settings: defaultQueueSettings,
export const discoverQueue = new Queue("discover-weekly-v2", {
connection: getConnectionOptions(),
defaultJobOptions,
});
export const queues = [scanQueue, discoverQueue];
queues.forEach((queue) => {
queue.on("error", (error) => {
logger.error(`Bull queue error (${queue.name}):`, {
message: error.message,
stack: error.stack,
});
});
queue.on("stalled", (job) => {
logger.warn(`Bull job stalled (${queue.name}):`, {
jobId: job.id,
});
});
});
logger.debug("Bull queues initialized with stability settings");
logger.debug("BullMQ queues initialized (library-scan-v2, discover-weekly-v2)");
@@ -0,0 +1,66 @@
import { Worker, Job } from "bullmq";
import { createWorkerConnection, QUEUE_NAMES } from "./enrichmentQueues";
import { enrichSingleTrack } from "./unifiedEnrichment";
import { enrichmentFailureService } from "../services/enrichmentFailureService";
import { prisma } from "../utils/db";
import { logger } from "../utils/logger";
import { getSystemSettings } from "../utils/systemSettings";
export interface TrackJobData {
trackId: string;
trackTitle: string;
}
export async function startTrackEnrichmentWorker(): Promise<Worker> {
const settings = await getSystemSettings();
const concurrency = settings?.enrichmentConcurrency ?? 5;
const worker = new Worker<TrackJobData>(
QUEUE_NAMES.TRACKS,
async (job: Job<TrackJobData>) => {
const { trackId, trackTitle } = job.data;
logger.debug(`[TrackWorker] Processing ${trackId} (${trackTitle})`);
await enrichSingleTrack(trackId);
},
{
connection: createWorkerConnection(),
concurrency,
lockDuration: 60000,
stalledInterval: 30000,
maxStalledCount: 2,
},
);
worker.on("failed", async (job, err) => {
if (!job) return;
const { trackId, trackTitle } = job.data;
if ((err as any).entityNotFound) {
logger.info(`[TrackWorker] Track ${trackId} no longer exists, resolving silently`);
return;
}
logger.error(`[TrackWorker] Track ${trackId} failed (attempt ${job.attemptsMade}): ${err.message}`);
const isFinalAttempt = job.attemptsMade >= (job.opts.attempts ?? 3);
if (isFinalAttempt) {
// Clear the "_queued" sentinel so the orchestrator can re-pick this
// track up on the next library re-enrich rather than leaving it stuck
await prisma.track
.update({ where: { id: trackId }, data: { lastfmTags: [] } })
.catch(() => {});
await enrichmentFailureService
.recordFailure({
entityType: "track",
entityId: trackId,
entityName: trackTitle,
errorMessage: err.message,
})
.catch(() => {});
}
});
worker.on("error", (err) => logger.error(`[TrackWorker] Error: ${err.message}`));
logger.info(`[TrackWorker] Started (concurrency: ${concurrency})`);
return worker;
}
+277 -368
View File
@@ -13,18 +13,28 @@
import { logger } from "../utils/logger";
import { prisma } from "../utils/db";
import { enrichSimilarArtist } from "./artistEnrichment";
import { lastFmService } from "../services/lastfm";
import Redis from "ioredis";
import { config } from "../config";
import type { Worker as BullWorker } from "bullmq";
import {
artistQueue,
trackQueue,
vibeQueue,
podcastQueue,
closeEnrichmentQueues,
} from "./enrichmentQueues";
import { startArtistEnrichmentWorker } from "./artistEnrichmentWorker";
import { startTrackEnrichmentWorker } from "./trackEnrichmentWorker";
import { startPodcastEnrichmentWorker } from "./podcastEnrichmentWorker";
import {
startAudioCompletionSubscriber,
stopAudioCompletionSubscriber,
} from "./audioCompletionSubscriber";
import { enrichmentStateService } from "../services/enrichmentState";
import { enrichmentFailureService } from "../services/enrichmentFailureService";
import { audioAnalysisCleanupService } from "../services/audioAnalysisCleanup";
import { rateLimiter } from "../services/rateLimiter";
import { vibeAnalysisCleanupService, VIBE_MAX_RETRIES } from "../services/vibeAnalysisCleanup";
import { getSystemSettings } from "../utils/systemSettings";
import { featureDetection } from "../services/featureDetection";
import pLimit from "p-limit";
// Configuration
const ARTIST_BATCH_SIZE = 10;
@@ -39,6 +49,7 @@ let controlSubscriber: Redis | null = null;
let isPaused = false;
let isStopping = false;
let immediateEnrichmentRequested = false;
let activeEnrichmentWorkers: BullWorker[] = [];
let consecutiveSystemFailures = 0; // Track consecutive system-level failures
let lastRunTime = 0;
let audioLastCycleCompletedCount: number | null = null;
@@ -59,6 +70,12 @@ let currentBatchFailures: BatchFailures = {
// Session-level failure counter (accumulates across cycles, reset on enrichment start)
let sessionFailureCount = { artists: 0, tracks: 0, audio: 0 };
// Timestamp for once-per-hour orphaned failure record cleanup
let lastOrphanedFailuresCleanup: Date | null = null;
// Timestamp for once-per-day resolved failure record cleanup (>30 days old)
let lastResolvedCleanup: Date | null = null;
// Mood tags to extract from Last.fm
const MOOD_TAGS = new Set([
// Energy/Activity
@@ -192,9 +209,11 @@ async function setupControlChannel() {
if (message === "pause") {
isPaused = true;
logger.debug("[Enrichment] Paused");
Promise.all(activeEnrichmentWorkers.map((w) => w.pause())).catch(() => {});
} else if (message === "resume") {
isPaused = false;
logger.debug("[Enrichment] Resumed");
Promise.all(activeEnrichmentWorkers.map((w) => w.resume())).catch(() => {});
} else if (message === "stop") {
isStopping = true;
isPaused = true;
@@ -202,6 +221,8 @@ async function setupControlChannel() {
"[Enrichment] Stopping gracefully - completing current item...",
);
// DO NOT override state - let enrichmentStateService.stop() handle it
// Signal CLAP Python container to stop draining its queue
getRedis().publish("audio:clap:control", JSON.stringify({ command: "stop" })).catch(() => {});
}
}
});
@@ -227,7 +248,7 @@ export async function startUnifiedEnrichmentWorker() {
});
const orphanedVibe = await prisma.track.updateMany({
where: { vibeAnalysisStatus: "processing" },
data: { vibeAnalysisStatus: "pending", vibeAnalysisStartedAt: null },
data: { vibeAnalysisStatus: null, vibeAnalysisStartedAt: null },
});
if (orphanedAudio.count > 0 || orphanedVibe.count > 0) {
logger.info(
@@ -254,6 +275,16 @@ export async function startUnifiedEnrichmentWorker() {
// Initialize state
await enrichmentStateService.initializeState();
// Start BullMQ Workers (artist, track, podcast)
activeEnrichmentWorkers = await Promise.all([
startArtistEnrichmentWorker(),
startTrackEnrichmentWorker(),
startPodcastEnrichmentWorker(),
]);
// Start audio completion subscriber (Essentia → vibe queue bridge)
startAudioCompletionSubscriber();
// Setup control channel subscription
await setupControlChannel();
@@ -278,7 +309,7 @@ function scheduleNextEnrichmentCycle() {
/**
* Stop the enrichment worker
*/
export function stopUnifiedEnrichmentWorker() {
export async function stopUnifiedEnrichmentWorker() {
if (enrichmentInterval) {
clearTimeout(enrichmentInterval);
enrichmentInterval = null;
@@ -293,8 +324,14 @@ export function stopUnifiedEnrichmentWorker() {
controlSubscriber = null;
}
// Close BullMQ Workers, audio subscriber, and Queues
await Promise.all(activeEnrichmentWorkers.map((w) => w.close())).catch(() => {});
activeEnrichmentWorkers = [];
await stopAudioCompletionSubscriber().catch(() => {});
await closeEnrichmentQueues().catch(() => {});
// Mark as stopped in state
enrichmentStateService
await enrichmentStateService
.updateState({
status: "idle",
currentPhase: null,
@@ -421,6 +458,13 @@ async function runEnrichmentCycle(fullMode: boolean): Promise<{
immediateEnrichmentRequested = false;
lastRunTime = now;
// Detect hangs: warn if enrichment has been "running" > 15 min with no state update
const isHung = await enrichmentStateService.detectHang();
if (isHung) {
logger.warn("[Enrichment] Hang detected — enrichment has been running > 15 min with no activity");
}
isRunning = true;
let artistsProcessed = 0;
@@ -449,19 +493,27 @@ async function runEnrichmentCycle(fullMode: boolean): Promise<{
}
audioQueued = audioResult;
const vibeResult = await runPhase("vibe", executeVibePhase);
if (vibeResult === null) {
return { artists: artistsProcessed, tracks: tracksProcessed, audioQueued };
}
const vibeQueued = vibeResult;
// Podcast refresh phase -- only runs if subscriptions exist
await runPhase("podcasts", executePodcastRefreshPhase);
// Orphaned failure cleanup -- runs at most once per hour, never during stop/pause
const ONE_HOUR_MS = 60 * 60 * 1000;
if (!isStopping && !isPaused && (!lastOrphanedFailuresCleanup || Date.now() - lastOrphanedFailuresCleanup.getTime() > ONE_HOUR_MS)) {
await enrichmentFailureService.cleanupOrphanedFailures();
lastOrphanedFailuresCleanup = new Date();
}
// Daily: clean up old resolved failures (>30 days)
const ONE_DAY_MS = 24 * 60 * 60 * 1000;
if (!isStopping && !isPaused && (!lastResolvedCleanup || Date.now() - lastResolvedCleanup.getTime() > ONE_DAY_MS)) {
lastResolvedCleanup = new Date();
await enrichmentFailureService.cleanupOldResolved();
}
const features = await featureDetection.getFeatures();
// Log progress (only if work was done)
if (artistsProcessed > 0 || tracksProcessed > 0 || audioQueued > 0 || vibeQueued > 0) {
if (artistsProcessed > 0 || tracksProcessed > 0 || audioQueued > 0) {
const progress = await getEnrichmentProgress();
logger.debug(`\n[Enrichment Progress]`);
logger.debug(
@@ -667,135 +719,14 @@ async function runEnrichmentCycle(fullMode: boolean): Promise<{
return { artists: artistsProcessed, tracks: tracksProcessed, audioQueued };
}
/**
* Step 1: Enrich artist metadata
*/
async function enrichArtistsBatch(): Promise<number> {
// Get concurrency setting from system settings
const settings = await getSystemSettings();
const concurrency = settings?.enrichmentConcurrency || 1;
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const artists = await prisma.artist.findMany({
where: {
OR: [
{ enrichmentStatus: "pending" },
{ enrichmentStatus: "failed" },
{
enrichmentStatus: "unresolvable",
lastEnriched: { lt: sevenDaysAgo },
},
],
albums: { some: {} },
},
orderBy: { name: "asc" },
take: ARTIST_BATCH_SIZE,
});
if (artists.length === 0) return 0;
logger.debug(
`[Artists] Processing ${artists.length} artists (concurrency: ${concurrency})...`,
);
// Use p-limit to control concurrency
const limit = pLimit(concurrency);
const results = await Promise.allSettled(
artists.map((artist) =>
limit(async () => {
// Check if paused before processing
if (isPaused) {
throw new Error("Paused");
}
// Update state with current artist
await enrichmentStateService.updateState({
artists: {
current: artist.name,
} as any,
});
try {
// Add timeout to prevent hanging on rate-limited requests
// 60s to accommodate multiple sequential API calls (MusicBrainz, Wikidata, Last.fm, Fanart.tv, Deezer, covers)
await withTimeout(
enrichSimilarArtist(artist),
60000, // 60 second max per artist
`Timeout enriching artist: ${artist.name}`,
);
logger.debug(`${artist.name}`);
return artist.name;
} catch (error) {
logger.error(`${artist.name}:`, error);
// Collect failure for batch reporting
currentBatchFailures.artists.push({
name: artist.name,
error:
error instanceof Error ?
error.message
: String(error),
});
// Record failure
await enrichmentFailureService.recordFailure({
entityType: "artist",
entityId: artist.id,
entityName: artist.name,
errorMessage:
error instanceof Error ?
error.message
: String(error),
errorCode:
(
error instanceof Error &&
error.message.includes("Timeout")
) ?
"TIMEOUT_ERROR"
: "ENRICHMENT_ERROR",
metadata: {
mbid: artist.mbid,
},
});
throw error;
}
}),
),
);
// Count successful enrichments
const processed = results.filter((r) => r.status === "fulfilled").length;
if (processed > 0) {
logger.debug(
`[Artists] Successfully enriched ${processed}/${artists.length} artists`,
);
}
return processed;
}
/**
* Step 2: Enrich track mood tags from Last.fm
* Note: No longer waits for artist enrichment - runs in parallel
* Enrich a single track's tags from Last.fm.
* Used by the BullMQ track enrichment Worker (Phase 4).
*/
async function enrichTrackTagsBatch(): Promise<number> {
// Get concurrency setting from system settings
const settings = await getSystemSettings();
const concurrency = settings?.enrichmentConcurrency || 1;
// Note: Nested orderBy on relations doesn't work with isEmpty filtering in Prisma
// Track tag enrichment doesn't depend on artist enrichment status, so we just order by recency
// Match both empty array AND null (newly scanned tracks have null, not [])
const tracks = await prisma.track.findMany({
where: {
OR: [
{ lastfmTags: { equals: [] } },
{ lastfmTags: { isEmpty: true } },
{ lastfmTags: { equals: null } },
],
},
export async function enrichSingleTrack(trackId: string): Promise<void> {
const track = await prisma.track.findUnique({
where: { id: trackId },
include: {
album: {
include: {
@@ -803,119 +734,39 @@ async function enrichTrackTagsBatch(): Promise<number> {
},
},
},
take: TRACK_BATCH_SIZE,
orderBy: [{ fileModified: "desc" }],
});
if (tracks.length === 0) return 0;
logger.debug(
`[Track Tags] Processing ${tracks.length} tracks (concurrency: ${concurrency})...`,
);
// Use p-limit to control concurrency
const limit = pLimit(concurrency);
const results = await Promise.allSettled(
tracks.map((track) =>
limit(async () => {
// Check if paused before processing
if (isPaused) {
throw new Error("Paused");
}
// Update state with current track
await enrichmentStateService.updateState({
tracks: {
current: `${track.album.artist.name} - ${track.title}`,
} as any,
});
try {
const artistName = track.album.artist.name;
// Add timeout to prevent hanging on rate-limited requests
const trackInfo = await withTimeout(
lastFmService.getTrackInfo(artistName, track.title),
30000, // 30 second max per track
`Timeout enriching track: ${track.title}`,
);
if (trackInfo?.toptags?.tag) {
const allTags = trackInfo.toptags.tag.map(
(t: any) => t.name,
);
const moodTags = filterMoodTags(allTags);
await prisma.track.update({
where: { id: track.id },
data: {
lastfmTags:
moodTags.length > 0 ?
moodTags
: ["_no_mood_tags"],
},
});
if (moodTags.length > 0) {
logger.debug(
`${track.title}: [${moodTags
.slice(0, 3)
.join(", ")}...]`,
);
}
} else {
await prisma.track.update({
where: { id: track.id },
data: { lastfmTags: ["_not_found"] },
});
}
// Small delay between requests
await new Promise((resolve) => setTimeout(resolve, 200));
return track.title;
} catch (error: any) {
logger.error(
`${track.title}: ${error?.message || error}`,
);
// Collect failure for batch reporting
currentBatchFailures.tracks.push({
name: `${track.album.artist.name} - ${track.title}`,
error: error?.message || String(error),
});
// Record failure
await enrichmentFailureService.recordFailure({
entityType: "track",
entityId: track.id,
entityName: `${track.album.artist.name} - ${track.title}`,
errorMessage: error?.message || String(error),
errorCode:
error?.message?.includes("Timeout") ?
"TIMEOUT_ERROR"
: "LASTFM_ERROR",
metadata: {
albumId: track.albumId,
filePath: track.filePath,
},
});
throw error;
}
}),
),
);
// Count successful enrichments
const processed = results.filter((r) => r.status === "fulfilled").length;
if (processed > 0) {
logger.debug(
`[Track Tags] Successfully enriched ${processed}/${tracks.length} tracks`,
);
if (!track) {
const err = new Error(`ENTITY_NOT_FOUND: Track ${trackId} deleted`);
(err as any).entityNotFound = true;
throw err;
}
return processed;
const artistName = track.album.artist.name;
const trackInfo = await withTimeout(
lastFmService.getTrackInfo(artistName, track.title),
30000,
`Timeout enriching track: ${track.title}`,
);
if (trackInfo?.toptags?.tag) {
const allTags = trackInfo.toptags.tag.map((t: any) => t.name);
const moodTags = filterMoodTags(allTags);
await prisma.track.update({
where: { id: track.id },
data: {
lastfmTags: moodTags.length > 0 ? moodTags : ["_no_mood_tags"],
},
});
if (moodTags.length > 0) {
logger.debug(`${track.title}: [${moodTags.slice(0, 3).join(", ")}...]`);
}
} else {
await prisma.track.update({
where: { id: track.id },
data: { lastfmTags: ["_not_found"] },
});
}
}
/**
@@ -982,57 +833,6 @@ async function queueAudioAnalysis(): Promise<number> {
return queued;
}
/**
* Step 4: Queue tracks for CLAP vibe embeddings
* Only runs if CLAP analyzer is available
*/
async function queueVibeEmbeddings(): Promise<number> {
const tracks = await prisma.$queryRaw<{ id: string; filePath: string; vibeAnalysisStatus: string | null }[]>`
SELECT t.id, t."filePath", t."vibeAnalysisStatus"
FROM "Track" t
LEFT JOIN track_embeddings te ON t.id = te.track_id
WHERE te.track_id IS NULL
AND t."filePath" IS NOT NULL
AND (t."vibeAnalysisStatus" IS NULL OR t."vibeAnalysisStatus" <> 'processing')
AND COALESCE(t."vibeAnalysisRetryCount", 0) < ${VIBE_MAX_RETRIES}
LIMIT 1000
`;
if (tracks.length === 0) {
return 0;
}
const redis = getRedis();
let queued = 0;
for (const track of tracks) {
try {
await prisma.track.update({
where: { id: track.id },
data: {
vibeAnalysisStatus: 'processing',
vibeAnalysisStartedAt: new Date(),
vibeAnalysisStatusUpdatedAt: new Date(),
},
});
await redis.rpush(
"audio:clap:queue",
JSON.stringify({
trackId: track.id,
filePath: track.filePath,
})
);
queued++;
} catch (error) {
logger.error(` Failed to queue vibe embedding for ${track.id}:`, error);
}
}
return queued;
}
/**
* Check if enrichment should stop and handle state cleanup if stopping.
* Returns true if cycle should halt (either stopping or paused).
@@ -1053,7 +853,7 @@ async function shouldHaltCycle(): Promise<boolean> {
* Run a phase and return result. Returns null if cycle should halt.
*/
async function runPhase(
phaseName: "artists" | "tracks" | "audio" | "vibe" | "podcasts",
phaseName: "artists" | "tracks" | "audio" | "podcasts",
executor: () => Promise<number>,
): Promise<number | null> {
await enrichmentStateService.updateState({
@@ -1071,11 +871,102 @@ async function runPhase(
}
async function executeArtistsPhase(): Promise<number> {
return enrichArtistsBatch();
// Reset temp-MBID artists that have been unresolvable for >24h
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
await prisma.artist.updateMany({
where: {
mbid: { startsWith: "temp-" },
enrichmentStatus: "unresolvable",
lastEnriched: { lt: oneDayAgo },
},
data: { enrichmentStatus: "pending" },
});
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const pendingArtists = await prisma.artist.findMany({
where: {
OR: [
{ enrichmentStatus: "pending" },
{ enrichmentStatus: "failed" },
{ enrichmentStatus: "unresolvable", lastEnriched: { lt: sevenDaysAgo } },
],
albums: { some: {} },
},
select: { id: true, name: true },
take: ARTIST_BATCH_SIZE,
});
if (pendingArtists.length === 0) return 0;
let queued = 0;
for (const artist of pendingArtists) {
try {
// Add FIRST — if Redis is down, status stays "pending" and retries naturally
await artistQueue.add(
"enrich",
{ artistId: artist.id, artistName: artist.name },
{ jobId: `artist-${artist.id}` }, // dedup — no-op if already queued
);
// Update AFTER successful add
await prisma.artist.update({
where: { id: artist.id },
data: { enrichmentStatus: "enriching" },
});
queued++;
} catch (err) {
logger.warn(`[Enrichment] Failed to queue artist ${artist.id}: ${(err as Error).message}`);
}
}
if (queued > 0) {
logger.debug(`[Enrichment] Queued ${queued} artists`);
}
return queued;
}
async function executeMoodTagsPhase(): Promise<number> {
return enrichTrackTagsBatch();
const tracks = await prisma.track.findMany({
where: {
OR: [
{ lastfmTags: { equals: [] } },
{ lastfmTags: { isEmpty: true } },
],
// Exclude tracks already queued this cycle — prevents re-adding the
// same tracks on every 5s tick before the worker can process them.
// The worker always overwrites ["_queued"] with real data or a
// terminal sentinel (["_no_mood_tags"], ["_not_found"]).
NOT: { lastfmTags: { has: "_queued" } },
},
select: { id: true, title: true },
take: TRACK_BATCH_SIZE,
orderBy: [{ fileModified: "desc" }],
});
if (tracks.length === 0) return 0;
const queuedIds: string[] = [];
for (const track of tracks) {
try {
await trackQueue.add(
"enrich",
{ trackId: track.id, trackTitle: track.title },
{ jobId: `track-${track.id}` }, // dedup — no-op if already queued
);
queuedIds.push(track.id);
} catch (err) {
logger.warn(`[Enrichment] Failed to queue track ${track.id}: ${(err as Error).message}`);
}
}
if (queuedIds.length > 0) {
// Mark as in-flight so the next orchestrator tick skips them
await prisma.track.updateMany({
where: { id: { in: queuedIds } },
data: { lastfmTags: ["_queued"] },
});
logger.debug(`[Enrichment] Queued ${queuedIds.length} tracks`);
}
return queuedIds.length;
}
async function executeAudioPhase(): Promise<number> {
@@ -1098,6 +989,21 @@ async function executeAudioPhase(): Promise<number> {
);
}
// Drain purgatory: tracks stuck as pending but retryCount >= MAX_RETRIES will never complete
const purgatoryDrained = await prisma.track.updateMany({
where: {
analysisStatus: "pending",
analysisRetryCount: { gte: 3 },
},
data: {
analysisStatus: "failed",
analysisError: "Exceeded retry limit — track may be corrupted or unsupported",
},
});
if (purgatoryDrained.count > 0) {
logger.warn(`[Enrichment] Drained ${purgatoryDrained.count} purgatory tracks to failed`);
}
if (audioAnalysisCleanupService.isCircuitOpen()) {
logger.warn(
"[Enrichment] Audio analysis circuit breaker OPEN - skipping queue",
@@ -1112,79 +1018,33 @@ async function executePodcastRefreshPhase(): Promise<number> {
const podcastCount = await prisma.podcast.count();
if (podcastCount === 0) return 0;
// Only refresh once per hour (check oldest lastRefreshed)
const ONE_HOUR = 60 * 60 * 1000;
const staleThreshold = new Date(Date.now() - ONE_HOUR);
const stalePodcasts = await prisma.podcast.findMany({
where: {
lastRefreshed: { lt: staleThreshold },
},
where: { lastRefreshed: { lt: staleThreshold } },
select: { id: true, title: true },
});
if (stalePodcasts.length === 0) return 0;
logger.debug(`[Enrichment] Refreshing ${stalePodcasts.length} podcast feeds...`);
const { refreshPodcastFeed } = await import("../routes/podcasts");
let refreshed = 0;
let queued = 0;
for (const podcast of stalePodcasts) {
if (isPaused || isStopping) break;
try {
const result = await withTimeout(
refreshPodcastFeed(podcast.id),
30000,
`Timeout refreshing podcast: ${podcast.title}`,
await podcastQueue.add(
"refresh",
{ podcastId: podcast.id, podcastTitle: podcast.title },
{ jobId: `podcast-${podcast.id}` }, // dedup — no-op if already queued
);
if (result.newEpisodesCount > 0) {
logger.debug(` [Podcast] ${podcast.title}: ${result.newEpisodesCount} new episodes`);
}
refreshed++;
} catch (error) {
logger.error(` [Podcast] Failed to refresh ${podcast.title}:`, error);
queued++;
} catch (err) {
logger.warn(`[Enrichment] Failed to queue podcast ${podcast.id}: ${(err as Error).message}`);
}
}
if (refreshed > 0) {
logger.debug(`[Enrichment] Refreshed ${refreshed} podcast feeds`);
if (queued > 0) {
logger.debug(`[Enrichment] Queued ${queued} podcast refreshes`);
}
return refreshed;
}
async function executeVibePhase(): Promise<number> {
const features = await featureDetection.getFeatures();
if (!features.vibeEmbeddings) {
return 0;
}
const audioProcessing = await prisma.track.count({
where: { analysisStatus: "processing" },
});
const audioQueue = await getRedis().llen("audio:analysis:queue");
if (audioProcessing > 0 || audioQueue > 0) {
logger.debug(
`[Enrichment] Skipping vibe phase - audio still running (${audioProcessing} processing, ${audioQueue} queued)`,
);
return 0;
}
const { reset } = await vibeAnalysisCleanupService.cleanupStaleProcessing();
if (reset > 0) {
logger.info(`[ENRICHMENT] Cleaned up ${reset} stale vibe processing entries`);
}
const orphanedResult = await vibeAnalysisCleanupService.cleanupOrphanedCompleted();
if (orphanedResult.reset > 0) {
logger.info(`[ENRICHMENT] Reset ${orphanedResult.reset} orphaned completed tracks (total orphaned: ${orphanedResult.totalOrphaned})`);
}
const result = await queueVibeEmbeddings();
logger.info(`[ENRICHMENT] Queued ${result} tracks for vibe embedding`);
return result;
return queued;
}
/**
@@ -1234,18 +1094,19 @@ export async function getEnrichmentProgress() {
});
// CLAP embedding progress (for vibe similarity)
const [clapEmbeddingCount, clapProcessing, clapQueueLength, clapFailedCount] = await Promise.all([
const [clapEmbeddingCount, clapProcessing, clapQueueCounts, clapFailedCount] = await Promise.all([
prisma.$queryRaw<{ count: bigint }[]>`
SELECT COUNT(*) as count FROM track_embeddings
`,
prisma.track.count({
where: { vibeAnalysisStatus: "processing" },
}),
getRedis().llen("audio:clap:queue"),
vibeQueue.getJobCounts("active", "waiting", "delayed"),
prisma.track.count({
where: { vibeAnalysisStatus: "failed" },
}),
]);
const clapQueueLength = (clapQueueCounts.active ?? 0) + (clapQueueCounts.waiting ?? 0) + (clapQueueCounts.delayed ?? 0);
const clapCompleted = Number(clapEmbeddingCount[0]?.count || 0);
const clapFailed = clapFailedCount;
@@ -1333,7 +1194,8 @@ export async function triggerEnrichmentNow(): Promise<{
// Reset pause state when triggering enrichment
isPaused = false;
// Set flag to bypass isRunning check (prevents race conditions)
// Set flag to bypass the minimum interval check (does NOT bypass isRunning —
// a concurrent cycle will still cause this call to return an empty result)
immediateEnrichmentRequested = true;
return runEnrichmentCycle(false);
@@ -1383,6 +1245,10 @@ export async function triggerEnrichmentNow(): Promise<{
export async function reRunAudioAnalysisOnly(): Promise<number> {
logger.debug("[Enrichment] Re-running audio analysis only...");
// Reset circuit breaker first so cleanupStaleProcessing doesn't increment a failure
// count that we're about to discard anyway
audioAnalysisCleanupService.resetCircuitBreaker();
await audioAnalysisCleanupService.cleanupStaleProcessing();
// Reset all non-pending tracks so they get re-queued
@@ -1393,15 +1259,24 @@ export async function triggerEnrichmentNow(): Promise<{
data: {
analysisStatus: "pending",
analysisStartedAt: null,
analysisRetryCount: 0,
},
});
logger.debug(`[Enrichment] Reset ${reset.count} tracks to pending for audio re-analysis`);
const queued = await queueAudioAnalysis();
logger.debug(`[Enrichment] Queued ${queued} tracks for audio analysis`);
// Trigger a cycle immediately so the UI shows running and progress updates
isPaused = false;
immediateEnrichmentRequested = true;
runEnrichmentCycle(false).catch((err) =>
logger.error("[Enrichment] reRunAudioAnalysisOnly cycle error:", err)
);
return queued;
}
@@ -1418,11 +1293,45 @@ export async function triggerEnrichmentNow(): Promise<{
return 0;
}
await vibeAnalysisCleanupService.cleanupStaleProcessing();
// Reset all tracks so they can be re-embedded.
// Only reset tracks whose audio analysis is complete (no point embedding incomplete audio).
await prisma.track.updateMany({
where: {
analysisStatus: "completed",
vibeAnalysisStatus: { not: null },
},
data: {
vibeAnalysisStatus: null,
vibeAnalysisRetryCount: 0,
vibeAnalysisStartedAt: null,
vibeAnalysisStatusUpdatedAt: null,
},
});
const queued = await queueVibeEmbeddings();
const tracks = await prisma.$queryRaw<{ id: string; filePath: string }[]>`
SELECT t.id, t."filePath"
FROM "Track" t
LEFT JOIN track_embeddings te ON t.id = te.track_id
WHERE te.track_id IS NULL
AND t."analysisStatus" = 'completed'
AND t."filePath" IS NOT NULL
LIMIT 5000
`;
let queued = 0;
for (const track of tracks) {
try {
await vibeQueue.add(
"embed",
{ trackId: track.id, filePath: track.filePath },
{ jobId: `vibe-${track.id}` },
);
queued++;
} catch (err) {
logger.error(`[Enrichment] Failed to queue vibe job for ${track.id}: ${(err as Error).message}`);
}
}
logger.debug(`[Enrichment] Queued ${queued} tracks for vibe embeddings`);
return queued;
}
+3
View File
@@ -32,6 +32,9 @@ services:
# Makes host.docker.internal work on Linux (already works on Docker Desktop)
extra_hosts:
- "host.docker.internal:host-gateway"
# Fix Redis memory overcommit warning
sysctls:
- vm.overcommit_memory=1
restart: unless-stopped
healthcheck:
test: ["CMD", "node", "/app/healthcheck.js"]
+3
View File
@@ -30,6 +30,9 @@ services:
# Makes host.docker.internal work on Linux (already works on Docker Desktop)
extra_hosts:
- "host.docker.internal:host-gateway"
# Fix Redis memory overcommit warning
sysctls:
- vm.overcommit_memory=1
restart: unless-stopped
healthcheck:
test:
-12
View File
@@ -267,13 +267,6 @@ services:
cpus: "4"
reservations:
memory: 2G
# GPU acceleration (optional) - uncomment to enable
# Requires NVIDIA Container Toolkit on the host:
# https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html
# devices:
# - driver: nvidia
# count: 1
# capabilities: [gpu]
healthcheck:
test: ["CMD", "pgrep", "-f", "python.*analyzer"]
interval: 30s
@@ -317,11 +310,6 @@ services:
cpus: "4"
reservations:
memory: 1G
# GPU acceleration (optional) - uncomment to enable
# devices:
# - driver: nvidia
# count: 1
# capabilities: [gpu]
volumes:
# Kima core
+7
View File
@@ -81,6 +81,13 @@ export default function OnboardingPage() {
setLoading(true);
try {
try {
const healthRes = await fetch("/api/health", { method: "GET" });
if (!healthRes.ok) throw new Error("not ready");
} catch {
setError("Cannot reach the server. Check that Kima is fully started and try again.");
return;
}
const response = await api.post<{
token: string;
user: { id: string; username: string };
+111
View File
@@ -8,6 +8,7 @@ import { api } from "@/lib/api";
import { audioEngine } from "@/lib/audio-engine";
import { audioSeekEmitter } from "@/lib/audio-seek-emitter";
import { dispatchQueryEvent } from "@/lib/query-events";
import { silenceKeepalive } from "@/lib/silence-keepalive";
import {
useEffect,
useLayoutEffect,
@@ -105,6 +106,9 @@ export const AudioElement = memo(function AudioElement() {
const nextPodcastEpisodeRef = useRef(nextPodcastEpisode);
const pauseRef = useRef(pause);
const queueRef = useRef(queue);
const currentIndexRef = useRef(currentIndex);
const isShuffleRef = useRef(isShuffle);
const shuffleIndicesRef = useRef(shuffleIndices);
const setCurrentTrackRef = useRef(setCurrentTrack);
const setCurrentAudiobookRef = useRef(setCurrentAudiobook);
const setCurrentPodcastRef = useRef(setCurrentPodcast);
@@ -120,6 +124,9 @@ export const AudioElement = memo(function AudioElement() {
nextPodcastEpisodeRef.current = nextPodcastEpisode;
pauseRef.current = pause;
queueRef.current = queue;
currentIndexRef.current = currentIndex;
isShuffleRef.current = isShuffle;
shuffleIndicesRef.current = shuffleIndices;
setCurrentTrackRef.current = setCurrentTrack;
setCurrentAudiobookRef.current = setCurrentAudiobook;
setCurrentPodcastRef.current = setCurrentPodcast;
@@ -300,6 +307,17 @@ export const AudioElement = memo(function AudioElement() {
const handlePlaying = () => {
setIsBuffering(false);
// Main audio is playing — the engine holds the session, keepalive not needed.
// Also attempt a prime here for Android and permissive iOS contexts; the
// definitive iOS prime comes from the MediaSession handler in useMediaSession.
silenceKeepalive.prime();
silenceKeepalive.stop();
// If audio started playing while React state says we're paused
// (e.g. direct tryResume() call from MediaSession handler bypassed the
// React update chain), sync state back to playing so the UI reflects reality.
if (!lastPlayingStateRef.current) {
setIsPlaying(true);
}
};
const handleEnded = () => {
@@ -320,6 +338,29 @@ export const AudioElement = memo(function AudioElement() {
audioEngine.seek(0);
audioEngine.play();
} else {
// Compute and load the next track synchronously within this 'ended'
// event handler. iOS may reclaim the audio session during any silence
// between tracks — the same mechanism that blocks background resume.
// Loading here keeps audio output continuous and the session alive.
// The React track-change effect will see lastTrackIdRef already matches
// and skip the load, preventing a double-fetch.
// Compute and load synchronously to avoid a silence gap.
// getNextTrackInfo mirrors the index-advancement logic in next().
// If either is updated, keep the other in sync.
const nextTrackInfo = getNextTrackInfo(
queueRef.current,
currentIndexRef.current,
isShuffleRef.current,
shuffleIndicesRef.current,
repeatModeRef.current
);
if (nextTrackInfo) {
// Load directly — do NOT pre-set lastTrackIdRef here.
// The load effect detects this via currentSrc comparison and
// skips the duplicate fetch, keeping lastTrackIdRef in sync.
audioEngine.load(api.getStreamUrl(nextTrackInfo.id), true);
}
// Always call next() to keep React state and UI in sync
nextRef.current();
}
} else {
@@ -422,6 +463,13 @@ export const AudioElement = memo(function AudioElement() {
if (!streamUrl) return;
// handleEnded may have already loaded this URL directly (gapless track transition).
// If the engine is already on this source, sync the ref and skip the duplicate load.
if (audioEngine.getState().currentSrc === streamUrl) {
lastTrackIdRef.current = currentMediaId;
return;
}
// Determine autoplay: play if user was playing or if isPlaying was set (e.g., by next())
const shouldAutoPlay = lastPlayingStateRef.current;
@@ -460,6 +508,68 @@ export const AudioElement = memo(function AudioElement() {
}
}, [isPlaying, setIsPlaying]);
// --- Silence keepalive: hold the audio session while paused in background ---
//
// iOS and Android reclaim the audio session after a backgrounded PWA pauses.
// Playing near-silent audio prevents this, keeping MediaSession controls and
// subsequent audio.play() calls functional without opening the app.
useEffect(() => {
const hasMedia = !!(currentTrack || currentAudiobook || currentPodcast);
const sync = () => {
if (document.hidden && !isPlaying && hasMedia) {
silenceKeepalive.start();
} else {
silenceKeepalive.stop();
}
};
sync();
document.addEventListener("visibilitychange", sync);
return () => {
document.removeEventListener("visibilitychange", sync);
silenceKeepalive.stop();
};
}, [isPlaying, currentTrack, currentAudiobook, currentPodcast]);
// --- Foreground recovery: retry play if we should be playing but aren't ---
//
// Handles two iOS PWA scenarios:
// 1. visibilitychange: App was backgrounded. The audioEngine.tryResume() call in the
// MediaSession play handler may have been deferred by iOS. When foregrounded, if
// state says we should be playing but the audio element is still paused, retry.
// 2. pageshow: iOS restores pages from the back-forward cache (bfcache). Audio
// state is preserved in React but the audio element may be reset.
useEffect(() => {
const handleVisibilityChange = () => {
if (document.hidden) return;
if (lastPlayingStateRef.current && !audioEngine.isPlaying()) {
audioEngine.tryResume().then((started) => {
if (!started) setIsPlaying(false);
});
}
};
const handlePageShow = (event: PageTransitionEvent) => {
if (!event.persisted) return; // Only handle bfcache restores
if (lastPlayingStateRef.current && !audioEngine.isPlaying()) {
audioEngine.tryResume().then((started) => {
if (!started) setIsPlaying(false);
});
}
};
document.addEventListener("visibilitychange", handleVisibilityChange);
window.addEventListener("pageshow", handlePageShow);
return () => {
document.removeEventListener(
"visibilitychange",
handleVisibilityChange
);
window.removeEventListener("pageshow", handlePageShow);
};
}, [setIsPlaying]);
// --- Volume/mute sync ---
useEffect(() => { audioEngine.setVolume(volume); }, [volume]);
@@ -557,6 +667,7 @@ export const AudioElement = memo(function AudioElement() {
}
audioEngine.cleanup();
silenceKeepalive.destroy();
lastTrackIdRef.current = null;
if (progressSaveIntervalRef.current) {
+37
View File
@@ -1,6 +1,8 @@
import { useEffect, useCallback, useRef } from "react";
import { useAudio } from "@/lib/audio-context";
import { api } from "@/lib/api";
import { audioEngine } from "@/lib/audio-engine";
import { silenceKeepalive } from "@/lib/silence-keepalive";
/**
* Media Session API integration for OS-level media controls
@@ -28,11 +30,16 @@ export function useMediaSession() {
} = useAudio();
const currentTimeRef = useRef(currentTime);
const isPlayingRef = useRef(isPlaying);
useEffect(() => {
currentTimeRef.current = currentTime;
}, [currentTime]);
useEffect(() => {
isPlayingRef.current = isPlaying;
}, [isPlaying]);
// Track if this device has initiated playback locally
// Prevents cross-device media session interference from state sync
const hasPlayedLocallyRef = useRef(false);
@@ -239,6 +246,15 @@ export function useMediaSession() {
// Register action handlers
navigator.mediaSession.setActionHandler("play", () => {
resume();
// Call the engine directly within the MediaSession user-activation context.
// iOS releases the audio session when a backgrounded PWA pauses audio.
// By the time React processes the state update chain, the activation window
// has expired and audio.play() is blocked. Calling it synchronously here
// bypasses that race condition.
audioEngine.tryResume();
// Prime the silence keepalive element. MediaSession handlers are user-gesture
// contexts on both iOS and Android, making this the most reliable unlock point.
silenceKeepalive.prime();
});
navigator.mediaSession.setActionHandler("pause", () => {
@@ -357,4 +373,25 @@ export function useMediaSession() {
}
}
}, [currentTime, currentTrack, currentAudiobook, currentPodcast]);
// Re-sync MediaSession playbackState when the app comes back to the foreground.
// iOS may have shown stale controls while the app was backgrounded.
useEffect(() => {
if (!("mediaSession" in navigator)) return;
const handleVisibilityChange = () => {
if (!document.hidden) {
navigator.mediaSession.playbackState = isPlayingRef.current
? "playing"
: "paused";
}
};
document.addEventListener("visibilitychange", handleVisibilityChange);
return () =>
document.removeEventListener(
"visibilitychange",
handleVisibilityChange
);
}, []);
}
+17
View File
@@ -246,6 +246,23 @@ class AudioEngine {
}
}
/**
* Attempt to resume playback without emitting error events.
* Used for direct resumption from MediaSession handlers or visibility recovery,
* where a failed play should not trigger the errorskip-track flow.
* Returns true if playback started successfully.
*/
async tryResume(): Promise<boolean> {
if (!this.audio || !this.audio.src) return false;
if (!this.audio.paused) return true;
try {
await this.audio.play();
return true;
} catch {
return false;
}
}
/**
* Pause audio.
*/
+112
View File
@@ -0,0 +1,112 @@
/**
* SilenceKeepalive - Maintains the OS audio session while user audio is paused.
*
* iOS and Android may reclaim the audio session when a PWA is backgrounded with
* audio paused. This breaks MediaSession lock-screen controls and causes subsequent
* audio.play() calls to be blocked until the app is foregrounded.
*
* Keeping an inaudible audio element playing prevents the OS from reclaiming
* the session. The element loops a programmatically-generated silent WAV so no
* static asset is required.
*
* Usage:
* silenceKeepalive.prime() call from a user-gesture handler to unlock autoplay
* silenceKeepalive.start() begin looping silence (backgrounded + paused)
* silenceKeepalive.stop() stop when main audio resumes or media clears
*/
function buildSilentWavBlob(): Blob {
// 1-second mono 8-bit PCM WAV at 8 kHz — ~8 KB, universally supported
const sampleRate = 8000;
const numSamples = sampleRate;
const buffer = new ArrayBuffer(44 + numSamples);
const view = new DataView(buffer);
// RIFF header
view.setUint32(0, 0x52494646, false); // "RIFF"
view.setUint32(4, 36 + numSamples, true);
view.setUint32(8, 0x57415645, false); // "WAVE"
// fmt chunk
view.setUint32(12, 0x666d7420, false); // "fmt "
view.setUint32(16, 16, true); // chunk size
view.setUint16(20, 1, true); // PCM
view.setUint16(22, 1, true); // mono
view.setUint32(24, sampleRate, true);
view.setUint32(28, sampleRate, true); // byteRate = sampleRate × 1ch × 1byte
view.setUint16(32, 1, true); // blockAlign
view.setUint16(34, 8, true); // bitsPerSample
// data chunk
view.setUint32(36, 0x64617461, false); // "data"
view.setUint32(40, numSamples, true);
// 8-bit PCM silence = 128 (midpoint), not 0 (which is maximum negative)
new Uint8Array(buffer, 44).fill(128);
return new Blob([buffer], { type: "audio/wav" });
}
class SilenceKeepalive {
private audio: HTMLAudioElement | null = null;
private blobUrl: string | null = null;
private getAudio(): HTMLAudioElement {
if (this.audio) return this.audio;
const blob = buildSilentWavBlob();
this.blobUrl = URL.createObjectURL(blob);
const el = new Audio(this.blobUrl);
el.loop = true;
// Volume near-zero rather than exactly 0 — some platforms may skip the
// session heartbeat for a mathematically-silent output graph.
el.volume = 0.001;
this.audio = el;
return el;
}
/**
* Unlock the audio element for future programmatic play calls.
* Must be called from a user-gesture handler (click, touch, or MediaSession action).
* Delegates to start() kept as a separate call-site name so MediaSession handlers
* read as "prime the keepalive" rather than "start" (intent over implementation).
*/
prime(): void {
this.start();
}
/**
* Start looping silence to keep the audio session alive.
* Call when the app is backgrounded with user audio paused, or from a user-gesture
* context to unlock the element for subsequent backgrounded calls.
*/
start(): void {
if (typeof window === "undefined") return;
const el = this.getAudio();
if (!el.paused) return;
el.play().catch(() => {
// Will be retried on next prime() call or visibility change.
});
}
/**
* Stop looping silence.
* Call when main audio resumes or no media is loaded.
*/
stop(): void {
if (!this.audio || this.audio.paused) return;
this.audio.pause();
}
destroy(): void {
this.stop();
this.audio = null;
if (this.blobUrl) {
URL.revokeObjectURL(this.blobUrl);
this.blobUrl = null;
}
}
}
export const silenceKeepalive = new SilenceKeepalive();
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "kima-frontend",
"version": "1.5.6",
"version": "1.5.7",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "kima-frontend",
"version": "1.5.6",
"version": "1.5.7",
"license": "GPL-3.0",
"dependencies": {
"@tanstack/react-query": "^5.90.10",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "kima-frontend",
"version": "1.5.6",
"version": "1.5.7",
"description": "Kima web frontend",
"license": "GPL-3.0",
"repository": {
+1 -1
View File
@@ -5,7 +5,7 @@
"start_url": "/",
"display": "standalone",
"display_override": ["window-controls-overlay", "standalone"],
"orientation": "any",
"orientation": "portrait",
"theme_color": "#000000",
"background_color": "#000000",
"categories": ["music", "entertainment"],
+193 -160
View File
@@ -15,7 +15,7 @@ Features:
Architecture:
- CLAPAnalyzer: Model loading and embedding generation
- Worker: Queue consumer that processes tracks and stores embeddings
- BullMQVibeWorker: Queue consumer that processes tracks and stores embeddings
- TextEmbedHandler: Real-time text embedding via Redis pub/sub
"""
@@ -26,6 +26,7 @@ import json
import time
import logging
import gc
import asyncio
import threading
from datetime import datetime
from typing import Optional, Tuple
@@ -44,16 +45,11 @@ os.environ['NUMEXPR_MAX_THREADS'] = str(THREADS_PER_WORKER)
import torch
torch.set_num_threads(THREADS_PER_WORKER)
# Device detection - use GPU if available
if torch.cuda.is_available():
DEVICE = torch.device('cuda')
GPU_NAME = torch.cuda.get_device_name(0)
else:
DEVICE = torch.device('cpu')
GPU_NAME = None
DEVICE = torch.device('cpu')
import redis
import psycopg2
import psycopg2.pool
from psycopg2.extras import RealDictCursor
from pgvector.psycopg2 import register_vector
@@ -73,7 +69,7 @@ BACKEND_URL = os.getenv('BACKEND_URL', 'http://backend:3006')
MODEL_IDLE_TIMEOUT = int(os.getenv('MODEL_IDLE_TIMEOUT', '300'))
# Queue and channel names
ANALYSIS_QUEUE = 'audio:clap:queue'
VIBE_QUEUE_NAME = 'enrichment-vibe' # BullMQ queue consumed by BullMQVibeWorker
TEXT_EMBED_CHANNEL = 'audio:text:embed'
TEXT_EMBED_RESPONSE_PREFIX = 'audio:text:embed:response:'
CONTROL_CHANNEL = 'audio:clap:control'
@@ -123,10 +119,7 @@ class CLAPAnalyzer:
self._model_loaded = True
self.last_work_time = time.time()
if GPU_NAME:
logger.info(f"CLAP model loaded successfully on GPU: {GPU_NAME}")
else:
logger.info("CLAP model loaded successfully on CPU")
logger.info("CLAP model loaded successfully on CPU")
except Exception as e:
logger.error(f"Failed to load CLAP model: {e}")
traceback.print_exc()
@@ -140,8 +133,6 @@ class CLAPAnalyzer:
logger.info("Unloading CLAP model to free memory...")
self.model = None
self._model_loaded = False
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
# Force glibc to return freed pages to OS (Python/PyTorch hold RSS otherwise)
try:
@@ -350,197 +341,238 @@ class DatabaseConnection:
self.conn = None
class Worker:
class BullMQVibeWorker:
"""
Queue worker that processes audio files and stores embeddings.
BullMQ-based vibe embedding worker.
Polls the Redis queue for jobs, generates CLAP embeddings,
and stores results in PostgreSQL.
Replaces the legacy BLPOP-based Worker class. Consumes jobs from
'enrichment-vibe' (BullMQ queue) instead of polling 'audio:clap:queue'
(raw Redis list). Runs asyncio event loop in a background thread so
it does not block TextEmbedHandler or ControlHandler.
Concurrency: controlled by NUM_WORKERS env var (default 2). Each job
gets its own connection from a ThreadedConnectionPool, and all DB calls
run via run_in_executor so they never block the asyncio event loop.
CLAPAnalyzer._lock still serialises model inference; concurrency > 1
allows pipeline parallelism (one job waiting on DB I/O while another
runs inference).
"""
def __init__(self, worker_id: int, analyzer: CLAPAnalyzer, stop_event: threading.Event):
self.worker_id = worker_id
def __init__(self, analyzer: CLAPAnalyzer, stop_event: threading.Event):
self.analyzer = analyzer
self.stop_event = stop_event
self.redis_client = None
self.db = None
self._redis_client = None
self._db_pool = None # psycopg2.pool.ThreadedConnectionPool
def start(self):
"""Start the worker loop"""
logger.info(f"Worker {self.worker_id} starting...")
"""Start the BullMQ worker in a dedicated asyncio event loop."""
logger.info("BullMQVibeWorker starting...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self._run())
finally:
loop.close()
logger.info("BullMQVibeWorker stopped")
async def _run(self):
from bullmq import Worker as BullWorker
self._redis_client = redis.from_url(REDIS_URL)
# Thread-safe connection pool — each executor thread gets its own
# connection so concurrent jobs never share a psycopg2 connection.
pool_size = max(NUM_WORKERS * 2, 4)
self._db_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=1,
maxconn=pool_size,
dsn=DATABASE_URL,
options="-c client_encoding=UTF8",
)
logger.info(f"Connected to PostgreSQL (pool size: {pool_size})")
bullmq_worker = BullWorker(
VIBE_QUEUE_NAME,
self._process_job,
{
"connection": REDIS_URL,
"concurrency": NUM_WORKERS,
"lockDuration": 300000, # 5 min — CLAP inference can take 3060s
},
)
bullmq_worker.on("failed", lambda job, err: logger.error(
f"[BullMQ] Job {job.id if job else '?'} failed: {err}"
))
bullmq_worker.on("error", lambda err: logger.error(f"[BullMQ] Worker error: {err}"))
try:
self.redis_client = redis.from_url(REDIS_URL)
self.db = DatabaseConnection(DATABASE_URL)
self.db.connect()
while not self.stop_event.is_set():
# Publish heartbeat for feature detection
# Publish heartbeat featureDetection checks this to enable vibe search.
# Run in executor to avoid blocking the async event loop.
try:
self.redis_client.set("clap:worker:heartbeat", str(int(time.time() * 1000)))
hb_loop = asyncio.get_running_loop()
hb_val = str(int(time.time() * 1000))
await hb_loop.run_in_executor(
None, lambda: self._redis_client.set("clap:worker:heartbeat", hb_val)
)
except Exception:
pass # Heartbeat is informational, don't crash on Redis failure
try:
self._process_job()
except psycopg2.Error as e:
logger.error(f"Worker {self.worker_id} database error: {e}")
traceback.print_exc()
self.db.reconnect()
time.sleep(SLEEP_INTERVAL)
except Exception as e:
logger.error(f"Worker {self.worker_id} error: {e}")
traceback.print_exc()
time.sleep(SLEEP_INTERVAL)
pass
await asyncio.sleep(30)
finally:
if self.db:
self.db.close()
logger.info(f"Worker {self.worker_id} stopped")
await bullmq_worker.close()
if self._db_pool:
self._db_pool.closeall()
def _process_job(self):
"""Process a single job from the queue"""
# Try to get a job from the queue (blocking with timeout)
job_data = self.redis_client.blpop(ANALYSIS_QUEUE, timeout=SLEEP_INTERVAL)
if not job_data:
return
_, raw_job = job_data
job = json.loads(raw_job)
track_id = job.get('trackId')
file_path = job.get('filePath', '')
duration = job.get('duration') # Pre-computed duration in seconds
async def _process_job(self, job, job_token: str) -> dict:
"""BullMQ job processor — called for each enrichment-vibe job."""
track_id = job.data.get("trackId")
file_path = job.data.get("filePath", "")
duration = job.data.get("duration")
if not track_id:
logger.warning(f"Invalid job (no trackId): {job}")
return
raise ValueError(f"Invalid job data (no trackId): {job.data}")
logger.info(f"Worker {self.worker_id} processing track: {track_id}")
logger.info(f"[BullMQ] Processing vibe for track: {track_id}")
# Update track status to processing
self._update_track_status(track_id, 'processing')
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._update_track_status, track_id, "processing")
# Build full path (normalize Windows-style paths)
normalized_path = file_path.replace('\\', '/')
normalized_path = file_path.replace("\\", "/")
full_path = os.path.join(MUSIC_PATH, normalized_path)
# Skip 0-byte / missing files (incomplete downloads, stubs)
try:
file_size = os.path.getsize(full_path)
if file_size == 0:
self._mark_failed(track_id, "Empty file (0 bytes) - likely incomplete download")
return
await loop.run_in_executor(
None, self._mark_failed, track_id,
"Empty file (0 bytes) - likely incomplete download",
)
raise ValueError("Empty file")
except OSError:
self._mark_failed(track_id, f"File not found: {normalized_path}")
return
await loop.run_in_executor(
None, self._mark_failed, track_id, f"File not found: {normalized_path}"
)
raise
# Generate embedding (pass duration to avoid file probe)
embedding = self.analyzer.get_audio_embedding(full_path, duration)
embedding = await loop.run_in_executor(
None, self.analyzer.get_audio_embedding, full_path, duration
)
await job.updateProgress(50)
if embedding is None:
self._mark_failed(track_id, "Failed to generate embedding")
return
await loop.run_in_executor(
None, self._mark_failed, track_id, "Failed to generate embedding"
)
raise RuntimeError("Embedding generation failed")
# Store embedding in database
success = self._store_embedding(track_id, embedding)
success = await loop.run_in_executor(None, self._store_embedding, track_id, embedding)
if success:
self._update_track_status(track_id, 'completed')
logger.info(f"Worker {self.worker_id} completed track: {track_id}")
await loop.run_in_executor(None, self._update_track_status, track_id, "completed")
await job.updateProgress(100)
logger.info(f"[BullMQ] Completed vibe for track: {track_id}")
return {"trackId": track_id, "status": "complete"}
else:
self._mark_failed(track_id, "Failed to store embedding")
await loop.run_in_executor(
None, self._mark_failed, track_id, "Failed to store embedding"
)
raise RuntimeError("Failed to store embedding")
def _update_track_status(self, track_id: str, status: str):
"""Update the track's vibe analysis status (CLAP embeddings)"""
cursor = self.db.get_cursor()
"""Update the track's vibe analysis status (runs in executor thread)."""
conn = self._db_pool.getconn()
try:
cursor.execute("""
UPDATE "Track"
SET "vibeAnalysisStatus" = %s
WHERE id = %s
""", (status, track_id))
self.db.commit()
with conn.cursor() as cursor:
cursor.execute(
'UPDATE "Track" SET "vibeAnalysisStatus" = %s WHERE id = %s',
(status, track_id),
)
conn.commit()
except Exception as e:
logger.error(f"Failed to update track vibe status: {e}")
self.db.rollback()
conn.rollback()
finally:
cursor.close()
self._db_pool.putconn(conn)
def _mark_failed(self, track_id: str, error: str):
"""Mark track as failed and record in enrichment failures"""
cursor = self.db.get_cursor()
"""Mark track as failed and record in enrichment failures (runs in executor thread)."""
track_name = None
conn = self._db_pool.getconn()
try:
# Get track name for better failure visibility
cursor.execute('SELECT title FROM "Track" WHERE id = %s', (track_id,))
row = cursor.fetchone()
track_name = row['title'] if row else None
cursor.execute("""
UPDATE "Track"
SET
"vibeAnalysisStatus" = 'failed',
"vibeAnalysisError" = %s,
"vibeAnalysisRetryCount" = COALESCE("vibeAnalysisRetryCount", 0) + 1
WHERE id = %s
""", (error[:500], track_id))
self.db.commit()
logger.error(f"Track {track_id} failed: {error}")
# Report failure to backend enrichment failure service
try:
headers = {
"Content-Type": "application/json",
"X-Internal-Secret": os.getenv("INTERNAL_API_SECRET", "")
}
requests.post(
f"{BACKEND_URL}/api/analysis/vibe/failure",
json={
"trackId": track_id,
"trackName": track_name,
"errorMessage": error[:500],
"errorCode": "VIBE_EMBEDDING_FAILED"
},
headers=headers,
timeout=5
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute('SELECT title FROM "Track" WHERE id = %s', (track_id,))
row = cursor.fetchone()
track_name = row['title'] if row else None
cursor.execute(
"""
UPDATE "Track"
SET
"vibeAnalysisStatus" = 'failed',
"vibeAnalysisError" = %s,
"vibeAnalysisRetryCount" = COALESCE("vibeAnalysisRetryCount", 0) + 1
WHERE id = %s
""",
(error[:500], track_id),
)
except Exception as report_err:
logger.warning(f"Failed to report failure to backend: {report_err}")
conn.commit()
logger.error(f"Track {track_id} failed: {error}")
except Exception as e:
logger.error(f"Failed to mark track as failed: {e}")
self.db.rollback()
conn.rollback()
finally:
cursor.close()
# Release connection before making the network call below
self._db_pool.putconn(conn)
# Report failure to backend enrichment failure service
try:
headers = {
"Content-Type": "application/json",
"X-Internal-Secret": os.getenv("INTERNAL_API_SECRET", "")
}
requests.post(
f"{BACKEND_URL}/api/analysis/vibe/failure",
json={
"trackId": track_id,
"trackName": track_name,
"errorMessage": error[:500],
"errorCode": "VIBE_EMBEDDING_FAILED"
},
headers=headers,
timeout=5
)
except Exception as report_err:
logger.warning(f"Failed to report failure to backend: {report_err}")
def _store_embedding(self, track_id: str, embedding: np.ndarray) -> bool:
"""Store the embedding in the track_embeddings table"""
cursor = self.db.get_cursor()
"""Store the embedding in track_embeddings (runs in executor thread)."""
conn = self._db_pool.getconn()
try:
# Convert numpy array to list for pgvector
# pgvector type must be registered per-connection; idempotent on repeat calls
register_vector(conn)
embedding_list = embedding.tolist()
cursor.execute("""
INSERT INTO track_embeddings (track_id, embedding, model_version, analyzed_at)
VALUES (%s, %s::vector, %s, %s)
ON CONFLICT (track_id)
DO UPDATE SET
embedding = EXCLUDED.embedding,
model_version = EXCLUDED.model_version,
analyzed_at = EXCLUDED.analyzed_at
""", (track_id, embedding_list, MODEL_VERSION, datetime.utcnow()))
self.db.commit()
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO track_embeddings (track_id, embedding, model_version, analyzed_at)
VALUES (%s, %s::vector, %s, %s)
ON CONFLICT (track_id)
DO UPDATE SET
embedding = EXCLUDED.embedding,
model_version = EXCLUDED.model_version,
analyzed_at = EXCLUDED.analyzed_at
""",
(track_id, embedding_list, MODEL_VERSION, datetime.utcnow()),
)
conn.commit()
return True
except Exception as e:
logger.error(f"Failed to store embedding for {track_id}: {e}")
traceback.print_exc()
self.db.rollback()
conn.rollback()
return False
finally:
cursor.close()
self._db_pool.putconn(conn)
class TextEmbedHandler:
@@ -684,6 +716,9 @@ class ControlHandler:
new_count = control.get('count', NUM_WORKERS)
logger.info(f"Received worker count change request: {NUM_WORKERS} -> {new_count}")
logger.info("Note: Restart the CLAP analyzer container to apply the new worker count")
elif command == 'stop':
logger.info("Received stop command — signalling worker threads to stop")
self.stop_event.set()
else:
logger.warning(f"Unknown control command: {command}")
@@ -723,14 +758,13 @@ def main():
threads = []
# Start worker threads
for i in range(NUM_WORKERS):
worker = Worker(i, analyzer, stop_event)
thread = threading.Thread(target=worker.start, name=f"Worker-{i}")
thread.daemon = True
thread.start()
threads.append(thread)
logger.info(f"Started worker thread {i}")
# Start BullMQ vibe worker (runs asyncio event loop in its own thread)
bullmq_worker = BullMQVibeWorker(analyzer, stop_event)
bullmq_thread = threading.Thread(target=bullmq_worker.start, name="BullMQVibeWorker")
bullmq_thread.daemon = True
bullmq_thread.start()
threads.append(bullmq_thread)
logger.info("Started BullMQ vibe worker thread")
# Start text embed handler thread
text_handler = TextEmbedHandler(analyzer, stop_event)
@@ -770,8 +804,7 @@ def main():
""")
remaining = cursor.fetchone()['cnt']
cursor.close()
queue_len = redis.from_url(REDIS_URL).llen(ANALYSIS_QUEUE)
if remaining == 0 and queue_len == 0:
if remaining == 0:
analyzer.unload_model()
logger.info("All tracks have embeddings, model unloaded (will reload when work arrives)")
except Exception as e:
@@ -2,7 +2,6 @@
laion-clap>=1.1.4
torch>=2.0.0
torchaudio>=2.0.0
torchvision>=0.15.0
transformers>=4.30.0
librosa>=0.10.0
@@ -10,6 +9,7 @@ librosa>=0.10.0
redis>=4.5.0
psycopg2-binary>=2.9.0
pgvector>=0.2.0
bullmq==2.19.5
# Utilities
python-dotenv>=1.0.0
+17 -16
View File
@@ -110,8 +110,6 @@ except ImportError as e:
# Worker processes import TF independently via spawn mode.
# TF_MODELS_AVAILABLE is set after MODELS dict is defined below.
TF_MODELS_AVAILABLE = False
TF_GPU_AVAILABLE = False # Detected in worker processes
TF_GPU_NAME = None
TensorflowPredictMusiCNN = None # Loaded in worker processes
# Configuration from environment
@@ -311,19 +309,7 @@ class AudioAnalyzer:
return
try:
import tensorflow as tf
# Detect and configure GPU (runs in worker processes only)
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
for gpu in gpus:
try:
tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError:
pass
logger.info(f"TensorFlow GPU detected: {gpus[0].name}")
else:
logger.info("TensorFlow running on CPU")
logger.info("TensorFlow running on CPU")
from essentia.standard import TensorflowPredict2D, TensorflowPredictMusiCNN
logger.info("Loading MusiCNN models...")
@@ -1308,7 +1294,8 @@ class AnalysisWorker:
UPDATE "Track" t
SET
"analysisStatus" = 'pending',
"analysisError" = NULL
"analysisError" = NULL,
"analysisRetryCount" = 0
WHERE t."analysisStatus" = 'failed'
AND COALESCE(t."analysisRetryCount", 0) < %s
AND NOT EXISTS (SELECT 1 FROM track_embeddings te WHERE te.track_id = t.id)
@@ -1652,6 +1639,20 @@ class AnalysisWorker:
track_id
))
self.db.commit()
# Publish completion event for Node audio completion subscriber
# Node subscribes and queues a BullMQ vibe embedding job (enrichment:vibe)
try:
completion_event = json.dumps({
"trackId": track_id,
"filePath": file_path,
"status": "complete",
})
self.redis.publish("audio:analysis:complete", completion_event)
logger.debug(f"[Essentia] Published completion for track {track_id}")
except Exception as pub_err:
logger.warning(f"[Essentia] Failed to publish completion event for {track_id}: {pub_err}")
except Exception as e:
logger.error(f"Failed to save results for {track_id}: {e}")
self.db.rollback()