mirror of
https://github.com/navidrome/navidrome.git
synced 2026-06-19 07:37:15 +00:00
833c50adc7
The three It blocks that build a tight-cap streamer each spawned a fresh transcoding cache without waiting for its background initialization. The init goroutine reads conf.Server.CacheFolder, which races against SnapshotConfig's pointer-swap restore (Server = &restored) fired by DeferCleanup at the end of the spec. CI tripped the race under -shuffle=on -race; locally it reproduced about 10% of the time. Wait for tightCache.Available() before constructing the streamer, mirroring the outer BeforeEach. For the slot-saturation spec, swap in a blocking io.Pipe-backed mock ffmpeg so the cache's background copyAndClose can't drain the source and release the slot — the previous behavior happened to work only because the cache wasn't yet available and the no-cache path was exercised.
143 lines
5.4 KiB
Go
143 lines
5.4 KiB
Go
package stream_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"os"
|
|
|
|
"github.com/navidrome/navidrome/conf"
|
|
"github.com/navidrome/navidrome/conf/configtest"
|
|
"github.com/navidrome/navidrome/core/stream"
|
|
"github.com/navidrome/navidrome/log"
|
|
"github.com/navidrome/navidrome/model"
|
|
"github.com/navidrome/navidrome/model/request"
|
|
"github.com/navidrome/navidrome/tests"
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
)
|
|
|
|
var _ = Describe("MediaStreamer", func() {
|
|
var streamer stream.MediaStreamer
|
|
var ds model.DataStore
|
|
ffmpeg := tests.NewMockFFmpeg("fake data")
|
|
ctx := log.NewContext(context.TODO())
|
|
|
|
BeforeEach(func() {
|
|
DeferCleanup(configtest.SetupConfig())
|
|
cacheDir, _ := os.MkdirTemp("", "file_caches")
|
|
conf.Server.CacheFolder = conf.NewDir(cacheDir)
|
|
conf.Server.TranscodingCacheSize = "100MB"
|
|
ds = &tests.MockDataStore{MockedTranscoding: &tests.MockTranscodingRepo{}}
|
|
ds.MediaFile(ctx).(*tests.MockMediaFileRepo).SetData(model.MediaFiles{
|
|
{ID: "123", Path: "tests/fixtures/test.mp3", Suffix: "mp3", BitRate: 128, Duration: 257.0},
|
|
})
|
|
testCache := stream.NewTranscodingCache()
|
|
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
|
|
streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache)
|
|
})
|
|
AfterEach(func() {
|
|
_ = os.RemoveAll(conf.Server.CacheFolder.String())
|
|
})
|
|
|
|
Context("NewStream", func() {
|
|
var mf *model.MediaFile
|
|
BeforeEach(func() {
|
|
var err error
|
|
mf, err = ds.MediaFile(ctx).Get("123")
|
|
Expect(err).ToNot(HaveOccurred())
|
|
})
|
|
It("returns a seekable stream if format is 'raw'", func() {
|
|
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(s.Seekable()).To(BeTrue())
|
|
})
|
|
It("returns a seekable stream if no format is specified (direct play)", func() {
|
|
s, err := streamer.NewStream(ctx, mf, stream.Request{})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(s.Seekable()).To(BeTrue())
|
|
})
|
|
It("returns a NON seekable stream if transcode is required", func() {
|
|
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64})
|
|
Expect(err).To(BeNil())
|
|
Expect(s.Seekable()).To(BeFalse())
|
|
Expect(s.Duration()).To(Equal(float32(257.0)))
|
|
})
|
|
It("rejects transcode requests beyond MaxConcurrent with ErrTooManyTranscodes", func() {
|
|
// Use an ffmpeg whose Read blocks indefinitely so the cache's
|
|
// background copy can't drain the source and release the slot —
|
|
// keeping the single transcode slot pinned for this test.
|
|
pr, pw := io.Pipe()
|
|
DeferCleanup(func() { _ = pw.Close() })
|
|
blockingFFmpeg := tests.NewMockFFmpeg("")
|
|
blockingFFmpeg.Reader = pr
|
|
|
|
conf.Server.Transcoding.MaxConcurrent = 1
|
|
conf.Server.Transcoding.MaxConcurrentPerUser = 0
|
|
tightCache := stream.NewTranscodingCache()
|
|
Eventually(func() bool { return tightCache.Available(context.TODO()) }).Should(BeTrue())
|
|
tightStreamer := stream.NewMediaStreamer(ds, blockingFFmpeg, tightCache)
|
|
|
|
userCtx := request.WithUsername(ctx, "alice")
|
|
s1, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 64})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer s1.Close()
|
|
|
|
// Different cache key so it doesn't dedupe with the first request.
|
|
_, err = tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 96})
|
|
Expect(errors.Is(err, stream.ErrTooManyTranscodes)).To(BeTrue())
|
|
})
|
|
|
|
It("releases the slot once the stream is closed", func() {
|
|
conf.Server.Transcoding.MaxConcurrent = 1
|
|
conf.Server.Transcoding.MaxConcurrentPerUser = 0
|
|
tightCache := stream.NewTranscodingCache()
|
|
Eventually(func() bool { return tightCache.Available(context.TODO()) }).Should(BeTrue())
|
|
tightStreamer := stream.NewMediaStreamer(ds, ffmpeg, tightCache)
|
|
|
|
userCtx := request.WithUsername(ctx, "alice")
|
|
s1, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 64})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
_, _ = io.ReadAll(s1)
|
|
_ = s1.Close()
|
|
Eventually(func() bool { return ffmpeg.IsClosed() }, "3s").Should(BeTrue())
|
|
|
|
// Slot should now be free for a different transcode.
|
|
s2, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 96})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer s2.Close()
|
|
})
|
|
|
|
It("does not consume a slot for raw streams", func() {
|
|
conf.Server.Transcoding.MaxConcurrent = 1
|
|
conf.Server.Transcoding.MaxConcurrentPerUser = 0
|
|
tightCache := stream.NewTranscodingCache()
|
|
Eventually(func() bool { return tightCache.Available(context.TODO()) }).Should(BeTrue())
|
|
tightStreamer := stream.NewMediaStreamer(ds, ffmpeg, tightCache)
|
|
|
|
userCtx := request.WithUsername(ctx, "alice")
|
|
// First, saturate the single transcode slot.
|
|
s1, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 64})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer s1.Close()
|
|
|
|
// Raw stream must still succeed.
|
|
s2, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "raw"})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer s2.Close()
|
|
})
|
|
|
|
It("returns a seekable stream if the file is complete in the cache", func() {
|
|
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 32})
|
|
Expect(err).To(BeNil())
|
|
_, _ = io.ReadAll(s)
|
|
_ = s.Close()
|
|
Eventually(func() bool { return ffmpeg.IsClosed() }, "3s").Should(BeTrue())
|
|
|
|
s, err = streamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 32})
|
|
Expect(err).To(BeNil())
|
|
Expect(s.Seekable()).To(BeTrue())
|
|
})
|
|
})
|
|
})
|