tokio\runtime/
builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
/// or [`Builder::new_current_thread`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: method@Self::build
/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
/// [`Builder::new_current_thread`]: method@Self::new_current_thread
///
/// # Examples
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime::Builder;
///
/// fn main() {
///     // build runtime
///     let runtime = Builder::new_multi_thread()
///         .worker_threads(4)
///         .thread_name("my-custom-name")
///         .thread_stack_size(3 * 1024 * 1024)
///         .build()
///         .unwrap();
///
///     // use runtime ...
/// }
/// # }
/// ```
pub struct Builder {
    /// Runtime type (which scheduler flavor will be built).
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,
    // NOTE(review): undocumented in this file; it sits next to `enable_io`,
    // so it is presumably an event-count setting for the I/O driver —
    // confirm against the driver configuration code.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Cap on the number of additional threads spawned for blocking
    /// operations.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// To run before each task is spawned.
    pub(super) before_spawn: Option<TaskCallback>,

    /// To run before each poll
    #[cfg(tokio_unstable)]
    pub(super) before_poll: Option<TaskCallback>,

    /// To run after each poll
    #[cfg(tokio_unstable)]
    pub(super) after_poll: Option<TaskCallback>,

    /// To run after each task is terminated.
    pub(super) after_termination: Option<TaskCallback>,

