cpal/host/alsa/
mod.rs

1extern crate alsa;
2extern crate libc;
3
4use self::alsa::poll::Descriptors;
5use crate::traits::{DeviceTrait, HostTrait, StreamTrait};
6use crate::{
7    BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
8    DefaultStreamConfigError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo,
9    PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError,
10    SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
11    SupportedStreamConfigsError,
12};
13use std::cell::Cell;
14use std::cmp;
15use std::convert::TryInto;
16use std::sync::{Arc, Mutex};
17use std::thread::{self, JoinHandle};
18use std::time::Duration;
19use std::vec::IntoIter as VecIntoIter;
20
21pub use self::enumerate::{default_input_device, default_output_device, Devices};
22
23pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
24pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
25
26mod enumerate;
27
/// The ALSA host.
///
/// This is the default host type on linux, dragonfly, freebsd and netbsd. It is a unit
/// struct and carries no state of its own.
#[derive(Debug)]
pub struct Host;
31
32impl Host {
33    pub fn new() -> Result<Self, crate::HostUnavailable> {
34        Ok(Host)
35    }
36}
37
38impl HostTrait for Host {
39    type Devices = Devices;
40    type Device = Device;
41
42    fn is_available() -> bool {
43        // Assume ALSA is always available on linux/dragonfly/freebsd/netbsd.
44        true
45    }
46
47    fn devices(&self) -> Result<Self::Devices, DevicesError> {
48        Devices::new()
49    }
50
51    fn default_input_device(&self) -> Option<Self::Device> {
52        default_input_device()
53    }
54
55    fn default_output_device(&self) -> Option<Self::Device> {
56        default_output_device()
57    }
58}
59
60impl DeviceTrait for Device {
61    type SupportedInputConfigs = SupportedInputConfigs;
62    type SupportedOutputConfigs = SupportedOutputConfigs;
63    type Stream = Stream;
64
65    fn name(&self) -> Result<String, DeviceNameError> {
66        Device::name(self)
67    }
68
69    fn supported_input_configs(
70        &self,
71    ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
72        Device::supported_input_configs(self)
73    }
74
75    fn supported_output_configs(
76        &self,
77    ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
78        Device::supported_output_configs(self)
79    }
80
81    fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
82        Device::default_input_config(self)
83    }
84
85    fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
86        Device::default_output_config(self)
87    }
88
89    fn build_input_stream_raw<D, E>(
90        &self,
91        conf: &StreamConfig,
92        sample_format: SampleFormat,
93        data_callback: D,
94        error_callback: E,
95        timeout: Option<Duration>,
96    ) -> Result<Self::Stream, BuildStreamError>
97    where
98        D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
99        E: FnMut(StreamError) + Send + 'static,
100    {
101        let stream_inner =
102            self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
103        let stream = Stream::new_input(
104            Arc::new(stream_inner),
105            data_callback,
106            error_callback,
107            timeout,
108        );
109        Ok(stream)
110    }
111
112    fn build_output_stream_raw<D, E>(
113        &self,
114        conf: &StreamConfig,
115        sample_format: SampleFormat,
116        data_callback: D,
117        error_callback: E,
118        timeout: Option<Duration>,
119    ) -> Result<Self::Stream, BuildStreamError>
120    where
121        D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
122        E: FnMut(StreamError) + Send + 'static,
123    {
124        let stream_inner =
125            self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
126        let stream = Stream::new_output(
127            Arc::new(stream_inner),
128            data_callback,
129            error_callback,
130            timeout,
131        );
132        Ok(stream)
133    }
134}
135
136struct TriggerSender(libc::c_int);
137
138struct TriggerReceiver(libc::c_int);
139
140impl TriggerSender {
141    fn wakeup(&self) {
142        let buf = 1u64;
143        let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
144        assert_eq!(ret, 8);
145    }
146}
147
148impl TriggerReceiver {
149    fn clear_pipe(&self) {
150        let mut out = 0u64;
151        let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
152        assert_eq!(ret, 8);
153    }
154}
155
156fn trigger() -> (TriggerSender, TriggerReceiver) {
157    let mut fds = [0, 0];
158    match unsafe { libc::pipe(fds.as_mut_ptr()) } {
159        0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
160        _ => panic!("Could not create pipe"),
161    }
162}
163
164impl Drop for TriggerSender {
165    fn drop(&mut self) {
166        unsafe {
167            libc::close(self.0);
168        }
169    }
170}
171
172impl Drop for TriggerReceiver {
173    fn drop(&mut self) {
174        unsafe {
175            libc::close(self.0);
176        }
177    }
178}
179
180#[derive(Default)]
181struct DeviceHandles {
182    playback: Option<alsa::PCM>,
183    capture: Option<alsa::PCM>,
184}
185
186impl DeviceHandles {
187    /// Create `DeviceHandles` for `name` and try to open a handle for both
188    /// directions. Returns `Ok` if either direction is opened successfully.
189    fn open(pcm_id: &str) -> Result<Self, alsa::Error> {
190        let mut handles = Self::default();
191        let playback_err = handles.try_open(pcm_id, alsa::Direction::Playback).err();
192        let capture_err = handles.try_open(pcm_id, alsa::Direction::Capture).err();
193        if let Some(err) = capture_err.and(playback_err) {
194            Err(err)
195        } else {
196            Ok(handles)
197        }
198    }
199
200    /// Get a mutable reference to the `Option` for a specific `stream_type`.
201    /// If the `Option` is `None`, the `alsa::PCM` will be opened and placed in
202    /// the `Option` before returning. If `handle_mut()` returns `Ok` the contained
203    /// `Option` is guaranteed to be `Some(..)`.
204    fn try_open(
205        &mut self,
206        pcm_id: &str,
207        stream_type: alsa::Direction,
208    ) -> Result<&mut Option<alsa::PCM>, alsa::Error> {
209        let handle = match stream_type {
210            alsa::Direction::Playback => &mut self.playback,
211            alsa::Direction::Capture => &mut self.capture,
212        };
213
214        if handle.is_none() {
215            *handle = Some(alsa::pcm::PCM::new(pcm_id, stream_type, true)?);
216        }
217
218        Ok(handle)
219    }
220
221    /// Get a mutable reference to the `alsa::PCM` handle for a specific `stream_type`.
222    /// If the handle is not yet opened, it will be opened and stored in `self`.
223    fn get_mut(
224        &mut self,
225        pcm_id: &str,
226        stream_type: alsa::Direction,
227    ) -> Result<&mut alsa::PCM, alsa::Error> {
228        Ok(self.try_open(pcm_id, stream_type)?.as_mut().unwrap())
229    }
230
231    /// Take ownership of the `alsa::PCM` handle for a specific `stream_type`.
232    /// If the handle is not yet opened, it will be opened and returned.
233    fn take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error> {
234        Ok(self.try_open(name, stream_type)?.take().unwrap())
235    }
236}
237
238#[derive(Clone)]
239pub struct Device {
240    name: String,
241    pcm_id: String,
242    handles: Arc<Mutex<DeviceHandles>>,
243}
244
245impl Device {
246    fn build_stream_inner(
247        &self,
248        conf: &StreamConfig,
249        sample_format: SampleFormat,
250        stream_type: alsa::Direction,
251    ) -> Result<StreamInner, BuildStreamError> {
252        let handle_result = self
253            .handles
254            .lock()
255            .unwrap()
256            .take(&self.pcm_id, stream_type)
257            .map_err(|e| (e, e.errno()));
258
259        let handle = match handle_result {
260            Err((_, libc::EBUSY)) => return Err(BuildStreamError::DeviceNotAvailable),
261            Err((_, libc::EINVAL)) => return Err(BuildStreamError::InvalidArgument),
262            Err((e, _)) => return Err(e.into()),
263            Ok(handle) => handle,
264        };
265        let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?;
266        let period_len = set_sw_params_from_format(&handle, conf, stream_type)?;
267
268        handle.prepare()?;
269
270        let num_descriptors = handle.count();
271        if num_descriptors == 0 {
272            let description = "poll descriptor count for stream was 0".to_string();
273            let err = BackendSpecificError { description };
274            return Err(err.into());
275        }
276
277        // Check to see if we can retrieve valid timestamps from the device.
278        // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503
279        let ts = handle.status()?.get_htstamp();
280        let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
281            (0, 0) => Some(std::time::Instant::now()),
282            _ => None,
283        };
284
285        if let alsa::Direction::Capture = stream_type {
286            handle.start()?;
287        }
288
289        let stream_inner = StreamInner {
290            dropping: Cell::new(false),
291            channel: handle,
292            sample_format,
293            num_descriptors,
294            conf: conf.clone(),
295            period_len,
296            can_pause,
297            creation_instant,
298        };
299
300        Ok(stream_inner)
301    }
302
303    #[inline]
304    fn name(&self) -> Result<String, DeviceNameError> {
305        Ok(self.name.clone())
306    }
307
308    fn supported_configs(
309        &self,
310        stream_t: alsa::Direction,
311    ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> {
312        let mut guard = self.handles.lock().unwrap();
313        let handle_result = guard
314            .get_mut(&self.pcm_id, stream_t)
315            .map_err(|e| (e, e.errno()));
316
317        let handle = match handle_result {
318            Err((_, libc::ENOENT)) | Err((_, libc::EBUSY)) => {
319                return Err(SupportedStreamConfigsError::DeviceNotAvailable)
320            }
321            Err((_, libc::EINVAL)) => return Err(SupportedStreamConfigsError::InvalidArgument),
322            Err((e, _)) => return Err(e.into()),
323            Ok(handle) => handle,
324        };
325
326        let hw_params = alsa::pcm::HwParams::any(handle)?;
327
328        // TODO: check endianness
329        const FORMATS: [(SampleFormat, alsa::pcm::Format); 8] = [
330            (SampleFormat::I8, alsa::pcm::Format::S8),
331            (SampleFormat::U8, alsa::pcm::Format::U8),
332            (SampleFormat::I16, alsa::pcm::Format::S16LE),
333            //SND_PCM_FORMAT_S16_BE,
334            (SampleFormat::U16, alsa::pcm::Format::U16LE),
335            //SND_PCM_FORMAT_U16_BE,
336            //SND_PCM_FORMAT_S24_LE,
337            //SND_PCM_FORMAT_S24_BE,
338            //SND_PCM_FORMAT_U24_LE,
339            //SND_PCM_FORMAT_U24_BE,
340            (SampleFormat::I32, alsa::pcm::Format::S32LE),
341            //SND_PCM_FORMAT_S32_BE,
342            (SampleFormat::U32, alsa::pcm::Format::U32LE),
343            //SND_PCM_FORMAT_U32_BE,
344            (SampleFormat::F32, alsa::pcm::Format::FloatLE),
345            //SND_PCM_FORMAT_FLOAT_BE,
346            (SampleFormat::F64, alsa::pcm::Format::Float64LE),
347            //SND_PCM_FORMAT_FLOAT64_BE,
348            //SND_PCM_FORMAT_IEC958_SUBFRAME_LE,
349            //SND_PCM_FORMAT_IEC958_SUBFRAME_BE,
350            //SND_PCM_FORMAT_MU_LAW,
351            //SND_PCM_FORMAT_A_LAW,
352            //SND_PCM_FORMAT_IMA_ADPCM,
353            //SND_PCM_FORMAT_MPEG,
354            //SND_PCM_FORMAT_GSM,
355            //SND_PCM_FORMAT_SPECIAL,
356            //SND_PCM_FORMAT_S24_3LE,
357            //SND_PCM_FORMAT_S24_3BE,
358            //SND_PCM_FORMAT_U24_3LE,
359            //SND_PCM_FORMAT_U24_3BE,
360            //SND_PCM_FORMAT_S20_3LE,
361            //SND_PCM_FORMAT_S20_3BE,
362            //SND_PCM_FORMAT_U20_3LE,
363            //SND_PCM_FORMAT_U20_3BE,
364            //SND_PCM_FORMAT_S18_3LE,
365            //SND_PCM_FORMAT_S18_3BE,
366            //SND_PCM_FORMAT_U18_3LE,
367            //SND_PCM_FORMAT_U18_3BE,
368        ];
369
370        let mut supported_formats = Vec::new();
371        for &(sample_format, alsa_format) in FORMATS.iter() {
372            if hw_params.test_format(alsa_format).is_ok() {
373                supported_formats.push(sample_format);
374            }
375        }
376
377        let min_rate = hw_params.get_rate_min()?;
378        let max_rate = hw_params.get_rate_max()?;
379
380        let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
381            vec![(min_rate, max_rate)]
382        } else {
383            const RATES: [libc::c_uint; 13] = [
384                5512, 8000, 11025, 16000, 22050, 32000, 44100, 48000, 64000, 88200, 96000, 176400,
385                192000,
386            ];
387
388            let mut rates = Vec::new();
389            for &rate in RATES.iter() {
390                if hw_params.test_rate(rate).is_ok() {
391                    rates.push((rate, rate));
392                }
393            }
394
395            if rates.is_empty() {
396                vec![(min_rate, max_rate)]
397            } else {
398                rates
399            }
400        };
401
402        let min_channels = hw_params.get_channels_min()?;
403        let max_channels = hw_params.get_channels_max()?;
404
405        let max_channels = cmp::min(max_channels, 32); // TODO: limiting to 32 channels or too much stuff is returned
406        let supported_channels = (min_channels..max_channels + 1)
407            .filter_map(|num| {
408                if hw_params.test_channels(num).is_ok() {
409                    Some(num as ChannelCount)
410                } else {
411                    None
412                }
413            })
414            .collect::<Vec<_>>();
415
416        let min_buffer_size = hw_params.get_buffer_size_min()?;
417        let max_buffer_size = hw_params.get_buffer_size_max()?;
418
419        let buffer_size_range = SupportedBufferSize::Range {
420            min: min_buffer_size as u32,
421            max: max_buffer_size as u32,
422        };
423
424        let mut output = Vec::with_capacity(
425            supported_formats.len() * supported_channels.len() * sample_rates.len(),
426        );
427        for &sample_format in supported_formats.iter() {
428            for &channels in supported_channels.iter() {
429                for &(min_rate, max_rate) in sample_rates.iter() {
430                    output.push(SupportedStreamConfigRange {
431                        channels,
432                        min_sample_rate: SampleRate(min_rate),
433                        max_sample_rate: SampleRate(max_rate),
434                        buffer_size: buffer_size_range,
435                        sample_format,
436                    });
437                }
438            }
439        }
440
441        Ok(output.into_iter())
442    }
443
444    fn supported_input_configs(
445        &self,
446    ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
447        self.supported_configs(alsa::Direction::Capture)
448    }
449
450    fn supported_output_configs(
451        &self,
452    ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
453        self.supported_configs(alsa::Direction::Playback)
454    }
455
456    // ALSA does not offer default stream formats, so instead we compare all supported formats by
457    // the `SupportedStreamConfigRange::cmp_default_heuristics` order and select the greatest.
458    fn default_config(
459        &self,
460        stream_t: alsa::Direction,
461    ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
462        let mut formats: Vec<_> = {
463            match self.supported_configs(stream_t) {
464                Err(SupportedStreamConfigsError::DeviceNotAvailable) => {
465                    return Err(DefaultStreamConfigError::DeviceNotAvailable);
466                }
467                Err(SupportedStreamConfigsError::InvalidArgument) => {
468                    // this happens sometimes when querying for input and output capabilities, but
469                    // the device supports only one
470                    return Err(DefaultStreamConfigError::StreamTypeNotSupported);
471                }
472                Err(SupportedStreamConfigsError::BackendSpecific { err }) => {
473                    return Err(err.into());
474                }
475                Ok(fmts) => fmts.collect(),
476            }
477        };
478
479        formats.sort_by(|a, b| a.cmp_default_heuristics(b));
480
481        match formats.into_iter().next_back() {
482            Some(f) => {
483                let min_r = f.min_sample_rate;
484                let max_r = f.max_sample_rate;
485                let mut format = f.with_max_sample_rate();
486                const HZ_44100: SampleRate = SampleRate(44_100);
487                if min_r <= HZ_44100 && HZ_44100 <= max_r {
488                    format.sample_rate = HZ_44100;
489                }
490                Ok(format)
491            }
492            None => Err(DefaultStreamConfigError::StreamTypeNotSupported),
493        }
494    }
495
496    fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
497        self.default_config(alsa::Direction::Capture)
498    }
499
500    fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
501        self.default_config(alsa::Direction::Playback)
502    }
503}
504
505struct StreamInner {
506    // Flag used to check when to stop polling, regardless of the state of the stream
507    // (e.g. broken due to a disconnected device).
508    dropping: Cell<bool>,
509
510    // The ALSA channel.
511    channel: alsa::pcm::PCM,
512
513    // When converting between file descriptors and `snd_pcm_t`, this is the number of
514    // file descriptors that this `snd_pcm_t` uses.
515    num_descriptors: usize,
516
517    // Format of the samples.
518    sample_format: SampleFormat,
519
520    // The configuration used to open this stream.
521    conf: StreamConfig,
522
523    // Minimum number of samples to put in the buffer.
524    period_len: usize,
525
526    #[allow(dead_code)]
527    // Whether or not the hardware supports pausing the stream.
528    // TODO: We need an API to expose this. See #197, #284.
529    can_pause: bool,
530
531    // In the case that the device does not return valid timestamps via `get_htstamp`, this field
532    // will be `Some` and will contain an `Instant` representing the moment the stream was created.
533    //
534    // If this field is `Some`, then the stream will use the duration since this instant as a
535    // source for timestamps.
536    //
537    // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and
538    // `get_htstamp` is used.
539    creation_instant: Option<std::time::Instant>,
540}
541
542// Assume that the ALSA library is built with thread safe option.
543unsafe impl Sync for StreamInner {}
544
/// Direction of a stream as inferred from the poll results.
#[derive(Debug, Eq, PartialEq)]
enum StreamType {
    /// The descriptors signalled readable data (capture).
    Input,
    /// The descriptors signalled writable space (playback).
    Output,
}
550
551pub struct Stream {
552    /// The high-priority audio processing thread calling callbacks.
553    /// Option used for moving out in destructor.
554    thread: Option<JoinHandle<()>>,
555
556    /// Handle to the underlying stream for playback controls.
557    inner: Arc<StreamInner>,
558
559    /// Used to signal to stop processing.
560    trigger: TriggerSender,
561}
562
563struct StreamWorkerContext {
564    descriptors: Vec<libc::pollfd>,
565    buffer: Vec<u8>,
566    poll_timeout: i32,
567}
568
569impl StreamWorkerContext {
570    fn new(poll_timeout: &Option<Duration>) -> Self {
571        let poll_timeout: i32 = if let Some(d) = poll_timeout {
572            d.as_millis().try_into().unwrap()
573        } else {
574            -1
575        };
576
577        Self {
578            descriptors: Vec::new(),
579            buffer: Vec::new(),
580            poll_timeout,
581        }
582    }
583}
584
585fn input_stream_worker(
586    rx: TriggerReceiver,
587    stream: &StreamInner,
588    data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
589    error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
590    timeout: Option<Duration>,
591) {
592    boost_current_thread_priority(stream.conf.buffer_size, stream.conf.sample_rate);
593
594    let mut ctxt = StreamWorkerContext::new(&timeout);
595    loop {
596        let flow =
597            poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
598                error_callback(err.into());
599                PollDescriptorsFlow::Continue
600            });
601
602        match flow {
603            PollDescriptorsFlow::Continue => {
604                continue;
605            }
606            PollDescriptorsFlow::XRun => {
607                if let Err(err) = stream.channel.prepare() {
608                    error_callback(err.into());
609                }
610                continue;
611            }
612            PollDescriptorsFlow::Return => return,
613            PollDescriptorsFlow::Ready {
614                status,
615                avail_frames: _,
616                delay_frames,
617                stream_type,
618            } => {
619                assert_eq!(
620                    stream_type,
621                    StreamType::Input,
622                    "expected input stream, but polling descriptors indicated output",
623                );
624                if let Err(err) = process_input(
625                    stream,
626                    &mut ctxt.buffer,
627                    status,
628                    delay_frames,
629                    data_callback,
630                ) {
631                    error_callback(err.into());
632                }
633            }
634        }
635    }
636}
637
638fn output_stream_worker(
639    rx: TriggerReceiver,
640    stream: &StreamInner,
641    data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
642    error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
643    timeout: Option<Duration>,
644) {
645    boost_current_thread_priority(stream.conf.buffer_size, stream.conf.sample_rate);
646
647    let mut ctxt = StreamWorkerContext::new(&timeout);
648    loop {
649        let flow =
650            poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
651                error_callback(err.into());
652                PollDescriptorsFlow::Continue
653            });
654
655        match flow {
656            PollDescriptorsFlow::Continue => continue,
657            PollDescriptorsFlow::XRun => {
658                if let Err(err) = stream.channel.prepare() {
659                    error_callback(err.into());
660                }
661                continue;
662            }
663            PollDescriptorsFlow::Return => return,
664            PollDescriptorsFlow::Ready {
665                status,
666                avail_frames,
667                delay_frames,
668                stream_type,
669            } => {
670                assert_eq!(
671                    stream_type,
672                    StreamType::Output,
673                    "expected output stream, but polling descriptors indicated input",
674                );
675                if let Err(err) = process_output(
676                    stream,
677                    &mut ctxt.buffer,
678                    status,
679                    avail_frames,
680                    delay_frames,
681                    data_callback,
682                    error_callback,
683                ) {
684                    error_callback(err.into());
685                }
686            }
687        }
688    }
689}
690
/// Tries to promote the calling thread to real-time priority.
///
/// Failure is not fatal: it is only reported on stderr.
#[cfg(feature = "audio_thread_priority")]
fn boost_current_thread_priority(buffer_size: BufferSize, sample_rate: SampleRate) {
    use audio_thread_priority::promote_current_thread_to_real_time;

    let frames = match buffer_size {
        BufferSize::Fixed(frames) => frames,
        // if the buffer size isn't fixed, let audio_thread_priority choose a sensible
        // default value
        _ => 0,
    };

    if let Err(err) = promote_current_thread_to_real_time(frames, sample_rate.0) {
        eprintln!("Failed to promote audio thread to real-time priority: {err}");
    }
}
706
707#[cfg(not(feature = "audio_thread_priority"))]
708fn boost_current_thread_priority(_: BufferSize, _: SampleRate) {}
709
710enum PollDescriptorsFlow {
711    Continue,
712    Return,
713    Ready {
714        stream_type: StreamType,
715        status: alsa::pcm::Status,
716        avail_frames: usize,
717        delay_frames: usize,
718    },
719    XRun,
720}
721
722// This block is shared between both input and output stream worker functions.
723fn poll_descriptors_and_prepare_buffer(
724    rx: &TriggerReceiver,
725    stream: &StreamInner,
726    ctxt: &mut StreamWorkerContext,
727) -> Result<PollDescriptorsFlow, BackendSpecificError> {
728    if stream.dropping.get() {
729        // The stream has been requested to be destroyed.
730        rx.clear_pipe();
731        return Ok(PollDescriptorsFlow::Return);
732    }
733
734    let StreamWorkerContext {
735        ref mut descriptors,
736        ref mut buffer,
737        ref poll_timeout,
738    } = *ctxt;
739
740    descriptors.clear();
741
742    // Add the self-pipe for signaling termination.
743    descriptors.push(libc::pollfd {
744        fd: rx.0,
745        events: libc::POLLIN,
746        revents: 0,
747    });
748
749    // Add ALSA polling fds.
750    let len = descriptors.len();
751    descriptors.resize(
752        stream.num_descriptors + len,
753        libc::pollfd {
754            fd: 0,
755            events: 0,
756            revents: 0,
757        },
758    );
759    let filled = stream.channel.fill(&mut descriptors[len..])?;
760    debug_assert_eq!(filled, stream.num_descriptors);
761
762    // Don't timeout, wait forever.
763    let res = alsa::poll::poll(descriptors, *poll_timeout)?;
764    if res == 0 {
765        let description = String::from("`alsa::poll()` spuriously returned");
766        return Err(BackendSpecificError { description });
767    }
768
769    if descriptors[0].revents != 0 {
770        // The stream has been requested to be destroyed.
771        rx.clear_pipe();
772        return Ok(PollDescriptorsFlow::Return);
773    }
774
775    let revents = stream.channel.revents(&descriptors[1..])?;
776    if revents.contains(alsa::poll::Flags::ERR) {
777        let description = String::from("`alsa::poll()` returned POLLERR");
778        return Err(BackendSpecificError { description });
779    }
780    let stream_type = match revents {
781        alsa::poll::Flags::OUT => StreamType::Output,
782        alsa::poll::Flags::IN => StreamType::Input,
783        _ => {
784            // Nothing to process, poll again
785            return Ok(PollDescriptorsFlow::Continue);
786        }
787    };
788
789    let status = stream.channel.status()?;
790    let avail_frames = match stream.channel.avail() {
791        Err(err) if err.errno() == libc::EPIPE => return Ok(PollDescriptorsFlow::XRun),
792        res => res,
793    }? as usize;
794    let delay_frames = match status.get_delay() {
795        // Buffer underrun. TODO: Notify the user.
796        d if d < 0 => 0,
797        d => d as usize,
798    };
799    let available_samples = avail_frames * stream.conf.channels as usize;
800
801    // Only go on if there is at least `stream.period_len` samples.
802    if available_samples < stream.period_len {
803        return Ok(PollDescriptorsFlow::Continue);
804    }
805
806    // Prepare the data buffer.
807    let buffer_size = stream.sample_format.sample_size() * available_samples;
808    buffer.resize(buffer_size, 0u8);
809
810    Ok(PollDescriptorsFlow::Ready {
811        stream_type,
812        status,
813        avail_frames,
814        delay_frames,
815    })
816}
817
818// Read input data from ALSA and deliver it to the user.
819fn process_input(
820    stream: &StreamInner,
821    buffer: &mut [u8],
822    status: alsa::pcm::Status,
823    delay_frames: usize,
824    data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
825) -> Result<(), BackendSpecificError> {
826    stream.channel.io_bytes().readi(buffer)?;
827    let sample_format = stream.sample_format;
828    let data = buffer.as_mut_ptr() as *mut ();
829    let len = buffer.len() / sample_format.sample_size();
830    let data = unsafe { Data::from_parts(data, len, sample_format) };
831    let callback = stream_timestamp(&status, stream.creation_instant)?;
832    let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
833    let capture = callback
834        .sub(delay_duration)
835        .expect("`capture` is earlier than representation supported by `StreamInstant`");
836    let timestamp = crate::InputStreamTimestamp { callback, capture };
837    let info = crate::InputCallbackInfo { timestamp };
838    data_callback(&data, &info);
839
840    Ok(())
841}
842
843// Request data from the user's function and write it via ALSA.
844//
845// Returns `true`
846fn process_output(
847    stream: &StreamInner,
848    buffer: &mut [u8],
849    status: alsa::pcm::Status,
850    available_frames: usize,
851    delay_frames: usize,
852    data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
853    error_callback: &mut dyn FnMut(StreamError),
854) -> Result<(), BackendSpecificError> {
855    {
856        // We're now sure that we're ready to write data.
857        let sample_format = stream.sample_format;
858        let data = buffer.as_mut_ptr() as *mut ();
859        let len = buffer.len() / sample_format.sample_size();
860        let mut data = unsafe { Data::from_parts(data, len, sample_format) };
861        let callback = stream_timestamp(&status, stream.creation_instant)?;
862        let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
863        let playback = callback
864            .add(delay_duration)
865            .expect("`playback` occurs beyond representation supported by `StreamInstant`");
866        let timestamp = crate::OutputStreamTimestamp { callback, playback };
867        let info = crate::OutputCallbackInfo { timestamp };
868        data_callback(&mut data, &info);
869    }
870    loop {
871        match stream.channel.io_bytes().writei(buffer) {
872            Err(err) if err.errno() == libc::EPIPE => {
873                // buffer underrun
874                // TODO: Notify the user of this.
875                let _ = stream.channel.try_recover(err, false);
876            }
877            Err(err) => {
878                error_callback(err.into());
879                continue;
880            }
881            Ok(result) if result != available_frames => {
882                let description = format!(
883                    "unexpected number of frames written: expected {}, \
884                     result {} (this should never happen)",
885                    available_frames, result,
886                );
887                error_callback(BackendSpecificError { description }.into());
888                continue;
889            }
890            _ => {
891                break;
892            }
893        }
894    }
895    Ok(())
896}
897
898// Use the elapsed duration since the start of the stream.
899//
900// This ensures positive values that are compatible with our `StreamInstant` representation.
901fn stream_timestamp(
902    status: &alsa::pcm::Status,
903    creation_instant: Option<std::time::Instant>,
904) -> Result<crate::StreamInstant, BackendSpecificError> {
905    match creation_instant {
906        None => {
907            let trigger_ts = status.get_trigger_htstamp();
908            let ts = status.get_htstamp();
909            let nanos = timespec_diff_nanos(ts, trigger_ts);
910            if nanos < 0 {
911                let description = format!(
912                    "get_htstamp `{}.{}` was earlier than get_trigger_htstamp `{}.{}`",
913                    ts.tv_sec, ts.tv_nsec, trigger_ts.tv_sec, trigger_ts.tv_nsec
914                );
915                return Err(BackendSpecificError { description });
916            }
917            Ok(crate::StreamInstant::from_nanos(nanos))
918        }
919        Some(creation) => {
920            let now = std::time::Instant::now();
921            let duration = now.duration_since(creation);
922            crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(
923                BackendSpecificError {
924                    description: "stream duration has exceeded `StreamInstant` representation"
925                        .to_string(),
926                },
927            )
928        }
929    }
930}
931
932// Adapted from `timestamp2ns` here:
933// https://fossies.org/linux/alsa-lib/test/audio_time.c
934fn timespec_to_nanos(ts: libc::timespec) -> i64 {
935    ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec as i64
936}
937
938// Adapted from `timediff` here:
939// https://fossies.org/linux/alsa-lib/test/audio_time.c
940fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
941    timespec_to_nanos(a) - timespec_to_nanos(b)
942}
943
944// Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
945fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
946    let secsf = frames as f64 / rate.0 as f64;
947    let secs = secsf as u64;
948    let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
949    std::time::Duration::new(secs, nanos)
950}
951
952impl Stream {
953    fn new_input<D, E>(
954        inner: Arc<StreamInner>,
955        mut data_callback: D,
956        mut error_callback: E,
957        timeout: Option<Duration>,
958    ) -> Stream
959    where
960        D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
961        E: FnMut(StreamError) + Send + 'static,
962    {
963        let (tx, rx) = trigger();
964        // Clone the handle for passing into worker thread.
965        let stream = inner.clone();
966        let thread = thread::Builder::new()
967            .name("cpal_alsa_in".to_owned())
968            .spawn(move || {
969                input_stream_worker(
970                    rx,
971                    &stream,
972                    &mut data_callback,
973                    &mut error_callback,
974                    timeout,
975                );
976            })
977            .unwrap();
978        Stream {
979            thread: Some(thread),
980            inner,
981            trigger: tx,
982        }
983    }
984
985    fn new_output<D, E>(
986        inner: Arc<StreamInner>,
987        mut data_callback: D,
988        mut error_callback: E,
989        timeout: Option<Duration>,
990    ) -> Stream
991    where
992        D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
993        E: FnMut(StreamError) + Send + 'static,
994    {
995        let (tx, rx) = trigger();
996        // Clone the handle for passing into worker thread.
997        let stream = inner.clone();
998        let thread = thread::Builder::new()
999            .name("cpal_alsa_out".to_owned())
1000            .spawn(move || {
1001                output_stream_worker(
1002                    rx,
1003                    &stream,
1004                    &mut data_callback,
1005                    &mut error_callback,
1006                    timeout,
1007                );
1008            })
1009            .unwrap();
1010        Stream {
1011            thread: Some(thread),
1012            inner,
1013            trigger: tx,
1014        }
1015    }
1016}
1017
1018impl Drop for Stream {
1019    fn drop(&mut self) {
1020        self.inner.dropping.set(true);
1021        self.trigger.wakeup();
1022        self.thread.take().unwrap().join().unwrap();
1023    }
1024}
1025
1026impl StreamTrait for Stream {
1027    fn play(&self) -> Result<(), PlayStreamError> {
1028        self.inner.channel.pause(false).ok();
1029        Ok(())
1030    }
1031    fn pause(&self) -> Result<(), PauseStreamError> {
1032        self.inner.channel.pause(true).ok();
1033        Ok(())
1034    }
1035}
1036
1037fn set_hw_params_from_format(
1038    pcm_handle: &alsa::pcm::PCM,
1039    config: &StreamConfig,
1040    sample_format: SampleFormat,
1041) -> Result<bool, BackendSpecificError> {
1042    let hw_params = alsa::pcm::HwParams::any(pcm_handle)?;
1043    hw_params.set_access(alsa::pcm::Access::RWInterleaved)?;
1044
1045    let sample_format = if cfg!(target_endian = "big") {
1046        match sample_format {
1047            SampleFormat::I8 => alsa::pcm::Format::S8,
1048            SampleFormat::I16 => alsa::pcm::Format::S16BE,
1049            // SampleFormat::I24 => alsa::pcm::Format::S24BE,
1050            SampleFormat::I32 => alsa::pcm::Format::S32BE,
1051            // SampleFormat::I48 => alsa::pcm::Format::S48BE,
1052            // SampleFormat::I64 => alsa::pcm::Format::S64BE,
1053            SampleFormat::U8 => alsa::pcm::Format::U8,
1054            SampleFormat::U16 => alsa::pcm::Format::U16BE,
1055            // SampleFormat::U24 => alsa::pcm::Format::U24BE,
1056            SampleFormat::U32 => alsa::pcm::Format::U32BE,
1057            // SampleFormat::U48 => alsa::pcm::Format::U48BE,
1058            // SampleFormat::U64 => alsa::pcm::Format::U64BE,
1059            SampleFormat::F32 => alsa::pcm::Format::FloatBE,
1060            SampleFormat::F64 => alsa::pcm::Format::Float64BE,
1061            sample_format => {
1062                return Err(BackendSpecificError {
1063                    description: format!(
1064                        "Sample format '{}' is not supported by this backend",
1065                        sample_format
1066                    ),
1067                })
1068            }
1069        }
1070    } else {
1071        match sample_format {
1072            SampleFormat::I8 => alsa::pcm::Format::S8,
1073            SampleFormat::I16 => alsa::pcm::Format::S16LE,
1074            // SampleFormat::I24 => alsa::pcm::Format::S24LE,
1075            SampleFormat::I32 => alsa::pcm::Format::S32LE,
1076            // SampleFormat::I48 => alsa::pcm::Format::S48LE,
1077            // SampleFormat::I64 => alsa::pcm::Format::S64LE,
1078            SampleFormat::U8 => alsa::pcm::Format::U8,
1079            SampleFormat::U16 => alsa::pcm::Format::U16LE,
1080            // SampleFormat::U24 => alsa::pcm::Format::U24LE,
1081            SampleFormat::U32 => alsa::pcm::Format::U32LE,
1082            // SampleFormat::U48 => alsa::pcm::Format::U48LE,
1083            // SampleFormat::U64 => alsa::pcm::Format::U64LE,
1084            SampleFormat::F32 => alsa::pcm::Format::FloatLE,
1085            SampleFormat::F64 => alsa::pcm::Format::Float64LE,
1086            sample_format => {
1087                return Err(BackendSpecificError {
1088                    description: format!(
1089                        "Sample format '{}' is not supported by this backend",
1090                        sample_format
1091                    ),
1092                })
1093            }
1094        }
1095    };
1096
1097    hw_params.set_format(sample_format)?;
1098    hw_params.set_rate(config.sample_rate.0, alsa::ValueOr::Nearest)?;
1099    hw_params.set_channels(config.channels as u32)?;
1100
1101    match config.buffer_size {
1102        BufferSize::Fixed(v) => {
1103            hw_params.set_period_size_near((v / 4) as alsa::pcm::Frames, alsa::ValueOr::Nearest)?;
1104            hw_params.set_buffer_size(v as alsa::pcm::Frames)?;
1105        }
1106        BufferSize::Default => {
1107            // These values together represent a moderate latency and wakeup interval.
1108            // Without them, we are at the mercy of the device
1109            hw_params.set_period_time_near(25_000, alsa::ValueOr::Nearest)?;
1110            hw_params.set_buffer_time_near(100_000, alsa::ValueOr::Nearest)?;
1111        }
1112    }
1113
1114    pcm_handle.hw_params(&hw_params)?;
1115
1116    Ok(hw_params.can_pause())
1117}
1118
1119fn set_sw_params_from_format(
1120    pcm_handle: &alsa::pcm::PCM,
1121    config: &StreamConfig,
1122    stream_type: alsa::Direction,
1123) -> Result<usize, BackendSpecificError> {
1124    let sw_params = pcm_handle.sw_params_current()?;
1125
1126    let period_len = {
1127        let (buffer, period) = pcm_handle.get_params()?;
1128        if buffer == 0 {
1129            return Err(BackendSpecificError {
1130                description: "initialization resulted in a null buffer".to_string(),
1131            });
1132        }
1133        sw_params.set_avail_min(period as alsa::pcm::Frames)?;
1134
1135        let start_threshold = match stream_type {
1136            alsa::Direction::Playback => buffer - period,
1137
1138            // For capture streams, the start threshold is irrelevant and ignored,
1139            // because build_stream_inner() starts the stream before process_input()
1140            // reads from it. Set it anyway I guess, since it's better than leaving
1141            // it at an unspecified default value.
1142            alsa::Direction::Capture => 1,
1143        };
1144        sw_params.set_start_threshold(start_threshold.try_into().unwrap())?;
1145
1146        period as usize * config.channels as usize
1147    };
1148
1149    sw_params.set_tstamp_mode(true)?;
1150    sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;
1151
1152    // tstamp_type param cannot be changed after the device is opened.
1153    // The default tstamp_type value on most Linux systems is "monotonic",
1154    // let's try to use it if setting the tstamp_type fails.
1155    if pcm_handle.sw_params(&sw_params).is_err() {
1156        sw_params.set_tstamp_type(alsa::pcm::TstampType::Monotonic)?;
1157        pcm_handle.sw_params(&sw_params)?;
1158    }
1159
1160    Ok(period_len)
1161}
1162
1163impl From<alsa::Error> for BackendSpecificError {
1164    fn from(err: alsa::Error) -> Self {
1165        BackendSpecificError {
1166            description: err.to_string(),
1167        }
1168    }
1169}
1170
1171impl From<alsa::Error> for BuildStreamError {
1172    fn from(err: alsa::Error) -> Self {
1173        let err: BackendSpecificError = err.into();
1174        err.into()
1175    }
1176}
1177
1178impl From<alsa::Error> for SupportedStreamConfigsError {
1179    fn from(err: alsa::Error) -> Self {
1180        let err: BackendSpecificError = err.into();
1181        err.into()
1182    }
1183}
1184
1185impl From<alsa::Error> for PlayStreamError {
1186    fn from(err: alsa::Error) -> Self {
1187        let err: BackendSpecificError = err.into();
1188        err.into()
1189    }
1190}
1191
1192impl From<alsa::Error> for PauseStreamError {
1193    fn from(err: alsa::Error) -> Self {
1194        let err: BackendSpecificError = err.into();
1195        err.into()
1196    }
1197}
1198
1199impl From<alsa::Error> for StreamError {
1200    fn from(err: alsa::Error) -> Self {
1201        let err: BackendSpecificError = err.into();
1202        err.into()
1203    }
1204}