Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/execution_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::convert::Into;
17
use std::fmt;
18
use std::pin::Pin;
19
use std::sync::Arc;
20
use std::time::{Duration, SystemTime, UNIX_EPOCH};
21
22
use futures::stream::unfold;
23
use futures::{Stream, StreamExt};
24
use nativelink_config::cas_server::{ExecutionConfig, InstanceName};
25
use nativelink_error::{make_input_err, Error, ResultExt};
26
use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
27
    Execution, ExecutionServer as Server,
28
};
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    Action, Command, ExecuteRequest, WaitExecutionRequest,
31
};
32
use nativelink_proto::google::longrunning::Operation;
33
use nativelink_store::ac_utils::get_and_decode_digest;
34
use nativelink_store::store_manager::StoreManager;
35
use nativelink_util::action_messages::{
36
    ActionInfo, ActionUniqueKey, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY,
37
};
38
use nativelink_util::common::DigestInfo;
39
use nativelink_util::digest_hasher::{make_ctx_for_hash_func, DigestHasherFunc};
40
use nativelink_util::operation_state_manager::{
41
    ActionStateResult, ClientStateManager, OperationFilter,
42
};
43
use nativelink_util::origin_event::OriginEventContext;
44
use nativelink_util::store_trait::Store;
45
use tonic::{Request, Response, Status};
46
use tracing::{error_span, event, instrument, Level};
47
48
/// Alias for the `String` type used as the `instance_name` component of a
/// [`NativelinkOperationId`].
type InstanceInfoName = String;
49
50
struct NativelinkOperationId {
51
    instance_name: InstanceInfoName,
52
    client_operation_id: OperationId,
53
}
54
55
impl NativelinkOperationId {
56
0
    fn new(instance_name: InstanceInfoName, client_operation_id: OperationId) -> Self {
57
0
        Self {
58
0
            instance_name,
59
0
            client_operation_id,
60
0
        }
61
0
    }
62
63
0
    fn from_name(name: &str) -> Result<Self, Error> {
64
0
        let (instance_name, name) = name
65
0
            .split_once('/')
66
0
            .err_tip(|| "Expected instance_name and name to be separated by '/'")?;
67
0
        Ok(NativelinkOperationId::new(
68
0
            instance_name.to_string(),
69
0
            OperationId::from(name),
70
0
        ))
71
0
    }
72
}
73
74
impl fmt::Display for NativelinkOperationId {
75
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76
0
        write!(f, "{}/{}", self.instance_name, self.client_operation_id)
77
0
    }
