Skip to main content

rodio/
player.rs

1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5#[cfg(feature = "crossbeam-channel")]
6use crossbeam_channel::{Receiver, Sender};
7use dasp_sample::FromSample;
8#[cfg(not(feature = "crossbeam-channel"))]
9use std::sync::mpsc::{Receiver, Sender};
10
11use crate::mixer::Mixer;
12use crate::source::SeekError;
13use crate::Float;
14use crate::{queue, source::Done, Source};
15
16/// Handle to a device that outputs sounds.
17///
18/// Dropping the `Player` stops all its sounds. You can use `detach` if you want the sounds to continue
19/// playing.
20pub struct Player {
21    queue_tx: Arc<queue::SourcesQueueInput>,
22    sleep_until_end: Mutex<Option<Receiver<()>>>,
23
24    controls: Arc<Controls>,
25    sound_count: Arc<AtomicUsize>,
26
27    detached: bool,
28}
29
30struct SeekOrder {
31    pos: Duration,
32    feedback: Sender<Result<(), SeekError>>,
33}
34
35impl SeekOrder {
36    fn new(pos: Duration) -> (Self, Receiver<Result<(), SeekError>>) {
37        #[cfg(not(feature = "crossbeam-channel"))]
38        let (tx, rx) = {
39            use std::sync::mpsc;
40            mpsc::channel()
41        };
42
43        #[cfg(feature = "crossbeam-channel")]
44        let (tx, rx) = {
45            use crossbeam_channel::bounded;
46            bounded(1)
47        };
48        (Self { pos, feedback: tx }, rx)
49    }
50
51    fn attempt<S>(self, maybe_seekable: &mut S)
52    where
53        S: Source,
54    {
55        let res = maybe_seekable.try_seek(self.pos);
56        let _ignore_receiver_dropped = self.feedback.send(res);
57    }
58}
59
60struct Controls {
61    pause: AtomicBool,
62    volume: Mutex<Float>,
63    stopped: AtomicBool,
64    speed: Mutex<f32>,
65    to_clear: Mutex<u32>,
66    seek: Mutex<Option<SeekOrder>>,
67    position: Mutex<Duration>,
68}
69
70impl Player {
71    /// Builds a new `Player`, beginning playback on a stream.
72    #[inline]
73    pub fn connect_new(mixer: &Mixer) -> Player {
74        let (sink, source) = Player::new();
75        mixer.add(source);
76        sink
77    }
78
79    /// Builds a new `Player`.
80    #[inline]
81    pub fn new() -> (Player, queue::SourcesQueueOutput) {
82        let (queue_tx, queue_rx) = queue::queue(true);
83
84        let sink = Player {
85            queue_tx,
86            sleep_until_end: Mutex::new(None),
87            controls: Arc::new(Controls {
88                pause: AtomicBool::new(false),
89                volume: Mutex::new(1.0),
90                stopped: AtomicBool::new(false),
91                speed: Mutex::new(1.0),
92                to_clear: Mutex::new(0),
93                seek: Mutex::new(None),
94                position: Mutex::new(Duration::ZERO),
95            }),
96            sound_count: Arc::new(AtomicUsize::new(0)),
97            detached: false,
98        };
99        (sink, queue_rx)
100    }
101
102    /// Appends a sound to the queue of sounds to play.
103    #[inline]
104    pub fn append<S>(&self, source: S)
105    where
106        S: Source + Send + 'static,
107        f32: FromSample<S::Item>,
108    {
109        // Wait for the queue to flush then resume stopped playback
110        if self.controls.stopped.load(Ordering::SeqCst) {
111            if self.sound_count.load(Ordering::SeqCst) > 0 {
112                self.sleep_until_end();
113            }
114            self.controls.stopped.store(false, Ordering::SeqCst);
115        }
116
117        let controls = self.controls.clone();
118
119        let start_played = AtomicBool::new(false);
120
121        let source = source
122            .speed(1.0)
123            // Must be placed before pausable but after speed & delay
124            .track_position()
125            .pausable(false)
126            .amplify(1.0)
127            .skippable()
128            .stoppable()
129            // If you change the duration update the docs for try_seek!
130            .periodic_access(Duration::from_millis(5), move |src| {
131                if controls.stopped.load(Ordering::SeqCst) {
132                    src.stop();
133                    *controls.position.lock().unwrap() = Duration::ZERO;
134                }
135                {
136                    let mut to_clear = controls.to_clear.lock().unwrap();
137                    if *to_clear > 0 {
138                        src.inner_mut().skip();
139                        *to_clear -= 1;
140                        *controls.position.lock().unwrap() = Duration::ZERO;
141                    } else {
142                        *controls.position.lock().unwrap() =
143                            src.inner().inner().inner().inner().get_pos();
144                    }
145                }
146                let amp = src.inner_mut().inner_mut();
147                amp.set_factor(*controls.volume.lock().unwrap());
148                amp.inner_mut()
149                    .set_paused(controls.pause.load(Ordering::SeqCst));
150                amp.inner_mut()
151                    .inner_mut()
152                    .inner_mut()
153                    .set_factor(*controls.speed.lock().unwrap());
154                if let Some(seek) = controls.seek.lock().unwrap().take() {
155                    seek.attempt(amp)
156                }
157                start_played.store(true, Ordering::SeqCst);
158            });
159        self.sound_count.fetch_add(1, Ordering::Relaxed);
160        let source = Done::new(source, self.sound_count.clone());
161        *self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
162    }
163
164    /// Gets the volume of the sound.
165    ///
166    /// The value `1.0` is the "normal" volume (unfiltered input). Any value other than 1.0 will
167    /// multiply each sample by this value.
168    #[inline]
169    pub fn volume(&self) -> Float {
170        *self.controls.volume.lock().unwrap()
171    }
172
173    /// Changes the volume of the sound.
174    ///
175    /// The value `1.0` is the "normal" volume (unfiltered input). Any value other than `1.0` will
176    /// multiply each sample by this value.
177    #[inline]
178    pub fn set_volume(&self, value: Float) {
179        *self.controls.volume.lock().unwrap() = value;
180    }
181
182    /// Gets the speed of the sound.
183    ///
184    /// See [`Player::set_speed`] for details on what *speed* means.
185    #[inline]
186    pub fn speed(&self) -> f32 {
187        *self.controls.speed.lock().unwrap()
188    }
189
190    /// Changes the play speed of the sound. Does not adjust the samples, only the playback speed.
191    ///
192    /// # Note:
193    /// 1. **Increasing the speed will increase the pitch by the same factor**
194    /// - If you set the speed to 0.5 this will halve the frequency of the sound
195    ///   lowering its pitch.
196    /// - If you set the speed to 2 the frequency will double raising the
197    ///   pitch of the sound.
198    /// 2. **Change in the speed affect the total duration inversely**
199    /// - If you set the speed to 0.5, the total duration will be twice as long.
200    /// - If you set the speed to 2 the total duration will be halve of what it
201    ///   was.
202    ///
203    #[inline]
204    pub fn set_speed(&self, value: f32) {
205        *self.controls.speed.lock().unwrap() = value;
206    }
207
208    /// Resumes playback of a paused player.
209    ///
210    /// No effect if not paused.
211    #[inline]
212    pub fn play(&self) {
213        self.controls.pause.store(false, Ordering::SeqCst);
214    }
215
216    // There is no `can_seek()` method as it is impossible to use correctly. Between
217    // checking if a source supports seeking and actually seeking the sink can
218    // switch to a new source.
219
220    /// Attempts to seek to a given position in the current source.
221    ///
222    /// This blocks between 0 and ~5 milliseconds.
223    ///
224    /// As long as the duration of the source is known, seek is guaranteed to saturate
225    /// at the end of the source. For example given a source that reports a total duration
226    /// of 42 seconds calling `try_seek()` with 60 seconds as argument will seek to
227    /// 42 seconds.
228    ///
229    /// # Errors
230    /// This function will return [`SeekError::NotSupported`] if one of the underlying
231    /// sources does not support seeking.
232    ///
233    /// It will return an error if an implementation ran
234    /// into one during the seek.
235    ///
236    /// When seeking beyond the end of a source this
237    /// function might return an error if the duration of the source is not known.
238    pub fn try_seek(&self, pos: Duration) -> Result<(), SeekError> {
239        let (order, feedback) = SeekOrder::new(pos);
240        *self.controls.seek.lock().unwrap() = Some(order);
241
242        if self.sound_count.load(Ordering::Acquire) == 0 {
243            // No sound is playing, seek will not be performed
244            return Ok(());
245        }
246
247        match feedback.recv() {
248            Ok(seek_res) => {
249                *self.controls.position.lock().unwrap() = pos;
250                seek_res
251            }
252            // The feedback channel closed. Probably another SeekOrder was set
253            // invalidating this one and closing the feedback channel
254            // ... or the audio thread panicked.
255            Err(_) => Ok(()),
256        }
257    }
258
259    /// Pauses playback of this player.
260    ///
261    /// No effect if already paused.
262    ///
263    /// A paused sink can be resumed with `play()`.
264    pub fn pause(&self) {
265        self.controls.pause.store(true, Ordering::SeqCst);
266    }
267
268    /// Gets if a sink is paused
269    ///
270    /// Players can be paused and resumed using `pause()` and `play()`. This returns `true` if the
271    /// sink is paused.
272    pub fn is_paused(&self) -> bool {
273        self.controls.pause.load(Ordering::SeqCst)
274    }
275
276    /// Removes all currently loaded `Source`s from the `Player`, and pauses it.
277    ///
278    /// See `pause()` for information about pausing a `Player`.
279    pub fn clear(&self) {
280        let len = self.sound_count.load(Ordering::SeqCst) as u32;
281        *self.controls.to_clear.lock().unwrap() = len;
282        self.sleep_until_end();
283        self.pause();
284    }
285
286    /// Skips to the next `Source` in the `Player`
287    ///
288    /// If there are more `Source`s appended to the `Player` at the time,
289    /// it will play the next one. Otherwise, the `Player` will finish as if
290    /// it had finished playing a `Source` all the way through.
291    pub fn skip_one(&self) {
292        let len = self.sound_count.load(Ordering::SeqCst) as u32;
293        let mut to_clear = self.controls.to_clear.lock().unwrap();
294        if len > *to_clear {
295            *to_clear += 1;
296        }
297    }
298
299    /// Stops the sink by emptying the queue.
300    #[inline]
301    pub fn stop(&self) {
302        self.controls.stopped.store(true, Ordering::SeqCst);
303    }
304
305    /// Destroys the sink without stopping the sounds that are still playing.
306    #[inline]
307    pub fn detach(mut self) {
308        self.detached = true;
309    }
310
311    /// Sleeps the current thread until the sound ends.
312    #[inline]
313    pub fn sleep_until_end(&self) {
314        if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
315            let _ = sleep_until_end.recv();
316        }
317    }
318
319    /// Returns true if this sink has no more sounds to play.
320    #[inline]
321    pub fn empty(&self) -> bool {
322        self.len() == 0
323    }
324
325    /// Returns the number of sounds currently in the queue.
326    #[allow(clippy::len_without_is_empty)]
327    #[inline]
328    pub fn len(&self) -> usize {
329        self.sound_count.load(Ordering::Relaxed)
330    }
331
332    /// Returns the position of the sound that's being played.
333    ///
334    /// This takes into account any speedup or delay applied.
335    ///
336    /// Example: if you apply a speedup of *2* to an mp3 decoder source and
337    /// [`get_pos()`](Player::get_pos) returns *5s* then the position in the mp3
338    /// recording is *10s* from its start.
339    #[inline]
340    pub fn get_pos(&self) -> Duration {
341        *self.controls.position.lock().unwrap()
342    }
343}
344
345impl Drop for Player {
346    #[inline]
347    fn drop(&mut self) {
348        self.queue_tx.set_keep_alive_if_empty(false);
349
350        if !self.detached {
351            self.controls.stopped.store(true, Ordering::Relaxed);
352        }
353    }
354}
355
356#[cfg(test)]
357mod tests {
358    use std::sync::atomic::Ordering;
359
360    use crate::buffer::SamplesBuffer;
361    use crate::math::nz;
362    use crate::{Player, Source};
363
364    #[test]
365    fn test_pause_and_stop() {
366        let (player, mut source) = Player::new();
367
368        assert_eq!(source.next(), Some(0.0));
369        // TODO (review) How did this test passed before? I might have broken something but
370        //      silence source should come first as next source is only polled while previous ends.
371        //      Respective test in Queue seem to be ignored (see queue::test::no_delay_when_added()
372        //      at src/queue.rs:293).
373        let mut source = source.skip_while(|x| *x == 0.0);
374
375        let v = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];
376
377        // Low rate to ensure immediate control.
378        player.append(SamplesBuffer::new(nz!(1), nz!(1), v.clone()));
379        let mut reference_src = SamplesBuffer::new(nz!(1), nz!(1), v);
380
381        assert_eq!(source.next(), reference_src.next());
382        assert_eq!(source.next(), reference_src.next());
383
384        player.pause();
385
386        assert_eq!(source.next(), Some(0.0));
387
388        player.play();
389
390        assert_eq!(source.next(), reference_src.next());
391        assert_eq!(source.next(), reference_src.next());
392
393        player.stop();
394
395        assert_eq!(source.next(), Some(0.0));
396
397        assert!(player.empty());
398    }
399
400    #[test]
401    fn test_stop_and_start() {
402        let (player, mut queue_rx) = Player::new();
403
404        let v = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];
405
406        player.append(SamplesBuffer::new(nz!(1), nz!(1), v.clone()));
407        let mut src = SamplesBuffer::new(nz!(1), nz!(1), v.clone());
408
409        assert_eq!(queue_rx.next(), src.next());
410        assert_eq!(queue_rx.next(), src.next());
411
412        player.stop();
413
414        assert!(player.controls.stopped.load(Ordering::SeqCst));
415        assert_eq!(queue_rx.next(), Some(0.0));
416
417        src = SamplesBuffer::new(nz!(1), nz!(1), v.clone());
418        player.append(SamplesBuffer::new(nz!(1), nz!(1), v));
419
420        assert!(!player.controls.stopped.load(Ordering::SeqCst));
421        // Flush silence
422        let mut queue_rx = queue_rx.skip_while(|v| *v == 0.0);
423
424        assert_eq!(queue_rx.next(), src.next());
425        assert_eq!(queue_rx.next(), src.next());
426    }
427
428    #[test]
429    fn test_volume() {
430        let (player, mut queue_rx) = Player::new();
431
432        let v = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];
433
434        // High rate to avoid immediate control.
435        player.append(SamplesBuffer::new(nz!(2), nz!(44100), v.clone()));
436        let src = SamplesBuffer::new(nz!(2), nz!(44100), v.clone());
437
438        let mut src = src.amplify(0.5);
439        player.set_volume(0.5);
440
441        for _ in 0..v.len() {
442            assert_eq!(queue_rx.next(), src.next());
443        }
444    }
445}