Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/ac_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::convert::Into;
17
use std::fmt::Debug;
18
19
use bytes::BytesMut;
20
use nativelink_config::cas_server::{AcStoreConfig, InstanceName};
21
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
22
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::{
23
    ActionCache, ActionCacheServer as Server,
24
};
25
use nativelink_proto::build::bazel::remote::execution::v2::{
26
    ActionResult, GetActionResultRequest, UpdateActionResultRequest,
27
};
28
use nativelink_store::ac_utils::{get_and_decode_digest, ESTIMATED_DIGEST_SIZE};
29
use nativelink_store::grpc_store::GrpcStore;
30
use nativelink_store::store_manager::StoreManager;
31
use nativelink_util::common::DigestInfo;
32
use nativelink_util::digest_hasher::make_ctx_for_hash_func;
33
use nativelink_util::origin_event::OriginEventContext;
34
use nativelink_util::store_trait::{Store, StoreLike};
35
use prost::Message;
36
use tonic::{Request, Response, Status};
37
use tracing::{error_span, event, instrument, Level};
38
39
#[derive(Clone)]
40
pub struct AcStoreInfo {
41
    store: Store,
42
    read_only: bool,
43
}
44
45
pub struct AcServer {
46
    stores: HashMap<String, AcStoreInfo>,
47
}
48
49
impl Debug for AcServer {
50
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51
0
        f.debug_struct("AcServer").finish()
52
0
    }
53
}
54
55
impl AcServer {
56
4
    pub fn new(
57
4
        config: &HashMap<InstanceName, AcStoreConfig>,
58
4
        store_manager: &StoreManager,
59
4
    ) -> Result<Self, Error> {
60
4
        let mut stores = HashMap::with_capacity(config.len());
61
8
        for (
instance_name, ac_cfg4
) in config {
62
4
            let store = store_manager.get_store(&ac_cfg.ac_store).ok_or_else(|| {
63
0
                make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store)
64
4
            })
?0
;
65
4
            stores.insert(
66
4
                instance_name.to_string(),
67
4
                AcStoreInfo {
68
4
                    store,
69
4
                    read_only: ac_cfg.read_only,
70
4
                },
71
4
            );
72
        }
73
4
        Ok(AcServer {
74
4
            stores: stores.clone(),
75
4
        })
76
4
    }
77
78
0
    pub fn into_service(self) -> Server<AcServer> {
79
0
        Server::new(self)
80
0
    }
81
82
3
    async fn inner_get_action_result(
83
3
        &self,
84
3
        request: GetActionResultRequest,
85
3
    ) -> Result<Response<ActionResult>, Error> {
86
3
        let instance_name = &request.instance_name;
87
3
        let store_info = self
88
3
            .stores
89
3
            .get(instance_name)
90
3
            .err_tip(|| 
format!("'instance_name' not configured for '{instance_name}'")0
)
?0
;
91
92
        // TODO(blaise.bruer) We should write a test for these errors.
93
3
        let digest: DigestInfo = request
94
3
            .action_digest
95
3
            .clone()
96
3
            .err_tip(|| 
"Action digest was not set in message"0
)
?0
97
3
            .try_into()
?0
;
98
99
        // If we are a GrpcStore we shortcut here, as this is a special store.
100
3
        if let Some(
grpc_store0
) = store_info
  Branch (100:16): [True: 0, False: 3]
  Branch (100:16): [Folded - Ignored]
101
3
            .store
102
3
            .downcast_ref::<GrpcStore>(Some(digest.into()))
103
        {
104
0
            return grpc_store.get_action_result(Request::new(request)).await;
105
3
        }
106
107
3
        let res = get_and_decode_digest::<ActionResult>(&store_info.store, digest.into()).await;
108
3
        match res {
109
1
            Ok(action_result) => Ok(Response::new(action_result)),
110
2
            Err(mut e) => {
111
2
                if e.code == Code::NotFound {
  Branch (111:20): [True: 2, False: 0]
  Branch (111:20): [Folded - Ignored]
112
2
                    // `get_action_result` is frequent to get NotFound errors, so remove all
113
2
                    // messages to save space.
114
2
                    e.messages.clear();
115
2
                
}0
116
2
                Err(e)
117
            }
118
        }
119
3
    }