    /// Customizable keep alive timeout for `BlockingPool`
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime responds when a spawned task panics (see
    /// `UnhandledPanic`). Only present on unstable builds.
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,
}
137
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shut down if a spawned task panics.
        ///
        /// The runtime will immediately shut down even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// # loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
221
/// Shared, thread-safe closure that yields the name given to threads spawned
/// by the runtime; it returns a fresh `String` on every call.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
223
/// The scheduler flavor a `Builder` was created for; `Builder::build`
/// dispatches on this value.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// The current-thread scheduler (selected by `Builder::new_current_thread`).
    CurrentThread,
    /// The multi-thread scheduler (selected by `Builder::new_multi_thread`);
    /// only exists when the `rt-multi-thread` feature is enabled.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
230
231impl Builder {
232    /// Returns a new builder with the current thread scheduler selected.
233    ///
234    /// Configuration methods can be chained on the return value.
235    ///
236    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
237    /// [`LocalSet`].
238    ///
239    /// [`LocalSet`]: crate::task::LocalSet
240    pub fn new_current_thread() -> Builder {
241        #[cfg(loom)]
242        const EVENT_INTERVAL: u32 = 4;
243        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
244        #[cfg(not(loom))]
245        const EVENT_INTERVAL: u32 = 61;
246
247        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
248    }
249
250    /// Returns a new builder with the multi thread scheduler selected.
251    ///
252    /// Configuration methods can be chained on the return value.
253    #[cfg(feature = "rt-multi-thread")]
254    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
255    pub fn new_multi_thread() -> Builder {
256        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
257        Builder::new(Kind::MultiThread, 61)
258    }
259
260    /// Returns a new runtime builder initialized with default configuration
261    /// values.
262    ///
263    /// Configuration methods can be chained on the return value.
264    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
265        Builder {
266            kind,
267
268            // I/O defaults to "off"
269            enable_io: false,
270            nevents: 1024,
271
272            // Time defaults to "off"
273            enable_time: false,
274
275            // The clock starts not-paused
276            start_paused: false,
277
278            // Read from environment variable first in multi-threaded mode.
279            // Default to lazy auto-detection (one thread per CPU core)
280            worker_threads: None,
281
282            max_blocking_threads: 512,
283
284            // Default thread name
285            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
286
287            // Do not set a stack size by default
288            thread_stack_size: None,
289
290            // No worker thread callbacks
291            after_start: None,
292            before_stop: None,
293            before_park: None,
294            after_unpark: None,
295
296            before_spawn: None,
297            after_termination: None,
298
299            #[cfg(tokio_unstable)]
300            before_poll: None,
301            #[cfg(tokio_unstable)]
302            after_poll: None,
303
304            keep_alive: None,
305
306            // Defaults for these values depend on the scheduler kind, so we get them
307            // as parameters.
308            global_queue_interval: None,
309            event_interval,
310
311            seed_generator: RngSeedGenerator::new(RngSeed::new()),
312
313            #[cfg(tokio_unstable)]
314            unhandled_panic: UnhandledPanic::Ignore,
315
316            metrics_poll_count_histogram_enable: false,
317
318            metrics_poll_count_histogram: HistogramBuilder::default(),
319
320            disable_lifo_slot: false,
321        }
322    }
323
324    /// Enables both I/O and time drivers.
325    ///
326    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
327    /// individually. If additional components are added to Tokio in the future,
328    /// `enable_all` will include these future components.
329    ///
330    /// # Examples
331    ///
332    /// ```
333    /// # #[cfg(not(target_family = "wasm"))]
334    /// # {
335    /// use tokio::runtime;
336    ///
337    /// let rt = runtime::Builder::new_multi_thread()
338    ///     .enable_all()
339    ///     .build()
340    ///     .unwrap();
341    /// # }
342    /// ```
343    pub fn enable_all(&mut self) -> &mut Self {
344        #[cfg(any(
345            feature = "net",
346            all(unix, feature = "process"),
347            all(unix, feature = "signal")
348        ))]
349        self.enable_io();
350
351        #[cfg(all(
352            tokio_unstable,
353            feature = "io-uring",
354            feature = "rt",
355            feature = "fs",
356            target_os = "linux",
357        ))]
358        self.enable_io_uring();
359
360        #[cfg(feature = "time")]
361        self.enable_time();
362
363        self
364    }
365
366    /// Sets the number of worker threads the `Runtime` will use.
367    ///
368    /// This can be any number above 0 though it is advised to keep this value
369    /// on the smaller side.
370    ///
371    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
372    ///
373    /// # Default
374    ///
375    /// The default value is the number of cores available to the system.
376    ///
377    /// When using the `current_thread` runtime this method has no effect.
378    ///
379    /// # Examples
380    ///
381    /// ## Multi threaded runtime with 4 threads
382    ///
383    /// ```
384    /// # #[cfg(not(target_family = "wasm"))]
385    /// # {
386    /// use tokio::runtime;
387    ///
388    /// // This will spawn a work-stealing runtime with 4 worker threads.
389    /// let rt = runtime::Builder::new_multi_thread()
390    ///     .worker_threads(4)
391    ///     .build()
392    ///     .unwrap();
393    ///
394    /// rt.spawn(async move {});
395    /// # }
396    /// ```
397    ///
398    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
399    ///
400    /// ```
401    /// use tokio::runtime;
402    ///
403    /// // Create a runtime that _must_ be driven from a call
404    /// // to `Runtime::block_on`.
405    /// let rt = runtime::Builder::new_current_thread()
406    ///     .build()
407    ///     .unwrap();
408    ///
409    /// // This will run the runtime and future on the current thread
410    /// rt.block_on(async move {});
411    /// ```
412    ///
413    /// # Panics
414    ///
415    /// This will panic if `val` is not larger than `0`.
416    #[track_caller]
417    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
418        assert!(val > 0, "Worker threads cannot be set to 0");
419        self.worker_threads = Some(val);
420        self
421    }
422
423    /// Specifies the limit for additional threads spawned by the Runtime.
424    ///
425    /// These threads are used for blocking operations like tasks spawned
426    /// through [`spawn_blocking`], this includes but is not limited to:
427    /// - [`fs`] operations
428    /// - dns resolution through [`ToSocketAddrs`]
429    /// - writing to [`Stdout`] or [`Stderr`]
430    /// - reading from [`Stdin`]
431    ///
432    /// Unlike the [`worker_threads`], they are not always active and will exit
433    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
434    ///
435    /// It's recommended to not set this limit too low in order to avoid hanging on operations
436    /// requiring [`spawn_blocking`].
437    ///
438    /// The default value is 512.
439    ///
440    /// # Queue Behavior
441    ///
442    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
443    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
444    /// method has not been reached, a new thread will be spawned. If no idle thread is available
445    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
446    /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
447    /// it could potentially grow unbounded.
448    ///
449    /// # Panics
450    ///
451    /// This will panic if `val` is not larger than `0`.
452    ///
453    /// # Upgrading from 0.x
454    ///
455    /// In old versions `max_threads` limited both blocking and worker threads, but the
456    /// current `max_blocking_threads` does not include async worker threads in the count.
457    ///
458    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
459    /// [`fs`]: mod@crate::fs
460    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
461    /// [`Stdout`]: struct@crate::io::Stdout
462    /// [`Stdin`]: struct@crate::io::Stdin
463    /// [`Stderr`]: struct@crate::io::Stderr
464    /// [`worker_threads`]: Self::worker_threads
465    /// [`thread_keep_alive`]: Self::thread_keep_alive
466    #[track_caller]
467    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
468    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
469        assert!(val > 0, "Max blocking threads cannot be set to 0");
470        self.max_blocking_threads = val;
471        self
472    }
473
474    /// Sets name of threads spawned by the `Runtime`'s thread pool.
475    ///
476    /// The default name is "tokio-runtime-worker".
477    ///
478    /// # Examples
479    ///
480    /// ```
481    /// # #[cfg(not(target_family = "wasm"))]
482    /// # {
483    /// # use tokio::runtime;
484    ///
485    /// # pub fn main() {
486    /// let rt = runtime::Builder::new_multi_thread()
487    ///     .thread_name("my-pool")
488    ///     .build();
489    /// # }
490    /// # }
491    /// ```
492    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
493        let val = val.into();
494        self.thread_name = std::sync::Arc::new(move || val.clone());
495        self
496    }
497
498    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
499    ///
500    /// The default name fn is `|| "tokio-runtime-worker".into()`.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// # #[cfg(not(target_family = "wasm"))]
506    /// # {
507    /// # use tokio::runtime;
508    /// # use std::sync::atomic::{AtomicUsize, Ordering};
509    /// # pub fn main() {
510    /// let rt = runtime::Builder::new_multi_thread()
511    ///     .thread_name_fn(|| {
512    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
513    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
514    ///        format!("my-pool-{}", id)
515    ///     })
516    ///     .build();
517    /// # }
518    /// # }
519    /// ```
520    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
521    where
522        F: Fn() -> String + Send + Sync + 'static,
523    {
524        self.thread_name = std::sync::Arc::new(f);
525        self
526    }
527
528    /// Sets the stack size (in bytes) for worker threads.
529    ///
530    /// The actual stack size may be greater than this value if the platform
531    /// specifies minimal stack size.
532    ///
533    /// The default stack size for spawned threads is 2 MiB, though this
534    /// particular stack size is subject to change in the future.
535    ///
536    /// # Examples
537    ///
538    /// ```
539    /// # #[cfg(not(target_family = "wasm"))]
540    /// # {
541    /// # use tokio::runtime;
542    ///
543    /// # pub fn main() {
544    /// let rt = runtime::Builder::new_multi_thread()
545    ///     .thread_stack_size(32 * 1024)
546    ///     .build();
547    /// # }
548    /// # }
549    /// ```
550    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
551        self.thread_stack_size = Some(val);
552        self
553    }
554
555    /// Executes function `f` after each thread is started but before it starts
556    /// doing work.
557    ///
558    /// This is intended for bookkeeping and monitoring use cases.
559    ///
560    /// # Examples
561    ///
562    /// ```
563    /// # #[cfg(not(target_family = "wasm"))]
564    /// # {
565    /// # use tokio::runtime;
566    /// # pub fn main() {
567    /// let runtime = runtime::Builder::new_multi_thread()
568    ///     .on_thread_start(|| {
569    ///         println!("thread started");
570    ///     })
571    ///     .build();
572    /// # }
573    /// # }
574    /// ```
575    #[cfg(not(loom))]
576    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
577    where
578        F: Fn() + Send + Sync + 'static,
579    {
580        self.after_start = Some(std::sync::Arc::new(f));
581        self
582    }
583
584    /// Executes function `f` before each thread stops.
585    ///
586    /// This is intended for bookkeeping and monitoring use cases.
587    ///
588    /// # Examples
589    ///
590    /// ```
591    /// # #[cfg(not(target_family = "wasm"))]
592    /// {
593    /// # use tokio::runtime;
594    /// # pub fn main() {
595    /// let runtime = runtime::Builder::new_multi_thread()
596    ///     .on_thread_stop(|| {
597    ///         println!("thread stopping");
598    ///     })
599    ///     .build();
600    /// # }
601    /// # }
602    /// ```
603    #[cfg(not(loom))]
604    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
605    where
606        F: Fn() + Send + Sync + 'static,
607    {
608        self.before_stop = Some(std::sync::Arc::new(f));
609        self
610    }
611
612    /// Executes function `f` just before a thread is parked (goes idle).
613    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
614    /// can be called, and may result in this thread being unparked immediately.
615    ///
616    /// This can be used to start work only when the executor is idle, or for bookkeeping
617    /// and monitoring purposes.
618    ///
619    /// Note: There can only be one park callback for a runtime; calling this function
620    /// more than once replaces the last callback defined, rather than adding to it.
621    ///
622    /// # Examples
623    ///
624    /// ## Multithreaded executor
625    /// ```
626    /// # #[cfg(not(target_family = "wasm"))]
627    /// # {
628    /// # use std::sync::Arc;
629    /// # use std::sync::atomic::{AtomicBool, Ordering};
630    /// # use tokio::runtime;
631    /// # use tokio::sync::Barrier;
632    /// # pub fn main() {
633    /// let once = AtomicBool::new(true);
634    /// let barrier = Arc::new(Barrier::new(2));
635    ///
636    /// let runtime = runtime::Builder::new_multi_thread()
637    ///     .worker_threads(1)
638    ///     .on_thread_park({
639    ///         let barrier = barrier.clone();
640    ///         move || {
641    ///             let barrier = barrier.clone();
642    ///             if once.swap(false, Ordering::Relaxed) {
643    ///                 tokio::spawn(async move { barrier.wait().await; });
644    ///            }
645    ///         }
646    ///     })
647    ///     .build()
648    ///     .unwrap();
649    ///
650    /// runtime.block_on(async {
651    ///    barrier.wait().await;
652    /// })
653    /// # }
654    /// # }
655    /// ```
656    /// ## Current thread executor
657    /// ```
658    /// # use std::sync::Arc;
659    /// # use std::sync::atomic::{AtomicBool, Ordering};
660    /// # use tokio::runtime;
661    /// # use tokio::sync::Barrier;
662    /// # pub fn main() {
663    /// let once = AtomicBool::new(true);
664    /// let barrier = Arc::new(Barrier::new(2));
665    ///
666    /// let runtime = runtime::Builder::new_current_thread()
667    ///     .on_thread_park({
668    ///         let barrier = barrier.clone();
669    ///         move || {
670    ///             let barrier = barrier.clone();
671    ///             if once.swap(false, Ordering::Relaxed) {
672    ///                 tokio::spawn(async move { barrier.wait().await; });
673    ///            }
674    ///         }
675    ///     })
676    ///     .build()
677    ///     .unwrap();
678    ///
679    /// runtime.block_on(async {
680    ///    barrier.wait().await;
681    /// })
682    /// # }
683    /// ```
684    #[cfg(not(loom))]
685    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
686    where
687        F: Fn() + Send + Sync + 'static,
688    {
689        self.before_park = Some(std::sync::Arc::new(f));
690        self
691    }
692
693    /// Executes function `f` just after a thread unparks (starts executing tasks).
694    ///
695    /// This is intended for bookkeeping and monitoring use cases; note that work
696    /// in this callback will increase latencies when the application has allowed one or
697    /// more runtime threads to go idle.
698    ///
699    /// Note: There can only be one unpark callback for a runtime; calling this function
700    /// more than once replaces the last callback defined, rather than adding to it.
701    ///
702    /// # Examples
703    ///
704    /// ```
705    /// # #[cfg(not(target_family = "wasm"))]
706    /// # {
707    /// # use tokio::runtime;
708    /// # pub fn main() {
709    /// let runtime = runtime::Builder::new_multi_thread()
710    ///     .on_thread_unpark(|| {
711    ///         println!("thread unparking");
712    ///     })
713    ///     .build();
714    ///
715    /// runtime.unwrap().block_on(async {
716    ///    tokio::task::yield_now().await;
717    ///    println!("Hello from Tokio!");
718    /// })
719    /// # }
720    /// # }
721    /// ```
722    #[cfg(not(loom))]
723    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
724    where
725        F: Fn() + Send + Sync + 'static,
726    {
727        self.after_unpark = Some(std::sync::Arc::new(f));
728        self
729    }
730
731    /// Executes function `f` just before a task is spawned.
732    ///
733    /// `f` is called within the Tokio context, so functions like
734    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
735    /// invoked immediately.
736    ///
737    /// This can be used for bookkeeping or monitoring purposes.
738    ///
739    /// Note: There can only be one spawn callback for a runtime; calling this function more
740    /// than once replaces the last callback defined, rather than adding to it.
741    ///
742    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
743    ///
744    /// **Note**: This is an [unstable API][unstable]. The public API of this type
745    /// may break in 1.x releases. See [the documentation on unstable
746    /// features][unstable] for details.
747    ///
748    /// [unstable]: crate#unstable-features
749    ///
750    /// # Examples
751    ///
752    /// ```
753    /// # use tokio::runtime;
754    /// # pub fn main() {
755    /// let runtime = runtime::Builder::new_current_thread()
756    ///     .on_task_spawn(|_| {
757    ///         println!("spawning task");
758    ///     })
759    ///     .build()
760    ///     .unwrap();
761    ///
762    /// runtime.block_on(async {
763    ///     tokio::task::spawn(std::future::ready(()));
764    ///
765    ///     for _ in 0..64 {
766    ///         tokio::task::yield_now().await;
767    ///     }
768    /// })
769    /// # }
770    /// ```
771    #[cfg(all(not(loom), tokio_unstable))]
772    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
773    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
774    where
775        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
776    {
777        self.before_spawn = Some(std::sync::Arc::new(f));
778        self
779    }
780
781    /// Executes function `f` just before a task is polled
782    ///
783    /// `f` is called within the Tokio context, so functions like
784    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
785    /// invoked immediately.
786    ///
787    /// **Note**: This is an [unstable API][unstable]. The public API of this type
788    /// may break in 1.x releases. See [the documentation on unstable
789    /// features][unstable] for details.
790    ///
791    /// [unstable]: crate#unstable-features
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// # #[cfg(not(target_family = "wasm"))]
797    /// # {
798    /// # use std::sync::{atomic::AtomicUsize, Arc};
799    /// # use tokio::task::yield_now;
800    /// # pub fn main() {
801    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
802    /// let poll_start = poll_start_counter.clone();
803    /// let rt = tokio::runtime::Builder::new_multi_thread()
804    ///     .enable_all()
805    ///     .on_before_task_poll(move |meta| {
806    ///         println!("task {} is about to be polled", meta.id())
807    ///     })
808    ///     .build()
809    ///     .unwrap();
810    /// let task = rt.spawn(async {
811    ///     yield_now().await;
812    /// });
813    /// let _ = rt.block_on(task);
814    ///
815    /// # }
816    /// # }
817    /// ```
818    #[cfg(tokio_unstable)]
819    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
820    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
821    where
822        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
823    {
824        self.before_poll = Some(std::sync::Arc::new(f));
825        self
826    }
827
828    /// Executes function `f` just after a task is polled
829    ///
830    /// `f` is called within the Tokio context, so functions like
831    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
832    /// invoked immediately.
833    ///
834    /// **Note**: This is an [unstable API][unstable]. The public API of this type
835    /// may break in 1.x releases. See [the documentation on unstable
836    /// features][unstable] for details.
837    ///
838    /// [unstable]: crate#unstable-features
839    ///
840    /// # Examples
841    ///
842    /// ```
843    /// # #[cfg(not(target_family = "wasm"))]
844    /// # {
845    /// # use std::sync::{atomic::AtomicUsize, Arc};
846    /// # use tokio::task::yield_now;
847    /// # pub fn main() {
848    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
849    /// let poll_stop = poll_stop_counter.clone();
850    /// let rt = tokio::runtime::Builder::new_multi_thread()
851    ///     .enable_all()
852    ///     .on_after_task_poll(move |meta| {
853    ///         println!("task {} completed polling", meta.id());
854    ///     })
855    ///     .build()
856    ///     .unwrap();
857    /// let task = rt.spawn(async {
858    ///     yield_now().await;
859    /// });
860    /// let _ = rt.block_on(task);
861    ///
862    /// # }
863    /// # }
864    /// ```
865    #[cfg(tokio_unstable)]
866    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
867    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
868    where
869        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
870    {
871        self.after_poll = Some(std::sync::Arc::new(f));
872        self
873    }
874
875    /// Executes function `f` just after a task is terminated.
876    ///
877    /// `f` is called within the Tokio context, so functions like
878    /// [`tokio::spawn`](crate::spawn) can be called.
879    ///
880    /// This can be used for bookkeeping or monitoring purposes.
881    ///
882    /// Note: There can only be one task termination callback for a runtime; calling this
883    /// function more than once replaces the last callback defined, rather than adding to it.
884    ///
885    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
886    ///
887    /// **Note**: This is an [unstable API][unstable]. The public API of this type
888    /// may break in 1.x releases. See [the documentation on unstable
889    /// features][unstable] for details.
890    ///
891    /// [unstable]: crate#unstable-features
892    ///
893    /// # Examples
894    ///
895    /// ```
896    /// # use tokio::runtime;
897    /// # pub fn main() {
898    /// let runtime = runtime::Builder::new_current_thread()
899    ///     .on_task_terminate(|_| {
900    ///         println!("killing task");
901    ///     })
902    ///     .build()
903    ///     .unwrap();
904    ///
905    /// runtime.block_on(async {
906    ///     tokio::task::spawn(std::future::ready(()));
907    ///
908    ///     for _ in 0..64 {
909    ///         tokio::task::yield_now().await;
910    ///     }
911    /// })
912    /// # }
913    /// ```
914    #[cfg(all(not(loom), tokio_unstable))]
915    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
916    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
917    where
918        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
919    {
920        self.after_termination = Some(std::sync::Arc::new(f));
921        self
922    }
923
924    /// Creates the configured `Runtime`.
925    ///
926    /// The returned `Runtime` instance is ready to spawn tasks.
927    ///
928    /// # Examples
929    ///
930    /// ```
931    /// # #[cfg(not(target_family = "wasm"))]
932    /// # {
933    /// use tokio::runtime::Builder;
934    ///
935    /// let rt  = Builder::new_multi_thread().build().unwrap();
936    ///
937    /// rt.block_on(async {
938    ///     println!("Hello from the Tokio runtime");
939    /// });
940    /// # }
941    /// ```
942    pub fn build(&mut self) -> io::Result<Runtime> {
943        match &self.kind {
944            Kind::CurrentThread => self.build_current_thread_runtime(),
945            #[cfg(feature = "rt-multi-thread")]
946            Kind::MultiThread => self.build_threaded_runtime(),
947        }
948    }
949
950    /// Creates the configured [`LocalRuntime`].
951    ///
952    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
953    ///
954    /// # Panics
955    ///
956    /// This will panic if the runtime is configured with [`new_multi_thread()`].
957    ///
958    /// [`new_multi_thread()`]: Builder::new_multi_thread
959    ///
960    /// # Examples
961    ///
962    /// ```
963    /// use tokio::runtime::{Builder, LocalOptions};
964    ///
965    /// let rt = Builder::new_current_thread()
966    ///     .build_local(LocalOptions::default())
967    ///     .unwrap();
968    ///
969    /// rt.spawn_local(async {
970    ///     println!("Hello from the Tokio runtime");
971    /// });
972    /// ```
973    #[allow(unused_variables, unreachable_patterns)]
974    #[cfg(tokio_unstable)]
975    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
976    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
977        match &self.kind {
978            Kind::CurrentThread => self.build_current_thread_local_runtime(),
979            #[cfg(feature = "rt-multi-thread")]
980            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
981        }
982    }
983
984    fn get_cfg(&self) -> driver::Cfg {
985        driver::Cfg {
986            enable_pause_time: match self.kind {
987                Kind::CurrentThread => true,
988                #[cfg(feature = "rt-multi-thread")]
989                Kind::MultiThread => false,
990            },
991            enable_io: self.enable_io,
992            enable_time: self.enable_time,
993            start_paused: self.start_paused,
994            nevents: self.nevents,
995        }
996    }
997
    /// Sets a custom timeout for a thread in the blocking pool.
    ///
    /// By default, the timeout for a thread is set to 10 seconds. This can
    /// be overridden using `.thread_keep_alive()`.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
    /// # use tokio::runtime;
    /// # use std::time::Duration;
    /// # pub fn main() {
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .thread_keep_alive(Duration::from_millis(100))
    ///     .build();
    /// # }
    /// # }
    /// ```
    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
        // `Some` marks an explicit override of the default timeout.
        self.keep_alive = Some(duration);
        self
    }
