Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-config/src/schedulers.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
17
use serde::Deserialize;
18
19
use crate::serde_utils::{convert_duration_with_shellexpand, convert_numeric_with_shellexpand};
20
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};
21
22
#[allow(non_camel_case_types)]
23
#[derive(Deserialize, Debug)]
24
pub enum SchedulerSpec {
25
    simple(SimpleSpec),
26
    grpc(GrpcSpec),
27
    cache_lookup(CacheLookupSpec),
28
    property_modifier(PropertyModifierSpec),
29
}
30
31
/// When the scheduler matches tasks to workers that are capable of running
32
/// the task, this value will be used to determine how the property is treated.
33
#[allow(non_camel_case_types)]
34
#[derive(Deserialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
35
pub enum PropertyType {
36
    /// Requires the platform property to be a u64 and when the scheduler looks
37
    /// for appropriate worker nodes that are capable of executing the task,
38
    /// the task will not run on a node that has less than this value.
39
    minimum,
40
41
    /// Requires the platform property to be a string and when the scheduler
42
    /// looks for appropriate worker nodes that are capable of executing the
43
    /// task, the task will not run on a node that does not have this property
44
    /// set to the value with exact string match.
45
    exact,
46
47
    /// Does not restrict on this value and instead will be passed to the worker
48
    /// as an informational piece.
49
    /// TODO(allada) In the future this will be used by the scheduler and worker
50
    /// to cause the scheduler to prefer certain workers over others, but not
51
    /// restrict them based on these values.
52
    priority,
53
}
54
55
/// When a worker is being searched for to run a job, this will be used
56
/// to decide which worker should run the job when multiple
57
/// workers are able to run the task.
58
#[allow(non_camel_case_types)]
59
#[derive(Copy, Clone, Deserialize, Debug, Default)]
60
pub enum WorkerAllocationStrategy {
61
    /// Prefer workers that have been least recently used to run a job.
62
    #[default]
63
    least_recently_used,
64
    /// Prefer workers that have been most recently used to run a job.
65
    most_recently_used,
66
}
67
68
0
#[derive(Deserialize, Debug, Default)]
69
#[serde(deny_unknown_fields)]
70
pub struct SimpleSpec {
71
    /// A list of supported platform properties mapped to how these properties
72
    /// are used when the scheduler looks for worker nodes capable of running
73
    /// the task.
74
    ///
75
    /// For example, a value of:
76
    /// ```json
77
    /// { "cpu_count": "minimum", "cpu_arch": "exact" }
78
    /// ```
79
    /// With a job that contains:
80
    /// ```json
81
    /// { "cpu_count": "8", "cpu_arch": "arm" }
82
    /// ```
83
    /// Will result in the scheduler filtering out any workers that do not have
84
    /// `"cpu_arch" = "arm"` and filter out any workers that have less than 8 cpu
85
    /// cores available.
86
    ///
87
    /// The property names here must match the property keys provided by the
88
    /// worker nodes when they join the pool. In other words, the workers will
89
    /// publish their capabilities to the scheduler when they join the worker
90
    /// pool. If the worker fails to notify the scheduler of its (for example)
91
    /// `"cpu_arch"`, the scheduler will never send any jobs to it, if all jobs
92
    /// have the `"cpu_arch"` label. There is no special treatment of any platform
93
    /// property labels other and entirely driven by worker configs and this
94
    /// config.
95
    pub supported_platform_properties: Option<HashMap<String, PropertyType>>,
96
97
    /// The amount of time to retain completed actions in memory in case
98
    /// a `WaitExecution` is called after the action has completed.
99
    /// Default: 60 (seconds)
100
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
101
    pub retain_completed_for_s: u32,
102
103
    /// Mark operations as completed with error if no client has updated them
104
    /// within this duration.
105
    /// Default: 60 (seconds)
106
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
107
    pub client_action_timeout_s: u64,
108
109
    /// Remove workers from pool once the worker has not responded in this
110
    /// amount of time in seconds.
111
    /// Default: 5 (seconds)
112
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
113
    pub worker_timeout_s: u64,
114
115
    /// If a job returns an internal error or times out this many times when
116
    /// attempting to run on a worker the scheduler will return the last error
117
    /// to the client. Jobs will be retried and this configuration is to help
118
    /// prevent one rogue job from infinitely retrying and taking up a lot of
119
    /// resources when the task itself is the one causing the server to go
120
    /// into a bad state.
121
    /// Default: 3
122
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
123
    pub max_job_retries: usize,
124
125
    /// The strategy used to assign workers jobs.
126
    #[serde(default)]
127
    pub allocation_strategy: WorkerAllocationStrategy,
128
129
    /// The storage backend to use for the scheduler.
130
    /// Default: memory
131
    pub experimental_backend: Option<ExperimentalSimpleSchedulerBackend>,
132
}
133
134
#[allow(non_camel_case_types)]
135
#[derive(Deserialize, Debug)]
136
pub enum ExperimentalSimpleSchedulerBackend {
137
    /// Use an in-memory store for the scheduler.
138
    memory,
139
    /// Use a redis store for the scheduler.
140
    redis(ExperimentalRedisSchedulerBackend),
141
}
142
143
#[derive(Deserialize, Debug, Default)]
144
#[serde(deny_unknown_fields)]
145
pub struct ExperimentalRedisSchedulerBackend {
146
    /// A reference to the redis store to use for the scheduler.
147
    /// Note: This MUST resolve to a `RedisSpec`.
148
    pub redis_store: StoreRefName,
149
}
150
151
/// A scheduler that simply forwards requests to an upstream scheduler.  This
152
/// is useful to use when doing some kind of local action cache or CAS away from
153
/// the main cluster of workers.  In general, it's more efficient to point the
154
/// build at the main scheduler directly though.
155
#[derive(Deserialize, Debug)]
156
#[serde(deny_unknown_fields)]
157
pub struct GrpcSpec {
158
    /// The upstream scheduler to forward requests to.
159
    pub endpoint: GrpcEndpoint,
160
161
    /// Retry configuration to use when a network request fails.
162
    #[serde(default)]
163
    pub retry: Retry,
164
165
    /// Limit the number of simultaneous upstream requests to this many.  A
166
    /// value of zero is treated as unlimited.  If the limit is reached the
167
    /// request is queued.
168
    #[serde(default)]
169
    pub max_concurrent_requests: usize,
170
171
    /// The number of connections to make to each specified endpoint to balance
172
    /// the load over multiple TCP connections.  Default 1.
173
    #[serde(default)]
174
    pub connections_per_endpoint: usize,
175
}
176
177
#[derive(Deserialize, Debug)]
178
#[serde(deny_unknown_fields)]
179
pub struct CacheLookupSpec {
180
    /// The reference to the action cache store used to return cached
181
    /// actions from rather than running them again.
182
    /// To prevent unintended issues, this store should probably be a `CompletenessCheckingSpec`.
183
    pub ac_store: StoreRefName,
184
185
    /// The nested scheduler to use if cache lookup fails.
186
    pub scheduler: Box<SchedulerSpec>,
187
}
188
189
#[derive(Deserialize, Debug, Clone)]
190
#[serde(deny_unknown_fields)]
191
pub struct PlatformPropertyAddition {
192
    /// The name of the property to add.
193
    pub name: String,
194
    /// The value to assign to the property.
195
    pub value: String,
196
}
197
198
#[allow(non_camel_case_types)]
199
#[derive(Deserialize, Debug, Clone)]
200
pub enum PropertyModification {
201
    /// Add a property to the action properties.
202
    add(PlatformPropertyAddition),
203
    /// Remove a named property from the action.
204
    remove(String),
205
}
206
207
#[derive(Deserialize, Debug)]
208
#[serde(deny_unknown_fields)]
209
pub struct PropertyModifierSpec {
210
    /// A list of modifications to perform to incoming actions for the nested
211
    /// scheduler.  These are performed in order and blindly, so removing a
212
    /// property that doesn't exist is fine and overwriting an existing property
213
    /// is also fine.  Adding properties that do not exist in the nested
214
    /// scheduler is not supported and will likely cause unexpected behaviour.
215
    pub modifications: Vec<PropertyModification>,
216
217
    /// The nested scheduler to use after modifying the properties.
218
    pub scheduler: Box<SchedulerSpec>,
219
}