Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/shutdown_guard.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
17
use tokio::sync::watch;
18
19
/// Shutdown priority of a guard.
///
/// A numerically larger discriminant means a less important priority;
/// waiting on a priority waits for every numerically larger priority
/// to reach a count of zero first.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum Priority {
    /// The least important priority.
    LeastImportant = 2,
    /// All priorities greater than 1 must be completed.
    P1 = 1,
    /// All other priorities must be completed.
    P0 = 0,
}

impl From<usize> for Priority {
    /// Converts a numeric level into a [`Priority`].
    ///
    /// `0` maps to `P0`, `1` maps to `P1`, and every other value maps to
    /// `LeastImportant`, so the conversion never fails.
    fn from(value: usize) -> Self {
        match value {
            0 => Self::P0,
            1 => Self::P1,
            _ => Self::LeastImportant,
        }
    }
}

impl From<Priority> for usize {
    /// Returns the numeric level of `priority` (`P0` is 0, `P1` is 1,
    /// `LeastImportant` is 2).
    fn from(priority: Priority) -> usize {
        // Exhaustive on purpose: adding a variant must force an update here.
        match priority {
            Priority::P0 => 0,
            Priority::P1 => 1,
            Priority::LeastImportant => 2,
        }
    }
}
48
49
/// Tracks other services that have registered to be notified when
/// the process is being shut down.
///
/// All guards created from one another share a single watch channel whose
/// value is a map from [`Priority`] to a counter. Cloning a guard increments
/// a counter, dropping a guard decrements the counter of its priority, and
/// `wait_for` moves the guard's contribution between priorities.
pub struct ShutdownGuard {
    /// Priority this guard currently holds; updated by `wait_for`.
    priority: Priority,
    /// Sending half of the shared channel, used to modify the per-priority
    /// counters in place.
    tx: watch::Sender<HashMap<Priority, usize>>,
    /// Receiving half of the shared channel, used by `wait_for` to observe
    /// counter changes.
    rx: watch::Receiver<HashMap<Priority, usize>>,
}
56
57
impl ShutdownGuard {
58
    /// Waits for all priorities less important than the given
59
    /// priority to be completed.
60
0
    pub async fn wait_for(&mut self, priority: Priority) {
61
0
        if priority != self.priority {
  Branch (61:12): [Folded - Ignored]
  Branch (61:12): [Folded - Ignored]
62
0
            // Promote our priority to the new priority.
63
0
            self.tx.send_modify(|map| {
64
0
                let old_count = map.remove(&self.priority).unwrap_or(0).saturating_sub(1);
65
0
                map.insert(self.priority, old_count);
66
0
67
0
                self.priority = priority;
68
0
69
0
                let new_count = map.get(&priority).unwrap_or(&0).saturating_add(1);
70
0
                map.insert(priority, new_count);
71
0
            });
72
0
        }
73
        // Ignore error because the receiver will never be closed
74
        // if the sender is still alive here.
75
0
        let _ = self
76
0
            .rx
77
0
            .wait_for(|map| {
78
0
                let start = usize::from(priority) + 1;
79
0
                let end = usize::from(Priority::LeastImportant);
80
0
                for p in start..=end {
81
0
                    if *map.get(&p.into()).unwrap_or(&0) > 0 {
  Branch (81:24): [Folded - Ignored]
  Branch (81:24): [Folded - Ignored]
82
0
                        return false;
83
0
                    }
84
                }
85
0
                true
86
0
            })
87
0
            .await;
88
0
    }
89
}
90
91
impl Default for ShutdownGuard {
92
0
    fn default() -> Self {
93
0
        let priority = Priority::LeastImportant;
94
0
        let mut map = HashMap::new();
95
0
        map.insert(priority, 0);
96
0
        let (tx, rx) = watch::channel(map);
97
0
        Self { priority, tx, rx }
98
0
    }
99
}
100
101
impl Clone for ShutdownGuard {
102
0
    fn clone(&self) -> Self {
103
0
        self.tx.send_modify(|map| {
104
0
            map.insert(
105
0
                self.priority,
106
0
                map.get(&Priority::LeastImportant)
107
0
                    .unwrap_or(&0)
108
0
                    .saturating_add(1),
109
0
            );
110
0
        });
111
0
        Self {
112
0
            priority: Priority::LeastImportant,
113
0
            tx: self.tx.clone(),
114
0
            rx: self.rx.clone(),
115
0
        }
116
0
    }
117
}
118
119
impl Drop for ShutdownGuard {
120
0
    fn drop(&mut self) {
121
0
        self.tx.send_modify(|map| {
122
0
            map.insert(
123
0
                self.priority,
124
0
                map.get(&self.priority).unwrap_or(&0).saturating_sub(1),
125
0
            );
126
0
        });
127
0
    }
128
}