Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::marker::PhantomData;
16
use std::pin::Pin;
17
use std::sync::{Arc, OnceLock};
18
19
use futures::future::ready;
20
use futures::{Future, FutureExt, Stream, StreamExt};
21
use nativelink_proto::build::bazel::remote::execution::v2::{
22
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
23
    BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse,
24
    GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse,
25
    RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest,
26
};
27
use nativelink_proto::com::github::trace_machina::nativelink::events::{
28
    batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event,
29
    response_event, stream_event, BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride,
30
    Event, OriginEvent, RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride,
31
};
32
use nativelink_proto::google::bytestream::{
33
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
34
    WriteResponse,
35
};
36
use nativelink_proto::google::longrunning::Operation;
37
use nativelink_proto::google::rpc::Status;
38
use rand::RngCore;
39
use tokio::sync::mpsc;
40
use tonic::{Response, Status as TonicStatus, Streaming};
41
use uuid::Uuid;
42
43
use crate::make_symbol;
44
use crate::origin_context::ActiveOriginContext;
45
46
const ORIGIN_EVENT_VERSION: u32 = 0;
47
48
static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();
49
50
/// Returns a unique ID for the given event.
51
/// This ID is used to identify the event type.
52
/// The max value that could be output is 0x0FFF,
53
/// meaning you may use the first nibble for other
54
/// purposes.
55
#[inline]
56
30
pub fn get_id_for_event(event: &Event) -> [u8; 2] {
57
29
    match &event.event {
58
1
        None => [0x00, 0x00],
59
13
        Some(event::Event::Request(req)) => match 
req.event12
{
60
1
            None => [0x01, 0x00],
61
1
            Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01],
62
1
            Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02],
63
1
            Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03],
64
1
            Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04],
65
1
            Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05],
66
1
            Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06],
67
1
            Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07],
68
1
            Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08],
69
1
            Some(request_event::Event::WriteRequest(())) => [0x01, 0x09],
70
1
            Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A],
71
1
            Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B],
72
1
            Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C],
73
        },
74
10
        Some(event::Event::Response(res)) => match 
res.event9
{
75
1
            None => [0x02, 0x00],
76
1
            Some(response_event::Event::Error(_)) => [0x02, 0x01],
77
1
            Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02],
78
1
            Some(response_event::Event::ActionResult(_)) => [0x02, 0x03],
79
1
            Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04],
80
1
            Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05],
81
1
            Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06],
82
1
            Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07],
83
1
            Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08],
84
1
            Some(response_event::Event::Empty(())) => [0x02, 0x09],
85
        },
86
6
        Some(event::Event::Stream(stream)) => match 
stream.event5
{
87
1
            None => [0x03, 0x00],
88
1
            Some(stream_event::Event::Error(_)) => [0x03, 0x01],
89
1
            Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02],
90
1
            Some(stream_event::Event::DataLength(_)) => [0x03, 0x03],
91
1
            Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04],
92
1
            Some(stream_event::Event::Operation(_)) => [0x03, 0x05],
93
        },
94
    }
95
30
}
96
97
/// Returns a unique node ID for this process.
98
0
pub fn get_node_id(event: Option<&Event>) -> [u8; 6] {
99
0
    let mut node_id = *NODE_ID.get_or_init(|| {
100
0
        let mut rng = rand::thread_rng();
101
0
        let mut out = [0; 6];
102
0
        rng.fill_bytes(&mut out);
103
0
        out
104
0
    });
105
0
    let Some(event) = event else {
  Branch (105:9): [True: 0, False: 0]
  Branch (105:9): [Folded - Ignored]
106
0
        return node_id;
107
    };
108
0
    let event_id = get_id_for_event(event);
109
0
    node_id[0] = (node_id[0] & 0xF0) | event_id[0];
110
0
    node_id[1] = event_id[1];
111
0
    node_id
112
0
}
113
114
pub struct OriginEventCollector {
115
    sender: mpsc::Sender<OriginEvent>,
116
    identity: String,
117
    bazel_metadata: Option<RequestMetadata>,
118
}
119
120
impl OriginEventCollector {
121
0
    pub fn new(
122
0
        sender: mpsc::Sender<OriginEvent>,
123
0
        identity: String,
124
0
        bazel_metadata: Option<RequestMetadata>,
125
0
    ) -> Self {
126
0
        Self {
127
0
            sender,
128
0
            identity,
129
0
            bazel_metadata,
130
0
        }
131
0
    }
132
133
0
    async fn publish_origin_event(&self, event: Event, parent_event_id: Option<Uuid>) -> Uuid {
134
0
        let event_id = Uuid::now_v6(&get_node_id(Some(&event)));
135
0
        let parent_event_id =
136
0
            parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string());
137
0
        // Failing to send this event means that the receiver has been dropped.
138
0
        let _ = self
139
0
            .sender
140
0
            .send(OriginEvent {
141
0
                version: ORIGIN_EVENT_VERSION,
142
0
                event_id: event_id.as_hyphenated().to_string(),
143
0
                parent_event_id,
144
0
                bazel_request_metadata: self.bazel_metadata.clone(),
145
0
                identity: self.identity.clone(),
146
0
                event: Some(event),
147
0
            })
148
0
            .await;
149
0
        event_id
150
0
    }
