Files
navidrome/core/stream/media_streamer_test.go
Deluan 833c50adc7 test(stream): fix data race in MediaStreamer transcoding cap tests
The three It blocks that build a tight-cap streamer each spawned a fresh
transcoding cache without waiting for its background initialization. The
init goroutine reads conf.Server.CacheFolder, which races against
SnapshotConfig's pointer-swap restore (Server = &restored) fired by
DeferCleanup at the end of the spec. CI tripped the race under
-shuffle=on -race; locally it reproduced about 10% of the time.

Wait for tightCache.Available() before constructing the streamer, mirroring
the outer BeforeEach. For the slot-saturation spec, swap in a blocking
io.Pipe-backed mock ffmpeg so the cache's background copyAndClose can't
drain the source and release the slot — the previous behavior happened to
work only because the cache wasn't yet available and the no-cache path was
exercised.
2026-05-28 00:07:49 -03:00

143 lines
5.4 KiB
Go

package stream_test
import (
"context"
"errors"
"io"
"os"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest"
"github.com/navidrome/navidrome/core/stream"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/request"
"github.com/navidrome/navidrome/tests"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("MediaStreamer", func() {
var streamer stream.MediaStreamer
var ds model.DataStore
ffmpeg := tests.NewMockFFmpeg("fake data")
ctx := log.NewContext(context.TODO())
BeforeEach(func() {
DeferCleanup(configtest.SetupConfig())
cacheDir, _ := os.MkdirTemp("", "file_caches")
conf.Server.CacheFolder = conf.NewDir(cacheDir)
conf.Server.TranscodingCacheSize = "100MB"
ds = &tests.MockDataStore{MockedTranscoding: &tests.MockTranscodingRepo{}}
ds.MediaFile(ctx).(*tests.MockMediaFileRepo).SetData(model.MediaFiles{
{ID: "123", Path: "tests/fixtures/test.mp3", Suffix: "mp3", BitRate: 128, Duration: 257.0},
})
testCache := stream.NewTranscodingCache()
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache)
})
AfterEach(func() {
_ = os.RemoveAll(conf.Server.CacheFolder.String())
})
Context("NewStream", func() {
var mf *model.MediaFile
BeforeEach(func() {
var err error
mf, err = ds.MediaFile(ctx).Get("123")
Expect(err).ToNot(HaveOccurred())
})
It("returns a seekable stream if format is 'raw'", func() {
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"})
Expect(err).ToNot(HaveOccurred())
Expect(s.Seekable()).To(BeTrue())
})
It("returns a seekable stream if no format is specified (direct play)", func() {
s, err := streamer.NewStream(ctx, mf, stream.Request{})
Expect(err).ToNot(HaveOccurred())
Expect(s.Seekable()).To(BeTrue())
})
It("returns a NON seekable stream if transcode is required", func() {
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64})
Expect(err).To(BeNil())
Expect(s.Seekable()).To(BeFalse())
Expect(s.Duration()).To(Equal(float32(257.0)))
})
It("rejects transcode requests beyond MaxConcurrent with ErrTooManyTranscodes", func() {
// Use an ffmpeg whose Read blocks indefinitely so the cache's
// background copy can't drain the source and release the slot —
// keeping the single transcode slot pinned for this test.
pr, pw := io.Pipe()
DeferCleanup(func() { _ = pw.Close() })
blockingFFmpeg := tests.NewMockFFmpeg("")
blockingFFmpeg.Reader = pr
conf.Server.Transcoding.MaxConcurrent = 1
conf.Server.Transcoding.MaxConcurrentPerUser = 0
tightCache := stream.NewTranscodingCache()
Eventually(func() bool { return tightCache.Available(context.TODO()) }).Should(BeTrue())
tightStreamer := stream.NewMediaStreamer(ds, blockingFFmpeg, tightCache)
userCtx := request.WithUsername(ctx, "alice")
s1, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 64})
Expect(err).ToNot(HaveOccurred())
defer s1.Close()
// Different cache key so it doesn't dedupe with the first request.
_, err = tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 96})
Expect(errors.Is(err, stream.ErrTooManyTranscodes)).To(BeTrue())
})
It("releases the slot once the stream is closed", func() {
conf.Server.Transcoding.MaxConcurrent = 1
conf.Server.Transcoding.MaxConcurrentPerUser = 0
tightCache := stream.NewTranscodingCache()
Eventually(func() bool { return tightCache.Available(context.TODO()) }).Should(BeTrue())
tightStreamer := stream.NewMediaStreamer(ds, ffmpeg, tightCache)
userCtx := request.WithUsername(ctx, "alice")
s1, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 64})
Expect(err).ToNot(HaveOccurred())
_, _ = io.ReadAll(s1)
_ = s1.Close()
Eventually(func() bool { return ffmpeg.IsClosed() }, "3s").Should(BeTrue())
// Slot should now be free for a different transcode.
s2, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 96})
Expect(err).ToNot(HaveOccurred())
defer s2.Close()
})
It("does not consume a slot for raw streams", func() {
conf.Server.Transcoding.MaxConcurrent = 1
conf.Server.Transcoding.MaxConcurrentPerUser = 0
tightCache := stream.NewTranscodingCache()
Eventually(func() bool { return tightCache.Available(context.TODO()) }).Should(BeTrue())
tightStreamer := stream.NewMediaStreamer(ds, ffmpeg, tightCache)
userCtx := request.WithUsername(ctx, "alice")
// First, saturate the single transcode slot.
s1, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "mp3", BitRate: 64})
Expect(err).ToNot(HaveOccurred())
defer s1.Close()
// Raw stream must still succeed.
s2, err := tightStreamer.NewStream(userCtx, mf, stream.Request{Format: "raw"})
Expect(err).ToNot(HaveOccurred())
defer s2.Close()
})
It("returns a seekable stream if the file is complete in the cache", func() {
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 32})
Expect(err).To(BeNil())
_, _ = io.ReadAll(s)
_ = s.Close()
Eventually(func() bool { return ffmpeg.IsClosed() }, "3s").Should(BeTrue())
s, err = streamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 32})
Expect(err).To(BeNil())
Expect(s.Seekable()).To(BeTrue())
})
})
})