1021
1022    /// Sets the number of scheduler ticks after which the scheduler will poll the global
1023    /// task queue.
1024    ///
1025    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1026    ///
1027    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1028    /// [the module documentation] for the default behavior of the multi-thread scheduler.
1029    ///
1030    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1031    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1032    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1033    /// getting started on new work, especially if tasks frequently yield rather than complete
1034    /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1035    /// tasks from the local queue will be executed only if the global queue is empty.
1036    /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1037    /// tasks quickly complete polling.
1038    ///
1039    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1040    ///
1041    /// # Panics
1042    ///
1043    /// This function will panic if 0 is passed as an argument.
1044    ///
1045    /// # Examples
1046    ///
1047    /// ```
1048    /// # #[cfg(not(target_family = "wasm"))]
1049    /// # {
1050    /// # use tokio::runtime;
1051    /// # pub fn main() {
1052    /// let rt = runtime::Builder::new_multi_thread()
1053    ///     .global_queue_interval(31)
1054    ///     .build();
1055    /// # }
1056    /// # }
1057    /// ```
1058    #[track_caller]
1059    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1060        assert!(val > 0, "global_queue_interval must be greater than 0");
1061        self.global_queue_interval = Some(val);
1062        self
1063    }
1064
1065    /// Sets the number of scheduler ticks after which the scheduler will poll for
1066    /// external events (timers, I/O, and so on).
1067    ///
1068    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1069    ///
1070    /// By default, the event interval is `61` for all scheduler types.
1071    ///
1072    /// Setting the event interval determines the effective "priority" of delivering
1073    /// these external events (which may wake up additional tasks), compared to
1074    /// executing tasks that are currently ready to run. A smaller value is useful
1075    /// when tasks frequently spend a long time in polling, or frequently yield,
1076    /// which can result in overly long delays picking up I/O events. Conversely,
1077    /// picking up new events requires extra synchronization and syscall overhead,
1078    /// so if tasks generally complete their polling quickly, a higher event interval
1079    /// will minimize that overhead while still keeping the scheduler responsive to
1080    /// events.
1081    ///
1082    /// # Examples
1083    ///
1084    /// ```
1085    /// # #[cfg(not(target_family = "wasm"))]
1086    /// # {
1087    /// # use tokio::runtime;
1088    /// # pub fn main() {
1089    /// let rt = runtime::Builder::new_multi_thread()
1090    ///     .event_interval(31)
1091    ///     .build();
1092    /// # }
1093    /// # }
1094    /// ```
    pub fn event_interval(&mut self, val: u32) -> &mut Self {
        // Stored without validation (unlike `global_queue_interval`, which
        // rejects 0) and copied into the scheduler `Config` at build time.
        // NOTE(review): confirm how the schedulers behave when `val` is 0.
        self.event_interval = val;
        self
    }