151
}
152
153
make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector);
154
155
pub struct OriginEventContext<T> {
156
    inner: Option<OriginEventContextImpl>,
157
    _phantom: PhantomData<T>,
158
}
159
160
impl<T> Clone for OriginEventContext<T> {
161
0
    fn clone(&self) -> Self {
162
0
        Self {
163
0
            inner: self.inner.clone(),
164
0
            _phantom: PhantomData,
165
0
        }
166
0
    }
167
}
168
169
impl OriginEventContext<()> {
170
38
    pub fn new<'a, T, U>(
171
38
        source_cb: impl Fn() -> &'a T,
172
38
    ) -> impl Future<Output = OriginEventContext<T>> + 'static
173
38
    where
174
38
        T: OriginEventSource<U> + 'static,
175
38
    {
176
0
        let Ok(Some(origin_event_collector)) =
  Branch (176:13): [True: 0, False: 15]
  Branch (176:13): [True: 0, False: 3]
  Branch (176:13): [True: 0, False: 3]
  Branch (176:13): [True: 0, False: 0]
  Branch (176:13): [True: 0, False: 6]
  Branch (176:13): [True: 0, False: 0]
  Branch (176:13): [True: 0, False: 1]
  Branch (176:13): [True: 0, False: 3]
  Branch (176:13): [True: 0, False: 0]
  Branch (176:13): [True: 0, False: 2]
  Branch (176:13): [True: 0, False: 4]
  Branch (176:13): [True: 0, False: 1]
  Branch (176:13): [Folded - Ignored]
  Branch (176:13): [Folded - Ignored]
177
38
            ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
178
        else {
179
38
            return ready(OriginEventContext {
180
38
                inner: None,
181
38
                _phantom: PhantomData,
182
38
            })
183
38
            .left_future();
184
        };
185
0
        let event = source_cb().as_event();
186
0
        async move {
187
0
            let parent_event_id = origin_event_collector
188
0
                .publish_origin_event(event, None)
189
0
                .await;
190
0
            OriginEventContext {
191
0
                inner: Some(OriginEventContextImpl {
192
0
                    origin_event_collector,
193
0
                    parent_event_id,
194
0
                }),
195
0
                _phantom: PhantomData,
196
0
            }
197
0
        }
198
0
        .right_future()
199
38
    }
200
}
201
impl<U> OriginEventContext<U> {
202
37
    pub fn emit<'a, T, F>(
203
37
        &self,
204
37
        event_cb: F,
205
37
    ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U>
206
37
    where
207
37
        T: OriginEventSource<U> + 'a,
208
37
        F: Fn() -> &'a T,
209
37
    {
210
37
        let Some(
inner0
) = &self.inner else {
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 14]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 3]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 3]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 6]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 1]
  Branch (210:13): [True: 0, False: 3]
  Branch (210:13): [True: 0, False: 0]
  Branch (210:13): [True: 0, False: 2]
  Branch (210:13): [True: 0, False: 4]
  Branch (210:13): [True: 0, False: 1]
  Branch (210:13): [Folded - Ignored]
  Branch (210:13): [Folded - Ignored]
211
37
            return ready(()).left_future();
212
        };
213
0
        let v = (event_cb)();
214
0
        v.publish(inner).right_future()
215
37
    }
216
217
24
    pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>>
