Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/task.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::task::{Context, Poll};
18
19
use futures::Future;
20
use hyper::rt::Executor;
21
use hyper_util::rt::tokio::TokioExecutor;
22
use tokio::task::{spawn_blocking, JoinError, JoinHandle};
23
pub use tracing::error_span as __error_span;
24
use tracing::{Instrument, Span};
25
26
use crate::origin_context::{ActiveOriginContext, ContextAwareFuture, OriginContext};
27
28
1.46k
/// Spawns `f` as a Tokio task, instrumented with `span` and wrapped in a
/// `ContextAwareFuture` built from `ctx`.
///
/// The future is first instrumented with `span`, then handed to
/// `ContextAwareFuture::new` together with `ctx`, and the result is passed to
/// `tokio::spawn`. The raw `JoinHandle` is returned without any drop guard.
///
/// Within this file it is called by the `background_spawn!` macro
/// (`span:/ctx:/fut:` arm) and by `__spawn_with_span`.
pub fn __spawn_with_span_and_context<F, T>(
29
1.46k
    f: F,
30
1.46k
    span: Span,
31
1.46k
    ctx: Option<Arc<OriginContext>>,
32
1.46k
) -> JoinHandle<T>
33
1.46k
where
34
1.46k
    T: Send + 'static,
35
1.46k
    F: Future<Output = T> + Send + 'static,
36
1.46k
{
37
1.46k
    // The clippy `disallowed_methods` lint is silenced for the direct
    // `tokio::spawn` call below.
    #[allow(clippy::disallowed_methods)]
38
1.46k
    tokio::spawn(ContextAwareFuture::new(ctx, f.instrument(span)))
39
1.46k
}
40
41
1.46k
/// Spawns `f` under `span`, using whatever `ActiveOriginContext::get()`
/// returns at the time of the call as the task's origin context.
///
/// This is a thin wrapper over `__spawn_with_span_and_context`; the returned
/// `JoinHandle` is the one produced there.
// NOTE(review): presumably `ActiveOriginContext::get()` yields the context of
// the caller's current task — confirm against the `origin_context` module.
pub fn __spawn_with_span<F, T>(f: F, span: Span) -> JoinHandle<T>
42
1.46k
where
43
1.46k
    T: Send + 'static,
44
1.46k
    F: Future<Output = T> + Send + 'static,
45
1.46k
{
46
1.46k
    __spawn_with_span_and_context(f, span, ActiveOriginContext::get())
47
1.46k
}
48
49
666
/// Runs the closure `f` through Tokio's `spawn_blocking`, executing it inside
/// `span` via `span.in_scope(f)`.
///
/// Returns the `JoinHandle` for the blocking task. Unlike
/// `__spawn_with_span_and_context`, no `OriginContext` is passed along here.
pub fn __spawn_blocking<F, T>(f: F, span: Span) -> JoinHandle<T>
50
666
where
51
666
    F: FnOnce() -> T + Send + 'static,
52
666
    T: Send + 'static,
53
666
{
54
666
    // The clippy `disallowed_methods` lint is silenced for the direct
    // `spawn_blocking` call below.
    #[allow(clippy::disallowed_methods)]
55
666
    spawn_blocking(move || 
span.in_scope(f)665
)
56
666
}
57
58
/// Spawns a future as a task inside a `tracing` error-level span and returns
/// the plain `JoinHandle` (no `JoinHandleDropGuard` is applied here).
///
/// Accepted forms:
/// - `background_spawn!(name, fut)`
/// - `background_spawn!(name, fut, fields...)`
/// - `background_spawn!(name: n, fut: f, target: t, fields...)`
/// - `background_spawn!(span: s, ctx: c, fut: f)`
#[macro_export]
59
macro_rules! background_spawn {
60
    // Span carries only the name.
    ($name:expr, $fut:expr) => {{
61
        $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name))
62
    }};
63
    // Span carries the name plus whatever trailing tokens were supplied.
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
64
        $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name, $($fields)*))
65
    }};
66
    // Keyword form: `target:` is placed ahead of the name in `__error_span!`.
    (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{
67
        $crate::task::__spawn_with_span($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*))
68
    }};
69
    // Caller provides a ready-made span and an explicit origin context.
    (span: $span:expr, ctx: $ctx:expr, fut: $fut:expr) => {{
70
        $crate::task::__spawn_with_span_and_context($fut, $span, $ctx)
71
    }};
72
}
73
74
/// Same as `background_spawn!`, but wraps the resulting handle in a
/// `JoinHandleDropGuard`, so the task is aborted when the returned value is
/// dropped.
///
/// Accepted forms:
/// - `spawn!(name, fut)`
/// - `spawn!(name, fut, fields...)`
/// - `spawn!(name: n, fut: f, target: t, fields...)`
#[macro_export]
75
macro_rules! spawn {
76
    ($name:expr, $fut:expr) => {{
77
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut))
78
    }};