78
}
79
80
struct InstanceInfo {
81
    scheduler: Arc<dyn ClientStateManager>,
82
    cas_store: Store,
83
}
84
85
impl InstanceInfo {
86
0
    async fn build_action_info(
87
0
        &self,
88
0
        instance_name: String,
89
0
        action_digest: DigestInfo,
90
0
        action: Action,
91
0
        priority: i32,
92
0
        skip_cache_lookup: bool,
93
0
        digest_function: DigestHasherFunc,
94
0
    ) -> Result<ActionInfo, Error> {
95
0
        let command_digest = DigestInfo::try_from(
96
0
            action
97
0
                .command_digest
98
0
                .clone()
99
0
                .err_tip(|| "Expected command_digest to exist")?,
100
        )
101
0
        .err_tip(|| "Could not decode command digest")?;
102
103
0
        let input_root_digest = DigestInfo::try_from(
104
0
            action
105
0
                .clone()
106
0
                .input_root_digest
107
0
                .err_tip(|| "Expected input_digest_root")?,
108
0
        )?;
109
0
        let timeout = action.timeout.map_or(Duration::MAX, |v| {
110
0
            Duration::new(v.seconds as u64, v.nanos as u32)
111
0
        });
112
0
113
0
        let mut platform_properties = HashMap::new();
114
0
        if let Some(platform) = action.platform {
  Branch (114:16): [True: 0, False: 0]
  Branch (114:16): [Folded - Ignored]
115
0
            for property in platform.properties {
116
0
                platform_properties.insert(property.name, property.value);
117
0
            }
118
0
        }
119
120
        // Goma puts the properties in the Command.
121
0
        if platform_properties.is_empty() {
  Branch (121:12): [True: 0, False: 0]
  Branch (121:12): [Folded - Ignored]
122
0
            let command =
123
0
                get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
124
0
            if let Some(platform) = command.platform {
  Branch (124:20): [True: 0, False: 0]
  Branch (124:20): [Folded - Ignored]
125
0
                for property in platform.properties {
126
0
                    platform_properties.insert(property.name, property.value);
127
0
                }
128
0
            }
129
0
        }
130
131
0
        let action_key = ActionUniqueKey {
132
0
            instance_name,
133
0
            digest_function,
134
0
            digest: action_digest,
135
0
        };
136
0
        let unique_qualifier = if skip_cache_lookup {
  Branch (136:35): [True: 0, False: 0]
  Branch (136:35): [Folded - Ignored]
137
0
            ActionUniqueQualifier::Uncachable(action_key)
138
        } else {
139
0
            ActionUniqueQualifier::Cachable(action_key)
140
        };
141
142
0
        Ok(ActionInfo {
143
0
            command_digest,
144
0
            input_root_digest,
145
0
            timeout,
146
0
            platform_properties,
147
0
            priority,
148
0
            load_timestamp: UNIX_EPOCH,
149
0
            insert_timestamp: SystemTime::now(),
150
0
            unique_qualifier,
151
0
        })
152
0
    }
153
}
154
155
pub struct ExecutionServer {
156
    instance_infos: HashMap<InstanceName, InstanceInfo>,
157
}
158
159
/// Boxed, pinned stream of `Operation` updates (or a `Status` error) used as
/// the response stream type of both `execute` and `wait_execution`.
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + 'static>>;
160
161
impl ExecutionServer {
162
0
    pub fn new(
163
0
        config: &HashMap<InstanceName, ExecutionConfig>,
164
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
165
0
        store_manager: &StoreManager,
166
0
    ) -> Result<Self, Error> {
167
0
        let mut instance_infos = HashMap::with_capacity(config.len());
168
0
        for (instance_name, exec_cfg) in config {
169
0
            let cas_store = store_manager
170
0
                .get_store(&exec_cfg.cas_store)
171
0
                .ok_or_else(|| {
172
0
                    make_input_err!("'cas_store': '{}' does not exist", exec_cfg.cas_store)
173
0
                })?;
174
0
            let scheduler = scheduler_map
175
0
                .get(&exec_cfg.scheduler)
176
0
                .err_tip(|| {
177
0
                    format!(
178
0
                        "Scheduler needs config for '{}' because it exists in execution",
179
0
                        exec_cfg.scheduler
180
0
                    )
181
0
                })?
182
0
                .clone();
183
0
184
0
            instance_infos.insert(
185
0
                instance_name.to_string(),
186
0
                InstanceInfo {
187
0
                    scheduler,
188
0
                    cas_store,
189
0
                },
190
0
            );
191
        }
192
0
        Ok(Self { instance_infos })
193
0
    }
194
195
0
    pub fn into_service(self) -> Server<ExecutionServer> {
196
0
        Server::new(self)
197
0
    }
198
199
0
    fn to_execute_stream(
200
0
        nl_client_operation_id: &NativelinkOperationId,
201
0
        action_listener: Box<dyn ActionStateResult>,
202
0
    ) -> impl Stream<Item = Result<Operation, Status>> + Send + 'static {
203
0
        let client_operation_id = OperationId::from(nl_client_operation_id.to_string());
204
0
        unfold(Some(action_listener), move |maybe_action_listener| {
205
0
            let client_operation_id = client_operation_id.clone();
206
0
            async move {
207
0
                let mut action_listener = maybe_action_listener?;
208
0
                match action_listener.changed().await {
209
0
                    Ok(action_update) => {
210
0
                        event!(Level::INFO, ?action_update, "Execute Resp Stream");
211
                        // If the action is finished we won't be sending any more updates.
212
0
                        let maybe_action_listener = if action_update.stage.is_finished() {
  Branch (212:56): [True: 0, False: 0]
  Branch (212:56): [Folded - Ignored]
213
0
                            None
214
                        } else {
215
0
                            Some(action_listener)
216
                        };
217
0
                        Some((
218
0
                            Ok(action_update.as_operation(client_operation_id)),
219
0
                            maybe_action_listener,
220
0
                        ))
221
                    }
222
0
                    Err(err) => {
223
0
                        event!(Level::ERROR, ?err, "Error in action_listener stream");
224
0
                        Some((Err(err.into()), None))
225
                    }
226
                }
227
0
            }
228
0
        })
229
0
    }
