Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/bep_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::pin::Pin;
17
18
use bytes::BytesMut;
19
use futures::stream::unfold;
20
use futures::Stream;
21
use nativelink_error::{Error, ResultExt};
22
use nativelink_proto::com::github::trace_machina::nativelink::events::{bep_event, BepEvent};
23
use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{
24
    PublishBuildEvent, PublishBuildEventServer,
25
};
26
use nativelink_proto::google::devtools::build::v1::{
27
    PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse,
28
    PublishLifecycleEventRequest,
29
};
30
use nativelink_store::store_manager::StoreManager;
31
use nativelink_util::origin_context::{ActiveOriginContext, ORIGIN_IDENTITY};
32
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike};
33
use prost::Message;
34
use tonic::{Request, Response, Result, Status, Streaming};
35
use tracing::{instrument, Level};
36
37
/// Current version of the BEP event. This might be used in the future if
38
/// there is a breaking change in the BEP event format.
39
const BEP_EVENT_VERSION: u32 = 0;
40
41
2
fn get_identity() -> Result<Option<String>, Status> {
42
2
    ActiveOriginContext::get()
43
2
        .map_or(Ok(None), |ctx| ctx.get_value(&ORIGIN_IDENTITY))
44
2
        .err_tip(|| 
"In BepServer"0
)
45
2
        .map_or_else(|e| 
Err(e.into())0
, |v| Ok(v.map(|v|
v.as_ref().clone()0
)))
46
2
}
47
48
pub struct BepServer {
49
    store: Store,
50
}
51
52
impl BepServer {
53
2
    pub fn new(
54
2
        config: &nativelink_config::cas_server::BepConfig,
55
2
        store_manager: &StoreManager,
56
2
    ) -> Result<Self, Error> {
57
2
        let store = store_manager
58
2
            .get_store(&config.store)
59
2
            .err_tip(|| 
format!("Expected store {} to exist in store manager", &config.store)0
)
?0
;
60
61
2
        Ok(Self { store })
62
2
    }
63
64
0
    pub fn into_service(self) -> PublishBuildEventServer<BepServer> {
65
0
        PublishBuildEventServer::new(self)
66
0
    }
67
68
1
    async fn inner_publish_lifecycle_event(
69
1
        &self,
70
1
        request: PublishLifecycleEventRequest,
71
1
        identity: Option<String>,
72
1
    ) -> Result<Response<()>, Error> {
73
1
        let build_event = request
74
1
            .build_event
75
1
            .as_ref()
76
1
            .err_tip(|| 
"Expected build_event to be set"0
)
?0
;
77
1
        let stream_id = build_event
78
1
            .stream_id
79
1
            .as_ref()
80
1
            .err_tip(|| 
"Expected stream_id to be set"0
)
?0
;
81
82
1
        let sequence_number = build_event.sequence_number;
83
1
84
1
        let store_key = StoreKey::Str(Cow::Owned(format!(
85
1
            "BepEvent:le:{}:{}:{}",
86
1
            &stream_id.build_id, &stream_id.invocation_id, sequence_number,
87
1
        )));
88
1
89
1
        let bep_event = BepEvent {
90
1
            version: BEP_EVENT_VERSION,
91
1
            identity: identity.unwrap_or_default(),
92
1
            event: Some(bep_event::Event::LifecycleEvent(request)),
93
1
        };
94
1
        let mut buf = BytesMut::new();
95
1
        bep_event
96
1
            .encode(&mut buf)
97
1
            .err_tip(|| 
"Could not encode PublishLifecycleEventRequest proto"0
)
?0
;
98
99
1
        self.store
100
1
            .update_oneshot(store_key, buf.freeze())
101
1
            .await
102
1
            .err_tip(|| 
"Failed to store PublishLifecycleEventRequest"0
)
?0
;
103
104
1
        Ok(Response::new(()))
105
1
    }
106
107
1
    async fn inner_publish_build_tool_event_stream(
108
1
        &self,
109
1
        stream: Streaming<PublishBuildToolEventStreamRequest>,
110
1
        identity: Option<String>,
111
1
    ) -> Result<Response<PublishBuildToolEventStreamStream>, Error> {
112
5
        async fn process_request(
113
5
            store: Pin<&dyn StoreDriver>,
114
5
            request: PublishBuildToolEventStreamRequest,
115
5
            identity: String,
116
5
        ) -> Result<PublishBuildToolEventStreamResponse, Status> {
117
5
            let ordered_build_event = request
118
5
                .ordered_build_event
119
5
                .as_ref()
120
5
                .err_tip(|| 
"Expected ordered_build_event to be set"0
)
?0
;
121
5
            let stream_id = ordered_build_event
122
5
                .stream_id
123
5
                .as_ref()
124
5
                .err_tip(|| 
"Expected stream_id to be set"0
)
?0
125
5
                .clone();
126
5
127
5
            let sequence_number = ordered_build_event.sequence_number;
128
5
129
5
            let bep_event = BepEvent {
130
5
                version: BEP_EVENT_VERSION,
131
5
                identity,
132
5
                event: Some(bep_event::Event::BuildToolEvent(request)),
133
5
            };
134
5
            let mut buf = BytesMut::new();
135
5
136
5
            bep_event
137
5
                .encode(&mut buf)
138
5
                .err_tip(|| 
"Could not encode PublishBuildToolEventStreamRequest proto"0
)
?0
;
139
140
5
            store
141
5
                .update_oneshot(
142
5
                    StoreKey::Str(Cow::Owned(format!(
143
5
                        "BepEvent:be:{}:{}:{}",
144
5
                        &stream_id.build_id, &stream_id.invocation_id, sequence_number,
145
5
                    ))),
146
5
                    buf.freeze(),
147
5
                )
148
5
                .await
149
5
                .err_tip(|| 
"Failed to store PublishBuildToolEventStreamRequest"0
)
?0
;
150
151
5
            Ok(PublishBuildToolEventStreamResponse {
152
5
                stream_id: Some(stream_id.clone()),
153
5
                sequence_number,
154
5
            })
155
5
        }
156
157
        struct State {
158
            store: Store,
159
            stream: Streaming<PublishBuildToolEventStreamRequest>,
160
            identity: String,
161
        }
162
163
1
        let response_stream =
164
1
            unfold(
165
1
                Some(State {
166
1
                    store: self.store.clone(),
167
1
                    stream,
168
1
                    identity: identity.unwrap_or_default(),
169
1
                }),
170
5
                move |maybe_state| async move {
171
5
                    let mut state = maybe_state
?0
;
172
5
                    let request =
173
5
                        match state.stream.message().await.err_tip(|| {
174
0
                            "While receiving message in publish_build_tool_event_stream"
175
5
                        }) {
176
5
                            Ok(Some(request)) => request,
177
0
                            Ok(None) => return None,
178
0
                            Err(e) => return Some((Err(e.into()), None)),
179
                        };
180
5
                    process_request(
181
5
                        state.store.as_store_driver_pin(),
182
5
                        request,
183
5
                        state.identity.clone(),
184
5
                    )
185
5
                    .await
186
5
                    .map_or_else(
187
5
                        |e| 
Some((Err(e), None))0
,
188
5
                        |response| Some((Ok(response), Some(state))),
189
5
                    )
190
10
                },
191
1
            );
192
1
193
1
        Ok(Response::new(Box::pin(response_stream)))
194
1
    }
195
}
196
197
type PublishBuildToolEventStreamStream = Pin<
198
    Box<dyn Stream<Item = Result<PublishBuildToolEventStreamResponse, Status>> + Send + 'static>,
