/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::Arc; |
16 | | use std::time::{SystemTime, UNIX_EPOCH}; |
17 | | |
18 | | use nativelink_error::{make_input_err, Error, ResultExt}; |
19 | | use nativelink_metric::{ |
20 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
21 | | }; |
22 | | use nativelink_util::action_messages::{ |
23 | | ActionInfo, ActionStage, ActionState, OperationId, WorkerId, |
24 | | }; |
25 | | use serde::{Deserialize, Serialize}; |
26 | | use static_assertions::{assert_eq_size, const_assert, const_assert_eq}; |
27 | | |
/// The version of the awaited action.
///
/// `AwaitedAction::new` starts the version at zero and
/// `AwaitedAction::increment_version` advances it by one.
/// `AwaitedAction::set_version` can also overwrite it with an arbitrary
/// value, so the number is not guaranteed to only ever grow by one.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
struct AwaitedActionVersion(u64);
33 | | |
34 | | impl MetricsComponent for AwaitedActionVersion { |
35 | 0 | fn publish( |
36 | 0 | &self, |
37 | 0 | _kind: MetricKind, |
38 | 0 | _field_metadata: MetricFieldData, |
39 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
40 | 0 | Ok(MetricPublishKnownKindData::Counter(self.0)) |
41 | 0 | } |
42 | | } |
43 | | |
/// An action that is being awaited on, together with its last known state.
///
/// The struct derives `Serialize`/`Deserialize`; the `TryFrom<&[u8]>` impl in
/// this file decodes it from JSON bytes.
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
pub struct AwaitedAction {
    /// The current version of the action.
    #[metric(help = "The version of the AwaitedAction")]
    version: AwaitedActionVersion,

    /// The action that is being awaited on.
    #[metric(help = "The action info of the AwaitedAction")]
    action_info: Arc<ActionInfo>,

    /// The operation id of the action.
    // If you need the client operation id, it may be set in
    // ActionState::operation_id.
    #[metric(help = "The operation id of the AwaitedAction")]
    operation_id: OperationId,

    /// The current sort key used to order the actions.
    #[metric(help = "The sort key of the AwaitedAction")]
    sort_key: AwaitedActionSortKey,

    /// The last time a worker-side update was recorded for this action.
    ///
    /// Refreshed by `worker_keep_alive`, which is also invoked when the
    /// assigned worker changes and when the state is replaced.
    #[metric(help = "The last time the worker updated the AwaitedAction")]
    last_worker_updated_timestamp: SystemTime,

    /// The last time the client sent a keepalive message.
    #[metric(help = "The last time the client sent a keepalive message")]
    last_client_keepalive_timestamp: SystemTime,

    /// Worker that is currently running this action, None if unassigned.
    #[metric(help = "The worker id of the AwaitedAction")]
    worker_id: Option<WorkerId>,

    /// The current state of the action.
    #[metric(help = "The state of the AwaitedAction")]
    state: Arc<ActionState>,

