Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/capabilities_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::sync::Arc;
17
18
use nativelink_config::cas_server::{CapabilitiesConfig, InstanceName};
19
use nativelink_error::{Error, ResultExt};
20
use nativelink_proto::build::bazel::remote::execution::v2::capabilities_server::{
21
    Capabilities, CapabilitiesServer as Server,
22
};
23
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as DigestFunction;
24
use nativelink_proto::build::bazel::remote::execution::v2::priority_capabilities::PriorityRange;
25
use nativelink_proto::build::bazel::remote::execution::v2::symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy;
26
use nativelink_proto::build::bazel::remote::execution::v2::{
27
    ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities,
28
    GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities,
29
};
30
use nativelink_proto::build::bazel::semver::SemVer;
31
use nativelink_util::digest_hasher::default_digest_hasher_func;
32
use nativelink_util::operation_state_manager::ClientStateManager;
33
use nativelink_util::origin_event::OriginEventContext;
34
use tonic::{Request, Response, Status};
35
use tracing::{event, instrument, Level};
36
37
const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024;
38
39
#[derive(Debug, Default)]
40
pub struct CapabilitiesServer {
41
    supported_node_properties_for_instance: HashMap<InstanceName, Vec<String>>,
42
}
43
44
impl CapabilitiesServer {
45
0
    pub async fn new(
46
0
        config: &HashMap<InstanceName, CapabilitiesConfig>,
47
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
48
0
    ) -> Result<Self, Error> {
49
0
        let mut supported_node_properties_for_instance = HashMap::new();
50
0
        for (instance_name, cfg) in config {
51
0
            let mut properties = Vec::new();
52
0
            if let Some(remote_execution_cfg) = &cfg.remote_execution {
  Branch (52:20): [Folded - Ignored]
  Branch (52:20): [Folded - Ignored]
53
0
                let scheduler =
54
0
                    scheduler_map
55
0
                        .get(&remote_execution_cfg.scheduler)
56
0
                        .err_tip(|| {
57
0
                            format!(
58
0
                                "Scheduler needs config for '{}' because it exists in capabilities",
59
0
                                remote_execution_cfg.scheduler
60
0
                            )
61
0
                        })?;
62
0
                if let Some(props_provider) = scheduler.as_known_platform_property_provider() {
  Branch (62:24): [Folded - Ignored]
  Branch (62:24): [Folded - Ignored]
63
0
                    for platform_key in props_provider
64
0
                        .get_known_properties(instance_name)
65
0
                        .await
66
0
                        .err_tip(|| {
67
0
                            format!("Failed to get platform properties for {instance_name}")
68
0
                        })?
69
0
                    {
70
0
                        properties.push(platform_key.clone());
71
0
                    }
72
                } else {
73
0
                    event!(
74
0
                        Level::WARN,
75
0
                        "Scheduler '{}' does not implement KnownPlatformPropertyProvider",
76
                        remote_execution_cfg.scheduler
77
                    );
78
                }
79
0
            }
80
0
            supported_node_properties_for_instance.insert(instance_name.clone(), properties);
81
        }
82
0
        Ok(CapabilitiesServer {
83
0
            supported_node_properties_for_instance,
84
0
        })
85
0
    }
86
87
0
    pub fn into_service(self) -> Server<CapabilitiesServer> {
88
0
        Server::new(self)
89
0
    }
90
}
91
92
#[tonic::async_trait]
93
impl Capabilities for CapabilitiesServer {
94
    #[allow(clippy::blocks_in_conditions)]
95
    #[instrument(
96
        err,
97
        ret(level = Level::INFO),
98
        level = Level::ERROR,
99
        skip_all,
100
        fields(request = ?grpc_request.get_ref())
101
    )]
102
    async fn get_capabilities(
103
        &self,
104
        grpc_request: Request<GetCapabilitiesRequest>,
105
0
    ) -> Result<Response<ServerCapabilities>, Status> {
106
0
        let request = grpc_request.into_inner();
107
0
        let ctx = OriginEventContext::new(|| &request).await;
108
109
0
        let instance_name = request.instance_name;
110
0
        let maybe_supported_node_properties = self
111
0
            .supported_node_properties_for_instance
112
0
            .get(&instance_name);
113
0
        let execution_capabilities =
114
0
            maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities {
115
0
                digest_function: default_digest_hasher_func().proto_digest_func().into(),
116
0
                exec_enabled: true, // TODO(blaise.bruer) Make this configurable.
117
0
                execution_priority_capabilities: Some(PriorityCapabilities {
118
0
                    priorities: vec![PriorityRange {
119
0
                        min_priority: 0,
120
0
                        max_priority: i32::MAX,
121
0
                    }],
122
0
                }),
123
0
                supported_node_properties: props_for_instance.clone(),
124
0
                digest_functions: vec![
125
0
                    DigestFunction::Sha256.into(),
126
0
                    DigestFunction::Blake3.into(),
127
0
                ],
128
0
            });
129
0
130
0
        let resp = ServerCapabilities {
131
0
            cache_capabilities: Some(CacheCapabilities {
132
0
                digest_functions: vec![
133
0
                    DigestFunction::Sha256.into(),
134
0
                    DigestFunction::Blake3.into(),
135
0
                ],
136
0
                action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities {
137
0
                    update_enabled: true,
138
0
                }),
139
0
                cache_priority_capabilities: None,
140
0
                max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE,
141
0
                symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(),
142
0
                supported_compressors: vec![],
143
0
                supported_batch_update_compressors: vec![],
144
0
            }),
145
0
            execution_capabilities,
146
0
            deprecated_api_version: None,
147
0
            low_api_version: Some(SemVer {
148
0
                major: 2,
149
0
                minor: 0,
150
0
                patch: 0,
151
0
                prerelease: String::new(),
152
0
            }),
153
0
            high_api_version: Some(SemVer {
154
0
                major: 2,
155
0
                minor: 3,
156
0
                patch: 0,
157
0
                prerelease: String::new(),
158
0
            }),
159
0
        };
160
0
        ctx.emit(|| &resp).await;
161
0
        Ok(Response::new(resp))
162
0
    }
163
}