Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use async_trait::async_trait;
19
use futures::Future;
20
use nativelink_config::schedulers::SimpleSpec;
21
use nativelink_error::{Code, Error, ResultExt};
22
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
23
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
24
use nativelink_util::instant_wrapper::InstantWrapper;
25
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
26
use nativelink_util::operation_state_manager::{
27
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
28
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
29
};
30
use nativelink_util::spawn;
31
use nativelink_util::task::JoinHandleDropGuard;
32
use tokio::sync::Notify;
33
use tokio::time::Duration;
34
use tokio_stream::StreamExt;
35
use tracing::{event, Level};
36
37
use crate::api_worker_scheduler::ApiWorkerScheduler;
38
use crate::awaited_action_db::AwaitedActionDb;
39
use crate::platform_property_manager::PlatformPropertyManager;
40
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
41
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
42
use crate::worker_scheduler::WorkerScheduler;
43
44
/// Worker timeout, in seconds, used when the spec's `worker_timeout_s` is `0`.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Duration, in seconds, after which an operation that no client has updated
/// is marked as completed with an error. Used when the spec's
/// `client_action_timeout_s` is `0`.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;

/// Number of times a job may be retried before it is failed. Used when the
/// spec's `max_job_retries` is `0`.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
56
57
struct SimpleSchedulerActionStateResult {
58
    client_operation_id: OperationId,
59
    action_state_result: Box<dyn ActionStateResult>,
60
}
61
62
impl SimpleSchedulerActionStateResult {
63
25
    fn new(
64
25
        client_operation_id: OperationId,
65
25
        action_state_result: Box<dyn ActionStateResult>,
66
25
    ) -> Self {
67
25
        Self {
68
25
            client_operation_id,
69
25
            action_state_result,
70
25
        }
71
25
    }
72
}
73
74
#[async_trait]
75
impl ActionStateResult for SimpleSchedulerActionStateResult {
76
3
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
77
3
        let mut action_state = self
78
3
            .action_state_result
79
3
            .as_state()
80
3
            .await
81
3
            .err_tip(|| 
"In SimpleSchedulerActionStateResult"0
)
?0
;
82
        // We need to ensure the client is not aware of the downstream
83
        // operation id, so override it before it goes out.
84
3
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
85
3
        Ok(action_state)
86
6
    }
87
88
40
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
89
40
        let 
mut action_state39
= self
90
40
            .action_state_result
91
40
            .changed()
92
40
            .await
93
39
            .err_tip(|| 
"In SimpleSchedulerActionStateResult"0
)
?0
;
94
        // We need to ensure the client is not aware of the downstream
95
        // operation id, so override it before it goes out.
96
39
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
97
39
        Ok(action_state)
98
79
    }
99
100
0
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
101
0
        self.action_state_result
102
0
            .as_action_info()
103
0
            .await
104
0
            .err_tip(|| "In SimpleSchedulerActionStateResult")
105
0
    }
