/build/source/nativelink-service/src/cas_server.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::{HashMap, VecDeque}; |
16 | | use std::convert::Into; |
17 | | use std::pin::Pin; |
18 | | |
19 | | use bytes::Bytes; |
20 | | use futures::stream::{FuturesUnordered, Stream}; |
21 | | use futures::{StreamExt, TryStreamExt}; |
22 | | use nativelink_config::cas_server::{CasStoreConfig, InstanceName}; |
23 | | use nativelink_error::{error_if, make_input_err, Code, Error, ResultExt}; |
24 | | use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{ |
25 | | ContentAddressableStorage, ContentAddressableStorageServer as Server, |
26 | | }; |
27 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
28 | | batch_read_blobs_response, batch_update_blobs_response, compressor, BatchReadBlobsRequest, |
29 | | BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse, Directory, |
30 | | FindMissingBlobsRequest, FindMissingBlobsResponse, GetTreeRequest, GetTreeResponse, |
31 | | }; |
32 | | use nativelink_proto::google::rpc::Status as GrpcStatus; |
33 | | use nativelink_store::ac_utils::get_and_decode_digest; |
34 | | use nativelink_store::grpc_store::GrpcStore; |
35 | | use nativelink_store::store_manager::StoreManager; |
36 | | use nativelink_util::common::DigestInfo; |
37 | | use nativelink_util::digest_hasher::make_ctx_for_hash_func; |
38 | | use nativelink_util::origin_event::OriginEventContext; |
39 | | use nativelink_util::store_trait::{Store, StoreLike}; |
40 | | use tonic::{Request, Response, Status}; |
41 | | use tracing::{error_span, event, instrument, Level}; |
42 | | |
43 | | pub struct CasServer { |
44 | | stores: HashMap<String, Store>, |
45 | | } |
46 | | |
47 | | type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>; |
48 | | |
49 | | impl CasServer { |
50 | 8 | pub fn new( |
51 | 8 | config: &HashMap<InstanceName, CasStoreConfig>, |
52 | 8 | store_manager: &StoreManager, |
53 | 8 | ) -> Result<Self, Error> { |
54 | 8 | let mut stores = HashMap::with_capacity(config.len()); |
55 | 16 | for (instance_name, cas_cfg8 ) in config { |
56 | 8 | let store = store_manager.get_store(&cas_cfg.cas_store).ok_or_else(|| { |
57 | 0 | make_input_err!("'cas_store': '{}' does not exist", cas_cfg.cas_store) |
58 | 8 | })?0 ; |
59 | 8 | stores.insert(instance_name.to_string(), store); |
60 | | } |
61 | 8 | Ok(CasServer { stores }) |
62 | 8 | } |
63 | | |
64 | 0 | pub fn into_service(self) -> Server<CasServer> { |
65 | 0 | Server::new(self) |
66 | 0 | } |
67 | | |
68 | 4 | async fn inner_find_missing_blobs( |
69 | 4 | &self, |
70 | 4 | request: FindMissingBlobsRequest, |
71 | 4 | ) -> Result<Response<FindMissingBlobsResponse>, Error> { |
72 | 4 | let instance_name = &request.instance_name; |
73 | 4 | let store = self |
74 | 4 | .stores |
75 | 4 | .get(instance_name) |
76 | 4 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 |
77 | 4 | .clone(); |
78 | 4 | |
79 | 4 | let mut requested_blobs = Vec::with_capacity(request.blob_digests.len()); |
80 | 10 | for digest7 in &request.blob_digests { |
81 | 7 | requested_blobs.push(DigestInfo::try_from(digest.clone())?1 .into()6 ); |
82 | | } |
83 | 3 | let sizes = store |
84 | 3 | .has_many(&requested_blobs) |
85 | 3 | .await |
86 | 3 | .err_tip(|| "In find_missing_blobs"0 )?0 ; |
87 | 3 | let missing_blob_digests = sizes |
88 | 3 | .into_iter() |
89 | 3 | .zip(request.blob_digests) |
90 | 5 | .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest)2 , |_| None3 )) |
91 | 3 | .collect(); |
92 | 3 | |
93 | 3 | Ok(Response::new(FindMissingBlobsResponse { |
94 | 3 | missing_blob_digests, |
95 | 3 | })) |
96 | 4 | } |
97 | | |
98 | 2 | async fn inner_batch_update_blobs( |
99 | 2 | &self, |
100 | 2 | request: BatchUpdateBlobsRequest, |
101 | 2 | ) -> Result<Response<BatchUpdateBlobsResponse>, Error> { |
102 | 2 | let instance_name = &request.instance_name; |
103 | | |
104 | 2 | let store = self |
105 | 2 | .stores |
106 | 2 | .get(instance_name) |
107 | 2 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 |
108 | 2 | .clone(); |
109 | | |
110 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
111 | | // Note: We don't know the digests here, so we try perform a very shallow |
112 | | // check to see if it's a grpc store. |
113 | 2 | if let Some(grpc_store0 ) = store.downcast_ref::<GrpcStore>(None) { Branch (113:16): [True: 0, False: 2]
Branch (113:16): [Folded - Ignored]
|
114 | 0 | return grpc_store.batch_update_blobs(Request::new(request)).await; |
115 | 2 | } |
116 | 2 | |
117 | 2 | let store_ref = &store; |
118 | 2 | let update_futures: FuturesUnordered<_> = request |
119 | 2 | .requests |
120 | 2 | .into_iter() |
121 | 3 | .map(|request| async move { |
122 | 3 | let digest = request |
123 | 3 | .digest |
124 | 3 | .clone() |
125 | 3 | .err_tip(|| "Digest not found in request"0 )?0 ; |
126 | 3 | let request_data = request.data; |
127 | 3 | let digest_info = DigestInfo::try_from(digest.clone())?0 ; |
128 | 3 | let size_bytes = usize::try_from(digest_info.size_bytes()) |
129 | 3 | .err_tip(|| "Digest size_bytes was not convertible to usize"0 )?0 ; |
130 | 0 | error_if!( |
131 | 3 | size_bytes != request_data.len(), Branch (131:21): [True: 0, False: 3]
Branch (131:21): [Folded - Ignored]
|
132 | | "Digest for upload had mismatching sizes, digest said {} data said {}", |
133 | | size_bytes, |
134 | 0 | request_data.len() |
135 | | ); |
136 | 3 | let result = store_ref |
137 | 3 | .update_oneshot(digest_info, request_data) |
138 | 3 | .await |
139 | 3 | .err_tip(|| "Error writing to store"0 ); |
140 | 3 | Ok::<_, Error>(batch_update_blobs_response::Response { |
141 | 3 | digest: Some(digest), |
142 | 3 | status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default())), |
143 | 3 | }) |
144 | 6 | }) |
145 | 2 | .collect(); |
146 | 2 | let responses = update_futures |
147 | 2 | .try_collect::<Vec<batch_update_blobs_response::Response>>() |
148 | 2 | .await?0 ; |
149 | | |
150 | 2 | Ok(Response::new(BatchUpdateBlobsResponse { responses })) |
151 | 2 | } |
152 | | |
153 | 1 | async fn inner_batch_read_blobs( |
154 | 1 | &self, |
155 | 1 | request: BatchReadBlobsRequest, |
156 | 1 | ) -> Result<Response<BatchReadBlobsResponse>, Error> { |
157 | 1 | let instance_name = &request.instance_name; |
158 | | |
159 | 1 | let store = self |
160 | 1 | .stores |
161 | 1 | .get(instance_name) |
162 | 1 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 |
163 | 1 | .clone(); |
164 | | |
165 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
166 | | // Note: We don't know the digests here, so we try perform a very shallow |
167 | | // check to see if it's a grpc store. |
168 | 1 | if let Some(grpc_store0 ) = store.downcast_ref::<GrpcStore>(None) { Branch (168:16): [True: 0, False: 1]
Branch (168:16): [Folded - Ignored]
|
169 | 0 | return grpc_store.batch_read_blobs(Request::new(request)).await; |
170 | 1 | } |
171 | 1 | |
172 | 1 | let store_ref = &store; |
173 | 1 | let read_futures: FuturesUnordered<_> = request |
174 | 1 | .digests |
175 | 1 | .into_iter() |
176 | 3 | .map(|digest| async move { |
177 | 3 | let digest_copy = DigestInfo::try_from(digest.clone())?0 ; |
178 | | // TODO(allada) There is a security risk here of someone taking all the memory on the instance. |
179 | 3 | let result = store_ref |
180 | 3 | .get_part_unchunked(digest_copy, 0, None) |
181 | 3 | .await |
182 | 3 | .err_tip(|| "Error reading from store"1 ); |
183 | 3 | let (status, data) = result.map_or_else( |
184 | 3 | |mut e| { |
185 | 1 | if e.code == Code::NotFound { Branch (185:28): [True: 1, False: 0]
Branch (185:28): [Folded - Ignored]
|
186 | 1 | // Trim the error code. Not Found is quite common and we don't want to send a large |
187 | 1 | // error (debug) message for something that is common. We resize to just the last |
188 | 1 | // message as it will be the most relevant. |
189 | 1 | e.messages.resize_with(1, String::new); |
190 | 1 | }0 |
191 | 1 | (e.into(), Bytes::new()) |
192 | 3 | }, |
193 | 3 | |v| (GrpcStatus::default(), v)2 , |
194 | 3 | ); |
195 | 3 | Ok::<_, Error>(batch_read_blobs_response::Response { |
196 | 3 | status: Some(status), |
197 | 3 | digest: Some(digest), |
198 | 3 | compressor: compressor::Value::Identity.into(), |
199 | 3 | data, |
200 | 3 | }) |
201 | 6 | }) |
202 | 1 | .collect(); |
203 | 1 | let responses = read_futures |
204 | 1 | .try_collect::<Vec<batch_read_blobs_response::Response>>() |
205 | 1 | .await?0 ; |
206 | | |
207 | 1 | Ok(Response::new(BatchReadBlobsResponse { responses })) |
208 | 1 | } |
209 | | |
210 | 6 | async fn inner_get_tree( |
211 | 6 | &self, |
212 | 6 | request: GetTreeRequest, |
213 | 6 | ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static, Error> { |
214 | 6 | let instance_name = &request.instance_name; |
215 | | |
216 | 6 | let store = self |
217 | 6 | .stores |
218 | 6 | .get(instance_name) |
219 | 6 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")0 )?0 |
220 | 6 | .clone(); |
221 | | |
222 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
223 | | // Note: We don't know the digests here, so we try perform a very shallow |
224 | | // check to see if it's a grpc store. |
225 | 6 | if let Some(grpc_store0 ) = store.downcast_ref::<GrpcStore>(None) { Branch (225:16): [True: 0, False: 6]
Branch (225:16): [Folded - Ignored]
|
226 | 0 | let stream = grpc_store |
227 | 0 | .get_tree(Request::new(request)) |
228 | 0 | .await? |
229 | 0 | .into_inner(); |
230 | 0 | return Ok(stream.left_stream()); |
231 | 6 | } |
232 | 6 | let root_digest: DigestInfo = request |
233 | 6 | .root_digest |
234 | 6 | .err_tip(|| "Expected root_digest to exist in GetTreeRequest"0 )?0 |
235 | 6 | .try_into() |
236 | 6 | .err_tip(|| "In GetTreeRequest::root_digest"0 )?0 ; |
237 | | |
238 | 6 | let mut deque: VecDeque<DigestInfo> = VecDeque::new(); |
239 | 6 | let mut directories: Vec<Directory> = Vec::new(); |
240 | | // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest. |
241 | 6 | let page_token_digest = if request.page_token.is_empty() { Branch (241:36): [True: 2, False: 4]
Branch (241:36): [Folded - Ignored]
|
242 | 2 | root_digest |
243 | | } else { |
244 | 4 | let mut page_token_parts = request.page_token.split('-'); |
245 | 4 | DigestInfo::try_new( |
246 | 4 | page_token_parts |
247 | 4 | .next() |
248 | 4 | .err_tip(|| "Failed to parse `hash_str` in `page_token`"0 )?0 , |
249 | 4 | page_token_parts |
250 | 4 | .next() |
251 | 4 | .err_tip(|| "Failed to parse `size_bytes` in `page_token`"0 )?0 |
252 | 4 | .parse::<i64>() |
253 | 4 | .err_tip(|| "Failed to parse `size_bytes` as i64"0 )?0 , |
254 | | ) |
255 | 4 | .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`"0 )?0 |
256 | | }; |
257 | 6 | let page_size = request.page_size; |
258 | 6 | // If `page_size` is 0, paging is not necessary. |
259 | 6 | let mut page_token_matched = page_size == 0; |
260 | 6 | deque.push_back(root_digest); |
261 | | |
262 | 28 | while !deque.is_empty() { Branch (262:15): [True: 26, False: 2]
Branch (262:15): [Folded - Ignored]
|
263 | 26 | let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front"0 )?0 ; |
264 | 26 | let directory = get_and_decode_digest::<Directory>(&store, digest.into()) |
265 | 26 | .await |
266 | 26 | .err_tip(|| "Converting digest to Directory"0 )?0 ; |
267 | 26 | if digest == page_token_digest { Branch (267:16): [True: 6, False: 20]
Branch (267:16): [Folded - Ignored]
|
268 | 6 | page_token_matched = true; |
269 | 20 | } |
270 | 56 | for directory30 in &directory.directories { |
271 | 30 | let digest: DigestInfo = directory |
272 | 30 | .digest |
273 | 30 | .clone() |
274 | 30 | .err_tip(|| "Expected Digest to exist in Directory::directories::digest"0 )?0 |
275 | 30 | .try_into() |
276 | 30 | .err_tip(|| "In Directory::file::digest"0 )?0 ; |
277 | 30 | deque.push_back(digest); |
278 | | } |
279 | 26 | if page_token_matched { Branch (279:16): [True: 20, False: 6]
Branch (279:16): [Folded - Ignored]
|
280 | 20 | directories.push(directory); |
281 | 20 | if directories.len() as i32 == page_size { Branch (281:20): [True: 4, False: 16]
Branch (281:20): [Folded - Ignored]
|
282 | 4 | break; |
283 | 16 | } |
284 | 6 | } |
285 | | } |
286 | | // `next_page_token` will return the `{hash_str}:{size_bytes}` of the next request's first directory digest. |
287 | | // It will be an empty string when it reached the end of the directory tree. |
288 | 6 | let next_page_token: String = if let Some(value3 ) = deque.front() { Branch (288:46): [True: 3, False: 3]
Branch (288:46): [Folded - Ignored]
|
289 | 3 | format!("{value}") |
290 | | } else { |
291 | 3 | String::new() |
292 | | }; |
293 | | |
294 | 6 | Ok(futures::stream::once(async { |
295 | 6 | Ok(GetTreeResponse { |
296 | 6 | directories, |
297 | 6 | next_page_token, |
298 | 6 | }) |
299 | 6 | }) |
300 | 6 | .right_stream()) |
301 | 6 | } |
302 | | } |
303 | | |
304 | | #[tonic::async_trait] |
305 | | impl ContentAddressableStorage for CasServer { |
306 | | type GetTreeStream = GetTreeStream; |
307 | | |
308 | | #[allow(clippy::blocks_in_conditions)] |
309 | | #[instrument( |
310 | | err, |
311 | | ret(level = Level::INFO), |
312 | | level = Level::ERROR, |
313 | | skip_all, |
314 | | fields(request = ?grpc_request.get_ref()) |
315 | | )] |
316 | | async fn find_missing_blobs( |
317 | | &self, |
318 | | grpc_request: Request<FindMissingBlobsRequest>, |
319 | 4 | ) -> Result<Response<FindMissingBlobsResponse>, Status> { |
320 | 4 | let request = grpc_request.into_inner(); |
321 | 4 | let ctx = OriginEventContext::new(|| &request0 ).await; |
322 | 4 | let resp = make_ctx_for_hash_func(request.digest_function) |
323 | 4 | .err_tip(|| "In CasServer::find_missing_blobs"0 )?0 |
324 | | .wrap_async( |
325 | 4 | error_span!("cas_server_find_missing_blobs"), |
326 | 4 | self.inner_find_missing_blobs(request), |
327 | 4 | ) |
328 | 4 | .await |
329 | 4 | .err_tip(|| "Failed on find_missing_blobs() command"1 ) |
330 | 4 | .map_err(Into::into); |
331 | 4 | ctx.emit(|| &resp0 ).await; |
332 | 4 | resp |
333 | 8 | } |
334 | | |
335 | | #[allow(clippy::blocks_in_conditions)] |
336 | | #[instrument( |
337 | | err, |
338 | | ret(level = Level::INFO), |
339 | | level = Level::ERROR, |
340 | | skip_all, |
341 | | fields(request = ?grpc_request.get_ref()) |
342 | | )] |
343 | | async fn batch_update_blobs( |
344 | | &self, |
345 | | grpc_request: Request<BatchUpdateBlobsRequest>, |
346 | 2 | ) -> Result<Response<BatchUpdateBlobsResponse>, Status> { |
347 | 2 | let request = grpc_request.into_inner(); |
348 | 2 | let ctx = OriginEventContext::new(|| &request0 ).await; |
349 | 2 | let resp = make_ctx_for_hash_func(request.digest_function) |
350 | 2 | .err_tip(|| "In CasServer::batch_update_blobs"0 )?0 |
351 | | .wrap_async( |
352 | 2 | error_span!("cas_server_batch_update_blobs"), |
353 | 2 | self.inner_batch_update_blobs(request), |
354 | 2 | ) |
355 | 2 | .await |
356 | 2 | .err_tip(|| "Failed on batch_update_blobs() command"0 ) |
357 | 2 | .map_err(Into::into); |
358 | 2 | ctx.emit(|| &resp0 ).await; |
359 | 2 | resp |
360 | 4 | } |
361 | | |
362 | | #[allow(clippy::blocks_in_conditions)] |
363 | | #[instrument( |
364 | | err, |
365 | | ret(level = Level::INFO), |
366 | | level = Level::ERROR, |
367 | | skip_all, |
368 | | fields(request = ?grpc_request.get_ref()) |
369 | | )] |
370 | | async fn batch_read_blobs( |
371 | | &self, |
372 | | grpc_request: Request<BatchReadBlobsRequest>, |
373 | 1 | ) -> Result<Response<BatchReadBlobsResponse>, Status> { |
374 | 1 | let request = grpc_request.into_inner(); |
375 | 1 | let ctx = OriginEventContext::new(|| &request0 ).await; |
376 | 1 | let resp = make_ctx_for_hash_func(request.digest_function) |
377 | 1 | .err_tip(|| "In CasServer::batch_read_blobs"0 )?0 |
378 | | .wrap_async( |
379 | 1 | error_span!("cas_server_batch_read_blobs"), |
380 | 1 | self.inner_batch_read_blobs(request), |
381 | 1 | ) |
382 | 1 | .await |
383 | 1 | .err_tip(|| "Failed on batch_read_blobs() command"0 ) |
384 | 1 | .map_err(Into::into); |
385 | 1 | ctx.emit(|| &resp0 ).await; |
386 | 1 | resp |
387 | 2 | } |
388 | | |
389 | | #[allow(clippy::blocks_in_conditions)] |
390 | | #[instrument( |
391 | | err, |
392 | | level = Level::ERROR, |
393 | | skip_all, |
394 | | fields(request = ?grpc_request.get_ref()) |
395 | | )] |
396 | | async fn get_tree( |
397 | | &self, |
398 | | grpc_request: Request<GetTreeRequest>, |
399 | 6 | ) -> Result<Response<Self::GetTreeStream>, Status> { |
400 | 6 | let request = grpc_request.into_inner(); |
401 | 6 | let ctx = OriginEventContext::new(|| &request0 ).await; |
402 | 6 | let resp = make_ctx_for_hash_func(request.digest_function) |
403 | 6 | .err_tip(|| "In CasServer::get_tree"0 )?0 |
404 | | .wrap_async( |
405 | 6 | error_span!("cas_server_get_tree"), |
406 | 6 | self.inner_get_tree(request), |
407 | 6 | ) |
408 | 6 | .await |
409 | 6 | .err_tip(|| "Failed on get_tree() command"0 ) |
410 | 6 | .map(|stream| -> Response<Self::GetTreeStream> { |
411 | 6 | Response::new(ctx.wrap_stream(stream)) |
412 | 6 | }) |
413 | 6 | .map_err(Into::into); |
414 | 6 | if resp.is_ok() { Branch (414:12): [True: 6, False: 0]
Branch (414:12): [Folded - Ignored]
|
415 | 6 | event!(Level::DEBUG, return = "Ok(<stream>)"); |
416 | 0 | } |
417 | 6 | ctx.emit(|| &resp0 ).await; |
418 | 6 | resp |
419 | 12 | } |
420 | | } |