230
231
0
    async fn inner_execute(
232
0
        &self,
233
0
        request: ExecuteRequest,
234
0
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + 'static, Error> {
235
0
        let instance_name = request.instance_name;
236
237
0
        let instance_info = self
238
0
            .instance_infos
239
0
            .get(&instance_name)
240
0
            .err_tip(|| "Instance name '{}' not configured")?;
241
242
0
        let digest = DigestInfo::try_from(
243
0
            request
244
0
                .action_digest
245
0
                .err_tip(|| "Expected action_digest to exist")?,
246
        )
247
0
        .err_tip(|| "Failed to unwrap action cache")?;
248
249
0
        let priority = request
250
0
            .execution_policy
251
0
            .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
252
253
0
        let action =
254
0
            get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
255
0
        let action_info = instance_info
256
0
            .build_action_info(
257
0
                instance_name.clone(),
258
0
                digest,
259
0
                action,
260
0
                priority,
261
0
                request.skip_cache_lookup,
262
0
                request
263
0
                    .digest_function
264
0
                    .try_into()
265
0
                    .err_tip(|| "Could not convert digest function in inner_execute()")?,
266
            )
267
0
            .await?;
268
269
0
        let action_listener = instance_info
270
0
            .scheduler
271
0
            .add_action(OperationId::default(), Arc::new(action_info))
272
0
            .await
273
0
            .err_tip(|| "Failed to schedule task")?;
274
275
        Ok(Box::pin(Self::to_execute_stream(
276
            &NativelinkOperationId::new(
277
0
                instance_name,
278
0
                action_listener
279
0
                    .as_state()
280
0
                    .await
281
0
                    .err_tip(|| "In ExecutionServer::inner_execute")?
282
                    .client_operation_id
283
0
                    .clone(),
284
0
            ),
285
0
            action_listener,
286
        )))
287
0
    }
288
289
0
    async fn inner_wait_execution(
290
0
        &self,
291
0
        request: WaitExecutionRequest,
292
0
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + 'static, Status> {
293
0
        let nl_operation_id = NativelinkOperationId::from_name(&request.name)
294
0
            .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")?;
295
0
        let Some(instance_info) = self.instance_infos.get(&nl_operation_id.instance_name) else {
  Branch (295:13): [True: 0, False: 0]
  Branch (295:13): [Folded - Ignored]
296
0
            return Err(Status::not_found(format!(
297
0
                "No scheduler with the instance name {}",
298
0
                nl_operation_id.instance_name,
299
0
            )));
300
        };
301
0
        let Some(rx) = instance_info
  Branch (301:13): [True: 0, False: 0]
  Branch (301:13): [Folded - Ignored]
302
0
            .scheduler
303
0
            .filter_operations(OperationFilter {
304
0
                client_operation_id: Some(nl_operation_id.client_operation_id.clone()),
305
0
                ..Default::default()
306
0
            })
307
0
            .await
308
0
            .err_tip(|| "Error running find_existing_action in ExecutionServer::wait_execution")?
309
0
            .next()
310
0
            .await
311
        else {
312
0
            return Err(Status::not_found("Failed to find existing task"));
313
        };
314
0
        Ok(Self::to_execute_stream(&nl_operation_id, rx))
315
0
    }
316
}
317
318
#[tonic::async_trait]
319
impl Execution for ExecutionServer {
320
    type ExecuteStream = ExecuteStream;
321
    type WaitExecutionStream = ExecuteStream;
322
323
    #[allow(clippy::blocks_in_conditions)]
324
    #[instrument(
325
        err,
326
        level = Level::ERROR,
327
        skip_all,
328
        fields(request = ?grpc_request.get_ref())
329
    )]
330
    async fn execute(
331
        &self,
332
        grpc_request: Request<ExecuteRequest>,
333
0
    ) -> Result<Response<ExecuteStream>, Status> {
334
0
        let request = grpc_request.into_inner();
335
0
        let ctx = OriginEventContext::new(|| &request).await;
336
0
        let resp = make_ctx_for_hash_func(request.digest_function)
337
0
            .err_tip(|| "In ExecutionServer::execute")?
338
            .wrap_async(
339
0
                error_span!("execution_server_execute"),
340
0
                self.inner_execute(request),
341
0
            )
342
0
            .await
343
0
            .map(|stream| ctx.wrap_stream(stream))
344
0
            .map(Response::new)
345
0
            .err_tip(|| "Failed on execute() command")
346
0
            .map_err(Into::into);
347
0
        ctx.emit(|| &resp).await;
348
0
        resp
349
0
    }
350
351
    #[allow(clippy::blocks_in_conditions)]
352
    #[instrument(
353
        err,
354
        level = Level::ERROR,
355
        skip_all,
356
        fields(request = ?grpc_request.get_ref())
357
    )]
358
    async fn wait_execution(
359
        &self,
360
        grpc_request: Request<WaitExecutionRequest>,
361
0
    ) -> Result<Response<ExecuteStream>, Status> {
362
0
        let request = grpc_request.into_inner();
363
0
        let ctx = OriginEventContext::new(|| &request).await;
364
0
        let resp = self
365
0
            .inner_wait_execution(request)
366
0
            .await
367
0
            .err_tip(|| "Failed on wait_execution() command")
368
0
            .map(|stream| ctx.wrap_stream(stream))
369
0
            .map(Response::new)
370
0
            .map_err(Into::into);
371
0
372
0
        if resp.is_ok() {
  Branch (372:12): [True: 0, False: 0]
  Branch (372:12): [Folded - Ignored]
373
0
            event!(Level::DEBUG, return = "Ok(<stream>)");
374
0
        }
375
0
        resp
376
0
    }
377
}