218
24
    where
219
24
        U: Send + 'a,
220
24
        O: OriginEventSource<U> + Send + 'a,
221
24
        S: Stream<Item = O> + Send + 'a,
222
24
    {
223
24
        if self.inner.is_none() {
  Branch (223:12): [True: 15, False: 0]
  Branch (223:12): [True: 3, False: 0]
  Branch (223:12): [True: 0, False: 0]
  Branch (223:12): [True: 0, False: 0]
  Branch (223:12): [True: 6, False: 0]
  Branch (223:12): [True: 0, False: 0]
  Branch (223:12): [Folded - Ignored]
  Branch (223:12): [Folded - Ignored]
224
24
            return Box::pin(stream);
225
0
        }
226
0
        let ctx = self.clone();
227
0
        Box::pin(stream.then(move |item| {
228
0
            let ctx = ctx.clone();
229
0
            async move {
230
0
                ctx.emit(|| &item).await;
231
0
                item
232
0
            }
233
0
        }))
234
24
    }
235
}
236
237
#[derive(Clone)]
238
pub struct OriginEventContextImpl {
239
    origin_event_collector: Arc<OriginEventCollector>,
240
    parent_event_id: Uuid,
241
}
242
243
pub trait OriginEventSource<Source>: Sized {
244
    fn as_event(&self) -> Event;
245
246
0
    fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a {
247
0
        let event = self.as_event();
248
0
        ctx.origin_event_collector
249
0
            .publish_origin_event(event, Some(ctx.parent_event_id))
250
0
            // We don't need the Uuid here.
251
0
            .map(|_| ())
252
0
    }
253
}
254
255
impl<'a, T, U> OriginEventSource<U> for &'a Response<T>
256
where
257
    T: OriginEventSource<U> + 'a,
258
{
259
    #[inline]
260
0
    fn as_event(&self) -> Event {
261
0
        self.get_ref().as_event()
262
0
    }
263
}
264
265
impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus>
266
where
267
    T: OriginEventSource<U>,
268
    TonicStatus: OriginEventSource<U>,
