tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
16/// Builds Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// use tokio::runtime::Builder;
35///
36/// fn main() {
37/// // build runtime
38/// let runtime = Builder::new_multi_thread()
39/// .worker_threads(4)
40/// .thread_name("my-custom-name")
41/// .thread_stack_size(3 * 1024 * 1024)
42/// .build()
43/// .unwrap();
44///
45/// // use runtime ...
46/// }
47/// ```
48pub struct Builder {
49 /// Runtime type
50 kind: Kind,
51
52 /// Whether or not to enable the I/O driver
53 enable_io: bool,
54 nevents: usize,
55
56 /// Whether or not to enable the time driver
57 enable_time: bool,
58
59 /// Whether or not the clock should start paused.
60 start_paused: bool,
61
62 /// The number of worker threads, used by Runtime.
63 ///
64 /// Only used when not using the current-thread executor.
65 worker_threads: Option<usize>,
66
67 /// Cap on thread usage.
68 max_blocking_threads: usize,
69
70 /// Name fn used for threads spawned by the runtime.
71 pub(super) thread_name: ThreadNameFn,
72
73 /// Stack size used for threads spawned by the runtime.
74 pub(super) thread_stack_size: Option<usize>,
75
76 /// Callback to run after each thread starts.
77 pub(super) after_start: Option<Callback>,
78
79 /// To run before each worker thread stops
80 pub(super) before_stop: Option<Callback>,
81
82 /// To run before each worker thread is parked.
83 pub(super) before_park: Option<Callback>,
84
85 /// To run after each thread is unparked.
86 pub(super) after_unpark: Option<Callback>,
87
88 /// To run before each task is spawned.
89 pub(super) before_spawn: Option<TaskCallback>,
90
91 /// To run before each poll
92 #[cfg(tokio_unstable)]
93 pub(super) before_poll: Option<TaskCallback>,
94
95 /// To run after each poll
96 #[cfg(tokio_unstable)]
97 pub(super) after_poll: Option<TaskCallback>,
98
99 /// To run after each task is terminated.
100 pub(super) after_termination: Option<TaskCallback>,
101
102 /// Customizable keep alive timeout for `BlockingPool`
103 pub(super) keep_alive: Option<Duration>,
104
105 /// How many ticks before pulling a task from the global/remote queue?
106 ///
107 /// When `None`, the value is unspecified and behavior details are left to
108 /// the scheduler. Each scheduler flavor could choose to either pick its own
109 /// default value or use some other strategy to decide when to poll from the
110 /// global queue. For example, the multi-threaded scheduler uses a
111 /// self-tuning strategy based on mean task poll times.
112 pub(super) global_queue_interval: Option<u32>,
113
114 /// How many ticks before yielding to the driver for timer and I/O events?
115 pub(super) event_interval: u32,
116
117 /// When true, the multi-threade scheduler LIFO slot should not be used.
118 ///
119 /// This option should only be exposed as unstable.
120 pub(super) disable_lifo_slot: bool,
121
122 /// Specify a random number generator seed to provide deterministic results
123 pub(super) seed_generator: RngSeedGenerator,
124
125 /// When true, enables task poll count histogram instrumentation.
126 pub(super) metrics_poll_count_histogram_enable: bool,
127
128 /// Configures the task poll count histogram
129 pub(super) metrics_poll_count_histogram: HistogramBuilder,
130
131 #[cfg(tokio_unstable)]
132 pub(super) unhandled_panic: UnhandledPanic,
133}
134
135cfg_unstable! {
136 /// How the runtime should respond to unhandled panics.
137 ///
138 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
139 /// to configure the runtime behavior when a spawned task panics.
140 ///
141 /// See [`Builder::unhandled_panic`] for more details.
142 #[derive(Debug, Clone)]
143 #[non_exhaustive]
144 pub enum UnhandledPanic {
145 /// The runtime should ignore panics on spawned tasks.
146 ///
147 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
148 /// tasks continue running normally.
149 ///
150 /// This is the default behavior.
151 ///
152 /// # Examples
153 ///
154 /// ```
155 /// use tokio::runtime::{self, UnhandledPanic};
156 ///
157 /// # pub fn main() {
158 /// let rt = runtime::Builder::new_current_thread()
159 /// .unhandled_panic(UnhandledPanic::Ignore)
160 /// .build()
161 /// .unwrap();
162 ///
163 /// let task1 = rt.spawn(async { panic!("boom"); });
164 /// let task2 = rt.spawn(async {
165 /// // This task completes normally
166 /// "done"
167 /// });
168 ///
169 /// rt.block_on(async {
170 /// // The panic on the first task is forwarded to the `JoinHandle`
171 /// assert!(task1.await.is_err());
172 ///
173 /// // The second task completes normally
174 /// assert!(task2.await.is_ok());
175 /// })
176 /// # }
177 /// ```
178 ///
179 /// [`JoinHandle`]: struct@crate::task::JoinHandle
180 Ignore,
181
182 /// The runtime should immediately shutdown if a spawned task panics.
183 ///
184 /// The runtime will immediately shutdown even if the panicked task's
185 /// [`JoinHandle`] is still available. All further spawned tasks will be
186 /// immediately dropped and call to [`Runtime::block_on`] will panic.
187 ///
188 /// # Examples
189 ///
190 /// ```should_panic
191 /// use tokio::runtime::{self, UnhandledPanic};
192 ///
193 /// # pub fn main() {
194 /// let rt = runtime::Builder::new_current_thread()
195 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
196 /// .build()
197 /// .unwrap();
198 ///
199 /// rt.spawn(async { panic!("boom"); });
200 /// rt.spawn(async {
201 /// // This task never completes.
202 /// });
203 ///
204 /// rt.block_on(async {
205 /// // Do some work
206 /// # loop { tokio::task::yield_now().await; }
207 /// })
208 /// # }
209 /// ```
210 ///
211 /// [`JoinHandle`]: struct@crate::task::JoinHandle
212 ShutdownRuntime,
213 }
214}
215
/// Shared, thread-safe function that yields the name for a thread spawned by
/// the runtime; it is called with no arguments and returns an owned `String`.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
217
/// Scheduler flavor selected for a `Builder`.
#[derive(Copy, Clone)]
pub(crate) enum Kind {
    /// Current-thread scheduler.
    CurrentThread,
    /// Multi-thread scheduler; only present with the `rt-multi-thread` feature.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
224
225impl Builder {
226 /// Returns a new builder with the current thread scheduler selected.
227 ///
228 /// Configuration methods can be chained on the return value.
229 ///
230 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
231 /// [`LocalSet`].
232 ///
233 /// [`LocalSet`]: crate::task::LocalSet
234 pub fn new_current_thread() -> Builder {
235 #[cfg(loom)]
236 const EVENT_INTERVAL: u32 = 4;
237 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
238 #[cfg(not(loom))]
239 const EVENT_INTERVAL: u32 = 61;
240
241 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
242 }
243
244 /// Returns a new builder with the multi thread scheduler selected.
245 ///
246 /// Configuration methods can be chained on the return value.
247 #[cfg(feature = "rt-multi-thread")]
248 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
249 pub fn new_multi_thread() -> Builder {
250 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
251 Builder::new(Kind::MultiThread, 61)
252 }
253
254 /// Returns a new runtime builder initialized with default configuration
255 /// values.
256 ///
257 /// Configuration methods can be chained on the return value.
258 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
259 Builder {
260 kind,
261
262 // I/O defaults to "off"
263 enable_io: false,
264 nevents: 1024,
265
266 // Time defaults to "off"
267 enable_time: false,
268
269 // The clock starts not-paused
270 start_paused: false,
271
272 // Read from environment variable first in multi-threaded mode.
273 // Default to lazy auto-detection (one thread per CPU core)
274 worker_threads: None,
275
276 max_blocking_threads: 512,
277
278 // Default thread name
279 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
280
281 // Do not set a stack size by default
282 thread_stack_size: None,
283
284 // No worker thread callbacks
285 after_start: None,
286 before_stop: None,
287 before_park: None,
288 after_unpark: None,
289
290 before_spawn: None,
291 after_termination: None,
292
293 #[cfg(tokio_unstable)]
294 before_poll: None,
295 #[cfg(tokio_unstable)]
296 after_poll: None,
297
298 keep_alive: None,
299
300 // Defaults for these values depend on the scheduler kind, so we get them
301 // as parameters.
302 global_queue_interval: None,
303 event_interval,
304
305 seed_generator: RngSeedGenerator::new(RngSeed::new()),
306
307 #[cfg(tokio_unstable)]
308 unhandled_panic: UnhandledPanic::Ignore,
309
310 metrics_poll_count_histogram_enable: false,
311
312 metrics_poll_count_histogram: HistogramBuilder::default(),
313
314 disable_lifo_slot: false,
315 }
316 }
317
318 /// Enables both I/O and time drivers.
319 ///
320 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
321 /// individually. If additional components are added to Tokio in the future,
322 /// `enable_all` will include these future components.
323 ///
324 /// # Examples
325 ///
326 /// ```
327 /// use tokio::runtime;
328 ///
329 /// let rt = runtime::Builder::new_multi_thread()
330 /// .enable_all()
331 /// .build()
332 /// .unwrap();
333 /// ```
334 pub fn enable_all(&mut self) -> &mut Self {
335 #[cfg(any(
336 feature = "net",
337 all(unix, feature = "process"),
338 all(unix, feature = "signal")
339 ))]
340 self.enable_io();
341 #[cfg(feature = "time")]
342 self.enable_time();
343
344 self
345 }
346
347 /// Sets the number of worker threads the `Runtime` will use.
348 ///
349 /// This can be any number above 0 though it is advised to keep this value
350 /// on the smaller side.
351 ///
352 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
353 ///
354 /// # Default
355 ///
356 /// The default value is the number of cores available to the system.
357 ///
358 /// When using the `current_thread` runtime this method has no effect.
359 ///
360 /// # Examples
361 ///
362 /// ## Multi threaded runtime with 4 threads
363 ///
364 /// ```
365 /// use tokio::runtime;
366 ///
367 /// // This will spawn a work-stealing runtime with 4 worker threads.
368 /// let rt = runtime::Builder::new_multi_thread()
369 /// .worker_threads(4)
370 /// .build()
371 /// .unwrap();
372 ///
373 /// rt.spawn(async move {});
374 /// ```
375 ///
376 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
377 ///
378 /// ```
379 /// use tokio::runtime;
380 ///
381 /// // Create a runtime that _must_ be driven from a call
382 /// // to `Runtime::block_on`.
383 /// let rt = runtime::Builder::new_current_thread()
384 /// .build()
385 /// .unwrap();
386 ///
387 /// // This will run the runtime and future on the current thread
388 /// rt.block_on(async move {});
389 /// ```
390 ///
391 /// # Panics
392 ///
393 /// This will panic if `val` is not larger than `0`.
394 #[track_caller]
395 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
396 assert!(val > 0, "Worker threads cannot be set to 0");
397 self.worker_threads = Some(val);
398 self
399 }
400
401 /// Specifies the limit for additional threads spawned by the Runtime.
402 ///
403 /// These threads are used for blocking operations like tasks spawned
404 /// through [`spawn_blocking`], this includes but is not limited to:
405 /// - [`fs`] operations
406 /// - dns resolution through [`ToSocketAddrs`]
407 /// - writing to [`Stdout`] or [`Stderr`]
408 /// - reading from [`Stdin`]
409 ///
410 /// Unlike the [`worker_threads`], they are not always active and will exit
411 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
412 ///
413 /// It's recommended to not set this limit too low in order to avoid hanging on operations
414 /// requiring [`spawn_blocking`].
415 ///
416 /// The default value is 512.
417 ///
418 /// # Queue Behavior
419 ///
420 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
421 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
422 /// method has not been reached, a new thread will be spawned. If no idle thread is available
    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
    /// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
