rodio/
mixer.rs

1//! Mixer that plays multiple sounds at the same time.
2
3use crate::common::{ChannelCount, SampleRate};
4use crate::source::{SeekError, Source, UniformSourceIterator};
5use crate::Sample;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::Duration;
9
10/// Builds a new mixer.
11///
12/// You can choose the characteristics of the output thanks to this constructor. All the sounds
13/// added to the mixer will be converted to these values.
14///
15/// After creating a mixer, you can add new sounds with the controller.
16///
17/// Note that mixer without any input source behaves like an `Empty` (not: `Zero`) source,
18/// and thus, just after appending to a sink, the mixer is removed from the sink.
19/// As a result, input sources added to the mixer later might not be forwarded to the sink.
20/// Add `Zero` source to prevent detaching the mixer from sink.
21pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSource) {
22    let input = Mixer(Arc::new(Inner {
23        has_pending: AtomicBool::new(false),
24        pending_sources: Mutex::new(Vec::new()),
25        channels,
26        sample_rate,
27    }));
28
29    let output = MixerSource {
30        current_sources: Vec::with_capacity(16),
31        input: input.clone(),
32        sample_count: 0,
33        still_pending: vec![],
34        still_current: vec![],
35    };
36
37    (input, output)
38}
39
40/// The input of the mixer.
41#[derive(Clone)]
42pub struct Mixer(Arc<Inner>);
43
44struct Inner {
45    has_pending: AtomicBool,
46    pending_sources: Mutex<Vec<Box<dyn Source + Send>>>,
47    channels: ChannelCount,
48    sample_rate: SampleRate,
49}
50
51impl Mixer {
52    /// Adds a new source to mix to the existing ones.
53    #[inline]
54    pub fn add<T>(&self, source: T)
55    where
56        T: Source + Send + 'static,
57    {
58        let uniform_source =
59            UniformSourceIterator::new(source, self.0.channels, self.0.sample_rate);
60        self.0
61            .pending_sources
62            .lock()
63            .unwrap()
64            .push(Box::new(uniform_source) as Box<_>);
65        self.0.has_pending.store(true, Ordering::SeqCst); // TODO: can we relax this ordering?
66    }
67}
68
69/// The output of the mixer. Implements `Source`.
70pub struct MixerSource {
71    // The current iterator that produces samples.
72    current_sources: Vec<Box<dyn Source + Send>>,
73
74    // The pending sounds.
75    input: Mixer,
76
77    // The number of samples produced so far.
78    sample_count: usize,
79
80    // A temporary vec used in start_pending_sources.
81    still_pending: Vec<Box<dyn Source + Send>>,
82
83    // A temporary vec used in sum_current_sources.
84    still_current: Vec<Box<dyn Source + Send>>,
85}
86
87impl Source for MixerSource {
88    #[inline]
89    fn current_span_len(&self) -> Option<usize> {
90        None
91    }
92
93    #[inline]
94    fn channels(&self) -> ChannelCount {
95        self.input.0.channels
96    }
97
98    #[inline]
99    fn sample_rate(&self) -> SampleRate {
100        self.input.0.sample_rate
101    }
102
103    #[inline]
104    fn total_duration(&self) -> Option<Duration> {
105        None
106    }
107
108    #[inline]
109    fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
110        Err(SeekError::NotSupported {
111            underlying_source: std::any::type_name::<Self>(),
112        })
113
114        // uncomment when #510 is implemented (query position of playback)
115
116        // let mut org_positions = Vec::with_capacity(self.current_sources.len());
117        // let mut encounterd_err = None;
118        //
119        // for source in &mut self.current_sources {
120        //     let pos = /* source.playback_pos() */ todo!();
121        //     if let Err(e) = source.try_seek(pos) {
122        //         encounterd_err = Some(e);
123        //         break;
124        //     } else {
125        //         // store pos in case we need to roll back
126        //         org_positions.push(pos);
127        //     }
128        // }
129        //
130        // if let Some(e) = encounterd_err {
131        //     // rollback seeks that happend before err
132        //     for (pos, source) in org_positions
133        //         .into_iter()
134        //         .zip(self.current_sources.iter_mut())
135        //     {
136        //         source.try_seek(pos)?;
137        //     }
138        //     Err(e)
139        // } else {
140        //     Ok(())
141        // }
142    }
143}
144
145impl Iterator for MixerSource {
146    type Item = Sample;
147
148    #[inline]
149    fn next(&mut self) -> Option<Self::Item> {
150        if self.input.0.has_pending.load(Ordering::SeqCst) {
151            self.start_pending_sources();
152        }
153
154        self.sample_count += 1;
155
156        let sum = self.sum_current_sources();
157
158        if self.current_sources.is_empty() {
159            None
160        } else {
161            Some(sum)
162        }
163    }
164
165    #[inline]
166    fn size_hint(&self) -> (usize, Option<usize>) {
167        (0, None)
168    }
169}
170
171impl MixerSource {
172    // Samples from the #next() function are interlaced for each of the channels.
173    // We need to ensure we start playing sources so that their samples are
174    // in-step with the modulo of the samples produced so far. Otherwise, the
175    // sound will play on the wrong channels, e.g. left / right will be reversed.
176    fn start_pending_sources(&mut self) {
177        let mut pending = self.input.0.pending_sources.lock().unwrap(); // TODO: relax ordering?
178
179        for source in pending.drain(..) {
180            let in_step = self.sample_count % source.channels() as usize == 0;
181
182            if in_step {
183                self.current_sources.push(source);
184            } else {
185                self.still_pending.push(source);
186            }
187        }
188        std::mem::swap(&mut self.still_pending, &mut pending);
189
190        let has_pending = !pending.is_empty();
191        self.input
192            .0
193            .has_pending
194            .store(has_pending, Ordering::SeqCst); // TODO: relax ordering?
195    }
196
197    fn sum_current_sources(&mut self) -> Sample {
198        let mut sum = 0.0;
199        for mut source in self.current_sources.drain(..) {
200            if let Some(value) = source.next() {
201                sum += value;
202                self.still_current.push(source);
203            }
204        }
205        std::mem::swap(&mut self.still_current, &mut self.current_sources);
206
207        sum
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use crate::buffer::SamplesBuffer;
214    use crate::mixer;
215    use crate::source::Source;
216
217    #[test]
218    fn basic() {
219        let (tx, mut rx) = mixer::mixer(1, 48000);
220
221        tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
222        tx.add(SamplesBuffer::new(1, 48000, vec![5.0, 5.0, 5.0, 5.0]));
223
224        assert_eq!(rx.channels(), 1);
225        assert_eq!(rx.sample_rate(), 48000);
226        assert_eq!(rx.next(), Some(15.0));
227        assert_eq!(rx.next(), Some(-5.0));
228        assert_eq!(rx.next(), Some(15.0));
229        assert_eq!(rx.next(), Some(-5.0));
230        assert_eq!(rx.next(), None);
231    }
232
233    #[test]
234    fn channels_conv() {
235        let (tx, mut rx) = mixer::mixer(2, 48000);
236
237        tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
238        tx.add(SamplesBuffer::new(1, 48000, vec![5.0, 5.0, 5.0, 5.0]));
239
240        assert_eq!(rx.channels(), 2);
241        assert_eq!(rx.sample_rate(), 48000);
242        assert_eq!(rx.next(), Some(15.0));
243        assert_eq!(rx.next(), Some(15.0));
244        assert_eq!(rx.next(), Some(-5.0));
245        assert_eq!(rx.next(), Some(-5.0));
246        assert_eq!(rx.next(), Some(15.0));
247        assert_eq!(rx.next(), Some(15.0));
248        assert_eq!(rx.next(), Some(-5.0));
249        assert_eq!(rx.next(), Some(-5.0));
250        assert_eq!(rx.next(), None);
251    }
252
253    #[test]
254    fn rate_conv() {
255        let (tx, mut rx) = mixer::mixer(1, 96000);
256
257        tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
258        tx.add(SamplesBuffer::new(1, 48000, vec![5.0, 5.0, 5.0, 5.0]));
259
260        assert_eq!(rx.channels(), 1);
261        assert_eq!(rx.sample_rate(), 96000);
262        assert_eq!(rx.next(), Some(15.0));
263        assert_eq!(rx.next(), Some(5.0));
264        assert_eq!(rx.next(), Some(-5.0));
265        assert_eq!(rx.next(), Some(5.0));
266        assert_eq!(rx.next(), Some(15.0));
267        assert_eq!(rx.next(), Some(5.0));
268        assert_eq!(rx.next(), Some(-5.0));
269        assert_eq!(rx.next(), None);
270    }
271
272    #[test]
273    fn start_afterwards() {
274        let (tx, mut rx) = mixer::mixer(1, 48000);
275
276        tx.add(SamplesBuffer::new(1, 48000, vec![10.0, -10.0, 10.0, -10.0]));
277
278        assert_eq!(rx.next(), Some(10.0));
279        assert_eq!(rx.next(), Some(-10.0));
280
281        tx.add(SamplesBuffer::new(
282            1,
283            48000,
284            vec![5.0, 5.0, 6.0, 6.0, 7.0, 7.0, 7.0],
285        ));
286
287        assert_eq!(rx.next(), Some(15.0));
288        assert_eq!(rx.next(), Some(-5.0));
289
290        assert_eq!(rx.next(), Some(6.0));
291        assert_eq!(rx.next(), Some(6.0));
292
293        tx.add(SamplesBuffer::new(1, 48000, vec![2.0]));
294
295        assert_eq!(rx.next(), Some(9.0));
296        assert_eq!(rx.next(), Some(7.0));
297        assert_eq!(rx.next(), Some(7.0));
298
299        assert_eq!(rx.next(), None);
300    }
301}