1extern crate alsa;
6extern crate libc;
7
8use std::{
9 cmp,
10 sync::{
11 atomic::{AtomicBool, AtomicUsize, Ordering},
12 Arc,
13 },
14 thread::{self, JoinHandle},
15 time::Duration,
16 vec::IntoIter as VecIntoIter,
17};
18
19use self::alsa::poll::Descriptors;
20pub use self::enumerate::Devices;
21
22use crate::{
23 iter::{SupportedInputConfigs, SupportedOutputConfigs},
24 traits::{DeviceTrait, HostTrait, StreamTrait},
25 BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
26 DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceDirection,
27 DeviceId, DeviceIdError, DeviceNameError, DevicesError, FrameCount, InputCallbackInfo,
28 OutputCallbackInfo, PauseStreamError, PlayStreamError, Sample, SampleFormat, SampleRate,
29 StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig,
30 SupportedStreamConfigRange, SupportedStreamConfigsError, I24, U24,
31};
32
33mod enumerate;
34
/// PCM identifier used for the device built by `Device::default()`.
const DEFAULT_DEVICE: &str = "default";

/// Errno value treated like `ENOENT`/`ENODEV`/... ("device not available") when opening a PCM.
// NOTE(review): 524 is presumably the Linux kernel's `ENOTSUPP`, which `libc` does not
// export as a constant — confirm.
const LIBC_ENOTSUPP: libc::c_int = 524;
90
/// The ALSA host.
///
/// Holds a reference-counted [`AlsaContext`]; cloning a `Host` clones the `Arc`, not the
/// context itself.
#[derive(Debug, Clone)]
pub struct Host {
    /// Shared ALSA context created in [`Host::new`].
    inner: Arc<AlsaContext>,
}
96
97impl Host {
98 pub fn new() -> Result<Self, crate::HostUnavailable> {
99 let inner = AlsaContext::new().map_err(|_| crate::HostUnavailable)?;
100 Ok(Host {
101 inner: Arc::new(inner),
102 })
103 }
104}
105
impl HostTrait for Host {
    type Devices = Devices;
    type Device = Device;

    /// Always reports the host as available; no runtime probing happens here.
    fn is_available() -> bool {
        true
    }

    /// Enumerates the host's devices by delegating to `Host::enumerate_devices`.
    // NOTE(review): `enumerate_devices` is not defined in this part of the file; presumably
    // it is provided by the `enumerate` module — confirm.
    fn devices(&self) -> Result<Self::Devices, DevicesError> {
        self.enumerate_devices()
    }

    /// Returns the device built by `Device::default()`; never `None`.
    fn default_input_device(&self) -> Option<Self::Device> {
        Some(Device::default())
    }

    /// Returns the device built by `Device::default()`; never `None`.
    fn default_output_device(&self) -> Option<Self::Device> {
        Some(Device::default())
    }
}
127
/// Number of live [`AlsaContext`] values: incremented in `AlsaContext::new`, decremented in
/// its `Drop` impl. The 0 -> 1 and 1 -> 0 transitions trigger the global ALSA config
/// update and free respectively.
static ALSA_CONTEXT_COUNT: AtomicUsize = AtomicUsize::new(0);
130
/// Zero-sized guard representing one user of the process-wide ALSA configuration.
///
/// Creating the first instance calls `alsa::config::update()`; dropping the last one calls
/// `alsa::config::update_free_global()`.
#[derive(Debug)]
pub(super) struct AlsaContext;
134
135impl AlsaContext {
136 fn new() -> Result<Self, alsa::Error> {
137 if ALSA_CONTEXT_COUNT.fetch_add(1, Ordering::SeqCst) == 0 {
139 alsa::config::update()?;
140 }
141 Ok(Self)
142 }
143}
144
impl Drop for AlsaContext {
    /// Unregisters this context; the last one to go (counter transition 1 -> 0) frees the
    /// global ALSA configuration.
    fn drop(&mut self) {
        if ALSA_CONTEXT_COUNT.fetch_sub(1, Ordering::SeqCst) == 1 {
            // Best effort: a failure to free cannot be reported from `drop`.
            let _ = alsa::config::update_free_global();
        }
    }
}
153
154impl DeviceTrait for Device {
155 type SupportedInputConfigs = SupportedInputConfigs;
156 type SupportedOutputConfigs = SupportedOutputConfigs;
157 type Stream = Stream;
158
159 fn name(&self) -> Result<String, DeviceNameError> {
161 Device::name(self)
162 }
163
164 fn description(&self) -> Result<DeviceDescription, DeviceNameError> {
165 Device::description(self)
166 }
167
168 fn id(&self) -> Result<DeviceId, DeviceIdError> {
169 Device::id(self)
170 }
171
172 fn supports_input(&self) -> bool {
178 matches!(
179 self.direction,
180 DeviceDirection::Input | DeviceDirection::Duplex
181 )
182 }
183
184 fn supports_output(&self) -> bool {
185 matches!(
186 self.direction,
187 DeviceDirection::Output | DeviceDirection::Duplex
188 )
189 }
190
191 fn supported_input_configs(
192 &self,
193 ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
194 Device::supported_input_configs(self)
195 }
196
197 fn supported_output_configs(
198 &self,
199 ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
200 Device::supported_output_configs(self)
201 }
202
203 fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
204 Device::default_input_config(self)
205 }
206
207 fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
208 Device::default_output_config(self)
209 }
210
211 fn build_input_stream_raw<D, E>(
212 &self,
213 conf: &StreamConfig,
214 sample_format: SampleFormat,
215 data_callback: D,
216 error_callback: E,
217 timeout: Option<Duration>,
218 ) -> Result<Self::Stream, BuildStreamError>
219 where
220 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
221 E: FnMut(StreamError) + Send + 'static,
222 {
223 let stream_inner =
224 self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
225 let stream = Self::Stream::new_input(
226 Arc::new(stream_inner),
227 data_callback,
228 error_callback,
229 timeout,
230 );
231 Ok(stream)
232 }
233
234 fn build_output_stream_raw<D, E>(
235 &self,
236 conf: &StreamConfig,
237 sample_format: SampleFormat,
238 data_callback: D,
239 error_callback: E,
240 timeout: Option<Duration>,
241 ) -> Result<Self::Stream, BuildStreamError>
242 where
243 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
244 E: FnMut(StreamError) + Send + 'static,
245 {
246 let stream_inner =
247 self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
248 let stream = Self::Stream::new_output(
249 Arc::new(stream_inner),
250 data_callback,
251 error_callback,
252 timeout,
253 );
254 Ok(stream)
255 }
256}
257
/// Write end of the wake-up pipe created by [`trigger`]; owns the file descriptor and closes
/// it on drop.
#[derive(Debug)]
struct TriggerSender(libc::c_int);
260
/// Read end of the wake-up pipe created by [`trigger`]; owns the file descriptor and closes
/// it on drop.
#[derive(Debug)]
struct TriggerReceiver(libc::c_int);
263
264impl TriggerSender {
265 fn wakeup(&self) {
266 let buf = 1u64;
267 let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
268 assert_eq!(ret, 8);
269 }
270}
271
272impl TriggerReceiver {
273 fn clear_pipe(&self) {
274 let mut out = 0u64;
275 let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
276 assert_eq!(ret, 8);
277 }
278}
279
280fn trigger() -> (TriggerSender, TriggerReceiver) {
281 let mut fds = [0, 0];
282 match unsafe { libc::pipe(fds.as_mut_ptr()) } {
283 0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
284 _ => panic!("Could not create pipe"),
285 }
286}
287
impl Drop for TriggerSender {
    /// Closes the write end of the pipe; the return value of `close` is ignored.
    fn drop(&mut self) {
        unsafe {
            libc::close(self.0);
        }
    }
}
295
impl Drop for TriggerReceiver {
    /// Closes the read end of the pipe; the return value of `close` is ignored.
    fn drop(&mut self) {
        unsafe {
            libc::close(self.0);
        }
    }
}
303
/// An ALSA PCM device, identified by its PCM name.
#[derive(Clone, Debug)]
pub struct Device {
    /// Name passed to `alsa::pcm::PCM::new` when opening the device; also the device's
    /// identity for `PartialEq`/`Hash` and the value returned by `name()`/`id()`.
    pcm_id: String,
    /// Optional, possibly multi-line description; its first line becomes the display name.
    desc: Option<String>,
    /// Direction(s) recorded for this device; drives `supports_input`/`supports_output`.
    direction: DeviceDirection,
    /// Keeps the shared ALSA context alive for as long as the device exists.
    _context: Arc<AlsaContext>,
}
311
// Equality and hashing consider only `pcm_id`; description, direction and context are
// ignored, which keeps `Eq` and `Hash` consistent with each other.
impl PartialEq for Device {
    fn eq(&self, other: &Self) -> bool {
        self.pcm_id == other.pcm_id
    }
}