1099
1100    cfg_unstable! {
1101        /// Configure how the runtime responds to an unhandled panic on a
1102        /// spawned task.
1103        ///
1104        /// By default, an unhandled panic (i.e. a panic not caught by
1105        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1106        /// execution. The panic's error value is forwarded to the task's
1107        /// [`JoinHandle`] and all other spawned tasks continue running.
1108        ///
1109        /// The `unhandled_panic` option enables configuring this behavior.
1110        ///
1111        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1112        ///   spawned tasks have no impact on the runtime's execution.
1113        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1114        ///   shutdown immediately when a spawned task panics even if that
1115        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1116        ///   will immediately terminate and further calls to
1117        ///   [`Runtime::block_on`] will panic.
1118        ///
1119        /// # Panics
1120        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1121        /// on a runtime other than the current thread runtime.
1122        ///
1123        /// # Unstable
1124        ///
1125        /// This option is currently unstable and its implementation is
1126        /// incomplete. The API may change or be removed in the future. See
1127        /// issue [tokio-rs/tokio#4516] for more details.
1128        ///
1129        /// # Examples
1130        ///
1131        /// The following demonstrates a runtime configured to shutdown on
1132        /// panic. The first spawned task panics and results in the runtime
1133        /// shutting down. The second spawned task never has a chance to
1134        /// execute. The call to `block_on` will panic due to the runtime being
1135        /// forcibly shutdown.
1136        ///
1137        /// ```should_panic
1138        /// use tokio::runtime::{self, UnhandledPanic};
1139        ///
1140        /// # pub fn main() {
1141        /// let rt = runtime::Builder::new_current_thread()
1142        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1143        ///     .build()
1144        ///     .unwrap();
1145        ///
1146        /// rt.spawn(async { panic!("boom"); });
1147        /// rt.spawn(async {
1148        ///     // This task never completes.
1149        /// });
1150        ///
1151        /// rt.block_on(async {
1152        ///     // Do some work
1153        /// # loop { tokio::task::yield_now().await; }
1154        /// })
1155        /// # }
1156        /// ```
1157        ///
1158        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1159        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1160        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1161            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1162                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1163            }
1164
1165            self.unhandled_panic = behavior;
1166            self
1167        }
1168
1169        /// Disables the LIFO task scheduler heuristic.
1170        ///
1171        /// The multi-threaded scheduler includes a heuristic for optimizing
1172        /// message-passing patterns. This heuristic results in the **last**
1173        /// scheduled task being polled first.
1174        ///
1175        /// To implement this heuristic, each worker thread has a slot which
1176        /// holds the task that should be polled next. However, this slot cannot
1177        /// be stolen by other worker threads, which can result in lower total
1178        /// throughput when tasks tend to have longer poll times.
1179        ///
1180        /// This configuration option will disable this heuristic resulting in
1181        /// all scheduled tasks being pushed into the worker-local queue, which
1182        /// is stealable.
1183        ///
1184        /// Consider trying this option when the task "scheduled" time is high
1185        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1186        /// collect this data.
1187        ///
1188        /// # Unstable
1189        ///
1190        /// This configuration option is considered a workaround for the LIFO
1191        /// slot not being stealable. When the slot becomes stealable, we will
1192        /// revisit whether or not this option is necessary. See
1193        /// issue [tokio-rs/tokio#4941].
1194        ///
1195        /// # Examples
1196        ///
1197        /// ```
1198        /// # #[cfg(not(target_family = "wasm"))]
1199        /// # {
1200        /// use tokio::runtime;
1201        ///
1202        /// let rt = runtime::Builder::new_multi_thread()
1203        ///     .disable_lifo_slot()
1204        ///     .build()
1205        ///     .unwrap();
1206        /// # }
1207        /// ```
1208        ///
1209        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1210        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
            // Only records the choice; the flag is copied into the scheduler
            // `Config` when the runtime is built. This method offers no way
            // to switch the heuristic back on.
            self.disable_lifo_slot = true;
            self
        }
