tokio/task/local.rs
1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4#[cfg(tokio_unstable)]
5use crate::runtime;
6use crate::runtime::task::{
7 self, JoinHandle, LocalOwnedTasks, SpawnLocation, Task, TaskHarnessScheduleHooks,
8};
9use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
10use crate::sync::AtomicWaker;
11use crate::util::trace::SpawnMeta;
12use crate::util::RcCell;
13
14use std::cell::Cell;
15use std::collections::VecDeque;
16use std::fmt;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::mem;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::task::Poll;
23
24use pin_project_lite::pin_project;
25
26cfg_rt! {
27 /// A set of tasks which are executed on the same thread.
28 ///
29 /// In some cases, it is necessary to run one or more futures that do not
30 /// implement [`Send`] and thus are unsafe to send between threads. In these
31 /// cases, a [local task set] may be used to schedule one or more `!Send`
32 /// futures to run together on the same thread.
33 ///
34 /// For example, the following code will not compile:
35 ///
36 /// ```rust,compile_fail
37 /// use std::rc::Rc;
38 ///
39 /// #[tokio::main]
40 /// async fn main() {
41 /// // `Rc` does not implement `Send`, and thus may not be sent between
42 /// // threads safely.
43 /// let nonsend_data = Rc::new("my nonsend data...");
44 ///
45 /// let nonsend_data = nonsend_data.clone();
46 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
47 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
48 /// // will not compile.
49 /// tokio::spawn(async move {
50 /// println!("{}", nonsend_data);
51 /// // ...
52 /// }).await.unwrap();
53 /// }
54 /// ```
55 ///
56 /// # Use with `run_until`
57 ///
58 /// To spawn `!Send` futures, we can use a local task set to schedule them
59 /// on the thread calling [`Runtime::block_on`]. When running inside of the
60 /// local task set, we can use [`task::spawn_local`], which can spawn
61 /// `!Send` futures. For example:
62 ///
63 /// ```rust
64 /// use std::rc::Rc;
65 /// use tokio::task;
66 ///
67 /// #[tokio::main]
68 /// async fn main() {
69 /// let nonsend_data = Rc::new("my nonsend data...");
70 ///
71 /// // Construct a local task set that can run `!Send` futures.
72 /// let local = task::LocalSet::new();
73 ///
74 /// // Run the local task set.
75 /// local.run_until(async move {
76 /// let nonsend_data = nonsend_data.clone();
77 /// // `spawn_local` ensures that the future is spawned on the local
78 /// // task set.
79 /// task::spawn_local(async move {
80 /// println!("{}", nonsend_data);
81 /// // ...
82 /// }).await.unwrap();
83 /// }).await;
84 /// }
85 /// ```
86 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
87 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
88 /// cannot be used inside a task spawned with `tokio::spawn`.
89 ///
90 /// ## Awaiting a `LocalSet`
91 ///
92 /// Additionally, a `LocalSet` itself implements `Future`, completing when
93 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
94 /// several futures on a `LocalSet` and drive the whole set until they
95 /// complete. For example,
96 ///
97 /// ```rust
98 /// use tokio::{task, time};
99 /// use std::rc::Rc;
100 ///
101 /// #[tokio::main]
102 /// async fn main() {
103 /// let nonsend_data = Rc::new("world");
104 /// let local = task::LocalSet::new();
105 ///
106 /// let nonsend_data2 = nonsend_data.clone();
107 /// local.spawn_local(async move {
108 /// // ...
109 /// println!("hello {}", nonsend_data2)
110 /// });
111 ///
112 /// local.spawn_local(async move {
113 /// time::sleep(time::Duration::from_millis(100)).await;
114 /// println!("goodbye {}", nonsend_data)
115 /// });
116 ///
117 /// // ...
118 ///
119 /// local.await;
120 /// }
121 /// ```
122 /// **Note:** Awaiting a `LocalSet` can only be done inside
123 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
124 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
125 /// `tokio::spawn`.
126 ///
127 /// ## Use inside `tokio::spawn`
128 ///
129 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
130 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
131 /// something else. The solution is to create the `LocalSet` somewhere else,
132 /// and communicate with it using an [`mpsc`] channel.
133 ///
134 /// The following example puts the `LocalSet` inside a new thread.
135 /// ```
136 /// use tokio::runtime::Builder;
137 /// use tokio::sync::{mpsc, oneshot};
138 /// use tokio::task::LocalSet;
139 ///
140 /// // This struct describes the task you want to spawn. Here we include
141 /// // some simple examples. The oneshot channel allows sending a response
142 /// // to the spawner.
143 /// #[derive(Debug)]
144 /// enum Task {
145 /// PrintNumber(u32),
146 /// AddOne(u32, oneshot::Sender<u32>),
147 /// }
148 ///
149 /// #[derive(Clone)]
150 /// struct LocalSpawner {
151 /// send: mpsc::UnboundedSender<Task>,
152 /// }
153 ///
154 /// impl LocalSpawner {
155 /// pub fn new() -> Self {
156 /// let (send, mut recv) = mpsc::unbounded_channel();
157 ///
158 /// let rt = Builder::new_current_thread()
159 /// .enable_all()
160 /// .build()
161 /// .unwrap();
162 ///
163 /// std::thread::spawn(move || {
164 /// let local = LocalSet::new();
165 ///
166 /// local.spawn_local(async move {
167 /// while let Some(new_task) = recv.recv().await {
168 /// tokio::task::spawn_local(run_task(new_task));
169 /// }
170 /// // If the while loop returns, then all the LocalSpawner
171 /// // objects have been dropped.
172 /// });
173 ///
174 /// // This will return once all senders are dropped and all
175 /// // spawned tasks have returned.
176 /// rt.block_on(local);
177 /// });
178 ///
179 /// Self {
180 /// send,
181 /// }
182 /// }
183 ///
184 /// pub fn spawn(&self, task: Task) {
185 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
186 /// }
187 /// }
188 ///
189 /// // This task may do !Send stuff. We use printing a number as an example,
190 /// // but it could be anything.
191 /// //
192 /// // The Task struct is an enum to support spawning many different kinds
193 /// // of operations.
194 /// async fn run_task(task: Task) {
195 /// match task {
196 /// Task::PrintNumber(n) => {
197 /// println!("{}", n);
198 /// },
199 /// Task::AddOne(n, response) => {
200 /// // We ignore failures to send the response.
201 /// let _ = response.send(n + 1);
202 /// },
203 /// }
204 /// }
205 ///
206 /// #[tokio::main]
207 /// async fn main() {
208 /// let spawner = LocalSpawner::new();
209 ///
210 /// let (send, response) = oneshot::channel();
211 /// spawner.spawn(Task::AddOne(10, send));
212 /// let eleven = response.await.unwrap();
213 /// assert_eq!(eleven, 11);
214 /// }
215 /// ```
216 ///
217 /// [`Send`]: trait@std::marker::Send
218 /// [local task set]: struct@LocalSet
219 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
220 /// [`task::spawn_local`]: fn@spawn_local
221 /// [`mpsc`]: mod@crate::sync::mpsc
222 pub struct LocalSet {
223 /// Current scheduler tick.
224 tick: Cell<u8>,
225
226 /// State available from thread-local.
227 context: Rc<Context>,
228
229 /// This type should not be Send.
230 _not_send: PhantomData<*const ()>,
231 }
232}
233
234/// State available from the thread-local.
235struct Context {
236 /// State shared between threads.
237 shared: Arc<Shared>,
238
239 /// True if a task panicked without being handled and the local set is
240 /// configured to shutdown on unhandled panic.
241 unhandled_panic: Cell<bool>,
242}
243
244/// `LocalSet` state shared between threads.
245struct Shared {
246 /// # Safety
247 ///
248 /// This field must *only* be accessed from the thread that owns the
249 /// `LocalSet` (i.e., `Thread::current().id() == owner`).
250 local_state: LocalState,
251
252 /// Remote run queue sender.
253 queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
254
255 /// Wake the `LocalSet` task.
256 waker: AtomicWaker,
257
258 /// How to respond to unhandled task panics.
259 #[cfg(tokio_unstable)]
260 pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
261}
262
263/// Tracks the `LocalSet` state that must only be accessed from the thread that
264/// created the `LocalSet`.
265struct LocalState {
266 /// The `ThreadId` of the thread that owns the `LocalSet`.
267 owner: ThreadId,
268
269 /// Local run queue sender and receiver.
270 local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,
271
272 /// Collection of all active tasks spawned onto this executor.
273 owned: LocalOwnedTasks<Arc<Shared>>,
274}
275
276pin_project! {
277 #[derive(Debug)]
278 struct RunUntil<'a, F> {
279 local_set: &'a LocalSet,
280 #[pin]
281 future: F,
282 }
283}
284
285tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
286 ctx: RcCell::new(),
287 wake_on_schedule: Cell::new(false),
288} });
289
290struct LocalData {
291 ctx: RcCell<Context>,
292 wake_on_schedule: Cell<bool>,
293}
294
295impl LocalData {
296 /// Should be called except when we call `LocalSet::enter`.
297 /// Especially when we poll a `LocalSet`.
298 #[must_use = "dropping this guard will reset the entered state"]
299 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
300 let ctx = self.ctx.replace(Some(ctx));
301 let wake_on_schedule = self.wake_on_schedule.replace(false);
302 LocalDataEnterGuard {
303 local_data_ref: self,
304 ctx,
305 wake_on_schedule,
306 }
307 }
308}
309
310/// A guard for `LocalData::enter()`
311struct LocalDataEnterGuard<'a> {
312 local_data_ref: &'a LocalData,
313 ctx: Option<Rc<Context>>,
314 wake_on_schedule: bool,
315}
316
317impl<'a> Drop for LocalDataEnterGuard<'a> {
318 fn drop(&mut self) {
319 self.local_data_ref.ctx.set(self.ctx.take());
320 self.local_data_ref
321 .wake_on_schedule
322 .set(self.wake_on_schedule)
323 }
324}
325
326cfg_rt! {
327 /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
328 ///
329 /// The spawned future will run on the same thread that called `spawn_local`.
330 ///
331 /// The provided future will start running in the background immediately
332 /// when `spawn_local` is called, even if you don't await the returned
333 /// `JoinHandle`.
334 ///
335 /// # Panics
336 ///
337 /// This function panics if called outside of a [`LocalSet`].
338 ///
339 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
340 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
341 /// `spawn_local` if you want to stay within the `LocalSet`.
342 ///
343 /// # Examples
344 ///
345 /// ```rust
346 /// use std::rc::Rc;
347 /// use tokio::task;
348 ///
349 /// #[tokio::main]
350 /// async fn main() {
351 /// let nonsend_data = Rc::new("my nonsend data...");
352 ///
353 /// let local = task::LocalSet::new();
354 ///
355 /// // Run the local task set.
356 /// local.run_until(async move {
357 /// let nonsend_data = nonsend_data.clone();
358 /// task::spawn_local(async move {
359 /// println!("{}", nonsend_data);
360 /// // ...
361 /// }).await.unwrap();
362 /// }).await;
363 /// }
364 /// ```
365 ///
366 /// [`LocalSet`]: struct@crate::task::LocalSet
367 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
368 /// [`tokio::spawn`]: fn@crate::task::spawn
369 #[track_caller]
370 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
371 where
372 F: Future + 'static,
373 F::Output: 'static,
374 {
375 let fut_size = std::mem::size_of::<F>();
376 if fut_size > BOX_FUTURE_THRESHOLD {
377 spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
378 } else {
379 spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
380 }
381 }
382
383
384 #[track_caller]
385 pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
386 where F: Future + 'static,
387 F::Output: 'static
388 {
389 use crate::runtime::{context, task};
390
391 let mut future = Some(future);
392
393 let res = context::with_current(|handle| {
394 Some(if handle.is_local() {
395 if !handle.can_spawn_local_on_local_runtime() {
396 return None;
397 }
398
399 let future = future.take().unwrap();
400
401 #[cfg(all(
402 tokio_unstable,
403 tokio_taskdump,
404 feature = "rt",
405 target_os = "linux",
406 any(
407 target_arch = "aarch64",
408 target_arch = "x86",
409 target_arch = "x86_64"
410 )
411 ))]
412 let future = task::trace::Trace::root(future);
413 let id = task::Id::next();
414 let task = crate::util::trace::task(future, "task", meta, id.as_u64());
415
416 // safety: we have verified that this is a `LocalRuntime` owned by the current thread
417 unsafe { handle.spawn_local(task, id, meta.spawned_at) }
418 } else {
419 match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
420 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
421 Some(cx) => cx.spawn(future.take().unwrap(), meta)
422 }
423 })
424 });
425
426 match res {
427 Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
428 Ok(Some(join_handle)) => join_handle,
429 Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
430 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
431 Some(cx) => cx.spawn(future.unwrap(), meta)
432 }
433 }
434 }
435}
436
437/// Initial queue capacity.
438const INITIAL_CAPACITY: usize = 64;
439
440/// Max number of tasks to poll per tick.
441const MAX_TASKS_PER_TICK: usize = 61;
442
443/// How often it check the remote queue first.
444const REMOTE_FIRST_INTERVAL: u8 = 31;
445
446/// Context guard for `LocalSet`
447pub struct LocalEnterGuard {
448 ctx: Option<Rc<Context>>,
449
450 /// Distinguishes whether the context was entered or being polled.
451 /// When we enter it, the value `wake_on_schedule` is set. In this case
452 /// `spawn_local` refers the context, whereas it is not being polled now.
453 wake_on_schedule: bool,
454}
455
456impl Drop for LocalEnterGuard {
457 fn drop(&mut self) {
458 CURRENT.with(
459 |LocalData {
460 ctx,
461 wake_on_schedule,
462 }| {
463 ctx.set(self.ctx.take());
464 wake_on_schedule.set(self.wake_on_schedule);
465 },
466 );
467 }
468}
469
470impl fmt::Debug for LocalEnterGuard {
471 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472 f.debug_struct("LocalEnterGuard").finish()
473 }
474}
475
476impl LocalSet {
477 /// Returns a new local task set.
478 pub fn new() -> LocalSet {
479 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
480
481 LocalSet {
482 tick: Cell::new(0),
483 context: Rc::new(Context {
484 shared: Arc::new(Shared {
485 local_state: LocalState {
486 owner,
487 owned: LocalOwnedTasks::new(),
488 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
489 },
490 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
491 waker: AtomicWaker::new(),
492 #[cfg(tokio_unstable)]
493 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
494 }),
495 unhandled_panic: Cell::new(false),
496 }),
497 _not_send: PhantomData,
498 }
499 }
500
501 /// Enters the context of this `LocalSet`.
502 ///
503 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
504 /// context you are inside.
505 ///
506 /// [`spawn_local`]: fn@crate::task::spawn_local
507 pub fn enter(&self) -> LocalEnterGuard {
508 CURRENT.with(
509 |LocalData {
510 ctx,
511 wake_on_schedule,
512 ..
513 }| {
514 let ctx = ctx.replace(Some(self.context.clone()));
515 let wake_on_schedule = wake_on_schedule.replace(true);
516 LocalEnterGuard {
517 ctx,
518 wake_on_schedule,
519 }
520 },
521 )
522 }
523
524 /// Spawns a `!Send` task onto the local task set.
525 ///
526 /// This task is guaranteed to be run on the current thread.
527 ///
528 /// Unlike the free function [`spawn_local`], this method may be used to
529 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
530 /// future will start running once the `LocalSet` is next started, even if
531 /// you don't await the returned `JoinHandle`.
532 ///
533 /// # Examples
534 ///
535 /// ```rust
536 /// use tokio::task;
537 ///
538 /// #[tokio::main]
539 /// async fn main() {
540 /// let local = task::LocalSet::new();
541 ///
542 /// // Spawn a future on the local set. This future will be run when
543 /// // we call `run_until` to drive the task set.
544 /// local.spawn_local(async {
545 /// // ...
546 /// });
547 ///
548 /// // Run the local task set.
549 /// local.run_until(async move {
550 /// // ...
551 /// }).await;
552 ///
553 /// // When `run` finishes, we can spawn _more_ futures, which will
554 /// // run in subsequent calls to `run_until`.
555 /// local.spawn_local(async {
556 /// // ...
557 /// });
558 ///
559 /// local.run_until(async move {
560 /// // ...
561 /// }).await;
562 /// }
563 /// ```
564 /// [`spawn_local`]: fn@spawn_local
565 #[track_caller]
566 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
567 where
568 F: Future + 'static,
569 F::Output: 'static,
570 {
571 let fut_size = mem::size_of::<F>();
572 if fut_size > BOX_FUTURE_THRESHOLD {
573 self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
574 } else {
575 self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
576 }
577 }
578
579 /// Runs a future to completion on the provided runtime, driving any local
580 /// futures spawned on this task set on the current thread.
581 ///
582 /// This runs the given future on the runtime, blocking until it is
583 /// complete, and yielding its resolved result. Any tasks or timers which
584 /// the future spawns internally will be executed on the runtime. The future
585 /// may also call [`spawn_local`] to `spawn_local` additional local futures on the
586 /// current thread.
587 ///
588 /// This method should not be called from an asynchronous context.
589 ///
590 /// # Panics
591 ///
592 /// This function panics if the executor is at capacity, if the provided
593 /// future panics, or if called within an asynchronous execution context.
594 ///
595 /// # Notes
596 ///
597 /// Since this function internally calls [`Runtime::block_on`], and drives
598 /// futures in the local task set inside that call to `block_on`, the local
599 /// futures may not use [in-place blocking]. If a blocking call needs to be
600 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
601 ///
602 /// For example, this will panic:
603 /// ```should_panic
604 /// use tokio::runtime::Runtime;
605 /// use tokio::task;
606 ///
607 /// let rt = Runtime::new().unwrap();
608 /// let local = task::LocalSet::new();
609 /// local.block_on(&rt, async {
610 /// let join = task::spawn_local(async {
611 /// let blocking_result = task::block_in_place(|| {
612 /// // ...
613 /// });
614 /// // ...
615 /// });
616 /// join.await.unwrap();
617 /// })
618 /// ```
619 /// This, however, will not panic:
620 /// ```
621 /// use tokio::runtime::Runtime;
622 /// use tokio::task;
623 ///
624 /// let rt = Runtime::new().unwrap();
625 /// let local = task::LocalSet::new();
626 /// local.block_on(&rt, async {
627 /// let join = task::spawn_local(async {
628 /// let blocking_result = task::spawn_blocking(|| {
629 /// // ...
630 /// }).await;
631 /// // ...
632 /// });
633 /// join.await.unwrap();
634 /// })
635 /// ```
636 ///
637 /// [`spawn_local`]: fn@spawn_local
638 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
639 /// [in-place blocking]: fn@crate::task::block_in_place
640 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
641 #[track_caller]
642 #[cfg(feature = "rt")]
643 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
644 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
645 where
646 F: Future,
647 {
648 rt.block_on(self.run_until(future))
649 }
650
651 /// Runs a future to completion on the local set, returning its output.
652 ///
653 /// This returns a future that runs the given future with a local set,
654 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
655 /// Any local futures spawned on the local set will be driven in the
656 /// background until the future passed to `run_until` completes. When the future
657 /// passed to `run_until` finishes, any local futures which have not completed
658 /// will remain on the local set, and will be driven on subsequent calls to
659 /// `run_until` or when [awaiting the local set] itself.
660 ///
661 /// # Cancel safety
662 ///
663 /// This method is cancel safe when `future` is cancel safe.
664 ///
665 /// # Examples
666 ///
667 /// ```rust
668 /// use tokio::task;
669 ///
670 /// #[tokio::main]
671 /// async fn main() {
672 /// task::LocalSet::new().run_until(async {
673 /// task::spawn_local(async move {
674 /// // ...
675 /// }).await.unwrap();
676 /// // ...
677 /// }).await;
678 /// }
679 /// ```
680 ///
681 /// [`spawn_local`]: fn@spawn_local
682 /// [awaiting the local set]: #awaiting-a-localset
683 pub async fn run_until<F>(&self, future: F) -> F::Output
684 where
685 F: Future,
686 {
687 let run_until = RunUntil {
688 future,
689 local_set: self,
690 };
691 run_until.await
692 }
693
694 #[track_caller]
695 pub(in crate::task) fn spawn_named<F>(
696 &self,
697 future: F,
698 meta: SpawnMeta<'_>,
699 ) -> JoinHandle<F::Output>
700 where
701 F: Future + 'static,
702 F::Output: 'static,
703 {
704 self.spawn_named_inner(future, meta)
705 }
706
707 #[track_caller]
708 fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
709 where
710 F: Future + 'static,
711 F::Output: 'static,
712 {
713 let handle = self.context.spawn(future, meta);
714
715 // Because a task was spawned from *outside* the `LocalSet`, wake the
716 // `LocalSet` future to execute the new task, if it hasn't been woken.
717 //
718 // Spawning via the free fn `spawn` does not require this, as it can
719 // only be called from *within* a future executing on the `LocalSet` —
720 // in that case, the `LocalSet` must already be awake.
721 self.context.shared.waker.wake();
722 handle
723 }
724
725 /// Ticks the scheduler, returning whether the local future needs to be
726 /// notified again.
727 fn tick(&self) -> bool {
728 for _ in 0..MAX_TASKS_PER_TICK {
729 // Make sure we didn't hit an unhandled panic
730 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
731
732 match self.next_task() {
733 // Run the task
734 //
735 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
736 // used. We are responsible for maintaining the invariant that
737 // `run_unchecked` is only called on threads that spawned the
738 // task initially. Because `LocalSet` itself is `!Send`, and
739 // `spawn_local` spawns into the `LocalSet` on the current
740 // thread, the invariant is maintained.
741 Some(task) => crate::task::coop::budget(|| task.run()),
742 // We have fully drained the queue of notified tasks, so the
743 // local future doesn't need to be notified again — it can wait
744 // until something else wakes a task in the local set.
745 None => return false,
746 }
747 }
748
749 true
750 }
751
752 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
753 let tick = self.tick.get();
754 self.tick.set(tick.wrapping_add(1));
755
756 let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
757 self.context
758 .shared
759 .queue
760 .lock()
761 .as_mut()
762 .and_then(|queue| queue.pop_front())
763 .or_else(|| self.pop_local())
764 } else {
765 self.pop_local().or_else(|| {
766 self.context
767 .shared
768 .queue
769 .lock()
770 .as_mut()
771 .and_then(VecDeque::pop_front)
772 })
773 };
774
775 task.map(|task| unsafe {
776 // Safety: because the `LocalSet` itself is `!Send`, we know we are
777 // on the same thread if we have access to the `LocalSet`, and can
778 // therefore access the local run queue.
779 self.context.shared.local_state.assert_owner(task)
780 })
781 }
782
783 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
784 unsafe {
785 // Safety: because the `LocalSet` itself is `!Send`, we know we are
786 // on the same thread if we have access to the `LocalSet`, and can
787 // therefore access the local run queue.
788 self.context.shared.local_state.task_pop_front()
789 }
790 }
791
792 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
793 CURRENT.with(|local_data| {
794 let _guard = local_data.enter(self.context.clone());
795 f()
796 })
797 }
798
799 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
800 /// fails.
801 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
802 let mut f = Some(f);
803
804 let res = CURRENT.try_with(|local_data| {
805 let _guard = local_data.enter(self.context.clone());
806 (f.take().unwrap())()
807 });
808
809 match res {
810 Ok(res) => res,
811 Err(_access_error) => (f.take().unwrap())(),
812 }
813 }
814}
815
816cfg_unstable! {
817 impl LocalSet {
818 /// Configure how the `LocalSet` responds to an unhandled panic on a
819 /// spawned task.
820 ///
821 /// By default, an unhandled panic (i.e. a panic not caught by
822 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
823 /// execution. The panic is error value is forwarded to the task's
824 /// [`JoinHandle`] and all other spawned tasks continue running.
825 ///
826 /// The `unhandled_panic` option enables configuring this behavior.
827 ///
828 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
829 /// spawned tasks have no impact on the `LocalSet`'s execution.
830 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
831 /// shutdown immediately when a spawned task panics even if that
832 /// task's `JoinHandle` has not been dropped. All other spawned tasks
833 /// will immediately terminate and further calls to
834 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
835 ///
836 /// # Panics
837 ///
838 /// This method panics if called after the `LocalSet` has started
839 /// running.
840 ///
841 /// # Unstable
842 ///
843 /// This option is currently unstable and its implementation is
844 /// incomplete. The API may change or be removed in the future. See
845 /// tokio-rs/tokio#4516 for more details.
846 ///
847 /// # Examples
848 ///
849 /// The following demonstrates a `LocalSet` configured to shutdown on
850 /// panic. The first spawned task panics and results in the `LocalSet`
851 /// shutting down. The second spawned task never has a chance to
852 /// execute. The call to `run_until` will panic due to the runtime being
853 /// forcibly shutdown.
854 ///
855 /// ```should_panic
856 /// use tokio::runtime::UnhandledPanic;
857 ///
858 /// # #[tokio::main]
859 /// # async fn main() {
860 /// tokio::task::LocalSet::new()
861 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
862 /// .run_until(async {
863 /// tokio::task::spawn_local(async { panic!("boom"); });
864 /// tokio::task::spawn_local(async {
865 /// // This task never completes
866 /// });
867 ///
868 /// // Do some work, but `run_until` will panic before it completes
869 /// # loop { tokio::task::yield_now().await; }
870 /// })
871 /// .await;
872 /// # }
873 /// ```
874 ///
875 /// [`JoinHandle`]: struct@crate::task::JoinHandle
876 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
877 // TODO: This should be set as a builder
878 Rc::get_mut(&mut self.context)
879 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
880 .expect("Unhandled Panic behavior modified after starting LocalSet")
881 .unhandled_panic = behavior;
882 self
883 }
884
885 /// Returns the [`Id`] of the current `LocalSet` runtime.
886 ///
887 /// # Examples
888 ///
889 /// ```rust
890 /// use tokio::task;
891 ///
892 /// #[tokio::main]
893 /// async fn main() {
894 /// let local_set = task::LocalSet::new();
895 /// println!("Local set id: {}", local_set.id());
896 /// }
897 /// ```
898 ///
899 /// **Note**: This is an [unstable API][unstable]. The public API of this type
900 /// may break in 1.x releases. See [the documentation on unstable
901 /// features][unstable] for details.
902 ///
903 /// [unstable]: crate#unstable-features
904 /// [`Id`]: struct@crate::runtime::Id
905 pub fn id(&self) -> runtime::Id {
906 self.context.shared.local_state.owned.id.into()
907 }
908 }
909}
910
911impl fmt::Debug for LocalSet {
912 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
913 fmt.debug_struct("LocalSet").finish()
914 }
915}
916
917impl Future for LocalSet {
918 type Output = ();
919
920 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
921 let _no_blocking = crate::runtime::context::disallow_block_in_place();
922
923 // Register the waker before starting to work
924 self.context.shared.waker.register_by_ref(cx.waker());
925
926 if self.with(|| self.tick()) {
927 // If `tick` returns true, we need to notify the local future again:
928 // there are still tasks remaining in the run queue.
929 cx.waker().wake_by_ref();
930 Poll::Pending
931
932 // Safety: called from the thread that owns `LocalSet`. Because
933 // `LocalSet` is `!Send`, this is safe.
934 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
935 // If the scheduler has no remaining futures, we're done!
936 Poll::Ready(())
937 } else {
938 // There are still futures in the local set, but we've polled all the
939 // futures in the run queue. Therefore, we can just return Pending
940 // since the remaining futures will be woken from somewhere else.
941 Poll::Pending
942 }
943 }
944}
945
946impl Default for LocalSet {
947 fn default() -> LocalSet {
948 LocalSet::new()
949 }
950}
951
952impl Drop for LocalSet {
953 fn drop(&mut self) {
954 self.with_if_possible(|| {
955 let _no_blocking = crate::runtime::context::disallow_block_in_place();
956
957 // Shut down all tasks in the LocalOwnedTasks and close it to
958 // prevent new tasks from ever being added.
959 unsafe {
960 // Safety: called from the thread that owns `LocalSet`
961 self.context.shared.local_state.close_and_shutdown_all();
962 }
963
964 // We already called shutdown on all tasks above, so there is no
965 // need to call shutdown.
966
967 // Safety: note that this *intentionally* bypasses the unsafe
968 // `Shared::local_queue()` method. This is in order to avoid the
969 // debug assertion that we are on the thread that owns the
970 // `LocalSet`, because on some systems (e.g. at least some macOS
971 // versions), attempting to get the current thread ID can panic due
972 // to the thread's local data that stores the thread ID being
973 // dropped *before* the `LocalSet`.
974 //
975 // Despite avoiding the assertion here, it is safe for us to access
976 // the local queue in `Drop`, because the `LocalSet` itself is
977 // `!Send`, so we can reasonably guarantee that it will not be
978 // `Drop`ped from another thread.
979 let local_queue = unsafe {
980 // Safety: called from the thread that owns `LocalSet`
981 self.context.shared.local_state.take_local_queue()
982 };
983 for task in local_queue {
984 drop(task);
985 }
986
987 // Take the queue from the Shared object to prevent pushing
988 // notifications to it in the future.
989 let queue = self.context.shared.queue.lock().take().unwrap();
990 for task in queue {
991 drop(task);
992 }
993
994 // Safety: called from the thread that owns `LocalSet`
995 assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
996 });
997 }
998}
999
1000// === impl Context ===
1001
1002impl Context {
1003 #[track_caller]
1004 fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
1005 where
1006 F: Future + 'static,
1007 F::Output: 'static,
1008 {
1009 let id = crate::runtime::task::Id::next();
1010 let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1011
1012 // Safety: called from the thread that owns the `LocalSet`
1013 let (handle, notified) = {
1014 self.shared.local_state.assert_called_from_owner_thread();
1015 self.shared.local_state.owned.bind(
1016 future,
1017 self.shared.clone(),
1018 id,
1019 SpawnLocation::capture(),
1020 )
1021 };
1022
1023 if let Some(notified) = notified {
1024 self.shared.schedule(notified);
1025 }
1026
1027 handle
1028 }
1029}
1030
1031// === impl LocalFuture ===
1032
1033impl<T: Future> Future for RunUntil<'_, T> {
1034 type Output = T::Output;
1035
1036 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1037 let me = self.project();
1038
1039 me.local_set.with(|| {
1040 me.local_set
1041 .context
1042 .shared
1043 .waker
1044 .register_by_ref(cx.waker());
1045
1046 let _no_blocking = crate::runtime::context::disallow_block_in_place();
1047 let f = me.future;
1048
1049 if let Poll::Ready(output) = f.poll(cx) {
1050 return Poll::Ready(output);
1051 }
1052
1053 if me.local_set.tick() {
1054 // If `tick` returns `true`, we need to notify the local future again:
1055 // there are still tasks remaining in the run queue.
1056 cx.waker().wake_by_ref();
1057 }
1058
1059 Poll::Pending
1060 })
1061 }
1062}
1063
1064impl Shared {
1065 /// Schedule the provided task on the scheduler.
1066 fn schedule(&self, task: task::Notified<Arc<Self>>) {
1067 CURRENT.with(|localdata| {
1068 match localdata.ctx.get() {
1069 // If the current `LocalSet` is being polled, we don't need to wake it.
1070 // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1071 // In this case it is not being polled, so we need to wake it.
1072 Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1073 // Safety: if the current `LocalSet` context points to this
1074 // `LocalSet`, then we are on the thread that owns it.
1075 cx.shared.local_state.task_push_back(task);
1076 },
1077
1078 // We are on the thread that owns the `LocalSet`, so we can
1079 // wake to the local queue.
1080 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1081 unsafe {
1082 // Safety: we just checked that the thread ID matches
1083 // the localset's owner, so this is safe.
1084 self.local_state.task_push_back(task);
1085 }
1086 // We still have to wake the `LocalSet`, because it isn't
1087 // currently being polled.
1088 self.waker.wake();
1089 }
1090
1091 // We are *not* on the thread that owns the `LocalSet`, so we
1092 // have to wake to the remote queue.
1093 _ => {
1094 // First, check whether the queue is still there (if not, the
1095 // LocalSet is dropped). Then push to it if so, and if not,
1096 // do nothing.
1097 let mut lock = self.queue.lock();
1098
1099 if let Some(queue) = lock.as_mut() {
1100 queue.push_back(task);
1101 drop(lock);
1102 self.waker.wake();
1103 }
1104 }
1105 }
1106 });
1107 }
1108
1109 fn ptr_eq(&self, other: &Shared) -> bool {
1110 std::ptr::eq(self, other)
1111 }
1112}
1113
1114// This is safe because (and only because) we *pinky pwomise* to never touch the
1115// local run queue except from the thread that owns the `LocalSet`.
1116unsafe impl Sync for Shared {}
1117
1118impl task::Schedule for Arc<Shared> {
1119 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1120 // Safety, this is always called from the thread that owns `LocalSet`
1121 unsafe { self.local_state.task_remove(task) }
1122 }
1123
1124 fn schedule(&self, task: task::Notified<Self>) {
1125 Shared::schedule(self, task);
1126 }
1127
1128 // localset does not currently support task hooks
1129 fn hooks(&self) -> TaskHarnessScheduleHooks {
1130 TaskHarnessScheduleHooks {
1131 task_terminate_callback: None,
1132 }
1133 }
1134
1135 cfg_unstable! {
1136 fn unhandled_panic(&self) {
1137 use crate::runtime::UnhandledPanic;
1138
1139 match self.unhandled_panic {
1140 UnhandledPanic::Ignore => {
1141 // Do nothing
1142 }
1143 UnhandledPanic::ShutdownRuntime => {
1144 // This hook is only called from within the runtime, so
1145 // `CURRENT` should match with `&self`, i.e. there is no
1146 // opportunity for a nested scheduler to be called.
1147 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1148 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1149 cx.unhandled_panic.set(true);
1150 // Safety: this is always called from the thread that owns `LocalSet`
1151 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1152 }
1153 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1154 })
1155 }
1156 }
1157 }
1158 }
1159}
1160
1161impl LocalState {
1162 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1163 // The caller ensures it is called from the same thread that owns
1164 // the LocalSet.
1165 self.assert_called_from_owner_thread();
1166
1167 self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1168 }
1169
1170 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1171 // The caller ensures it is called from the same thread that owns
1172 // the LocalSet.
1173 self.assert_called_from_owner_thread();
1174
1175 self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
1176 }
1177
1178 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1179 // The caller ensures it is called from the same thread that owns
1180 // the LocalSet.
1181 self.assert_called_from_owner_thread();
1182
1183 self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1184 }
1185
1186 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1187 // The caller ensures it is called from the same thread that owns
1188 // the LocalSet.
1189 self.assert_called_from_owner_thread();
1190
1191 self.owned.remove(task)
1192 }
1193
1194 /// Returns true if the `LocalSet` does not have any spawned tasks
1195 unsafe fn owned_is_empty(&self) -> bool {
1196 // The caller ensures it is called from the same thread that owns
1197 // the LocalSet.
1198 self.assert_called_from_owner_thread();
1199
1200 self.owned.is_empty()
1201 }
1202
1203 unsafe fn assert_owner(
1204 &self,
1205 task: task::Notified<Arc<Shared>>,
1206 ) -> task::LocalNotified<Arc<Shared>> {
1207 // The caller ensures it is called from the same thread that owns
1208 // the LocalSet.
1209 self.assert_called_from_owner_thread();
1210
1211 self.owned.assert_owner(task)
1212 }
1213
1214 unsafe fn close_and_shutdown_all(&self) {
1215 // The caller ensures it is called from the same thread that owns
1216 // the LocalSet.
1217 self.assert_called_from_owner_thread();
1218
1219 self.owned.close_and_shutdown_all();
1220 }
1221
1222 #[track_caller]
1223 fn assert_called_from_owner_thread(&self) {
1224 // FreeBSD has some weirdness around thread-local destruction.
1225 // TODO: remove this hack when thread id is cleaned up
1226 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1227 debug_assert!(
1228 // if we couldn't get the thread ID because we're dropping the local
1229 // data, skip the assertion --- the `Drop` impl is not going to be
1230 // called from another thread, because `LocalSet` is `!Send`
1231 context::thread_id()
1232 .map(|id| id == self.owner)
1233 .unwrap_or(true),
1234 "`LocalSet`'s local run queue must not be accessed by another thread!"
1235 );
1236 }
1237}
1238
1239// This is `Send` because it is stored in `Shared`. It is up to the caller to
1240// ensure they are on the same thread that owns the `LocalSet`.
1241unsafe impl Send for LocalState {}
1242
1243#[cfg(all(test, not(loom)))]
1244mod tests {
1245 use super::*;
1246
1247 // Does a `LocalSet` running on a current-thread runtime...basically work?
1248 //
1249 // This duplicates a test in `tests/task_local_set.rs`, but because this is
1250 // a lib test, it will run under Miri, so this is necessary to catch stacked
1251 // borrows violations in the `LocalSet` implementation.
1252 #[test]
1253 fn local_current_thread_scheduler() {
1254 let f = async {
1255 LocalSet::new()
1256 .run_until(async {
1257 spawn_local(async {}).await.unwrap();
1258 })
1259 .await;
1260 };
1261 crate::runtime::Builder::new_current_thread()
1262 .build()
1263 .expect("rt")
1264 .block_on(f)
1265 }
1266
1267 // Tests that when a task on a `LocalSet` is woken by an io driver on the
1268 // same thread, the task is woken to the localset's local queue rather than
1269 // its remote queue.
1270 //
1271 // This test has to be defined in the `local.rs` file as a lib test, rather
1272 // than in `tests/`, because it makes assertions about the local set's
1273 // internal state.
1274 #[test]
1275 fn wakes_to_local_queue() {
1276 use super::*;
1277 use crate::sync::Notify;
1278 let rt = crate::runtime::Builder::new_current_thread()
1279 .build()
1280 .expect("rt");
1281 rt.block_on(async {
1282 let local = LocalSet::new();
1283 let notify = Arc::new(Notify::new());
1284 let task = local.spawn_local({
1285 let notify = notify.clone();
1286 async move {
1287 notify.notified().await;
1288 }
1289 });
1290 let mut run_until = Box::pin(local.run_until(async move {
1291 task.await.unwrap();
1292 }));
1293
1294 // poll the run until future once
1295 std::future::poll_fn(|cx| {
1296 let _ = run_until.as_mut().poll(cx);
1297 Poll::Ready(())
1298 })
1299 .await;
1300
1301 notify.notify_one();
1302 let task = unsafe { local.context.shared.local_state.task_pop_front() };
1303 // TODO(eliza): it would be nice to be able to assert that this is
1304 // the local task.
1305 assert!(
1306 task.is_some(),
1307 "task should have been notified to the LocalSet's local queue"
1308 );
1309 })
1310 }
1311}