1extern crate alsa;
2extern crate libc;
3
4use self::alsa::poll::Descriptors;
5use crate::traits::{DeviceTrait, HostTrait, StreamTrait};
6use crate::{
7 BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
8 DefaultStreamConfigError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo,
9 PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError,
10 SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
11 SupportedStreamConfigsError,
12};
13use std::cell::Cell;
14use std::cmp;
15use std::convert::TryInto;
16use std::sync::{Arc, Mutex};
17use std::thread::{self, JoinHandle};
18use std::time::Duration;
19use std::vec::IntoIter as VecIntoIter;
20
21pub use self::enumerate::{default_input_device, default_output_device, Devices};
22
23pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
24pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
25
26mod enumerate;
27
28#[derive(Debug)]
30pub struct Host;
31
32impl Host {
33 pub fn new() -> Result<Self, crate::HostUnavailable> {
34 Ok(Host)
35 }
36}
37
38impl HostTrait for Host {
39 type Devices = Devices;
40 type Device = Device;
41
42 fn is_available() -> bool {
43 true
45 }
46
47 fn devices(&self) -> Result<Self::Devices, DevicesError> {
48 Devices::new()
49 }
50
51 fn default_input_device(&self) -> Option<Self::Device> {
52 default_input_device()
53 }
54
55 fn default_output_device(&self) -> Option<Self::Device> {
56 default_output_device()
57 }
58}
59
60impl DeviceTrait for Device {
61 type SupportedInputConfigs = SupportedInputConfigs;
62 type SupportedOutputConfigs = SupportedOutputConfigs;
63 type Stream = Stream;
64
65 fn name(&self) -> Result<String, DeviceNameError> {
66 Device::name(self)
67 }
68
69 fn supported_input_configs(
70 &self,
71 ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
72 Device::supported_input_configs(self)
73 }
74
75 fn supported_output_configs(
76 &self,
77 ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
78 Device::supported_output_configs(self)
79 }
80
81 fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
82 Device::default_input_config(self)
83 }
84
85 fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
86 Device::default_output_config(self)
87 }
88
89 fn build_input_stream_raw<D, E>(
90 &self,
91 conf: &StreamConfig,
92 sample_format: SampleFormat,
93 data_callback: D,
94 error_callback: E,
95 timeout: Option<Duration>,
96 ) -> Result<Self::Stream, BuildStreamError>
97 where
98 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
99 E: FnMut(StreamError) + Send + 'static,
100 {
101 let stream_inner =
102 self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
103 let stream = Stream::new_input(
104 Arc::new(stream_inner),
105 data_callback,
106 error_callback,
107 timeout,
108 );
109 Ok(stream)
110 }
111
112 fn build_output_stream_raw<D, E>(
113 &self,
114 conf: &StreamConfig,
115 sample_format: SampleFormat,
116 data_callback: D,
117 error_callback: E,
118 timeout: Option<Duration>,
119 ) -> Result<Self::Stream, BuildStreamError>
120 where
121 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
122 E: FnMut(StreamError) + Send + 'static,
123 {
124 let stream_inner =
125 self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
126 let stream = Stream::new_output(
127 Arc::new(stream_inner),
128 data_callback,
129 error_callback,
130 timeout,
131 );
132 Ok(stream)
133 }
134}
135
136struct TriggerSender(libc::c_int);
137
138struct TriggerReceiver(libc::c_int);
139
140impl TriggerSender {
141 fn wakeup(&self) {
142 let buf = 1u64;
143 let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
144 assert_eq!(ret, 8);
145 }
146}
147
148impl TriggerReceiver {
149 fn clear_pipe(&self) {
150 let mut out = 0u64;
151 let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
152 assert_eq!(ret, 8);
153 }
154}
155
156fn trigger() -> (TriggerSender, TriggerReceiver) {
157 let mut fds = [0, 0];
158 match unsafe { libc::pipe(fds.as_mut_ptr()) } {
159 0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
160 _ => panic!("Could not create pipe"),
161 }
162}
163
164impl Drop for TriggerSender {
165 fn drop(&mut self) {
166 unsafe {
167 libc::close(self.0);
168 }
169 }
170}
171
172impl Drop for TriggerReceiver {
173 fn drop(&mut self) {
174 unsafe {
175 libc::close(self.0);
176 }
177 }
178}
179
180#[derive(Default)]
181struct DeviceHandles {
182 playback: Option<alsa::PCM>,
183 capture: Option<alsa::PCM>,
184}
185
186impl DeviceHandles {
187 fn open(pcm_id: &str) -> Result<Self, alsa::Error> {
190 let mut handles = Self::default();
191 let playback_err = handles.try_open(pcm_id, alsa::Direction::Playback).err();
192 let capture_err = handles.try_open(pcm_id, alsa::Direction::Capture).err();
193 if let Some(err) = capture_err.and(playback_err) {
194 Err(err)
195 } else {
196 Ok(handles)
197 }
198 }
199
200 fn try_open(
205 &mut self,
206 pcm_id: &str,
207 stream_type: alsa::Direction,
208 ) -> Result<&mut Option<alsa::PCM>, alsa::Error> {
209 let handle = match stream_type {
210 alsa::Direction::Playback => &mut self.playback,
211 alsa::Direction::Capture => &mut self.capture,
212 };
213
214 if handle.is_none() {
215 *handle = Some(alsa::pcm::PCM::new(pcm_id, stream_type, true)?);
216 }
217
218 Ok(handle)
219 }
220
221 fn get_mut(
224 &mut self,
225 pcm_id: &str,
226 stream_type: alsa::Direction,
227 ) -> Result<&mut alsa::PCM, alsa::Error> {
228 Ok(self.try_open(pcm_id, stream_type)?.as_mut().unwrap())
229 }
230
231 fn take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error> {
234 Ok(self.try_open(name, stream_type)?.take().unwrap())
235 }
236}
237
238#[derive(Clone)]
239pub struct Device {
240 name: String,
241 pcm_id: String,
242 handles: Arc<Mutex<DeviceHandles>>,
243}
244
245impl Device {
246 fn build_stream_inner(
247 &self,
248 conf: &StreamConfig,
249 sample_format: SampleFormat,
250 stream_type: alsa::Direction,
251 ) -> Result<StreamInner, BuildStreamError> {
252 let handle_result = self
253 .handles
254 .lock()
255 .unwrap()
256 .take(&self.pcm_id, stream_type)
257 .map_err(|e| (e, e.errno()));
258
259 let handle = match handle_result {
260 Err((_, libc::EBUSY)) => return Err(BuildStreamError::DeviceNotAvailable),
261 Err((_, libc::EINVAL)) => return Err(BuildStreamError::InvalidArgument),
262 Err((e, _)) => return Err(e.into()),
263 Ok(handle) => handle,
264 };
265 let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?;
266 let period_len = set_sw_params_from_format(&handle, conf, stream_type)?;
267
268 handle.prepare()?;
269
270 let num_descriptors = handle.count();
271 if num_descriptors == 0 {
272 let description = "poll descriptor count for stream was 0".to_string();
273 let err = BackendSpecificError { description };
274 return Err(err.into());
275 }
276
277 let ts = handle.status()?.get_htstamp();
280 let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
281 (0, 0) => Some(std::time::Instant::now()),
282 _ => None,
283 };
284
285 if let alsa::Direction::Capture = stream_type {
286 handle.start()?;
287 }
288
289 let stream_inner = StreamInner {
290 dropping: Cell::new(false),
291 channel: handle,
292 sample_format,
293 num_descriptors,
294 conf: conf.clone(),
295 period_len,
296 can_pause,
297 creation_instant,
298 };
299
300 Ok(stream_inner)
301 }
302
303 #[inline]
304 fn name(&self) -> Result<String, DeviceNameError> {
305 Ok(self.name.clone())
306 }
307
308 fn supported_configs(
309 &self,
310 stream_t: alsa::Direction,
311 ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> {
312 let mut guard = self.handles.lock().unwrap();
313 let handle_result = guard
314 .get_mut(&self.pcm_id, stream_t)
315 .map_err(|e| (e, e.errno()));
316
317 let handle = match handle_result {
318 Err((_, libc::ENOENT)) | Err((_, libc::EBUSY)) => {
319 return Err(SupportedStreamConfigsError::DeviceNotAvailable)
320 }
321 Err((_, libc::EINVAL)) => return Err(SupportedStreamConfigsError::InvalidArgument),
322 Err((e, _)) => return Err(e.into()),
323 Ok(handle) => handle,
324 };
325
326 let hw_params = alsa::pcm::HwParams::any(handle)?;
327
328 const FORMATS: [(SampleFormat, alsa::pcm::Format); 8] = [
330 (SampleFormat::I8, alsa::pcm::Format::S8),
331 (SampleFormat::U8, alsa::pcm::Format::U8),
332 (SampleFormat::I16, alsa::pcm::Format::S16LE),
333 (SampleFormat::U16, alsa::pcm::Format::U16LE),
335 (SampleFormat::I32, alsa::pcm::Format::S32LE),
341 (SampleFormat::U32, alsa::pcm::Format::U32LE),
343 (SampleFormat::F32, alsa::pcm::Format::FloatLE),
345 (SampleFormat::F64, alsa::pcm::Format::Float64LE),
347 ];
369
370 let mut supported_formats = Vec::new();
371 for &(sample_format, alsa_format) in FORMATS.iter() {
372 if hw_params.test_format(alsa_format).is_ok() {
373 supported_formats.push(sample_format);
374 }
375 }
376
377 let min_rate = hw_params.get_rate_min()?;
378 let max_rate = hw_params.get_rate_max()?;
379
380 let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
381 vec![(min_rate, max_rate)]
382 } else {
383 const RATES: [libc::c_uint; 13] = [
384 5512, 8000, 11025, 16000, 22050, 32000, 44100, 48000, 64000, 88200, 96000, 176400,
385 192000,
386 ];
387
388 let mut rates = Vec::new();
389 for &rate in RATES.iter() {
390 if hw_params.test_rate(rate).is_ok() {
391 rates.push((rate, rate));
392 }
393 }
394
395 if rates.is_empty() {
396 vec![(min_rate, max_rate)]
397 } else {
398 rates
399 }
400 };
401
402 let min_channels = hw_params.get_channels_min()?;
403 let max_channels = hw_params.get_channels_max()?;
404
405 let max_channels = cmp::min(max_channels, 32); let supported_channels = (min_channels..max_channels + 1)
407 .filter_map(|num| {
408 if hw_params.test_channels(num).is_ok() {
409 Some(num as ChannelCount)
410 } else {
411 None
412 }
413 })
414 .collect::<Vec<_>>();
415
416 let min_buffer_size = hw_params.get_buffer_size_min()?;
417 let max_buffer_size = hw_params.get_buffer_size_max()?;
418
419 let buffer_size_range = SupportedBufferSize::Range {
420 min: min_buffer_size as u32,
421 max: max_buffer_size as u32,
422 };
423
424 let mut output = Vec::with_capacity(
425 supported_formats.len() * supported_channels.len() * sample_rates.len(),
426 );
427 for &sample_format in supported_formats.iter() {
428 for &channels in supported_channels.iter() {
429 for &(min_rate, max_rate) in sample_rates.iter() {
430 output.push(SupportedStreamConfigRange {
431 channels,
432 min_sample_rate: SampleRate(min_rate),
433 max_sample_rate: SampleRate(max_rate),
434 buffer_size: buffer_size_range,
435 sample_format,
436 });
437 }
438 }
439 }
440
441 Ok(output.into_iter())
442 }
443
444 fn supported_input_configs(
445 &self,
446 ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
447 self.supported_configs(alsa::Direction::Capture)
448 }
449
450 fn supported_output_configs(
451 &self,
452 ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
453 self.supported_configs(alsa::Direction::Playback)
454 }
455
456 fn default_config(
459 &self,
460 stream_t: alsa::Direction,
461 ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
462 let mut formats: Vec<_> = {
463 match self.supported_configs(stream_t) {
464 Err(SupportedStreamConfigsError::DeviceNotAvailable) => {
465 return Err(DefaultStreamConfigError::DeviceNotAvailable);
466 }
467 Err(SupportedStreamConfigsError::InvalidArgument) => {
468 return Err(DefaultStreamConfigError::StreamTypeNotSupported);
471 }
472 Err(SupportedStreamConfigsError::BackendSpecific { err }) => {
473 return Err(err.into());
474 }
475 Ok(fmts) => fmts.collect(),
476 }
477 };
478
479 formats.sort_by(|a, b| a.cmp_default_heuristics(b));
480
481 match formats.into_iter().next_back() {
482 Some(f) => {
483 let min_r = f.min_sample_rate;
484 let max_r = f.max_sample_rate;
485 let mut format = f.with_max_sample_rate();
486 const HZ_44100: SampleRate = SampleRate(44_100);
487 if min_r <= HZ_44100 && HZ_44100 <= max_r {
488 format.sample_rate = HZ_44100;
489 }
490 Ok(format)
491 }
492 None => Err(DefaultStreamConfigError::StreamTypeNotSupported),
493 }
494 }
495
496 fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
497 self.default_config(alsa::Direction::Capture)
498 }
499
500 fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
501 self.default_config(alsa::Direction::Playback)
502 }
503}
504
505struct StreamInner {
506 dropping: Cell<bool>,
509
510 channel: alsa::pcm::PCM,
512
513 num_descriptors: usize,
516
517 sample_format: SampleFormat,
519
520 conf: StreamConfig,
522
523 period_len: usize,
525
526 #[allow(dead_code)]
527 can_pause: bool,
530
531 creation_instant: Option<std::time::Instant>,
540}
541
542unsafe impl Sync for StreamInner {}
544
545#[derive(Debug, Eq, PartialEq)]
546enum StreamType {
547 Input,
548 Output,
549}
550
551pub struct Stream {
552 thread: Option<JoinHandle<()>>,
555
556 inner: Arc<StreamInner>,
558
559 trigger: TriggerSender,
561}
562
563struct StreamWorkerContext {
564 descriptors: Vec<libc::pollfd>,
565 buffer: Vec<u8>,
566 poll_timeout: i32,
567}
568
569impl StreamWorkerContext {
570 fn new(poll_timeout: &Option<Duration>) -> Self {
571 let poll_timeout: i32 = if let Some(d) = poll_timeout {
572 d.as_millis().try_into().unwrap()
573 } else {
574 -1
575 };
576
577 Self {
578 descriptors: Vec::new(),
579 buffer: Vec::new(),
580 poll_timeout,
581 }
582 }
583}
584
585fn input_stream_worker(
586 rx: TriggerReceiver,
587 stream: &StreamInner,
588 data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
589 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
590 timeout: Option<Duration>,
591) {
592 boost_current_thread_priority(stream.conf.buffer_size, stream.conf.sample_rate);
593
594 let mut ctxt = StreamWorkerContext::new(&timeout);
595 loop {
596 let flow =
597 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
598 error_callback(err.into());
599 PollDescriptorsFlow::Continue
600 });
601
602 match flow {
603 PollDescriptorsFlow::Continue => {
604 continue;
605 }
606 PollDescriptorsFlow::XRun => {
607 if let Err(err) = stream.channel.prepare() {
608 error_callback(err.into());
609 }
610 continue;
611 }
612 PollDescriptorsFlow::Return => return,
613 PollDescriptorsFlow::Ready {
614 status,
615 avail_frames: _,
616 delay_frames,
617 stream_type,
618 } => {
619 assert_eq!(
620 stream_type,
621 StreamType::Input,
622 "expected input stream, but polling descriptors indicated output",
623 );
624 if let Err(err) = process_input(
625 stream,
626 &mut ctxt.buffer,
627 status,
628 delay_frames,
629 data_callback,
630 ) {
631 error_callback(err.into());
632 }
633 }
634 }
635 }
636}
637
638fn output_stream_worker(
639 rx: TriggerReceiver,
640 stream: &StreamInner,
641 data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
642 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
643 timeout: Option<Duration>,
644) {
645 boost_current_thread_priority(stream.conf.buffer_size, stream.conf.sample_rate);
646
647 let mut ctxt = StreamWorkerContext::new(&timeout);
648 loop {
649 let flow =
650 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
651 error_callback(err.into());
652 PollDescriptorsFlow::Continue
653 });
654
655 match flow {
656 PollDescriptorsFlow::Continue => continue,
657 PollDescriptorsFlow::XRun => {
658 if let Err(err) = stream.channel.prepare() {
659 error_callback(err.into());
660 }
661 continue;
662 }
663 PollDescriptorsFlow::Return => return,
664 PollDescriptorsFlow::Ready {
665 status,
666 avail_frames,
667 delay_frames,
668 stream_type,
669 } => {
670 assert_eq!(
671 stream_type,
672 StreamType::Output,
673 "expected output stream, but polling descriptors indicated input",
674 );
675 if let Err(err) = process_output(
676 stream,
677 &mut ctxt.buffer,
678 status,
679 avail_frames,
680 delay_frames,
681 data_callback,
682 error_callback,
683 ) {
684 error_callback(err.into());
685 }
686 }
687 }
688 }
689}
690
691#[cfg(feature = "audio_thread_priority")]
692fn boost_current_thread_priority(buffer_size: BufferSize, sample_rate: SampleRate) {
693 use audio_thread_priority::promote_current_thread_to_real_time;
694
695 let buffer_size = if let BufferSize::Fixed(buffer_size) = buffer_size {
696 buffer_size
697 } else {
698 0
700 };
701
702 if let Err(err) = promote_current_thread_to_real_time(buffer_size, sample_rate.0) {
703 eprintln!("Failed to promote audio thread to real-time priority: {err}");
704 }
705}
706
707#[cfg(not(feature = "audio_thread_priority"))]
708fn boost_current_thread_priority(_: BufferSize, _: SampleRate) {}
709
710enum PollDescriptorsFlow {
711 Continue,
712 Return,
713 Ready {
714 stream_type: StreamType,
715 status: alsa::pcm::Status,
716 avail_frames: usize,
717 delay_frames: usize,
718 },
719 XRun,
720}
721
722fn poll_descriptors_and_prepare_buffer(
724 rx: &TriggerReceiver,
725 stream: &StreamInner,
726 ctxt: &mut StreamWorkerContext,
727) -> Result<PollDescriptorsFlow, BackendSpecificError> {
728 if stream.dropping.get() {
729 rx.clear_pipe();
731 return Ok(PollDescriptorsFlow::Return);
732 }
733
734 let StreamWorkerContext {
735 ref mut descriptors,
736 ref mut buffer,
737 ref poll_timeout,
738 } = *ctxt;
739
740 descriptors.clear();
741
742 descriptors.push(libc::pollfd {
744 fd: rx.0,
745 events: libc::POLLIN,
746 revents: 0,
747 });
748
749 let len = descriptors.len();
751 descriptors.resize(
752 stream.num_descriptors + len,
753 libc::pollfd {
754 fd: 0,
755 events: 0,
756 revents: 0,
757 },
758 );
759 let filled = stream.channel.fill(&mut descriptors[len..])?;
760 debug_assert_eq!(filled, stream.num_descriptors);
761
762 let res = alsa::poll::poll(descriptors, *poll_timeout)?;
764 if res == 0 {
765 let description = String::from("`alsa::poll()` spuriously returned");
766 return Err(BackendSpecificError { description });
767 }
768
769 if descriptors[0].revents != 0 {
770 rx.clear_pipe();
772 return Ok(PollDescriptorsFlow::Return);
773 }
774
775 let revents = stream.channel.revents(&descriptors[1..])?;
776 if revents.contains(alsa::poll::Flags::ERR) {
777 let description = String::from("`alsa::poll()` returned POLLERR");
778 return Err(BackendSpecificError { description });
779 }
780 let stream_type = match revents {
781 alsa::poll::Flags::OUT => StreamType::Output,
782 alsa::poll::Flags::IN => StreamType::Input,
783 _ => {
784 return Ok(PollDescriptorsFlow::Continue);
786 }
787 };
788
789 let status = stream.channel.status()?;
790 let avail_frames = match stream.channel.avail() {
791 Err(err) if err.errno() == libc::EPIPE => return Ok(PollDescriptorsFlow::XRun),
792 res => res,
793 }? as usize;
794 let delay_frames = match status.get_delay() {
795 d if d < 0 => 0,
797 d => d as usize,
798 };
799 let available_samples = avail_frames * stream.conf.channels as usize;
800
801 if available_samples < stream.period_len {
803 return Ok(PollDescriptorsFlow::Continue);
804 }
805
806 let buffer_size = stream.sample_format.sample_size() * available_samples;
808 buffer.resize(buffer_size, 0u8);
809
810 Ok(PollDescriptorsFlow::Ready {
811 stream_type,
812 status,
813 avail_frames,
814 delay_frames,
815 })
816}
817
818fn process_input(
820 stream: &StreamInner,
821 buffer: &mut [u8],
822 status: alsa::pcm::Status,
823 delay_frames: usize,
824 data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
825) -> Result<(), BackendSpecificError> {
826 stream.channel.io_bytes().readi(buffer)?;
827 let sample_format = stream.sample_format;
828 let data = buffer.as_mut_ptr() as *mut ();
829 let len = buffer.len() / sample_format.sample_size();
830 let data = unsafe { Data::from_parts(data, len, sample_format) };
831 let callback = stream_timestamp(&status, stream.creation_instant)?;
832 let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
833 let capture = callback
834 .sub(delay_duration)
835 .expect("`capture` is earlier than representation supported by `StreamInstant`");
836 let timestamp = crate::InputStreamTimestamp { callback, capture };
837 let info = crate::InputCallbackInfo { timestamp };
838 data_callback(&data, &info);
839
840 Ok(())
841}
842
843fn process_output(
847 stream: &StreamInner,
848 buffer: &mut [u8],
849 status: alsa::pcm::Status,
850 available_frames: usize,
851 delay_frames: usize,
852 data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
853 error_callback: &mut dyn FnMut(StreamError),
854) -> Result<(), BackendSpecificError> {
855 {
856 let sample_format = stream.sample_format;
858 let data = buffer.as_mut_ptr() as *mut ();
859 let len = buffer.len() / sample_format.sample_size();
860 let mut data = unsafe { Data::from_parts(data, len, sample_format) };
861 let callback = stream_timestamp(&status, stream.creation_instant)?;
862 let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
863 let playback = callback
864 .add(delay_duration)
865 .expect("`playback` occurs beyond representation supported by `StreamInstant`");
866 let timestamp = crate::OutputStreamTimestamp { callback, playback };
867 let info = crate::OutputCallbackInfo { timestamp };
868 data_callback(&mut data, &info);
869 }
870 loop {
871 match stream.channel.io_bytes().writei(buffer) {
872 Err(err) if err.errno() == libc::EPIPE => {
873 let _ = stream.channel.try_recover(err, false);
876 }
877 Err(err) => {
878 error_callback(err.into());
879 continue;
880 }
881 Ok(result) if result != available_frames => {
882 let description = format!(
883 "unexpected number of frames written: expected {}, \
884 result {} (this should never happen)",
885 available_frames, result,
886 );
887 error_callback(BackendSpecificError { description }.into());
888 continue;
889 }
890 _ => {
891 break;
892 }
893 }
894 }
895 Ok(())
896}
897
898fn stream_timestamp(
902 status: &alsa::pcm::Status,
903 creation_instant: Option<std::time::Instant>,
904) -> Result<crate::StreamInstant, BackendSpecificError> {
905 match creation_instant {
906 None => {
907 let trigger_ts = status.get_trigger_htstamp();
908 let ts = status.get_htstamp();
909 let nanos = timespec_diff_nanos(ts, trigger_ts);
910 if nanos < 0 {
911 let description = format!(
912 "get_htstamp `{}.{}` was earlier than get_trigger_htstamp `{}.{}`",
913 ts.tv_sec, ts.tv_nsec, trigger_ts.tv_sec, trigger_ts.tv_nsec
914 );
915 return Err(BackendSpecificError { description });
916 }
917 Ok(crate::StreamInstant::from_nanos(nanos))
918 }
919 Some(creation) => {
920 let now = std::time::Instant::now();
921 let duration = now.duration_since(creation);
922 crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(
923 BackendSpecificError {
924 description: "stream duration has exceeded `StreamInstant` representation"
925 .to_string(),
926 },
927 )
928 }
929 }
930}
931
932fn timespec_to_nanos(ts: libc::timespec) -> i64 {
935 ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec as i64
936}
937
938fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
941 timespec_to_nanos(a) - timespec_to_nanos(b)
942}
943
944fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
946 let secsf = frames as f64 / rate.0 as f64;
947 let secs = secsf as u64;
948 let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
949 std::time::Duration::new(secs, nanos)
950}
951
952impl Stream {
953 fn new_input<D, E>(
954 inner: Arc<StreamInner>,
955 mut data_callback: D,
956 mut error_callback: E,
957 timeout: Option<Duration>,
958 ) -> Stream
959 where
960 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
961 E: FnMut(StreamError) + Send + 'static,
962 {
963 let (tx, rx) = trigger();
964 let stream = inner.clone();
966 let thread = thread::Builder::new()
967 .name("cpal_alsa_in".to_owned())
968 .spawn(move || {
969 input_stream_worker(
970 rx,
971 &stream,
972 &mut data_callback,
973 &mut error_callback,
974 timeout,
975 );
976 })
977 .unwrap();
978 Stream {
979 thread: Some(thread),
980 inner,
981 trigger: tx,
982 }
983 }
984
985 fn new_output<D, E>(
986 inner: Arc<StreamInner>,
987 mut data_callback: D,
988 mut error_callback: E,
989 timeout: Option<Duration>,
990 ) -> Stream
991 where
992 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
993 E: FnMut(StreamError) + Send + 'static,
994 {
995 let (tx, rx) = trigger();
996 let stream = inner.clone();
998 let thread = thread::Builder::new()
999 .name("cpal_alsa_out".to_owned())
1000 .spawn(move || {
1001 output_stream_worker(
1002 rx,
1003 &stream,
1004 &mut data_callback,
1005 &mut error_callback,
1006 timeout,
1007 );
1008 })
1009 .unwrap();
1010 Stream {
1011 thread: Some(thread),
1012 inner,
1013 trigger: tx,
1014 }
1015 }
1016}
1017
1018impl Drop for Stream {
1019 fn drop(&mut self) {
1020 self.inner.dropping.set(true);
1021 self.trigger.wakeup();
1022 self.thread.take().unwrap().join().unwrap();
1023 }
1024}
1025
1026impl StreamTrait for Stream {
1027 fn play(&self) -> Result<(), PlayStreamError> {
1028 self.inner.channel.pause(false).ok();
1029 Ok(())
1030 }
1031 fn pause(&self) -> Result<(), PauseStreamError> {
1032 self.inner.channel.pause(true).ok();
1033 Ok(())
1034 }
1035}
1036
1037fn set_hw_params_from_format(
1038 pcm_handle: &alsa::pcm::PCM,
1039 config: &StreamConfig,
1040 sample_format: SampleFormat,
1041) -> Result<bool, BackendSpecificError> {
1042 let hw_params = alsa::pcm::HwParams::any(pcm_handle)?;
1043 hw_params.set_access(alsa::pcm::Access::RWInterleaved)?;
1044
1045 let sample_format = if cfg!(target_endian = "big") {
1046 match sample_format {
1047 SampleFormat::I8 => alsa::pcm::Format::S8,
1048 SampleFormat::I16 => alsa::pcm::Format::S16BE,
1049 SampleFormat::I32 => alsa::pcm::Format::S32BE,
1051 SampleFormat::U8 => alsa::pcm::Format::U8,
1054 SampleFormat::U16 => alsa::pcm::Format::U16BE,
1055 SampleFormat::U32 => alsa::pcm::Format::U32BE,
1057 SampleFormat::F32 => alsa::pcm::Format::FloatBE,
1060 SampleFormat::F64 => alsa::pcm::Format::Float64BE,
1061 sample_format => {
1062 return Err(BackendSpecificError {
1063 description: format!(
1064 "Sample format '{}' is not supported by this backend",
1065 sample_format
1066 ),
1067 })
1068 }
1069 }
1070 } else {
1071 match sample_format {
1072 SampleFormat::I8 => alsa::pcm::Format::S8,
1073 SampleFormat::I16 => alsa::pcm::Format::S16LE,
1074 SampleFormat::I32 => alsa::pcm::Format::S32LE,
1076 SampleFormat::U8 => alsa::pcm::Format::U8,
1079 SampleFormat::U16 => alsa::pcm::Format::U16LE,
1080 SampleFormat::U32 => alsa::pcm::Format::U32LE,
1082 SampleFormat::F32 => alsa::pcm::Format::FloatLE,
1085 SampleFormat::F64 => alsa::pcm::Format::Float64LE,
1086 sample_format => {
1087 return Err(BackendSpecificError {
1088 description: format!(
1089 "Sample format '{}' is not supported by this backend",
1090 sample_format
1091 ),
1092 })
1093 }
1094 }
1095 };
1096
1097 hw_params.set_format(sample_format)?;
1098 hw_params.set_rate(config.sample_rate.0, alsa::ValueOr::Nearest)?;
1099 hw_params.set_channels(config.channels as u32)?;
1100
1101 match config.buffer_size {
1102 BufferSize::Fixed(v) => {
1103 hw_params.set_period_size_near((v / 4) as alsa::pcm::Frames, alsa::ValueOr::Nearest)?;
1104 hw_params.set_buffer_size(v as alsa::pcm::Frames)?;
1105 }
1106 BufferSize::Default => {
1107 hw_params.set_period_time_near(25_000, alsa::ValueOr::Nearest)?;
1110 hw_params.set_buffer_time_near(100_000, alsa::ValueOr::Nearest)?;
1111 }
1112 }
1113
1114 pcm_handle.hw_params(&hw_params)?;
1115
1116 Ok(hw_params.can_pause())
1117}
1118
1119fn set_sw_params_from_format(
1120 pcm_handle: &alsa::pcm::PCM,
1121 config: &StreamConfig,
1122 stream_type: alsa::Direction,
1123) -> Result<usize, BackendSpecificError> {
1124 let sw_params = pcm_handle.sw_params_current()?;
1125
1126 let period_len = {
1127 let (buffer, period) = pcm_handle.get_params()?;
1128 if buffer == 0 {
1129 return Err(BackendSpecificError {
1130 description: "initialization resulted in a null buffer".to_string(),
1131 });
1132 }
1133 sw_params.set_avail_min(period as alsa::pcm::Frames)?;
1134
1135 let start_threshold = match stream_type {
1136 alsa::Direction::Playback => buffer - period,
1137
1138 alsa::Direction::Capture => 1,
1143 };
1144 sw_params.set_start_threshold(start_threshold.try_into().unwrap())?;
1145
1146 period as usize * config.channels as usize
1147 };
1148
1149 sw_params.set_tstamp_mode(true)?;
1150 sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;
1151
1152 if pcm_handle.sw_params(&sw_params).is_err() {
1156 sw_params.set_tstamp_type(alsa::pcm::TstampType::Monotonic)?;
1157 pcm_handle.sw_params(&sw_params)?;
1158 }
1159
1160 Ok(period_len)
1161}
1162
1163impl From<alsa::Error> for BackendSpecificError {
1164 fn from(err: alsa::Error) -> Self {
1165 BackendSpecificError {
1166 description: err.to_string(),
1167 }
1168 }
1169}
1170
1171impl From<alsa::Error> for BuildStreamError {
1172 fn from(err: alsa::Error) -> Self {
1173 let err: BackendSpecificError = err.into();
1174 err.into()
1175 }
1176}
1177
1178impl From<alsa::Error> for SupportedStreamConfigsError {
1179 fn from(err: alsa::Error) -> Self {
1180 let err: BackendSpecificError = err.into();
1181 err.into()
1182 }
1183}
1184
1185impl From<alsa::Error> for PlayStreamError {
1186 fn from(err: alsa::Error) -> Self {
1187 let err: BackendSpecificError = err.into();
1188 err.into()
1189 }
1190}
1191
1192impl From<alsa::Error> for PauseStreamError {
1193 fn from(err: alsa::Error) -> Self {
1194 let err: BackendSpecificError = err.into();
1195 err.into()
1196 }
1197}
1198
1199impl From<alsa::Error> for StreamError {
1200 fn from(err: alsa::Error) -> Self {
1201 let err: BackendSpecificError = err.into();
1202 err.into()
1203 }
1204}