    /// Number of times the job has been attempted.
    ///
    /// Starts at zero; no method visible in this file modifies it, so it is
    /// presumably updated directly by callers through this public field.
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
    pub attempts: usize,
}
85 | | |
86 | | impl AwaitedAction { |
87 | 33 | pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self { |
88 | 33 | let stage = ActionStage::Queued; |
89 | 33 | let sort_key = AwaitedActionSortKey::new_with_unique_key( |
90 | 33 | action_info.priority, |
91 | 33 | &action_info.insert_timestamp, |
92 | 33 | ); |
93 | 33 | let state = Arc::new(ActionState { |
94 | 33 | stage, |
95 | 33 | // Note: We don't use the real client_operation_id here because |
96 | 33 | // the only place AwaitedAction::new should ever be called is |
97 | 33 | // when the action is first created and this struct will be stored |
98 | 33 | // in the database, so we don't want to accidentally leak the |
99 | 33 | // client_operation_id to all clients. |
100 | 33 | client_operation_id: operation_id.clone(), |
101 | 33 | action_digest: action_info.unique_qualifier.digest(), |
102 | 33 | }); |
103 | 33 | Self { |
104 | 33 | version: AwaitedActionVersion(0), |
105 | 33 | action_info, |
106 | 33 | operation_id, |
107 | 33 | sort_key, |
108 | 33 | attempts: 0, |
109 | 33 | last_worker_updated_timestamp: now, |
110 | 33 | last_client_keepalive_timestamp: now, |
111 | 33 | worker_id: None, |
112 | 33 | state, |
113 | 33 | } |
114 | 33 | } |
115 | | |
116 | 80 | pub(crate) fn version(&self) -> u64 { |
117 | 80 | self.version.0 |
118 | 80 | } |
119 | | |
120 | 2 | pub(crate) fn set_version(&mut self, version: u64) { |
121 | 2 | self.version = AwaitedActionVersion(version); |
122 | 2 | } |
123 | | |
124 | 39 | pub(crate) fn increment_version(&mut self) { |
125 | 39 | self.version = AwaitedActionVersion(self.version.0 + 1); |
126 | 39 | } |
127 | | |
128 | 170 | pub fn action_info(&self) -> &Arc<ActionInfo> { |
129 | 170 | &self.action_info |
130 | 170 | } |
131 | | |
132 | 91 | pub fn operation_id(&self) -> &OperationId { |
133 | 91 | &self.operation_id |
134 | 91 | } |
135 | | |
136 | 64 | pub(crate) fn sort_key(&self) -> AwaitedActionSortKey { |
137 | 64 | self.sort_key |
138 | 64 | } |
139 | | |
140 | 371 | pub fn state(&self) -> &Arc<ActionState> { |
141 | 371 | &self.state |
142 | 371 | } |
143 | | |
144 | 53 | pub(crate) fn worker_id(&self) -> Option<WorkerId> { |
145 | 53 | self.worker_id |
146 | 53 | } |
147 | | |
148 | 46 | pub(crate) fn last_worker_updated_timestamp(&self) -> SystemTime { |
149 | 46 | self.last_worker_updated_timestamp |
150 | 46 | } |
151 | | |
152 | 77 | pub(crate) fn worker_keep_alive(&mut self, now: SystemTime) { |
153 | 77 | self.last_worker_updated_timestamp = now; |
154 | 77 | } |
155 | | |
156 | 47 | pub(crate) fn last_client_keepalive_timestamp(&self) -> SystemTime { |
157 | 47 | self.last_client_keepalive_timestamp |
158 | 47 | } |
159 | 0 | pub(crate) fn update_client_keep_alive(&mut self, now: SystemTime) { |
160 | 0 | self.last_client_keepalive_timestamp = now; |
161 | 0 | } |
162 | | |
163 | 60 | pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) { |
164 | 60 | Arc::make_mut(&mut self.state).client_operation_id = client_operation_id; |
165 | 60 | } |
166 | | |
167 | | /// Sets the worker id that is currently processing this action. |
168 | 41 | pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) { |
169 | 41 | if self.worker_id != new_maybe_worker_id { Branch (169:12): [True: 35, False: 6]
Branch (169:12): [Folded - Ignored]
|
170 | 35 | self.worker_id = new_maybe_worker_id; |
171 | 35 | self.worker_keep_alive(now); |
172 | 35 | }6 |
173 | 41 | } |
174 | | |
175 | | /// Sets the current state of the action and updates the last worker updated timestamp. |
176 | 42 | pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) { |
177 | 42 | std::mem::swap(&mut self.state, &mut state); |
178 | 42 | self.worker_keep_alive(now); |
179 | 42 | } |
180 | | } |
181 | | |
182 | | impl TryFrom<&[u8]> for AwaitedAction { |
183 | | type Error = Error; |
184 | 0 | fn try_from(value: &[u8]) -> Result<Self, Self::Error> { |
185 | 0 | serde_json::from_slice(value) |
186 | 0 | .map_err(|e| make_input_err!("{}", e.to_string())) |
187 | 0 | .err_tip(|| "In AwaitedAction::TryFrom::&[u8]") |
188 | 0 | } |
189 | | } |
190 | | |
/// The key used to sort the awaited actions.
///
/// The key is a single `u64`, so ordering is a plain integer comparison:
/// 1. The upper 32 bits hold the priority of the action, offset so that
///    `i32::MIN` maps to zero (higher priority = greater key).
/// 2. The lower 32 bits hold the bit-inverted insert timestamp in whole
///    seconds since the Unix epoch (earlier insert = greater key).
///
/// No hash or other tie-breaker is encoded: two actions with the same
/// priority whose insert timestamps fall in the same second produce
/// identical keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct AwaitedActionSortKey(u64);
200 | | |
201 | | impl MetricsComponent for AwaitedActionSortKey { |
202 | 0 | fn publish( |
203 | 0 | &self, |
204 | 0 | _kind: MetricKind, |
205 | 0 | _field_metadata: MetricFieldData, |
206 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
207 | 0 | Ok(MetricPublishKnownKindData::Counter(self.0)) |
208 | 0 | } |
209 | | } |
210 | | |
211 | | impl AwaitedActionSortKey { |
212 | | #[rustfmt::skip] |
213 | 33 | const fn new(priority: i32, insert_timestamp: u32) -> Self { |
214 | | // Shift `new_priority` so [`i32::MIN`] is represented by zero. |
215 | | // This makes it so any nagative values are positive, but |
216 | | // maintains ordering. |
217 | | const MIN_I32: i64 = (i32::MIN as i64).abs(); |
218 | 33 | let priority = ((priority as i64 + MIN_I32) as u32).to_be_bytes(); |
219 | 33 | |
220 | 33 | // Invert our timestamp so the larger the timestamp the lower the number. |
221 | 33 | // This makes timestamp descending order instead of ascending. |
222 | 33 | let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes(); |
223 | 33 | |
224 | 33 | AwaitedActionSortKey(u64::from_be_bytes([ |
225 | 33 | priority[0], priority[1], priority[2], priority[3], |
226 | 33 | timestamp[0], timestamp[1], timestamp[2], timestamp[3], |
227 | 33 | ])) |
228 | 33 | } |
229 | | |
230 | 33 | fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self { |
231 | 33 | let timestamp = insert_timestamp |
232 | 33 | .duration_since(UNIX_EPOCH) |
233 | 33 | .unwrap() |
234 | 33 | .as_secs() as u32; |
235 | 33 | Self::new(priority, timestamp) |
236 | 33 | } |
237 | | |
238 | 2 | pub(crate) fn as_u64(self) -> u64 { |
239 | 2 | self.0 |
240 | 2 | } |
241 | | } |
242 | | |
// Ensure the size of the sort key is the same as a `u64`.
assert_eq_size!(AwaitedActionSortKey, u64);

// Pin the exact bit layout produced by `AwaitedActionSortKey::new`.
const_assert_eq!(
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
    // to shift the `i32::MIN` value to be represented by zero.
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
    // Inverting the timestamp means that, at equal priority, the lowest
    // (earliest) timestamp yields the greatest key.
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
);
// Ensure the priority is used as the sort key first.
// The chain below walks from `i32::MAX` down to `i32::MIN`, crossing zero.
const_assert!(
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
);
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
const_assert!(
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
);

// Ensure the insert timestamp is used as the sort key second.
// At equal priority the smallest timestamp must give the greater key.
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);