Skip to main content

rodio/source/
buffered.rs

1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use super::SeekError;
7use crate::common::{ChannelCount, SampleRate};
8use crate::math::nz;
9use crate::Source;
10
11/// Internal function that builds a `Buffered` object.
12#[inline]
13pub fn buffered<I>(input: I) -> Buffered<I>
14where
15    I: Source,
16{
17    let total_duration = input.total_duration();
18    let first_span = extract(input);
19
20    Buffered {
21        current_span: first_span,
22        position_in_span: 0,
23        total_duration,
24    }
25}
26
27/// Iterator that at the same time extracts data from the iterator and stores it in a buffer.
28pub struct Buffered<I>
29where
30    I: Source,
31{
32    /// Immutable reference to the next span of data. Cannot be `Span::Input`.
33    current_span: Arc<Span<I>>,
34
35    /// The position in number of samples of this iterator inside `current_span`.
36    position_in_span: usize,
37
38    /// Obtained once at creation and never modified again.
39    total_duration: Option<Duration>,
40}
41
42enum Span<I>
43where
44    I: Source,
45{
46    /// Data that has already been extracted from the iterator. Also contains a pointer to the
47    /// next span.
48    Data(SpanData<I>),
49
50    /// No more data.
51    End,
52
53    /// Unextracted data. The `Option` should never be `None` and is only here for easier data
54    /// processing.
55    Input(Mutex<Option<I>>),
56}
57
58struct SpanData<I>
59where
60    I: Source,
61{
62    data: Vec<I::Item>,
63    channels: ChannelCount,
64    rate: SampleRate,
65    next: Mutex<Arc<Span<I>>>,
66}
67
68impl<I> Drop for SpanData<I>
69where
70    I: Source,
71{
72    fn drop(&mut self) {
73        // This is necessary to prevent stack overflows deallocating long chains of the mutually
74        // recursive `Span` and `SpanData` types. This iteratively traverses as much of the
75        // chain as needs to be deallocated, and repeatedly "pops" the head off the list. This
76        // solves the problem, as when the time comes to actually deallocate the `SpanData`,
77        // the `next` field will contain a `Span::End`, or an `Arc` with additional references,
78        // so the depth of recursive drops will be bounded.
79        while let Ok(arc_next) = self.next.get_mut() {
80            if let Some(next_ref) = Arc::get_mut(arc_next) {
81                // This allows us to own the next Span.
82                let next = mem::replace(next_ref, Span::End);
83                if let Span::Data(next_data) = next {
84                    // Swap the current SpanData with the next one, allowing the current one
85                    // to go out of scope.
86                    *self = next_data;
87                } else {
88                    break;
89                }
90            } else {
91                break;
92            }
93        }
94    }
95}
96
97/// Builds a span from the input iterator.
98fn extract<I>(mut input: I) -> Arc<Span<I>>
99where
100    I: Source,
101{
102    let span_len = input.current_span_len();
103
104    if span_len == Some(0) {
105        return Arc::new(Span::End);
106    }
107
108    let channels = input.channels();
109    let rate = input.sample_rate();
110    let data: Vec<I::Item> = input
111        .by_ref()
112        .take(cmp::min(span_len.unwrap_or(32768), 32768))
113        .collect();
114
115    if data.is_empty() {
116        return Arc::new(Span::End);
117    }
118
119    Arc::new(Span::Data(SpanData {
120        data,
121        channels,
122        rate,
123        next: Mutex::new(Arc::new(Span::Input(Mutex::new(Some(input))))),
124    }))
125}
126
127impl<I> Buffered<I>
128where
129    I: Source,
130{
131    /// Advances to the next span.
132    fn next_span(&mut self) {
133        let next_span = {
134            let mut next_span_ptr = match &*self.current_span {
135                Span::Data(SpanData { next, .. }) => next.lock().unwrap(),
136                _ => unreachable!(),
137            };
138
139            let next_span = match &**next_span_ptr {
140                Span::Data(_) => next_span_ptr.clone(),
141                Span::End => next_span_ptr.clone(),
142                Span::Input(input) => {
143                    let input = input.lock().unwrap().take().unwrap();
144                    extract(input)
145                }
146            };
147
148            *next_span_ptr = next_span.clone();
149            next_span
150        };
151
152        self.current_span = next_span;
153        self.position_in_span = 0;
154    }
155}
156
157impl<I> Iterator for Buffered<I>
158where
159    I: Source,
160{
161    type Item = I::Item;
162
163    #[inline]
164    fn next(&mut self) -> Option<I::Item> {
165        let current_sample;
166        let advance_span;
167
168        match &*self.current_span {
169            Span::Data(SpanData { data, .. }) => {
170                current_sample = Some(data[self.position_in_span]);
171                self.position_in_span += 1;
172                advance_span = self.position_in_span >= data.len();
173            }
174
175            Span::End => {
176                current_sample = None;
177                advance_span = false;
178            }
179
180            Span::Input(_) => unreachable!(),
181        };
182
183        if advance_span {
184            self.next_span();
185        }
186
187        current_sample
188    }
189
190    #[inline]
191    fn size_hint(&self) -> (usize, Option<usize>) {
192        // TODO:
193        (0, None)
194    }
195}
196
// TODO: uncomment once `size_hint` reports an exact length
/*impl<I> ExactSizeIterator for Buffered<I> where I: Source + ExactSizeIterator, I::Item: Sample {
}*/
200
201impl<I> Source for Buffered<I>
202where
203    I: Source,
204{
205    #[inline]
206    fn current_span_len(&self) -> Option<usize> {
207        match &*self.current_span {
208            Span::Data(SpanData { data, .. }) => Some(data.len() - self.position_in_span),
209            Span::End => Some(0),
210            Span::Input(_) => unreachable!(),
211        }
212    }
213
214    #[inline]
215    fn channels(&self) -> ChannelCount {
216        match *self.current_span {
217            Span::Data(SpanData { channels, .. }) => channels,
218            Span::End => nz!(1),
219            Span::Input(_) => unreachable!(),
220        }
221    }
222
223    #[inline]
224    fn sample_rate(&self) -> SampleRate {
225        match *self.current_span {
226            Span::Data(SpanData { rate, .. }) => rate,
227            Span::End => nz!(44100),
228            Span::Input(_) => unreachable!(),
229        }
230    }
231
232    #[inline]
233    fn total_duration(&self) -> Option<Duration> {
234        self.total_duration
235    }
236
237    /// Can not support seek, in the end state we lose the underlying source
238    /// which makes seeking back impossible.
239    #[inline]
240    fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
241        Err(SeekError::NotSupported {
242            underlying_source: std::any::type_name::<Self>(),
243        })
244    }
245}
246
247impl<I> Clone for Buffered<I>
248where
249    I: Source,
250{
251    #[inline]
252    fn clone(&self) -> Buffered<I> {
253        Buffered {
254            current_span: self.current_span.clone(),
255            position_in_span: self.position_in_span,
256            total_duration: self.total_duration,
257        }
258    }
259}