1use crate::common::{ChannelCount, SampleRate};
4use crate::source::{SeekError, Source, UniformSourceIterator};
5use crate::Sample;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::Duration;
9
10pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSource) {
22 let input = Mixer(Arc::new(Inner {
23 has_pending: AtomicBool::new(false),
24 pending_sources: Mutex::new(Vec::new()),
25 channels,
26 sample_rate,
27 }));
28
29 let output = MixerSource {
30 current_sources: Vec::with_capacity(16),
31 input: input.clone(),
32 sample_count: 0,
33 still_pending: vec![],
34 still_current: vec![],
35 };
36
37 (input, output)
38}
39
40#[derive(Clone)]
42pub struct Mixer(Arc<Inner>);
43
44struct Inner {
45 has_pending: AtomicBool,
46 pending_sources: Mutex<Vec<Box<dyn Source + Send>>>,
47 channels: ChannelCount,
48 sample_rate: SampleRate,
49}
50
51impl Mixer {
52 #[inline]
54 pub fn add<T>(&self, source: T)
55 where
56 T: Source + Send + 'static,
57 {
58 let uniform_source =
59 UniformSourceIterator::new(source, self.0.channels, self.0.sample_rate);
60 self.0
61 .pending_sources
62 .lock()
63 .unwrap()
64 .push(Box::new(uniform_source) as Box<_>);
65 self.0.has_pending.store(true, Ordering::SeqCst); }
67}
68
69pub struct MixerSource {
71 current_sources: Vec<Box<dyn Source + Send>>,
73
74 input: Mixer,
76
77 sample_count: usize,
79
80 still_pending: Vec<Box<dyn Source + Send>>,
82
83 still_current: Vec<Box<dyn Source + Send>>,
85}
86
87impl Source for MixerSource {
88 #[inline]
89 fn current_span_len(&self) -> Option<usize> {
90 None
91 }
92
93 #[inline]
94 fn channels(&self) -> ChannelCount {
95 self.input.0.channels
96 }
97
98 #[inline]
99 fn sample_rate(&self) -> SampleRate {
100 self.input.0.sample_rate
101 }
102
103 #[inline]
104 fn total_duration(&self) -> Option<Duration> {
105 None
106 }
107
108 #[inline]
109 fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
110 Err(SeekError::NotSupported {
111 underlying_source: std::any::type_name::<Self>(),
112 })
113
114 }
143}
144
145impl Iterator for MixerSource {
146 type Item = Sample;
147
148 #[inline]
149 fn next(&mut self) -> Option<Self::Item> {
150 if self.input.0.has_pending.load(Ordering::SeqCst) {
151 self.start_pending_sources();
152 }
153
154 self.sample_count += 1;
155
156 let sum = self.sum_current_sources();
157
158 if self.current_sources.is_empty() {
159 None
160 } else {
161 Some(sum)
162 }
163 }
164
165 #[inline]
166 fn size_hint(&self) -> (usize, Option<usize>) {
167 (0, None)
168 }
169}
170
171impl MixerSource {
172 fn start_pending_sources(&mut self) {
177 let mut pending = self.input.0.pending_sources.lock().unwrap(); for source in pending.drain(..) {
180 let in_step = self.sample_count % source.channels() as usize == 0;
181
182 if in_step {
183 self.current_sources.push(source);
184 } else {
185 self.still_pending.push(source);
186 }
187 }
188 std::mem::swap(&mut self.still_pending, &mut pending);
189
190 let has_pending = !pending.is_empty();
191 self.input
192 .0
193 .has_pending
194 .store(has_pending, Ordering::SeqCst); }
196
197 fn sum_current_sources(&mut self) -> Sample {
198 let mut sum = 0.0;
199 for mut source in self.current_sources.drain(..) {
200 if let Some(value) = source.next() {
201 sum += value;
202 self.still_current.push(source);
203 }
204 }
205 std::mem::swap(&mut self.still_current, &mut self.current_sources);
206
207 sum
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use crate::buffer::SamplesBuffer;
214 use crate::mixer;
215 use crate::source::Source;
216
217 #[test]
218 fn basic() {
219 let (tx, mut rx) = mixer::mixer(1, 48000);
220
221 tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
222 tx.add(SamplesBuffer::new(1, 48000, vec![5.0, 5.0, 5.0, 5.0]));
223
224 assert_eq!(rx.channels(), 1);
225 assert_eq!(rx.sample_rate(), 48000);
226 assert_eq!(rx.next(), Some(15.0));
227 assert_eq!(rx.next(), Some(-5.0));
228 assert_eq!(rx.next(), Some(15.0));
229 assert_eq!(rx.next(), Some(-5.0));
230 assert_eq!(rx.next(), None);
231 }
232
233 #[test]
234 fn channels_conv() {
235 let (tx, mut rx) = mixer::mixer(2, 48000);
236
237 tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
238 tx.add(SamplesBuffer::new(1, 48000, vec![5.0, 5.0, 5.0, 5.0]));
239
240 assert_eq!(rx.channels(), 2);
241 assert_eq!(rx.sample_rate(), 48000);
242 assert_eq!(rx.next(), Some(15.0));
243 assert_eq!(rx.next(), Some(15.0));
244 assert_eq!(rx.next(), Some(-5.0));
245 assert_eq!(rx.next(), Some(-5.0));
246 assert_eq!(rx.next(), Some(15.0));
247 assert_eq!(rx.next(), Some(15.0));
248 assert_eq!(rx.next(), Some(-5.0));
249 assert_eq!(rx.next(), Some(-5.0));
250 assert_eq!(rx.next(), None);
251 }
252
253 #[test]
254 fn rate_conv() {
255 let (tx, mut rx) = mixer::mixer(1, 96000);
256
257 tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
258 tx.add(SamplesBuffer::new(1, 48000, vec![5.0, 5.0, 5.0, 5.0]));
259
260 assert_eq!(rx.channels(), 1);
261 assert_eq!(rx.sample_rate(), 96000);
262 assert_eq!(rx.next(), Some(15.0));
263 assert_eq!(rx.next(), Some(5.0));
264 assert_eq!(rx.next(), Some(-5.0));
265 assert_eq!(rx.next(), Some(5.0));
266 assert_eq!(rx.next(), Some(15.0));
267 assert_eq!(rx.next(), Some(5.0));
268 assert_eq!(rx.next(), Some(-5.0));
269 assert_eq!(rx.next(), None);
270 }
271
272 #[test]
273 fn start_afterwards() {
274 let (tx, mut rx) = mixer::mixer(1, 48000);
275
276 tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
277
278 assert_eq!(rx.next(), Some(10.0));
279 assert_eq!(rx.next(), Some(-10.0));
280
281 tx.add(SamplesBuffer::new(
282 1,
283 48000,
284 vec![5.0, 5.0, 6.0, 6.0, 7.0, 7.0, 7.0],
285 ));
286
287 assert_eq!(rx.next(), Some(15.0));
288 assert_eq!(rx.next(), Some(-5.0));
289
290 assert_eq!(rx.next(), Some(6.0));
291 assert_eq!(rx.next(), Some(6.0));
292
293 tx.add(SamplesBuffer::new(1, 48000, vec![2.0]));
294
295 assert_eq!(rx.next(), Some(9.0));
296 assert_eq!(rx.next(), Some(7.0));
297 assert_eq!(rx.next(), Some(7.0));
298
299 assert_eq!(rx.next(), None);
300 }
301}