impl Eq for Device {}

impl std::hash::Hash for Device {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.pcm_id.hash(state);
    }
}
325
326impl Device {
/// Opens and configures the PCM for `conf`/`sample_format` in the given direction and
/// packages everything the worker thread needs into a [`StreamInner`].
///
/// # Errors
///
/// * `StreamConfigNotSupported` — a fixed buffer size lies outside the range reported by
///   the device's default config.
/// * `DeviceNotAvailable` / `InvalidArgument` — mapped from the errno of the failed open.
/// * Other variants — converted from ALSA or backend-specific errors via `?`/`into()`.
fn build_stream_inner(
    &self,
    conf: &StreamConfig,
    sample_format: SampleFormat,
    stream_type: alsa::Direction,
) -> Result<StreamInner, BuildStreamError> {
    // Validate a fixed buffer size against the device's reported range. If the default
    // config cannot be queried, or reports no range, the check is skipped rather than
    // failing.
    if let BufferSize::Fixed(requested_size) = conf.buffer_size {
        let supported_config = match stream_type {
            alsa::Direction::Capture => self.default_input_config(),
            alsa::Direction::Playback => self.default_output_config(),
        };
        if let Ok(config) = supported_config {
            if let SupportedBufferSize::Range { min, max } = config.buffer_size {
                if !(min..=max).contains(&requested_size) {
                    return Err(BuildStreamError::StreamConfigNotSupported);
                }
            }
        }
    }

    // NOTE(review): the third argument (`true`) presumably requests non-blocking mode —
    // confirm against the `alsa` crate docs.
    let handle = match alsa::pcm::PCM::new(&self.pcm_id, stream_type, true)
        .map_err(|e| (e, e.errno()))
    {
        Err((_, libc::ENOENT))
        | Err((_, libc::EPERM))
        | Err((_, libc::ENODEV))
        | Err((_, LIBC_ENOTSUPP))
        | Err((_, libc::EBUSY))
        | Err((_, libc::EAGAIN)) => return Err(BuildStreamError::DeviceNotAvailable),
        Err((_, libc::EINVAL)) => return Err(BuildStreamError::InvalidArgument),
        Err((e, _)) => return Err(e.into()),
        Ok(handle) => handle,
    };

    let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?;
    let period_samples = set_sw_params_from_format(&handle, conf, stream_type)?;

    handle.prepare()?;

    // The worker polls on these descriptors, so having none would leave it nothing to
    // wait on.
    let num_descriptors = handle.count();
    if num_descriptors == 0 {
        let description = "poll descriptor count for stream was 0".to_string();
        let err = BackendSpecificError { description };
        return Err(err.into());
    }

    // An all-zero status timestamp selects the `Instant`-based fallback clock
    // (`stream_timestamp_fallback`) instead of hardware timestamps.
    let ts = handle.status()?.get_htstamp();
    let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
        (0, 0) => Some(std::time::Instant::now()),
        _ => None,
    };

    // Only capture streams are started explicitly here.
    if let alsa::Direction::Capture = stream_type {
        handle.start()?;
    }

    let period_frames = period_samples / conf.channels as usize;
    let period_bytes = period_samples * sample_format.sample_size();
    let mut silence_template = vec![0u8; period_bytes].into_boxed_slice();

    // All-zero bytes are only replaced when `is_uint()` reports true for the format.
    if sample_format.is_uint() {
        fill_with_equilibrium(&mut silence_template, sample_format);
    }

    let stream_inner = StreamInner {
        dropping: AtomicBool::new(false),
        channel: handle,
        sample_format,
        num_descriptors,
        conf: conf.clone(),
        period_samples,
        period_frames,
        silence_template,
        can_pause,
        creation_instant,
        _context: self._context.clone(),
    };

    Ok(stream_inner)
}
419
/// Returns a copy of the PCM identifier; never fails.
fn name(&self) -> Result<String, DeviceNameError> {
    Ok(self.pcm_id.clone())
}
423
/// Builds a [`DeviceDescription`] for this device; never fails.
///
/// The display name is the first line of `desc`, falling back to the PCM identifier.
/// When `desc` is present, its trimmed non-empty lines are attached as extended
/// information.
fn description(&self) -> Result<DeviceDescription, DeviceNameError> {
    let name = self
        .desc
        .as_ref()
        .and_then(|desc| desc.lines().next())
        .unwrap_or(&self.pcm_id)
        .to_string();

    // The PCM identifier is what gets reported in the builder's `driver` field.
    let mut builder = DeviceDescriptionBuilder::new(name)
        .driver(self.pcm_id.clone())
        .direction(self.direction);

    if let Some(ref desc) = self.desc {
        let lines = desc
            .lines()
            .map(|line| line.trim().to_string())
            .filter(|line| !line.is_empty())
            .collect();
        builder = builder.extended(lines);
    }

    Ok(builder.build())
}
447
/// Returns the device id: the ALSA host id paired with the PCM identifier; never fails.
fn id(&self) -> Result<DeviceId, DeviceIdError> {
    Ok(DeviceId(crate::platform::HostId::Alsa, self.pcm_id.clone()))
}
451
/// Probes the device in direction `stream_t` and returns every supported combination of
/// sample format, channel count and sample-rate range.
///
/// # Errors
///
/// * `DeviceNotAvailable` / `InvalidArgument` — mapped from the errno of the failed open.
/// * Other variants — converted from ALSA errors via `?`/`into()`.
fn supported_configs(
    &self,
    stream_t: alsa::Direction,
) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> {
    let pcm =
        match alsa::pcm::PCM::new(&self.pcm_id, stream_t, true).map_err(|e| (e, e.errno())) {
            Err((_, libc::ENOENT))
            | Err((_, libc::EPERM))
            | Err((_, libc::ENODEV))
            | Err((_, LIBC_ENOTSUPP))
            | Err((_, libc::EBUSY))
            | Err((_, libc::EAGAIN)) => {
                return Err(SupportedStreamConfigsError::DeviceNotAvailable)
            }
            Err((_, libc::EINVAL)) => return Err(SupportedStreamConfigsError::InvalidArgument),
            Err((e, _)) => return Err(e.into()),
            Ok(pcm) => pcm,
        };

    let hw_params = alsa::pcm::HwParams::any(&pcm)?;

    // Every (cpal format, ALSA format) pair that is probed. Both endiannesses map to the
    // same cpal format; duplicates are collapsed below.
    const FORMATS: [(SampleFormat, alsa::pcm::Format); 23] = [
        (SampleFormat::I8, alsa::pcm::Format::S8),
        (SampleFormat::U8, alsa::pcm::Format::U8),
        (SampleFormat::I16, alsa::pcm::Format::S16LE),
        (SampleFormat::I16, alsa::pcm::Format::S16BE),
        (SampleFormat::U16, alsa::pcm::Format::U16LE),
        (SampleFormat::U16, alsa::pcm::Format::U16BE),
        (SampleFormat::I24, alsa::pcm::Format::S24LE),
        (SampleFormat::I24, alsa::pcm::Format::S24BE),
        (SampleFormat::U24, alsa::pcm::Format::U24LE),
        (SampleFormat::U24, alsa::pcm::Format::U24BE),
        (SampleFormat::I32, alsa::pcm::Format::S32LE),
        (SampleFormat::I32, alsa::pcm::Format::S32BE),
        (SampleFormat::U32, alsa::pcm::Format::U32LE),
        (SampleFormat::U32, alsa::pcm::Format::U32BE),
        (SampleFormat::F32, alsa::pcm::Format::FloatLE),
        (SampleFormat::F32, alsa::pcm::Format::FloatBE),
        (SampleFormat::F64, alsa::pcm::Format::Float64LE),
        (SampleFormat::F64, alsa::pcm::Format::Float64BE),
        (SampleFormat::DsdU8, alsa::pcm::Format::DSDU8),
        (SampleFormat::DsdU16, alsa::pcm::Format::DSDU16LE),
        (SampleFormat::DsdU16, alsa::pcm::Format::DSDU16BE),
        (SampleFormat::DsdU32, alsa::pcm::Format::DSDU32LE),
        (SampleFormat::DsdU32, alsa::pcm::Format::DSDU32BE),
    ];

    // A cpal format is listed once if either of its ALSA variants passes the test.
    let mut supported_formats = Vec::new();
    for &(sample_format, alsa_format) in FORMATS.iter() {
        if hw_params.test_format(alsa_format).is_ok()
            && !supported_formats.contains(&sample_format)
        {
            supported_formats.push(sample_format);
        }
    }

    let min_rate = hw_params.get_rate_min()?;
    let max_rate = hw_params.get_rate_max()?;

    // If the device has a single rate, or accepts a rate just above its minimum, report
    // one continuous range. Otherwise probe the crate's common rates individually and
    // fall back to the full range when none of them is accepted.
    let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
        vec![(min_rate, max_rate)]
    } else {
        let mut rates = Vec::new();
        for &sample_rate in crate::COMMON_SAMPLE_RATES.iter() {
            if hw_params.test_rate(sample_rate).is_ok() {
                rates.push((sample_rate, sample_rate));
            }
        }

        if rates.is_empty() {
            vec![(min_rate, max_rate)]
        } else {
            rates
        }
    };

    let min_channels = hw_params.get_channels_min()?;
    let max_channels = hw_params.get_channels_max()?;

    // Probe at most 32 channels.
    // NOTE(review): presumably this bounds the loop for devices that report a huge
    // maximum — confirm.
    let max_channels = cmp::min(max_channels, 32);
    let supported_channels = (min_channels..max_channels + 1)
        .filter_map(|num| {
            if hw_params.test_channels(num).is_ok() {
                Some(num as ChannelCount)
            } else {
                None
            }
        })
        .collect::<Vec<_>>();

    let (min_buffer_size, max_buffer_size) = hw_params_buffer_size_min_max(&hw_params);
    let buffer_size_range = SupportedBufferSize::Range {
        min: min_buffer_size,
        max: max_buffer_size,
    };

    // Cartesian product: formats x channel counts x rate ranges.
    let mut output = Vec::with_capacity(
        supported_formats.len() * supported_channels.len() * sample_rates.len(),
    );
    for &sample_format in supported_formats.iter() {
        for &channels in supported_channels.iter() {
            for &(min_rate, max_rate) in sample_rates.iter() {
                output.push(SupportedStreamConfigRange {
                    channels,
                    min_sample_rate: min_rate,
                    max_sample_rate: max_rate,
                    buffer_size: buffer_size_range,
                    sample_format,
                });
            }
        }
    }

    Ok(output.into_iter())
}
592
/// Supported configurations for the capture direction; see `supported_configs`.
fn supported_input_configs(
    &self,
) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
    self.supported_configs(alsa::Direction::Capture)
}
598
/// Supported configurations for the playback direction; see `supported_configs`.
fn supported_output_configs(
    &self,
) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
    self.supported_configs(alsa::Direction::Playback)
}
604
605 fn default_config(
608 &self,
609 stream_t: alsa::Direction,
610 ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
611 let mut formats: Vec<_> = {
612 match self.supported_configs(stream_t) {
613 Err(SupportedStreamConfigsError::DeviceNotAvailable) => {
614 return Err(DefaultStreamConfigError::DeviceNotAvailable);
615 }
616 Err(SupportedStreamConfigsError::InvalidArgument) => {
617 return Err(DefaultStreamConfigError::StreamTypeNotSupported);
620 }
621 Err(SupportedStreamConfigsError::BackendSpecific { err }) => {
622 return Err(err.into());
623 }
624 Ok(fmts) => fmts.collect(),
625 }
626 };
627
628 formats.sort_by(|a, b| a.cmp_default_heuristics(b));
629
630 match formats.into_iter().next_back() {
631 Some(f) => {
632 let min_r = f.min_sample_rate;
633 let max_r = f.max_sample_rate;
634 let mut format = f.with_max_sample_rate();
635 const HZ_44100: SampleRate = 44_100;
636 if min_r <= HZ_44100 && HZ_44100 <= max_r {
637 format.sample_rate = HZ_44100;
638 }
639 Ok(format)
640 }
641 None => Err(DefaultStreamConfigError::StreamTypeNotSupported),
642 }
643 }
644
/// Default configuration for the capture direction; see `default_config`.
fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
    self.default_config(alsa::Direction::Capture)
}
648
/// Default configuration for the playback direction; see `default_config`.
fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
    self.default_config(alsa::Direction::Playback)
}
652}
653
impl Default for Device {
    /// Builds the device for the `"default"` PCM, with an unknown direction and its own
    /// freshly created [`AlsaContext`].
    ///
    /// # Panics
    ///
    /// Panics if the ALSA context cannot be created.
    fn default() -> Self {
        Self {
            pcm_id: DEFAULT_DEVICE.to_owned(),
            desc: Some("Default Audio Device".to_string()),
            direction: DeviceDirection::Unknown,
            _context: Arc::new(
                AlsaContext::new().expect("Failed to initialize ALSA configuration"),
            ),
        }
    }
}
668
/// State shared between a [`Stream`] handle and its worker thread.
#[derive(Debug)]
struct StreamInner {
    /// Set by `Stream::drop`; the worker checks it before each poll and exits when set.
    dropping: AtomicBool,

