1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5#[cfg(feature = "crossbeam-channel")]
6use crossbeam_channel::{Receiver, Sender};
7use dasp_sample::FromSample;
8#[cfg(not(feature = "crossbeam-channel"))]
9use std::sync::mpsc::{Receiver, Sender};
10
11use crate::mixer::Mixer;
12use crate::source::SeekError;
13use crate::{queue, source::Done, Source};
14
/// Handle used to queue sounds and control how they are played back.
///
/// Dropping a `Sink` raises the shared `stopped` flag unless `detach` was
/// used to give the handle up (see the `Drop` implementation).
pub struct Sink {
    /// Input half of the source queue created in `Sink::new`; `append` pushes
    /// every new source into it.
    queue_tx: Arc<queue::SourcesQueueInput>,
    /// Receiver returned by `append_with_signal` for the most recently
    /// appended source. `sleep_until_end` takes it and blocks on it.
    sleep_until_end: Mutex<Option<Receiver<()>>>,

    /// Playback settings shared with the periodic callback that `append`
    /// installs on every source.
    controls: Arc<Controls>,
    /// Incremented once per `append`; a clone is handed to the `Done` wrapper
    /// (presumably decremented there when the source ends - TODO confirm in
    /// `source::Done`).
    sound_count: Arc<AtomicUsize>,

