Skip to main content

rodio/
queue.rs

1//! Queue that plays sounds one after the other.
2
3use std::collections::VecDeque;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use crate::source::{Empty, SeekError, Source, Zero};
9use crate::Sample;
10
11use crate::common::{ChannelCount, SampleRate};
12#[cfg(feature = "crossbeam-channel")]
13use crossbeam_channel::{unbounded as channel, Receiver, Sender};
14#[cfg(not(feature = "crossbeam-channel"))]
15use std::sync::mpsc::{channel, Receiver, Sender};
16
17/// Builds a new queue. It consists of an input and an output.
18///
19/// The input can be used to add sounds to the end of the queue, while the output implements
20/// `Source` and plays the sounds.
21///
22/// The parameter indicates how the queue should behave if the queue becomes empty:
23///
24/// - If you pass `true`, then the queue is infinite and will play a silence instead until you add
25///   a new sound.
26/// - If you pass `false`, then the queue will report that it has finished playing.
27///
28pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueueOutput) {
29    let input = Arc::new(SourcesQueueInput {
30        next_sounds: Mutex::new(VecDeque::new()),
31        keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
32    });
33
34    let output = SourcesQueueOutput {
35        current: Box::new(Empty::new()) as Box<_>,
36        signal_after_end: None,
37        input: input.clone(),
38        samples_consumed_in_span: 0,
39        padding_samples_remaining: 0,
40    };
41
42    (input, output)
43}
44
45// TODO: consider reimplementing this with `from_factory`
46
47type Sound = Box<dyn Source + Send>;
48type SignalDone = Option<Sender<()>>;
49
50/// The input of the queue.
51pub struct SourcesQueueInput {
52    next_sounds: Mutex<VecDeque<(Sound, SignalDone)>>,
53
54    // See constructor.
55    keep_alive_if_empty: AtomicBool,
56}
57
58impl SourcesQueueInput {
59    /// Adds a new source to the end of the queue.
60    #[inline]
61    pub fn append<T>(&self, source: T)
62    where
63        T: Source + Send + 'static,
64    {
65        self.next_sounds
66            .lock()
67            .unwrap()
68            .push_back((Box::new(source) as Box<_>, None));
69    }
70
71    /// Adds a new source to the end of the queue.
72    ///
73    /// The `Receiver` will be signalled when the sound has finished playing.
74    ///
75    /// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead.
76    #[inline]
77    pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
78    where
79        T: Source + Send + 'static,
80    {
81        let (tx, rx) = channel();
82        self.next_sounds
83            .lock()
84            .unwrap()
85            .push_back((Box::new(source) as Box<_>, Some(tx)));
86        rx
87    }
88
89    /// Sets whether the queue stays alive if there's no more sound to play.
90    ///
91    /// See also the constructor.
92    pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
93        self.keep_alive_if_empty
94            .store(keep_alive_if_empty, Ordering::Release);
95    }
96
97    /// Removes all the sounds from the queue. Returns the number of sounds cleared.
98    pub fn clear(&self) -> usize {
99        let mut sounds = self.next_sounds.lock().unwrap();
100        let len = sounds.len();
101        sounds.clear();
102        len
103    }
104}
105/// The output of the queue. Implements `Source`.
106pub struct SourcesQueueOutput {
107    // The current iterator that produces samples.
108    current: Box<dyn Source + Send>,
109
110    // Signal this sender before picking from `next`.
111    signal_after_end: Option<Sender<()>>,
112
113    // The next sounds.
114    input: Arc<SourcesQueueInput>,
115
116    // Track samples consumed in the current span to detect mid-span endings.
117    samples_consumed_in_span: usize,
118
119    // When a source ends mid-frame, this counts how many silence samples to inject
120    // to complete the frame before transitioning to the next source.
121    padding_samples_remaining: usize,
122}
123
124/// Returns a threshold span length that ensures frame alignment.
125///
126/// Spans must end on frame boundaries (multiples of channel count) to prevent
127/// channel misalignment. Returns ~512 samples rounded to the nearest frame.
128#[inline]
129fn threshold(channels: ChannelCount) -> usize {
130    const BASE_SAMPLES: usize = 512;
131    let ch = channels.get() as usize;
132    BASE_SAMPLES.div_ceil(ch) * ch
133}
134
135impl Source for SourcesQueueOutput {
136    #[inline]
137    fn current_span_len(&self) -> Option<usize> {
138        // This function is non-trivial because the boundary between two sounds in the queue should
139        // be a span boundary as well.
140        //
141        // The current sound is free to return `None` for `current_span_len()`, in which case
142        // we *should* return the number of samples remaining the current sound.
143        // This can be estimated with `size_hint()`.
144        //
145        // If the `size_hint` is `None` as well, we are in the worst case scenario. To handle this
146        // situation we force a span to have a maximum number of samples indicate by this
147        // constant.
148
149        // Try the current `current_span_len`.
150        if !self.current.is_exhausted() {
151            return self.current.current_span_len();
152        } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
153            && self.input.next_sounds.lock().unwrap().is_empty()
154        {
155            // The next source will be a filler silence which will have a frame-aligned length
156            return Some(threshold(self.current.channels()));
157        }
158
159        // Try the size hint.
160        let (lower_bound, _) = self.current.size_hint();
161        // The iterator default implementation just returns 0.
162        // That's a problematic value, so skip it.
163        if lower_bound > 0 {
164            return Some(lower_bound);
165        }
166
167        // Otherwise we use a frame-aligned threshold value.
168        Some(threshold(self.current.channels()))
169    }
170
171    #[inline]
172    fn channels(&self) -> ChannelCount {
173        if !self.current.is_exhausted() {
174            // Current source is active (producing samples)
175            // - Initially: never (Empty is exhausted immediately)
176            // - After append: the appended source while playing
177            // - With keep_alive: Zero (silence) while playing
178            self.current.channels()
179        } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
180            // Current source exhausted, peek at next queued source
181            // This is critical: UniformSourceIterator queries metadata during append,
182            // before any samples are pulled. We must report the next source's metadata.
183            next.channels()
184        } else {
185            // Queue is empty, no sources queued
186            // - Initially: Empty
187            // - With keep_alive: exhausted Zero between silence chunks (matches Empty)
188            // - Without keep_alive: Empty (will end on next())
189            self.current.channels()
190        }
191    }
192
193    #[inline]
194    fn sample_rate(&self) -> SampleRate {
195        if !self.current.is_exhausted() {
196            // Current source is active (producing samples)
197            self.current.sample_rate()
198        } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
199            // Current source exhausted, peek at next queued source
200            // This prevents wrong resampling setup in UniformSourceIterator
201            next.sample_rate()
202        } else {
203            // Queue is empty, no sources queued
204            self.current.sample_rate()
205        }
206    }
207
208    #[inline]
209    fn total_duration(&self) -> Option<Duration> {
210        None
211    }
212
213    /// Only seeks within the current source.
214    // We can not go back to previous sources. We could implement seek such
215    // that it advances the queue if the position is beyond the current song.
216    //
217    // We would then however need to enable seeking backwards across sources too.
218    // That no longer seems in line with the queue behaviour.
219    //
220    // A final pain point is that we would need the total duration for the
221    // next few songs.
222    #[inline]
223    fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
224        self.current.try_seek(pos)
225    }
226}
227
228impl Iterator for SourcesQueueOutput {
229    type Item = Sample;
230
231    #[inline]
232    fn next(&mut self) -> Option<Self::Item> {
233        loop {
234            // If we're padding to complete a frame, return silence.
235            if self.padding_samples_remaining > 0 {
236                self.padding_samples_remaining -= 1;
237                return Some(0.0);
238            }
239
240            // Basic situation that will happen most of the time.
241            if let Some(sample) = self.current.next() {
242                self.samples_consumed_in_span += 1;
243                return Some(sample);
244            }
245
246            // Source ended - check if we ended mid-frame and need padding.
247            let channels = self.current.channels().get() as usize;
248            let incomplete_frame_samples = self.samples_consumed_in_span % channels;
249            if incomplete_frame_samples > 0 {
250                // We're mid-frame - need to pad with silence to complete it.
251                self.padding_samples_remaining = channels - incomplete_frame_samples;
252                // Reset counter now since we're transitioning to a new span.
253                self.samples_consumed_in_span = 0;
254                // Continue loop - next iteration will inject silence.
255                continue;
256            }
257
258            // Reset counter and move to next sound.
259            // In order to avoid inlining this expensive operation, the code is in another function.
260            self.samples_consumed_in_span = 0;
261            if self.go_next().is_err() {
262                return None;
263            }
264        }
265    }
266
267    #[inline]
268    fn size_hint(&self) -> (usize, Option<usize>) {
269        (self.current.size_hint().0, None)
270    }
271}
272
273impl SourcesQueueOutput {
274    // Called when `current` is empty, and we must jump to the next element.
275    // Returns `Ok` if the sound should continue playing, or an error if it should stop.
276    //
277    // This method is separate so that it is not inlined.
278    fn go_next(&mut self) -> Result<(), ()> {
279        if let Some(signal_after_end) = self.signal_after_end.take() {
280            let _ = signal_after_end.send(());
281        }
282
283        let (next, signal_after_end) = {
284            let mut next = self.input.next_sounds.lock().unwrap();
285
286            if let Some(next) = next.pop_front() {
287                next
288            } else {
289                let channels = self.current.channels();
290                let silence = Box::new(Zero::new_samples(
291                    channels,
292                    self.current.sample_rate(),
293                    threshold(channels),
294                )) as Box<_>;
295                if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
296                    // Play a short silence in order to avoid spinlocking.
297                    (silence, None)
298                } else {
299                    return Err(());
300                }
301            }
302        };
303
304        self.current = next;
305        self.signal_after_end = signal_after_end;
306        Ok(())
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use crate::buffer::SamplesBuffer;
313    use crate::math::nz;
314    use crate::source::{SeekError, Source};
315    use crate::{queue, ChannelCount, Sample, SampleRate};
316    use std::time::Duration;
317
318    #[test]
319    fn basic() {
320        let (tx, mut rx) = queue::queue(false);
321
322        tx.append(SamplesBuffer::new(
323            nz!(1),
324            nz!(48000),
325            vec![10.0, -10.0, 10.0, -10.0],
326        ));
327        tx.append(SamplesBuffer::new(
328            nz!(2),
329            nz!(96000),
330            vec![5.0, 5.0, 5.0, 5.0],
331        ));
332
333        assert_eq!(rx.channels(), nz!(1));
334        assert_eq!(rx.sample_rate().get(), 48000);
335        assert_eq!(rx.next(), Some(10.0));
336        assert_eq!(rx.next(), Some(-10.0));
337        assert_eq!(rx.next(), Some(10.0));
338        assert_eq!(rx.next(), Some(-10.0));
339        assert_eq!(rx.channels(), nz!(2));
340        assert_eq!(rx.sample_rate().get(), 96000);
341        assert_eq!(rx.next(), Some(5.0));
342        assert_eq!(rx.next(), Some(5.0));
343        assert_eq!(rx.next(), Some(5.0));
344        assert_eq!(rx.next(), Some(5.0));
345        assert_eq!(rx.next(), None);
346    }
347
348    #[test]
349    fn immediate_end() {
350        let (_, mut rx) = queue::queue(false);
351        assert_eq!(rx.next(), None);
352    }
353
354    #[test]
355    fn keep_alive() {
356        let (tx, mut rx) = queue::queue(true);
357        tx.append(SamplesBuffer::new(
358            nz!(1),
359            nz!(48000),
360            vec![10.0, -10.0, 10.0, -10.0],
361        ));
362
363        assert_eq!(rx.next(), Some(10.0));
364        assert_eq!(rx.next(), Some(-10.0));
365        assert_eq!(rx.next(), Some(10.0));
366        assert_eq!(rx.next(), Some(-10.0));
367
368        for _ in 0..100000 {
369            assert_eq!(rx.next(), Some(0.0));
370        }
371    }
372
373    #[test]
374    #[ignore] // TODO: not yet implemented
375    fn no_delay_when_added() {
376        let (tx, mut rx) = queue::queue(true);
377
378        for _ in 0..500 {
379            assert_eq!(rx.next(), Some(0.0));
380        }
381
382        tx.append(SamplesBuffer::new(
383            nz!(1),
384            nz!(48000),
385            vec![10.0, -10.0, 10.0, -10.0],
386        ));
387        assert_eq!(rx.next(), Some(10.0));
388        assert_eq!(rx.next(), Some(-10.0));
389        assert_eq!(rx.next(), Some(10.0));
390        assert_eq!(rx.next(), Some(-10.0));
391    }
392
393    #[test]
394    fn append_updates_metadata() {
395        for keep_alive in [false, true] {
396            let (tx, rx) = queue::queue(keep_alive);
397            assert_eq!(
398                rx.channels(),
399                nz!(1),
400                "Initial channels should be 1 (keep_alive={keep_alive})"
401            );
402            assert_eq!(
403                rx.sample_rate(),
404                nz!(48000),
405                "Initial sample rate should be 48000 (keep_alive={keep_alive})"
406            );
407
408            tx.append(SamplesBuffer::new(
409                nz!(2),
410                nz!(44100),
411                vec![0.1, 0.2, 0.3, 0.4],
412            ));
413
414            assert_eq!(
415                rx.channels(),
416                nz!(2),
417                "Channels should update to 2 (keep_alive={keep_alive})"
418            );
419            assert_eq!(
420                rx.sample_rate(),
421                nz!(44100),
422                "Sample rate should update to 44100 (keep_alive={keep_alive})"
423            );
424        }
425    }
426
427    #[test]
428    fn span_ending_mid_frame() {
429        let mut test_source1 = TestSource::new(&[0.1, 0.2, 0.1, 0.2, 0.1])
430            .with_channels(nz!(2))
431            .with_false_span_len(Some(6));
432        let mut test_source2 = TestSource::new(&[0.3, 0.4, 0.3, 0.4]).with_channels(nz!(2));
433
434        let (controls, mut source) = queue::queue(true);
435        controls.append(test_source1.clone());
436        controls.append(test_source2.clone());
437
438        assert_eq!(source.next(), test_source1.next());
439        assert_eq!(source.next(), test_source1.next());
440        assert_eq!(source.next(), test_source1.next());
441        assert_eq!(source.next(), test_source1.next());
442        assert_eq!(source.next(), test_source1.next());
443        assert_eq!(None, test_source1.next());
444
445        // Source promised span of 6 but only delivered 5 samples.
446        // With 2 channels, that's 2.5 frames. Queue should pad with silence.
447        assert_eq!(
448            source.next(),
449            Some(0.0),
450            "Expected silence to complete frame"
451        );
452
453        assert_eq!(source.next(), test_source2.next());
454        assert_eq!(source.next(), test_source2.next());
455        assert_eq!(source.next(), test_source2.next());
456        assert_eq!(source.next(), test_source2.next());
457    }
458
459    /// Test helper source that allows setting false span length to simulate
460    /// sources that end before their promised span length.
461    #[derive(Debug, Clone)]
462    struct TestSource {
463        samples: Vec<Sample>,
464        pos: usize,
465        channels: ChannelCount,
466        sample_rate: SampleRate,
467        total_span_len: Option<usize>,
468    }
469
470    impl TestSource {
471        fn new(samples: &[Sample]) -> Self {
472            let samples = samples.to_vec();
473            Self {
474                total_span_len: Some(samples.len()),
475                pos: 0,
476                channels: nz!(1),
477                sample_rate: nz!(44100),
478                samples,
479            }
480        }
481
482        fn with_channels(mut self, count: ChannelCount) -> Self {
483            self.channels = count;
484            self
485        }
486
487        fn with_false_span_len(mut self, total_len: Option<usize>) -> Self {
488            self.total_span_len = total_len;
489            self
490        }
491    }
492
493    impl Iterator for TestSource {
494        type Item = Sample;
495
496        fn next(&mut self) -> Option<Self::Item> {
497            let res = self.samples.get(self.pos).copied();
498            self.pos += 1;
499            res
500        }
501
502        fn size_hint(&self) -> (usize, Option<usize>) {
503            let remaining = self.samples.len().saturating_sub(self.pos);
504            (remaining, Some(remaining))
505        }
506    }
507
508    impl Source for TestSource {
509        fn current_span_len(&self) -> Option<usize> {
510            self.total_span_len
511        }
512
513        fn channels(&self) -> ChannelCount {
514            self.channels
515        }
516
517        fn sample_rate(&self) -> SampleRate {
518            self.sample_rate
519        }
520
521        fn total_duration(&self) -> Option<Duration> {
522            None
523        }
524
525        fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
526            Err(SeekError::NotSupported {
527                underlying_source: std::any::type_name::<Self>(),
528            })
529        }
530    }
531}