1use std::collections::VecDeque;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use crate::source::{Empty, SeekError, Source, Zero};
9use crate::Sample;
10
11use crate::common::{ChannelCount, SampleRate};
12#[cfg(feature = "crossbeam-channel")]
13use crossbeam_channel::{unbounded as channel, Receiver, Sender};
14#[cfg(not(feature = "crossbeam-channel"))]
15use std::sync::mpsc::{channel, Receiver, Sender};
16
17pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueueOutput) {
29 let input = Arc::new(SourcesQueueInput {
30 next_sounds: Mutex::new(VecDeque::new()),
31 keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
32 });
33
34 let output = SourcesQueueOutput {
35 current: Box::new(Empty::new()) as Box<_>,
36 signal_after_end: None,
37 input: input.clone(),
38 samples_consumed_in_span: 0,
39 padding_samples_remaining: 0,
40 };
41
42 (input, output)
43}
44
45type Sound = Box<dyn Source + Send>;
48type SignalDone = Option<Sender<()>>;
49
50pub struct SourcesQueueInput {
52 next_sounds: Mutex<VecDeque<(Sound, SignalDone)>>,
53
54 keep_alive_if_empty: AtomicBool,
56}
57
58impl SourcesQueueInput {
59 #[inline]
61 pub fn append<T>(&self, source: T)
62 where
63 T: Source + Send + 'static,
64 {
65 self.next_sounds
66 .lock()
67 .unwrap()
68 .push_back((Box::new(source) as Box<_>, None));
69 }
70
71 #[inline]
77 pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
78 where
79 T: Source + Send + 'static,
80 {
81 let (tx, rx) = channel();
82 self.next_sounds
83 .lock()
84 .unwrap()
85 .push_back((Box::new(source) as Box<_>, Some(tx)));
86 rx
87 }
88
89 pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
93 self.keep_alive_if_empty
94 .store(keep_alive_if_empty, Ordering::Release);
95 }
96
97 pub fn clear(&self) -> usize {
99 let mut sounds = self.next_sounds.lock().unwrap();
100 let len = sounds.len();
101 sounds.clear();
102 len
103 }
104}
105pub struct SourcesQueueOutput {
107 current: Box<dyn Source + Send>,
109
110 signal_after_end: Option<Sender<()>>,
112
113 input: Arc<SourcesQueueInput>,
115
116 samples_consumed_in_span: usize,
118
119 padding_samples_remaining: usize,
122}
123
124#[inline]
129fn threshold(channels: ChannelCount) -> usize {
130 const BASE_SAMPLES: usize = 512;
131 let ch = channels.get() as usize;
132 BASE_SAMPLES.div_ceil(ch) * ch
133}
134
135impl Source for SourcesQueueOutput {
136 #[inline]
137 fn current_span_len(&self) -> Option<usize> {
138 if !self.current.is_exhausted() {
151 return self.current.current_span_len();
152 } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
153 && self.input.next_sounds.lock().unwrap().is_empty()
154 {
155 return Some(threshold(self.current.channels()));
157 }
158
159 let (lower_bound, _) = self.current.size_hint();
161 if lower_bound > 0 {
164 return Some(lower_bound);
165 }
166
167 Some(threshold(self.current.channels()))
169 }
170
171 #[inline]
172 fn channels(&self) -> ChannelCount {
173 if !self.current.is_exhausted() {
174 self.current.channels()
179 } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
180 next.channels()
184 } else {
185 self.current.channels()
190 }
191 }
192
193 #[inline]
194 fn sample_rate(&self) -> SampleRate {
195 if !self.current.is_exhausted() {
196 self.current.sample_rate()
198 } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
199 next.sample_rate()
202 } else {
203 self.current.sample_rate()
205 }
206 }
207
208 #[inline]
209 fn total_duration(&self) -> Option<Duration> {
210 None
211 }
212
213 #[inline]
223 fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
224 self.current.try_seek(pos)
225 }
226}
227
228impl Iterator for SourcesQueueOutput {
229 type Item = Sample;
230
231 #[inline]
232 fn next(&mut self) -> Option<Self::Item> {
233 loop {
234 if self.padding_samples_remaining > 0 {
236 self.padding_samples_remaining -= 1;
237 return Some(0.0);
238 }
239
240 if let Some(sample) = self.current.next() {
242 self.samples_consumed_in_span += 1;
243 return Some(sample);
244 }
245
246 let channels = self.current.channels().get() as usize;
248 let incomplete_frame_samples = self.samples_consumed_in_span % channels;
249 if incomplete_frame_samples > 0 {
250 self.padding_samples_remaining = channels - incomplete_frame_samples;
252 self.samples_consumed_in_span = 0;
254 continue;
256 }
257
258 self.samples_consumed_in_span = 0;
261 if self.go_next().is_err() {
262 return None;
263 }
264 }
265 }
266
267 #[inline]
268 fn size_hint(&self) -> (usize, Option<usize>) {
269 (self.current.size_hint().0, None)
270 }
271}
272
273impl SourcesQueueOutput {
274 fn go_next(&mut self) -> Result<(), ()> {
279 if let Some(signal_after_end) = self.signal_after_end.take() {
280 let _ = signal_after_end.send(());
281 }
282
283 let (next, signal_after_end) = {
284 let mut next = self.input.next_sounds.lock().unwrap();
285
286 if let Some(next) = next.pop_front() {
287 next
288 } else {
289 let channels = self.current.channels();
290 let silence = Box::new(Zero::new_samples(
291 channels,
292 self.current.sample_rate(),
293 threshold(channels),
294 )) as Box<_>;
295 if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
296 (silence, None)
298 } else {
299 return Err(());
300 }
301 }
302 };
303
304 self.current = next;
305 self.signal_after_end = signal_after_end;
306 Ok(())
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use crate::buffer::SamplesBuffer;
313 use crate::math::nz;
314 use crate::source::{SeekError, Source};
315 use crate::{queue, ChannelCount, Sample, SampleRate};
316 use std::time::Duration;
317
318 #[test]
319 fn basic() {
320 let (tx, mut rx) = queue::queue(false);
321
322 tx.append(SamplesBuffer::new(
323 nz!(1),
324 nz!(48000),
325 vec![10.0, -10.0, 10.0, -10.0],
326 ));
327 tx.append(SamplesBuffer::new(
328 nz!(2),
329 nz!(96000),
330 vec![5.0, 5.0, 5.0, 5.0],
331 ));
332
333 assert_eq!(rx.channels(), nz!(1));
334 assert_eq!(rx.sample_rate().get(), 48000);
335 assert_eq!(rx.next(), Some(10.0));
336 assert_eq!(rx.next(), Some(-10.0));
337 assert_eq!(rx.next(), Some(10.0));
338 assert_eq!(rx.next(), Some(-10.0));
339 assert_eq!(rx.channels(), nz!(2));
340 assert_eq!(rx.sample_rate().get(), 96000);
341 assert_eq!(rx.next(), Some(5.0));
342 assert_eq!(rx.next(), Some(5.0));
343 assert_eq!(rx.next(), Some(5.0));
344 assert_eq!(rx.next(), Some(5.0));
345 assert_eq!(rx.next(), None);
346 }
347
348 #[test]
349 fn immediate_end() {
350 let (_, mut rx) = queue::queue(false);
351 assert_eq!(rx.next(), None);
352 }
353
354 #[test]
355 fn keep_alive() {
356 let (tx, mut rx) = queue::queue(true);
357 tx.append(SamplesBuffer::new(
358 nz!(1),
359 nz!(48000),
360 vec![10.0, -10.0, 10.0, -10.0],
361 ));
362
363 assert_eq!(rx.next(), Some(10.0));
364 assert_eq!(rx.next(), Some(-10.0));
365 assert_eq!(rx.next(), Some(10.0));
366 assert_eq!(rx.next(), Some(-10.0));
367
368 for _ in 0..100000 {
369 assert_eq!(rx.next(), Some(0.0));
370 }
371 }
372
373 #[test]
374 #[ignore] fn no_delay_when_added() {
376 let (tx, mut rx) = queue::queue(true);
377
378 for _ in 0..500 {
379 assert_eq!(rx.next(), Some(0.0));
380 }
381
382 tx.append(SamplesBuffer::new(
383 nz!(1),
384 nz!(48000),
385 vec![10.0, -10.0, 10.0, -10.0],
386 ));
387 assert_eq!(rx.next(), Some(10.0));
388 assert_eq!(rx.next(), Some(-10.0));
389 assert_eq!(rx.next(), Some(10.0));
390 assert_eq!(rx.next(), Some(-10.0));
391 }
392
393 #[test]
394 fn append_updates_metadata() {
395 for keep_alive in [false, true] {
396 let (tx, rx) = queue::queue(keep_alive);
397 assert_eq!(
398 rx.channels(),
399 nz!(1),
400 "Initial channels should be 1 (keep_alive={keep_alive})"
401 );
402 assert_eq!(
403 rx.sample_rate(),
404 nz!(48000),
405 "Initial sample rate should be 48000 (keep_alive={keep_alive})"
406 );
407
408 tx.append(SamplesBuffer::new(
409 nz!(2),
410 nz!(44100),
411 vec![0.1, 0.2, 0.3, 0.4],
412 ));
413
414 assert_eq!(
415 rx.channels(),
416 nz!(2),
417 "Channels should update to 2 (keep_alive={keep_alive})"
418 );
419 assert_eq!(
420 rx.sample_rate(),
421 nz!(44100),
422 "Sample rate should update to 44100 (keep_alive={keep_alive})"
423 );
424 }
425 }
426
427 #[test]
428 fn span_ending_mid_frame() {
429 let mut test_source1 = TestSource::new(&[0.1, 0.2, 0.1, 0.2, 0.1])
430 .with_channels(nz!(2))
431 .with_false_span_len(Some(6));
432 let mut test_source2 = TestSource::new(&[0.3, 0.4, 0.3, 0.4]).with_channels(nz!(2));
433
434 let (controls, mut source) = queue::queue(true);
435 controls.append(test_source1.clone());
436 controls.append(test_source2.clone());
437
438 assert_eq!(source.next(), test_source1.next());
439 assert_eq!(source.next(), test_source1.next());
440 assert_eq!(source.next(), test_source1.next());
441 assert_eq!(source.next(), test_source1.next());
442 assert_eq!(source.next(), test_source1.next());
443 assert_eq!(None, test_source1.next());
444
445 assert_eq!(
448 source.next(),
449 Some(0.0),
450 "Expected silence to complete frame"
451 );
452
453 assert_eq!(source.next(), test_source2.next());
454 assert_eq!(source.next(), test_source2.next());
455 assert_eq!(source.next(), test_source2.next());
456 assert_eq!(source.next(), test_source2.next());
457 }
458
459 #[derive(Debug, Clone)]
462 struct TestSource {
463 samples: Vec<Sample>,
464 pos: usize,
465 channels: ChannelCount,
466 sample_rate: SampleRate,
467 total_span_len: Option<usize>,
468 }
469
470 impl TestSource {
471 fn new(samples: &[Sample]) -> Self {
472 let samples = samples.to_vec();
473 Self {
474 total_span_len: Some(samples.len()),
475 pos: 0,
476 channels: nz!(1),
477 sample_rate: nz!(44100),
478 samples,
479 }
480 }
481
482 fn with_channels(mut self, count: ChannelCount) -> Self {
483 self.channels = count;
484 self
485 }
486
487 fn with_false_span_len(mut self, total_len: Option<usize>) -> Self {
488 self.total_span_len = total_len;
489 self
490 }
491 }
492
493 impl Iterator for TestSource {
494 type Item = Sample;
495
496 fn next(&mut self) -> Option<Self::Item> {
497 let res = self.samples.get(self.pos).copied();
498 self.pos += 1;
499 res
500 }
501
502 fn size_hint(&self) -> (usize, Option<usize>) {
503 let remaining = self.samples.len().saturating_sub(self.pos);
504 (remaining, Some(remaining))
505 }
506 }
507
508 impl Source for TestSource {
509 fn current_span_len(&self) -> Option<usize> {
510 self.total_span_len
511 }
512
513 fn channels(&self) -> ChannelCount {
514 self.channels
515 }
516
517 fn sample_rate(&self) -> SampleRate {
518 self.sample_rate
519 }
520
521 fn total_duration(&self) -> Option<Duration> {
522 None
523 }
524
525 fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
526 Err(SeekError::NotSupported {
527 underlying_source: std::any::type_name::<Self>(),
528 })
529 }
530 }
531}