tokio/runtime/builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
16/// Builds a Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// use tokio::runtime::Builder;
35///
36/// fn main() {
37///     // build runtime
38///     let runtime = Builder::new_multi_thread()
39///         .worker_threads(4)
40///         .thread_name("my-custom-name")
41///         .thread_stack_size(3 * 1024 * 1024)
42///         .build()
43///         .unwrap();
44///
45///     // use runtime ...
46/// }
47/// ```
48pub struct Builder {
49    /// Runtime type
50    kind: Kind,
51
52    /// Whether or not to enable the I/O driver
53    enable_io: bool,
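    /// Maximum number of I/O events to process per tick when the I/O driver is enabled.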
54    nevents: usize,
55
56    /// Whether or not to enable the time driver
57    enable_time: bool,
58
59    /// Whether or not the clock should start paused.
60    start_paused: bool,
61
62    /// The number of worker threads used by the `Runtime`.
63    ///
64    /// Only used when not using the current-thread executor.
65    worker_threads: Option<usize>,
66
67    /// Cap on the number of blocking threads spawned by the runtime.
68    max_blocking_threads: usize,
69
70    /// Name fn used for threads spawned by the runtime.
71    pub(super) thread_name: ThreadNameFn,
72
73    /// Stack size used for threads spawned by the runtime.
74    pub(super) thread_stack_size: Option<usize>,
75
76    /// Callback to run after each thread starts.
77    pub(super) after_start: Option<Callback>,
78
79    /// To run before each worker thread stops
80    pub(super) before_stop: Option<Callback>,
81
82    /// To run before each worker thread is parked.
83    pub(super) before_park: Option<Callback>,
84
85    /// To run after each thread is unparked.
86    pub(super) after_unpark: Option<Callback>,
87
88    /// To run before each task is spawned.
89    pub(super) before_spawn: Option<TaskCallback>,
90
91    /// To run before each poll
92    #[cfg(tokio_unstable)]
93    pub(super) before_poll: Option<TaskCallback>,
94
95    /// To run after each poll
96    #[cfg(tokio_unstable)]
97    pub(super) after_poll: Option<TaskCallback>,
98
99    /// To run after each task is terminated.
100    pub(super) after_termination: Option<TaskCallback>,
101
102    /// Customizable keep alive timeout for `BlockingPool`
103    pub(super) keep_alive: Option<Duration>,
104
105    /// How many ticks before pulling a task from the global/remote queue?
106    ///
107    /// When `None`, the value is unspecified and behavior details are left to
108    /// the scheduler. Each scheduler flavor could choose to either pick its own
109    /// default value or use some other strategy to decide when to poll from the
110    /// global queue. For example, the multi-threaded scheduler uses a
111    /// self-tuning strategy based on mean task poll times.
112    pub(super) global_queue_interval: Option<u32>,
113
114    /// How many ticks before yielding to the driver for timer and I/O events?
115    pub(super) event_interval: u32,
116
117    /// When true, the multi-threaded scheduler LIFO slot should not be used.
118    ///
119    /// This option should only be exposed as unstable.
120    pub(super) disable_lifo_slot: bool,
121
122    /// Specify a random number generator seed to provide deterministic results
123    pub(super) seed_generator: RngSeedGenerator,
124
125    /// When true, enables task poll count histogram instrumentation.
126    pub(super) metrics_poll_count_histogram_enable: bool,
127
128    /// Configures the task poll count histogram
129    pub(super) metrics_poll_count_histogram: HistogramBuilder,
130
131    #[cfg(tokio_unstable)]
132    pub(super) unhandled_panic: UnhandledPanic,
133}
134
135cfg_unstable! {
136    /// How the runtime should respond to unhandled panics.
137    ///
138    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
139    /// to configure the runtime behavior when a spawned task panics.
140    ///
141    /// See [`Builder::unhandled_panic`] for more details.
142    #[derive(Debug, Clone)]
143    #[non_exhaustive]
144    pub enum UnhandledPanic {
145        /// The runtime should ignore panics on spawned tasks.
146        ///
147        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
148        /// tasks continue running normally.
149        ///
150        /// This is the default behavior.
151        ///
152        /// # Examples
153        ///
154        /// ```
155        /// use tokio::runtime::{self, UnhandledPanic};
156        ///
157        /// # pub fn main() {
158        /// let rt = runtime::Builder::new_current_thread()
159        ///     .unhandled_panic(UnhandledPanic::Ignore)
160        ///     .build()
161        ///     .unwrap();
162        ///
163        /// let task1 = rt.spawn(async { panic!("boom"); });
164        /// let task2 = rt.spawn(async {
165        ///     // This task completes normally
166        ///     "done"
167        /// });
168        ///
169        /// rt.block_on(async {
170        ///     // The panic on the first task is forwarded to the `JoinHandle`
171        ///     assert!(task1.await.is_err());
172        ///
173        ///     // The second task completes normally
174        ///     assert!(task2.await.is_ok());
175        /// })
176        /// # }
177        /// ```
178        ///
179        /// [`JoinHandle`]: struct@crate::task::JoinHandle
180        Ignore,
181
182        /// The runtime should immediately shut down if a spawned task panics.
183        ///
184        /// The runtime will immediately shut down even if the panicked task's
185        /// [`JoinHandle`] is still available. All further spawned tasks will be
186        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
187        ///
188        /// # Examples
189        ///
190        /// ```should_panic
191        /// use tokio::runtime::{self, UnhandledPanic};
192        ///
193        /// # pub fn main() {
194        /// let rt = runtime::Builder::new_current_thread()
195        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
196        ///     .build()
197        ///     .unwrap();
198        ///
199        /// rt.spawn(async { panic!("boom"); });
200        /// rt.spawn(async {
201        ///     // This task never completes.
202        /// });
203        ///
204        /// rt.block_on(async {
205        ///     // Do some work
206        /// # loop { tokio::task::yield_now().await; }
207        /// })
208        /// # }
209        /// ```
210        ///
211        /// [`JoinHandle`]: struct@crate::task::JoinHandle
212        ShutdownRuntime,
213    }
214}
215
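/// Factory function used to generate names for threads spawned by the runtime.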
216pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
217
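/// The scheduler flavor that the builder will construct.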
218#[derive(Clone, Copy)]
219pub(crate) enum Kind {
220    CurrentThread,
221    #[cfg(feature = "rt-multi-thread")]
222    MultiThread,
223}
224
225impl Builder {
226    /// Returns a new builder with the current thread scheduler selected.
227    ///
228    /// Configuration methods can be chained on the return value.
229    ///
230    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
231    /// [`LocalSet`].
232    ///
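    /// # Examples
    ///
    /// A minimal example of building a current-thread runtime and driving it
    /// from `block_on`:
    ///
    /// ```
    /// use tokio::runtime::Builder;
    ///
    /// let rt = Builder::new_current_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// rt.block_on(async {
    ///     println!("Hello from a current-thread runtime");
    /// });
    /// ```
    ///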
233    /// [`LocalSet`]: crate::task::LocalSet
234    pub fn new_current_thread() -> Builder {
235        #[cfg(loom)]
236        const EVENT_INTERVAL: u32 = 4;
237        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
238        #[cfg(not(loom))]
239        const EVENT_INTERVAL: u32 = 61;
240
241        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
242    }
243
244    /// Returns a new builder with the multi thread scheduler selected.
245    ///
246    /// Configuration methods can be chained on the return value.
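    ///
    /// # Examples
    ///
    /// A minimal example, mirroring the struct-level example above:
    ///
    /// ```
    /// use tokio::runtime::Builder;
    ///
    /// let rt = Builder::new_multi_thread()
    ///     .worker_threads(2)
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// rt.block_on(async {
    ///     println!("Hello from a multi-thread runtime");
    /// });
    /// ```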
247    #[cfg(feature = "rt-multi-thread")]
248    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
249    pub fn new_multi_thread() -> Builder {
250        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
251        Builder::new(Kind::MultiThread, 61)
252    }
253
254    /// Returns a new runtime builder initialized with default configuration
255    /// values.
256    ///
257    /// Configuration methods can be chained on the return value.
258    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
259        Builder {
260            kind,
261
262            // I/O defaults to "off"
263            enable_io: false,
264            nevents: 1024,
265
266            // Time defaults to "off"
267            enable_time: false,
268
269            // The clock starts not-paused
270            start_paused: false,
271
272            // Read from environment variable first in multi-threaded mode.
273            // Default to lazy auto-detection (one thread per CPU core)
274            worker_threads: None,
275
276            max_blocking_threads: 512,
277
278            // Default thread name
279            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
280
281            // Do not set a stack size by default
282            thread_stack_size: None,
283
284            // No worker thread callbacks
285            after_start: None,
286            before_stop: None,
287            before_park: None,
288            after_unpark: None,
289
290            before_spawn: None,
291            after_termination: None,
292
293            #[cfg(tokio_unstable)]
294            before_poll: None,
295            #[cfg(tokio_unstable)]
296            after_poll: None,
297
298            keep_alive: None,
299
300            // Defaults for these values depend on the scheduler kind, so we get them
301            // as parameters.
302            global_queue_interval: None,
303            event_interval,
304
305            seed_generator: RngSeedGenerator::new(RngSeed::new()),
306
307            #[cfg(tokio_unstable)]
308            unhandled_panic: UnhandledPanic::Ignore,
309
310            metrics_poll_count_histogram_enable: false,
311
312            metrics_poll_count_histogram: HistogramBuilder::default(),
313
314            disable_lifo_slot: false,
315        }
316    }
317
318    /// Enables both I/O and time drivers.
319    ///
320    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
321    /// individually. If additional components are added to Tokio in the future,
322    /// `enable_all` will include these future components.
323    ///
324    /// # Examples
325    ///
326    /// ```
327    /// use tokio::runtime;
328    ///
329    /// let rt = runtime::Builder::new_multi_thread()
330    ///     .enable_all()
331    ///     .build()
332    ///     .unwrap();
333    /// ```
334    pub fn enable_all(&mut self) -> &mut Self {
335        #[cfg(any(
336            feature = "net",
337            all(unix, feature = "process"),
338            all(unix, feature = "signal")
339        ))]
340        self.enable_io();
341        #[cfg(feature = "time")]
342        self.enable_time();
343
344        self
345    }
346
347    /// Sets the number of worker threads the `Runtime` will use.
348    ///
349    /// This can be any number above 0, though it is advised to keep this value
350    /// on the smaller side.
351    ///
352    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
353    ///
354    /// # Default
355    ///
356    /// The default value is the number of cores available to the system.
357    ///
358    /// When using the `current_thread` runtime this method has no effect.
359    ///
360    /// # Examples
361    ///
362    /// ## Multi threaded runtime with 4 threads
363    ///
364    /// ```
365    /// use tokio::runtime;
366    ///
367    /// // This will spawn a work-stealing runtime with 4 worker threads.
368    /// let rt = runtime::Builder::new_multi_thread()
369    ///     .worker_threads(4)
370    ///     .build()
371    ///     .unwrap();
372    ///
373    /// rt.spawn(async move {});
374    /// ```
375    ///
376    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
377    ///
378    /// ```
379    /// use tokio::runtime;
380    ///
381    /// // Create a runtime that _must_ be driven from a call
382    /// // to `Runtime::block_on`.
383    /// let rt = runtime::Builder::new_current_thread()
384    ///     .build()
385    ///     .unwrap();
386    ///
387    /// // This will run the runtime and future on the current thread
388    /// rt.block_on(async move {});
389    /// ```
390    ///
391    /// # Panics
392    ///
393    /// This will panic if `val` is not larger than `0`.
394    #[track_caller]
395    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
396        assert!(val > 0, "Worker threads cannot be set to 0");
397        self.worker_threads = Some(val);
398        self
399    }
400
401    /// Specifies the limit for additional threads spawned by the Runtime.
402    ///
403    /// These threads are used for blocking operations like tasks spawned
404    /// through [`spawn_blocking`]. This includes, but is not limited to:
405    /// - [`fs`] operations
406    /// - dns resolution through [`ToSocketAddrs`]
407    /// - writing to [`Stdout`] or [`Stderr`]
408    /// - reading from [`Stdin`]
409    ///
410    /// Unlike the [`worker_threads`], they are not always active and will exit
411    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
412    ///
413    /// It's recommended to not set this limit too low in order to avoid hanging on operations
414    /// requiring [`spawn_blocking`].
415    ///
416    /// The default value is 512.
417    ///
418    /// # Queue Behavior
419    ///
420    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
421    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
422    /// method has not been reached, a new thread will be spawned. If no idle thread is available
423    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
424    /// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
425    /// it could potentially grow unbounded.
426    ///
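    /// # Examples
    ///
    /// A small sketch that caps the blocking pool while leaving the other
    /// defaults untouched:
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .max_blocking_threads(64)
    ///     .build()
    ///     .unwrap();
    ///
    /// rt.block_on(async {
    ///     let res = tokio::task::spawn_blocking(|| "done").await.unwrap();
    ///     assert_eq!(res, "done");
    /// });
    /// ```
    ///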
427    /// # Panics
428    ///
429    /// This will panic if `val` is not larger than `0`.
430    ///
431    /// # Upgrading from 0.x
432    ///
433    /// In old versions `max_threads` limited both blocking and worker threads, but the
434    /// current `max_blocking_threads` does not include async worker threads in the count.
435    ///
436    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
437    /// [`fs`]: mod@crate::fs
438    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
439    /// [`Stdout`]: struct@crate::io::Stdout
440    /// [`Stdin`]: struct@crate::io::Stdin
441    /// [`Stderr`]: struct@crate::io::Stderr
442    /// [`worker_threads`]: Self::worker_threads
443    /// [`thread_keep_alive`]: Self::thread_keep_alive
444    #[track_caller]
445    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
446    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
447        assert!(val > 0, "Max blocking threads cannot be set to 0");
448        self.max_blocking_threads = val;
449        self
450    }
451
452    /// Sets name of threads spawned by the `Runtime`'s thread pool.
453    ///
454    /// The default name is "tokio-runtime-worker".
455    ///
456    /// # Examples
457    ///
458    /// ```
459    /// # use tokio::runtime;
460    ///
461    /// # pub fn main() {
462    /// let rt = runtime::Builder::new_multi_thread()
463    ///     .thread_name("my-pool")
464    ///     .build();
465    /// # }
466    /// ```
467    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
468        let val = val.into();
469        self.thread_name = std::sync::Arc::new(move || val.clone());
470        self
471    }
472
473    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
474    ///
475    /// The default name fn is `|| "tokio-runtime-worker".into()`.
476    ///
477    /// # Examples
478    ///
479    /// ```
480    /// # use tokio::runtime;
481    /// # use std::sync::atomic::{AtomicUsize, Ordering};
482    /// # pub fn main() {
483    /// let rt = runtime::Builder::new_multi_thread()
484    ///     .thread_name_fn(|| {
485    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
486    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
487    ///        format!("my-pool-{}", id)
488    ///     })
489    ///     .build();
490    /// # }
491    /// ```
492    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
493    where
494        F: Fn() -> String + Send + Sync + 'static,
495    {
496        self.thread_name = std::sync::Arc::new(f);
497        self
498    }
499
500    /// Sets the stack size (in bytes) for worker threads.
501    ///
502    /// The actual stack size may be greater than this value if the platform
503    /// specifies a minimal stack size.
504    ///
505    /// The default stack size for spawned threads is 2 MiB, though this
506    /// particular stack size is subject to change in the future.
507    ///
508    /// # Examples
509    ///
510    /// ```
511    /// # use tokio::runtime;
512    ///
513    /// # pub fn main() {
514    /// let rt = runtime::Builder::new_multi_thread()
515    ///     .thread_stack_size(32 * 1024)
516    ///     .build();
517    /// # }
518    /// ```
519    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
520        self.thread_stack_size = Some(val);
521        self
522    }
523
524    /// Executes function `f` after each thread is started but before it starts
525    /// doing work.
526    ///
527    /// This is intended for bookkeeping and monitoring use cases.
528    ///
529    /// # Examples
530    ///
531    /// ```
532    /// # use tokio::runtime;
533    /// # pub fn main() {
534    /// let runtime = runtime::Builder::new_multi_thread()
535    ///     .on_thread_start(|| {
536    ///         println!("thread started");
537    ///     })
538    ///     .build();
539    /// # }
540    /// ```
541    #[cfg(not(loom))]
542    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
543    where
544        F: Fn() + Send + Sync + 'static,
545    {
546        self.after_start = Some(std::sync::Arc::new(f));
547        self
548    }
549
550    /// Executes function `f` before each thread stops.
551    ///
552    /// This is intended for bookkeeping and monitoring use cases.
553    ///
554    /// # Examples
555    ///
556    /// ```
557    /// # use tokio::runtime;
558    /// # pub fn main() {
559    /// let runtime = runtime::Builder::new_multi_thread()
560    ///     .on_thread_stop(|| {
561    ///         println!("thread stopping");
562    ///     })
563    ///     .build();
564    /// # }
565    /// ```
566    #[cfg(not(loom))]
567    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
568    where
569        F: Fn() + Send + Sync + 'static,
570    {
571        self.before_stop = Some(std::sync::Arc::new(f));
572        self
573    }
574
575    /// Executes function `f` just before a thread is parked (goes idle).
576    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
577    /// can be called, and may result in this thread being unparked immediately.
578    ///
579    /// This can be used to start work only when the executor is idle, or for bookkeeping
580    /// and monitoring purposes.
581    ///
582    /// Note: There can only be one park callback for a runtime; calling this function
583    /// more than once replaces the last callback defined, rather than adding to it.
584    ///
585    /// # Examples
586    ///
587    /// ## Multithreaded executor
588    /// ```
589    /// # use std::sync::Arc;
590    /// # use std::sync::atomic::{AtomicBool, Ordering};
591    /// # use tokio::runtime;
592    /// # use tokio::sync::Barrier;
593    /// # pub fn main() {
594    /// let once = AtomicBool::new(true);
595    /// let barrier = Arc::new(Barrier::new(2));
596    ///
597    /// let runtime = runtime::Builder::new_multi_thread()
598    ///     .worker_threads(1)
599    ///     .on_thread_park({
600    ///         let barrier = barrier.clone();
601    ///         move || {
602    ///             let barrier = barrier.clone();
603    ///             if once.swap(false, Ordering::Relaxed) {
604    ///                 tokio::spawn(async move { barrier.wait().await; });
605    ///             }
606    ///         }
607    ///     })
608    ///     .build()
609    ///     .unwrap();
610    ///
611    /// runtime.block_on(async {
612    ///    barrier.wait().await;
613    /// })
614    /// # }
615    /// ```
616    /// ## Current thread executor
617    /// ```
618    /// # use std::sync::Arc;
619    /// # use std::sync::atomic::{AtomicBool, Ordering};
620    /// # use tokio::runtime;
621    /// # use tokio::sync::Barrier;
622    /// # pub fn main() {
623    /// let once = AtomicBool::new(true);
624    /// let barrier = Arc::new(Barrier::new(2));
625    ///
626    /// let runtime = runtime::Builder::new_current_thread()
627    ///     .on_thread_park({
628    ///         let barrier = barrier.clone();
629    ///         move || {
630    ///             let barrier = barrier.clone();
631    ///             if once.swap(false, Ordering::Relaxed) {
632    ///                 tokio::spawn(async move { barrier.wait().await; });
633    ///             }
634    ///         }
635    ///     })
636    ///     .build()
637    ///     .unwrap();
638    ///
639    /// runtime.block_on(async {
640    ///    barrier.wait().await;
641    /// })
642    /// # }
643    /// ```
644    #[cfg(not(loom))]
645    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
646    where
647        F: Fn() + Send + Sync + 'static,
648    {
649        self.before_park = Some(std::sync::Arc::new(f));
650        self
651    }
652
653    /// Executes function `f` just after a thread unparks (starts executing tasks).
654    ///
655    /// This is intended for bookkeeping and monitoring use cases; note that work
656    /// in this callback will increase latencies when the application has allowed one or
657    /// more runtime threads to go idle.
658    ///
659    /// Note: There can only be one unpark callback for a runtime; calling this function
660    /// more than once replaces the last callback defined, rather than adding to it.
661    ///
662    /// # Examples
663    ///
664    /// ```
665    /// # use tokio::runtime;
666    /// # pub fn main() {
667    /// let runtime = runtime::Builder::new_multi_thread()
668    ///     .on_thread_unpark(|| {
669    ///         println!("thread unparking");
670    ///     })
671    ///     .build();
672    ///
673    /// runtime.unwrap().block_on(async {
674    ///    tokio::task::yield_now().await;
675    ///    println!("Hello from Tokio!");
676    /// })
677    /// # }
678    /// ```
679    #[cfg(not(loom))]
680    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
681    where
682        F: Fn() + Send + Sync + 'static,
683    {
684        self.after_unpark = Some(std::sync::Arc::new(f));
685        self
686    }
687
688    /// Executes function `f` just before a task is spawned.
689    ///
690    /// `f` is called within the Tokio context, so functions like
691    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
692    /// invoked immediately.
693    ///
694    /// This can be used for bookkeeping or monitoring purposes.
695    ///
696    /// Note: There can only be one spawn callback for a runtime; calling this function more
697    /// than once replaces the last callback defined, rather than adding to it.
698    ///
699    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
700    ///
701    /// **Note**: This is an [unstable API][unstable]. The public API of this type
702    /// may break in 1.x releases. See [the documentation on unstable
703    /// features][unstable] for details.
704    ///
705    /// [unstable]: crate#unstable-features
706    ///
707    /// # Examples
708    ///
709    /// ```
710    /// # use tokio::runtime;
711    /// # pub fn main() {
712    /// let runtime = runtime::Builder::new_current_thread()
713    ///     .on_task_spawn(|_| {
714    ///         println!("spawning task");
715    ///     })
716    ///     .build()
717    ///     .unwrap();
718    ///
719    /// runtime.block_on(async {
720    ///     tokio::task::spawn(std::future::ready(()));
721    ///
722    ///     for _ in 0..64 {
723    ///         tokio::task::yield_now().await;
724    ///     }
725    /// })
726    /// # }
727    /// ```
728    #[cfg(all(not(loom), tokio_unstable))]
729    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
730    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
731    where
732        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
733    {
734        self.before_spawn = Some(std::sync::Arc::new(f));
735        self
736    }
737
738    /// Executes function `f` just before a task is polled.
739    ///
740    /// `f` is called within the Tokio context, so functions like
741    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
742    /// invoked immediately.
743    ///
744    /// **Note**: This is an [unstable API][unstable]. The public API of this type
745    /// may break in 1.x releases. See [the documentation on unstable
746    /// features][unstable] for details.
747    ///
748    /// [unstable]: crate#unstable-features
749    ///
750    /// # Examples
751    ///
752    /// ```
753    /// # use std::sync::{atomic::AtomicUsize, Arc};
754    /// # use tokio::task::yield_now;
755    /// # pub fn main() {
756    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
757    /// let poll_start = poll_start_counter.clone();
758    /// let rt = tokio::runtime::Builder::new_multi_thread()
759    ///     .enable_all()
760    ///     .on_before_task_poll(move |meta| {
761    ///         println!("task {} is about to be polled", meta.id())
762    ///     })
763    ///     .build()
764    ///     .unwrap();
765    /// let task = rt.spawn(async {
766    ///     yield_now().await;
767    /// });
768    /// let _ = rt.block_on(task);
769    ///
770    /// # }
771    /// ```
772    #[cfg(tokio_unstable)]
773    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
774    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
775    where
776        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
777    {
778        self.before_poll = Some(std::sync::Arc::new(f));
779        self
780    }
781
782    /// Executes function `f` just after a task is polled.
783    ///
784    /// `f` is called within the Tokio context, so functions like
785    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
786    /// invoked immediately.
787    ///
788    /// **Note**: This is an [unstable API][unstable]. The public API of this type
789    /// may break in 1.x releases. See [the documentation on unstable
790    /// features][unstable] for details.
791    ///
792    /// [unstable]: crate#unstable-features
793    ///
794    /// # Examples
795    ///
796    /// ```
797    /// # use std::sync::{atomic::AtomicUsize, Arc};
798    /// # use tokio::task::yield_now;
799    /// # pub fn main() {
800    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
801    /// let poll_stop = poll_stop_counter.clone();
802    /// let rt = tokio::runtime::Builder::new_multi_thread()
803    ///     .enable_all()
804    ///     .on_after_task_poll(move |meta| {
805    ///         println!("task {} completed polling", meta.id());
806    ///     })
807    ///     .build()
808    ///     .unwrap();
809    /// let task = rt.spawn(async {
810    ///     yield_now().await;
811    /// });
812    /// let _ = rt.block_on(task);
813    ///
814    /// # }
815    /// ```
816    #[cfg(tokio_unstable)]
817    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
818    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
819    where
820        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
821    {
822        self.after_poll = Some(std::sync::Arc::new(f));
823        self
824    }
825
826    /// Executes function `f` just after a task is terminated.
827    ///
828    /// `f` is called within the Tokio context, so functions like
829    /// [`tokio::spawn`](crate::spawn) can be called.
830    ///
831    /// This can be used for bookkeeping or monitoring purposes.
832    ///
833    /// Note: There can only be one task termination callback for a runtime; calling this
834    /// function more than once replaces the last callback defined, rather than adding to it.
835    ///
836    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
837    ///
838    /// **Note**: This is an [unstable API][unstable]. The public API of this type
839    /// may break in 1.x releases. See [the documentation on unstable
840    /// features][unstable] for details.
841    ///
842    /// [unstable]: crate#unstable-features
843    ///
844    /// # Examples
845    ///
846    /// ```
847    /// # use tokio::runtime;
848    /// # pub fn main() {
849    /// let runtime = runtime::Builder::new_current_thread()
850    ///     .on_task_terminate(|_| {
851    ///         println!("killing task");
852    ///     })
853    ///     .build()
854    ///     .unwrap();
855    ///
856    /// runtime.block_on(async {
857    ///     tokio::task::spawn(std::future::ready(()));
858    ///
859    ///     for _ in 0..64 {
860    ///         tokio::task::yield_now().await;
861    ///     }
862    /// })
863    /// # }
864    /// ```
865    #[cfg(all(not(loom), tokio_unstable))]
866    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
867    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
868    where
869        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
870    {
871        self.after_termination = Some(std::sync::Arc::new(f));
872        self
873    }
874
875    /// Creates the configured `Runtime`.
876    ///
877    /// The returned `Runtime` instance is ready to spawn tasks.
878    ///
879    /// # Examples
880    ///
881    /// ```
882    /// use tokio::runtime::Builder;
883    ///
884    /// let rt = Builder::new_multi_thread().build().unwrap();
885    ///
886    /// rt.block_on(async {
887    ///     println!("Hello from the Tokio runtime");
888    /// });
889    /// ```
890    pub fn build(&mut self) -> io::Result<Runtime> {
891        match &self.kind {
892            Kind::CurrentThread => self.build_current_thread_runtime(),
893            #[cfg(feature = "rt-multi-thread")]
894            Kind::MultiThread => self.build_threaded_runtime(),
895        }
896    }
897
898    /// Creates the configured [`LocalRuntime`].
899    ///
900    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
901    ///
902    /// # Panics
903    ///
904    /// This will panic if the builder was configured with [`new_multi_thread()`].
905    ///
906    /// [`new_multi_thread()`]: Builder::new_multi_thread
907    ///
908    /// # Examples
909    ///
910    /// ```
911    /// use tokio::runtime::{Builder, LocalOptions};
912    ///
913    /// let rt = Builder::new_current_thread()
914    ///     .build_local(LocalOptions::default())
915    ///     .unwrap();
916    ///
917    /// rt.spawn_local(async {
918    ///     println!("Hello from the Tokio runtime");
919    /// });
920    /// ```
921    #[allow(unused_variables, unreachable_patterns)]
922    #[cfg(tokio_unstable)]
923    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
924    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
925        match &self.kind {
926            Kind::CurrentThread => self.build_current_thread_local_runtime(),
927            #[cfg(feature = "rt-multi-thread")]
928            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
929        }
930    }
931
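    /// Collects the driver configuration implied by the builder's current settings.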
932    fn get_cfg(&self) -> driver::Cfg {
933        driver::Cfg {
934            enable_pause_time: match self.kind {
935                Kind::CurrentThread => true,
936                #[cfg(feature = "rt-multi-thread")]
937                Kind::MultiThread => false,
938            },
939            enable_io: self.enable_io,
940            enable_time: self.enable_time,
941            start_paused: self.start_paused,
942            nevents: self.nevents,
943        }
944    }
945
946    /// Sets a custom timeout for a thread in the blocking pool.
947    ///
948    /// By default, the timeout for a thread is set to 10 seconds. This method
949    /// overrides that default.
950    ///
951    /// # Example
952    ///
953    /// ```
954    /// # use tokio::runtime;
955    /// # use std::time::Duration;
956    /// # pub fn main() {
957    /// let rt = runtime::Builder::new_multi_thread()
958    ///     .thread_keep_alive(Duration::from_millis(100))
959    ///     .build();
960    /// # }
961    /// ```
962    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
963        self.keep_alive = Some(duration);
964        self
965    }
966
967    /// Sets the number of scheduler ticks after which the scheduler will poll the global
968    /// task queue.
969    ///
970    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
971    ///
972    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
973    /// [the module documentation] for the default behavior of the multi-thread scheduler.
974    ///
975    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
976    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
977    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
978    /// getting started on new work, especially if tasks frequently yield rather than complete
979    /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
980    /// is a good choice when most tasks quickly complete polling.
981    ///
982    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
983    ///
984    /// # Panics
985    ///
986    /// This function will panic if 0 is passed as an argument.
987    ///
988    /// # Examples
989    ///
990    /// ```
991    /// # use tokio::runtime;
992    /// # pub fn main() {
993    /// let rt = runtime::Builder::new_multi_thread()
994    ///     .global_queue_interval(31)
995    ///     .build();
996    /// # }
997    /// ```
998    #[track_caller]
999    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1000        assert!(val > 0, "global_queue_interval must be greater than 0");
1001        self.global_queue_interval = Some(val);
1002        self
1003    }
1004
1005    /// Sets the number of scheduler ticks after which the scheduler will poll for
1006    /// external events (timers, I/O, and so on).
1007    ///
1008    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1009    ///
1010    /// By default, the event interval is `61` for all scheduler types.
1011    ///
1012    /// Setting the event interval determines the effective "priority" of delivering
1013    /// these external events (which may wake up additional tasks), compared to
1014    /// executing tasks that are currently ready to run. A smaller value is useful
1015    /// when tasks frequently spend a long time in polling, or frequently yield,
1016    /// which can result in overly long delays picking up I/O events. Conversely,
1017    /// picking up new events requires extra synchronization and syscall overhead,
1018    /// so if tasks generally complete their polling quickly, a higher event interval
1019    /// will minimize that overhead while still keeping the scheduler responsive to
1020    /// events.
1021    ///
1022    /// # Examples
1023    ///
1024    /// ```
1025    /// # use tokio::runtime;
1026    /// # pub fn main() {
1027    /// let rt = runtime::Builder::new_multi_thread()
1028    ///     .event_interval(31)
1029    ///     .build();
1030    /// # }
1031    /// ```
1032    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1033        self.event_interval = val;
1034        self
1035    }
1036
1037    cfg_unstable! {
1038        /// Configure how the runtime responds to an unhandled panic on a
1039        /// spawned task.
1040        ///
1041        /// By default, an unhandled panic (i.e. a panic not caught by
1042        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1043        /// execution. The panic's error value is forwarded to the task's
1044        /// [`JoinHandle`] and all other spawned tasks continue running.
1045        ///
1046        /// The `unhandled_panic` option enables configuring this behavior.
1047        ///
1048        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1049        ///   spawned tasks have no impact on the runtime's execution.
1050        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1051        ///   shut down immediately when a spawned task panics, even if that
1052        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1053        ///   will immediately terminate and further calls to
1054        ///   [`Runtime::block_on`] will panic.
1055        ///
1056        /// # Panics
1057        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1058        /// on a runtime other than the current thread runtime.
1059        ///
1060        /// # Unstable
1061        ///
1062        /// This option is currently unstable and its implementation is
1063        /// incomplete. The API may change or be removed in the future. See
1064        /// issue [tokio-rs/tokio#4516] for more details.
1065        ///
1066        /// # Examples
1067        ///
1068        /// The following demonstrates a runtime configured to shutdown on
1069        /// panic. The first spawned task panics and results in the runtime
1070        /// shutting down. The second spawned task never has a chance to
1071        /// execute. The call to `block_on` will panic due to the runtime being
1072        /// forcibly shut down.
1073        ///
1074        /// ```should_panic
1075        /// use tokio::runtime::{self, UnhandledPanic};
1076        ///
1077        /// # pub fn main() {
1078        /// let rt = runtime::Builder::new_current_thread()
1079        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1080        ///     .build()
1081        ///     .unwrap();
1082        ///
1083        /// rt.spawn(async { panic!("boom"); });
1084        /// rt.spawn(async {
1085        ///     // This task never completes.
1086        /// });
1087        ///
1088        /// rt.block_on(async {
1089        ///     // Do some work
1090        /// # loop { tokio::task::yield_now().await; }
1091        /// })
1092        /// # }
1093        /// ```
1094        ///
1095        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1096        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1097        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1098            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1099                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1100            }
1101
1102            self.unhandled_panic = behavior;
1103            self
1104        }
1105
1106        /// Disables the LIFO task scheduler heuristic.
1107        ///
1108        /// The multi-threaded scheduler includes a heuristic for optimizing
1109        /// message-passing patterns. This heuristic results in the **last**
1110        /// scheduled task being polled first.
1111        ///
1112        /// To implement this heuristic, each worker thread has a slot which
1113        /// holds the task that should be polled next. However, this slot cannot
1114        /// be stolen by other worker threads, which can result in lower total
1115        /// throughput when tasks tend to have longer poll times.
1116        ///
1117        /// This configuration option will disable this heuristic resulting in
1118        /// all scheduled tasks being pushed into the worker-local queue, which
1119        /// is stealable.
1120        ///
1121        /// Consider trying this option when the task "scheduled" time is high
1122        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1123        /// collect this data.
1124        ///
1125        /// # Unstable
1126        ///
1127        /// This configuration option is considered a workaround for the LIFO
1128        /// slot not being stealable. When the slot becomes stealable, we will
1129        /// revisit whether or not this option is necessary. See
1130        /// issue [tokio-rs/tokio#4941].
1131        ///
1132        /// # Examples
1133        ///
1134        /// ```
1135        /// use tokio::runtime;
1136        ///
1137        /// let rt = runtime::Builder::new_multi_thread()
1138        ///     .disable_lifo_slot()
1139        ///     .build()
1140        ///     .unwrap();
1141        /// ```
1142        ///
1143        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1144        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1145        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1146            self.disable_lifo_slot = true;
1147            self
1148        }
1149
1150        /// Specifies the random number generation seed to use within all
1151        /// threads associated with the runtime being built.
1152        ///
1153        /// This option is intended to make certain parts of the runtime
1154        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1155        /// [`tokio::select!`], it will ensure that the order in which branches
1156        /// are polled is deterministic.
1157        ///
1158        /// In addition to the code specifying `rng_seed` and interacting with
1159        /// the runtime, the internals of Tokio and the Rust compiler may affect
1160        /// the sequences of random numbers. In order to ensure repeatable
1161        /// results, the version of Tokio, the versions of all other
1162        /// dependencies that interact with Tokio, and the Rust compiler version
1163        /// should also all remain constant.
1164        ///
1165        /// # Examples
1166        ///
1167        /// ```
1168        /// # use tokio::runtime::{self, RngSeed};
1169        /// # pub fn main() {
1170        /// let seed = RngSeed::from_bytes(b"place your seed here");
1171        /// let rt = runtime::Builder::new_current_thread()
1172        ///     .rng_seed(seed)
1173        ///     .build();
1174        /// # }
1175        /// ```
1176        ///
1177        /// [`tokio::select!`]: crate::select
1178        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1179            self.seed_generator = RngSeedGenerator::new(seed);
1180            self
1181        }
1182    }
1183
1184    cfg_unstable_metrics! {
1185        /// Enables tracking the distribution of task poll times.
1186        ///
1187        /// Task poll times are not instrumented by default as doing so requires
1188        /// calling [`Instant::now()`] twice per task poll, which could add
1189        /// measurable overhead. Use [`Handle::metrics()`] to access the
1190        /// metrics data.
1191        ///
1192        /// The histogram uses fixed bucket sizes. In other words, the histogram
1193        /// buckets are not dynamic based on input values. Use the
1194        /// `metrics_poll_time_histogram` builder methods to configure the
1195        /// histogram details.
1196        ///
1197        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1198        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1199        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1200        /// to select [`LogHistogram`] instead.
1201        ///
1202        /// # Examples
1203        ///
1204        /// ```
1205        /// use tokio::runtime;
1206        ///
1207        /// let rt = runtime::Builder::new_multi_thread()
1208        ///     .enable_metrics_poll_time_histogram()
1209        ///     .build()
1210        ///     .unwrap();
1211        /// # // Test default values here
1212        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1213        /// # let m = rt.handle().metrics();
1214        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1215        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1216        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1217        /// ```
1218        ///
1219        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1220        /// [`Instant::now()`]: std::time::Instant::now
1221        /// [`LogHistogram`]: crate::runtime::LogHistogram
1222        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1223        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1224            self.metrics_poll_count_histogram_enable = true;
1225            self
1226        }
1227
1228        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1229        ///
1230        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1231        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1232        #[doc(hidden)]
1233        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1234            self.enable_metrics_poll_time_histogram()
1235        }
1236
1237        /// Sets the histogram scale for tracking the distribution of task poll
1238        /// times.
1239        ///
1240        /// Tracking the distribution of task poll times can be done using a
1241        /// linear or log scale. When using linear scale, each histogram bucket
1242        /// will represent the same range of poll times. When using log scale,
1243        /// each histogram bucket will cover a range twice as big as the
1244        /// previous bucket.
1245        ///
1246        /// **Default:** linear scale.
1247        ///
1248        /// # Examples
1249        ///
1250        /// ```
1251        /// use tokio::runtime::{self, HistogramScale};
1252        ///
1253        /// # #[allow(deprecated)]
1254        /// let rt = runtime::Builder::new_multi_thread()
1255        ///     .enable_metrics_poll_time_histogram()
1256        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1257        ///     .build()
1258        ///     .unwrap();
1259        /// ```
1260        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1261        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1262            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1263            self
1264        }
1265
1266        /// Configure the histogram for tracking poll times
1267        ///
1268        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1269        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1270        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1271        ///
1272        /// # Examples
1273        /// Configure a [`LogHistogram`] with [default configuration]:
1274        /// ```
1275        /// use tokio::runtime;
1276        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1277        ///
1278        /// let rt = runtime::Builder::new_multi_thread()
1279        ///     .enable_metrics_poll_time_histogram()
1280        ///     .metrics_poll_time_histogram_configuration(
1281        ///         HistogramConfiguration::log(LogHistogram::default())
1282        ///     )
1283        ///     .build()
1284        ///     .unwrap();
1285        /// ```
1286        ///
1287        /// Configure a linear histogram with 100 buckets, each 10μs wide
1288        /// ```
1289        /// use tokio::runtime;
1290        /// use std::time::Duration;
1291        /// use tokio::runtime::HistogramConfiguration;
1292        ///
1293        /// let rt = runtime::Builder::new_multi_thread()
1294        ///     .enable_metrics_poll_time_histogram()
1295        ///     .metrics_poll_time_histogram_configuration(
1296        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1297        ///     )
1298        ///     .build()
1299        ///     .unwrap();
1300        /// ```
1301        ///
1302        /// Configure a [`LogHistogram`] with the following settings:
1303        /// - Measure times from 100ns to 120s
1304        /// - Max error of 0.1
1305        /// - No more than 1024 buckets
1306        /// ```
1307        /// use std::time::Duration;
1308        /// use tokio::runtime;
1309        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1310        ///
1311        /// let rt = runtime::Builder::new_multi_thread()
1312        ///     .enable_metrics_poll_time_histogram()
1313        ///     .metrics_poll_time_histogram_configuration(
1314        ///         HistogramConfiguration::log(LogHistogram::builder()
1315        ///             .max_value(Duration::from_secs(120))
1316        ///             .min_value(Duration::from_nanos(100))
1317        ///             .max_error(0.1)
1318        ///             .max_buckets(1024)
1319        ///             .expect("configuration uses 488 buckets")
1320        ///         )
1321        ///     )
1322        ///     .build()
1323        ///     .unwrap();
1324        /// ```
1325        ///
1326        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1327        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1328        /// where each bucket is twice the size of the previous bucket.
1329        /// ```rust
1330        /// use std::time::Duration;
1331        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1332        /// let rt = tokio::runtime::Builder::new_current_thread()
1333        ///     .enable_all()
1334        ///     .enable_metrics_poll_time_histogram()
1335        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1336        ///         LogHistogram::builder()
1337        ///             .min_value(Duration::from_micros(20))
1338        ///             .max_value(Duration::from_millis(4))
1339        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1340        ///             .precision_exact(0)
1341        ///             .max_buckets(10)
1342        ///             .unwrap(),
1343        ///     ))
1344        ///     .build()
1345        ///     .unwrap();
1346        /// ```
1347        ///
1348        /// [`LogHistogram`]: crate::runtime::LogHistogram
1349        /// [default configuration]: crate::runtime::LogHistogramBuilder
1350        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1351        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1352            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1353            self
1354        }
1355
1356        /// Sets the histogram resolution for tracking the distribution of task
1357        /// poll times.
1358        ///
1359        /// The resolution is the histogram's first bucket's range. When using a
1360        /// linear histogram scale, each bucket will cover the same range. When
1361        /// using a log scale, each bucket will cover a range twice as big as
1362        /// the previous bucket. In the log case, the resolution represents the
1363        /// smallest bucket range.
1364        ///
1365        /// Note that, when using log scale, the resolution is rounded up to the
1366        /// nearest power of 2 in nanoseconds.
1367        ///
1368        /// **Default:** 100 microseconds.
1369        ///
1370        /// # Examples
1371        ///
1372        /// ```
1373        /// use tokio::runtime;
1374        /// use std::time::Duration;
1375        ///
1376        /// # #[allow(deprecated)]
1377        /// let rt = runtime::Builder::new_multi_thread()
1378        ///     .enable_metrics_poll_time_histogram()
1379        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1380        ///     .build()
1381        ///     .unwrap();
1382        /// ```
1383        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1384        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1385            assert!(resolution > Duration::from_secs(0));
1386            // Sanity check the argument and also make the cast below safe.
1387            assert!(resolution <= Duration::from_secs(1));
1388
1389            let resolution = resolution.as_nanos() as u64;
1390
1391            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1392            self
1393        }
1394
1395        /// Sets the number of buckets for the histogram tracking the
1396        /// distribution of task poll times.
1397        ///
1398        /// The last bucket tracks all greater values that fall out of other
1399        /// ranges. So, when configuring the histogram with a linear scale, a
1400        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1401        /// polls that take more than 450ms to complete.
1402        ///
1403        /// **Default:** 10
1404        ///
1405        /// # Examples
1406        ///
1407        /// ```
1408        /// use tokio::runtime;
1409        ///
1410        /// # #[allow(deprecated)]
1411        /// let rt = runtime::Builder::new_multi_thread()
1412        ///     .enable_metrics_poll_time_histogram()
1413        ///     .metrics_poll_count_histogram_buckets(15)
1414        ///     .build()
1415        ///     .unwrap();
1416        /// ```
1417        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1418        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1419            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1420            self
1421        }
1422    }
1423
1424    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1425        use crate::runtime::runtime::Scheduler;
1426
1427        let (scheduler, handle, blocking_pool) =
1428            self.build_current_thread_runtime_components(None)?;
1429
1430        Ok(Runtime::from_parts(
1431            Scheduler::CurrentThread(scheduler),
1432            handle,
1433            blocking_pool,
1434        ))
1435    }
1436
1437    #[cfg(tokio_unstable)]
1438    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1439        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1440
1441        let tid = std::thread::current().id();

        let (scheduler, handle, blocking_pool) =
            self.build_current_thread_runtime_components(Some(tid))?;

        Ok(LocalRuntime::from_parts(
            LocalRuntimeScheduler::CurrentThread(scheduler),
            handle,
            blocking_pool,
        ))
    }

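    /// Builds the scheduler, handle, and blocking pool shared by the
    /// current-thread runtime constructors. `local_tid` is `Some` only when
    /// building a `LocalRuntime`, in which case it holds the id of the thread
    /// the runtime is being built on.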
    fn build_current_thread_runtime_components(
        &mut self,
        local_tid: Option<ThreadId>,
    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
        use crate::runtime::scheduler;
        use crate::runtime::Config;

        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

        // Blocking pool
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        // Generate a rng seed for this runtime.
        let seed_generator_1 = self.seed_generator.next_generator();
        let seed_generator_2 = self.seed_generator.next_generator();
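        // `seed_generator_2` is handed to the scheduler below, while
        // `seed_generator_1` is stored in the runtime `Config`.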

        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to run, it lets the timer or the reactor
        // generate new events so that the futures can make progress.
        let (scheduler, handle) = CurrentThread::new(
            driver,
            driver_handle,
            blocking_spawner,
            seed_generator_2,
            Config {
                before_park: self.before_park.clone(),
                after_unpark: self.after_unpark.clone(),
                before_spawn: self.before_spawn.clone(),
                #[cfg(tokio_unstable)]
                before_poll: self.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll: self.after_poll.clone(),
                after_termination: self.after_termination.clone(),
                global_queue_interval: self.global_queue_interval,
                event_interval: self.event_interval,
                #[cfg(tokio_unstable)]
                unhandled_panic: self.unhandled_panic.clone(),
                disable_lifo_slot: self.disable_lifo_slot,
                seed_generator: seed_generator_1,
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
            },
            local_tid,
        );

        let handle = Handle {
            inner: scheduler::Handle::CurrentThread(handle),
        };

        Ok((scheduler, handle, blocking_pool))
    }

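    /// Returns the poll time histogram builder, or `None` when histogram
    /// metrics have not been enabled.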
    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
        if self.metrics_poll_count_histogram_enable {
            Some(self.metrics_poll_count_histogram.clone())
        } else {
            None
        }
    }
}

cfg_io_driver! {
    impl Builder {
        /// Enables the I/O driver.
        ///
        /// Doing this enables using net, process, signal, and some I/O types on
        /// the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_io()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_io(&mut self) -> &mut Self {
            self.enable_io = true;
            self
        }

        /// Enables the I/O driver and configures the max number of events to be
        /// processed per tick.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_io()
        ///     .max_io_events_per_tick(1024)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
            self.nevents = capacity;
            self
        }
    }
}

cfg_time! {
    impl Builder {
        /// Enables the time driver.
        ///
        /// Doing this enables using `tokio::time` on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            self.enable_time = true;
            self
        }
    }
}

cfg_test_util! {
    impl Builder {
        /// Controls if the runtime's clock starts paused or advancing.
        ///
        /// Pausing time requires the current-thread runtime; construction of
        /// the runtime will panic otherwise.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .start_paused(true)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
            self.start_paused = start_paused;
            self
        }
    }
}

cfg_rt_multi_thread! {
    impl Builder {
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::loom::sys::num_cpus;
            use crate::runtime::{Config, runtime::Scheduler};
            use crate::runtime::scheduler::{self, MultiThread};

            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);

            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

            // Create the blocking pool
            let blocking_pool =
                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
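            // The cap includes `worker_threads` because the scheduler's worker
            // threads are themselves spawned on the blocking pool; this keeps
            // the full `max_blocking_threads` budget available for blocking
            // tasks in addition to the workers.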
            let blocking_spawner = blocking_pool.spawner().clone();

            // Generate a rng seed for this runtime.
            let seed_generator_1 = self.seed_generator.next_generator();
            let seed_generator_2 = self.seed_generator.next_generator();

            let (scheduler, handle, launch) = MultiThread::new(
                worker_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    before_spawn: self.before_spawn.clone(),
                    #[cfg(tokio_unstable)]
                    before_poll: self.before_poll.clone(),
                    #[cfg(tokio_unstable)]
                    after_poll: self.after_poll.clone(),
                    after_termination: self.after_termination.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                },
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers
            let _enter = handle.enter();
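            // Entering the handle first installs this runtime's context on the
            // current thread, so the workers are launched from within it.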
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }
    }
}

impl fmt::Debug for Builder {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Builder")
            .field("worker_threads", &self.worker_threads)
            .field("max_blocking_threads", &self.max_blocking_threads)
            .field(
                "thread_name",
                &"<dyn Fn() -> String + Send + Sync + 'static>",
            )
            .field("thread_stack_size", &self.thread_stack_size)
            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
            .finish()
    }
}