Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/property_modifier_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, HashSet};
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::schedulers::{PropertyModification, PropertyModifierSpec};
20
use nativelink_error::{Error, ResultExt};
21
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
22
use nativelink_util::action_messages::{ActionInfo, OperationId};
23
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
24
use nativelink_util::operation_state_manager::{
25
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
26
};
27
use parking_lot::Mutex;
28
29
#[derive(MetricsComponent)]
30
pub struct PropertyModifierScheduler {
31
    modifications: Vec<PropertyModification>,
32
    #[metric(group = "scheduler")]
33
    scheduler: Arc<dyn ClientStateManager>,
34
    #[metric(group = "property_manager")]
35
    known_properties: Mutex<HashMap<String, Vec<String>>>,
36
}
37
38
impl PropertyModifierScheduler {
39
8
    pub fn new(spec: &PropertyModifierSpec, scheduler: Arc<dyn ClientStateManager>) -> Self {
40
8
        Self {
41
8
            modifications: spec.modifications.clone(),
42
8
            scheduler,
43
8
            known_properties: Mutex::new(HashMap::new()),
44
8
        }
45
8
    }
46
47
2
    async fn inner_get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
48
2
        {
49
2
            let known_properties = self.known_properties.lock();
50
2
            if let Some(
property_manager0
) = known_properties.get(instance_name) {
  Branch (50:20): [True: 0, False: 2]
  Branch (50:20): [Folded - Ignored]
51
0
                return Ok(property_manager.clone());
52
2
            }
53
        }
54
2
        let known_platform_property_provider = self
55
2
            .scheduler
56
2
            .as_known_platform_property_provider()
57
2
            .err_tip(|| 
"Inner scheduler does not implement KnownPlatformPropertyProvider for PropertyModifierScheduler"0
)
?0
;
58
2
        let mut known_properties = HashSet::<String>::from_iter(
59
2
            known_platform_property_provider
60
2
                .get_known_properties(instance_name)
61
2
                .await
?0
,
62
        );
63
4
        for 
modification2
in &self.modifications {
64
2
            match modification {
65
2
                PropertyModification::remove(name) => {
66
2
                    known_properties.insert(name.clone());
67
2
                }
68
0
                PropertyModification::add(_) => (),
69
            }
70
        }
71
2
        let final_known_properties: Vec<String> = known_properties.into_iter().collect();
72
2
        self.known_properties
73
2
            .lock()
74
2
            .insert(instance_name.to_string(), final_known_properties.clone());
75
2
76
2
        Ok(final_known_properties)
77
2
    }
78
79
5
    async fn inner_add_action(
80
5
        &self,
81
5
        client_operation_id: OperationId,
82
5
        mut action_info: Arc<ActionInfo>,
83
5
    ) -> Result<Box<dyn ActionStateResult>, Error> {
84
5
        let action_info_mut = Arc::make_mut(&mut action_info);
85
12
        for 
modification7
in &self.modifications {
86
7
            match modification {
87
4
                PropertyModification::add(addition) => action_info_mut
88
4
                    .platform_properties
89
4
                    .insert(addition.name.clone(), addition.value.clone()),
90
3
                PropertyModification::remove(name) => {
91
3
                    action_info_mut.platform_properties.remove(name)
92
                }
93
            };
94
        }
95
5
        self.scheduler
96
5
            .add_action(client_operation_id, action_info)
97
5
            .await
98
5
    }
99
100
1
    async fn inner_filter_operations(
101
1
        &self,
102
1
        filter: OperationFilter,
103
1
    ) -> Result<ActionStateResultStream, Error> {
104
1
        self.scheduler.filter_operations(filter).await
105
1
    }
106
}
107
108
#[async_trait]
109
impl KnownPlatformPropertyProvider for PropertyModifierScheduler {
110
2
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
111
2
        self.inner_get_known_properties(instance_name).await
112
4
    }
113
}
114
115
#[async_trait]
116
impl ClientStateManager for PropertyModifierScheduler {
117
    async fn add_action(
118
        &self,
119
        client_operation_id: OperationId,
120
        action_info: Arc<ActionInfo>,
121
5
    ) -> Result<Box<dyn ActionStateResult>, Error> {
122
5
        self.inner_add_action(client_operation_id, action_info)
123
5
            .await
124
10
    }
125
126
    async fn filter_operations<'a>(
127
        &'a self,
128
        filter: OperationFilter,
129
1
    ) -> Result<ActionStateResultStream<'a>, Error> {
130
1
        self.inner_filter_operations(filter).await
131
2
    }
132
133
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
134
0
        Some(self)
135
0
    }
136
}
137
138
impl RootMetricsComponent for PropertyModifierScheduler {}