1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use super::SeekError;
7use crate::common::{ChannelCount, SampleRate};
8use crate::math::nz;
9use crate::Source;
10
11#[inline]
13pub fn buffered<I>(input: I) -> Buffered<I>
14where
15 I: Source,
16{
17 let total_duration = input.total_duration();
18 let first_span = extract(input);
19
20 Buffered {
21 current_span: first_span,
22 position_in_span: 0,
23 total_duration,
24 }
25}
26
27pub struct Buffered<I>
29where
30 I: Source,
31{
32 current_span: Arc<Span<I>>,
34
35 position_in_span: usize,
37
38 total_duration: Option<Duration>,
40}
41
42enum Span<I>
43where
44 I: Source,
45{
46 Data(SpanData<I>),
49
50 End,
52
53 Input(Mutex<Option<I>>),
56}
57
58struct SpanData<I>
59where
60 I: Source,
61{
62 data: Vec<I::Item>,
63 channels: ChannelCount,
64 rate: SampleRate,
65 next: Mutex<Arc<Span<I>>>,
66}
67
68impl<I> Drop for SpanData<I>
69where
70 I: Source,
71{
72 fn drop(&mut self) {
73 while let Ok(arc_next) = self.next.get_mut() {
80 if let Some(next_ref) = Arc::get_mut(arc_next) {
81 let next = mem::replace(next_ref, Span::End);
83 if let Span::Data(next_data) = next {
84 *self = next_data;
87 } else {
88 break;
89 }
90 } else {
91 break;
92 }
93 }
94 }
95}
96
97fn extract<I>(mut input: I) -> Arc<Span<I>>
99where
100 I: Source,
101{
102 let span_len = input.current_span_len();
103
104 if span_len == Some(0) {
105 return Arc::new(Span::End);
106 }
107
108 let channels = input.channels();
109 let rate = input.sample_rate();
110 let data: Vec<I::Item> = input
111 .by_ref()
112 .take(cmp::min(span_len.unwrap_or(32768), 32768))
113 .collect();
114
115 if data.is_empty() {
116 return Arc::new(Span::End);
117 }
118
119 Arc::new(Span::Data(SpanData {
120 data,
121 channels,
122 rate,
123 next: Mutex::new(Arc::new(Span::Input(Mutex::new(Some(input))))),
124 }))
125}
126
127impl<I> Buffered<I>
128where
129 I: Source,
130{
131 fn next_span(&mut self) {
133 let next_span = {
134 let mut next_span_ptr = match &*self.current_span {
135 Span::Data(SpanData { next, .. }) => next.lock().unwrap(),
136 _ => unreachable!(),
137 };
138
139 let next_span = match &**next_span_ptr {
140 Span::Data(_) => next_span_ptr.clone(),
141 Span::End => next_span_ptr.clone(),
142 Span::Input(input) => {
143 let input = input.lock().unwrap().take().unwrap();
144 extract(input)
145 }
146 };
147
148 *next_span_ptr = next_span.clone();
149 next_span
150 };
151
152 self.current_span = next_span;
153 self.position_in_span = 0;
154 }
155}
156
157impl<I> Iterator for Buffered<I>
158where
159 I: Source,
160{
161 type Item = I::Item;
162
163 #[inline]
164 fn next(&mut self) -> Option<I::Item> {
165 let current_sample;
166 let advance_span;
167
168 match &*self.current_span {
169 Span::Data(SpanData { data, .. }) => {
170 current_sample = Some(data[self.position_in_span]);
171 self.position_in_span += 1;
172 advance_span = self.position_in_span >= data.len();
173 }
174
175 Span::End => {
176 current_sample = None;
177 advance_span = false;
178 }
179
180 Span::Input(_) => unreachable!(),
181 };
182
183 if advance_span {
184 self.next_span();
185 }
186
187 current_sample
188 }
189
190 #[inline]
191 fn size_hint(&self) -> (usize, Option<usize>) {
192 (0, None)
194 }
195}
196
197impl<I> Source for Buffered<I>
202where
203 I: Source,
204{
205 #[inline]
206 fn current_span_len(&self) -> Option<usize> {
207 match &*self.current_span {
208 Span::Data(SpanData { data, .. }) => Some(data.len() - self.position_in_span),
209 Span::End => Some(0),
210 Span::Input(_) => unreachable!(),
211 }
212 }
213
214 #[inline]
215 fn channels(&self) -> ChannelCount {
216 match *self.current_span {
217 Span::Data(SpanData { channels, .. }) => channels,
218 Span::End => nz!(1),
219 Span::Input(_) => unreachable!(),
220 }
221 }
222
223 #[inline]
224 fn sample_rate(&self) -> SampleRate {
225 match *self.current_span {
226 Span::Data(SpanData { rate, .. }) => rate,
227 Span::End => nz!(44100),
228 Span::Input(_) => unreachable!(),
229 }
230 }
231
232 #[inline]
233 fn total_duration(&self) -> Option<Duration> {
234 self.total_duration
235 }
236
237 #[inline]
240 fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
241 Err(SeekError::NotSupported {
242 underlying_source: std::any::type_name::<Self>(),
243 })
244 }
245}
246
247impl<I> Clone for Buffered<I>
248where
249 I: Source,
250{
251 #[inline]
252 fn clone(&self) -> Buffered<I> {
253 Buffered {
254 current_span: self.current_span.clone(),
255 position_in_span: self.position_in_span,
256 total_duration: self.total_duration,
257 }
258 }
259}