106
}
107
108
/// Engine used to manage the queued/running tasks and relationship with
109
/// the worker nodes. All state on how the workers and actions are interacting
110
/// should be held in this struct.
111
#[derive(MetricsComponent)]
112
pub struct SimpleScheduler {
113
    /// Manager for matching engine side of the state manager.
114
    #[metric(group = "matching_engine_state_manager")]
115
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
116
117
    /// Manager for client state of this scheduler.
118
    #[metric(group = "client_state_manager")]
119
    client_state_manager: Arc<dyn ClientStateManager>,
120
121
    /// Manager for platform of this scheduler.
122
    #[metric(group = "platform_properties")]
123
    platform_property_manager: Arc<PlatformPropertyManager>,
124
125
    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
126
    /// order based on the allocation strategy.
127
    #[metric(group = "worker_scheduler")]
128
    worker_scheduler: Arc<ApiWorkerScheduler>,
129
130
    /// Background task that tries to match actions to workers. If this struct
131
    /// is dropped the spawn will be cancelled as well.
132
    _task_worker_matching_spawn: JoinHandleDropGuard<()>,
133
}
134
135
impl SimpleScheduler {
136
    /// Attempts to find a worker to execute an action and begins executing it.
137
    /// If an action is already running that is cacheable it may merge this
138
    /// action with the results and state changes of the already running
139
    /// action. If the task cannot be executed immediately it will be queued
140
    /// for execution based on priority and other metrics.
141
    /// All further updates to the action will be provided through the returned
142
    /// value.
143
25
    async fn inner_add_action(
144
25
        &self,
145
25
        client_operation_id: OperationId,
146
25
        action_info: Arc<ActionInfo>,
147
25
    ) -> Result<Box<dyn ActionStateResult>, Error> {
148
25
        let action_state_result = self
149
25
            .client_state_manager
150
25
            .add_action(client_operation_id.clone(), action_info)
151
25
            .await
152
25
            .err_tip(|| 
"In SimpleScheduler::add_action"0
)
?0
;
153
25
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
154
25
            client_operation_id.clone(),
155
25
            action_state_result,
156
25
        )))
157
25
    }
158
159
503
    async fn inner_filter_operations(
160
503
        &self,
161
503
        filter: OperationFilter,
162
503
    ) -> Result<ActionStateResultStream, Error> {
163
503
        self.client_state_manager
164
503
            .filter_operations(filter)
165
503
            .await
166
503
            .err_tip(|| 
"In SimpleScheduler::find_by_client_operation_id getting filter result"0
)
167
503
    }
168
169
91
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream, Error> {
170
91
        let filter = OperationFilter {
171
91
            stages: OperationStageFlags::Queued,
172
91
            order_by_priority_direction: Some(OrderDirection::Desc),
173
91
            ..Default::default()
174
91
        };
175
91
        self.matching_engine_state_manager
176
91
            .filter_operations(filter)
177
91
            .await
178
91
            .err_tip(|| 
"In SimpleScheduler::get_queued_operations getting filter result"0
)
179
91
    }
180
181
3
    pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
182
3
        self.do_try_match().await
183
3
    }
184
185
    // TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we
186
    // can create a map of capabilities of each worker and then try and match
187
    // the actions to the worker using the map lookup (ie. map reduce).