79
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
80
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, $($fields)*))
81
    }};
82
    // NOTE(review): this arm forwards positionally (`$name, $fut, target: ...`),
    // which matches `background_spawn!`'s generic `$($fields:tt)*` arm rather
    // than its `name:/fut:/target:` arm, so `target: ...` reaches
    // `__error_span!` after the name instead of before it — confirm whether
    // the keyword form of `background_spawn!` was intended.
    (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{
83
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, target: $target, $($fields)*))
84
    }};
85
}
86
87
/// Runs a blocking closure via `__spawn_blocking` inside a `tracing`
/// error-level span and wraps the handle in a `JoinHandleDropGuard`.
///
/// Despite the `$fut` metavariable name, the second argument is passed to
/// `__spawn_blocking`, which requires an `FnOnce() -> T` closure, not a future.
// NOTE(review): the guard calls `abort()` on drop; per the Tokio docs a
// blocking task cannot be aborted once it has started running — confirm that
// callers do not rely on cancellation of in-flight blocking work.
#[macro_export]
88
macro_rules! spawn_blocking {
89
    ($name:expr, $fut:expr) => {{
90
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name)))
91
    }};
92
    // Generic arm: `$($fields:tt)*` accepts any trailing token sequence.
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
93
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name, $($fields)*)))
94
    }};
95
    // NOTE(review): this arm and the next appear unreachable — arms are tried
    // in order and the generic arm above already matches any input of the
    // shape `name, closure, <tokens>`, including `target: ...`. Confirm
    // whether the `target:` arms were meant to come first.
    ($name:expr, $fut:expr, target: $target:expr) => {{
96
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name)))
97
    }};
98
    ($name:expr, $fut:expr, target: $target:expr, $($fields:tt)*) => {{
99
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*)))
100
    }};
101
}
102
103
/// Wrapper around a `JoinHandle` that aborts the spawned task in the
104
/// event that this guard gets dropped.
///
/// Awaiting the guard yields the same `Result<T, JoinError>` as awaiting the
/// wrapped handle. The type is `#[must_use]` because discarding it drops the
/// guard immediately, which calls `abort()` on the task.
105
#[derive(Debug)]
106
#[must_use]
107
pub struct JoinHandleDropGuard<T> {
108
    // Handle of the task that is aborted when the guard is dropped.
    inner: JoinHandle<T>,
109
}
110
111
impl<T> JoinHandleDropGuard<T> {
112
2.01k
    /// Takes ownership of `inner` and returns a guard that will call
    /// `abort()` on it when dropped.
    pub fn new(inner: JoinHandle<T>) -> Self {
113
2.01k
        Self { inner }
114
2.01k
    }
115
}
116
117
// Lets the guard be awaited exactly like the `JoinHandle` it wraps.
impl<T> Future for JoinHandleDropGuard<T> {
118
    type Output = Result<T, JoinError>;
119
120
3.93k
    /// Delegates polling to the wrapped `JoinHandle`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121
3.93k
        // `Pin::new` on `&mut self.inner` requires the handle to be `Unpin`.
        Pin::new(&mut self.inner).poll(cx)
122
3.93k
    }
123
}
124
125
// Dropping the guard requests cancellation of the task it wraps.
impl<T> Drop for JoinHandleDropGuard<T> {
126
2.01k
    /// Calls `abort()` on the wrapped handle; this runs on every drop,
    /// including after the guard has already been awaited.
    fn drop(&mut self) {
127
2.01k
        self.inner.abort();
128
2.01k
    }
129
}
130
131
/// Newtype around `hyper_util`'s `TokioExecutor` that implements hyper's
/// `Executor` trait (see the `Executor<F>` impl in this file).
#[derive(Clone)]
132
pub struct TaskExecutor(TokioExecutor);
133
134
impl TaskExecutor {
135
0
    /// Creates a `TaskExecutor` wrapping a freshly constructed `TokioExecutor`.
    pub fn new() -> Self {
136
0
        Self(TokioExecutor::new())
137
0
    }
138
}
139
140
impl Default for TaskExecutor {
141
0
    /// Equivalent to `TaskExecutor::new()`.
    fn default() -> Self {
142
0
        Self::new()
143
0
    }
144
}
145
146
// hyper `Executor` implementation that routes futures through
// `background_spawn!` so they run inside an "http_executor" span.
impl<F> Executor<F> for TaskExecutor
147
where
148
    F: Future + Send + 'static,
149
    F::Output: Send + 'static,
150
{
151
0
    /// Spawns `fut` via `background_spawn!` and discards the join handle.
    ///
    /// The wrapped `TokioExecutor` is not consulted here.
    fn execute(&self, fut: F) {
152
0
        background_spawn!("http_executor", fut);
153
0
    }
154
}