tokio\runtime/
runtime.rs

1use super::BOX_FUTURE_THRESHOLD;
2use crate::runtime::blocking::BlockingPool;
3use crate::runtime::scheduler::CurrentThread;
4use crate::runtime::{context, EnterGuard, Handle};
5use crate::task::JoinHandle;
6use crate::util::trace::SpawnMeta;
7
8use std::future::Future;
9use std::mem;
10use std::time::Duration;
11
12cfg_rt_multi_thread! {
13    use crate::runtime::Builder;
14    use crate::runtime::scheduler::MultiThread;
15}
16
17/// The Tokio runtime.
18///
19/// The runtime provides an I/O driver, task scheduler, [timer], and
20/// blocking pool, necessary for running asynchronous tasks.
21///
22/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
23/// However, most users will use the [`#[tokio::main]`][main] annotation on
24/// their entry point instead.
25///
26/// See [module level][mod] documentation for more details.
27///
28/// # Shutdown
29///
30/// Shutting down the runtime is done by dropping the value, or calling
31/// [`shutdown_background`] or [`shutdown_timeout`].
32///
33/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
34/// Then they are dropped. They are not *guaranteed* to run to completion, but
35/// *might* do so if they do not yield until completion.
36///
37/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
38/// until they return.
39///
40/// The thread initiating the shutdown blocks until all spawned work has been
41/// stopped. This can take an indefinite amount of time. The `Drop`
42/// implementation waits forever for this.
43///
44/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
45/// waiting forever is undesired. When the timeout is reached, spawned work that
46/// did not stop in time and threads running it are leaked. The work continues
47/// to run until one of the stopping conditions is fulfilled, but the thread
48/// initiating the shutdown is unblocked.
49///
50/// Once the runtime has been dropped, any outstanding I/O resources bound to
51/// it will no longer function. Calling any method on them will result in an
52/// error.
53///
54/// # Sharing
55///
56/// There are several ways to establish shared access to a Tokio runtime:
57///
58///  * Using an <code>[Arc]\<Runtime></code>.
59///  * Using a [`Handle`].
60///  * Entering the runtime context.
61///
62/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
63/// things with the runtime such as spawning new tasks or entering the runtime
64/// context. Both types can be cloned to create a new handle that allows access
65/// to the same runtime. By passing clones into different tasks or threads, you
66/// will be able to access the runtime from those tasks or threads.
67///
68/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
69/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
70/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
71/// runtime happens when the destructor of the `Runtime` object runs.
72///
73/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
74/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
75/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
76/// reference is left over.
77///
78/// The runtime context is entered using the [`Runtime::enter`] or
79/// [`Handle::enter`] methods, which use a thread-local variable to store the
80/// current runtime. Whenever you are inside the runtime context, methods such
81/// as [`tokio::spawn`] will use the runtime whose context you are inside.
82///
83/// [timer]: crate::time
84/// [mod]: index.html
85/// [`new`]: method@Self::new
86/// [`Builder`]: struct@Builder
87/// [`Handle`]: struct@Handle
88/// [main]: macro@crate::main
89/// [`tokio::spawn`]: crate::spawn
90/// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
91/// [Arc]: std::sync::Arc
92/// [`shutdown_background`]: method@Runtime::shutdown_background
93/// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
94#[derive(Debug)]
95pub struct Runtime {
96    /// Task scheduler
97    scheduler: Scheduler,
98
99    /// Handle to runtime, also contains driver handles
100    handle: Handle,
101
102    /// Blocking pool handle, used to signal shutdown
103    blocking_pool: BlockingPool,
104}
105
/// Identifies which flavor a `Runtime` is.
///
/// Values of this type are returned by
/// [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeFlavor {
    /// Every task is executed on the current thread.
    CurrentThread,
    /// Tasks are executed across multiple threads.
    MultiThread,
}
117
118/// The runtime scheduler is either a multi-thread or a current-thread executor.
119#[derive(Debug)]
120pub(super) enum Scheduler {
121    /// Execute all tasks on the current-thread.
122    CurrentThread(CurrentThread),
123
124    /// Execute tasks across multiple threads.
125    #[cfg(feature = "rt-multi-thread")]
126    MultiThread(MultiThread),
127}
128
129impl Runtime {
130    pub(super) fn from_parts(
131        scheduler: Scheduler,
132        handle: Handle,
133        blocking_pool: BlockingPool,
134    ) -> Runtime {
135        Runtime {
136            scheduler,
137            handle,
138            blocking_pool,
139        }
140    }
141
142    /// Creates a new runtime instance with default configuration values.
143    ///
144    /// This results in the multi threaded scheduler, I/O driver, and time driver being
145    /// initialized.
146    ///
147    /// Most applications will not need to call this function directly. Instead,
148    /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
149    /// configuration is necessary, the [runtime builder] may be used.
150    ///
151    /// See [module level][mod] documentation for more details.
152    ///
153    /// # Examples
154    ///
155    /// Creating a new `Runtime` with default configuration values.
156    ///
157    /// ```
158    /// use tokio::runtime::Runtime;
159    ///
160    /// let rt = Runtime::new()
161    ///     .unwrap();
162    ///
163    /// // Use the runtime...
164    /// ```
165    ///
166    /// [mod]: index.html
167    /// [main]: ../attr.main.html
168    /// [threaded scheduler]: index.html#threaded-scheduler
169    /// [runtime builder]: crate::runtime::Builder
170    #[cfg(feature = "rt-multi-thread")]
171    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
172    pub fn new() -> std::io::Result<Runtime> {
173        Builder::new_multi_thread().enable_all().build()
174    }
175
176    /// Returns a handle to the runtime's spawner.
177    ///
178    /// The returned handle can be used to spawn tasks that run on this runtime, and can
179    /// be cloned to allow moving the `Handle` to other threads.
180    ///
181    /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
182    /// Refer to the documentation of [`Handle::block_on`] for more.
183    ///
184    /// # Examples
185    ///
186    /// ```
187    /// # #[cfg(not(target_family = "wasm"))]
188    /// # {
189    /// use tokio::runtime::Runtime;
190    ///
191    /// let rt = Runtime::new()
192    ///     .unwrap();
193    ///
194    /// let handle = rt.handle();
195    ///
196    /// // Use the handle...
197    /// # }
198    /// ```
199    pub fn handle(&self) -> &Handle {
200        &self.handle
201    }
202
203    /// Spawns a future onto the Tokio runtime.
204    ///
205    /// This spawns the given future onto the runtime's executor, usually a
206    /// thread pool. The thread pool is then responsible for polling the future
207    /// until it completes.
208    ///
209    /// The provided future will start running in the background immediately
210    /// when `spawn` is called, even if you don't await the returned
211    /// `JoinHandle`.
212    ///
213    /// See [module level][mod] documentation for more details.
214    ///
215    /// [mod]: index.html
216    ///
217    /// # Examples
218    ///
219    /// ```
220    /// # #[cfg(not(target_family = "wasm"))]
221    /// # {
222    /// use tokio::runtime::Runtime;
223    ///
224    /// # fn dox() {
225    /// // Create the runtime
226    /// let rt = Runtime::new().unwrap();
227    ///
228    /// // Spawn a future onto the runtime
229    /// rt.spawn(async {
230    ///     println!("now running on a worker thread");
231    /// });
232    /// # }
233    /// # }
234    /// ```
235    #[track_caller]
236    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
237    where
238        F: Future + Send + 'static,
239        F::Output: Send + 'static,
240    {
241        let fut_size = mem::size_of::<F>();
242        if fut_size > BOX_FUTURE_THRESHOLD {
243            self.handle
244                .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
245        } else {
246            self.handle
247                .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
248        }
249    }
250
251    /// Runs the provided function on an executor dedicated to blocking operations.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// # #[cfg(not(target_family = "wasm"))]
257    /// # {
258    /// use tokio::runtime::Runtime;
259    ///
260    /// # fn dox() {
261    /// // Create the runtime
262    /// let rt = Runtime::new().unwrap();
263    ///
264    /// // Spawn a blocking function onto the runtime
265    /// rt.spawn_blocking(|| {
266    ///     println!("now running on a worker thread");
267    /// });
268    /// # }
269    /// # }
270    /// ```
271    #[track_caller]
272    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
273    where
274        F: FnOnce() -> R + Send + 'static,
275        R: Send + 'static,
276    {
277        self.handle.spawn_blocking(func)
278    }
279
280    /// Runs a future to completion on the Tokio runtime. This is the
281    /// runtime's entry point.
282    ///
283    /// This runs the given future on the current thread, blocking until it is
284    /// complete, and yielding its resolved result. Any tasks or timers
285    /// which the future spawns internally will be executed on the runtime.
286    ///
287    /// # Non-worker future
288    ///
289    /// Note that the future required by this function does not run as a
290    /// worker. The expectation is that other tasks are spawned by the future here.
291    /// Awaiting on other futures from the future provided here will not
292    /// perform as fast as those spawned as workers.
293    ///
294    /// # Multi thread scheduler
295    ///
296    /// When the multi thread scheduler is used this will allow futures
297    /// to run within the io driver and timer context of the overall runtime.
298    ///
299    /// Any spawned tasks will continue running after `block_on` returns.
300    ///
301    /// # Current thread scheduler
302    ///
303    /// When the current thread scheduler is enabled `block_on`
304    /// can be called concurrently from multiple threads. The first call
305    /// will take ownership of the io and timer drivers. This means
306    /// other threads which do not own the drivers will hook into that one.
307    /// When the first `block_on` completes, other threads will be able to
308    /// "steal" the driver to allow continued execution of their futures.
309    ///
310    /// Any spawned tasks will be suspended after `block_on` returns. Calling
311    /// `block_on` again will resume previously spawned tasks.
312    ///
313    /// # Panics
314    ///
315    /// This function panics if the provided future panics, or if called within an
316    /// asynchronous execution context.
317    ///
318    /// # Examples
319    ///
320    /// ```no_run
321    /// # #[cfg(not(target_family = "wasm"))]
322    /// # {
323    /// use tokio::runtime::Runtime;
324    ///
325    /// // Create the runtime
326    /// let rt  = Runtime::new().unwrap();
327    ///
328    /// // Execute the future, blocking the current thread until completion
329    /// rt.block_on(async {
330    ///     println!("hello");
331    /// });
332    /// # }
333    /// ```
334    ///
335    /// [handle]: fn@Handle::block_on
336    #[track_caller]
337    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
338        let fut_size = mem::size_of::<F>();
339        if fut_size > BOX_FUTURE_THRESHOLD {
340            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
341        } else {
342            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
343        }
344    }
345
346    #[track_caller]
347    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
348        #[cfg(all(
349            tokio_unstable,
350            feature = "taskdump",
351            feature = "rt",
352            target_os = "linux",
353            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
354        ))]
355        let future = super::task::trace::Trace::root(future);
356
357        #[cfg(all(tokio_unstable, feature = "tracing"))]
358        let future = crate::util::trace::task(
359            future,
360            "block_on",
361            _meta,
362            crate::runtime::task::Id::next().as_u64(),
363        );
364
365        let _enter = self.enter();
366
367        match &self.scheduler {
368            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
369            #[cfg(feature = "rt-multi-thread")]
370            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
371        }
372    }
373
374    /// Enters the runtime context.
375    ///
376    /// This allows you to construct types that must have an executor
377    /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
378    /// also allow you to call methods such as [`tokio::spawn`].
379    ///
380    /// [`Sleep`]: struct@crate::time::Sleep
381    /// [`TcpStream`]: struct@crate::net::TcpStream
382    /// [`tokio::spawn`]: fn@crate::spawn
383    ///
384    /// # Example
385    ///
386    /// ```
387    /// # #[cfg(not(target_family = "wasm"))]
388    /// # {
389    /// use tokio::runtime::Runtime;
390    /// use tokio::task::JoinHandle;
391    ///
392    /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
393    ///     // Had we not used `rt.enter` below, this would panic.
394    ///     tokio::spawn(async move {
395    ///         println!("{}", msg);
396    ///     })
397    /// }
398    ///
399    /// fn main() {
400    ///     let rt = Runtime::new().unwrap();
401    ///
402    ///     let s = "Hello World!".to_string();
403    ///
404    ///     // By entering the context, we tie `tokio::spawn` to this executor.
405    ///     let _guard = rt.enter();
406    ///     let handle = function_that_spawns(s);
407    ///
408    ///     // Wait for the task before we end the test.
409    ///     rt.block_on(handle).unwrap();
410    /// }
411    /// # }
412    /// ```
413    pub fn enter(&self) -> EnterGuard<'_> {
414        self.handle.enter()
415    }
416
417    /// Shuts down the runtime, waiting for at most `duration` for all spawned
418    /// work to stop.
419    ///
420    /// See the [struct level documentation](Runtime#shutdown) for more details.
421    ///
422    /// # Examples
423    ///
424    /// ```
425    /// # #[cfg(not(target_family = "wasm"))]
426    /// # {
427    /// use tokio::runtime::Runtime;
428    /// use tokio::task;
429    ///
430    /// use std::thread;
431    /// use std::time::Duration;
432    ///
433    /// fn main() {
434    /// #  if cfg!(miri) { return } // Miri reports error when main thread terminated without waiting all remaining threads.
435    ///    let runtime = Runtime::new().unwrap();
436    ///
437    ///    runtime.block_on(async move {
438    ///        task::spawn_blocking(move || {
439    ///            thread::sleep(Duration::from_secs(10_000));
440    ///        });
441    ///    });
442    ///
443    ///    runtime.shutdown_timeout(Duration::from_millis(100));
444    /// }
445    /// # }
446    /// ```
447    pub fn shutdown_timeout(mut self, duration: Duration) {
448        // Wakeup and shutdown all the worker threads
449        self.handle.inner.shutdown();
450        self.blocking_pool.shutdown(Some(duration));
451    }
452
453    /// Shuts down the runtime, without waiting for any spawned work to stop.
454    ///
455    /// This can be useful if you want to drop a runtime from within another runtime.
456    /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
457    /// to complete, which would normally not be permitted within an asynchronous context.
458    /// By calling `shutdown_background()`, you can drop the runtime from such a context.
459    ///
460    /// Note however, that because we do not wait for any blocking tasks to complete, this
461    /// may result in a resource leak (in that any blocking tasks are still running until they
462    /// return.
463    ///
464    /// See the [struct level documentation](Runtime#shutdown) for more details.
465    ///
466    /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
467    ///
468    /// ```
469    /// # #[cfg(not(target_family = "wasm"))]
470    /// # {
471    /// use tokio::runtime::Runtime;
472    ///
473    /// fn main() {
474    ///    let runtime = Runtime::new().unwrap();
475    ///
476    ///    runtime.block_on(async move {
477    ///        let inner_runtime = Runtime::new().unwrap();
478    ///        // ...
479    ///        inner_runtime.shutdown_background();
480    ///    });
481    /// }
482    /// # }
483    /// ```
484    pub fn shutdown_background(self) {
485        self.shutdown_timeout(Duration::from_nanos(0));
486    }
487
488    /// Returns a view that lets you get information about how the runtime
489    /// is performing.
490    pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
491        self.handle.metrics()
492    }
493}
494
495impl Drop for Runtime {
496    fn drop(&mut self) {
497        match &mut self.scheduler {
498            Scheduler::CurrentThread(current_thread) => {
499                // This ensures that tasks spawned on the current-thread
500                // runtime are dropped inside the runtime's context.
501                let _guard = context::try_set_current(&self.handle.inner);
502                current_thread.shutdown(&self.handle.inner);
503            }
504            #[cfg(feature = "rt-multi-thread")]
505            Scheduler::MultiThread(multi_thread) => {
506                // The threaded scheduler drops its tasks on its worker threads, which is
507                // already in the runtime's context.
508                multi_thread.shutdown(&self.handle.inner);
509            }
510        }
511    }
512}
513
514impl std::panic::UnwindSafe for Runtime {}
515
516impl std::panic::RefUnwindSafe for Runtime {}