Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::hash::{Hash, Hasher};
17
use std::sync::Arc;
18
use std::time::{SystemTime, UNIX_EPOCH};
19
20
use nativelink_error::{make_err, Code, Error, ResultExt};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
23
    update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
24
};
25
use nativelink_util::action_messages::{ActionInfo, OperationId, WorkerId};
26
use nativelink_util::metrics_utils::{CounterWithTime, FuncCounterWrapper};
27
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
28
use tokio::sync::mpsc::UnboundedSender;
29
30
pub type WorkerTimestamp = u64;
31
32
/// Represents the action info and the platform properties of the action.
33
/// These platform properties have the type of the properties as well as
34
/// the value of the properties, unlike `ActionInfo`, which only has the
35
/// string value of the properties.
36
#[derive(Clone, Debug, MetricsComponent)]
37
pub struct ActionInfoWithProps {
38
    /// The action info of the action.
39
    #[metric(group = "action_info")]
40
    pub inner: Arc<ActionInfo>,
41
    /// The platform properties of the action.
42
    #[metric(group = "platform_properties")]
43
    pub platform_properties: PlatformProperties,
44
}
45
46
/// Notifications to send worker about a requested state change.
47
pub enum WorkerUpdate {
48
    /// Requests that the worker begin executing this action.
49
    RunAction((OperationId, ActionInfoWithProps)),
50
51
    /// Request that the worker is no longer in the pool and may discard any jobs.
52
    Disconnect,
53
}
54
55
/// Represents a connection to a worker and used as the medium to
56
/// interact with the worker from the client/scheduler.
57
#[derive(MetricsComponent)]
58
pub struct Worker {
59
    /// Unique identifier of the worker.
60
    #[metric(help = "The unique identifier of the worker.")]
61
    pub id: WorkerId,
62
63
    /// Properties that describe the capabilities of this worker.
64
    #[metric(group = "platform_properties")]
65
    pub platform_properties: PlatformProperties,
66
67
    /// Channel to send commands from scheduler to worker.
68
    pub tx: UnboundedSender<UpdateForWorker>,
69
70
    /// The action info of the running actions on the worker.
71
    #[metric(group = "running_action_infos")]
72
    pub running_action_infos: HashMap<OperationId, ActionInfoWithProps>,
73
74
    /// Timestamp of last time this worker had been communicated with.
75
    // Warning: Do not update this timestamp without updating the placement of the worker in
76
    // the LRUCache in the Workers struct.
77
    #[metric(help = "Last time this worker was communicated with.")]
78
    pub last_update_timestamp: WorkerTimestamp,
79
80
    /// Whether the worker rejected the last action due to back pressure.
81
    #[metric(help = "If the worker is paused.")]
82
    pub is_paused: bool,
83
84
    /// Whether the worker is draining.
85
    #[metric(help = "If the worker is draining.")]
86
    pub is_draining: bool,
87
88
    /// Stats about the worker.
89
    #[metric]
90
    metrics: Arc<Metrics>,
91
}
92
93
66
fn send_msg_to_worker(
94
66
    tx: &mut UnboundedSender<UpdateForWorker>,
95
66
    msg: update_for_worker::Update,
96
66
) -> Result<(), Error> {
97
66
    tx.send(UpdateForWorker { update: Some(msg) })
98
66
        .map_err(|_| 
make_err!(Code::Internal, "Worker disconnected")3
)
99
66
}
100
101
/// Reduces the platform properties available on the worker based on the platform properties provided.
102
/// This is used because we allow more than 1 job to run on a worker at a time, and this is how the
103
/// scheduler knows if more jobs can run on a given worker.
104
28
fn reduce_platform_properties(
105
28
    parent_props: &mut PlatformProperties,
106
28
    reduction_props: &PlatformProperties,
107
28
) {
108
28
    debug_assert!(
reduction_props.is_satisfied_by(parent_props)0
);
109
32
    for (
property, prop_value4
) in &reduction_props.properties {
110
4
        if let PlatformPropertyValue::Minimum(
value3
) = prop_value {
  Branch (110:16): [True: 3, False: 1]
  Branch (110:16): [Folded - Ignored]
111
3
            let worker_props = &mut parent_props.properties;
112
3
            if let &mut PlatformPropertyValue::Minimum(worker_value) =
  Branch (112:20): [True: 3, False: 0]
  Branch (112:20): [Folded - Ignored]
113
3
                &mut worker_props.get_mut(property).unwrap()
114
3
            {
115
3
                *worker_value -= value;
116
3
            
}0
117
1
        }
118
    }
119
28
}
120
121
impl Worker {
122
31
    pub fn new(
123
31
        id: WorkerId,
124
31
        platform_properties: PlatformProperties,
125
31
        tx: UnboundedSender<UpdateForWorker>,
126
31
        timestamp: WorkerTimestamp,
127
31
    ) -> Self {
128
31
        Self {
129
31
            id,
130
31
            platform_properties,
131
31
            tx,
132
31
            running_action_infos: HashMap::new(),
133
31
            last_update_timestamp: timestamp,
134
31
            is_paused: false,
135
31
            is_draining: false,
136
31
            metrics: Arc::new(Metrics {
137
31
                connected_timestamp: SystemTime::now()
138
31
                    .duration_since(UNIX_EPOCH)
139
31
                    .unwrap()
140
31
                    .as_secs(),
141
31
                actions_completed: CounterWithTime::default(),
142
31
                run_action: FuncCounterWrapper::default(),
143
31
                keep_alive: FuncCounterWrapper::default(),
144
31
                notify_disconnect: CounterWithTime::default(),
145
31
            }),
146
31
        }
147
31
    }
148
149
    /// Sends the initial connection information to the worker. This generally is just meta info.
150
    /// This should only be sent once and should always be the first item in the stream.
151
31
    pub fn send_initial_connection_result(&mut self) -> Result<(), Error> {
152
31
        send_msg_to_worker(
153
31
            &mut self.tx,
154
31
            update_for_worker::Update::ConnectionResult(ConnectionResult {
155
31
                worker_id: self.id.to_string(),
156
31
            }),
157
31
        )
158
31
        .err_tip(|| 
format!("Failed to send ConnectionResult to worker : {}", self.id)0
)
159
31
    }
160
161
    /// Notifies the worker of a requested state change.
162
34
    pub fn notify_update(&mut self, worker_update: WorkerUpdate) -> Result<(), Error> {
163
34
        match worker_update {
164
28
            WorkerUpdate::RunAction((operation_id, action_info)) => {
165
28
                self.run_action(operation_id, action_info)
166
            }
167
            WorkerUpdate::Disconnect => {
168
6
                self.metrics.notify_disconnect.inc();
169
6
                send_msg_to_worker(&mut self.tx, update_for_worker::Update::Disconnect(()))
170
            }
171
        }
172
34
    }
173
174
1
    pub fn keep_alive(&mut self) -> Result<(), Error> {
175
1
        let tx = &mut self.tx;
176
1
        let id = self.id;
177
1
        self.metrics.keep_alive.wrap(move || {
178
1
            send_msg_to_worker(tx, update_for_worker::Update::KeepAlive(()))
179
1
                .err_tip(|| 
format!("Failed to send KeepAlive to worker : {id}")0
)
180
1
        })
181
1
    }
182
183
28
    fn run_action(
184
28
        &mut self,
185
28
        operation_id: OperationId,
186
28
        action_info: ActionInfoWithProps,
187
28
    ) -> Result<(), Error> {
188
28
        let tx = &mut self.tx;
189
28
        let worker_platform_properties = &mut self.platform_properties;
190
28
        let running_action_infos = &mut self.running_action_infos;
191
28
        self.metrics.run_action.wrap(move || {
192
28
            let action_info_clone = action_info.clone();
193
28
            let operation_id_string = operation_id.to_string();
194
28
            running_action_infos.insert(operation_id, action_info.clone());
195
28
            reduce_platform_properties(
196
28
                worker_platform_properties,
197
28
                &action_info.platform_properties,
198
28
            );
199
28
            send_msg_to_worker(
200
28
                tx,
201
28
                update_for_worker::Update::StartAction(StartExecute {
202
28
                    execute_request: Some(action_info_clone.inner.as_ref().into()),
203
28
                    operation_id: operation_id_string,
204
28
                    queued_timestamp: Some(action_info.inner.insert_timestamp.into()),
205
28
                }),
206
28
            )
207
28
        })
208
28
    }
209
210
9
    pub(crate) fn complete_action(&mut self, operation_id: &OperationId) -> Result<(), Error> {
211
9
        let action_info = self.running_action_infos.remove(operation_id).err_tip(|| {
212
0
            format!(
213
0
                "Worker {} tried to complete operation {} that was not running",
214
0
                self.id, operation_id
215
0
            )
216
9
        })
?0
;
217
9
        self.restore_platform_properties(&action_info.platform_properties);
218
9
        self.is_paused = false;
219
9
        self.metrics.actions_completed.inc();
220
9
        Ok(())
221
9
    }
222
223
0
    pub fn has_actions(&self) -> bool {
224
0
        !self.running_action_infos.is_empty()
225
0
    }
226
227
9
    fn restore_platform_properties(&mut self, props: &PlatformProperties) {
228
11
        for (
property, prop_value2
) in &props.properties {
229
2
            if let PlatformPropertyValue::Minimum(value) = prop_value {
  Branch (229:20): [True: 2, False: 0]
  Branch (229:20): [Folded - Ignored]
230
2
                let worker_props = &mut self.platform_properties.properties;
231
2
                if let PlatformPropertyValue::Minimum(worker_value) =
  Branch (231:24): [True: 2, False: 0]
  Branch (231:24): [Folded - Ignored]
232
2
                    worker_props.get_mut(property).unwrap()
233
2
                {
234
2
                    *worker_value += value;
235
2
                
}0
236
0
            }
237
        }
238
9
    }
239
240
43
    pub fn can_accept_work(&self) -> bool {
241
43
        !self.is_paused && !self.is_draining
  Branch (241:9): [True: 43, False: 0]
  Branch (241:9): [Folded - Ignored]
242
43
    }
243
}
244
245
impl PartialEq for Worker {
246
0
    fn eq(&self, other: &Self) -> bool {
247
0
        self.id == other.id
248
0
    }
249
}
250
251
impl Eq for Worker {}
252
253
impl Hash for Worker {
254
0
    fn hash<H: Hasher>(&self, state: &mut H) {
255
0
        self.id.hash(state);
256
0
    }
257
}
258
259
#[derive(Default, MetricsComponent)]
260
struct Metrics {
261
    #[metric(help = "The timestamp of when this worker connected.")]
262
    connected_timestamp: u64,
263
    #[metric(help = "The number of actions completed for this worker.")]
264
    actions_completed: CounterWithTime,
265
    #[metric(help = "The number of actions started for this worker.")]
266
    run_action: FuncCounterWrapper,
267
    #[metric(help = "The number of keep_alive sent to this worker.")]
268
    keep_alive: FuncCounterWrapper,
269
    #[metric(help = "The number of notify_disconnect sent to this worker.")]
270
    notify_disconnect: CounterWithTime,
271
}