1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use super::SeekError;
7use crate::common::{ChannelCount, SampleRate};
8use crate::Source;
9
10#[inline]
12pub fn buffered<I>(input: I) -> Buffered<I>
13where
14 I: Source,
15{
16 let total_duration = input.total_duration();
17 let first_span = extract(input);
18
19 Buffered {
20 current_span: first_span,
21 position_in_span: 0,
22 total_duration,
23 }
24}
25
26pub struct Buffered<I>
28where
29 I: Source,
30{
31 current_span: Arc<Span<I>>,
33
34 position_in_span: usize,
36
37 total_duration: Option<Duration>,
39}
40
41enum Span<I>
42where
43 I: Source,
44{
45 Data(SpanData<I>),
48
49 End,
51
52 Input(Mutex<Option<I>>),
55}
56
57struct SpanData<I>
58where
59 I: Source,
60{
61 data: Vec<I::Item>,
62 channels: ChannelCount,
63 rate: SampleRate,
64 next: Mutex<Arc<Span<I>>>,
65}
66
67impl<I> Drop for SpanData<I>
68where
69 I: Source,
70{
71 fn drop(&mut self) {
72 while let Ok(arc_next) = self.next.get_mut() {
79 if let Some(next_ref) = Arc::get_mut(arc_next) {
80 let next = mem::replace(next_ref, Span::End);
82 if let Span::Data(next_data) = next {
83 *self = next_data;
86 } else {
87 break;
88 }
89 } else {
90 break;
91 }
92 }
93 }
94}
95
96fn extract<I>(mut input: I) -> Arc<Span<I>>
98where
99 I: Source,
100{
101 let span_len = input.current_span_len();
102
103 if span_len == Some(0) {
104 return Arc::new(Span::End);
105 }
106
107 let channels = input.channels();
108 let rate = input.sample_rate();
109 let data: Vec<I::Item> = input
110 .by_ref()
111 .take(cmp::min(span_len.unwrap_or(32768), 32768))
112 .collect();
113
114 if data.is_empty() {
115 return Arc::new(Span::End);
116 }
117
118 Arc::new(Span::Data(SpanData {
119 data,
120 channels,
121 rate,
122 next: Mutex::new(Arc::new(Span::Input(Mutex::new(Some(input))))),
123 }))
124}
125
126impl<I> Buffered<I>
127where
128 I: Source,
129{
130 fn next_span(&mut self) {
132 let next_span = {
133 let mut next_span_ptr = match &*self.current_span {
134 Span::Data(SpanData { next, .. }) => next.lock().unwrap(),
135 _ => unreachable!(),
136 };
137
138 let next_span = match &**next_span_ptr {
139 Span::Data(_) => next_span_ptr.clone(),
140 Span::End => next_span_ptr.clone(),
141 Span::Input(input) => {
142 let input = input.lock().unwrap().take().unwrap();
143 extract(input)
144 }
145 };
146
147 *next_span_ptr = next_span.clone();
148 next_span
149 };
150
151 self.current_span = next_span;
152 self.position_in_span = 0;
153 }
154}
155
156impl<I> Iterator for Buffered<I>
157where
158 I: Source,
159{
160 type Item = I::Item;
161
162 #[inline]
163 fn next(&mut self) -> Option<I::Item> {
164 let current_sample;
165 let advance_span;
166
167 match &*self.current_span {
168 Span::Data(SpanData { data, .. }) => {
169 current_sample = Some(data[self.position_in_span]);
170 self.position_in_span += 1;
171 advance_span = self.position_in_span >= data.len();
172 }
173
174 Span::End => {
175 current_sample = None;
176 advance_span = false;
177 }
178
179 Span::Input(_) => unreachable!(),
180 };
181
182 if advance_span {
183 self.next_span();
184 }
185
186 current_sample
187 }
188
189 #[inline]
190 fn size_hint(&self) -> (usize, Option<usize>) {
191 (0, None)
193 }
194}
195
196impl<I> Source for Buffered<I>
201where
202 I: Source,
203{
204 #[inline]
205 fn current_span_len(&self) -> Option<usize> {
206 match &*self.current_span {
207 Span::Data(SpanData { data, .. }) => Some(data.len() - self.position_in_span),
208 Span::End => Some(0),
209 Span::Input(_) => unreachable!(),
210 }
211 }
212
213 #[inline]
214 fn channels(&self) -> ChannelCount {
215 match *self.current_span {
216 Span::Data(SpanData { channels, .. }) => channels,
217 Span::End => 1,
218 Span::Input(_) => unreachable!(),
219 }
220 }
221
222 #[inline]
223 fn sample_rate(&self) -> SampleRate {
224 match *self.current_span {
225 Span::Data(SpanData { rate, .. }) => rate,
226 Span::End => 44100,
227 Span::Input(_) => unreachable!(),
228 }
229 }
230
231 #[inline]
232 fn total_duration(&self) -> Option<Duration> {
233 self.total_duration
234 }
235
236 #[inline]
239 fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
240 Err(SeekError::NotSupported {
241 underlying_source: std::any::type_name::<Self>(),
242 })
243 }
244}
245
246impl<I> Clone for Buffered<I>
247where
248 I: Source,
249{
250 #[inline]
251 fn clone(&self) -> Buffered<I> {
252 Buffered {
253 current_span: self.current_span.clone(),
254 position_in_span: self.position_in_span,
255 total_duration: self.total_duration,
256 }
257 }
258}