Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event_publisher.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use bytes::BytesMut;
16
use futures::{future, FutureExt};
17
use nativelink_proto::com::github::trace_machina::nativelink::events::{OriginEvent, OriginEvents};
18
use prost::Message;
19
use tokio::sync::{broadcast, mpsc};
20
use tracing::error;
21
use uuid::Uuid;
22
23
use crate::origin_event::get_node_id;
24
use crate::shutdown_guard::{Priority, ShutdownGuard};
25
use crate::store_trait::{Store, StoreLike};
26
27
/// Publishes origin events to the store.
///
/// Events are received from an mpsc channel in batches, encoded as a single
/// `OriginEvents` protobuf message per batch, and written to `store` under a
/// key of the form `OriginEvents:<uuid>`.
28
pub struct OriginEventPublisher {
29
    /// Store that each encoded batch of events is uploaded to.
    store: Store,
30
    /// Receiving half of the channel that origin events arrive on.
    rx: mpsc::Receiver<OriginEvent>,
31
    /// Broadcast sender that `run` subscribes to in order to receive the
    /// `ShutdownGuard` when shutdown begins.
    shutdown_tx: broadcast::Sender<ShutdownGuard>,
32
}
33
34
impl OriginEventPublisher {
35
0
    pub fn new(
36
0
        store: Store,
37
0
        rx: mpsc::Receiver<OriginEvent>,
38
0
        shutdown_tx: broadcast::Sender<ShutdownGuard>,
39
0
    ) -> Self {
40
0
        Self {
41
0
            store,
42
0
            rx,
43
0
            shutdown_tx,
44
0
        }
45
0
    }
46
47
    /// Runs the origin event publisher.
48
0
    pub async fn run(mut self) {
49
        const MAX_EVENTS_PER_BATCH: usize = 1024;
50
0
        let mut batch: Vec<OriginEvent> = Vec::with_capacity(MAX_EVENTS_PER_BATCH);
51
0
        let mut shutdown_rx = self.shutdown_tx.subscribe();
52
0
        let shutdown_fut = shutdown_rx.recv().fuse();
53
0
        tokio::pin!(shutdown_fut);
54
0
        let shutdown_guard = future::pending().left_future();
55
0
        tokio::pin!(shutdown_guard);
56
        loop {
57
0
            tokio::select! {
58
                biased;
59
0
                _ = self.rx.recv_many(&mut batch, MAX_EVENTS_PER_BATCH) => {
60
0
                    self.handle_batch(&mut batch).await;
61
                }
62
0
                shutdown_guard_res = &mut shutdown_fut => {
63
0
                    tracing::info!("Received shutdown down in origin event publisher");
64
0
                    let Ok(mut local_shutdown_guard) = shutdown_guard_res else {
  Branch (64:25): [Folded - Ignored]
  Branch (64:25): [Folded - Ignored]
65
0
                        tracing::error!("Received shutdown down in origin event publisher but failed to get shutdown guard");
66
0
                        return;
67
                    };
68
0
                    shutdown_guard.set(async move {
69
0
                        local_shutdown_guard.wait_for(Priority::P0).await;
70
0
                    }
71
0
                    .right_future());
72
0
                }
73
0
                () = &mut shutdown_guard => {
74
                    // All other services with less priority have completed.
75
                    // We may still need to process any remaining events.
76
0
                    while !self.rx.is_empty() {
  Branch (76:27): [Folded - Ignored]
  Branch (76:27): [Folded - Ignored]
77
0
                        self.rx.recv_many(&mut batch, MAX_EVENTS_PER_BATCH).await;
78
0
                        self.handle_batch(&mut batch).await;
79
                    }
80
0
                    return;
81
                }
82
            }
83
        }
84
0
    }
85
86
0
    async fn handle_batch(&self, batch: &mut Vec<OriginEvent>) {
87
0
        let uuid = Uuid::now_v6(&get_node_id(None));
88
0
        let events = OriginEvents {
89
0
            // Clippy wants us to use use `mem::take`, but this would
90
0
            // move all capacity as well to the new vector. Since it is
91
0
            // much more likely that we will have a small number of events
92
0
            // in the batch, we prefer to use `drain` and `collect` here,
93
0
            // so we only need to allocate the exact amount of memory needed
94
0
            // and let the batch vector's capacity be reused.
95
0
            #[allow(clippy::drain_collect)]
96
0
            events: batch.drain(..).collect(),
97
0
        };
98
0
        let mut data = BytesMut::new();
99
0
        if let Err(e) = events.encode(&mut data) {
  Branch (99:16): [Folded - Ignored]
  Branch (99:16): [Folded - Ignored]
100
0
            error!("Failed to encode origin events: {}", e);
101
0
            return;
102
0
        }
103
0
        let update_result = self
104
0
            .store
105
0
            .as_store_driver_pin()
106
0
            .update_oneshot(
107
0
                format!("OriginEvents:{}", uuid.hyphenated()).into(),
108
0
                data.freeze(),
109
0
            )
110
0
            .await;
111
0
        if let Err(err) = update_result {
  Branch (111:16): [Folded - Ignored]
  Branch (111:16): [Folded - Ignored]
112
0
            error!("Failed to upload origin events: {}", err);
113
0
        }
114
0
    }
115
}