1215
1216        /// Specifies the random number generation seed to use within all
1217        /// threads associated with the runtime being built.
1218        ///
1219        /// This option is intended to make certain parts of the runtime
1220        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1221        /// [`tokio::select!`] it will ensure that the order that branches are
1222        /// polled is deterministic.
1223        ///
1224        /// In addition to the code specifying `rng_seed` and interacting with
1225        /// the runtime, the internals of Tokio and the Rust compiler may affect
1226        /// the sequences of random numbers. In order to ensure repeatable
1227        /// results, the version of Tokio, the versions of all other
1228        /// dependencies that interact with Tokio, and the Rust compiler version
1229        /// should also all remain constant.
1230        ///
1231        /// # Examples
1232        ///
1233        /// ```
1234        /// # use tokio::runtime::{self, RngSeed};
1235        /// # pub fn main() {
1236        /// let seed = RngSeed::from_bytes(b"place your seed here");
1237        /// let rt = runtime::Builder::new_current_thread()
1238        ///     .rng_seed(seed)
1239        ///     .build();
1240        /// # }
1241        /// ```
1242        ///
1243        /// [`tokio::select!`]: crate::select
1244        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1245            self.seed_generator = RngSeedGenerator::new(seed);
1246            self
1247        }
1248    }
1249
1250    cfg_unstable_metrics! {
1251        /// Enables tracking the distribution of task poll times.
1252        ///
1253        /// Task poll times are not instrumented by default as doing so requires
1254        /// calling [`Instant::now()`] twice per task poll, which could add
1255        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1256        /// metrics data.
1257        ///
1258        /// The histogram uses fixed bucket sizes. In other words, the histogram
1259        /// buckets are not dynamic based on input values. Use the
1260        /// `metrics_poll_time_histogram` builder methods to configure the
1261        /// histogram details.
1262        ///
1263        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1264        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1265        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1266        /// to select [`LogHistogram`] instead.
1267        ///
1268        /// # Examples
1269        ///
1270        /// ```
1271        /// # #[cfg(not(target_family = "wasm"))]
1272        /// # {
1273        /// use tokio::runtime;
1274        ///
1275        /// let rt = runtime::Builder::new_multi_thread()
1276        ///     .enable_metrics_poll_time_histogram()
1277        ///     .build()
1278        ///     .unwrap();
1279        /// # // Test default values here
1280        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1281        /// # let m = rt.handle().metrics();
1282        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1283        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1284        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1285        /// # }
1286        /// ```
1287        ///
1288        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1289        /// [`Instant::now()`]: std::time::Instant::now
1290        /// [`LogHistogram`]: crate::runtime::LogHistogram
1291        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
            // The field keeps its older `poll_count` name. It gates whether
            // `metrics_poll_count_histogram_builder` hands a histogram
            // configuration on to the scheduler `Config`.
            self.metrics_poll_count_histogram_enable = true;
            self
        }
