rodio/
queue.rs

1//! Queue that plays sounds one after the other.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{Empty, SeekError, Source, Zero};
8use crate::Sample;
9
10use crate::common::{ChannelCount, SampleRate};
11#[cfg(feature = "crossbeam-channel")]
12use crossbeam_channel::{unbounded as channel, Receiver, Sender};
13#[cfg(not(feature = "crossbeam-channel"))]
14use std::sync::mpsc::{channel, Receiver, Sender};
15
16/// Builds a new queue. It consists of an input and an output.
17///
18/// The input can be used to add sounds to the end of the queue, while the output implements
19/// `Source` and plays the sounds.
20///
21/// The parameter indicates how the queue should behave if the queue becomes empty:
22///
23/// - If you pass `true`, then the queue is infinite and will play a silence instead until you add
24///   a new sound.
25/// - If you pass `false`, then the queue will report that it has finished playing.
26///
27pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueueOutput) {
28    let input = Arc::new(SourcesQueueInput {
29        next_sounds: Mutex::new(Vec::new()),
30        keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
31    });
32
33    let output = SourcesQueueOutput {
34        current: Box::new(Empty::new()) as Box<_>,
35        signal_after_end: None,
36        input: input.clone(),
37    };
38
39    (input, output)
40}
41
42// TODO: consider reimplementing this with `from_factory`
43
44type Sound = Box<dyn Source + Send>;
45type SignalDone = Option<Sender<()>>;
46
47/// The input of the queue.
48pub struct SourcesQueueInput {
49    next_sounds: Mutex<Vec<(Sound, SignalDone)>>,
50
51    // See constructor.
52    keep_alive_if_empty: AtomicBool,
53}
54
55impl SourcesQueueInput {
56    /// Adds a new source to the end of the queue.
57    #[inline]
58    pub fn append<T>(&self, source: T)
59    where
60        T: Source + Send + 'static,
61    {
62        self.next_sounds
63            .lock()
64            .unwrap()
65            .push((Box::new(source) as Box<_>, None));
66    }
67
68    /// Adds a new source to the end of the queue.
69    ///
70    /// The `Receiver` will be signalled when the sound has finished playing.
71    ///
72    /// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead.
73    #[inline]
74    pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
75    where
76        T: Source + Send + 'static,
77    {
78        let (tx, rx) = channel();
79        self.next_sounds
80            .lock()
81            .unwrap()
82            .push((Box::new(source) as Box<_>, Some(tx)));
83        rx
84    }
85
86    /// Sets whether the queue stays alive if there's no more sound to play.
87    ///
88    /// See also the constructor.
89    pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
90        self.keep_alive_if_empty
91            .store(keep_alive_if_empty, Ordering::Release);
92    }
93
94    /// Removes all the sounds from the queue. Returns the number of sounds cleared.
95    pub fn clear(&self) -> usize {
96        let mut sounds = self.next_sounds.lock().unwrap();
97        let len = sounds.len();
98        sounds.clear();
99        len
100    }
101}
102/// The output of the queue. Implements `Source`.
103pub struct SourcesQueueOutput {
104    // The current iterator that produces samples.
105    current: Box<dyn Source + Send>,
106
107    // Signal this sender before picking from `next`.
108    signal_after_end: Option<Sender<()>>,
109
110    // The next sounds.
111    input: Arc<SourcesQueueInput>,
112}
113
114const THRESHOLD: usize = 512;
115
116impl Source for SourcesQueueOutput {
117    #[inline]
118    fn current_span_len(&self) -> Option<usize> {
119        // This function is non-trivial because the boundary between two sounds in the queue should
120        // be a span boundary as well.
121        //
122        // The current sound is free to return `None` for `current_span_len()`, in which case
123        // we *should* return the number of samples remaining the current sound.
124        // This can be estimated with `size_hint()`.
125        //
126        // If the `size_hint` is `None` as well, we are in the worst case scenario. To handle this
127        // situation we force a span to have a maximum number of samples indicate by this
128        // constant.
129
130        // Try the current `current_span_len`.
131        if let Some(val) = self.current.current_span_len() {
132            if val != 0 {
133                return Some(val);
134            } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
135                && self.input.next_sounds.lock().unwrap().is_empty()
136            {
137                // The next source will be a filler silence which will have the length of `THRESHOLD`
138                return Some(THRESHOLD);
139            }
140        }
141
142        // Try the size hint.
143        let (lower_bound, _) = self.current.size_hint();
144        // The iterator default implementation just returns 0.
145        // That's a problematic value, so skip it.
146        if lower_bound > 0 {
147            return Some(lower_bound);
148        }
149
150        // Otherwise we use the constant value.
151        Some(THRESHOLD)
152    }
153
154    #[inline]
155    fn channels(&self) -> ChannelCount {
156        self.current.channels()
157    }
158
159    #[inline]
160    fn sample_rate(&self) -> SampleRate {
161        self.current.sample_rate()
162    }
163
164    #[inline]
165    fn total_duration(&self) -> Option<Duration> {
166        None
167    }
168
169    /// Only seeks within the current source.
170    // We can not go back to previous sources. We could implement seek such
171    // that it advances the queue if the position is beyond the current song.
172    //
173    // We would then however need to enable seeking backwards across sources too.
174    // That no longer seems in line with the queue behaviour.
175    //
176    // A final pain point is that we would need the total duration for the
177    // next few songs.
178    #[inline]
179    fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
180        self.current.try_seek(pos)
181    }
182}
183
184impl Iterator for SourcesQueueOutput {
185    type Item = Sample;
186
187    #[inline]
188    fn next(&mut self) -> Option<Self::Item> {
189        loop {
190            // Basic situation that will happen most of the time.
191            if let Some(sample) = self.current.next() {
192                return Some(sample);
193            }
194
195            // Since `self.current` has finished, we need to pick the next sound.
196            // In order to avoid inlining this expensive operation, the code is in another function.
197            if self.go_next().is_err() {
198                return None;
199            }
200        }
201    }
202
203    #[inline]
204    fn size_hint(&self) -> (usize, Option<usize>) {
205        (self.current.size_hint().0, None)
206    }
207}
208
209impl SourcesQueueOutput {
210    // Called when `current` is empty, and we must jump to the next element.
211    // Returns `Ok` if the sound should continue playing, or an error if it should stop.
212    //
213    // This method is separate so that it is not inlined.
214    fn go_next(&mut self) -> Result<(), ()> {
215        if let Some(signal_after_end) = self.signal_after_end.take() {
216            let _ = signal_after_end.send(());
217        }
218
219        let (next, signal_after_end) = {
220            let mut next = self.input.next_sounds.lock().unwrap();
221
222            if next.is_empty() {
223                let silence = Box::new(Zero::new_samples(1, 44100, THRESHOLD)) as Box<_>;
224                if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
225                    // Play a short silence in order to avoid spinlocking.
226                    (silence, None)
227                } else {
228                    return Err(());
229                }
230            } else {
231                next.remove(0)
232            }
233        };
234
235        self.current = next;
236        self.signal_after_end = signal_after_end;
237        Ok(())
238    }
239}
240
241#[cfg(test)]
242mod tests {
243    use crate::buffer::SamplesBuffer;
244    use crate::queue;
245    use crate::source::Source;
246
247    #[test]
248    #[ignore] // FIXME: samples rate and channel not updated immediately after transition
249    fn basic() {
250        let (tx, mut rx) = queue::queue(false);
251
252        tx.append(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
253        tx.append(SamplesBuffer::new(2, 96000, vec![5.0, 5.0, 5.0, 5.0]));
254
255        assert_eq!(rx.channels(), 1);
256        assert_eq!(rx.sample_rate(), 48000);
257        assert_eq!(rx.next(), Some(10.0));
258        assert_eq!(rx.next(), Some(-10.0));
259        assert_eq!(rx.next(), Some(10.0));
260        assert_eq!(rx.next(), Some(-10.0));
261        assert_eq!(rx.channels(), 2);
262        assert_eq!(rx.sample_rate(), 96000);
263        assert_eq!(rx.next(), Some(5.0));
264        assert_eq!(rx.next(), Some(5.0));
265        assert_eq!(rx.next(), Some(5.0));
266        assert_eq!(rx.next(), Some(5.0));
267        assert_eq!(rx.next(), None);
268    }
269
270    #[test]
271    fn immediate_end() {
272        let (_, mut rx) = queue::queue(false);
273        assert_eq!(rx.next(), None);
274    }
275
276    #[test]
277    fn keep_alive() {
278        let (tx, mut rx) = queue::queue(true);
279        tx.append(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
280
281        assert_eq!(rx.next(), Some(10.0));
282        assert_eq!(rx.next(), Some(-10.0));
283        assert_eq!(rx.next(), Some(10.0));
284        assert_eq!(rx.next(), Some(-10.0));
285
286        for _ in 0..100000 {
287            assert_eq!(rx.next(), Some(0.0));
288        }
289    }
290
291    #[test]
292    #[ignore] // TODO: not yet implemented
293    fn no_delay_when_added() {
294        let (tx, mut rx) = queue::queue(true);
295
296        for _ in 0..500 {
297            assert_eq!(rx.next(), Some(0.0));
298        }
299
300        tx.append(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
301        assert_eq!(rx.next(), Some(10.0));
302        assert_eq!(rx.next(), Some(-10.0));
303        assert_eq!(rx.next(), Some(10.0));
304        assert_eq!(rx.next(), Some(-10.0));
305    }
306}