425 /// it could potentially grow unbounded.
426 ///
427 /// # Panics
428 ///
429 /// This will panic if `val` is not larger than `0`.
430 ///
431 /// # Upgrading from 0.x
432 ///
433 /// In old versions `max_threads` limited both blocking and worker threads, but the
434 /// current `max_blocking_threads` does not include async worker threads in the count.
435 ///
436 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
437 /// [`fs`]: mod@crate::fs
438 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
439 /// [`Stdout`]: struct@crate::io::Stdout
440 /// [`Stdin`]: struct@crate::io::Stdin
441 /// [`Stderr`]: struct@crate::io::Stderr
442 /// [`worker_threads`]: Self::worker_threads
443 /// [`thread_keep_alive`]: Self::thread_keep_alive
444 #[track_caller]
445 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
446 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
447 assert!(val > 0, "Max blocking threads cannot be set to 0");
448 self.max_blocking_threads = val;
449 self
450 }
451
452 /// Sets name of threads spawned by the `Runtime`'s thread pool.
453 ///
454 /// The default name is "tokio-runtime-worker".
455 ///
456 /// # Examples
457 ///
458 /// ```
459 /// # use tokio::runtime;
460 ///
461 /// # pub fn main() {
462 /// let rt = runtime::Builder::new_multi_thread()
463 /// .thread_name("my-pool")
464 /// .build();
465 /// # }
466 /// ```
467 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
468 let val = val.into();
469 self.thread_name = std::sync::Arc::new(move || val.clone());
470 self
471 }
472
473 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
474 ///
475 /// The default name fn is `|| "tokio-runtime-worker".into()`.
476 ///
477 /// # Examples
478 ///
479 /// ```
480 /// # use tokio::runtime;
481 /// # use std::sync::atomic::{AtomicUsize, Ordering};
482 /// # pub fn main() {
483 /// let rt = runtime::Builder::new_multi_thread()
484 /// .thread_name_fn(|| {
485 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
486 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
487 /// format!("my-pool-{}", id)
488 /// })
489 /// .build();
490 /// # }
491 /// ```
492 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
493 where
494 F: Fn() -> String + Send + Sync + 'static,
495 {
496 self.thread_name = std::sync::Arc::new(f);
497 self
498 }
499
500 /// Sets the stack size (in bytes) for worker threads.
501 ///
502 /// The actual stack size may be greater than this value if the platform
503 /// specifies minimal stack size.
504 ///
505 /// The default stack size for spawned threads is 2 MiB, though this
506 /// particular stack size is subject to change in the future.
507 ///
508 /// # Examples
509 ///
510 /// ```
511 /// # use tokio::runtime;
512 ///
513 /// # pub fn main() {
514 /// let rt = runtime::Builder::new_multi_thread()
515 /// .thread_stack_size(32 * 1024)
516 /// .build();
517 /// # }
518 /// ```
519 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
520 self.thread_stack_size = Some(val);
521 self
522 }
523
524 /// Executes function `f` after each thread is started but before it starts
525 /// doing work.
526 ///
527 /// This is intended for bookkeeping and monitoring use cases.
528 ///
529 /// # Examples
530 ///
531 /// ```
532 /// # use tokio::runtime;
533 /// # pub fn main() {
534 /// let runtime = runtime::Builder::new_multi_thread()
535 /// .on_thread_start(|| {
536 /// println!("thread started");
537 /// })
538 /// .build();
539 /// # }
540 /// ```
541 #[cfg(not(loom))]
542 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
543 where
544 F: Fn() + Send + Sync + 'static,
545 {
546 self.after_start = Some(std::sync::Arc::new(f));
547 self
548 }
549
550 /// Executes function `f` before each thread stops.
551 ///
552 /// This is intended for bookkeeping and monitoring use cases.
553 ///
554 /// # Examples
555 ///
556 /// ```
557 /// # use tokio::runtime;
558 /// # pub fn main() {
559 /// let runtime = runtime::Builder::new_multi_thread()
560 /// .on_thread_stop(|| {
561 /// println!("thread stopping");
562 /// })
563 /// .build();
564 /// # }
565 /// ```
566 #[cfg(not(loom))]
567 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
568 where
569 F: Fn() + Send + Sync + 'static,
570 {
571 self.before_stop = Some(std::sync::Arc::new(f));
572 self
573 }
574
575 /// Executes function `f` just before a thread is parked (goes idle).
576 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
577 /// can be called, and may result in this thread being unparked immediately.
578 ///
579 /// This can be used to start work only when the executor is idle, or for bookkeeping
580 /// and monitoring purposes.
581 ///
582 /// Note: There can only be one park callback for a runtime; calling this function
583 /// more than once replaces the last callback defined, rather than adding to it.
584 ///
585 /// # Examples
586 ///
587 /// ## Multithreaded executor
588 /// ```
589 /// # use std::sync::Arc;
590 /// # use std::sync::atomic::{AtomicBool, Ordering};
591 /// # use tokio::runtime;
592 /// # use tokio::sync::Barrier;
593 /// # pub fn main() {
594 /// let once = AtomicBool::new(true);
595 /// let barrier = Arc::new(Barrier::new(2));
596 ///
597 /// let runtime = runtime::Builder::new_multi_thread()
598 /// .worker_threads(1)
599 /// .on_thread_park({
600 /// let barrier = barrier.clone();
601 /// move || {
602 /// let barrier = barrier.clone();
603 /// if once.swap(false, Ordering::Relaxed) {
604 /// tokio::spawn(async move { barrier.wait().await; });
605 /// }
606 /// }
607 /// })
608 /// .build()
609 /// .unwrap();
610 ///
611 /// runtime.block_on(async {
612 /// barrier.wait().await;
613 /// })
614 /// # }
615 /// ```
616 /// ## Current thread executor
617 /// ```
618 /// # use std::sync::Arc;
619 /// # use std::sync::atomic::{AtomicBool, Ordering};
620 /// # use tokio::runtime;
621 /// # use tokio::sync::Barrier;
622 /// # pub fn main() {
623 /// let once = AtomicBool::new(true);
624 /// let barrier = Arc::new(Barrier::new(2));
625 ///
626 /// let runtime = runtime::Builder::new_current_thread()
627 /// .on_thread_park({
628 /// let barrier = barrier.clone();
629 /// move || {
630 /// let barrier = barrier.clone();
631 /// if once.swap(false, Ordering::Relaxed) {
632 /// tokio::spawn(async move { barrier.wait().await; });
633 /// }
634 /// }
635 /// })
636 /// .build()
637 /// .unwrap();
638 ///
639 /// runtime.block_on(async {
640 /// barrier.wait().await;
641 /// })
642 /// # }
643 /// ```
644 #[cfg(not(loom))]
645 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
646 where
647 F: Fn() + Send + Sync + 'static,
648 {
649 self.before_park = Some(std::sync::Arc::new(f));
650 self
651 }
652
653 /// Executes function `f` just after a thread unparks (starts executing tasks).
654 ///
655 /// This is intended for bookkeeping and monitoring use cases; note that work
656 /// in this callback will increase latencies when the application has allowed one or
657 /// more runtime threads to go idle.
658 ///
659 /// Note: There can only be one unpark callback for a runtime; calling this function
660 /// more than once replaces the last callback defined, rather than adding to it.
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// # use tokio::runtime;
666 /// # pub fn main() {
667 /// let runtime = runtime::Builder::new_multi_thread()
668 /// .on_thread_unpark(|| {
669 /// println!("thread unparking");
670 /// })
671 /// .build();
672 ///
673 /// runtime.unwrap().block_on(async {
674 /// tokio::task::yield_now().await;
675 /// println!("Hello from Tokio!");
676 /// })
677 /// # }
678 /// ```
679 #[cfg(not(loom))]
680 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
681 where
682 F: Fn() + Send + Sync + 'static,
683 {
684 self.after_unpark = Some(std::sync::Arc::new(f));
685 self
686 }
687
688 /// Executes function `f` just before a task is spawned.
689 ///
690 /// `f` is called within the Tokio context, so functions like
691 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
692 /// invoked immediately.
693 ///
694 /// This can be used for bookkeeping or monitoring purposes.
695 ///
696 /// Note: There can only be one spawn callback for a runtime; calling this function more
697 /// than once replaces the last callback defined, rather than adding to it.
698 ///
699 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
700 ///
701 /// **Note**: This is an [unstable API][unstable]. The public API of this type
702 /// may break in 1.x releases. See [the documentation on unstable
703 /// features][unstable] for details.
704 ///
705 /// [unstable]: crate#unstable-features
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// # use tokio::runtime;
711 /// # pub fn main() {
712 /// let runtime = runtime::Builder::new_current_thread()
713 /// .on_task_spawn(|_| {
714 /// println!("spawning task");
715 /// })
716 /// .build()
717 /// .unwrap();
718 ///
719 /// runtime.block_on(async {
720 /// tokio::task::spawn(std::future::ready(()));
721 ///
722 /// for _ in 0..64 {
723 /// tokio::task::yield_now().await;
724 /// }
725 /// })
726 /// # }
727 /// ```
728 #[cfg(all(not(loom), tokio_unstable))]
729 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
730 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
731 where
732 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
733 {
734 self.before_spawn = Some(std::sync::Arc::new(f));
735 self
736 }
737
738 /// Executes function `f` just before a task is polled
739 ///
740 /// `f` is called within the Tokio context, so functions like
741 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
742 /// invoked immediately.
743 ///
744 /// **Note**: This is an [unstable API][unstable]. The public API of this type
745 /// may break in 1.x releases. See [the documentation on unstable
746 /// features][unstable] for details.
747 ///
748 /// [unstable]: crate#unstable-features
749 ///
750 /// # Examples
751 ///
752 /// ```
753 /// # use std::sync::{atomic::AtomicUsize, Arc};
754 /// # use tokio::task::yield_now;
755 /// # pub fn main() {
756 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
757 /// let poll_start = poll_start_counter.clone();
758 /// let rt = tokio::runtime::Builder::new_multi_thread()
759 /// .enable_all()
760 /// .on_before_task_poll(move |meta| {
761 /// println!("task {} is about to be polled", meta.id())
762 /// })
763 /// .build()
764 /// .unwrap();
765 /// let task = rt.spawn(async {
766 /// yield_now().await;
767 /// });
768 /// let _ = rt.block_on(task);
769 ///
770 /// # }
771 /// ```
772 #[cfg(tokio_unstable)]
773 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
774 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
775 where
776 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
777 {
778 self.before_poll = Some(std::sync::Arc::new(f));
779 self
780 }
781
782 /// Executes function `f` just after a task is polled
783 ///
784 /// `f` is called within the Tokio context, so functions like
785 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
786 /// invoked immediately.
787 ///
788 /// **Note**: This is an [unstable API][unstable]. The public API of this type
789 /// may break in 1.x releases. See [the documentation on unstable
790 /// features][unstable] for details.
791 ///
792 /// [unstable]: crate#unstable-features
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// # use std::sync::{atomic::AtomicUsize, Arc};
798 /// # use tokio::task::yield_now;
799 /// # pub fn main() {
800 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
801 /// let poll_stop = poll_stop_counter.clone();
802 /// let rt = tokio::runtime::Builder::new_multi_thread()
803 /// .enable_all()
804 /// .on_after_task_poll(move |meta| {
805 /// println!("task {} completed polling", meta.id());
806 /// })
807 /// .build()
808 /// .unwrap();
809 /// let task = rt.spawn(async {
810 /// yield_now().await;
811 /// });
812 /// let _ = rt.block_on(task);
813 ///
814 /// # }
815 /// ```
816 #[cfg(tokio_unstable)]
817 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
818 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
819 where
820 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
821 {
822 self.after_poll = Some(std::sync::Arc::new(f));
823 self
824 }
825
826 /// Executes function `f` just after a task is terminated.
827 ///
828 /// `f` is called within the Tokio context, so functions like
829 /// [`tokio::spawn`](crate::spawn) can be called.
830 ///
831 /// This can be used for bookkeeping or monitoring purposes.
832 ///
833 /// Note: There can only be one task termination callback for a runtime; calling this
834 /// function more than once replaces the last callback defined, rather than adding to it.
835 ///
836 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
837 ///
838 /// **Note**: This is an [unstable API][unstable]. The public API of this type
839 /// may break in 1.x releases. See [the documentation on unstable
840 /// features][unstable] for details.
841 ///
842 /// [unstable]: crate#unstable-features
843 ///
844 /// # Examples
845 ///
846 /// ```
847 /// # use tokio::runtime;
848 /// # pub fn main() {
849 /// let runtime = runtime::Builder::new_current_thread()
850 /// .on_task_terminate(|_| {
851 /// println!("killing task");
852 /// })
853 /// .build()
854 /// .unwrap();
855 ///
856 /// runtime.block_on(async {
857 /// tokio::task::spawn(std::future::ready(()));
858 ///
859 /// for _ in 0..64 {
860 /// tokio::task::yield_now().await;
861 /// }
862 /// })
863 /// # }
864 /// ```
865 #[cfg(all(not(loom), tokio_unstable))]
866 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
867 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
868 where
869 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
870 {
871 self.after_termination = Some(std::sync::Arc::new(f));
872 self
873 }
874
875 /// Creates the configured `Runtime`.
876 ///
877 /// The returned `Runtime` instance is ready to spawn tasks.
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// use tokio::runtime::Builder;
883 ///
884 /// let rt = Builder::new_multi_thread().build().unwrap();
885 ///
886 /// rt.block_on(async {
887 /// println!("Hello from the Tokio runtime");
888 /// });
889 /// ```
890 pub fn build(&mut self) -> io::Result<Runtime> {
891 match &self.kind {
892 Kind::CurrentThread => self.build_current_thread_runtime(),
893 #[cfg(feature = "rt-multi-thread")]
894 Kind::MultiThread => self.build_threaded_runtime(),
895 }
896 }
897
898 /// Creates the configured [`LocalRuntime`].
899 ///
900 /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
901 ///
902 /// # Panics
903 ///
904 /// This will panic if the runtime is configured with [`new_multi_thread()`].
905 ///
906 /// [`new_multi_thread()`]: Builder::new_multi_thread
907 ///
908 /// # Examples
909 ///
910 /// ```
911 /// use tokio::runtime::{Builder, LocalOptions};
912 ///
913 /// let rt = Builder::new_current_thread()
914 /// .build_local(LocalOptions::default())
915 /// .unwrap();
916 ///
917 /// rt.spawn_local(async {
918 /// println!("Hello from the Tokio runtime");
919 /// });
920 /// ```
921 #[allow(unused_variables, unreachable_patterns)]
922 #[cfg(tokio_unstable)]
923 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
924 pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
925 match &self.kind {
926 Kind::CurrentThread => self.build_current_thread_local_runtime(),
927 #[cfg(feature = "rt-multi-thread")]
928 Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
929 }
930 }
931
932 fn get_cfg(&self) -> driver::Cfg {
933 driver::Cfg {
934 enable_pause_time: match self.kind {
935 Kind::CurrentThread => true,
936 #[cfg(feature = "rt-multi-thread")]
937 Kind::MultiThread => false,
938 },
939 enable_io: self.enable_io,
940 enable_time: self.enable_time,
941 start_paused: self.start_paused,
942 nevents: self.nevents,
943 }
944 }
945
946 /// Sets a custom timeout for a thread in the blocking pool.
947 ///
948 /// By default, the timeout for a thread is set to 10 seconds. This can
949 /// be overridden using `.thread_keep_alive()`.
950 ///
951 /// # Example
952 ///
953 /// ```
954 /// # use tokio::runtime;
955 /// # use std::time::Duration;
956 /// # pub fn main() {
957 /// let rt = runtime::Builder::new_multi_thread()
958 /// .thread_keep_alive(Duration::from_millis(100))
959 /// .build();
960 /// # }
961 /// ```
962 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
963 self.keep_alive = Some(duration);
964 self
965 }
966
967 /// Sets the number of scheduler ticks after which the scheduler will poll the global
968 /// task queue.
969 ///
970 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
971 ///
972 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
973 /// [the module documentation] for the default behavior of the multi-thread scheduler.
974 ///
975 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
976 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
977 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
978 /// getting started on new work, especially if tasks frequently yield rather than complete
979 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
980 /// is a good choice when most tasks quickly complete polling.
981 ///
982 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
983 ///
984 /// # Panics
985 ///
986 /// This function will panic if 0 is passed as an argument.
987 ///
988 /// # Examples
989 ///
990 /// ```
991 /// # use tokio::runtime;
992 /// # pub fn main() {
993 /// let rt = runtime::Builder::new_multi_thread()
994 /// .global_queue_interval(31)
995 /// .build();
996 /// # }
997 /// ```
998 #[track_caller]
999 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1000 assert!(val > 0, "global_queue_interval must be greater than 0");
1001 self.global_queue_interval = Some(val);
1002 self
1003 }
1004
1005 /// Sets the number of scheduler ticks after which the scheduler will poll for
1006 /// external events (timers, I/O, and so on).
1007 ///
1008 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1009 ///
1010 /// By default, the event interval is `61` for all scheduler types.
1011 ///
1012 /// Setting the event interval determines the effective "priority" of delivering
1013 /// these external events (which may wake up additional tasks), compared to
1014 /// executing tasks that are currently ready to run. A smaller value is useful
1015 /// when tasks frequently spend a long time in polling, or frequently yield,
1016 /// which can result in overly long delays picking up I/O events. Conversely,
1017 /// picking up new events requires extra synchronization and syscall overhead,
1018 /// so if tasks generally complete their polling quickly, a higher event interval
1019 /// will minimize that overhead while still keeping the scheduler responsive to
1020 /// events.
1021 ///
1022 /// # Examples
1023 ///
1024 /// ```
1025 /// # use tokio::runtime;
1026 /// # pub fn main() {
1027 /// let rt = runtime::Builder::new_multi_thread()
1028 /// .event_interval(31)
1029 /// .build();
1030 /// # }
1031 /// ```
1032 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1033 self.event_interval = val;
1034 self
1035 }
1036
1037 cfg_unstable! {
1038 /// Configure how the runtime responds to an unhandled panic on a
1039 /// spawned task.
1040 ///
1041 /// By default, an unhandled panic (i.e. a panic not caught by
1042 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1043 /// execution. The panic's error value is forwarded to the task's
1044 /// [`JoinHandle`] and all other spawned tasks continue running.
1045 ///
1046 /// The `unhandled_panic` option enables configuring this behavior.
1047 ///
1048 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1049 /// spawned tasks have no impact on the runtime's execution.
1050 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1051 /// shutdown immediately when a spawned task panics even if that
1052 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1053 /// will immediately terminate and further calls to
1054 /// [`Runtime::block_on`] will panic.
1055 ///
1056 /// # Panics
1057 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1058 /// on a runtime other than the current thread runtime.
1059 ///
1060 /// # Unstable
1061 ///
1062 /// This option is currently unstable and its implementation is
1063 /// incomplete. The API may change or be removed in the future. See
1064 /// issue [tokio-rs/tokio#4516] for more details.
1065 ///
1066 /// # Examples
1067 ///
1068 /// The following demonstrates a runtime configured to shutdown on
1069 /// panic. The first spawned task panics and results in the runtime
1070 /// shutting down. The second spawned task never has a chance to
1071 /// execute. The call to `block_on` will panic due to the runtime being
1072 /// forcibly shutdown.
1073 ///
1074 /// ```should_panic
1075 /// use tokio::runtime::{self, UnhandledPanic};
1076 ///
1077 /// # pub fn main() {
1078 /// let rt = runtime::Builder::new_current_thread()
1079 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1080 /// .build()
1081 /// .unwrap();
1082 ///
1083 /// rt.spawn(async { panic!("boom"); });
1084 /// rt.spawn(async {
1085 /// // This task never completes.
1086 /// });
1087 ///
1088 /// rt.block_on(async {
1089 /// // Do some work
1090 /// # loop { tokio::task::yield_now().await; }
1091 /// })
1092 /// # }
1093 /// ```
1094 ///
1095 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1096 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1097 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1098 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1099 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1100 }
1101
1102 self.unhandled_panic = behavior;
1103 self
1104 }
1105
1106 /// Disables the LIFO task scheduler heuristic.
1107 ///
1108 /// The multi-threaded scheduler includes a heuristic for optimizing
1109 /// message-passing patterns. This heuristic results in the **last**
1110 /// scheduled task being polled first.
1111 ///
1112 /// To implement this heuristic, each worker thread has a slot which
1113 /// holds the task that should be polled next. However, this slot cannot
1114 /// be stolen by other worker threads, which can result in lower total
1115 /// throughput when tasks tend to have longer poll times.
1116 ///
1117 /// This configuration option will disable this heuristic resulting in
1118 /// all scheduled tasks being pushed into the worker-local queue, which
1119 /// is stealable.
1120 ///
1121 /// Consider trying this option when the task "scheduled" time is high
1122 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1123 /// collect this data.
1124 ///
1125 /// # Unstable
1126 ///
1127 /// This configuration option is considered a workaround for the LIFO
1128 /// slot not being stealable. When the slot becomes stealable, we will
1129 /// revisit whether or not this option is necessary. See
1130 /// issue [tokio-rs/tokio#4941].
1131 ///
1132 /// # Examples
1133 ///
1134 /// ```
1135 /// use tokio::runtime;
1136 ///
1137 /// let rt = runtime::Builder::new_multi_thread()
1138 /// .disable_lifo_slot()
1139 /// .build()
1140 /// .unwrap();
1141 /// ```
1142 ///
1143 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1144 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1145 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1146 self.disable_lifo_slot = true;
1147 self
1148 }
1149
1150 /// Specifies the random number generation seed to use within all
1151 /// threads associated with the runtime being built.
1152 ///
1153 /// This option is intended to make certain parts of the runtime
1154 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1155 /// [`tokio::select!`] it will ensure that the order that branches are
1156 /// polled is deterministic.
1157 ///
1158 /// In addition to the code specifying `rng_seed` and interacting with
1159 /// the runtime, the internals of Tokio and the Rust compiler may affect
1160 /// the sequences of random numbers. In order to ensure repeatable
1161 /// results, the version of Tokio, the versions of all other
1162 /// dependencies that interact with Tokio, and the Rust compiler version
1163 /// should also all remain constant.
1164 ///
1165 /// # Examples
1166 ///
1167 /// ```
1168 /// # use tokio::runtime::{self, RngSeed};
1169 /// # pub fn main() {
1170 /// let seed = RngSeed::from_bytes(b"place your seed here");
1171 /// let rt = runtime::Builder::new_current_thread()
1172 /// .rng_seed(seed)
1173 /// .build();
1174 /// # }
1175 /// ```
1176 ///
1177 /// [`tokio::select!`]: crate::select
1178 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1179 self.seed_generator = RngSeedGenerator::new(seed);
1180 self
1181 }
1182 }
1183
1184 cfg_unstable_metrics! {
1185 /// Enables tracking the distribution of task poll times.
1186 ///
1187 /// Task poll times are not instrumented by default as doing so requires
1188 /// calling [`Instant::now()`] twice per task poll, which could add
1189 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1190 /// metrics data.
1191 ///
1192 /// The histogram uses fixed bucket sizes. In other words, the histogram
1193 /// buckets are not dynamic based on input values. Use the
1194 /// `metrics_poll_time_histogram` builder methods to configure the
1195 /// histogram details.
1196 ///
1197 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1198 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1199 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1200 /// to select [`LogHistogram`] instead.
1201 ///
1202 /// # Examples
1203 ///
1204 /// ```
1205 /// use tokio::runtime;
1206 ///
1207 /// let rt = runtime::Builder::new_multi_thread()
1208 /// .enable_metrics_poll_time_histogram()
1209 /// .build()
1210 /// .unwrap();
1211 /// # // Test default values here
1212 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1213 /// # let m = rt.handle().metrics();
1214 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1215 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1216 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1217 /// ```
1218 ///
1219 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1220 /// [`Instant::now()`]: std::time::Instant::now
1221 /// [`LogHistogram`]: crate::runtime::LogHistogram
1222 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1223 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1224 self.metrics_poll_count_histogram_enable = true;
1225 self
1226 }
1227
1228 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1229 ///
1230 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1231 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1232 #[doc(hidden)]
1233 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1234 self.enable_metrics_poll_time_histogram()
1235 }
1236
1237 /// Sets the histogram scale for tracking the distribution of task poll
1238 /// times.
1239 ///
1240 /// Tracking the distribution of task poll times can be done using a
1241 /// linear or log scale. When using linear scale, each histogram bucket
1242 /// will represent the same range of poll times. When using log scale,
1243 /// each histogram bucket will cover a range twice as big as the
1244 /// previous bucket.
1245 ///
1246 /// **Default:** linear scale.
1247 ///
1248 /// # Examples
1249 ///
1250 /// ```
1251 /// use tokio::runtime::{self, HistogramScale};
1252 ///
1253 /// # #[allow(deprecated)]
1254 /// let rt = runtime::Builder::new_multi_thread()
1255 /// .enable_metrics_poll_time_histogram()
1256 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1257 /// .build()
1258 /// .unwrap();
1259 /// ```
1260 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1261 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1262 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1263 self
1264 }
1265
1266 /// Configure the histogram for tracking poll times
1267 ///
1268 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1269 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1270 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1271 ///
1272 /// # Examples
1273 /// Configure a [`LogHistogram`] with [default configuration]:
1274 /// ```
1275 /// use tokio::runtime;
1276 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1277 ///
1278 /// let rt = runtime::Builder::new_multi_thread()
1279 /// .enable_metrics_poll_time_histogram()
1280 /// .metrics_poll_time_histogram_configuration(
1281 /// HistogramConfiguration::log(LogHistogram::default())
1282 /// )
1283 /// .build()
1284 /// .unwrap();
1285 /// ```
1286 ///
1287 /// Configure a linear histogram with 100 buckets, each 10μs wide
1288 /// ```
1289 /// use tokio::runtime;
1290 /// use std::time::Duration;
1291 /// use tokio::runtime::HistogramConfiguration;
1292 ///
1293 /// let rt = runtime::Builder::new_multi_thread()
1294 /// .enable_metrics_poll_time_histogram()
1295 /// .metrics_poll_time_histogram_configuration(
1296 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1297 /// )
1298 /// .build()
1299 /// .unwrap();
1300 /// ```
1301 ///
1302 /// Configure a [`LogHistogram`] with the following settings:
1303 /// - Measure times from 100ns to 120s
1304 /// - Max error of 0.1
1305 /// - No more than 1024 buckets
1306 /// ```
1307 /// use std::time::Duration;
1308 /// use tokio::runtime;
1309 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1310 ///
1311 /// let rt = runtime::Builder::new_multi_thread()
1312 /// .enable_metrics_poll_time_histogram()
1313 /// .metrics_poll_time_histogram_configuration(
1314 /// HistogramConfiguration::log(LogHistogram::builder()
1315 /// .max_value(Duration::from_secs(120))
1316 /// .min_value(Duration::from_nanos(100))
1317 /// .max_error(0.1)
1318 /// .max_buckets(1024)
1319 /// .expect("configuration uses 488 buckets")
1320 /// )
1321 /// )
1322 /// .build()
1323 /// .unwrap();
1324 /// ```
1325 ///
1326 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1327 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1328 /// where each bucket is twice the size of the previous bucket.
1329 /// ```rust
1330 /// use std::time::Duration;
1331 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1332 /// let rt = tokio::runtime::Builder::new_current_thread()
1333 /// .enable_all()
1334 /// .enable_metrics_poll_time_histogram()
1335 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1336 /// LogHistogram::builder()
1337 /// .min_value(Duration::from_micros(20))
1338 /// .max_value(Duration::from_millis(4))
1339 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1340 /// .precision_exact(0)
1341 /// .max_buckets(10)
1342 /// .unwrap(),
1343 /// ))
1344 /// .build()
1345 /// .unwrap();
1346 /// ```
1347 ///
1348 /// [`LogHistogram`]: crate::runtime::LogHistogram
1349 /// [default configuration]: crate::runtime::LogHistogramBuilder
1350 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1351 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1352 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1353 self
1354 }
1355
1356 /// Sets the histogram resolution for tracking the distribution of task
1357 /// poll times.
1358 ///
1359 /// The resolution is the histogram's first bucket's range. When using a
1360 /// linear histogram scale, each bucket will cover the same range. When
1361 /// using a log scale, each bucket will cover a range twice as big as
1362 /// the previous bucket. In the log case, the resolution represents the
1363 /// smallest bucket range.
1364 ///
1365 /// Note that, when using log scale, the resolution is rounded up to the
1366 /// nearest power of 2 in nanoseconds.
1367 ///
1368 /// **Default:** 100 microseconds.
1369 ///
1370 /// # Examples
1371 ///
1372 /// ```
1373 /// use tokio::runtime;
1374 /// use std::time::Duration;
1375 ///
1376 /// # #[allow(deprecated)]
1377 /// let rt = runtime::Builder::new_multi_thread()
1378 /// .enable_metrics_poll_time_histogram()
1379 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1380 /// .build()
1381 /// .unwrap();
1382 /// ```
1383 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1384 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1385 assert!(resolution > Duration::from_secs(0));
1386 // Sanity check the argument and also make the cast below safe.
1387 assert!(resolution <= Duration::from_secs(1));
1388
1389 let resolution = resolution.as_nanos() as u64;
1390
1391 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1392 self
1393 }
1394
1395 /// Sets the number of buckets for the histogram tracking the
1396 /// distribution of task poll times.
1397 ///
1398 /// The last bucket tracks all greater values that fall out of other
1399 /// ranges. So, configuring the histogram using a linear scale,
1400 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1401 /// polls that take more than 450ms to complete.
1402 ///
1403 /// **Default:** 10
1404 ///
1405 /// # Examples
1406 ///
1407 /// ```
1408 /// use tokio::runtime;
1409 ///
1410 /// # #[allow(deprecated)]
1411 /// let rt = runtime::Builder::new_multi_thread()
1412 /// .enable_metrics_poll_time_histogram()
1413 /// .metrics_poll_count_histogram_buckets(15)
1414 /// .build()
1415 /// .unwrap();
1416 /// ```
1417 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1418 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1419 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1420 self
1421 }
1422 }
1423
1424 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1425 use crate::runtime::runtime::Scheduler;
1426
1427 let (scheduler, handle, blocking_pool) =
1428 self.build_current_thread_runtime_components(None)?;
1429
1430 Ok(Runtime::from_parts(
1431 Scheduler::CurrentThread(scheduler),
1432 handle,
1433 blocking_pool,
1434 ))
1435 }
1436
#[cfg(tokio_unstable)]
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
    use crate::runtime::local_runtime::LocalRuntimeScheduler;

    // Capture the id of the thread performing the build and pass it along
    // to the shared component constructor.
    let building_thread = std::thread::current().id();

    let (current_thread, handle, blocking_pool) =
        self.build_current_thread_runtime_components(Some(building_thread))?;
    let scheduler = LocalRuntimeScheduler::CurrentThread(current_thread);

    Ok(LocalRuntime::from_parts(scheduler, handle, blocking_pool))
}
1452
1453 fn build_current_thread_runtime_components(
1454 &mut self,
1455 local_tid: Option<ThreadId>,
1456 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1457 use crate::runtime::scheduler;
1458 use crate::runtime::Config;
1459
1460 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1461
1462 // Blocking pool
1463 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1464 let blocking_spawner = blocking_pool.spawner().clone();
1465
1466 // Generate a rng seed for this runtime.
1467 let seed_generator_1 = self.seed_generator.next_generator();
1468 let seed_generator_2 = self.seed_generator.next_generator();
1469
1470 // And now put a single-threaded scheduler on top of the timer. When
1471 // there are no futures ready to do something, it'll let the timer or
1472 // the reactor to generate some new stimuli for the futures to continue
1473 // in their life.
1474 let (scheduler, handle) = CurrentThread::new(
1475 driver,
1476 driver_handle,
1477 blocking_spawner,
1478 seed_generator_2,
1479 Config {
1480 before_park: self.before_park.clone(),
1481 after_unpark: self.after_unpark.clone(),
1482 before_spawn: self.before_spawn.clone(),
1483 #[cfg(tokio_unstable)]
1484 before_poll: self.before_poll.clone(),
1485 #[cfg(tokio_unstable)]
1486 after_poll: self.after_poll.clone(),
1487 after_termination: self.after_termination.clone(),
1488 global_queue_interval: self.global_queue_interval,
1489 event_interval: self.event_interval,
1490 #[cfg(tokio_unstable)]
1491 unhandled_panic: self.unhandled_panic.clone(),
1492 disable_lifo_slot: self.disable_lifo_slot,
1493 seed_generator: seed_generator_1,
1494 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1495 },
1496 local_tid,
1497 );
1498
1499 let handle = Handle {
1500 inner: scheduler::Handle::CurrentThread(handle),
1501 };
1502
1503 Ok((scheduler, handle, blocking_pool))
1504 }
1505
1506 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1507 if self.metrics_poll_count_histogram_enable {
1508 Some(self.metrics_poll_count_histogram.clone())
1509 } else {
1510 None
1511 }
1512 }
1513}
1514
1515cfg_io_driver! {
1516 impl Builder {
1517 /// Enables the I/O driver.
1518 ///
1519 /// Doing this enables using net, process, signal, and some I/O types on
1520 /// the runtime.
1521 ///
1522 /// # Examples
1523 ///
1524 /// ```
1525 /// use tokio::runtime;
1526 ///
1527 /// let rt = runtime::Builder::new_multi_thread()
1528 /// .enable_io()
1529 /// .build()
1530 /// .unwrap();
1531 /// ```
1532 pub fn enable_io(&mut self) -> &mut Self {
1533 self.enable_io = true;
1534 self
1535 }
1536
1537 /// Enables the I/O driver and configures the max number of events to be
1538 /// processed per tick.
1539 ///
1540 /// # Examples
1541 ///
1542 /// ```
1543 /// use tokio::runtime;
1544 ///
1545 /// let rt = runtime::Builder::new_current_thread()
1546 /// .enable_io()
1547 /// .max_io_events_per_tick(1024)
1548 /// .build()
1549 /// .unwrap();
1550 /// ```
1551 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1552 self.nevents = capacity;
1553 self
1554 }
1555 }
1556}
1557
1558cfg_time! {
1559 impl Builder {
1560 /// Enables the time driver.
1561 ///
1562 /// Doing this enables using `tokio::time` on the runtime.
1563 ///
1564 /// # Examples
1565 ///
1566 /// ```
1567 /// use tokio::runtime;
1568 ///
1569 /// let rt = runtime::Builder::new_multi_thread()
1570 /// .enable_time()
1571 /// .build()
1572 /// .unwrap();
1573 /// ```
1574 pub fn enable_time(&mut self) -> &mut Self {
1575 self.enable_time = true;
1576 self
1577 }
1578 }
1579}
1580
1581cfg_test_util! {
1582 impl Builder {
1583 /// Controls if the runtime's clock starts paused or advancing.
1584 ///
1585 /// Pausing time requires the current-thread runtime; construction of
1586 /// the runtime will panic otherwise.
1587 ///
1588 /// # Examples
1589 ///
1590 /// ```
1591 /// use tokio::runtime;
1592 ///
1593 /// let rt = runtime::Builder::new_current_thread()
1594 /// .enable_time()
1595 /// .start_paused(true)
1596 /// .build()
1597 /// .unwrap();
1598 /// ```
1599 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1600 self.start_paused = start_paused;
1601 self
1602 }
1603 }
1604}
1605
1606cfg_rt_multi_thread! {
1607 impl Builder {
1608 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1609 use crate::loom::sys::num_cpus;
1610 use crate::runtime::{Config, runtime::Scheduler};
1611 use crate::runtime::scheduler::{self, MultiThread};
1612
1613 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1614
1615 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1616
1617 // Create the blocking pool
1618 let blocking_pool =
1619 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1620 let blocking_spawner = blocking_pool.spawner().clone();
1621
1622 // Generate a rng seed for this runtime.
1623 let seed_generator_1 = self.seed_generator.next_generator();
1624 let seed_generator_2 = self.seed_generator.next_generator();
1625
1626 let (scheduler, handle, launch) = MultiThread::new(
1627 worker_threads,
1628 driver,
1629 driver_handle,
1630 blocking_spawner,
1631 seed_generator_2,
1632 Config {
1633 before_park: self.before_park.clone(),
1634 after_unpark: self.after_unpark.clone(),
1635 before_spawn: self.before_spawn.clone(),
1636 #[cfg(tokio_unstable)]
1637 before_poll: self.before_poll.clone(),
1638 #[cfg(tokio_unstable)]
1639 after_poll: self.after_poll.clone(),
1640 after_termination: self.after_termination.clone(),
1641 global_queue_interval: self.global_queue_interval,
1642 event_interval: self.event_interval,
1643 #[cfg(tokio_unstable)]
1644 unhandled_panic: self.unhandled_panic.clone(),
1645 disable_lifo_slot: self.disable_lifo_slot,
1646 seed_generator: seed_generator_1,
1647 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1648 },
1649 );
1650
1651 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1652
1653 // Spawn the thread pool workers
1654 let _enter = handle.enter();
1655 launch.launch();
1656
1657 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1658 }
1659 }
1660}
1661
1662impl fmt::Debug for Builder {
1663 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1664 fmt.debug_struct("Builder")
1665 .field("worker_threads", &self.worker_threads)
1666 .field("max_blocking_threads", &self.max_blocking_threads)
1667 .field(
1668 "thread_name",
1669 &"<dyn Fn() -> String + Send + Sync + 'static>",
1670 )
1671 .field("thread_stack_size", &self.thread_stack_size)
1672 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1673 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1674 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1675 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1676 .finish()
1677 }
1678}