120
121
1
    async fn inner_update_action_result(
122
1
        &self,
123
1
        request: UpdateActionResultRequest,
124
1
    ) -> Result<Response<ActionResult>, Error> {
125
1
        let instance_name = &request.instance_name;
126
1
        let store_info = self
127
1
            .stores
128
1
            .get(instance_name)
129
1
            .err_tip(|| 
format!("'instance_name' not configured for '{instance_name}'")0
)
?0
;
130
131
1
        if store_info.read_only {
  Branch (131:12): [True: 0, False: 1]
  Branch (131:12): [Folded - Ignored]
132
0
            return Err(make_err!(
133
0
                Code::PermissionDenied,
134
0
                "The store '{instance_name}' is read only on this endpoint",
135
0
            ));
136
1
        }
137
138
1
        let digest: DigestInfo = request
139
1
            .action_digest
140
1
            .clone()
141
1
            .err_tip(|| 
"Action digest was not set in message"0
)
?0
142
1
            .try_into()
?0
;
143
144
        // If we are a GrpcStore we shortcut here, as this is a special store.
145
1
        if let Some(
grpc_store0
) = store_info
  Branch (145:16): [True: 0, False: 1]
  Branch (145:16): [Folded - Ignored]
146
1
            .store
147
1
            .downcast_ref::<GrpcStore>(Some(digest.into()))
148
        {
149
0
            return grpc_store.update_action_result(Request::new(request)).await;
150
1
        }
151
152
1
        let action_result = request
153
1
            .action_result
154
1
            .err_tip(|| 
"Action result was not set in message"0
)
?0
;
155
156
1
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
157
1
        action_result
158
1
            .encode(&mut store_data)
159
1
            .err_tip(|| 
"Provided ActionResult could not be serialized"0
)
?0
;
160
161
1
        store_info
162
1
            .store
163
1
            .update_oneshot(digest, store_data.freeze())
164
1
            .await
165
1
            .err_tip(|| 
"Failed to update in action cache"0
)
?0
;
166
1
        Ok(Response::new(action_result))
167
1
    }
168
}
169
170
#[tonic::async_trait]
171
impl ActionCache for AcServer {
172
    #[allow(clippy::blocks_in_conditions)]
173
    #[instrument(
174
        ret(level = Level::INFO),
175
        level = Level::ERROR,
176
        skip_all,
177
        fields(request = ?grpc_request.get_ref())
178
    )]
179
    async fn get_action_result(
180
        &self,
181
        grpc_request: Request<GetActionResultRequest>,
182
3
    ) -> Result<Response<ActionResult>, Status> {
183
3
        let request = grpc_request.into_inner();
184
3
        let ctx = OriginEventContext::new(|| 
&request0
).await;
185
186
3
        let resp = make_ctx_for_hash_func(request.digest_function)
187
3
            .err_tip(|| 
"In AcServer::get_action_result"0
)
?0
188
            .wrap_async(
189
3
                error_span!("ac_server_get_action_result"),
190
3
                self.inner_get_action_result(request),
191
3
            )
192
3
            .await;
193
194
3
        if resp.is_err() && 
resp.as_ref().err().unwrap().code != Code::NotFound2
{
  Branch (194:12): [True: 2, False: 1]
  Branch (194:29): [True: 0, False: 2]
  Branch (194:12): [Folded - Ignored]
  Branch (194:29): [Folded - Ignored]
195
0
            event!(Level::ERROR, return = ?resp);
196
3
        }
197
3
        let resp = resp.map_err(Into::into);
198
3
        ctx.emit(|| 
&resp0
).await;
199
3
        resp
200
6
    }
201
202
    #[allow(clippy::blocks_in_conditions)]
203
    #[instrument(
204
        err,
205
        ret(level = Level::INFO),
206
        level = Level::ERROR,
207
        skip_all,
208
        fields(request = ?grpc_request.get_ref())
209
    )]
210
    async fn update_action_result(
211
        &self,
212
        grpc_request: Request<UpdateActionResultRequest>,
213
1
    ) -> Result<Response<ActionResult>, Status> {
214
1
        let request = grpc_request.into_inner();
215
1
        let ctx = OriginEventContext::new(|| 
&request0
).await;
216
1
        let resp = make_ctx_for_hash_func(request.digest_function)
217
1
            .err_tip(|| 
"In AcServer::update_action_result"0
)
?0
218
            .wrap_async(
219
1
                error_span!("ac_server_update_action_result"),
220
1
                self.inner_update_action_result(request),
221
1
            )
222
1
            .await
223
1
            .map_err(Into::into);
224
1
        ctx.emit(|| 
&resp0
).await;
225
1
        resp
226
2
    }
227
}