188
91
    async fn do_try_match(&self) -> Result<(), Error> {
189
42
        async fn match_action_to_worker(
190
42
            action_state_result: &dyn ActionStateResult,
191
42
            workers: &ApiWorkerScheduler,
192
42
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
193
42
            platform_property_manager: &PlatformPropertyManager,
194
42
        ) -> Result<(), Error> {
195
42
            let action_info = action_state_result
196
42
                .as_action_info()
197
42
                .await
198
42
                .err_tip(|| 
"Failed to get action_info from as_action_info_result stream"0
)
?0
;
199
200
            // TODO(allada) We should not compute this every time and instead store
201
            // it with the ActionInfo when we receive it.
202
42
            let platform_properties = platform_property_manager
203
42
                .make_platform_properties(action_info.platform_properties.clone())
204
42
                .err_tip(|| {
205
0
                    "Failed to make platform properties in SimpleScheduler::do_try_match"
206
42
                })
?0
;
207
208
42
            let action_info = ActionInfoWithProps {
209
42
                inner: action_info,
210
42
                platform_properties,
211
42
            };
212
213
            // Try to find a worker for the action.
214
28
            let worker_id = {
215
42
                match workers
216
42
                    .find_worker_for_action(&action_info.platform_properties)
217
42
                    .await
218
                {
219
28
                    Some(worker_id) => worker_id,
220
                    // If we could not find a worker for the action,
221
                    // we have nothing to do.
222
14
                    None => return Ok(()),
223
                }
224
            };
225
226
            // Extract the operation_id from the action_state.
227
28
            let operation_id = {
228
28
                let action_state = action_state_result
229
28
                    .as_state()
230
28
                    .await
231
28
                    .err_tip(|| 
"Failed to get action_info from as_state_result stream"0
)
?0
;
232
28
                action_state.client_operation_id.clone()
233
            };
234
235
            // Tell the matching engine that the operation is being assigned to a worker.
236
28
            let assign_result = matching_engine_state_manager
237
28
                .assign_operation(&operation_id, Ok(&worker_id))
238
28
                .await
239
28
                .err_tip(|| 
"Failed to assign operation in do_try_match"1
);
240
28
            if let Err(
err1
) = assign_result {
  Branch (240:20): [True: 0, False: 0]
  Branch (240:20): [Folded - Ignored]
  Branch (240:20): [True: 1, False: 27]
241
1
                if err.code == Code::Aborted {
  Branch (241:20): [True: 0, False: 0]
  Branch (241:20): [Folded - Ignored]
  Branch (241:20): [True: 0, False: 1]
242
                    // If the operation was aborted, it means that the operation was
243
                    // cancelled due to another operation being assigned to the worker.
244
0
                    return Ok(());
245
1
                }
246
1
                // Any other error is a real error.
247
1
                return Err(err);
248
27
            }
249
27
250
27
            // Notify the worker to run the action.
251
27
            {
252
27
                workers
253
27
                    .worker_notify_run_action(worker_id, operation_id, action_info)
254
27
                    .await
255
27
                    .err_tip(|| {
256
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
257
27
                    })
258
            }
259
42
        }
260
261
91
        let mut result = Ok(());
262
263
91
        let mut stream = self
264
91
            .get_queued_operations()
265
91
            .await
266
91
            .err_tip(|| 
"Failed to get queued operations in do_try_match"0
)
?0
;
267
268
133
        while let Some(
action_state_result42
) = stream.next().await {
  Branch (268:19): [True: 0, False: 0]
  Branch (268:19): [Folded - Ignored]
  Branch (268:19): [True: 42, False: 91]
269
42
            result = result.merge(
270
42
                match_action_to_worker(
271
42
                    action_state_result.as_ref(),
272
42
                    self.worker_scheduler.as_ref(),
273
42
                    self.matching_engine_state_manager.as_ref(),
274
42
                    self.platform_property_manager.as_ref(),
275
42
                )
276
42
                .await,
277
            );
278
        }
279
91
        result
280
91
    }
281
}
282
283
impl SimpleScheduler {
284
0
    pub fn new<A: AwaitedActionDb>(
285
0
        spec: &SimpleSpec,
286
0
        awaited_action_db: A,
287
0
        task_change_notify: Arc<Notify>,
288
0
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
289
0
        Self::new_with_callback(
290
0
            spec,
291
0
            awaited_action_db,
292
0
            || {
293
0
                // The cost of running `do_try_match()` is very high, but constant
294
0
                // in relation to the number of changes that have happened. This
295
0
                // means that grabbing this lock to process `do_try_match()` should
296
0
                // always yield to any other tasks that might want the lock. The
297
0
                // easiest and most fair way to do this is to sleep for a small
298
0
                // amount of time. Using something like tokio::task::yield_now()
299
0
                // does not yield as aggresively as we'd like if new futures are
300
0
                // scheduled within a future.
301
0
                tokio::time::sleep(Duration::from_millis(1))
302
0
            },
303
0
            task_change_notify,
304
0
            SystemTime::now,
305
0
        )
306
0
    }
307
308
21
    pub fn new_with_callback<
309
21
        Fut: Future<Output = ()> + Send,
310
21
        F: Fn() -> Fut + Send + Sync + 'static,
311
21
        A: AwaitedActionDb,
312
21
        I: InstantWrapper,
313
21
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
314
21
    >(
315
21
        spec: &SimpleSpec,
316
21
        awaited_action_db: A,
317
21
        on_matching_engine_run: F,
318
21
        task_change_notify: Arc<Notify>,
319
21
        now_fn: NowFn,
320
21
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
321
21
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
322
21
            spec.supported_platform_properties
323
21
                .clone()
324
21
                .unwrap_or_default(),
325
21
        ));