1296
        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
        ///
        /// This is the previous name of that method; it forwards to it without
        /// doing anything else.
        ///
        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
        #[doc(hidden)]
        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
            self.enable_metrics_poll_time_histogram()
        }
1305
1306        /// Sets the histogram scale for tracking the distribution of task poll
1307        /// times.
1308        ///
1309        /// Tracking the distribution of task poll times can be done using a
1310        /// linear or log scale. When using linear scale, each histogram bucket
1311        /// will represent the same range of poll times. When using log scale,
1312        /// each histogram bucket will cover a range twice as big as the
1313        /// previous bucket.
1314        ///
1315        /// **Default:** linear scale.
1316        ///
1317        /// # Examples
1318        ///
1319        /// ```
1320        /// # #[cfg(not(target_family = "wasm"))]
1321        /// # {
1322        /// use tokio::runtime::{self, HistogramScale};
1323        ///
1324        /// # #[allow(deprecated)]
1325        /// let rt = runtime::Builder::new_multi_thread()
1326        ///     .enable_metrics_poll_time_histogram()
1327        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1328        ///     .build()
1329        ///     .unwrap();
1330        /// # }
1331        /// ```
1332        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1333        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1334            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1335            self
1336        }
1337
1338        /// Configure the histogram for tracking poll times
1339        ///
1340        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1341        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1342        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1343        ///
1344        /// # Examples
1345        /// Configure a [`LogHistogram`] with [default configuration]:
1346        /// ```
1347        /// # #[cfg(not(target_family = "wasm"))]
1348        /// # {
1349        /// use tokio::runtime;
1350        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1351        ///
1352        /// let rt = runtime::Builder::new_multi_thread()
1353        ///     .enable_metrics_poll_time_histogram()
1354        ///     .metrics_poll_time_histogram_configuration(
1355        ///         HistogramConfiguration::log(LogHistogram::default())
1356        ///     )
1357        ///     .build()
1358        ///     .unwrap();
1359        /// # }
1360        /// ```
1361        ///
1362        /// Configure a linear histogram with 100 buckets, each 10μs wide
1363        /// ```
1364        /// # #[cfg(not(target_family = "wasm"))]
1365        /// # {
1366        /// use tokio::runtime;
1367        /// use std::time::Duration;
1368        /// use tokio::runtime::HistogramConfiguration;
1369        ///
1370        /// let rt = runtime::Builder::new_multi_thread()
1371        ///     .enable_metrics_poll_time_histogram()
1372        ///     .metrics_poll_time_histogram_configuration(
1373        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1374        ///     )
1375        ///     .build()
1376        ///     .unwrap();
1377        /// # }
1378        /// ```
1379        ///
1380        /// Configure a [`LogHistogram`] with the following settings:
1381        /// - Measure times from 100ns to 120s
1382        /// - Max error of 0.1
1383        /// - No more than 1024 buckets
1384        /// ```
1385        /// # #[cfg(not(target_family = "wasm"))]
1386        /// # {
1387        /// use std::time::Duration;
1388        /// use tokio::runtime;
1389        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1390        ///
1391        /// let rt = runtime::Builder::new_multi_thread()
1392        ///     .enable_metrics_poll_time_histogram()
1393        ///     .metrics_poll_time_histogram_configuration(
1394        ///         HistogramConfiguration::log(LogHistogram::builder()
1395        ///             .max_value(Duration::from_secs(120))
1396        ///             .min_value(Duration::from_nanos(100))
1397        ///             .max_error(0.1)
1398        ///             .max_buckets(1024)
1399        ///             .expect("configuration uses 488 buckets")
1400        ///         )
1401        ///     )
1402        ///     .build()
1403        ///     .unwrap();
1404        /// # }
1405        /// ```
1406        ///
1407        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1408        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1409        /// where each bucket is twice the size of the previous bucket.
1410        /// ```rust
1411        /// use std::time::Duration;
1412        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1413        /// let rt = tokio::runtime::Builder::new_current_thread()
1414        ///     .enable_all()
1415        ///     .enable_metrics_poll_time_histogram()
1416        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1417        ///         LogHistogram::builder()
1418        ///             .min_value(Duration::from_micros(20))
1419        ///             .max_value(Duration::from_millis(4))
1420        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1421        ///             .precision_exact(0)
1422        ///             .max_buckets(10)
1423        ///             .unwrap(),
1424        ///     ))
1425        ///     .build()
1426        ///     .unwrap();
1427        /// ```
1428        ///
1429        /// [`LogHistogram`]: crate::runtime::LogHistogram
1430        /// [default configuration]: crate::runtime::LogHistogramBuilder
1431        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1432        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1433            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1434            self
1435        }
1436
1437        /// Sets the histogram resolution for tracking the distribution of task
1438        /// poll times.
1439        ///
1440        /// The resolution is the histogram's first bucket's range. When using a
1441        /// linear histogram scale, each bucket will cover the same range. When
1442        /// using a log scale, each bucket will cover a range twice as big as
1443        /// the previous bucket. In the log case, the resolution represents the
1444        /// smallest bucket range.
1445        ///
1446        /// Note that, when using log scale, the resolution is rounded up to the
1447        /// nearest power of 2 in nanoseconds.
1448        ///
1449        /// **Default:** 100 microseconds.
1450        ///
1451        /// # Examples
1452        ///
1453        /// ```
1454        /// # #[cfg(not(target_family = "wasm"))]
1455        /// # {
1456        /// use tokio::runtime;
1457        /// use std::time::Duration;
1458        ///
1459        /// # #[allow(deprecated)]
1460        /// let rt = runtime::Builder::new_multi_thread()
1461        ///     .enable_metrics_poll_time_histogram()
1462        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1463        ///     .build()
1464        ///     .unwrap();
1465        /// # }
1466        /// ```
1467        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1468        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1469            assert!(resolution > Duration::from_secs(0));
1470            // Sanity check the argument and also make the cast below safe.
1471            assert!(resolution <= Duration::from_secs(1));
1472
1473            let resolution = resolution.as_nanos() as u64;
1474
1475            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1476            self
1477        }
1478
1479        /// Sets the number of buckets for the histogram tracking the
1480        /// distribution of task poll times.
1481        ///
1482        /// The last bucket tracks all greater values that fall out of other
1483        /// ranges. So, configuring the histogram using a linear scale,
1484        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1485        /// polls that take more than 450ms to complete.
1486        ///
1487        /// **Default:** 10
1488        ///
1489        /// # Examples
1490        ///
1491        /// ```
1492        /// # #[cfg(not(target_family = "wasm"))]
1493        /// # {
1494        /// use tokio::runtime;
1495        ///
1496        /// # #[allow(deprecated)]
1497        /// let rt = runtime::Builder::new_multi_thread()
1498        ///     .enable_metrics_poll_time_histogram()
1499        ///     .metrics_poll_count_histogram_buckets(15)
1500        ///     .build()
1501        ///     .unwrap();
1502        /// # }
1503        /// ```
1504        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1505        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1506            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1507            self
1508        }
1509    }
1510
1511    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1512        use crate::runtime::runtime::Scheduler;
1513
1514        let (scheduler, handle, blocking_pool) =
1515            self.build_current_thread_runtime_components(None)?;
1516
1517        Ok(Runtime::from_parts(
1518            Scheduler::CurrentThread(scheduler),
1519            handle,
1520            blocking_pool,
1521        ))
1522    }
1523
1524    #[cfg(tokio_unstable)]
1525    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1526        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1527
1528        let tid = std::thread::current().id();
1529
1530        let (scheduler, handle, blocking_pool) =
1531            self.build_current_thread_runtime_components(Some(tid))?;
1532
1533        Ok(LocalRuntime::from_parts(
1534            LocalRuntimeScheduler::CurrentThread(scheduler),
1535            handle,
1536            blocking_pool,
1537        ))
1538    }
1539
1540    fn build_current_thread_runtime_components(
1541        &mut self,
1542        local_tid: Option<ThreadId>,
1543    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1544        use crate::runtime::scheduler;
1545        use crate::runtime::Config;
1546
1547        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1548
1549        // Blocking pool
1550        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1551        let blocking_spawner = blocking_pool.spawner().clone();
1552
1553        // Generate a rng seed for this runtime.
1554        let seed_generator_1 = self.seed_generator.next_generator();
1555        let seed_generator_2 = self.seed_generator.next_generator();
1556
1557        // And now put a single-threaded scheduler on top of the timer. When
1558        // there are no futures ready to do something, it'll let the timer or
1559        // the reactor to generate some new stimuli for the futures to continue
1560        // in their life.
1561        let (scheduler, handle) = CurrentThread::new(
1562            driver,
1563            driver_handle,
1564            blocking_spawner,
1565            seed_generator_2,
1566            Config {
1567                before_park: self.before_park.clone(),
1568                after_unpark: self.after_unpark.clone(),
1569                before_spawn: self.before_spawn.clone(),
1570                #[cfg(tokio_unstable)]
1571                before_poll: self.before_poll.clone(),
1572                #[cfg(tokio_unstable)]
1573                after_poll: self.after_poll.clone(),
1574                after_termination: self.after_termination.clone(),
1575                global_queue_interval: self.global_queue_interval,
1576                event_interval: self.event_interval,
1577                #[cfg(tokio_unstable)]
1578                unhandled_panic: self.unhandled_panic.clone(),
1579                disable_lifo_slot: self.disable_lifo_slot,
1580                seed_generator: seed_generator_1,
1581                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1582            },
1583            local_tid,
1584        );
1585
1586        let handle = Handle {
1587            inner: scheduler::Handle::CurrentThread(handle),
1588        };
1589
1590        Ok((scheduler, handle, blocking_pool))
1591    }
1592
1593    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1594        if self.metrics_poll_count_histogram_enable {
1595            Some(self.metrics_poll_count_histogram.clone())
1596        } else {
1597            None
1598        }
1599    }
1600}
1601
cfg_io_driver! {
    impl Builder {
        /// Enables the I/O driver.
        ///
        /// Doing this enables using net, process, signal, and some I/O types on
        /// the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_io()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_io(&mut self) -> &mut Self {
            self.enable_io = true;
            self
        }

        /// Configures the max number of I/O events to be processed per tick.
        ///
        /// This only stores the capacity; it does not turn the I/O driver on.
        /// Call [`enable_io`] as well, as the example below does.
        ///
        /// [`enable_io`]: Builder::enable_io
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_io()
        ///     .max_io_events_per_tick(1024)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
            // Passed to the driver as `nevents` via `get_cfg`.
            self.nevents = capacity;
            self
        }
    }
}
1644
cfg_time! {
    impl Builder {
        /// Enables the time driver.
        ///
        /// Doing this enables using `tokio::time` on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// # }
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            // Read back by `get_cfg` when the driver configuration is assembled.
            self.enable_time = true;
            self
        }
    }
}
1670
1671cfg_io_uring! {
1672    impl Builder {
1673        /// Enables the tokio's io_uring driver.
1674        ///
1675        /// Doing this enables using io_uring operations on the runtime.
1676        ///
1677        /// # Examples
1678        ///
1679        /// ```
1680        /// use tokio::runtime;
1681        ///
1682        /// let rt = runtime::Builder::new_multi_thread()
1683        ///     .enable_io_uring()
1684        ///     .build()
1685        ///     .unwrap();
1686        /// ```
1687        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1688        pub fn enable_io_uring(&mut self) -> &mut Self {
1689            // Currently, the uring flag is equivalent to `enable_io`.
1690            self.enable_io = true;
1691            self
1692        }
1693    }
1694}
1695
1696cfg_test_util! {
1697    impl Builder {
1698        /// Controls if the runtime's clock starts paused or advancing.
1699        ///
1700        /// Pausing time requires the current-thread runtime; construction of
1701        /// the runtime will panic otherwise.
1702        ///
1703        /// # Examples
1704        ///
1705        /// ```
1706        /// use tokio::runtime;
1707        ///
1708        /// let rt = runtime::Builder::new_current_thread()
1709        ///     .enable_time()
1710        ///     .start_paused(true)
1711        ///     .build()
1712        ///     .unwrap();
1713        /// ```
1714        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1715            self.start_paused = start_paused;
1716            self
1717        }
1718    }
1719}
1720
1721cfg_rt_multi_thread! {
1722    impl Builder {
1723        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1724            use crate::loom::sys::num_cpus;
1725            use crate::runtime::{Config, runtime::Scheduler};
1726            use crate::runtime::scheduler::{self, MultiThread};
1727
1728            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1729
1730            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1731
1732            // Create the blocking pool
1733            let blocking_pool =
1734                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1735            let blocking_spawner = blocking_pool.spawner().clone();
1736
1737            // Generate a rng seed for this runtime.
1738            let seed_generator_1 = self.seed_generator.next_generator();
1739            let seed_generator_2 = self.seed_generator.next_generator();
1740
1741            let (scheduler, handle, launch) = MultiThread::new(
1742                worker_threads,
1743                driver,
1744                driver_handle,
1745                blocking_spawner,
1746                seed_generator_2,
1747                Config {
1748                    before_park: self.before_park.clone(),
1749                    after_unpark: self.after_unpark.clone(),
1750                    before_spawn: self.before_spawn.clone(),
1751                    #[cfg(tokio_unstable)]
1752                    before_poll: self.before_poll.clone(),
1753                    #[cfg(tokio_unstable)]
1754                    after_poll: self.after_poll.clone(),
1755                    after_termination: self.after_termination.clone(),
1756                    global_queue_interval: self.global_queue_interval,
1757                    event_interval: self.event_interval,
1758                    #[cfg(tokio_unstable)]
1759                    unhandled_panic: self.unhandled_panic.clone(),
1760                    disable_lifo_slot: self.disable_lifo_slot,
1761                    seed_generator: seed_generator_1,
1762                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1763                },
1764            );
1765
1766            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1767
1768            // Spawn the thread pool workers
1769            let _enter = handle.enter();
1770            launch.launch();
1771
1772            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1773        }
1774    }
1775}
1776
1777impl fmt::Debug for Builder {
1778    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1779        fmt.debug_struct("Builder")
1780            .field("worker_threads", &self.worker_threads)
1781            .field("max_blocking_threads", &self.max_blocking_threads)
1782            .field(
1783                "thread_name",
1784                &"<dyn Fn() -> String + Send + Sync + 'static>",
1785            )
1786            .field("thread_stack_size", &self.thread_stack_size)
1787            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1788            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1789            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1790            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1791            .finish()
1792    }
1793}