    /// Set by `detach`; when `true`, `Drop` leaves the `stopped` flag untouched.
    detached: bool,
}
28
/// A seek request handed from `Sink::try_seek` to the periodic callback of a
/// queued source, together with the channel used to report the outcome.
struct SeekOrder {
    /// Position passed to the source's `try_seek`.
    pos: Duration,
    /// Sending half of the channel on which the seek result is reported back.
    feedback: Sender<Result<(), SeekError>>,
}
33
34impl SeekOrder {
35 fn new(pos: Duration) -> (Self, Receiver<Result<(), SeekError>>) {
36 #[cfg(not(feature = "crossbeam-channel"))]
37 let (tx, rx) = {
38 use std::sync::mpsc;
39 mpsc::channel()
40 };
41
42 #[cfg(feature = "crossbeam-channel")]
43 let (tx, rx) = {
44 use crossbeam_channel::bounded;
45 bounded(1)
46 };
47 (Self { pos, feedback: tx }, rx)
48 }
49
50 fn attempt<S>(self, maybe_seekable: &mut S)
51 where
52 S: Source,
53 {
54 let res = maybe_seekable.try_seek(self.pos);
55 let _ignore_receiver_dropped = self.feedback.send(res);
56 }
57}
58
/// Playback settings shared between a `Sink` and the periodic callback that
/// `Sink::append` installs on each source.
struct Controls {
    /// `true` while playback is paused; forwarded to the `pausable` adaptor.
    pause: AtomicBool,
    /// Factor forwarded to the `amplify` adaptor (initialised to `1.0`).
    volume: Mutex<f32>,
    /// `true` once the sink was stopped; makes the callback call `stop()` on
    /// the source.
    stopped: AtomicBool,
    /// Factor forwarded to the `speed` adaptor (initialised to `1.0`).
    speed: Mutex<f32>,
    /// Number of sources that still have to be skipped; the callback skips
    /// its source and decrements this while it is above zero.
    to_clear: Mutex<u32>,
    /// Pending seek request, taken and executed by the callback.
    seek: Mutex<Option<SeekOrder>>,
    /// Playback position last recorded by the callback or by `try_seek`.
    position: Mutex<Duration>,
}
68
69impl Sink {
70 #[inline]
72 pub fn connect_new(mixer: &Mixer) -> Sink {
73 let (sink, source) = Sink::new();
74 mixer.add(source);
75 sink
76 }
77
78 #[inline]
80 pub fn new() -> (Sink, queue::SourcesQueueOutput) {
81 let (queue_tx, queue_rx) = queue::queue(true);
82
83 let sink = Sink {
84 queue_tx,
85 sleep_until_end: Mutex::new(None),
86 controls: Arc::new(Controls {
87 pause: AtomicBool::new(false),
88 volume: Mutex::new(1.0),
89 stopped: AtomicBool::new(false),
90 speed: Mutex::new(1.0),
91 to_clear: Mutex::new(0),
92 seek: Mutex::new(None),
93 position: Mutex::new(Duration::ZERO),
94 }),
95 sound_count: Arc::new(AtomicUsize::new(0)),
96 detached: false,
97 };
98 (sink, queue_rx)
99 }
100
101 #[inline]
103 pub fn append<S>(&self, source: S)
104 where
105 S: Source + Send + 'static,
106 f32: FromSample<S::Item>,
107 {
108 if self.controls.stopped.load(Ordering::SeqCst) {
110 if self.sound_count.load(Ordering::SeqCst) > 0 {
111 self.sleep_until_end();
112 }
113 self.controls.stopped.store(false, Ordering::SeqCst);
114 }
115
116 let controls = self.controls.clone();
117
118 let start_played = AtomicBool::new(false);
119
120 let source = source
121 .speed(1.0)
122 .track_position()
124 .pausable(false)
125 .amplify(1.0)
126 .skippable()
127 .stoppable()
128 .periodic_access(Duration::from_millis(5), move |src| {
130 if controls.stopped.load(Ordering::SeqCst) {
131 src.stop();
132 *controls.position.lock().unwrap() = Duration::ZERO;
133 }
134 {
135 let mut to_clear = controls.to_clear.lock().unwrap();
136 if *to_clear > 0 {
137 src.inner_mut().skip();
138 *to_clear -= 1;
139 *controls.position.lock().unwrap() = Duration::ZERO;
140 } else {
141 *controls.position.lock().unwrap() = src.inner().inner().inner().inner().get_pos();
142 }
143 }
144 let amp = src.inner_mut().inner_mut();
145 amp.set_factor(*controls.volume.lock().unwrap());
146 amp.inner_mut()
147 .set_paused(controls.pause.load(Ordering::SeqCst));
148 amp.inner_mut()
149 .inner_mut()
150 .inner_mut()
151 .set_factor(*controls.speed.lock().unwrap());
152 if let Some(seek) = controls.seek.lock().unwrap().take() {
153 seek.attempt(amp)
154 }
155 start_played.store(true, Ordering::SeqCst);
156 });
157 self.sound_count.fetch_add(1, Ordering::Relaxed);
158 let source = Done::new(source, self.sound_count.clone());
159 *self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
160 }
161
162 #[inline]
167 pub fn volume(&self) -> f32 {
168 *self.controls.volume.lock().unwrap()
169 }
170
171 #[inline]
176 pub fn set_volume(&self, value: f32) {
177 *self.controls.volume.lock().unwrap() = value;
178 }
179
180 #[inline]
184 pub fn speed(&self) -> f32 {
185 *self.controls.speed.lock().unwrap()
186 }
187
188 #[inline]
202 pub fn set_speed(&self, value: f32) {
203 *self.controls.speed.lock().unwrap() = value;
204 }
205
206 #[inline]
210 pub fn play(&self) {
211 self.controls.pause.store(false, Ordering::SeqCst);
212 }
213
214 pub fn try_seek(&self, pos: Duration) -> Result<(), SeekError> {
237 let (order, feedback) = SeekOrder::new(pos);
238 *self.controls.seek.lock().unwrap() = Some(order);
239
240 if self.sound_count.load(Ordering::Acquire) == 0 {
241 return Ok(());
243 }
244
245 match feedback.recv() {
246 Ok(seek_res) => {
247 *self.controls.position.lock().unwrap() = pos;
248 seek_res
249 }
250 Err(_) => Ok(()),
254 }
255 }
256
257 pub fn pause(&self) {
263 self.controls.pause.store(true, Ordering::SeqCst);
264 }
265
266 pub fn is_paused(&self) -> bool {
271 self.controls.pause.load(Ordering::SeqCst)
272 }
273
274 pub fn clear(&self) {
278 let len = self.sound_count.load(Ordering::SeqCst) as u32;
279 *self.controls.to_clear.lock().unwrap() = len;
280 self.sleep_until_end();
281 self.pause();
282 }
283
284 pub fn skip_one(&self) {
290 let len = self.sound_count.load(Ordering::SeqCst) as u32;
291 let mut to_clear = self.controls.to_clear.lock().unwrap();
292 if len > *to_clear {
293 *to_clear += 1;
294 }
295 }
296
297 #[inline]
299 pub fn stop(&self) {
300 self.controls.stopped.store(true, Ordering::SeqCst);
301 }
302
303 #[inline]
305 pub fn detach(mut self) {
306 self.detached = true;
307 }
308
309 #[inline]
311 pub fn sleep_until_end(&self) {
312 if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
313 let _ = sleep_until_end.recv();
314 }
315 }
316
317 #[inline]
319 pub fn empty(&self) -> bool {
320 self.len() == 0
321 }
322
323 #[allow(clippy::len_without_is_empty)]
325 #[inline]
326 pub fn len(&self) -> usize {
327 self.sound_count.load(Ordering::Relaxed)
328 }
329
330 #[inline]
338 pub fn get_pos(&self) -> Duration {
339 *self.controls.position.lock().unwrap()
340 }
341}
342
343impl Drop for Sink {
344 #[inline]
345 fn drop(&mut self) {
346 self.queue_tx.set_keep_alive_if_empty(false);
347
348 if !self.detached {
349 self.controls.stopped.store(true, Ordering::Relaxed);
350 }
351 }
352}
353
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use crate::buffer::SamplesBuffer;
    use crate::{Sink, Source};

    /// Non-zero samples used by every test, so they can be told apart from
    /// the silence (`0.0`) that the assertions below expect in between.
    fn samples() -> Vec<f32> {
        vec![10.0, -10.0, 20.0, -20.0, 30.0, -30.0]
    }

    #[test]
    fn test_pause_and_stop() {
        let (sink, mut output) = Sink::new();

        // Nothing is queued yet: the first sample is expected to be silence.
        assert_eq!(output.next(), Some(0.0));
        // Drop the leading run of silence that precedes the appended sound.
        let mut output = output.skip_while(|sample| *sample == 0.0);

        let data = samples();

        sink.append(SamplesBuffer::new(1, 1, data.clone()));
        let mut expected = SamplesBuffer::new(1, 1, data);

        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());

        sink.pause();

        // While paused the output is silent.
        assert_eq!(output.next(), Some(0.0));

        sink.play();

        // Playback continues where it left off.
        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());

        sink.stop();

        assert_eq!(output.next(), Some(0.0));

        assert!(sink.empty());
    }

    #[test]
    fn test_stop_and_start() {
        let (sink, mut output) = Sink::new();

        sink.append(SamplesBuffer::new(1, 1, samples()));
        let mut expected = SamplesBuffer::new(1, 1, samples());

        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());

        sink.stop();

        assert!(sink.controls.stopped.load(Ordering::SeqCst));
        assert_eq!(output.next(), Some(0.0));

        // Appending after a stop must clear the flag and start a fresh sound.
        expected = SamplesBuffer::new(1, 1, samples());
        sink.append(SamplesBuffer::new(1, 1, samples()));

        assert!(!sink.controls.stopped.load(Ordering::SeqCst));
        let mut output = output.skip_while(|sample| *sample == 0.0);

        assert_eq!(output.next(), expected.next());
        assert_eq!(output.next(), expected.next());
    }

    #[test]
    fn test_volume() {
        let (sink, mut output) = Sink::new();

        let data = samples();
        let sample_count = data.len();

        sink.append(SamplesBuffer::new(2, 44100, data.clone()));
        // Reference: the same samples amplified directly by the same factor.
        let mut expected = SamplesBuffer::new(2, 44100, data).amplify(0.5);
        sink.set_volume(0.5);

        for _ in 0..sample_count {
            assert_eq!(output.next(), expected.next());
        }
    }
}