326
21
327
21
        let mut worker_timeout_s = spec.worker_timeout_s;
328
21
        if worker_timeout_s == 0 {
  Branch (328:12): [True: 0, False: 0]
  Branch (328:12): [True: 0, False: 0]
  Branch (328:12): [Folded - Ignored]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 0, False: 1]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 0, False: 1]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 0, False: 1]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 0, False: 1]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
329
17
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
330
17
        
}4
331
332
21
        let mut client_action_timeout_s = spec.client_action_timeout_s;
333
21
        if client_action_timeout_s == 0 {
  Branch (333:12): [True: 0, False: 0]
  Branch (333:12): [True: 0, False: 0]
  Branch (333:12): [Folded - Ignored]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
  Branch (333:12): [True: 1, False: 0]
334
21
            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
335
21
        
}0
336
337
21
        let mut max_job_retries = spec.max_job_retries;
338
21
        if max_job_retries == 0 {
  Branch (338:12): [True: 0, False: 0]
  Branch (338:12): [True: 0, False: 0]
  Branch (338:12): [Folded - Ignored]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 0, False: 1]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
  Branch (338:12): [True: 1, False: 0]
339
20
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
340
20
        
}1
341
342
21
        let worker_change_notify = Arc::new(Notify::new());
343
21
        let state_manager = SimpleSchedulerStateManager::new(
344
21
            max_job_retries,
345
21
            Duration::from_secs(worker_timeout_s),
346
21
            Duration::from_secs(client_action_timeout_s),
347
21
            awaited_action_db,
348
21
            now_fn,
349
21
        );
350
21
351
21
        let worker_scheduler = ApiWorkerScheduler::new(
352
21
            state_manager.clone(),
353
21
            platform_property_manager.clone(),
354
21
            spec.allocation_strategy,
355
21
            worker_change_notify.clone(),
356
21
            worker_timeout_s,
357
21
        );
358
21
359
21
        let worker_scheduler_clone = worker_scheduler.clone();
360
21
361
21
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
362
21
            let weak_inner = weak_self.clone();
363
21
            let task_worker_matching_spawn =
364
21
                spawn!("simple_scheduler_task_worker_matching", async move 
{20
365
                    // Break out of the loop only when the inner is dropped.
366
                    loop {
367
108
                        let task_change_fut = task_change_notify.notified();
368
108
                        let worker_change_fut = worker_change_notify.notified();
369
108
                        tokio::pin!(task_change_fut);
370
108
                        tokio::pin!(worker_change_fut);
371
108
                        // Wait for either of these futures to be ready.
372
108
                        let _ = futures::future::select(task_change_fut, worker_change_fut).await;
373
88
                        let result = match weak_inner.upgrade() {
374
88
                            Some(scheduler) => scheduler.do_try_match().await,
375
                            // If the inner went away it means the scheduler is shutting
376
                            // down, so we need to resolve our future.
377
0
                            None => return,
378
                        };
379
88
                        if let Err(
err1
) = result {
  Branch (379:32): [True: 0, False: 0]
  Branch (379:32): [True: 0, False: 0]
  Branch (379:32): [Folded - Ignored]
  Branch (379:32): [True: 0, False: 3]
  Branch (379:32): [True: 0, False: 1]
  Branch (379:32): [True: 0, False: 1]
  Branch (379:32): [True: 0, False: 0]
  Branch (379:32): [True: 0, False: 1]
  Branch (379:32): [True: 0, False: 3]
  Branch (379:32): [True: 0, False: 3]
  Branch (379:32): [True: 0, False: 4]
  Branch (379:32): [True: 0, False: 5]
  Branch (379:32): [True: 0, False: 7]
  Branch (379:32): [True: 0, False: 4]
  Branch (379:32): [True: 0, False: 7]
  Branch (379:32): [True: 0, False: 6]
  Branch (379:32): [True: 0, False: 6]
  Branch (379:32): [True: 0, False: 9]
  Branch (379:32): [True: 0, False: 3]
  Branch (379:32): [True: 0, False: 3]
  Branch (379:32): [True: 0, False: 4]
  Branch (379:32): [True: 0, False: 7]
  Branch (379:32): [True: 1, False: 3]
  Branch (379:32): [True: 0, False: 7]
380
1
                            event!(Level::ERROR, ?err, "Error while running do_try_match");
381
87
                        }
382
383
88
                        on_matching_engine_run().await;
384
                    }
385
                    // Unreachable.
386
21
                
}0
);
387
21
            SimpleScheduler {
388
21
                matching_engine_state_manager: state_manager.clone(),
389
21
                client_state_manager: state_manager.clone(),
390
21
                worker_scheduler,
391
21
                platform_property_manager,
392
21
                _task_worker_matching_spawn: task_worker_matching_spawn,
393
21
            }