    /// The opened and configured PCM handle.
    channel: alsa::pcm::PCM,

    /// Number of poll descriptors reported by the PCM at creation (always non-zero).
    num_descriptors: usize,

    /// Sample format the PCM was configured with.
    sample_format: SampleFormat,

    /// Copy of the stream configuration the stream was built from.
    conf: StreamConfig,

    /// Samples per period, as returned by `set_sw_params_from_format`.
    period_samples: usize,
    /// `period_samples` divided by the channel count.
    period_frames: usize,
    /// One period of "silent" bytes; copied into the transfer buffer before every output
    /// callback and used as the initial contents of the worker's transfer buffer.
    silence_template: Box<[u8]>,

    /// Value returned by `set_hw_params_from_format`; not read anywhere in this view.
    #[allow(dead_code)]
    can_pause: bool,

    /// `Some` when the PCM status reported an all-zero timestamp at creation; callback
    /// timestamps are then measured from this instant instead of hardware timestamps.
    creation_instant: Option<std::time::Instant>,

    /// Keeps the shared ALSA context alive for as long as the stream exists.
    _context: Arc<AlsaContext>,
}
711
// NOTE(review): this asserts that sharing `&StreamInner` between the owning `Stream` and
// the worker thread is sound. Presumably it is needed because `alsa::pcm::PCM` is not
// `Sync` — confirm that the calls made concurrently (`pause` from the handle, I/O from the
// worker) are safe on the same PCM.
unsafe impl Sync for StreamInner {}
714
/// Handle to a running ALSA stream; dropping it stops and joins the worker thread.
#[derive(Debug)]
pub struct Stream {
    /// Worker thread handle; taken and joined in `Drop`.
    thread: Option<JoinHandle<()>>,

