tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
16/// Builds a Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// # #[cfg(not(target_family = "wasm"))]
35/// # {
36/// use tokio::runtime::Builder;
37///
38/// fn main() {
39/// // build runtime
40/// let runtime = Builder::new_multi_thread()
41/// .worker_threads(4)
42/// .thread_name("my-custom-name")
43/// .thread_stack_size(3 * 1024 * 1024)
44/// .build()
45/// .unwrap();
46///
47/// // use runtime ...
48/// }
49/// # }
50/// ```
51pub struct Builder {
52 /// Runtime type
53 kind: Kind,
54
55 /// Whether or not to enable the I/O driver
56 enable_io: bool,
57 nevents: usize,
58
59 /// Whether or not to enable the time driver
60 enable_time: bool,
61
62 /// Whether or not the clock should start paused.
63 start_paused: bool,
64
65 /// The number of worker threads used by the Runtime.
66 ///
67 /// Only used when not using the current-thread executor.
68 worker_threads: Option<usize>,
69
70 /// Cap on thread usage.
71 max_blocking_threads: usize,
72
73 /// Name fn used for threads spawned by the runtime.
74 pub(super) thread_name: ThreadNameFn,
75
76 /// Stack size used for threads spawned by the runtime.
77 pub(super) thread_stack_size: Option<usize>,
78
79 /// Callback to run after each thread starts.
80 pub(super) after_start: Option<Callback>,
81
82 /// To run before each worker thread stops
83 pub(super) before_stop: Option<Callback>,
84
85 /// To run before each worker thread is parked.
86 pub(super) before_park: Option<Callback>,
87
88 /// To run after each thread is unparked.
89 pub(super) after_unpark: Option<Callback>,
90
91 /// To run before each task is spawned.
92 pub(super) before_spawn: Option<TaskCallback>,
93
94 /// To run before each poll
95 #[cfg(tokio_unstable)]
96 pub(super) before_poll: Option<TaskCallback>,
97
98 /// To run after each poll
99 #[cfg(tokio_unstable)]
100 pub(super) after_poll: Option<TaskCallback>,
101
102 /// To run after each task is terminated.
103 pub(super) after_termination: Option<TaskCallback>,
104
105 /// Customizable keep alive timeout for `BlockingPool`
106 pub(super) keep_alive: Option<Duration>,
107
108 /// How many ticks before pulling a task from the global/remote queue?
109 ///
110 /// When `None`, the value is unspecified and behavior details are left to
111 /// the scheduler. Each scheduler flavor could choose to either pick its own
112 /// default value or use some other strategy to decide when to poll from the
113 /// global queue. For example, the multi-threaded scheduler uses a
114 /// self-tuning strategy based on mean task poll times.
115 pub(super) global_queue_interval: Option<u32>,
116
117 /// How many ticks before yielding to the driver for timer and I/O events?
118 pub(super) event_interval: u32,
119
120 /// When true, the multi-threaded scheduler LIFO slot should not be used.
121 ///
122 /// This option should only be exposed as unstable.
123 pub(super) disable_lifo_slot: bool,
124
125 /// Specify a random number generator seed to provide deterministic results
126 pub(super) seed_generator: RngSeedGenerator,
127
128 /// When true, enables task poll count histogram instrumentation.
129 pub(super) metrics_poll_count_histogram_enable: bool,
130
131 /// Configures the task poll count histogram
132 pub(super) metrics_poll_count_histogram: HistogramBuilder,
133
134 #[cfg(tokio_unstable)]
135 pub(super) unhandled_panic: UnhandledPanic,
136}
137
138cfg_unstable! {
139 /// How the runtime should respond to unhandled panics.
140 ///
141 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
142 /// to configure the runtime behavior when a spawned task panics.
143 ///
144 /// See [`Builder::unhandled_panic`] for more details.
145 #[derive(Debug, Clone)]
146 #[non_exhaustive]
147 pub enum UnhandledPanic {
148 /// The runtime should ignore panics on spawned tasks.
149 ///
150 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
151 /// tasks continue running normally.
152 ///
153 /// This is the default behavior.
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// # #[cfg(not(target_family = "wasm"))]
159 /// # {
160 /// use tokio::runtime::{self, UnhandledPanic};
161 ///
162 /// # pub fn main() {
163 /// let rt = runtime::Builder::new_current_thread()
164 /// .unhandled_panic(UnhandledPanic::Ignore)
165 /// .build()
166 /// .unwrap();
167 ///
168 /// let task1 = rt.spawn(async { panic!("boom"); });
169 /// let task2 = rt.spawn(async {
170 /// // This task completes normally
171 /// "done"
172 /// });
173 ///
174 /// rt.block_on(async {
175 /// // The panic on the first task is forwarded to the `JoinHandle`
176 /// assert!(task1.await.is_err());
177 ///
178 /// // The second task completes normally
179 /// assert!(task2.await.is_ok());
180 /// })
181 /// # }
182 /// # }
183 /// ```
184 ///
185 /// [`JoinHandle`]: struct@crate::task::JoinHandle
186 Ignore,
187
188 /// The runtime should immediately shutdown if a spawned task panics.
189 ///
190 /// The runtime will immediately shutdown even if the panicked task's
191 /// [`JoinHandle`] is still available. All further spawned tasks will be
192 /// immediately dropped and calls to [`Runtime::block_on`] will panic.
193 ///
194 /// # Examples
195 ///
196 /// ```should_panic
197 /// use tokio::runtime::{self, UnhandledPanic};
198 ///
199 /// # pub fn main() {
200 /// let rt = runtime::Builder::new_current_thread()
201 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
202 /// .build()
203 /// .unwrap();
204 ///
205 /// rt.spawn(async { panic!("boom"); });
206 /// rt.spawn(async {
207 /// // This task never completes.
208 /// });
209 ///
210 /// rt.block_on(async {
211 /// // Do some work
212 /// # loop { tokio::task::yield_now().await; }
213 /// })
214 /// # }
215 /// ```
216 ///
217 /// [`JoinHandle`]: struct@crate::task::JoinHandle
218 ShutdownRuntime,
219 }
220}
221
222pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
223
224#[derive(Clone, Copy)]
225pub(crate) enum Kind {
226 CurrentThread,
227 #[cfg(feature = "rt-multi-thread")]
228 MultiThread,
229}
230
231impl Builder {
232 /// Returns a new builder with the current thread scheduler selected.
233 ///
234 /// Configuration methods can be chained on the return value.
235 ///
236 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
237 /// [`LocalSet`].
238 ///
239 /// [`LocalSet`]: crate::task::LocalSet
240 pub fn new_current_thread() -> Builder {
241 #[cfg(loom)]
242 const EVENT_INTERVAL: u32 = 4;
243 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
244 #[cfg(not(loom))]
245 const EVENT_INTERVAL: u32 = 61;
246
247 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
248 }
249
250 /// Returns a new builder with the multi thread scheduler selected.
251 ///
252 /// Configuration methods can be chained on the return value.
253 #[cfg(feature = "rt-multi-thread")]
254 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
255 pub fn new_multi_thread() -> Builder {
256 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
257 Builder::new(Kind::MultiThread, 61)
258 }
259
260 /// Returns a new runtime builder initialized with default configuration
261 /// values.
262 ///
263 /// Configuration methods can be chained on the return value.
264 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
265 Builder {
266 kind,
267
268 // I/O defaults to "off"
269 enable_io: false,
270 nevents: 1024,
271
272 // Time defaults to "off"
273 enable_time: false,
274
275 // The clock starts not-paused
276 start_paused: false,
277
278 // Read from environment variable first in multi-threaded mode.
279 // Default to lazy auto-detection (one thread per CPU core)
280 worker_threads: None,
281
282 max_blocking_threads: 512,
283
284 // Default thread name
285 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
286
287 // Do not set a stack size by default
288 thread_stack_size: None,
289
290 // No worker thread callbacks
291 after_start: None,
292 before_stop: None,
293 before_park: None,
294 after_unpark: None,
295
296 before_spawn: None,
297 after_termination: None,
298
299 #[cfg(tokio_unstable)]
300 before_poll: None,
301 #[cfg(tokio_unstable)]
302 after_poll: None,
303
304 keep_alive: None,
305
306 // Defaults for these values depend on the scheduler kind, so we get them
307 // as parameters.
308 global_queue_interval: None,
309 event_interval,
310
311 seed_generator: RngSeedGenerator::new(RngSeed::new()),
312
313 #[cfg(tokio_unstable)]
314 unhandled_panic: UnhandledPanic::Ignore,
315
316 metrics_poll_count_histogram_enable: false,
317
318 metrics_poll_count_histogram: HistogramBuilder::default(),
319
320 disable_lifo_slot: false,
321 }
322 }
323
324 /// Enables both I/O and time drivers.
325 ///
326 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
327 /// individually. If additional components are added to Tokio in the future,
328 /// `enable_all` will include these future components.
329 ///
330 /// # Examples
331 ///
332 /// ```
333 /// # #[cfg(not(target_family = "wasm"))]
334 /// # {
335 /// use tokio::runtime;
336 ///
337 /// let rt = runtime::Builder::new_multi_thread()
338 /// .enable_all()
339 /// .build()
340 /// .unwrap();
341 /// # }
342 /// ```
343 pub fn enable_all(&mut self) -> &mut Self {
344 #[cfg(any(
345 feature = "net",
346 all(unix, feature = "process"),
347 all(unix, feature = "signal")
348 ))]
349 self.enable_io();
350
351 #[cfg(all(
352 tokio_unstable,
353 feature = "io-uring",
354 feature = "rt",
355 feature = "fs",
356 target_os = "linux",
357 ))]
358 self.enable_io_uring();
359
360 #[cfg(feature = "time")]
361 self.enable_time();
362
363 self
364 }
365
366 /// Sets the number of worker threads the `Runtime` will use.
367 ///
368 /// This can be any number above 0, though it is advised to keep this value
369 /// on the smaller side.
370 ///
371 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
372 ///
373 /// # Default
374 ///
375 /// The default value is the number of cores available to the system.
376 ///
377 /// When using the `current_thread` runtime this method has no effect.
378 ///
379 /// # Examples
380 ///
381 /// ## Multi threaded runtime with 4 threads
382 ///
383 /// ```
384 /// # #[cfg(not(target_family = "wasm"))]
385 /// # {
386 /// use tokio::runtime;
387 ///
388 /// // This will spawn a work-stealing runtime with 4 worker threads.
389 /// let rt = runtime::Builder::new_multi_thread()
390 /// .worker_threads(4)
391 /// .build()
392 /// .unwrap();
393 ///
394 /// rt.spawn(async move {});
395 /// # }
396 /// ```
397 ///
398 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
399 ///
400 /// ```
401 /// use tokio::runtime;
402 ///
403 /// // Create a runtime that _must_ be driven from a call
404 /// // to `Runtime::block_on`.
405 /// let rt = runtime::Builder::new_current_thread()
406 /// .build()
407 /// .unwrap();
408 ///
409 /// // This will run the runtime and future on the current thread
410 /// rt.block_on(async move {});
411 /// ```
412 ///
413 /// # Panics
414 ///
415 /// This will panic if `val` is not larger than `0`.
416 #[track_caller]
417 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
418 assert!(val > 0, "Worker threads cannot be set to 0");
419 self.worker_threads = Some(val);
420 self
421 }
422
423 /// Specifies the limit for additional threads spawned by the Runtime.
424 ///
425 /// These threads are used for blocking operations like tasks spawned
426 /// through [`spawn_blocking`], this includes but is not limited to:
427 /// - [`fs`] operations
428 /// - dns resolution through [`ToSocketAddrs`]
429 /// - writing to [`Stdout`] or [`Stderr`]
430 /// - reading from [`Stdin`]
431 ///
432 /// Unlike the [`worker_threads`], they are not always active and will exit
433 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
434 ///
435 /// It's recommended to not set this limit too low in order to avoid hanging on operations
436 /// requiring [`spawn_blocking`].
437 ///
438 /// The default value is 512.
439 ///
440 /// # Queue Behavior
441 ///
442 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
443 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
444 /// method has not been reached, a new thread will be spawned. If no idle thread is available
445 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
446 /// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
447 /// it could potentially grow unbounded.
448 ///
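/// # Examples
///
/// A minimal sketch of capping the blocking pool; the limit of `4` below is an
/// arbitrary value chosen for illustration.
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_multi_thread()
///     .max_blocking_threads(4)
///     .build()
///     .unwrap();
///
/// rt.block_on(async {
///     // Blocking work is handed off to the blocking pool.
///     let answer = tokio::task::spawn_blocking(|| 40 + 2).await.unwrap();
///     assert_eq!(answer, 42);
/// });
/// # }
/// ```
///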
449 /// # Panics
450 ///
451 /// This will panic if `val` is not larger than `0`.
452 ///
453 /// # Upgrading from 0.x
454 ///
455 /// In old versions `max_threads` limited both blocking and worker threads, but the
456 /// current `max_blocking_threads` does not include async worker threads in the count.
457 ///
458 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
459 /// [`fs`]: mod@crate::fs
460 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
461 /// [`Stdout`]: struct@crate::io::Stdout
462 /// [`Stdin`]: struct@crate::io::Stdin
463 /// [`Stderr`]: struct@crate::io::Stderr
464 /// [`worker_threads`]: Self::worker_threads
465 /// [`thread_keep_alive`]: Self::thread_keep_alive
466 #[track_caller]
467 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
468 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
469 assert!(val > 0, "Max blocking threads cannot be set to 0");
470 self.max_blocking_threads = val;
471 self
472 }
473
474 /// Sets the name of threads spawned by the `Runtime`'s thread pool.
475 ///
476 /// The default name is "tokio-runtime-worker".
477 ///
478 /// # Examples
479 ///
480 /// ```
481 /// # #[cfg(not(target_family = "wasm"))]
482 /// # {
483 /// # use tokio::runtime;
484 ///
485 /// # pub fn main() {
486 /// let rt = runtime::Builder::new_multi_thread()
487 /// .thread_name("my-pool")
488 /// .build();
489 /// # }
490 /// # }
491 /// ```
492 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
493 let val = val.into();
494 self.thread_name = std::sync::Arc::new(move || val.clone());
495 self
496 }
497
498 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
499 ///
500 /// The default name fn is `|| "tokio-runtime-worker".into()`.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// # #[cfg(not(target_family = "wasm"))]
506 /// # {
507 /// # use tokio::runtime;
508 /// # use std::sync::atomic::{AtomicUsize, Ordering};
509 /// # pub fn main() {
510 /// let rt = runtime::Builder::new_multi_thread()
511 /// .thread_name_fn(|| {
512 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
513 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
514 /// format!("my-pool-{}", id)
515 /// })
516 /// .build();
517 /// # }
518 /// # }
519 /// ```
520 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
521 where
522 F: Fn() -> String + Send + Sync + 'static,
523 {
524 self.thread_name = std::sync::Arc::new(f);
525 self
526 }
527
528 /// Sets the stack size (in bytes) for worker threads.
529 ///
530 /// The actual stack size may be greater than this value if the platform
531 /// specifies a minimal stack size.
532 ///
533 /// The default stack size for spawned threads is 2 MiB, though this
534 /// particular stack size is subject to change in the future.
535 ///
536 /// # Examples
537 ///
538 /// ```
539 /// # #[cfg(not(target_family = "wasm"))]
540 /// # {
541 /// # use tokio::runtime;
542 ///
543 /// # pub fn main() {
544 /// let rt = runtime::Builder::new_multi_thread()
545 /// .thread_stack_size(32 * 1024)
546 /// .build();
547 /// # }
548 /// # }
549 /// ```
550 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
551 self.thread_stack_size = Some(val);
552 self
553 }
554
555 /// Executes function `f` after each thread is started but before it starts
556 /// doing work.
557 ///
558 /// This is intended for bookkeeping and monitoring use cases.
559 ///
560 /// # Examples
561 ///
562 /// ```
563 /// # #[cfg(not(target_family = "wasm"))]
564 /// # {
565 /// # use tokio::runtime;
566 /// # pub fn main() {
567 /// let runtime = runtime::Builder::new_multi_thread()
568 /// .on_thread_start(|| {
569 /// println!("thread started");
570 /// })
571 /// .build();
572 /// # }
573 /// # }
574 /// ```
575 #[cfg(not(loom))]
576 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
577 where
578 F: Fn() + Send + Sync + 'static,
579 {
580 self.after_start = Some(std::sync::Arc::new(f));
581 self
582 }
583
584 /// Executes function `f` before each thread stops.
585 ///
586 /// This is intended for bookkeeping and monitoring use cases.
587 ///
588 /// # Examples
589 ///
590 /// ```
591 /// # #[cfg(not(target_family = "wasm"))]
592 /// # {
593 /// # use tokio::runtime;
594 /// # pub fn main() {
595 /// let runtime = runtime::Builder::new_multi_thread()
596 /// .on_thread_stop(|| {
597 /// println!("thread stopping");
598 /// })
599 /// .build();
600 /// # }
601 /// # }
602 /// ```
603 #[cfg(not(loom))]
604 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
605 where
606 F: Fn() + Send + Sync + 'static,
607 {
608 self.before_stop = Some(std::sync::Arc::new(f));
609 self
610 }
611
612 /// Executes function `f` just before a thread is parked (goes idle).
613 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
614 /// can be called, and may result in this thread being unparked immediately.
615 ///
616 /// This can be used to start work only when the executor is idle, or for bookkeeping
617 /// and monitoring purposes.
618 ///
619 /// Note: There can only be one park callback for a runtime; calling this function
620 /// more than once replaces the last callback defined, rather than adding to it.
621 ///
622 /// # Examples
623 ///
624 /// ## Multithreaded executor
625 /// ```
626 /// # #[cfg(not(target_family = "wasm"))]
627 /// # {
628 /// # use std::sync::Arc;
629 /// # use std::sync::atomic::{AtomicBool, Ordering};
630 /// # use tokio::runtime;
631 /// # use tokio::sync::Barrier;
632 /// # pub fn main() {
633 /// let once = AtomicBool::new(true);
634 /// let barrier = Arc::new(Barrier::new(2));
635 ///
636 /// let runtime = runtime::Builder::new_multi_thread()
637 /// .worker_threads(1)
638 /// .on_thread_park({
639 /// let barrier = barrier.clone();
640 /// move || {
641 /// let barrier = barrier.clone();
642 /// if once.swap(false, Ordering::Relaxed) {
643 /// tokio::spawn(async move { barrier.wait().await; });
644 /// }
645 /// }
646 /// })
647 /// .build()
648 /// .unwrap();
649 ///
650 /// runtime.block_on(async {
651 /// barrier.wait().await;
652 /// })
653 /// # }
654 /// # }
655 /// ```
656 /// ## Current thread executor
657 /// ```
658 /// # use std::sync::Arc;
659 /// # use std::sync::atomic::{AtomicBool, Ordering};
660 /// # use tokio::runtime;
661 /// # use tokio::sync::Barrier;
662 /// # pub fn main() {
663 /// let once = AtomicBool::new(true);
664 /// let barrier = Arc::new(Barrier::new(2));
665 ///
666 /// let runtime = runtime::Builder::new_current_thread()
667 /// .on_thread_park({
668 /// let barrier = barrier.clone();
669 /// move || {
670 /// let barrier = barrier.clone();
671 /// if once.swap(false, Ordering::Relaxed) {
672 /// tokio::spawn(async move { barrier.wait().await; });
673 /// }
674 /// }
675 /// })
676 /// .build()
677 /// .unwrap();
678 ///
679 /// runtime.block_on(async {
680 /// barrier.wait().await;
681 /// })
682 /// # }
683 /// ```
684 #[cfg(not(loom))]
685 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
686 where
687 F: Fn() + Send + Sync + 'static,
688 {
689 self.before_park = Some(std::sync::Arc::new(f));
690 self
691 }
692
693 /// Executes function `f` just after a thread unparks (starts executing tasks).
694 ///
695 /// This is intended for bookkeeping and monitoring use cases; note that work
696 /// in this callback will increase latencies when the application has allowed one or
697 /// more runtime threads to go idle.
698 ///
699 /// Note: There can only be one unpark callback for a runtime; calling this function
700 /// more than once replaces the last callback defined, rather than adding to it.
701 ///
702 /// # Examples
703 ///
704 /// ```
705 /// # #[cfg(not(target_family = "wasm"))]
706 /// # {
707 /// # use tokio::runtime;
708 /// # pub fn main() {
709 /// let runtime = runtime::Builder::new_multi_thread()
710 /// .on_thread_unpark(|| {
711 /// println!("thread unparking");
712 /// })
713 /// .build();
714 ///
715 /// runtime.unwrap().block_on(async {
716 /// tokio::task::yield_now().await;
717 /// println!("Hello from Tokio!");
718 /// })
719 /// # }
720 /// # }
721 /// ```
722 #[cfg(not(loom))]
723 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
724 where
725 F: Fn() + Send + Sync + 'static,
726 {
727 self.after_unpark = Some(std::sync::Arc::new(f));
728 self
729 }
730
731 /// Executes function `f` just before a task is spawned.
732 ///
733 /// `f` is called within the Tokio context, so functions like
734 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
735 /// invoked immediately.
736 ///
737 /// This can be used for bookkeeping or monitoring purposes.
738 ///
739 /// Note: There can only be one spawn callback for a runtime; calling this function more
740 /// than once replaces the last callback defined, rather than adding to it.
741 ///
742 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
743 ///
744 /// **Note**: This is an [unstable API][unstable]. The public API of this type
745 /// may break in 1.x releases. See [the documentation on unstable
746 /// features][unstable] for details.
747 ///
748 /// [unstable]: crate#unstable-features
749 ///
750 /// # Examples
751 ///
752 /// ```
753 /// # use tokio::runtime;
754 /// # pub fn main() {
755 /// let runtime = runtime::Builder::new_current_thread()
756 /// .on_task_spawn(|_| {
757 /// println!("spawning task");
758 /// })
759 /// .build()
760 /// .unwrap();
761 ///
762 /// runtime.block_on(async {
763 /// tokio::task::spawn(std::future::ready(()));
764 ///
765 /// for _ in 0..64 {
766 /// tokio::task::yield_now().await;
767 /// }
768 /// })
769 /// # }
770 /// ```
771 #[cfg(all(not(loom), tokio_unstable))]
772 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
773 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
774 where
775 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
776 {
777 self.before_spawn = Some(std::sync::Arc::new(f));
778 self
779 }
780
781 /// Executes function `f` just before a task is polled.
782 ///
783 /// `f` is called within the Tokio context, so functions like
784 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
785 /// invoked immediately.
786 ///
787 /// **Note**: This is an [unstable API][unstable]. The public API of this type
788 /// may break in 1.x releases. See [the documentation on unstable
789 /// features][unstable] for details.
790 ///
791 /// [unstable]: crate#unstable-features
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// # #[cfg(not(target_family = "wasm"))]
797 /// # {
798 /// # use std::sync::{atomic::AtomicUsize, Arc};
799 /// # use tokio::task::yield_now;
800 /// # pub fn main() {
801 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
802 /// let poll_start = poll_start_counter.clone();
803 /// let rt = tokio::runtime::Builder::new_multi_thread()
804 /// .enable_all()
805 /// .on_before_task_poll(move |meta| {
806 /// println!("task {} is about to be polled", meta.id())
807 /// })
808 /// .build()
809 /// .unwrap();
810 /// let task = rt.spawn(async {
811 /// yield_now().await;
812 /// });
813 /// let _ = rt.block_on(task);
814 ///
815 /// # }
816 /// # }
817 /// ```
818 #[cfg(tokio_unstable)]
819 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
820 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
821 where
822 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
823 {
824 self.before_poll = Some(std::sync::Arc::new(f));
825 self
826 }
827
828 /// Executes function `f` just after a task is polled.
829 ///
830 /// `f` is called within the Tokio context, so functions like
831 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
832 /// invoked immediately.
833 ///
834 /// **Note**: This is an [unstable API][unstable]. The public API of this type
835 /// may break in 1.x releases. See [the documentation on unstable
836 /// features][unstable] for details.
837 ///
838 /// [unstable]: crate#unstable-features
839 ///
840 /// # Examples
841 ///
842 /// ```
843 /// # #[cfg(not(target_family = "wasm"))]
844 /// # {
845 /// # use std::sync::{atomic::AtomicUsize, Arc};
846 /// # use tokio::task::yield_now;
847 /// # pub fn main() {
848 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
849 /// let poll_stop = poll_stop_counter.clone();
850 /// let rt = tokio::runtime::Builder::new_multi_thread()
851 /// .enable_all()
852 /// .on_after_task_poll(move |meta| {
853 /// println!("task {} completed polling", meta.id());
854 /// })
855 /// .build()
856 /// .unwrap();
857 /// let task = rt.spawn(async {
858 /// yield_now().await;
859 /// });
860 /// let _ = rt.block_on(task);
861 ///
862 /// # }
863 /// # }
864 /// ```
865 #[cfg(tokio_unstable)]
866 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
867 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
868 where
869 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
870 {
871 self.after_poll = Some(std::sync::Arc::new(f));
872 self
873 }
874
875 /// Executes function `f` just after a task is terminated.
876 ///
877 /// `f` is called within the Tokio context, so functions like
878 /// [`tokio::spawn`](crate::spawn) can be called.
879 ///
880 /// This can be used for bookkeeping or monitoring purposes.
881 ///
882 /// Note: There can only be one task termination callback for a runtime; calling this
883 /// function more than once replaces the last callback defined, rather than adding to it.
884 ///
885 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
886 ///
887 /// **Note**: This is an [unstable API][unstable]. The public API of this type
888 /// may break in 1.x releases. See [the documentation on unstable
889 /// features][unstable] for details.
890 ///
891 /// [unstable]: crate#unstable-features
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// # use tokio::runtime;
897 /// # pub fn main() {
898 /// let runtime = runtime::Builder::new_current_thread()
899 /// .on_task_terminate(|_| {
900 /// println!("killing task");
901 /// })
902 /// .build()
903 /// .unwrap();
904 ///
905 /// runtime.block_on(async {
906 /// tokio::task::spawn(std::future::ready(()));
907 ///
908 /// for _ in 0..64 {
909 /// tokio::task::yield_now().await;
910 /// }
911 /// })
912 /// # }
913 /// ```
914 #[cfg(all(not(loom), tokio_unstable))]
915 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
916 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
917 where
918 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
919 {
920 self.after_termination = Some(std::sync::Arc::new(f));
921 self
922 }
923
924 /// Creates the configured `Runtime`.
925 ///
926 /// The returned `Runtime` instance is ready to spawn tasks.
927 ///
928 /// # Examples
929 ///
930 /// ```
931 /// # #[cfg(not(target_family = "wasm"))]
932 /// # {
933 /// use tokio::runtime::Builder;
934 ///
935 /// let rt = Builder::new_multi_thread().build().unwrap();
936 ///
937 /// rt.block_on(async {
938 /// println!("Hello from the Tokio runtime");
939 /// });
940 /// # }
941 /// ```
942 pub fn build(&mut self) -> io::Result<Runtime> {
943 match &self.kind {
944 Kind::CurrentThread => self.build_current_thread_runtime(),
945 #[cfg(feature = "rt-multi-thread")]
946 Kind::MultiThread => self.build_threaded_runtime(),
947 }
948 }
949
950 /// Creates the configured [`LocalRuntime`].
951 ///
952 /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
953 ///
954 /// # Panics
955 ///
956 /// This will panic if the runtime is configured with [`new_multi_thread()`].
957 ///
958 /// [`new_multi_thread()`]: Builder::new_multi_thread
959 ///
960 /// # Examples
961 ///
962 /// ```
963 /// use tokio::runtime::{Builder, LocalOptions};
964 ///
965 /// let rt = Builder::new_current_thread()
966 /// .build_local(LocalOptions::default())
967 /// .unwrap();
968 ///
969 /// rt.spawn_local(async {
970 /// println!("Hello from the Tokio runtime");
971 /// });
972 /// ```
973 #[allow(unused_variables, unreachable_patterns)]
974 #[cfg(tokio_unstable)]
975 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
976 pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
977 match &self.kind {
978 Kind::CurrentThread => self.build_current_thread_local_runtime(),
979 #[cfg(feature = "rt-multi-thread")]
980 Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
981 }
982 }
983
984 fn get_cfg(&self) -> driver::Cfg {
985 driver::Cfg {
986 enable_pause_time: match self.kind {
987 Kind::CurrentThread => true,
988 #[cfg(feature = "rt-multi-thread")]
989 Kind::MultiThread => false,
990 },
991 enable_io: self.enable_io,
992 enable_time: self.enable_time,
993 start_paused: self.start_paused,
994 nevents: self.nevents,
995 }
996 }
997
998 /// Sets a custom timeout for a thread in the blocking pool.
999 ///
1000 /// By default, the timeout for a thread is set to 10 seconds. This can
1001 /// be overridden using `.thread_keep_alive()`.
1002 ///
1003 /// # Example
1004 ///
1005 /// ```
1006 /// # #[cfg(not(target_family = "wasm"))]
1007 /// # {
1008 /// # use tokio::runtime;
1009 /// # use std::time::Duration;
1010 /// # pub fn main() {
1011 /// let rt = runtime::Builder::new_multi_thread()
1012 /// .thread_keep_alive(Duration::from_millis(100))
1013 /// .build();
1014 /// # }
1015 /// # }
1016 /// ```
1017 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1018 self.keep_alive = Some(duration);
1019 self
1020 }
1021
1022 /// Sets the number of scheduler ticks after which the scheduler will poll the global
1023 /// task queue.
1024 ///
1025 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1026 ///
1027 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1028 /// [the module documentation] for the default behavior of the multi-thread scheduler.
1029 ///
1030 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1031 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1032 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1033 /// getting started on new work, especially if tasks frequently yield rather than complete
1034 /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1035 /// tasks from the local queue will be executed only if the global queue is empty.
1036 /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1037 /// tasks quickly complete polling.
1038 ///
1039 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1040 ///
1041 /// # Panics
1042 ///
1043 /// This function will panic if 0 is passed as an argument.
1044 ///
1045 /// # Examples
1046 ///
1047 /// ```
1048 /// # #[cfg(not(target_family = "wasm"))]
1049 /// # {
1050 /// # use tokio::runtime;
1051 /// # pub fn main() {
1052 /// let rt = runtime::Builder::new_multi_thread()
1053 /// .global_queue_interval(31)
1054 /// .build();
1055 /// # }
1056 /// # }
1057 /// ```
1058 #[track_caller]
1059 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1060 assert!(val > 0, "global_queue_interval must be greater than 0");
1061 self.global_queue_interval = Some(val);
1062 self
1063 }
1064
1065 /// Sets the number of scheduler ticks after which the scheduler will poll for
1066 /// external events (timers, I/O, and so on).
1067 ///
1068 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1069 ///
1070 /// By default, the event interval is `61` for all scheduler types.
1071 ///
1072 /// Setting the event interval determines the effective "priority" of delivering
1073 /// these external events (which may wake up additional tasks), compared to
1074 /// executing tasks that are currently ready to run. A smaller value is useful
1075 /// when tasks frequently spend a long time in polling, or frequently yield,
1076 /// which can result in overly long delays picking up I/O events. Conversely,
1077 /// picking up new events requires extra synchronization and syscall overhead,
1078 /// so if tasks generally complete their polling quickly, a higher event interval
1079 /// will minimize that overhead while still keeping the scheduler responsive to
1080 /// events.
1081 ///
1082 /// # Examples
1083 ///
1084 /// ```
1085 /// # #[cfg(not(target_family = "wasm"))]
1086 /// # {
1087 /// # use tokio::runtime;
1088 /// # pub fn main() {
1089 /// let rt = runtime::Builder::new_multi_thread()
1090 /// .event_interval(31)
1091 /// .build();
1092 /// # }
1093 /// # }
1094 /// ```
1095 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1096 self.event_interval = val;
1097 self
1098 }
1099
1100 cfg_unstable! {
1101 /// Configure how the runtime responds to an unhandled panic on a
1102 /// spawned task.
1103 ///
1104 /// By default, an unhandled panic (i.e. a panic not caught by
1105 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1106 /// execution. The panic's error value is forwarded to the task's
1107 /// [`JoinHandle`] and all other spawned tasks continue running.
1108 ///
1109 /// The `unhandled_panic` option enables configuring this behavior.
1110 ///
1111 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1112 /// spawned tasks have no impact on the runtime's execution.
1113 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1114 /// shutdown immediately when a spawned task panics even if that
1115 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1116 /// will immediately terminate and further calls to
1117 /// [`Runtime::block_on`] will panic.
1118 ///
1119 /// # Panics
1120 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1121 /// on a builder for a runtime other than the current-thread runtime.
1122 ///
1123 /// # Unstable
1124 ///
1125 /// This option is currently unstable and its implementation is
1126 /// incomplete. The API may change or be removed in the future. See
1127 /// issue [tokio-rs/tokio#4516] for more details.
1128 ///
1129 /// # Examples
1130 ///
1131 /// The following demonstrates a runtime configured to shutdown on
1132 /// panic. The first spawned task panics and results in the runtime
1133 /// shutting down. The second spawned task never has a chance to
1134 /// execute. The call to `block_on` will panic due to the runtime being
1135 /// forcibly shutdown.
1136 ///
1137 /// ```should_panic
1138 /// use tokio::runtime::{self, UnhandledPanic};
1139 ///
1140 /// # pub fn main() {
1141 /// let rt = runtime::Builder::new_current_thread()
1142 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1143 /// .build()
1144 /// .unwrap();
1145 ///
1146 /// rt.spawn(async { panic!("boom"); });
1147 /// rt.spawn(async {
1148 /// // This task never completes.
1149 /// });
1150 ///
1151 /// rt.block_on(async {
1152 /// // Do some work
1153 /// # loop { tokio::task::yield_now().await; }
1154 /// })
1155 /// # }
1156 /// ```
1157 ///
1158 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1159 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1160 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1161 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1162 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1163 }
1164
1165 self.unhandled_panic = behavior;
1166 self
1167 }
1168
1169 /// Disables the LIFO task scheduler heuristic.
1170 ///
1171 /// The multi-threaded scheduler includes a heuristic for optimizing
1172 /// message-passing patterns. This heuristic results in the **last**
1173 /// scheduled task being polled first.
1174 ///
1175 /// To implement this heuristic, each worker thread has a slot which
1176 /// holds the task that should be polled next. However, this slot cannot
1177 /// be stolen by other worker threads, which can result in lower total
1178 /// throughput when tasks tend to have longer poll times.
1179 ///
1180 /// This configuration option will disable this heuristic resulting in
1181 /// all scheduled tasks being pushed into the worker-local queue, which
1182 /// is stealable.
1183 ///
1184 /// Consider trying this option when the task "scheduled" time is high
1185 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1186 /// collect this data.
1187 ///
1188 /// # Unstable
1189 ///
1190 /// This configuration option is considered a workaround for the LIFO
1191 /// slot not being stealable. When the slot becomes stealable, we will
1192 /// revisit whether or not this option is necessary. See
1193 /// issue [tokio-rs/tokio#4941].
1194 ///
1195 /// # Examples
1196 ///
1197 /// ```
1198 /// # #[cfg(not(target_family = "wasm"))]
1199 /// # {
1200 /// use tokio::runtime;
1201 ///
1202 /// let rt = runtime::Builder::new_multi_thread()
1203 /// .disable_lifo_slot()
1204 /// .build()
1205 /// .unwrap();
1206 /// # }
1207 /// ```
1208 ///
1209 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1210 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1211 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1212 self.disable_lifo_slot = true;
1213 self
1214 }
1215
1216 /// Specifies the random number generation seed to use within all
1217 /// threads associated with the runtime being built.
1218 ///
1219 /// This option is intended to make certain parts of the runtime
1220 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1221 /// [`tokio::select!`] it will ensure that the order that branches are
1222 /// polled is deterministic.
1223 ///
1224 /// In addition to the code specifying `rng_seed` and interacting with
1225 /// the runtime, the internals of Tokio and the Rust compiler may affect
1226 /// the sequences of random numbers. In order to ensure repeatable
1227 /// results, the version of Tokio, the versions of all other
1228 /// dependencies that interact with Tokio, and the Rust compiler version
1229 /// should also all remain constant.
1230 ///
1231 /// # Examples
1232 ///
1233 /// ```
1234 /// # use tokio::runtime::{self, RngSeed};
1235 /// # pub fn main() {
1236 /// let seed = RngSeed::from_bytes(b"place your seed here");
1237 /// let rt = runtime::Builder::new_current_thread()
1238 /// .rng_seed(seed)
1239 /// .build();
1240 /// # }
1241 /// ```
1242 ///
1243 /// [`tokio::select!`]: crate::select
1244 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1245 self.seed_generator = RngSeedGenerator::new(seed);
1246 self
1247 }
1248 }
1249
1250 cfg_unstable_metrics! {
1251 /// Enables tracking the distribution of task poll times.
1252 ///
1253 /// Task poll times are not instrumented by default as doing so requires
1254 /// calling [`Instant::now()`] twice per task poll, which could add
1255 /// measurable overhead. Use [`Handle::metrics()`] to access the
1256 /// metrics data.
1257 ///
1258 /// The histogram uses fixed bucket sizes. In other words, the histogram
1259 /// buckets are not dynamic based on input values. Use the
1260 /// `metrics_poll_time_histogram` builder methods to configure the
1261 /// histogram details.
1262 ///
1263 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1264 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1265 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1266 /// to select [`LogHistogram`] instead.
1267 ///
1268 /// # Examples
1269 ///
1270 /// ```
1271 /// # #[cfg(not(target_family = "wasm"))]
1272 /// # {
1273 /// use tokio::runtime;
1274 ///
1275 /// let rt = runtime::Builder::new_multi_thread()
1276 /// .enable_metrics_poll_time_histogram()
1277 /// .build()
1278 /// .unwrap();
1279 /// # // Test default values here
1280 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1281 /// # let m = rt.handle().metrics();
1282 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1283 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1284 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1285 /// # }
1286 /// ```
1287 ///
1288 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1289 /// [`Instant::now()`]: std::time::Instant::now
1290 /// [`LogHistogram`]: crate::runtime::LogHistogram
1291 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1292 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1293 self.metrics_poll_count_histogram_enable = true;
1294 self
1295 }
1296
1297 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1298 ///
1299 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1300 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1301 #[doc(hidden)]
1302 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1303 self.enable_metrics_poll_time_histogram()
1304 }
1305
1306 /// Sets the histogram scale for tracking the distribution of task poll
1307 /// times.
1308 ///
1309 /// Tracking the distribution of task poll times can be done using a
1310 /// linear or log scale. When using linear scale, each histogram bucket
1311 /// will represent the same range of poll times. When using log scale,
1312 /// each histogram bucket will cover a range twice as big as the
1313 /// previous bucket.
1314 ///
1315 /// **Default:** linear scale.
1316 ///
1317 /// # Examples
1318 ///
1319 /// ```
1320 /// # #[cfg(not(target_family = "wasm"))]
1321 /// # {
1322 /// use tokio::runtime::{self, HistogramScale};
1323 ///
1324 /// # #[allow(deprecated)]
1325 /// let rt = runtime::Builder::new_multi_thread()
1326 /// .enable_metrics_poll_time_histogram()
1327 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1328 /// .build()
1329 /// .unwrap();
1330 /// # }
1331 /// ```
1332 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1333 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1334 self.metrics_poll_count_histogram.legacy_mut(|b| b.scale = histogram_scale);
1335 self
1336 }
1337
1338 /// Configure the histogram for tracking poll times
1339 ///
1340 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1341 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1342 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1343 ///
1344 /// # Examples
1345 /// Configure a [`LogHistogram`] with [default configuration]:
1346 /// ```
1347 /// # #[cfg(not(target_family = "wasm"))]
1348 /// # {
1349 /// use tokio::runtime;
1350 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1351 ///
1352 /// let rt = runtime::Builder::new_multi_thread()
1353 /// .enable_metrics_poll_time_histogram()
1354 /// .metrics_poll_time_histogram_configuration(
1355 /// HistogramConfiguration::log(LogHistogram::default())
1356 /// )
1357 /// .build()
1358 /// .unwrap();
1359 /// # }
1360 /// ```
1361 ///
1362 /// Configure a linear histogram with 100 buckets, each 10μs wide
1363 /// ```
1364 /// # #[cfg(not(target_family = "wasm"))]
1365 /// # {
1366 /// use tokio::runtime;
1367 /// use std::time::Duration;
1368 /// use tokio::runtime::HistogramConfiguration;
1369 ///
1370 /// let rt = runtime::Builder::new_multi_thread()
1371 /// .enable_metrics_poll_time_histogram()
1372 /// .metrics_poll_time_histogram_configuration(
1373 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1374 /// )
1375 /// .build()
1376 /// .unwrap();
1377 /// # }
1378 /// ```
1379 ///
1380 /// Configure a [`LogHistogram`] with the following settings:
1381 /// - Measure times from 100ns to 120s
1382 /// - Max error of 0.1
1383 /// - No more than 1024 buckets
1384 /// ```
1385 /// # #[cfg(not(target_family = "wasm"))]
1386 /// # {
1387 /// use std::time::Duration;
1388 /// use tokio::runtime;
1389 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1390 ///
1391 /// let rt = runtime::Builder::new_multi_thread()
1392 /// .enable_metrics_poll_time_histogram()
1393 /// .metrics_poll_time_histogram_configuration(
1394 /// HistogramConfiguration::log(LogHistogram::builder()
1395 /// .max_value(Duration::from_secs(120))
1396 /// .min_value(Duration::from_nanos(100))
1397 /// .max_error(0.1)
1398 /// .max_buckets(1024)
1399 /// .expect("configuration uses 488 buckets")
1400 /// )
1401 /// )
1402 /// .build()
1403 /// .unwrap();
1404 /// # }
1405 /// ```
1406 ///
1407 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1408 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1409 /// where each bucket is twice the size of the previous bucket.
1410 /// ```rust
1411 /// use std::time::Duration;
1412 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1413 /// let rt = tokio::runtime::Builder::new_current_thread()
1414 /// .enable_all()
1415 /// .enable_metrics_poll_time_histogram()
1416 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1417 /// LogHistogram::builder()
1418 /// .min_value(Duration::from_micros(20))
1419 /// .max_value(Duration::from_millis(4))
1420 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1421 /// .precision_exact(0)
1422 /// .max_buckets(10)
1423 /// .unwrap(),
1424 /// ))
1425 /// .build()
1426 /// .unwrap();
1427 /// ```
1428 ///
1429 /// [`LogHistogram`]: crate::runtime::LogHistogram
1430 /// [default configuration]: crate::runtime::LogHistogramBuilder
1431 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1432 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1433 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1434 self
1435 }
1436
1437 /// Sets the histogram resolution for tracking the distribution of task
1438 /// poll times.
1439 ///
1440 /// The resolution is the histogram's first bucket's range. When using a
1441 /// linear histogram scale, each bucket will cover the same range. When
1442 /// using a log scale, each bucket will cover a range twice as big as
1443 /// the previous bucket. In the log case, the resolution represents the
1444 /// smallest bucket range.
1445 ///
1446 /// Note that, when using log scale, the resolution is rounded up to the
1447 /// nearest power of 2 in nanoseconds.
1448 ///
1449 /// **Default:** 100 microseconds.
1450 ///
1451 /// # Examples
1452 ///
1453 /// ```
1454 /// # #[cfg(not(target_family = "wasm"))]
1455 /// # {
1456 /// use tokio::runtime;
1457 /// use std::time::Duration;
1458 ///
1459 /// # #[allow(deprecated)]
1460 /// let rt = runtime::Builder::new_multi_thread()
1461 /// .enable_metrics_poll_time_histogram()
1462 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1463 /// .build()
1464 /// .unwrap();
1465 /// # }
1466 /// ```
1467 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1468 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1469 assert!(resolution > Duration::from_secs(0));
1470 // Sanity check the argument and also make the cast below safe.
1471 assert!(resolution <= Duration::from_secs(1));
1472
1473 let resolution = resolution.as_nanos() as u64;
1474
1475 self.metrics_poll_count_histogram.legacy_mut(|b| b.resolution = resolution);
1476 self
1477 }
1478
1479 /// Sets the number of buckets for the histogram tracking the
1480 /// distribution of task poll times.
1481 ///
1482 /// The last bucket tracks all greater values that fall out of other
1483 /// ranges. So, configuring the histogram using a linear scale,
1484 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1485 /// polls that take more than 450ms to complete.
1486 ///
1487 /// **Default:** 10
1488 ///
1489 /// # Examples
1490 ///
1491 /// ```
1492 /// # #[cfg(not(target_family = "wasm"))]
1493 /// # {
1494 /// use tokio::runtime;
1495 ///
1496 /// # #[allow(deprecated)]
1497 /// let rt = runtime::Builder::new_multi_thread()
1498 /// .enable_metrics_poll_time_histogram()
1499 /// .metrics_poll_count_histogram_buckets(15)
1500 /// .build()
1501 /// .unwrap();
1502 /// # }
1503 /// ```
1504 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1505 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1506 self.metrics_poll_count_histogram.legacy_mut(|b| b.num_buckets = buckets);
1507 self
1508 }
1509 }
1510
1511 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1512 use crate::runtime::runtime::Scheduler;
1513
1514 let (scheduler, handle, blocking_pool) =
1515 self.build_current_thread_runtime_components(None)?;
1516
1517 Ok(Runtime::from_parts(
1518 Scheduler::CurrentThread(scheduler),
1519 handle,
1520 blocking_pool,
1521 ))
1522 }
1523
1524 #[cfg(tokio_unstable)]
1525 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1526 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1527
1528 let tid = std::thread::current().id();
1529
1530 let (scheduler, handle, blocking_pool) =
1531 self.build_current_thread_runtime_components(Some(tid))?;
1532
1533 Ok(LocalRuntime::from_parts(
1534 LocalRuntimeScheduler::CurrentThread(scheduler),
1535 handle,
1536 blocking_pool,
1537 ))
1538 }
1539
1540 fn build_current_thread_runtime_components(
1541 &mut self,
1542 local_tid: Option<ThreadId>,
1543 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1544 use crate::runtime::scheduler;
1545 use crate::runtime::Config;
1546
1547 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1548
1549 // Blocking pool
1550 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1551 let blocking_spawner = blocking_pool.spawner().clone();
1552
1553 // Generate a rng seed for this runtime.
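// `seed_generator_1` is stored in the runtime `Config` below; `seed_generator_2` is handed to the scheduler itself.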
1554 let seed_generator_1 = self.seed_generator.next_generator();
1555 let seed_generator_2 = self.seed_generator.next_generator();
1556
1557 // And now put a single-threaded scheduler on top of the timer. When
1558 // there are no futures ready to do something, it'll let the timer or
1559 // the reactor generate some new stimuli for the futures to continue
1560 // in their lives.
1561 let (scheduler, handle) = CurrentThread::new(
1562 driver,
1563 driver_handle,
1564 blocking_spawner,
1565 seed_generator_2,
1566 Config {
1567 before_park: self.before_park.clone(),
1568 after_unpark: self.after_unpark.clone(),
1569 before_spawn: self.before_spawn.clone(),
1570 #[cfg(tokio_unstable)]
1571 before_poll: self.before_poll.clone(),
1572 #[cfg(tokio_unstable)]
1573 after_poll: self.after_poll.clone(),
1574 after_termination: self.after_termination.clone(),
1575 global_queue_interval: self.global_queue_interval,
1576 event_interval: self.event_interval,
1577 #[cfg(tokio_unstable)]
1578 unhandled_panic: self.unhandled_panic.clone(),
1579 disable_lifo_slot: self.disable_lifo_slot,
1580 seed_generator: seed_generator_1,
1581 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1582 },
1583 local_tid,
1584 );
1585
1586 let handle = Handle {
1587 inner: scheduler::Handle::CurrentThread(handle),
1588 };
1589
1590 Ok((scheduler, handle, blocking_pool))
1591 }
1592
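/// Returns the configured poll-time histogram builder when instrumentation is enabled, or `None` otherwise.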
1593 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1594 if self.metrics_poll_count_histogram_enable {
1595 Some(self.metrics_poll_count_histogram.clone())
1596 } else {
1597 None
1598 }
1599 }
1600}
1601
1602cfg_io_driver! {
1603 impl Builder {
1604 /// Enables the I/O driver.
1605 ///
1606 /// Doing this enables using net, process, signal, and some I/O types on
1607 /// the runtime.
1608 ///
1609 /// # Examples
1610 ///
1611 /// ```
1612 /// use tokio::runtime;
1613 ///
1614 /// let rt = runtime::Builder::new_multi_thread()
1615 /// .enable_io()
1616 /// .build()
1617 /// .unwrap();
1618 /// ```
1619 pub fn enable_io(&mut self) -> &mut Self {
1620 self.enable_io = true;
1621 self
1622 }
1623
1624 /// Enables the I/O driver and configures the max number of events to be
1625 /// processed per tick.
1626 ///
1627 /// # Examples
1628 ///
1629 /// ```
1630 /// use tokio::runtime;
1631 ///
1632 /// let rt = runtime::Builder::new_current_thread()
1633 /// .enable_io()
1634 /// .max_io_events_per_tick(1024)
1635 /// .build()
1636 /// .unwrap();
1637 /// ```
1638 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1639 self.nevents = capacity;
1640 self
1641 }
1642 }
1643}
1644
1645cfg_time! {
1646 impl Builder {
1647 /// Enables the time driver.
1648 ///
1649 /// Doing this enables using `tokio::time` on the runtime.
1650 ///
1651 /// # Examples
1652 ///
1653 /// ```
1654 /// # #[cfg(not(target_family = "wasm"))]
1655 /// # {
1656 /// use tokio::runtime;
1657 ///
1658 /// let rt = runtime::Builder::new_multi_thread()
1659 /// .enable_time()
1660 /// .build()
1661 /// .unwrap();
1662 /// # }
1663 /// ```
1664 pub fn enable_time(&mut self) -> &mut Self {
1665 self.enable_time = true;
1666 self
1667 }
1668 }
1669}
1670
1671cfg_io_uring! {
1672 impl Builder {
1673 /// Enables Tokio's io_uring driver.
1674 ///
1675 /// Doing this enables using io_uring operations on the runtime.
1676 ///
1677 /// # Examples
1678 ///
1679 /// ```
1680 /// use tokio::runtime;
1681 ///
1682 /// let rt = runtime::Builder::new_multi_thread()
1683 /// .enable_io_uring()
1684 /// .build()
1685 /// .unwrap();
1686 /// ```
1687 #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1688 pub fn enable_io_uring(&mut self) -> &mut Self {
1689 // Currently, the uring flag is equivalent to `enable_io`.
1690 self.enable_io = true;
1691 self
1692 }
1693 }
1694}
1695
1696cfg_test_util! {
1697 impl Builder {
1698 /// Controls whether the runtime's clock starts paused or advancing.
1699 ///
1700 /// Pausing time requires the current-thread runtime; construction of
1701 /// the runtime will panic otherwise.
1702 ///
1703 /// # Examples
1704 ///
1705 /// ```
1706 /// use tokio::runtime;
1707 ///
1708 /// let rt = runtime::Builder::new_current_thread()
1709 /// .enable_time()
1710 /// .start_paused(true)
1711 /// .build()
1712 /// .unwrap();
1713 /// ```
1714 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1715 self.start_paused = start_paused;
1716 self
1717 }
1718 }
1719}
1720
1721cfg_rt_multi_thread! {
1722 impl Builder {
1723 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1724 use crate::loom::sys::num_cpus;
1725 use crate::runtime::{Config, runtime::Scheduler};
1726 use crate::runtime::scheduler::{self, MultiThread};
1727
1728 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1729
1730 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1731
1732 // Create the blocking pool
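// Its capacity accounts for the worker threads in addition to `max_blocking_threads`,
// since the workers themselves are spawned through this pool.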
1733 let blocking_pool =
1734 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1735 let blocking_spawner = blocking_pool.spawner().clone();
1736
1737 // Generate a rng seed for this runtime.
1738 let seed_generator_1 = self.seed_generator.next_generator();
1739 let seed_generator_2 = self.seed_generator.next_generator();
1740
1741 let (scheduler, handle, launch) = MultiThread::new(
1742 worker_threads,
1743 driver,
1744 driver_handle,
1745 blocking_spawner,
1746 seed_generator_2,
1747 Config {
1748 before_park: self.before_park.clone(),
1749 after_unpark: self.after_unpark.clone(),
1750 before_spawn: self.before_spawn.clone(),
1751 #[cfg(tokio_unstable)]
1752 before_poll: self.before_poll.clone(),
1753 #[cfg(tokio_unstable)]
1754 after_poll: self.after_poll.clone(),
1755 after_termination: self.after_termination.clone(),
1756 global_queue_interval: self.global_queue_interval,
1757 event_interval: self.event_interval,
1758 #[cfg(tokio_unstable)]
1759 unhandled_panic: self.unhandled_panic.clone(),
1760 disable_lifo_slot: self.disable_lifo_slot,
1761 seed_generator: seed_generator_1,
1762 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1763 },
1764 );
1765
1766 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1767
1768 // Spawn the thread pool workers
1769 let _enter = handle.enter();
1770 launch.launch();
1771
1772 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1773 }
1774 }
1775}
1776
1777impl fmt::Debug for Builder {
1778 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1779 fmt.debug_struct("Builder")
1780 .field("worker_threads", &self.worker_threads)
1781 .field("max_blocking_threads", &self.max_blocking_threads)
1782 .field(
1783 "thread_name",
1784 &"<dyn Fn() -> String + Send + Sync + 'static>",
1785 )
1786 .field("thread_stack_size", &self.thread_stack_size)
1787 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1788 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1789 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1790 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1791 .finish()
1792 }
1793}