/build/source/nativelink-service/src/bep_server.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::borrow::Cow; |
16 | | use std::pin::Pin; |
17 | | |
18 | | use bytes::BytesMut; |
19 | | use futures::stream::unfold; |
20 | | use futures::Stream; |
21 | | use nativelink_error::{Error, ResultExt}; |
22 | | use nativelink_proto::com::github::trace_machina::nativelink::events::{bep_event, BepEvent}; |
23 | | use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{ |
24 | | PublishBuildEvent, PublishBuildEventServer, |
25 | | }; |
26 | | use nativelink_proto::google::devtools::build::v1::{ |
27 | | PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse, |
28 | | PublishLifecycleEventRequest, |
29 | | }; |
30 | | use nativelink_store::store_manager::StoreManager; |
31 | | use nativelink_util::origin_context::{ActiveOriginContext, ORIGIN_IDENTITY}; |
32 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike}; |
33 | | use prost::Message; |
34 | | use tonic::{Request, Response, Result, Status, Streaming}; |
35 | | use tracing::{instrument, Level}; |
36 | | |
/// Format version written into the `version` field of every `BepEvent`
/// this server stores. It is reserved for signalling a breaking change in
/// the BEP event format, should one ever be needed.
const BEP_EVENT_VERSION: u32 = 0;
40 | | |
41 | 2 | fn get_identity() -> Result<Option<String>, Status> { |
42 | 2 | ActiveOriginContext::get() |
43 | 2 | .map_or(Ok(None), |ctx| ctx.get_value(&ORIGIN_IDENTITY)) |
44 | 2 | .err_tip(|| "In BepServer"0 ) |
45 | 2 | .map_or_else(|e| Err(e.into())0 , |v| Ok(v.map(|v| v.as_ref().clone()0 ))) |
46 | 2 | } |
47 | | |
48 | | pub struct BepServer { |
49 | | store: Store, |
50 | | } |
51 | | |
52 | | impl BepServer { |
53 | 2 | pub fn new( |
54 | 2 | config: &nativelink_config::cas_server::BepConfig, |
55 | 2 | store_manager: &StoreManager, |
56 | 2 | ) -> Result<Self, Error> { |
57 | 2 | let store = store_manager |
58 | 2 | .get_store(&config.store) |
59 | 2 | .err_tip(|| format!("Expected store {} to exist in store manager", &config.store)0 )?0 ; |
60 | | |
61 | 2 | Ok(Self { store }) |
62 | 2 | } |
63 | | |
64 | 0 | pub fn into_service(self) -> PublishBuildEventServer<BepServer> { |
65 | 0 | PublishBuildEventServer::new(self) |
66 | 0 | } |
67 | | |
68 | 1 | async fn inner_publish_lifecycle_event( |
69 | 1 | &self, |
70 | 1 | request: PublishLifecycleEventRequest, |
71 | 1 | identity: Option<String>, |
72 | 1 | ) -> Result<Response<()>, Error> { |
73 | 1 | let build_event = request |
74 | 1 | .build_event |
75 | 1 | .as_ref() |
76 | 1 | .err_tip(|| "Expected build_event to be set"0 )?0 ; |
77 | 1 | let stream_id = build_event |
78 | 1 | .stream_id |
79 | 1 | .as_ref() |
80 | 1 | .err_tip(|| "Expected stream_id to be set"0 )?0 ; |
81 | | |
82 | 1 | let sequence_number = build_event.sequence_number; |
83 | 1 | |
84 | 1 | let store_key = StoreKey::Str(Cow::Owned(format!( |
85 | 1 | "BepEvent:le:{}:{}:{}", |
86 | 1 | &stream_id.build_id, &stream_id.invocation_id, sequence_number, |
87 | 1 | ))); |
88 | 1 | |
89 | 1 | let bep_event = BepEvent { |
90 | 1 | version: BEP_EVENT_VERSION, |
91 | 1 | identity: identity.unwrap_or_default(), |
92 | 1 | event: Some(bep_event::Event::LifecycleEvent(request)), |
93 | 1 | }; |
94 | 1 | let mut buf = BytesMut::new(); |
95 | 1 | bep_event |
96 | 1 | .encode(&mut buf) |
97 | 1 | .err_tip(|| "Could not encode PublishLifecycleEventRequest proto"0 )?0 ; |
98 | | |
99 | 1 | self.store |
100 | 1 | .update_oneshot(store_key, buf.freeze()) |
101 | 1 | .await |
102 | 1 | .err_tip(|| "Failed to store PublishLifecycleEventRequest"0 )?0 ; |
103 | | |
104 | 1 | Ok(Response::new(())) |
105 | 1 | } |
106 | | |
107 | 1 | async fn inner_publish_build_tool_event_stream( |
108 | 1 | &self, |
109 | 1 | stream: Streaming<PublishBuildToolEventStreamRequest>, |
110 | 1 | identity: Option<String>, |
111 | 1 | ) -> Result<Response<PublishBuildToolEventStreamStream>, Error> { |
112 | 5 | async fn process_request( |
113 | 5 | store: Pin<&dyn StoreDriver>, |
114 | 5 | request: PublishBuildToolEventStreamRequest, |
115 | 5 | identity: String, |
116 | 5 | ) -> Result<PublishBuildToolEventStreamResponse, Status> { |
117 | 5 | let ordered_build_event = request |
118 | 5 | .ordered_build_event |
119 | 5 | .as_ref() |
120 | 5 | .err_tip(|| "Expected ordered_build_event to be set"0 )?0 ; |
121 | 5 | let stream_id = ordered_build_event |
122 | 5 | .stream_id |
123 | 5 | .as_ref() |
124 | 5 | .err_tip(|| "Expected stream_id to be set"0 )?0 |
125 | 5 | .clone(); |
126 | 5 | |
127 | 5 | let sequence_number = ordered_build_event.sequence_number; |
128 | 5 | |
129 | 5 | let bep_event = BepEvent { |
130 | 5 | version: BEP_EVENT_VERSION, |
131 | 5 | identity, |
132 | 5 | event: Some(bep_event::Event::BuildToolEvent(request)), |
133 | 5 | }; |
134 | 5 | let mut buf = BytesMut::new(); |
135 | 5 | |
136 | 5 | bep_event |
137 | 5 | .encode(&mut buf) |
138 | 5 | .err_tip(|| "Could not encode PublishBuildToolEventStreamRequest proto"0 )?0 ; |
139 | | |
140 | 5 | store |
141 | 5 | .update_oneshot( |
142 | 5 | StoreKey::Str(Cow::Owned(format!( |
143 | 5 | "BepEvent:be:{}:{}:{}", |
144 | 5 | &stream_id.build_id, &stream_id.invocation_id, sequence_number, |
145 | 5 | ))), |
146 | 5 | buf.freeze(), |
147 | 5 | ) |
148 | 5 | .await |
149 | 5 | .err_tip(|| "Failed to store PublishBuildToolEventStreamRequest"0 )?0 ; |
150 | | |
151 | 5 | Ok(PublishBuildToolEventStreamResponse { |
152 | 5 | stream_id: Some(stream_id.clone()), |
153 | 5 | sequence_number, |
154 | 5 | }) |
155 | 5 | } |
156 | | |
157 | | struct State { |
158 | | store: Store, |
159 | | stream: Streaming<PublishBuildToolEventStreamRequest>, |
160 | | identity: String, |
161 | | } |
162 | | |
163 | 1 | let response_stream = |
164 | 1 | unfold( |
165 | 1 | Some(State { |
166 | 1 | store: self.store.clone(), |
167 | 1 | stream, |
168 | 1 | identity: identity.unwrap_or_default(), |
169 | 1 | }), |
170 | 5 | move |maybe_state| async move { |
171 | 5 | let mut state = maybe_state?0 ; |
172 | 5 | let request = |
173 | 5 | match state.stream.message().await.err_tip(|| { |
174 | 0 | "While receiving message in publish_build_tool_event_stream" |
175 | 5 | }) { |
176 | 5 | Ok(Some(request)) => request, |
177 | 0 | Ok(None) => return None, |
178 | 0 | Err(e) => return Some((Err(e.into()), None)), |
179 | | }; |
180 | 5 | process_request( |
181 | 5 | state.store.as_store_driver_pin(), |
182 | 5 | request, |
183 | 5 | state.identity.clone(), |
184 | 5 | ) |
185 | 5 | .await |
186 | 5 | .map_or_else( |
187 | 5 | |e| Some((Err(e), None))0 , |
188 | 5 | |response| Some((Ok(response), Some(state))), |
189 | 5 | ) |
190 | 10 | }, |
191 | 1 | ); |
192 | 1 | |
193 | 1 | Ok(Response::new(Box::pin(response_stream))) |
194 | 1 | } |
195 | | } |
196 | | |
197 | | type PublishBuildToolEventStreamStream = Pin< |
198 | | Box<dyn Stream<Item = Result<PublishBuildToolEventStreamResponse, Status>> + Send + 'static>, |
199 | | >; |
200 | | |
201 | | #[tonic::async_trait] |
202 | | impl PublishBuildEvent for BepServer { |
203 | | type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream; |
204 | | |
205 | | #[allow(clippy::blocks_in_conditions)] |
206 | | #[instrument( |
207 | | err, |
208 | | ret(level = Level::INFO), |
209 | | level = Level::ERROR, |
210 | | skip_all, |
211 | | fields(request = ?grpc_request.get_ref()) |
212 | | )] |
213 | | async fn publish_lifecycle_event( |
214 | | &self, |
215 | | grpc_request: Request<PublishLifecycleEventRequest>, |
216 | 1 | ) -> Result<Response<()>, Status> { |
217 | 1 | self.inner_publish_lifecycle_event(grpc_request.into_inner(), get_identity()?0 ) |
218 | 1 | .await |
219 | 1 | .map_err(Error::into) |
220 | 2 | } |
221 | | |
222 | | #[allow(clippy::blocks_in_conditions)] |
223 | | #[instrument( |
224 | | err, |
225 | | level = Level::ERROR, |
226 | | skip_all, |
227 | | fields(request = ?grpc_request.get_ref()) |
228 | | )] |
229 | | async fn publish_build_tool_event_stream( |
230 | | &self, |
231 | | grpc_request: Request<Streaming<PublishBuildToolEventStreamRequest>>, |
232 | 1 | ) -> Result<Response<Self::PublishBuildToolEventStreamStream>, Status> { |
233 | 1 | self.inner_publish_build_tool_event_stream(grpc_request.into_inner(), get_identity()?0 ) |
234 | 1 | .await |
235 | 1 | .map_err(Error::into) |
236 | 2 | } |
237 | | } |