1use crate::common::{ChannelCount, SampleRate};
4use crate::source::{SeekError, Source, UniformSourceIterator};
5use crate::Sample;
6use std::sync::Arc;
7use std::time::Duration;
8
9#[cfg(feature = "crossbeam-channel")]
10use crossbeam_channel::{unbounded as channel, Receiver, Sender};
11#[cfg(not(feature = "crossbeam-channel"))]
12use std::sync::mpsc::{channel, Receiver, Sender};
13
14pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSource) {
26 let (tx, rx) = channel();
27
28 let input = Mixer(Arc::new(Inner {
29 pending_tx: tx,
30 channels,
31 sample_rate,
32 }));
33
34 let output = MixerSource {
35 current_sources: Vec::with_capacity(16),
36 input: input.clone(),
37 sample_count: 0,
38 pending_rx: rx,
39 };
40
41 (input, output)
42}
43
44#[derive(Clone)]
46pub struct Mixer(Arc<Inner>);
47
48struct Inner {
49 pending_tx: Sender<Box<dyn Source + Send>>,
50 channels: ChannelCount,
51 sample_rate: SampleRate,
52}
53
54impl Mixer {
55 #[inline]
57 pub fn add<T>(&self, source: T)
58 where
59 T: Source + Send + 'static,
60 {
61 let uniform_source =
62 UniformSourceIterator::new(source, self.0.channels, self.0.sample_rate);
63 let _ = self.0.pending_tx.send(Box::new(uniform_source));
65 }
66}
67
68pub struct MixerSource {
70 current_sources: Vec<Box<dyn Source + Send>>,
72
73 input: Mixer,
75
76 sample_count: u16,
78
79 pending_rx: Receiver<Box<dyn Source + Send>>,
81}
82
83impl Source for MixerSource {
84 #[inline]
85 fn current_span_len(&self) -> Option<usize> {
86 None
87 }
88
89 #[inline]
90 fn channels(&self) -> ChannelCount {
91 self.input.0.channels
92 }
93
94 #[inline]
95 fn sample_rate(&self) -> SampleRate {
96 self.input.0.sample_rate
97 }
98
99 #[inline]
100 fn total_duration(&self) -> Option<Duration> {
101 None
102 }
103
104 #[inline]
105 fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
106 Err(SeekError::NotSupported {
107 underlying_source: std::any::type_name::<Self>(),
108 })
109 }
110}
111
112impl Iterator for MixerSource {
113 type Item = Sample;
114
115 #[inline]
116 fn next(&mut self) -> Option<Self::Item> {
117 self.start_pending_sources();
118
119 self.sample_count += 1;
120 self.sample_count %= self.channels().get();
121
122 let sum = self.sum_current_sources();
123
124 if self.current_sources.is_empty() {
125 None
126 } else {
127 Some(sum)
128 }
129 }
130
131 #[inline]
132 fn size_hint(&self) -> (usize, Option<usize>) {
133 (0, None)
134 }
135}
136
137impl MixerSource {
138 fn start_pending_sources(&mut self) {
143 if self.sample_count == 0 {
144 self.current_sources.extend(self.pending_rx.try_iter());
145 }
146 }
147
148 fn sum_current_sources(&mut self) -> Sample {
149 let mut sum = 0.0;
150 self.current_sources.retain_mut(|source| {
151 match source.next() {
152 Some(value) => {
153 sum += value;
154 true }
156 None => false, }
158 });
159
160 sum
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use crate::buffer::SamplesBuffer;
167 use crate::math::nz;
168 use crate::mixer;
169 use crate::source::Source;
170
171 #[test]
172 fn basic() {
173 let (tx, mut rx) = mixer::mixer(nz!(1), nz!(48000));
174
175 tx.add(SamplesBuffer::new(
176 nz!(1),
177 nz!(48000),
178 vec![10.0, -10.0, 10.0, -10.0],
179 ));
180 tx.add(SamplesBuffer::new(
181 nz!(1),
182 nz!(48000),
183 vec![5.0, 5.0, 5.0, 5.0],
184 ));
185
186 assert_eq!(rx.channels(), nz!(1));
187 assert_eq!(rx.sample_rate().get(), 48000);
188 assert_eq!(rx.next(), Some(15.0));
189 assert_eq!(rx.next(), Some(-5.0));
190 assert_eq!(rx.next(), Some(15.0));
191 assert_eq!(rx.next(), Some(-5.0));
192 assert_eq!(rx.next(), None);
193 }
194
195 #[test]
196 fn channels_conv() {
197 let (tx, mut rx) = mixer::mixer(nz!(2), nz!(48000));
198
199 tx.add(SamplesBuffer::new(
200 nz!(1),
201 nz!(48000),
202 vec![10.0, -10.0, 10.0, -10.0],
203 ));
204 tx.add(SamplesBuffer::new(
205 nz!(1),
206 nz!(48000),
207 vec![5.0, 5.0, 5.0, 5.0],
208 ));
209
210 assert_eq!(rx.channels(), nz!(2));
211 assert_eq!(rx.sample_rate().get(), 48000);
212 assert_eq!(rx.next(), Some(15.0));
213 assert_eq!(rx.next(), Some(15.0));
214 assert_eq!(rx.next(), Some(-5.0));
215 assert_eq!(rx.next(), Some(-5.0));
216 assert_eq!(rx.next(), Some(15.0));
217 assert_eq!(rx.next(), Some(15.0));
218 assert_eq!(rx.next(), Some(-5.0));
219 assert_eq!(rx.next(), Some(-5.0));
220 assert_eq!(rx.next(), None);
221 }
222
223 #[test]
224 fn rate_conv() {
225 let (tx, mut rx) = mixer::mixer(nz!(1), nz!(96000));
226
227 tx.add(SamplesBuffer::new(
228 nz!(1),
229 nz!(48000),
230 vec![10.0, -10.0, 10.0, -10.0],
231 ));
232 tx.add(SamplesBuffer::new(
233 nz!(1),
234 nz!(48000),
235 vec![5.0, 5.0, 5.0, 5.0],
236 ));
237
238 assert_eq!(rx.channels(), nz!(1));
239 assert_eq!(rx.sample_rate().get(), 96000);
240 assert_eq!(rx.next(), Some(15.0));
241 assert_eq!(rx.next(), Some(5.0));
242 assert_eq!(rx.next(), Some(-5.0));
243 assert_eq!(rx.next(), Some(5.0));
244 assert_eq!(rx.next(), Some(15.0));
245 assert_eq!(rx.next(), Some(5.0));
246 assert_eq!(rx.next(), Some(-5.0));
247 assert_eq!(rx.next(), None);
248 }
249
250 #[test]
251 fn start_afterwards() {
252 let (tx, mut rx) = mixer::mixer(nz!(1), nz!(48000));
253
254 tx.add(SamplesBuffer::new(
255 nz!(1),
256 nz!(48000),
257 vec![10.0, -10.0, 10.0, -10.0],
258 ));
259
260 assert_eq!(rx.next(), Some(10.0));
261 assert_eq!(rx.next(), Some(-10.0));
262
263 tx.add(SamplesBuffer::new(
264 nz!(1),
265 nz!(48000),
266 vec![5.0, 5.0, 6.0, 6.0, 7.0, 7.0, 7.0],
267 ));
268
269 assert_eq!(rx.next(), Some(15.0)); assert_eq!(rx.next(), Some(-5.0));
271
272 assert_eq!(rx.next(), Some(6.0));
273 assert_eq!(rx.next(), Some(6.0));
274
275 tx.add(SamplesBuffer::new(nz!(1), nz!(48000), vec![2.0]));
276
277 assert_eq!(rx.next(), Some(9.0));
278 assert_eq!(rx.next(), Some(7.0));
279 assert_eq!(rx.next(), Some(7.0));
280
281 assert_eq!(rx.next(), None);
282 }
283
284 #[test]
285 fn added_taking_phase_into_account() {
286 let (tx, mut rx) = mixer::mixer(nz!(2), nz!(48000));
287
288 tx.add(SamplesBuffer::new(
289 nz!(2),
290 nz!(48000),
291 vec![10.0, -10.0, 10.0, -10.0],
292 ));
293
294 assert_eq!(rx.next(), Some(10.0));
295
296 tx.add(SamplesBuffer::new(
297 nz!(2),
298 nz!(48000),
299 vec![5.0, -5.0, 6.0, -6.0],
300 ));
301
302 assert_eq!(rx.next(), Some(-10.0)); assert_eq!(rx.next(), Some(15.0)); }
305}