269
{
270
    #[inline]
271
0
    fn as_event(&self) -> Event {
272
0
        match self {
273
0
            Ok(v) => v.as_event(),
274
0
            Err(v) => v.as_event(),
275
        }
276
0
    }
277
}
278
279
0
fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status {
280
0
    Status {
281
0
        code: tonic_status.code().into(),
282
0
        message: tonic_status.message().into(),
283
0
        details: Vec::new(),
284
0
    }
285
0
}
286
287
macro_rules! get_event_type {
288
    (Request, $variant:ident, $data:expr) => {
289
        event::Event::Request(RequestEvent {
290
            event: Some(request_event::Event::$variant($data)),
291
        })
292
    };
293
    (Response, $variant:ident, $data:expr) => {
294
        event::Event::Response(ResponseEvent {
295
            event: Some(response_event::Event::$variant($data)),
296
        })
297
    };
298
    (Stream, $variant:ident, $data:expr) => {
299
        event::Event::Stream(StreamEvent {
300
            event: Some(stream_event::Event::$variant($data)),
301
        })
302
    };
303
}
304
305
macro_rules! impl_as_event {
306
    (Stream, $origin:ident, $type:ident) => {
307
        impl_as_event! {Stream, $origin, $type, $type, {
308
            #[inline]
309
0
            |val: &$type| val.clone()
310
        }}
311
    };
312
    (Stream, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
313
        impl_as_event! {__inner, Stream, $origin, Result<$type, TonicStatus>, $variant, {
314
            #[inline]
315
0
            |val: &Result<$type, TonicStatus>| match val {
316
0
                Ok(v) => get_event_type!(Stream, $variant, $data_fn(v)),
317
0
                Err(e) => get_event_type!(Stream, Error, tonic_status_to_proto_status(e)),
318
0
            }
319
        }}
320
    };
321
    ($event_type:ident, $origin:ty, $type:ident) => {
322
        impl_as_event!(__inner, $event_type, $origin, $type, $type, {
323
            #[inline]
324
0
            |val: &$type| get_event_type!($event_type, $type, val.clone())
325
        });
326
    };
327
    ($event_type:ident, $origin:ty, $type:ty, $variant:ident) => {
328
        impl_as_event!(__inner, $event_type, $origin, $type, $variant, {
329
            #[inline]
330
            |val: &$type| get_event_type!($event_type, $variant, val.clone())
331
        });
332
    };
333
    ($event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
334
        impl_as_event! {__inner, $event_type, $origin, $type, $variant, $data_fn}
335
    };
336
    (__inner, $event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
337
        impl OriginEventSource<$origin> for $type {
338
            #[inline]
339
0
            fn as_event(&self) -> Event {
340
0
                Event {
341
0
                    event: Some($data_fn(self)),
342
0
                }
343
0
            }
344
        }
345
    };
346
}
347
348
impl<Origin> OriginEventSource<Origin> for TonicStatus {
349
    #[inline]
350
0
    fn as_event(&self) -> Event {
351
0
        Event {
352
0
            event: Some(get_event_type!(
353
0
                Response,
354
0
                Error,
355
0
                tonic_status_to_proto_status(self)
356
0
            )),
357
0
        }
358
0
    }
359
}
360
361
#[inline]
362
0
fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event {
363
0
    get_event_type!(
364
0
        Request,
365
0
        BatchUpdateBlobsRequest,
366
0
        BatchUpdateBlobsRequestOverride {
367
0
            instance_name: val.instance_name.clone(),
368
0
            requests: val
369
0
                .requests
370
0
                .iter()
371
0
                .map(|v| batch_update_blobs_request_override::Request {
372
0
                    digest: v.digest.clone(),
373
0
                    compressor: v.compressor,
374
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
375
0
                })
376
0
                .collect(),
377
0
            digest_function: val.digest_function,
378
0
        }
379
0
    )
380
0
}
381
382
#[inline]
383
0
fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event {
384
0
    get_event_type!(
385
0
        Response,
386
0
        BatchReadBlobsResponse,
387
0
        BatchReadBlobsResponseOverride {
388
0
            responses: val
389
0
                .responses
390
0
                .iter()
391
0
                .map(|v| batch_read_blobs_response_override::Response {
392
0
                    digest: v.digest.clone(),
393
0
                    compressor: v.compressor,
394
0
                    status: v.status.clone(),
395
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
396
0
                })
397
0
                .collect(),
398
0
        }
399
0
    )
400
0
}
401
402
#[inline]
403
0
fn to_empty<T>(_: T) -> event::Event {
404
0
    get_event_type!(Response, Empty, ())
405
0
}
406
407
// -- Requests --
408
409
impl_as_event! {Request, (), GetCapabilitiesRequest}
410
impl_as_event! {Request, (), GetActionResultRequest}
411
impl_as_event! {Request, (), UpdateActionResultRequest}
412
impl_as_event! {Request, (), FindMissingBlobsRequest}
413
impl_as_event! {Request, (), BatchReadBlobsRequest}
414
impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override}
415
impl_as_event! {Request, (), GetTreeRequest}
416
impl_as_event! {Request, (), ReadRequest}
417
impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty}
418
impl_as_event! {Request, (), QueryWriteStatusRequest}
419
impl_as_event! {Request, (), ExecuteRequest}
420
impl_as_event! {Request, (), WaitExecutionRequest}
421
422
// -- Responses --
423
424
impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities}
425
impl_as_event! {Response, GetActionResultRequest, ActionResult}
426
impl_as_event! {Response, UpdateActionResultRequest, ActionResult}
427
impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse}
428
impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty}
429
impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse}
430
impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse}
431
impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse}
432
impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override}
433
impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty}
434
impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty}
435
436
// -- Streams --
437
438
impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, {
439
0
    |val: &ReadResponse| val.data.len() as u64
440
}}
441
impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, {
442
0
    |val: &WriteRequest| WriteRequestOverride {
443
0
        resource_name: val.resource_name.clone(),
444
0
        write_offset: val.write_offset,
445
0
        finish_write: val.finish_write,
446
0
        data_len: u64::try_from(val.data.len()).unwrap_or_default(),
447
0
    }
448
}}
449
impl_as_event! {Stream, GetTreeRequest, GetTreeResponse}
450
impl_as_event! {Stream, ExecuteRequest, Operation}
451
impl_as_event! {Stream, WaitExecutionRequest, Operation}