rodio/source/
buffered.rs

1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use super::SeekError;
7use crate::common::{ChannelCount, SampleRate};
8use crate::Source;
9
10/// Internal function that builds a `Buffered` object.
11#[inline]
12pub fn buffered<I>(input: I) -> Buffered<I>
13where
14    I: Source,
15{
16    let total_duration = input.total_duration();
17    let first_span = extract(input);
18
19    Buffered {
20        current_span: first_span,
21        position_in_span: 0,
22        total_duration,
23    }
24}
25
26/// Iterator that at the same time extracts data from the iterator and stores it in a buffer.
27pub struct Buffered<I>
28where
29    I: Source,
30{
31    /// Immutable reference to the next span of data. Cannot be `Span::Input`.
32    current_span: Arc<Span<I>>,
33
34    /// The position in number of samples of this iterator inside `current_span`.
35    position_in_span: usize,
36
37    /// Obtained once at creation and never modified again.
38    total_duration: Option<Duration>,
39}
40
41enum Span<I>
42where
43    I: Source,
44{
45    /// Data that has already been extracted from the iterator. Also contains a pointer to the
46    /// next span.
47    Data(SpanData<I>),
48
49    /// No more data.
50    End,
51
52    /// Unextracted data. The `Option` should never be `None` and is only here for easier data
53    /// processing.
54    Input(Mutex<Option<I>>),
55}
56
57struct SpanData<I>
58where
59    I: Source,
60{
61    data: Vec<I::Item>,
62    channels: ChannelCount,
63    rate: SampleRate,
64    next: Mutex<Arc<Span<I>>>,
65}
66
67impl<I> Drop for SpanData<I>
68where
69    I: Source,
70{
71    fn drop(&mut self) {
72        // This is necessary to prevent stack overflows deallocating long chains of the mutually
73        // recursive `Span` and `SpanData` types. This iteratively traverses as much of the
74        // chain as needs to be deallocated, and repeatedly "pops" the head off the list. This
75        // solves the problem, as when the time comes to actually deallocate the `SpanData`,
76        // the `next` field will contain a `Span::End`, or an `Arc` with additional references,
77        // so the depth of recursive drops will be bounded.
78        while let Ok(arc_next) = self.next.get_mut() {
79            if let Some(next_ref) = Arc::get_mut(arc_next) {
80                // This allows us to own the next Span.
81                let next = mem::replace(next_ref, Span::End);
82                if let Span::Data(next_data) = next {
83                    // Swap the current SpanData with the next one, allowing the current one
84                    // to go out of scope.
85                    *self = next_data;
86                } else {
87                    break;
88                }
89            } else {
90                break;
91            }
92        }
93    }
94}
95
96/// Builds a span from the input iterator.
97fn extract<I>(mut input: I) -> Arc<Span<I>>
98where
99    I: Source,
100{
101    let span_len = input.current_span_len();
102
103    if span_len == Some(0) {
104        return Arc::new(Span::End);
105    }
106
107    let channels = input.channels();
108    let rate = input.sample_rate();
109    let data: Vec<I::Item> = input
110        .by_ref()
111        .take(cmp::min(span_len.unwrap_or(32768), 32768))
112        .collect();
113
114    if data.is_empty() {
115        return Arc::new(Span::End);
116    }
117
118    Arc::new(Span::Data(SpanData {
119        data,
120        channels,
121        rate,
122        next: Mutex::new(Arc::new(Span::Input(Mutex::new(Some(input))))),
123    }))
124}
125
126impl<I> Buffered<I>
127where
128    I: Source,
129{
130    /// Advances to the next span.
131    fn next_span(&mut self) {
132        let next_span = {
133            let mut next_span_ptr = match &*self.current_span {
134                Span::Data(SpanData { next, .. }) => next.lock().unwrap(),
135                _ => unreachable!(),
136            };
137
138            let next_span = match &**next_span_ptr {
139                Span::Data(_) => next_span_ptr.clone(),
140                Span::End => next_span_ptr.clone(),
141                Span::Input(input) => {
142                    let input = input.lock().unwrap().take().unwrap();
143                    extract(input)
144                }
145            };
146
147            *next_span_ptr = next_span.clone();
148            next_span
149        };
150
151        self.current_span = next_span;
152        self.position_in_span = 0;
153    }
154}
155
156impl<I> Iterator for Buffered<I>
157where
158    I: Source,
159{
160    type Item = I::Item;
161
162    #[inline]
163    fn next(&mut self) -> Option<I::Item> {
164        let current_sample;
165        let advance_span;
166
167        match &*self.current_span {
168            Span::Data(SpanData { data, .. }) => {
169                current_sample = Some(data[self.position_in_span]);
170                self.position_in_span += 1;
171                advance_span = self.position_in_span >= data.len();
172            }
173
174            Span::End => {
175                current_sample = None;
176                advance_span = false;
177            }
178
179            Span::Input(_) => unreachable!(),
180        };
181
182        if advance_span {
183            self.next_span();
184        }
185
186        current_sample
187    }
188
189    #[inline]
190    fn size_hint(&self) -> (usize, Option<usize>) {
191        // TODO:
192        (0, None)
193    }
194}
195
196// TODO: uncomment when `size_hint` is fixed
197/*impl<I> ExactSizeIterator for Amplify<I> where I: Source + ExactSizeIterator, I::Item: Sample {
198}*/
199
200impl<I> Source for Buffered<I>
201where
202    I: Source,
203{
204    #[inline]
205    fn current_span_len(&self) -> Option<usize> {
206        match &*self.current_span {
207            Span::Data(SpanData { data, .. }) => Some(data.len() - self.position_in_span),
208            Span::End => Some(0),
209            Span::Input(_) => unreachable!(),
210        }
211    }
212
213    #[inline]
214    fn channels(&self) -> ChannelCount {
215        match *self.current_span {
216            Span::Data(SpanData { channels, .. }) => channels,
217            Span::End => 1,
218            Span::Input(_) => unreachable!(),
219        }
220    }
221
222    #[inline]
223    fn sample_rate(&self) -> SampleRate {
224        match *self.current_span {
225            Span::Data(SpanData { rate, .. }) => rate,
226            Span::End => 44100,
227            Span::Input(_) => unreachable!(),
228        }
229    }
230
231    #[inline]
232    fn total_duration(&self) -> Option<Duration> {
233        self.total_duration
234    }
235
236    /// Can not support seek, in the end state we lose the underlying source
237    /// which makes seeking back impossible.
238    #[inline]
239    fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
240        Err(SeekError::NotSupported {
241            underlying_source: std::any::type_name::<Self>(),
242        })
243    }
244}
245
246impl<I> Clone for Buffered<I>
247where
248    I: Source,
249{
250    #[inline]
251    fn clone(&self) -> Buffered<I> {
252        Buffered {
253            current_span: self.current_span.clone(),
254            position_in_span: self.position_in_span,
255            total_duration: self.total_duration,
256        }
257    }
258}