1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5#[cfg(feature = "crossbeam-channel")]
6use crossbeam_channel::{Receiver, Sender};
7use dasp_sample::FromSample;
8#[cfg(not(feature = "crossbeam-channel"))]
9use std::sync::mpsc::{Receiver, Sender};
10
11use crate::mixer::Mixer;
12use crate::source::SeekError;
13use crate::Float;
14use crate::{queue, source::Done, Source};
15
16pub struct Player {
21 queue_tx: Arc<queue::SourcesQueueInput>,
22 sleep_until_end: Mutex<Option<Receiver<()>>>,
23
24 controls: Arc<Controls>,
25 sound_count: Arc<AtomicUsize>,
26
27 detached: bool,
28}
29
30struct SeekOrder {
31 pos: Duration,
32 feedback: Sender<Result<(), SeekError>>,
33}
34
35impl SeekOrder {
36 fn new(pos: Duration) -> (Self, Receiver<Result<(), SeekError>>) {
37 #[cfg(not(feature = "crossbeam-channel"))]
38 let (tx, rx) = {
39 use std::sync::mpsc;
40 mpsc::channel()
41 };
42
43 #[cfg(feature = "crossbeam-channel")]
44 let (tx, rx) = {
45 use crossbeam_channel::bounded;
46 bounded(1)
47 };
48 (Self { pos, feedback: tx }, rx)
49 }
50
51 fn attempt<S>(self, maybe_seekable: &mut S)
52 where
53 S: Source,
54 {
55 let res = maybe_seekable.try_seek(self.pos);
56 let _ignore_receiver_dropped = self.feedback.send(res);
57 }
58}
59
60struct Controls {
61 pause: AtomicBool,
62 volume: Mutex<Float>,
63 stopped: AtomicBool,
64 speed: Mutex<f32>,
65 to_clear: Mutex<u32>,
66 seek: Mutex<Option<SeekOrder>>,
67 position: Mutex<Duration>,
68}
69
70impl Player {
71 #[inline]
73 pub fn connect_new(mixer: &Mixer) -> Player {
74 let (sink, source) = Player::new();
75 mixer.add(source);
76 sink
77 }
78
79 #[inline]
81 pub fn new() -> (Player, queue::SourcesQueueOutput) {
82 let (queue_tx, queue_rx) = queue::queue(true);
83
84 let sink = Player {
85 queue_tx,
86 sleep_until_end: Mutex::new(None),
87 controls: Arc::new(Controls {
88 pause: AtomicBool::new(false),
89 volume: Mutex::new(1.0),
90 stopped: AtomicBool::new(false),
91 speed: Mutex::new(1.0),
92 to_clear: Mutex::new(0),
93 seek: Mutex::new(None),
94 position: Mutex::new(Duration::ZERO),
95 }),
96 sound_count: Arc::new(AtomicUsize::new(0)),
97 detached: false,
98 };
99 (sink, queue_rx)
100 }
101
102 #[inline]
104 pub fn append<S>(&self, source: S)
105 where
106 S: Source + Send + 'static,
107 f32: FromSample<S::Item>,
108 {
109 if self.controls.stopped.load(Ordering::SeqCst) {
111 if self.sound_count.load(Ordering::SeqCst) > 0 {
112 self.sleep_until_end();
113 }
114 self.controls.stopped.store(false, Ordering::SeqCst);
115 }
116
117 let controls = self.controls.clone();
118
119 let start_played = AtomicBool::new(false);
120
121 let source = source
122 .speed(1.0)
123 .track_position()
125 .pausable(false)
126 .amplify(1.0)
127 .skippable()
128 .stoppable()
129 .periodic_access(Duration::from_millis(5), move |src| {
131 if controls.stopped.load(Ordering::SeqCst) {
132 src.stop();
133 *controls.position.lock().unwrap() = Duration::ZERO;
134 }
135 {
136 let mut to_clear = controls.to_clear.lock().unwrap();
137 if *to_clear > 0 {
138 src.inner_mut().skip();
139 *to_clear -= 1;
140 *controls.position.lock().unwrap() = Duration::ZERO;
141 } else {
142 *controls.position.lock().unwrap() =
143 src.inner().inner().inner().inner().get_pos();
144 }
145 }
146 let amp = src.inner_mut().inner_mut();
147 amp.set_factor(*controls.volume.lock().unwrap());
148 amp.inner_mut()
149 .set_paused(controls.pause.load(Ordering::SeqCst));
150 amp.inner_mut()
151 .inner_mut()
152 .inner_mut()
153 .set_factor(*controls.speed.lock().unwrap());
154 if let Some(seek) = controls.seek.lock().unwrap().take() {
155 seek.attempt(amp)
156 }
157 start_played.store(true, Ordering::SeqCst);
158 });
159 self.sound_count.fetch_add(1, Ordering::Relaxed);
160 let source = Done::new(source, self.sound_count.clone());
161 *self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
162 }
163
164 #[inline]
169 pub fn volume(&self) -> Float {
170 *self.controls.volume.lock().unwrap()
171 }
172
173 #[inline]
178 pub fn set_volume(&self, value: Float) {
179 *self.controls.volume.lock().unwrap() = value;
180 }
181
182 #[inline]
186 pub fn speed(&self) -> f32 {
187 *self.controls.speed.lock().unwrap()
188 }
189
190 #[inline]
204 pub fn set_speed(&self, value: f32) {
205 *self.controls.speed.lock().unwrap() = value;
206 }
207
208 #[inline]
212 pub fn play(&self) {
213 self.controls.pause.store(false, Ordering::SeqCst);
214 }
215
216 pub fn try_seek(&self, pos: Duration) -> Result<(), SeekError> {
239 let (order, feedback) = SeekOrder::new(pos);
240 *self.controls.seek.lock().unwrap() = Some(order);
241
242 if self.sound_count.load(Ordering::Acquire) == 0 {
243 return Ok(());
245 }
246
247 match feedback.recv() {
248 Ok(seek_res) => {
249 *self.controls.position.lock().unwrap() = pos;
250 seek_res
251 }
252 Err(_) => Ok(()),
256 }
257 }
258
259 pub fn pause(&self) {
265 self.controls.pause.store(true, Ordering::SeqCst);
266 }
267
268 pub fn is_paused(&self) -> bool {
273 self.controls.pause.load(Ordering::SeqCst)
274 }
275
276 pub fn clear(&self) {
280 let len = self.sound_count.load(Ordering::SeqCst) as u32;
281 *self.controls.to_clear.lock().unwrap() = len;
282 self.sleep_until_end();
283 self.pause();
284 }
285
286 pub fn skip_one(&self) {
292 let len = self.sound_count.load(Ordering::SeqCst) as u32;
293 let mut to_clear = self.controls.to_clear.lock().unwrap();
294 if len > *to_clear {
295 *to_clear += 1;
296 }
297 }
298
299 #[inline]
301 pub fn stop(&self) {
302 self.controls.stopped.store(true, Ordering::SeqCst);
303 }
304
305 #[inline]
307 pub fn detach(mut self) {
308 self.detached = true;
309 }
310
311 #[inline]
313 pub fn sleep_until_end(&self) {
314 if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
315 let _ = sleep_until_end.recv();
316 }
317 }
318
319 #[inline]
321 pub fn empty(&self) -> bool {
322 self.len() == 0
323 }
324
325 #[allow(clippy::len_without_is_empty)]
327 #[inline]
328 pub fn len(&self) -> usize {
329 self.sound_count.load(Ordering::Relaxed)
330 }
331
332 #[inline]
340 pub fn get_pos(&self) -> Duration {
341 *self.controls.position.lock().unwrap()
342 }
343}
344
345impl Drop for Player {
346 #[inline]
347 fn drop(&mut self) {
348 self.queue_tx.set_keep_alive_if_empty(false);
349
350 if !self.detached {
351 self.controls.stopped.store(true, Ordering::Relaxed);
352 }
353 }
354}
355
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use crate::buffer::SamplesBuffer;
    use crate::math::nz;
    use crate::{Player, Source};

    /// Pausing yields silence, resuming continues where playback left off,
    /// and stopping empties the player.
    #[test]
    fn test_pause_and_stop() {
        let (player, mut output) = Player::new();

        // An empty queue produces silence.
        assert_eq!(output.next(), Some(0.0));
        // Ignore the silence emitted before the appended sound starts.
        let mut output = output.skip_while(|sample| *sample == 0.0);

        let samples = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];

        player.append(SamplesBuffer::new(nz!(1), nz!(1), samples.clone()));
        let mut expected = SamplesBuffer::new(nz!(1), nz!(1), samples);

        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());

        player.pause();

        assert_eq!(output.next(), Some(0.0));

        player.play();

        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());

        player.stop();

        assert_eq!(output.next(), Some(0.0));

        assert!(player.empty());
    }

    /// Appending after a stop clears the stopped flag and plays the new sound
    /// from its beginning.
    #[test]
    fn test_stop_and_start() {
        let (player, mut output) = Player::new();

        let samples = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];

        player.append(SamplesBuffer::new(nz!(1), nz!(1), samples.clone()));
        let mut expected = SamplesBuffer::new(nz!(1), nz!(1), samples.clone());

        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());

        player.stop();

        assert!(player.controls.stopped.load(Ordering::SeqCst));
        assert_eq!(output.next(), Some(0.0));

        // Start over with a fresh reference and a freshly appended sound.
        let mut expected = SamplesBuffer::new(nz!(1), nz!(1), samples.clone());
        player.append(SamplesBuffer::new(nz!(1), nz!(1), samples));

        assert!(!player.controls.stopped.load(Ordering::SeqCst));
        // Ignore the silence emitted before the new sound starts.
        let mut output = output.skip_while(|sample| *sample == 0.0);

        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());
    }

    /// The volume set on the player is applied to every sample it outputs.
    #[test]
    fn test_volume() {
        let (player, mut output) = Player::new();

        let samples = vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0];

        player.append(SamplesBuffer::new(nz!(2), nz!(44100), samples.clone()));
        let reference = SamplesBuffer::new(nz!(2), nz!(44100), samples.clone());

        // The reference is amplified by the same factor the player is set to.
        let mut expected = reference.amplify(0.5);
        player.set_volume(0.5);

        for _ in 0..samples.len() {
            assert_eq!(output.next(), expected.next());
        }
    }
}
445}