1use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{Empty, SeekError, Source, Zero};
8use crate::Sample;
9
10use crate::common::{ChannelCount, SampleRate};
11#[cfg(feature = "crossbeam-channel")]
12use crossbeam_channel::{unbounded as channel, Receiver, Sender};
13#[cfg(not(feature = "crossbeam-channel"))]
14use std::sync::mpsc::{channel, Receiver, Sender};
15
16pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueueOutput) {
28 let input = Arc::new(SourcesQueueInput {
29 next_sounds: Mutex::new(Vec::new()),
30 keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
31 });
32
33 let output = SourcesQueueOutput {
34 current: Box::new(Empty::new()) as Box<_>,
35 signal_after_end: None,
36 input: input.clone(),
37 };
38
39 (input, output)
40}
41
/// A boxed, type-erased source that is allowed to cross thread boundaries.
type Sound = Box<dyn Source + Send>;
/// Optional sender paired with a queued sound; the output sends `()` on it
/// once it moves past that sound.
type SignalDone = Option<Sender<()>>;
46
/// The input half of the queue: holds the sounds that have not started yet.
pub struct SourcesQueueInput {
    /// Pending sounds in playback order (index 0 is taken next), each with its
    /// optional completion sender.
    next_sounds: Mutex<Vec<(Sound, SignalDone)>>,

    /// When `true`, the output plays silence instead of ending while this
    /// queue is empty.
    keep_alive_if_empty: AtomicBool,
}
54
55impl SourcesQueueInput {
56 #[inline]
58 pub fn append<T>(&self, source: T)
59 where
60 T: Source + Send + 'static,
61 {
62 self.next_sounds
63 .lock()
64 .unwrap()
65 .push((Box::new(source) as Box<_>, None));
66 }
67
68 #[inline]
74 pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
75 where
76 T: Source + Send + 'static,
77 {
78 let (tx, rx) = channel();
79 self.next_sounds
80 .lock()
81 .unwrap()
82 .push((Box::new(source) as Box<_>, Some(tx)));
83 rx
84 }
85
86 pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
90 self.keep_alive_if_empty
91 .store(keep_alive_if_empty, Ordering::Release);
92 }
93
94 pub fn clear(&self) -> usize {
96 let mut sounds = self.next_sounds.lock().unwrap();
97 let len = sounds.len();
98 sounds.clear();
99 len
100 }
101}
/// The output half of the queue; implements `Source` and `Iterator`.
pub struct SourcesQueueOutput {
    /// The source that samples are currently being pulled from.
    current: Box<dyn Source + Send>,

    /// Sender to notify (if any) before switching away from `current`.
    signal_after_end: Option<Sender<()>>,

    /// Shared handle to the input half, which holds the pending sounds and the
    /// keep-alive flag.
    input: Arc<SourcesQueueInput>,
}
113
/// Number of samples in a filler silence, and the span length reported when
/// the current source gives no usable length information.
const THRESHOLD: usize = 512;
115
116impl Source for SourcesQueueOutput {
117 #[inline]
118 fn current_span_len(&self) -> Option<usize> {
119 if let Some(val) = self.current.current_span_len() {
132 if val != 0 {
133 return Some(val);
134 } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
135 && self.input.next_sounds.lock().unwrap().is_empty()
136 {
137 return Some(THRESHOLD);
139 }
140 }
141
142 let (lower_bound, _) = self.current.size_hint();
144 if lower_bound > 0 {
147 return Some(lower_bound);
148 }
149
150 Some(THRESHOLD)
152 }
153
154 #[inline]
155 fn channels(&self) -> ChannelCount {
156 self.current.channels()
157 }
158
159 #[inline]
160 fn sample_rate(&self) -> SampleRate {
161 self.current.sample_rate()
162 }
163
164 #[inline]
165 fn total_duration(&self) -> Option<Duration> {
166 None
167 }
168
169 #[inline]
179 fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
180 self.current.try_seek(pos)
181 }
182}
183
184impl Iterator for SourcesQueueOutput {
185 type Item = Sample;
186
187 #[inline]
188 fn next(&mut self) -> Option<Self::Item> {
189 loop {
190 if let Some(sample) = self.current.next() {
192 return Some(sample);
193 }
194
195 if self.go_next().is_err() {
198 return None;
199 }
200 }
201 }
202
203 #[inline]
204 fn size_hint(&self) -> (usize, Option<usize>) {
205 (self.current.size_hint().0, None)
206 }
207}
208
209impl SourcesQueueOutput {
210 fn go_next(&mut self) -> Result<(), ()> {
215 if let Some(signal_after_end) = self.signal_after_end.take() {
216 let _ = signal_after_end.send(());
217 }
218
219 let (next, signal_after_end) = {
220 let mut next = self.input.next_sounds.lock().unwrap();
221
222 if next.is_empty() {
223 let silence = Box::new(Zero::new_samples(1, 44100, THRESHOLD)) as Box<_>;
224 if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
225 (silence, None)
227 } else {
228 return Err(());
229 }
230 } else {
231 next.remove(0)
232 }
233 };
234
235 self.current = next;
236 self.signal_after_end = signal_after_end;
237 Ok(())
238 }
239}
240
#[cfg(test)]
mod tests {
    use crate::buffer::SamplesBuffer;
    use crate::queue;
    use crate::source::Source;

    /// Two buffers with different formats are played back to back, then the
    /// queue ends.
    // NOTE(review): ignored by default; the reason is not recorded here —
    // confirm before re-enabling.
    #[test]
    #[ignore]
    fn basic() {
        let (input, mut output) = queue::queue(false);

        input.append(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
        input.append(SamplesBuffer::new(2, 96000, vec![5.0, 5.0, 5.0, 5.0]));

        assert_eq!(output.channels(), 1);
        assert_eq!(output.sample_rate(), 48000);
        for expected in [10.0, -10.0, 10.0, -10.0] {
            assert_eq!(output.next(), Some(expected));
        }

        assert_eq!(output.channels(), 2);
        assert_eq!(output.sample_rate(), 96000);
        for expected in [5.0, 5.0, 5.0, 5.0] {
            assert_eq!(output.next(), Some(expected));
        }

        assert_eq!(output.next(), None);
    }

    /// An empty queue without keep-alive ends immediately.
    #[test]
    fn immediate_end() {
        let (_, mut output) = queue::queue(false);
        assert_eq!(output.next(), None);
    }

    /// With keep-alive, silence follows the queued sound instead of the
    /// iteration ending.
    #[test]
    fn keep_alive() {
        let (input, mut output) = queue::queue(true);
        input.append(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));

        for expected in [10.0, -10.0, 10.0, -10.0] {
            assert_eq!(output.next(), Some(expected));
        }

        for _ in 0..100000 {
            assert_eq!(output.next(), Some(0.0));
        }
    }

    /// A sound appended while filler silence is playing should start on the
    /// very next sample.
    // NOTE(review): ignored by default; the reason is not recorded here —
    // confirm before re-enabling.
    #[test]
    #[ignore]
    fn no_delay_when_added() {
        let (input, mut output) = queue::queue(true);

        for _ in 0..500 {
            assert_eq!(output.next(), Some(0.0));
        }

        input.append(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
        for expected in [10.0, -10.0, 10.0, -10.0] {
            assert_eq!(output.next(), Some(expected));
        }
    }
}