Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::{SystemTime, UNIX_EPOCH};
17
18
use nativelink_error::{make_input_err, Error, ResultExt};
19
use nativelink_metric::{
20
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
21
};
22
use nativelink_util::action_messages::{
23
    ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
24
};
25
use serde::{Deserialize, Serialize};
26
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
27
28
/// The version of the awaited action.
29
/// This number will always increment by one each time
30
/// the action is updated.
31
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
32
struct AwaitedActionVersion(u64);
33
34
impl MetricsComponent for AwaitedActionVersion {
35
0
    fn publish(
36
0
        &self,
37
0
        _kind: MetricKind,
38
0
        _field_metadata: MetricFieldData,
39
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
40
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
41
0
    }
42
}
43
44
/// An action that is being awaited on and last known state.
45
#[derive(Debug, Clone, MetricsComponent, Serialize, Deserialize)]
46
pub struct AwaitedAction {
47
    /// The current version of the action.
48
    #[metric(help = "The version of the AwaitedAction")]
49
    version: AwaitedActionVersion,
50
51
    /// The action that is being awaited on.
52
    #[metric(help = "The action info of the AwaitedAction")]
53
    action_info: Arc<ActionInfo>,
54
55
    /// The operation id of the action.
56
    // If you need the client operation id, it may be set in
57
    // ActionState::operation_id.
58
    #[metric(help = "The operation id of the AwaitedAction")]
59
    operation_id: OperationId,
60
61
    /// The currentsort key used to order the actions.
62
    #[metric(help = "The sort key of the AwaitedAction")]
63
    sort_key: AwaitedActionSortKey,
64
65
    /// The time the action was last updated.
66
    #[metric(help = "The last time the worker updated the AwaitedAction")]
67
    last_worker_updated_timestamp: SystemTime,
68
69
    /// The last time the client sent a keepalive message.
70
    #[metric(help = "The last time the client sent a keepalive message")]
71
    last_client_keepalive_timestamp: SystemTime,
72
73
    /// Worker that is currently running this action, None if unassigned.
74
    #[metric(help = "The worker id of the AwaitedAction")]
75
    worker_id: Option<WorkerId>,
76
77
    /// The current state of the action.
78
    #[metric(help = "The state of the AwaitedAction")]
79
    state: Arc<ActionState>,
80
81
    /// Number of attempts the job has been tried.
82
    #[metric(help = "The number of attempts the AwaitedAction has been tried")]
83
    pub attempts: usize,
84
}
85
86
impl AwaitedAction {
87
33
    pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
88
33
        let stage = ActionStage::Queued;
89
33
        let sort_key = AwaitedActionSortKey::new_with_unique_key(
90
33
            action_info.priority,
91
33
            &action_info.insert_timestamp,
92
33
        );
93
33
        let state = Arc::new(ActionState {
94
33
            stage,
95
33
            // Note: We don't use the real client_operation_id here because
96
33
            // the only place AwaitedAction::new should ever be called is
97
33
            // when the action is first created and this struct will be stored
98
33
            // in the database, so we don't want to accidentally leak the
99
33
            // client_operation_id to all clients.
100
33
            client_operation_id: operation_id.clone(),
101
33
            action_digest: action_info.unique_qualifier.digest(),
102
33
        });
103
33
        Self {
104
33
            version: AwaitedActionVersion(0),
105
33
            action_info,
106
33
            operation_id,
107
33
            sort_key,
108
33
            attempts: 0,
109
33
            last_worker_updated_timestamp: now,
110
33
            last_client_keepalive_timestamp: now,
111
33
            worker_id: None,
112
33
            state,
113
33
        }
114
33
    }
115
116
80
    pub(crate) fn version(&self) -> u64 {
117
80
        self.version.0
118
80
    }
119
120
2
    pub(crate) fn set_version(&mut self, version: u64) {
121
2
        self.version = AwaitedActionVersion(version);
122
2
    }
123
124
39
    pub(crate) fn increment_version(&mut self) {
125
39
        self.version = AwaitedActionVersion(self.version.0 + 1);
126
39
    }
127
128
170
    pub fn action_info(&self) -> &Arc<ActionInfo> {
129
170
        &self.action_info
130
170
    }
131
132
91
    pub fn operation_id(&self) -> &OperationId {
133
91
        &self.operation_id
134
91
    }
135
136
64
    pub(crate) fn sort_key(&self) -> AwaitedActionSortKey {
137
64
        self.sort_key
138
64
    }
139
140
371
    pub fn state(&self) -> &Arc<ActionState> {
141
371
        &self.state
142
371
    }
143
144
53
    pub(crate) fn worker_id(&self) -> Option<WorkerId> {
145
53
        self.worker_id
146
53
    }
147
148
46
    pub(crate) fn last_worker_updated_timestamp(&self) -> SystemTime {
149
46
        self.last_worker_updated_timestamp
150
46
    }
151
152
77
    pub(crate) fn worker_keep_alive(&mut self, now: SystemTime) {
153
77
        self.last_worker_updated_timestamp = now;
154
77
    }