    /// State shared with the worker thread.
    inner: Arc<StreamInner>,

    /// Write end of the pipe used to wake the worker out of `poll` on drop.
    trigger: TriggerSender,
}
727
// Crate-provided assertion macros.
// NOTE(review): presumably compile-time checks that `Stream` is `Send` and `Sync` — confirm
// against the macro definitions.
crate::assert_stream_send!(Stream);
crate::assert_stream_sync!(Stream);
731
/// Per-thread scratch state owned by a stream worker.
struct StreamWorkerContext {
    /// Poll set: index 0 is the wake-up pipe, the remaining entries belong to the PCM.
    descriptors: Box<[libc::pollfd]>,
    /// One period of bytes exchanged with the PCM and handed to the data callback.
    transfer_buffer: Box<[u8]>,
    /// Timeout passed to `poll`, in milliseconds; `-1` when no timeout was configured.
    poll_timeout: i32,
}
737
738impl StreamWorkerContext {
739 fn new(poll_timeout: &Option<Duration>, stream: &StreamInner, rx: &TriggerReceiver) -> Self {
740 let poll_timeout: i32 = if let Some(d) = poll_timeout {
741 d.as_millis().try_into().unwrap()
742 } else {
743 -1 };
745
746 let transfer_buffer = stream.silence_template.clone();
748
749 let total_descriptors = 1 + stream.num_descriptors;
753 let mut descriptors = vec![
754 libc::pollfd {
755 fd: 0,
756 events: 0,
757 revents: 0
758 };
759 total_descriptors
760 ]
761 .into_boxed_slice();
762
763 descriptors[0] = libc::pollfd {
765 fd: rx.0,
766 events: libc::POLLIN,
767 revents: 0,
768 };
769
770 let filled = stream
772 .channel
773 .fill(&mut descriptors[1..])
774 .expect("Failed to fill ALSA descriptors");
775 debug_assert_eq!(filled, stream.num_descriptors);
776
777 Self {
778 descriptors,
779 transfer_buffer,
780 poll_timeout,
781 }
782 }
783}
784
785fn input_stream_worker(
786 rx: TriggerReceiver,
787 stream: &StreamInner,
788 data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
789 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
790 timeout: Option<Duration>,
791) {
792 boost_current_thread_priority(stream.conf.buffer_size, stream.conf.sample_rate);
793
794 let mut ctxt = StreamWorkerContext::new(&timeout, stream, &rx);
795 loop {
796 let flow =
797 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
798 error_callback(err.into());
799 PollDescriptorsFlow::Continue
800 });
801
802 match flow {
803 PollDescriptorsFlow::Continue => {
804 continue;
805 }
806 PollDescriptorsFlow::XRun => {
807 error_callback(StreamError::BufferUnderrun);
808 if let Err(err) = stream.channel.prepare() {
809 error_callback(err.into());
810 }
811 continue;
812 }
813 PollDescriptorsFlow::Return => return,
814 PollDescriptorsFlow::Ready {
815 status,
816 delay_frames,
817 } => {
818 if let Err(err) = process_input(
819 stream,
820 &mut ctxt.transfer_buffer,
821 status,
822 delay_frames,
823 data_callback,
824 ) {
825 error_callback(err.into());
826 }
827 }
828 }
829 }
830}
831
832fn output_stream_worker(
833 rx: TriggerReceiver,
834 stream: &StreamInner,
835 data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
836 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
837 timeout: Option<Duration>,
838) {
839 boost_current_thread_priority(stream.conf.buffer_size, stream.conf.sample_rate);
840
841 let mut ctxt = StreamWorkerContext::new(&timeout, stream, &rx);
842
843 loop {
844 let flow =
845 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
846 error_callback(err.into());
847 PollDescriptorsFlow::Continue
848 });
849
850 match flow {
851 PollDescriptorsFlow::Continue => continue,
852 PollDescriptorsFlow::XRun => {
853 error_callback(StreamError::BufferUnderrun);
854 if let Err(err) = stream.channel.prepare() {
855 error_callback(err.into());
856 }
857 continue;
858 }
859 PollDescriptorsFlow::Return => return,
860 PollDescriptorsFlow::Ready {
861 status,
862 delay_frames,
863 } => {
864 if let Err(err) = process_output(
865 stream,
866 &mut ctxt.transfer_buffer,
867 status,
868 delay_frames,
869 data_callback,
870 error_callback,
871 ) {
872 error_callback(err.into());
873 }
874 }
875 }
876 }
877}
878
/// Tries to promote the calling thread to real-time priority.
///
/// A fixed buffer size is forwarded as-is; any other setting is forwarded as `0`. Failure
/// is not fatal: it is only logged to stderr.
#[cfg(feature = "audio_thread_priority")]
fn boost_current_thread_priority(buffer_size: BufferSize, sample_rate: SampleRate) {
    use audio_thread_priority::promote_current_thread_to_real_time;

    let frames = match buffer_size {
        BufferSize::Fixed(frames) => frames,
        _ => 0,
    };

    if let Err(err) = promote_current_thread_to_real_time(frames, sample_rate) {
        eprintln!("Failed to promote audio thread to real-time priority: {err}");
    }
}
894
/// No-op variant used when the `audio_thread_priority` feature is disabled.
#[cfg(not(feature = "audio_thread_priority"))]
fn boost_current_thread_priority(_: BufferSize, _: SampleRate) {}
897
/// Outcome of one poll iteration, telling the worker loop what to do next.
enum PollDescriptorsFlow {
    /// Nothing to process yet; poll again.
    Continue,
    /// The stream is being dropped; the worker must exit.
    Return,
    /// At least one full period is available for processing.
    Ready {
        /// PCM status snapshot taken for this period (used for timestamps).
        status: alsa::pcm::Status,
        /// Delay reported by the status, clamped to be non-negative.
        delay_frames: usize,
    },
    /// The PCM reported `EPIPE` while querying the available frames.
    XRun,
}
907
/// Waits on the wake-up pipe and the PCM descriptors and classifies the result.
///
/// Returns `Return` when the stream is being dropped, `XRun` when querying the available
/// frames fails with `EPIPE`, `Ready` when at least one full period is available, and
/// `Continue` otherwise.
///
/// # Errors
///
/// Returns a [`BackendSpecificError`] when `poll` returns 0, when the PCM reports
/// `POLLERR`, or when any ALSA call fails.
fn poll_descriptors_and_prepare_buffer(
    rx: &TriggerReceiver,
    stream: &StreamInner,
    ctxt: &mut StreamWorkerContext,
) -> Result<PollDescriptorsFlow, BackendSpecificError> {
    // Fast path: the owner already asked us to stop. `Stream::drop` writes the wake-up
    // token right after setting the flag, so consume it before exiting.
    if stream.dropping.load(Ordering::Acquire) {
        rx.clear_pipe();
        return Ok(PollDescriptorsFlow::Return);
    }

    let StreamWorkerContext {
        ref mut descriptors,
        ref poll_timeout,
        ..
    } = *ctxt;

    let res = alsa::poll::poll(descriptors, *poll_timeout)?;
    if res == 0 {
        // NOTE(review): a return of 0 from poll normally means the timeout expired, which
        // can happen whenever a finite `poll_timeout` is configured — confirm that the
        // "spuriously" wording is intended.
        let description = String::from("`alsa::poll()` spuriously returned");
        return Err(BackendSpecificError { description });
    }

    // Index 0 is the wake-up pipe: any event on it means the stream is being dropped.
    if descriptors[0].revents != 0 {
        rx.clear_pipe();
        return Ok(PollDescriptorsFlow::Return);
    }

    // Let ALSA translate the raw events of its own descriptors.
    let revents = stream.channel.revents(&descriptors[1..])?;
    if revents.contains(alsa::poll::Flags::ERR) {
        let description = String::from("`alsa::poll()` returned POLLERR");
        return Err(BackendSpecificError { description });
    }

    // Woken up, but the PCM is neither readable nor writable.
    if !revents.contains(alsa::poll::Flags::IN) && !revents.contains(alsa::poll::Flags::OUT) {
        return Ok(PollDescriptorsFlow::Continue);
    }

    let status = stream.channel.status()?;
    let avail_frames = match stream.channel.avail() {
        Err(err) if err.errno() == libc::EPIPE => return Ok(PollDescriptorsFlow::XRun),
        res => res,
    }? as usize;
    let delay_frames = match status.get_delay() {
        // A negative delay is clamped to zero.
        d if d < 0 => 0,
        d => d as usize,
    };
    let available_samples = avail_frames * stream.conf.channels as usize;

    // Only whole periods are processed; wait until one is available.
    if available_samples < stream.period_samples {
        return Ok(PollDescriptorsFlow::Continue);
    }

    Ok(PollDescriptorsFlow::Ready {
        status,
        delay_frames,
    })
}
975
/// Reads one period from the PCM into `buffer` and passes it to `data_callback`.
///
/// The callback timestamp comes from the hardware status, or from the fallback clock when
/// `stream.creation_instant` is set. The capture timestamp is the callback timestamp minus
/// the reported delay.
///
/// # Errors
///
/// Returns a [`BackendSpecificError`] if the read fails, a timestamp cannot be computed,
/// or the capture instant is not representable; the callback is not invoked in those cases.
fn process_input(
    stream: &StreamInner,
    buffer: &mut [u8],
    status: alsa::pcm::Status,
    delay_frames: usize,
    data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
) -> Result<(), BackendSpecificError> {
    // NOTE(review): the number of frames actually read is not checked here — confirm that
    // a short read cannot occur.
    stream.channel.io_bytes().readi(buffer)?;
    let data = buffer.as_mut_ptr() as *mut ();
    // NOTE(review): assumes `buffer` holds `period_samples` samples of `sample_format` and
    // meets whatever alignment `Data::from_parts` requires — confirm.
    let data = unsafe { Data::from_parts(data, stream.period_samples, stream.sample_format) };
    let callback = match stream.creation_instant {
        None => stream_timestamp_hardware(&status)?,
        Some(creation) => stream_timestamp_fallback(creation)?,
    };
    let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
    let capture = callback
        .sub(delay_duration)
        .ok_or_else(|| BackendSpecificError {
            description: "`capture` is earlier than representation supported by `StreamInstant`"
                .to_string(),
        })?;
    let timestamp = crate::InputStreamTimestamp { callback, capture };
    let info = crate::InputCallbackInfo { timestamp };
    data_callback(&data, &info);

    Ok(())
}
1004
/// Asks `data_callback` to fill one period and writes it to the PCM.
///
/// The buffer is reset to silence first, so anything the callback leaves untouched plays
/// as silence. The playback timestamp is the callback timestamp plus the reported delay.
///
/// # Errors
///
/// Returns a [`BackendSpecificError`] if a timestamp cannot be computed or the playback
/// instant is not representable (the callback is then not invoked). Write failures are
/// reported through `error_callback` and retried rather than returned.
fn process_output(
    stream: &StreamInner,
    buffer: &mut [u8],
    status: alsa::pcm::Status,
    delay_frames: usize,
    data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
    error_callback: &mut dyn FnMut(StreamError),
) -> Result<(), BackendSpecificError> {
    buffer.copy_from_slice(&stream.silence_template);
    {
        let data = buffer.as_mut_ptr() as *mut ();
        // NOTE(review): assumes `buffer` holds `period_samples` samples of `sample_format`
        // and meets whatever alignment `Data::from_parts` requires — confirm.
        let mut data =
            unsafe { Data::from_parts(data, stream.period_samples, stream.sample_format) };
        let callback = match stream.creation_instant {
            None => stream_timestamp_hardware(&status)?,
            Some(creation) => stream_timestamp_fallback(creation)?,
        };
        let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
        let playback = callback
            .add(delay_duration)
            .ok_or_else(|| BackendSpecificError {
                description: "`playback` occurs beyond representation supported by `StreamInstant`"
                    .to_string(),
            })?;
        let timestamp = crate::OutputStreamTimestamp { callback, playback };
        let info = crate::OutputCallbackInfo { timestamp };
        data_callback(&mut data, &info);
    }

    // Keep writing the same period until the PCM accepts it in full.
    // NOTE(review): every error arm retries, so a persistent error would make this loop
    // spin while invoking `error_callback` each time — confirm that this is acceptable.
    loop {
        match stream.channel.io_bytes().writei(buffer) {
            Err(err) if err.errno() == libc::EPIPE => {
                // Underrun: report it, try to recover the PCM, then retry the write.
                error_callback(StreamError::BufferUnderrun);
                if let Err(recover_err) = stream.channel.try_recover(err, true) {
                    error_callback(recover_err.into());
                }
            }
            Err(err) => {
                error_callback(err.into());
                continue;
            }
            Ok(result) if result != stream.period_frames => {
                let description = format!(
                    "unexpected number of frames written: expected {}, \
                     result {result} (this should never happen)",
                    stream.period_frames
                );
                error_callback(BackendSpecificError { description }.into());
                continue;
            }
            _ => {
                break;
            }
        }
    }
    Ok(())
}
1070
/// Computes the stream instant from the PCM status: the status timestamp minus the
/// trigger timestamp, in nanoseconds.
///
/// # Errors
///
/// Returns a [`BackendSpecificError`] when the status timestamp is earlier than the
/// trigger timestamp.
#[inline]
fn stream_timestamp_hardware(
    status: &alsa::pcm::Status,
) -> Result<crate::StreamInstant, BackendSpecificError> {
    let trigger_ts = status.get_trigger_htstamp();
    let ts = status.get_htstamp();
    let nanos = timespec_diff_nanos(ts, trigger_ts);
    if nanos < 0 {
        let description = format!(
            "get_htstamp `{}.{}` was earlier than get_trigger_htstamp `{}.{}`",
            ts.tv_sec, ts.tv_nsec, trigger_ts.tv_sec, trigger_ts.tv_nsec
        );
        return Err(BackendSpecificError { description });
    }
    Ok(crate::StreamInstant::from_nanos(nanos))
}
1090
1091#[inline]
1095fn stream_timestamp_fallback(
1096 creation: std::time::Instant,
1097) -> Result<crate::StreamInstant, BackendSpecificError> {
1098 let now = std::time::Instant::now();
1099 let duration = now.duration_since(creation);
1100 crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError {
1101 description: "stream duration has exceeded `StreamInstant` representation".to_string(),
1102 })
1103}
1104
1105#[inline]
1108fn timespec_to_nanos(ts: libc::timespec) -> i64 {
1109 let nanos = ts.tv_sec * 1_000_000_000 + ts.tv_nsec;
1110 #[cfg(target_pointer_width = "64")]
1111 return nanos;
1112 #[cfg(not(target_pointer_width = "64"))]
1113 return nanos.into();
1114}
1115
/// Returns `a - b` in nanoseconds; negative when `a` is earlier than `b`.
#[inline]
fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
    timespec_to_nanos(a) - timespec_to_nanos(b)
}
1122
1123#[inline]
1125fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
1126 let secsf = frames as f64 / rate as f64;
1127 let secs = secsf as u64;
1128 let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
1129 std::time::Duration::new(secs, nanos)
1130}
1131
1132impl Stream {
1133 fn new_input<D, E>(
1134 inner: Arc<StreamInner>,
1135 mut data_callback: D,
1136 mut error_callback: E,
1137 timeout: Option<Duration>,
1138 ) -> Stream
1139 where
1140 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
1141 E: FnMut(StreamError) + Send + 'static,
1142 {
1143 let (tx, rx) = trigger();
1144 let stream = inner.clone();
1146 let thread = thread::Builder::new()
1147 .name("cpal_alsa_in".to_owned())
1148 .spawn(move || {
1149 input_stream_worker(
1150 rx,
1151 &stream,
1152 &mut data_callback,
1153 &mut error_callback,
1154 timeout,
1155 );
1156 })
1157 .unwrap();
1158 Self {
1159 thread: Some(thread),
1160 inner,
1161 trigger: tx,
1162 }
1163 }
1164
1165 fn new_output<D, E>(
1166 inner: Arc<StreamInner>,
1167 mut data_callback: D,
1168 mut error_callback: E,
1169 timeout: Option<Duration>,
1170 ) -> Stream
1171 where
1172 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
1173 E: FnMut(StreamError) + Send + 'static,
1174 {
1175 let (tx, rx) = trigger();
1176 let stream = inner.clone();
1178 let thread = thread::Builder::new()
1179 .name("cpal_alsa_out".to_owned())
1180 .spawn(move || {
1181 output_stream_worker(
1182 rx,
1183 &stream,
1184 &mut data_callback,
1185 &mut error_callback,
1186 timeout,
1187 );
1188 })
1189 .unwrap();
1190 Self {
1191 thread: Some(thread),
1192 inner,
1193 trigger: tx,
1194 }
1195 }
1196}
1197
impl Drop for Stream {
    /// Stops the worker: sets the `dropping` flag, wakes the thread through the pipe and
    /// waits for it to finish. A panic inside the worker (join error) is ignored.
    fn drop(&mut self) {
        self.inner.dropping.store(true, Ordering::Release);
        self.trigger.wakeup();
        if let Some(handle) = self.thread.take() {
            let _ = handle.join();
        }
    }
}
1207
impl StreamTrait for Stream {
    /// Un-pauses the PCM. Any error from the PCM is discarded, so this always returns
    /// `Ok(())`.
    fn play(&self) -> Result<(), PlayStreamError> {
        self.inner.channel.pause(false).ok();
        Ok(())
    }
    /// Pauses the PCM. Any error from the PCM is discarded, so this always returns
    /// `Ok(())`.
    fn pause(&self) -> Result<(), PauseStreamError> {
        self.inner.channel.pause(true).ok();
        Ok(())
    }
}
1218
1219fn clamp_frame_count(buffer_size: alsa::pcm::Frames) -> FrameCount {
1222 buffer_size.max(1).try_into().unwrap_or(FrameCount::MAX)
1223}
1224
1225fn hw_params_buffer_size_min_max(hw_params: &alsa::pcm::HwParams) -> (FrameCount, FrameCount) {
1226 let min_buf = hw_params
1227 .get_buffer_size_min()
1228 .map(clamp_frame_count)
1229 .unwrap_or(1);
1230 let max_buf = hw_params
1231 .get_buffer_size_max()
1232 .map(clamp_frame_count)
1233 .unwrap_or(FrameCount::MAX);
1234 (min_buf, max_buf)
1235}
1236
1237fn fill_with_equilibrium(buffer: &mut [u8], sample_format: SampleFormat) {
1240 macro_rules! fill_typed {
1241 ($sample_type:ty) => {{
1242 let sample_size = std::mem::size_of::<$sample_type>();
1243
1244 assert_eq!(
1245 buffer.len() % sample_size,
1246 0,
1247 "Buffer size must be aligned to sample size for format {:?}",
1248 sample_format
1249 );
1250
1251 let num_samples = buffer.len() / sample_size;
1252 let equilibrium = <$sample_type as Sample>::EQUILIBRIUM;
1253
1254 let samples = unsafe {
1256 std::slice::from_raw_parts_mut(
1257 buffer.as_mut_ptr() as *mut $sample_type,
1258 num_samples,
1259 )
1260 };
1261
1262 for sample in samples {
1263 *sample = equilibrium;
1264 }
1265 }};
1266 }
1267 const DSD_SILENCE_BYTE: u8 = 0x69;
1268
1269 match sample_format {
1270 SampleFormat::I8 => fill_typed!(i8),
1271 SampleFormat::I16 => fill_typed!(i16),
1272 SampleFormat::I24 => fill_typed!(I24),
1273 SampleFormat::I32 => fill_typed!(i32),
1274 SampleFormat::I64 => fill_typed!(i64),
1276 SampleFormat::U8 => fill_typed!(u8),
1277 SampleFormat::U16 => fill_typed!(u16),
1278 SampleFormat::U24 => fill_typed!(U24),
1279 SampleFormat::U32 => fill_typed!(u32),
1280 SampleFormat::U64 => fill_typed!(u64),
1282 SampleFormat::F32 => fill_typed!(f32),
1283 SampleFormat::F64 => fill_typed!(f64),
1284 SampleFormat::DsdU8 | SampleFormat::DsdU16 | SampleFormat::DsdU32 => {
1285 buffer.fill(DSD_SILENCE_BYTE)
1286 }
1287 }
1288}
1289
/// Creates hardware parameters for `pcm_handle` with interleaved read/write access and
/// the format, rate and channel count taken from `config`/`sample_format`.
///
/// The returned parameters are only configured here; this function does not install them
/// on the PCM.
///
/// # Errors
///
/// Returns a [`BackendSpecificError`] if the sample format cannot be mapped to an ALSA
/// format or if any parameter is rejected.
fn init_hw_params<'a>(
    pcm_handle: &'a alsa::pcm::PCM,
    config: &StreamConfig,
    sample_format: SampleFormat,
) -> Result<alsa::pcm::HwParams<'a>, BackendSpecificError> {
    let hw_params = alsa::pcm::HwParams::any(pcm_handle)?;
    hw_params.set_access(alsa::pcm::Access::RWInterleaved)?;

    let alsa_format = sample_format_to_alsa_format(&hw_params, sample_format)?;
    hw_params.set_format(alsa_format)?;

    // `Nearest` lets ALSA pick the closest supported rate to the requested one.
    hw_params.set_rate(config.sample_rate, alsa::ValueOr::Nearest)?;
    hw_params.set_channels(config.channels as u32)?;
    Ok(hw_params)
}
1308
1309fn sample_format_to_alsa_format(
1312 hw_params: &alsa::pcm::HwParams,
1313 sample_format: SampleFormat,
1314) -> Result<alsa::pcm::Format, BackendSpecificError> {
1315 use alsa::pcm::Format;
1316
1317 let (native, opposite) = match sample_format {
1319 SampleFormat::I8 => return Ok(Format::S8), SampleFormat::U8 => return Ok(Format::U8), #[cfg(target_endian = "little")]
1322 SampleFormat::I16 => (Format::S16LE, Format::S16BE),
1323 #[cfg(target_endian = "big")]
1324 SampleFormat::I16 => (Format::S16BE, Format::S16LE),
1325 #[cfg(target_endian = "little")]
1326 SampleFormat::U16 => (Format::U16LE, Format::U16BE),
1327 #[cfg(target_endian = "big")]
1328 SampleFormat::U16 => (Format::U16BE, Format::U16LE),
1329 #[cfg(target_endian = "little")]
1330 SampleFormat::I24 => (Format::S24LE, Format::S24BE),
1331 #[cfg(target_endian = "big")]
1332 SampleFormat::I24 => (Format::S24BE, Format::S24LE),
1333 #[cfg(target_endian = "little")]
1334 SampleFormat::U24 => (Format::U24LE, Format::U24BE),
1335 #[cfg(target_endian = "big")]
1336 SampleFormat::U24 => (Format::U24BE, Format::U24LE),
1337 #[cfg(target_endian = "little")]
1338 SampleFormat::I32 => (Format::S32LE, Format::S32BE),
1339 #[cfg(target_endian = "big")]
1340 SampleFormat::I32 => (Format::S32BE, Format::S32LE),
1341 #[cfg(target_endian = "little")]
1342 SampleFormat::U32 => (Format::U32LE, Format::U32BE),
1343 #[cfg(target_endian = "big")]
1344 SampleFormat::U32 => (Format::U32BE, Format::U32LE),
1345 #[cfg(target_endian = "little")]
1346 SampleFormat::F32 => (Format::FloatLE, Format::FloatBE),
1347 #[cfg(target_endian = "big")]
1348 SampleFormat::F32 => (Format::FloatBE, Format::FloatLE),
1349 #[cfg(target_endian = "little")]
1350 SampleFormat::F64 => (Format::Float64LE, Format::Float64BE),
1351 #[cfg(target_endian = "big")]
1352 SampleFormat::F64 => (Format::Float64BE, Format::Float64LE),
1353 SampleFormat::DsdU8 => return Ok(Format::DSDU8),
1354 #[cfg(target_endian = "little")]
1355 SampleFormat::DsdU16 => (Format::DSDU16LE, Format::DSDU16BE),
1356 #[cfg(target_endian = "big")]
1357 SampleFormat::DsdU16 => (Format::DSDU16BE, Format::DSDU16LE),
1358 #[cfg(target_endian = "little")]
1359 SampleFormat::DsdU32 => (Format::DSDU32LE, Format::DSDU32BE),
1360 #[cfg(target_endian = "big")]
1361 SampleFormat::DsdU32 => (Format::DSDU32BE, Format::DSDU32LE),
1362 _ => {
1363 return Err(BackendSpecificError {
1364 description: format!("Sample format '{sample_format}' is not supported"),
1365 })
1366 }
1367 };
1368
1369 if hw_params.test_format(native).is_ok() {
1371 return Ok(native);
1372 }
1373
1374 if hw_params.test_format(opposite).is_ok() {
1376 return Ok(opposite);
1377 }
1378
1379 Err(BackendSpecificError {
1380 description: format!(
1381 "Sample format '{sample_format}' is not supported by hardware in any endianness"
1382 ),
1383 })
1384}
1385
1386fn set_hw_params_from_format(
1387 pcm_handle: &alsa::pcm::PCM,
1388 config: &StreamConfig,
1389 sample_format: SampleFormat,
1390) -> Result<bool, BackendSpecificError> {
1391 let hw_params = init_hw_params(pcm_handle, config, sample_format)?;
1392
1393 if let BufferSize::Fixed(buffer_frames) = config.buffer_size {
1397 hw_params.set_buffer_size_near((2 * buffer_frames) as alsa::pcm::Frames)?;
1398 hw_params
1399 .set_period_size_near(buffer_frames as alsa::pcm::Frames, alsa::ValueOr::Nearest)?;
1400 }
1401
1402 pcm_handle.hw_params(&hw_params)?;
1404
1405 if config.buffer_size == BufferSize::Default {
1409 if let Ok(period) = hw_params.get_period_size() {
1410 let hw_params = init_hw_params(pcm_handle, config, sample_format)?;
1412
1413 hw_params.set_period_size_near(period, alsa::ValueOr::Nearest)?;
1415 hw_params.set_buffer_size_near(2 * period)?;
1416
1417 pcm_handle.hw_params(&hw_params)?;
1419 }
1420 }
1421
1422 Ok(hw_params.can_pause())
1423}
1424
1425fn set_sw_params_from_format(
1426 pcm_handle: &alsa::pcm::PCM,
1427 config: &StreamConfig,
1428 stream_type: alsa::Direction,
1429) -> Result<usize, BackendSpecificError> {
1430 let sw_params = pcm_handle.sw_params_current()?;
1431
1432 let period_samples = {
1433 let (buffer, period) = pcm_handle.get_params()?;
1434 if buffer == 0 {
1435 return Err(BackendSpecificError {
1436 description: "initialization resulted in a null buffer".to_string(),
1437 });
1438 }
1439 let start_threshold = match stream_type {
1440 alsa::Direction::Playback => {
1441 2 * period
1444 }
1445 alsa::Direction::Capture => 1,
1446 };
1447 sw_params.set_start_threshold(start_threshold as alsa::pcm::Frames)?;
1448 sw_params.set_avail_min(period as alsa::pcm::Frames)?;
1449
1450 period as usize * config.channels as usize
1451 };
1452
1453 sw_params.set_tstamp_mode(true)?;
1454 sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;
1455
1456 if pcm_handle.sw_params(&sw_params).is_err() {
1460 sw_params.set_tstamp_type(alsa::pcm::TstampType::Monotonic)?;
1461 pcm_handle.sw_params(&sw_params)?;
1462 }
1463
1464 Ok(period_samples)
1465}
1466
1467impl From<alsa::Error> for BackendSpecificError {
1468 fn from(err: alsa::Error) -> Self {
1469 Self {
1470 description: err.to_string(),
1471 }
1472 }
1473}
1474
1475impl From<alsa::Error> for BuildStreamError {
1476 fn from(err: alsa::Error) -> Self {
1477 let err: BackendSpecificError = err.into();
1478 err.into()
1479 }
1480}
1481
1482impl From<alsa::Error> for SupportedStreamConfigsError {
1483 fn from(err: alsa::Error) -> Self {
1484 let err: BackendSpecificError = err.into();
1485 err.into()
1486 }
1487}
1488
1489impl From<alsa::Error> for PlayStreamError {
1490 fn from(err: alsa::Error) -> Self {
1491 let err: BackendSpecificError = err.into();
1492 err.into()
1493 }
1494}
1495
1496impl From<alsa::Error> for PauseStreamError {
1497 fn from(err: alsa::Error) -> Self {
1498 let err: BackendSpecificError = err.into();
1499 err.into()
1500 }
1501}
1502
1503impl From<alsa::Error> for StreamError {
1504 fn from(err: alsa::Error) -> Self {
1505 let err: BackendSpecificError = err.into();
1506 err.into()
1507 }
1508}