// rodio/sink.rs

1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5#[cfg(feature = "crossbeam-channel")]
6use crossbeam_channel::{Receiver, Sender};
7use dasp_sample::FromSample;
8#[cfg(not(feature = "crossbeam-channel"))]
9use std::sync::mpsc::{Receiver, Sender};
10
11use crate::mixer::Mixer;
12use crate::source::SeekError;
13use crate::{queue, source::Done, Source};
14
15/// Handle to a device that outputs sounds.
16///
17/// Dropping the `Sink` stops all its sounds. You can use `detach` if you want the sounds to continue
18/// playing.
19pub struct Sink {
20    queue_tx: Arc<queue::SourcesQueueInput>,
21    sleep_until_end: Mutex<Option<Receiver<()>>>,
22
23    controls: Arc<Controls>,
24    sound_count: Arc<AtomicUsize>,
25
26    detached: bool,
27}
28
29struct SeekOrder {
30    pos: Duration,
31    feedback: Sender<Result<(), SeekError>>,
32}
33
34impl SeekOrder {
35    fn new(pos: Duration) -> (Self, Receiver<Result<(), SeekError>>) {
36        #[cfg(not(feature = "crossbeam-channel"))]
37        let (tx, rx) = {
38            use std::sync::mpsc;
39            mpsc::channel()
40        };
41
42        #[cfg(feature = "crossbeam-channel")]
43        let (tx, rx) = {
44            use crossbeam_channel::bounded;
45            bounded(1)
46        };
47        (Self { pos, feedback: tx }, rx)
48    }
49
50    fn attempt<S>(self, maybe_seekable: &mut S)
51    where
52        S: Source,
53    {
54        let res = maybe_seekable.try_seek(self.pos);
55        let _ignore_receiver_dropped = self.feedback.send(res);
56    }
57}
58
59struct Controls {
60    pause: AtomicBool,
61    volume: Mutex<f32>,
62    stopped: AtomicBool,
63    speed: Mutex<f32>,
64    to_clear: Mutex<u32>,
65    seek: Mutex<Option<SeekOrder>>,
66    position: Mutex<Duration>,
67}
68
69impl Sink {
70    /// Builds a new `Sink`, beginning playback on a stream.
71    #[inline]
72    pub fn connect_new(mixer: &Mixer) -> Sink {
73        let (sink, source) = Sink::new();
74        mixer.add(source);
75        sink
76    }
77
78    /// Builds a new `Sink`.
79    #[inline]
80    pub fn new() -> (Sink, queue::SourcesQueueOutput) {
81        let (queue_tx, queue_rx) = queue::queue(true);
82
83        let sink = Sink {
84            queue_tx,
85            sleep_until_end: Mutex::new(None),
86            controls: Arc::new(Controls {
87                pause: AtomicBool::new(false),
88                volume: Mutex::new(1.0),
89                stopped: AtomicBool::new(false),
90                speed: Mutex::new(1.0),
91                to_clear: Mutex::new(0),
92                seek: Mutex::new(None),
93                position: Mutex::new(Duration::ZERO),
94            }),
95            sound_count: Arc::new(AtomicUsize::new(0)),
96            detached: false,
97        };
98        (sink, queue_rx)
99    }
100
101    /// Appends a sound to the queue of sounds to play.
102    #[inline]
103    pub fn append<S>(&self, source: S)
104    where
105        S: Source + Send + 'static,
106        f32: FromSample<S::Item>,
107    {
108        // Wait for the queue to flush then resume stopped playback
109        if self.controls.stopped.load(Ordering::SeqCst) {
110            if self.sound_count.load(Ordering::SeqCst) > 0 {
111                self.sleep_until_end();
112            }
113            self.controls.stopped.store(false, Ordering::SeqCst);
114        }
115
116        let controls = self.controls.clone();
117
118        let start_played = AtomicBool::new(false);
119
120        let source = source
121            .speed(1.0)
122            // Must be placed before pausable but after speed & delay
123            .track_position()
124            .pausable(false)
125            .amplify(1.0)
126            .skippable()
127            .stoppable()
128            // If you change the duration update the docs for try_seek!
129            .periodic_access(Duration::from_millis(5), move |src| {
130                if controls.stopped.load(Ordering::SeqCst) {
131                    src.stop();
132                    *controls.position.lock().unwrap() = Duration::ZERO;
133                }
134                {
135                    let mut to_clear = controls.to_clear.lock().unwrap();
136                    if *to_clear > 0 {
137                        src.inner_mut().skip();
138                        *to_clear -= 1;
139                        *controls.position.lock().unwrap() = Duration::ZERO;
140                    } else {
141                        *controls.position.lock().unwrap() = src.inner().inner().inner().inner().get_pos();
142                    }
143                }
144                let amp = src.inner_mut().inner_mut();
145                amp.set_factor(*controls.volume.lock().unwrap());
146                amp.inner_mut()
147                    .set_paused(controls.pause.load(Ordering::SeqCst));
148                amp.inner_mut()
149                    .inner_mut()
150                    .inner_mut()
151                    .set_factor(*controls.speed.lock().unwrap());
152                if let Some(seek) = controls.seek.lock().unwrap().take() {
153                    seek.attempt(amp)
154                }
155                start_played.store(true, Ordering::SeqCst);
156            });
157        self.sound_count.fetch_add(1, Ordering::Relaxed);
158        let source = Done::new(source, self.sound_count.clone());
159        *self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
160    }
161
162    /// Gets the volume of the sound.
163    ///
164    /// The value `1.0` is the "normal" volume (unfiltered input). Any value other than 1.0 will
165    /// multiply each sample by this value.
166    #[inline]
167    pub fn volume(&self) -> f32 {
168        *self.controls.volume.lock().unwrap()
169    }
170
171    /// Changes the volume of the sound.
172    ///
173    /// The value `1.0` is the "normal" volume (unfiltered input). Any value other than `1.0` will
174    /// multiply each sample by this value.
175    #[inline]
176    pub fn set_volume(&self, value: f32) {
177        *self.controls.volume.lock().unwrap() = value;
178    }
179
180    /// Gets the speed of the sound.
181    ///
182    /// See [`Sink::set_speed`] for details on what *speed* means.
183    #[inline]
184    pub fn speed(&self) -> f32 {
185        *self.controls.speed.lock().unwrap()
186    }
187
188    /// Changes the play speed of the sound. Does not adjust the samples, only the playback speed.
189    ///
190    /// # Note:
191    /// 1. **Increasing the speed will increase the pitch by the same factor**
192    /// - If you set the speed to 0.5 this will halve the frequency of the sound
193    ///   lowering its pitch.
194    /// - If you set the speed to 2 the frequency will double raising the
195    ///   pitch of the sound.
196    /// 2. **Change in the speed affect the total duration inversely**
197    /// - If you set the speed to 0.5, the total duration will be twice as long.
198    /// - If you set the speed to 2 the total duration will be halve of what it
199    ///   was.
200    ///
201    #[inline]
202    pub fn set_speed(&self, value: f32) {
203        *self.controls.speed.lock().unwrap() = value;
204    }
205
206    /// Resumes playback of a paused sink.
207    ///
208    /// No effect if not paused.
209    #[inline]
210    pub fn play(&self) {
211        self.controls.pause.store(false, Ordering::SeqCst);
212    }
213
214    // There is no `can_seek()` method as it is impossible to use correctly. Between
215    // checking if a source supports seeking and actually seeking the sink can
216    // switch to a new source.
217
218    /// Attempts to seek to a given position in the current source.
219    ///
220    /// This blocks between 0 and ~5 milliseconds.
221    ///
222    /// As long as the duration of the source is known, seek is guaranteed to saturate
223    /// at the end of the source. For example given a source that reports a total duration
224    /// of 42 seconds calling `try_seek()` with 60 seconds as argument will seek to
225    /// 42 seconds.
226    ///
227    /// # Errors
228    /// This function will return [`SeekError::NotSupported`] if one of the underlying
229    /// sources does not support seeking.
230    ///
231    /// It will return an error if an implementation ran
232    /// into one during the seek.
233    ///
234    /// When seeking beyond the end of a source this
235    /// function might return an error if the duration of the source is not known.
236    pub fn try_seek(&self, pos: Duration) -> Result<(), SeekError> {
237        let (order, feedback) = SeekOrder::new(pos);
238        *self.controls.seek.lock().unwrap() = Some(order);
239
240        if self.sound_count.load(Ordering::Acquire) == 0 {
241            // No sound is playing, seek will not be performed
242            return Ok(());
243        }
244
245        match feedback.recv() {
246            Ok(seek_res) => {
247                *self.controls.position.lock().unwrap() = pos;
248                seek_res
249            }
250            // The feedback channel closed. Probably another SeekOrder was set
251            // invalidating this one and closing the feedback channel
252            // ... or the audio thread panicked.
253            Err(_) => Ok(()),
254        }
255    }
256
257    /// Pauses playback of this sink.
258    ///
259    /// No effect if already paused.
260    ///
261    /// A paused sink can be resumed with `play()`.
262    pub fn pause(&self) {
263        self.controls.pause.store(true, Ordering::SeqCst);
264    }
265
266    /// Gets if a sink is paused
267    ///
268    /// Sinks can be paused and resumed using `pause()` and `play()`. This returns `true` if the
269    /// sink is paused.
270    pub fn is_paused(&self) -> bool {
271        self.controls.pause.load(Ordering::SeqCst)
272    }
273
274    /// Removes all currently loaded `Source`s from the `Sink`, and pauses it.
275    ///
276    /// See `pause()` for information about pausing a `Sink`.
277    pub fn clear(&self) {
278        let len = self.sound_count.load(Ordering::SeqCst) as u32;
279        *self.controls.to_clear.lock().unwrap() = len;
280        self.sleep_until_end();
281        self.pause();
282    }
283
284    /// Skips to the next `Source` in the `Sink`
285    ///
286    /// If there are more `Source`s appended to the `Sink` at the time,
287    /// it will play the next one. Otherwise, the `Sink` will finish as if
288    /// it had finished playing a `Source` all the way through.
289    pub fn skip_one(&self) {
290        let len = self.sound_count.load(Ordering::SeqCst) as u32;
291        let mut to_clear = self.controls.to_clear.lock().unwrap();
292        if len > *to_clear {
293            *to_clear += 1;
294        }
295    }
296
297    /// Stops the sink by emptying the queue.
298    #[inline]
299    pub fn stop(&self) {
300        self.controls.stopped.store(true, Ordering::SeqCst);
301    }
302
303    /// Destroys the sink without stopping the sounds that are still playing.
304    #[inline]
305    pub fn detach(mut self) {
306        self.detached = true;
307    }
308
309    /// Sleeps the current thread until the sound ends.
310    #[inline]
311    pub fn sleep_until_end(&self) {
312        if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
313            let _ = sleep_until_end.recv();
314        }
315    }
316
317    /// Returns true if this sink has no more sounds to play.
318    #[inline]
319    pub fn empty(&self) -> bool {
320        self.len() == 0
321    }
322
323    /// Returns the number of sounds currently in the queue.
324    #[allow(clippy::len_without_is_empty)]
325    #[inline]
326    pub fn len(&self) -> usize {
327        self.sound_count.load(Ordering::Relaxed)
328    }
329
330    /// Returns the position of the sound that's being played.
331    ///
332    /// This takes into account any speedup or delay applied.
333    ///
334    /// Example: if you apply a speedup of *2* to an mp3 decoder source and
335    /// [`get_pos()`](Sink::get_pos) returns *5s* then the position in the mp3
336    /// recording is *10s* from its start.
337    #[inline]
338    pub fn get_pos(&self) -> Duration {
339        *self.controls.position.lock().unwrap()
340    }
341}
342
343impl Drop for Sink {
344    #[inline]
345    fn drop(&mut self) {
346        self.queue_tx.set_keep_alive_if_empty(false);
347
348        if !self.detached {
349            self.controls.stopped.store(true, Ordering::Relaxed);
350        }
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use std::sync::atomic::Ordering;
357
358    use crate::buffer::SamplesBuffer;
359    use crate::{Sink, Source};
360
361    #[test]
362    fn test_pause_and_stop() {
363        let (sink, mut source) = Sink::new();
364
365        assert_eq!(source.next(), Some(0.0));
366        // TODO (review) How did this test passed before? I might have broken something but
367        //      silence source should come first as next source is only polled while previous ends.
368        //      Respective test in Queue seem to be ignored (see queue::test::no_delay_when_added()
369        //      at src/queue.rs:293).
370        let mut source = source.skip_while(|x| *x == 0.0);
371
372        let v = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];
373
374        // Low rate to ensure immediate control.
375        sink.append(SamplesBuffer::new(1, 1, v.clone()));
376        let mut reference_src = SamplesBuffer::new(1, 1, v);
377
378        assert_eq!(source.next(), reference_src.next());
379        assert_eq!(source.next(), reference_src.next());
380
381        sink.pause();
382
383        assert_eq!(source.next(), Some(0.0));
384
385        sink.play();
386
387        assert_eq!(source.next(), reference_src.next());
388        assert_eq!(source.next(), reference_src.next());
389
390        sink.stop();
391
392        assert_eq!(source.next(), Some(0.0));
393
394        assert!(sink.empty());
395    }
396
397    #[test]
398    fn test_stop_and_start() {
399        let (sink, mut queue_rx) = Sink::new();
400
401        let v = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];
402
403        sink.append(SamplesBuffer::new(1, 1, v.clone()));
404        let mut src = SamplesBuffer::new(1, 1, v.clone());
405
406        assert_eq!(queue_rx.next(), src.next());
407        assert_eq!(queue_rx.next(), src.next());
408
409        sink.stop();
410
411        assert!(sink.controls.stopped.load(Ordering::SeqCst));
412        assert_eq!(queue_rx.next(), Some(0.0));
413
414        src = SamplesBuffer::new(1, 1, v.clone());
415        sink.append(SamplesBuffer::new(1, 1, v));
416
417        assert!(!sink.controls.stopped.load(Ordering::SeqCst));
418        // Flush silence
419        let mut queue_rx = queue_rx.skip_while(|v| *v == 0.0);
420
421        assert_eq!(queue_rx.next(), src.next());
422        assert_eq!(queue_rx.next(), src.next());
423    }
424
425    #[test]
426    fn test_volume() {
427        let (sink, mut queue_rx) = Sink::new();
428
429        let v = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];
430
431        // High rate to avoid immediate control.
432        sink.append(SamplesBuffer::new(2, 44100, v.clone()));
433        let src = SamplesBuffer::new(2, 44100, v.clone());
434
435        let mut src = src.amplify(0.5);
436        sink.set_volume(0.5);
437
438        for _ in 0..v.len() {
439            assert_eq!(queue_rx.next(), src.next());
440        }
441    }
442}