155
156
47
    pub(crate) fn last_client_keepalive_timestamp(&self) -> SystemTime {
157
47
        self.last_client_keepalive_timestamp
158
47
    }
159
0
    pub(crate) fn update_client_keep_alive(&mut self, now: SystemTime) {
160
0
        self.last_client_keepalive_timestamp = now;
161
0
    }
162
163
60
    pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) {
164
60
        Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;
165
60
    }
166
167
    /// Sets the worker id that is currently processing this action.
168
41
    pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
169
41
        if self.worker_id != new_maybe_worker_id {
  Branch (169:12): [True: 35, False: 6]
  Branch (169:12): [Folded - Ignored]
170
35
            self.worker_id = new_maybe_worker_id;
171
35
            self.worker_keep_alive(now);
172
35
        
}6
173
41
    }
174
175
    /// Sets the current state of the action and updates the last worker updated timestamp.
176
42
    pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
177
42
        std::mem::swap(&mut self.state, &mut state);
178
42
        self.worker_keep_alive(now);
179
42
    }
180
}
181
182
impl TryFrom<&[u8]> for AwaitedAction {
183
    type Error = Error;
184
0
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
185
0
        serde_json::from_slice(value)
186
0
            .map_err(|e| make_input_err!("{}", e.to_string()))
187
0
            .err_tip(|| "In AwaitedAction::TryFrom::&[u8]")
188
0
    }
189
}
190
191
/// The key used to sort the awaited actions.
192
///
193
/// The rules for sorting are as follows:
194
/// 1. priority of the action
195
/// 2. insert order of the action (lower = higher priority)
196
/// 3. (mostly random hash based on the action info)
197
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
198
#[repr(transparent)]
199
pub struct AwaitedActionSortKey(u64);
200
201
impl MetricsComponent for AwaitedActionSortKey {
202
0
    fn publish(
203
0
        &self,
204
0
        _kind: MetricKind,
205
0
        _field_metadata: MetricFieldData,
206
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
207
0
        Ok(MetricPublishKnownKindData::Counter(self.0))
208
0
    }
209
}
210
211
impl AwaitedActionSortKey {
212
    #[rustfmt::skip]
213
33
    const fn new(priority: i32, insert_timestamp: u32) -> Self {
214
        // Shift `new_priority` so [`i32::MIN`] is represented by zero.
215
        // This makes it so any nagative values are positive, but
216
        // maintains ordering.
217
        const MIN_I32: i64 = (i32::MIN as i64).abs();
218
33
        let priority = ((priority as i64 + MIN_I32) as u32).to_be_bytes();
219
33
220
33
        // Invert our timestamp so the larger the timestamp the lower the number.
221
33
        // This makes timestamp descending order instead of ascending.
222
33
        let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();
223
33
224
33
        AwaitedActionSortKey(u64::from_be_bytes([
225
33
            priority[0], priority[1], priority[2], priority[3],
226
33
            timestamp[0], timestamp[1], timestamp[2], timestamp[3],
227
33
        ]))
228
33
    }
229
230
33
    fn new_with_unique_key(priority: i32, insert_timestamp: &SystemTime) -> Self {
231
33
        let timestamp = insert_timestamp
232
33
            .duration_since(UNIX_EPOCH)
233
33
            .unwrap()
234
33
            .as_secs() as u32;
235
33
        Self::new(priority, timestamp)
236
33
    }
237
238
2
    pub(crate) fn as_u64(self) -> u64 {
239
2
        self.0
240
2
    }
241
}
242
243
// Ensure the size of the sort key is the same as a `u64`.
244
assert_eq_size!(AwaitedActionSortKey, u64);
245
246
const_assert_eq!(
247
    AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
248
    // Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
249
    // to shift the `i32::MIN` value to be represented by zero.
250
    // Note: `6543210f` are the inverted bits of `9abcdef0`.
251
    // This effectively inverts the priority to now have the highest priority
252
    // be the lowest timestamps.
253
    AwaitedActionSortKey(0x9234_5678_6543_210f).0
254
);
255
// Ensure the priority is used as the sort key first.
256
const_assert!(
257
    AwaitedActionSortKey::new(i32::MAX, 0).0 > AwaitedActionSortKey::new(i32::MAX - 1, 0).0
258
);
259
const_assert!(AwaitedActionSortKey::new(i32::MAX - 1, 0).0 > AwaitedActionSortKey::new(1, 0).0);
260
const_assert!(AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0);
261
const_assert!(AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0);
262
const_assert!(AwaitedActionSortKey::new(-1, 0).0 > AwaitedActionSortKey::new(i32::MIN + 1, 0).0);
263
const_assert!(
264
    AwaitedActionSortKey::new(i32::MIN + 1, 0).0 > AwaitedActionSortKey::new(i32::MIN, 0).0
265
);
266
267
// Ensure the insert timestamp is used as the sort key second.
268
const_assert!(AwaitedActionSortKey::new(0, u32::MIN).0 > AwaitedActionSortKey::new(0, u32::MAX).0);