Coverage Report

Created: 2024-12-20 00:05

/build/source/nativelink-service/src/cas_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, VecDeque};
16
use std::convert::Into;
17
use std::pin::Pin;
18
19
use bytes::Bytes;
20
use futures::stream::{FuturesUnordered, Stream};
21
use futures::{StreamExt, TryStreamExt};
22
use nativelink_config::cas_server::{CasStoreConfig, InstanceName};
23
use nativelink_error::{error_if, make_input_err, Code, Error, ResultExt};
24
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
25
    ContentAddressableStorage, ContentAddressableStorageServer as Server,
26
};
27
use nativelink_proto::build::bazel::remote::execution::v2::{
28
    batch_read_blobs_response, batch_update_blobs_response, compressor, BatchReadBlobsRequest,
29
    BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse, Directory,
30
    FindMissingBlobsRequest, FindMissingBlobsResponse, GetTreeRequest, GetTreeResponse,
31
};
32
use nativelink_proto::google::rpc::Status as GrpcStatus;
33
use nativelink_store::ac_utils::get_and_decode_digest;
34
use nativelink_store::grpc_store::GrpcStore;
35
use nativelink_store::store_manager::StoreManager;
36
use nativelink_util::common::DigestInfo;
37
use nativelink_util::digest_hasher::make_ctx_for_hash_func;
38
use nativelink_util::origin_event::OriginEventContext;
39
use nativelink_util::store_trait::{Store, StoreLike};
40
use tonic::{Request, Response, Status};
41
use tracing::{error_span, event, instrument, Level};
42
43
pub struct CasServer {
44
    stores: HashMap<String, Store>,
45
}
46
47
type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>;
48
49
impl CasServer {
50
8
    pub fn new(
51
8
        config: &HashMap<InstanceName, CasStoreConfig>,
52
8
        store_manager: &StoreManager,
53
8
    ) -> Result<Self, Error> {
54
8
        let mut stores = HashMap::with_capacity(config.len());
55
16
        for (instance_name, cas_cfg⁸) in config {
56
8
            let store = store_manager.get_store(&cas_cfg.cas_store).ok_or_else(|| {
57
0
                make_input_err!("'cas_store': '{}' does not exist", cas_cfg.cas_store)
58
8
            })?⁰;
59
8
            stores.insert(instance_name.to_string(), store);
60
        }
61
8
        Ok(CasServer { stores })
62
8
    }
63
64
0
    pub fn into_service(self) -> Server<CasServer> {
65
0
        Server::new(self)
66
0
    }
67
68
4
    async fn inner_find_missing_blobs(
69
4
        &self,
70
4
        request: FindMissingBlobsRequest,
71
4
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
72
4
        let instance_name = &request.instance_name;
73
4
        let store = self
74
4
            .stores
75
4
            .get(instance_name)
76
4
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")⁰)?⁰
77
4
            .clone();
78
4
79
4
        let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
80
10
        for digest⁷ in &request.blob_digests {
81
7
            requested_blobs.push(DigestInfo::try_from(digest.clone())?¹.into()⁶);
82
        }
83
3
        let sizes = store
84
3
            .has_many(&requested_blobs)
85
3
            .await
86
3
            .err_tip(|| "In find_missing_blobs"⁰)?⁰;
87
3
        let missing_blob_digests = sizes
88
3
            .into_iter()
89
3
            .zip(request.blob_digests)
90
5
            .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest)², |_| None³))
91
3
            .collect();
92
3
93
3
        Ok(Response::new(FindMissingBlobsResponse {
94
3
            missing_blob_digests,
95
3
        }))
96
4
    }
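
The filter_map above derives the missing set by position: has_many returns one Option<size> per requested digest, in request order, so a digest is reported missing exactly when its slot is None. A minimal standalone sketch of that zip/filter_map step, using plain strings in place of the proto Digest type:

    fn main() {
        // One Option<size> per requested digest, in request order.
        let sizes: Vec<Option<u64>> = vec![Some(5), None, Some(3)];
        let blob_digests = vec!["d1", "d2", "d3"];

        let missing: Vec<&str> = sizes
            .into_iter()
            .zip(blob_digests)
            // Keep the digest when the size lookup came back None.
            .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest), |_| None))
            .collect();

        assert_eq!(missing, vec!["d2"]);
    }
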
97
98
2
    async fn inner_batch_update_blobs(
99
2
        &self,
100
2
        request: BatchUpdateBlobsRequest,
101
2
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
102
2
        let instance_name = &request.instance_name;
103
104
2
        let store = self
105
2
            .stores
106
2
            .get(instance_name)
107
2
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")⁰)?⁰
108
2
            .clone();
109
110
        // If we are a GrpcStore we shortcut here, as this is a special store.
111
        // Note: We don't know the digests here, so we try to perform a very shallow
112
        // check to see if it's a grpc store.
113
2
        if let Some(grpc_store⁰) = store.downcast_ref::<GrpcStore>(None) {
  Branch (113:16): [True: 0, False: 2]
  Branch (113:16): [Folded - Ignored]
114
0
            return grpc_store.batch_update_blobs(Request::new(request)).await;
115
2
        }
116
2
117
2
        let store_ref = &store;
118
2
        let update_futures: FuturesUnordered<_> = request
119
2
            .requests
120
2
            .into_iter()
121
3
            .map(|request| async move {
122
3
                let digest = request
123
3
                    .digest
124
3
                    .clone()
125
3
                    .err_tip(|| "Digest not found in request"⁰)?⁰;
126
3
                let request_data = request.data;
127
3
                let digest_info = DigestInfo::try_from(digest.clone())?⁰;
128
3
                let size_bytes = usize::try_from(digest_info.size_bytes())
129
3
                    .err_tip(|| "Digest size_bytes was not convertible to usize"⁰)?⁰;
130
0
                error_if!(
131
3
                    size_bytes != request_data.len(),
  Branch (131:21): [True: 0, False: 3]
  Branch (131:21): [Folded - Ignored]
132
                    "Digest for upload had mismatching sizes, digest said {} data  said {}",
133
                    size_bytes,
134
0
                    request_data.len()
135
                );
136
3
                let result = store_ref
137
3
                    .update_oneshot(digest_info, request_data)
138
3
                    .await
139
3
                    .err_tip(|| "Error writing to store"⁰);
140
3
                Ok::<_, Error>(batch_update_blobs_response::Response {
141
3
                    digest: Some(digest),
142
3
                    status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default())),
143
3
                })
144
6
            })
145
2
            .collect();
146
2
        let responses = update_futures
147
2
            .try_collect::<Vec<batch_update_blobs_response::Response>>()
148
2
            .await?⁰;
149
150
2
        Ok(Response::new(BatchUpdateBlobsResponse { responses }))
151
2
    }
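
The GrpcStore shortcut above special-cases one concrete store behind the generic Store handle: if the downcast succeeds, the whole request is forwarded upstream instead of being split into per-blob writes. A minimal sketch of the same pattern using std::any::Any as a stand-in for NativeLink's Store::downcast_ref; BlobStore, LocalStore, and ProxyStore are illustrative names, not NativeLink types:

    use std::any::Any;

    trait BlobStore: Any {
        fn as_any(&self) -> &dyn Any;
    }

    struct LocalStore;
    struct ProxyStore; // plays the role of GrpcStore

    impl BlobStore for LocalStore {
        fn as_any(&self) -> &dyn Any { self }
    }
    impl BlobStore for ProxyStore {
        fn as_any(&self) -> &dyn Any { self }
    }

    fn batch_update(store: &dyn BlobStore) -> &'static str {
        // Shallow check: if this is really a proxy store, hand the whole
        // request to it rather than processing each blob here.
        if store.as_any().downcast_ref::<ProxyStore>().is_some() {
            return "forwarded upstream";
        }
        "handled locally"
    }

    fn main() {
        assert_eq!(batch_update(&LocalStore), "handled locally");
        assert_eq!(batch_update(&ProxyStore), "forwarded upstream");
    }
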
152
153
1
    async fn inner_batch_read_blobs(
154
1
        &self,
155
1
        request: BatchReadBlobsRequest,
156
1
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
157
1
        let instance_name = &request.instance_name;
158
159
1
        let store = self
160
1
            .stores
161
1
            .get(instance_name)
162
1
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")⁰)?⁰
163
1
            .clone();
164
165
        // If we are a GrpcStore we shortcut here, as this is a special store.
166
        // Note: We don't know the digests here, so we try to perform a very shallow
167
        // check to see if it's a grpc store.
168
1
        if let Some(grpc_store⁰) = store.downcast_ref::<GrpcStore>(None) {
  Branch (168:16): [True: 0, False: 1]
  Branch (168:16): [Folded - Ignored]
169
0
            return grpc_store.batch_read_blobs(Request::new(request)).await;
170
1
        }
171
1
172
1
        let store_ref = &store;
173
1
        let read_futures: FuturesUnordered<_> = request
174
1
            .digests
175
1
            .into_iter()
176
3
            .map(|digest| async move {
177
3
                let digest_copy = DigestInfo::try_from(digest.clone())?⁰;
178
                // TODO(allada) There is a security risk here of someone taking all the memory on the instance.
179
3
                let result = store_ref
180
3
                    .get_part_unchunked(digest_copy, 0, None)
181
3
                    .await
182
3
                    .err_tip(|| "Error reading from store"¹);
183
3
                let (status, data) = result.map_or_else(
184
3
                    |mut e| {
185
1
                        if e.code == Code::NotFound {
  Branch (185:28): [True: 1, False: 0]
  Branch (185:28): [Folded - Ignored]
186
1
                            // Trim the error code. Not Found is quite common and we don't want to send a large
187
1
                            // error (debug) message for something that is common. We resize to just the last
188
1
                            // message as it will be the most relevant.
189
1
                            e.messages.resize_with(1, String::new);
190
1
                        }⁰
191
1
                        (e.into(), Bytes::new())
192
3
                    },
193
3
                    |v| (GrpcStatus::default(), v)²,
194
3
                );
195
3
                Ok::<_, Error>(batch_read_blobs_response::Response {
196
3
                    status: Some(status),
197
3
                    digest: Some(digest),
198
3
                    compressor: compressor::Value::Identity.into(),
199
3
                    data,
200
3
                })
201
6
            })
202
1
            .collect();
203
1
        let responses = read_futures
204
1
            .try_collect::<Vec<batch_read_blobs_response::Response>>()
205
1
            .await?⁰;
206
207
1
        Ok(Response::new(BatchReadBlobsResponse { responses }))
208
1
    }
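
The NotFound branch above shrinks the error before it is converted to a gRPC status: Vec::resize_with(1, ..) truncates the accumulated message chain to a single entry so a common NotFound does not carry a long debug payload over the wire. A minimal sketch of that truncation on a plain Vec<String> (the real field lives on nativelink_error::Error):

    fn main() {
        let mut messages = vec![
            "object deadbeef not found".to_string(),
            "In get_part_unchunked".to_string(),
            "Error reading from store".to_string(),
        ];
        // Truncate to one message; the filler closure is never called
        // because the vector is shrinking, not growing.
        messages.resize_with(1, String::new);
        assert_eq!(messages, vec!["object deadbeef not found".to_string()]);
    }
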
209
210
6
    async fn inner_get_tree(
211
6
        &self,
212
6
        request: GetTreeRequest,
213
6
    ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static, Error> {
214
6
        let instance_name = &request.instance_name;
215
216
6
        let store = self
217
6
            .stores
218
6
            .get(instance_name)
219
6
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'")⁰)?⁰
220
6
            .clone();
221
222
        // If we are a GrpcStore we shortcut here, as this is a special store.
223
        // Note: We don't know the digests here, so we try to perform a very shallow
224
        // check to see if it's a grpc store.
225
6
        if let Some(grpc_store⁰) = store.downcast_ref::<GrpcStore>(None) {
  Branch (225:16): [True: 0, False: 6]
  Branch (225:16): [Folded - Ignored]
226
0
            let stream = grpc_store
227
0
                .get_tree(Request::new(request))
228
0
                .await?
229
0
                .into_inner();
230
0
            return Ok(stream.left_stream());
231
6
        }
232
6
        let root_digest: DigestInfo = request
233
6
            .root_digest
234
6
            .err_tip(|| "Expected root_digest to exist in GetTreeRequest"⁰)?⁰
235
6
            .try_into()
236
6
            .err_tip(|| "In GetTreeRequest::root_digest"⁰)?⁰;
237
238
6
        let mut deque: VecDeque<DigestInfo> = VecDeque::new();
239
6
        let mut directories: Vec<Directory> = Vec::new();
240
        // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest.
241
6
        let page_token_digest = if request.page_token.is_empty() {
  Branch (241:36): [True: 2, False: 4]
  Branch (241:36): [Folded - Ignored]
242
2
            root_digest
243
        } else {
244
4
            let mut page_token_parts = request.page_token.split('-');
245
4
            DigestInfo::try_new(
246
4
                page_token_parts
247
4
                    .next()
248
4
                    .err_tip(|| "Failed to parse `hash_str` in `page_token`"⁰)?⁰,
249
4
                page_token_parts
250
4
                    .next()
251
4
                    .err_tip(|| "Failed to parse `size_bytes` in `page_token`"⁰)?⁰
252
4
                    .parse::<i64>()
253
4
                    .err_tip(|| "Failed to parse `size_bytes` as i64"⁰)?⁰,
254
            )
255
4
            .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`"⁰)?⁰
256
        };
257
6
        let page_size = request.page_size;
258
6
        // If `page_size` is 0, paging is not necessary.
259
6
        let mut page_token_matched = page_size == 0;
260
6
        deque.push_back(root_digest);
261
262
28
        while !deque.is_empty() {
  Branch (262:15): [True: 26, False: 2]
  Branch (262:15): [Folded - Ignored]
263
26
            let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front"⁰)?⁰;
264
26
            let directory = get_and_decode_digest::<Directory>(&store, digest.into())
265
26
                .await
266
26
                .err_tip(|| "Converting digest to Directory"⁰)?⁰;
267
26
            if digest == page_token_digest {
  Branch (267:16): [True: 6, False: 20]
  Branch (267:16): [Folded - Ignored]
268
6
                page_token_matched = true;
269
20
            }
270
56
            for directory³⁰ in &directory.directories {
271
30
                let digest: DigestInfo = directory
272
30
                    .digest
273
30
                    .clone()
274
30
                    .err_tip(|| "Expected Digest to exist in Directory::directories::digest"⁰)?⁰
275
30
                    .try_into()
276
30
                    .err_tip(|| "In Directory::file::digest"⁰)?⁰;
277
30
                deque.push_back(digest);
278
            }
279
26
            if page_token_matched {
  Branch (279:16): [True: 20, False: 6]
  Branch (279:16): [Folded - Ignored]
280
20
                directories.push(directory);
281
20
                if directories.len() as i32 == page_size {
  Branch (281:20): [True: 4, False: 16]
  Branch (281:20): [Folded - Ignored]
282
4
                    break;
283
16
                }
284
6
            }
285
        }
286
        // `next_page_token` will return the `{hash_str}-{size_bytes}` of the next request's first directory digest.
287
        // It will be an empty string when the end of the directory tree has been reached.
288
6
        let next_page_token: String = if let Some(value³) = deque.front() {
  Branch (288:46): [True: 3, False: 3]
  Branch (288:46): [Folded - Ignored]
289
3
            format!("{value}")
290
        } else {
291
3
            String::new()
292
        };
293
294
6
        Ok(futures::stream::once(async {
295
6
            Ok(GetTreeResponse {
296
6
                directories,
297
6
                next_page_token,
298
6
            })
299
6
        })
300
6
        .right_stream())
301
6
    }
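
Paging in inner_get_tree is driven entirely by the `{hash_str}-{size_bytes}` page token described in the comments above: an empty token starts the traversal at root_digest, and a response's next_page_token names the first directory digest of the next page (empty once the tree is exhausted). A minimal sketch of the token round trip; parse_page_token is an illustrative helper, not part of this module:

    fn parse_page_token(token: &str) -> Option<(&str, i64)> {
        let mut parts = token.split('-');
        let hash_str = parts.next()?;
        let size_bytes = parts.next()?.parse::<i64>().ok()?;
        Some((hash_str, size_bytes))
    }

    fn main() {
        // A token naming the first directory digest of the next page.
        let token = format!("{}-{}", "deadbeef", 42);
        assert_eq!(parse_page_token(&token), Some(("deadbeef", 42)));
        // An empty token has no size part, so it fails to parse; the handler
        // treats it as "start from root_digest" instead.
        assert_eq!(parse_page_token(""), None);
    }
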
302
}
303
304
#[tonic::async_trait]
305
impl ContentAddressableStorage for CasServer {
306
    type GetTreeStream = GetTreeStream;
307
308
    #[allow(clippy::blocks_in_conditions)]
309
    #[instrument(
310
        err,
311
        ret(level = Level::INFO),
312
        level = Level::ERROR,
313
        skip_all,
314
        fields(request = ?grpc_request.get_ref())
315
    )]
316
    async fn find_missing_blobs(
317
        &self,
318
        grpc_request: Request<FindMissingBlobsRequest>,
319
4
    ) -> Result<Response<FindMissingBlobsResponse>, Status> {
320
4
        let request = grpc_request.into_inner();
321
4
        let ctx = OriginEventContext::new(|| &request⁰).await;
322
4
        let resp = make_ctx_for_hash_func(request.digest_function)
323
4
            .err_tip(|| "In CasServer::find_missing_blobs"⁰)?⁰
324
            .wrap_async(
325
4
                error_span!("cas_server_find_missing_blobs"),
326
4
                self.inner_find_missing_blobs(request),
327
4
            )
328
4
            .await
329
4
            .err_tip(|| "Failed on find_missing_blobs() command"¹)
330
4
            .map_err(Into::into);
331
4
        ctx.emit(|| &resp⁰).await;
332
4
        resp
333
8
    }
334
335
    #[allow(clippy::blocks_in_conditions)]
336
    #[instrument(
337
        err,
338
        ret(level = Level::INFO),
339
        level = Level::ERROR,
340
        skip_all,
341
        fields(request = ?grpc_request.get_ref())
342
    )]
343
    async fn batch_update_blobs(
344
        &self,
345
        grpc_request: Request<BatchUpdateBlobsRequest>,
346
2
    ) -> Result<Response<BatchUpdateBlobsResponse>, Status> {
347
2
        let request = grpc_request.into_inner();
348
2
        let ctx = OriginEventContext::new(|| &request⁰).await;
349
2
        let resp = make_ctx_for_hash_func(request.digest_function)
350
2
            .err_tip(|| "In CasServer::batch_update_blobs"⁰)?⁰
351
            .wrap_async(
352
2
                error_span!("cas_server_batch_update_blobs"),
353
2
                self.inner_batch_update_blobs(request),
354
2
            )
355
2
            .await
356
2
            .err_tip(|| "Failed on batch_update_blobs() command"⁰)
357
2
            .map_err(Into::into);
358
2
        ctx.emit(|| &resp⁰).await;
359
2
        resp
360
4
    }
361
362
    #[allow(clippy::blocks_in_conditions)]
363
    #[instrument(
364
        err,
365
        ret(level = Level::INFO),
366
        level = Level::ERROR,
367
        skip_all,
368
        fields(request = ?grpc_request.get_ref())
369
    )]
370
    async fn batch_read_blobs(
371
        &self,
372
        grpc_request: Request<BatchReadBlobsRequest>,
373
1
    ) -> Result<Response<BatchReadBlobsResponse>, Status> {
374
1
        let request = grpc_request.into_inner();
375
1
        let ctx = OriginEventContext::new(|| &request⁰).await;
376
1
        let resp = make_ctx_for_hash_func(request.digest_function)
377
1
            .err_tip(|| "In CasServer::batch_read_blobs"⁰)?⁰
378
            .wrap_async(
379
1
                error_span!("cas_server_batch_read_blobs"),
380
1
                self.inner_batch_read_blobs(request),
381
1
            )
382
1
            .await
383
1
            .err_tip(|| "Failed on batch_read_blobs() command"⁰)
384
1
            .map_err(Into::into);
385
1
        ctx.emit(|| &resp⁰).await;
386
1
        resp
387
2
    }
388
389
    #[allow(clippy::blocks_in_conditions)]
390
    #[instrument(
391
        err,
392
        level = Level::ERROR,
393
        skip_all,
394
        fields(request = ?grpc_request.get_ref())
395
    )]
396
    async fn get_tree(
397
        &self,
398
        grpc_request: Request<GetTreeRequest>,
399
6
    ) -> Result<Response<Self::GetTreeStream>, Status> {
400
6
        let request = grpc_request.into_inner();
401
6
        let ctx = OriginEventContext::new(|| &request⁰).await;
402
6
        let resp = make_ctx_for_hash_func(request.digest_function)
403
6
            .err_tip(|| "In CasServer::get_tree"⁰)?⁰
404
            .wrap_async(
405
6
                error_span!("cas_server_get_tree"),
406
6
                self.inner_get_tree(request),
407
6
            )
408
6
            .await
409
6
            .err_tip(|| "Failed on get_tree() command"⁰)
410
6
            .map(|stream| -> Response<Self::GetTreeStream> {
411
6
                Response::new(ctx.wrap_stream(stream))
412
6
            })
413
6
            .map_err(Into::into);
414
6
        if resp.is_ok() {
  Branch (414:12): [True: 6, False: 0]
  Branch (414:12): [Folded - Ignored]
415
6
            event!(Level::DEBUG, return = "Ok(<stream>)");
416
0
        }
417
6
        ctx.emit(|| &resp⁰).await;
418
6
        resp
419
12
    }
420
}
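
The one function this report shows with no coverage at all is into_service (source line 64, count 0), which wraps the server in tonic's generated ContentAddressableStorageServer. A minimal sketch, assuming this module's imports, of how that path would typically be exercised; the address and the config/store setup are placeholders, not values from this file:

    async fn serve(
        config: &HashMap<InstanceName, CasStoreConfig>,
        store_manager: &StoreManager,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // Build the CAS service from the configured stores, then wrap it in
        // the generated tonic service via the uncovered into_service() path.
        let cas_service = CasServer::new(config, store_manager)?.into_service();
        tonic::transport::Server::builder()
            .add_service(cas_service)
            .serve("[::1]:50051".parse()?)
            .await?;
        Ok(())
    }
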