394
21
        });
395
21
        (action_scheduler, worker_scheduler_clone)
396
21
    }
397
}
398
399
#[async_trait]
400
impl ClientStateManager for SimpleScheduler {
401
    async fn add_action(
402
        &self,
403
        client_operation_id: OperationId,
404
        action_info: Arc<ActionInfo>,
405
25
    ) -> Result<Box<dyn ActionStateResult>, Error> {
406
25
        self.inner_add_action(client_operation_id, action_info)
407
25
            .await
408
50
    }
409
410
    async fn filter_operations<'a>(
411
        &'a self,
412
        filter: OperationFilter,
413
503
    ) -> Result<ActionStateResultStream<'a>, Error> {
414
503
        self.inner_filter_operations(filter).await
415
1.00k
    }
416
417
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
418
0
        Some(self)
419
0
    }
420
}
421
422
#[async_trait]
423
impl KnownPlatformPropertyProvider for SimpleScheduler {
424
0
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
425
0
        Ok(self
426
0
            .worker_scheduler
427
0
            .get_platform_property_manager()
428
0
            .get_known_properties()
429
0
            .keys()
430
0
            .cloned()
431
0
            .collect())
432
0
    }
433
}
434
435
#[async_trait]
436
impl WorkerScheduler for SimpleScheduler {
437
0
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
438
0
        self.worker_scheduler.get_platform_property_manager()
439
0
    }
440
441
25
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
442
25
        self.worker_scheduler.add_worker(worker).await
443
50
    }
444
445
    async fn update_action(
446
        &self,
447
        worker_id: &WorkerId,
448
        operation_id: &OperationId,
449
        update: UpdateOperationType,
450
9
    ) -> Result<(), Error> {
451
9
        self.worker_scheduler
452
9
            .update_action(worker_id, operation_id, update)
453
9
            .await
454
18
    }
455
456
    async fn worker_keep_alive_received(
457
        &self,
458
        worker_id: &WorkerId,
459
        timestamp: WorkerTimestamp,
460
1
    ) -> Result<(), Error> {
461
1
        self.worker_scheduler
462
1
            .worker_keep_alive_received(worker_id, timestamp)
463
1
            .await
464
2
    }
465
466
1
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
467
1
        self.worker_scheduler.remove_worker(worker_id).await
468
2
    }
469
470
1
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
471
1
        self.worker_scheduler
472
1
            .remove_timedout_workers(now_timestamp)
473
1
            .await
474
2
    }
475
476
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
477
2
        self.worker_scheduler
478
2
            .set_drain_worker(worker_id, is_draining)
479
2
            .await
480
4
    }
481
}
482
483
impl RootMetricsComponent for SimpleScheduler {}