199
>;
200
201
#[tonic::async_trait]
202
impl PublishBuildEvent for BepServer {
203
    type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream;
204
205
    #[allow(clippy::blocks_in_conditions)]
206
    #[instrument(
207
        err,
208
        ret(level = Level::INFO),
209
        level = Level::ERROR,
210
        skip_all,
211
        fields(request = ?grpc_request.get_ref())
212
    )]
213
    async fn publish_lifecycle_event(
214
        &self,
215
        grpc_request: Request<PublishLifecycleEventRequest>,
216
1
    ) -> Result<Response<()>, Status> {
217
1
        self.inner_publish_lifecycle_event(grpc_request.into_inner(), get_identity()
?0
)
218
1
            .await
219
1
            .map_err(Error::into)
220
2
    }
221
222
    #[allow(clippy::blocks_in_conditions)]
223
    #[instrument(
224
      err,
225
      level = Level::ERROR,
226
      skip_all,
227
      fields(request = ?grpc_request.get_ref())
228
    )]
229
    async fn publish_build_tool_event_stream(
230
        &self,
231
        grpc_request: Request<Streaming<PublishBuildToolEventStreamRequest>>,
232
1
    ) -> Result<Response<Self::PublishBuildToolEventStreamStream>, Status> {
233
1
        self.inner_publish_build_tool_event_stream(grpc_request.into_inner(), get_identity()
?0
)
234
1
            .await
235
1
            .map_err(Error::into)
236
2
    }
237
}