Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/existence_cache_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
use std::time::SystemTime;
19
20
use async_trait::async_trait;
21
use nativelink_config::stores::{EvictionPolicy, ExistenceCacheSpec};
22
use nativelink_error::{error_if, Error, ResultExt};
23
use nativelink_metric::MetricsComponent;
24
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
25
use nativelink_util::common::DigestInfo;
26
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
27
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
28
use nativelink_util::instant_wrapper::InstantWrapper;
29
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
30
31
#[derive(Clone, Debug)]
32
struct ExistanceItem(u64);
33
34
impl LenEntry for ExistanceItem {
35
    #[inline]
36
15
    fn len(&self) -> u64 {
37
15
        self.0
38
15
    }
39
40
    #[inline]
41
0
    fn is_empty(&self) -> bool {
42
0
        false
43
0
    }
44
}
45
46
#[derive(MetricsComponent)]
47
pub struct ExistenceCacheStore<I: InstantWrapper> {
48
    #[metric(group = "inner_store")]
49
    inner_store: Store,
50
    existence_cache: EvictingMap<DigestInfo, ExistanceItem, I>,
51
}
52
53
impl ExistenceCacheStore<SystemTime> {
54
3
    pub fn new(spec: &ExistenceCacheSpec, inner_store: Store) -> Arc<Self> {
55
3
        Self::new_with_time(spec, inner_store, SystemTime::now())
56
3
    }
57
}
58
59
impl<I: InstantWrapper> ExistenceCacheStore<I> {
60
4
    pub fn new_with_time(
61
4
        spec: &ExistenceCacheSpec,
62
4
        inner_store: Store,
63
4
        anchor_time: I,
64
4
    ) -> Arc<Self> {
65
4
        let empty_policy = EvictionPolicy::default();
66
4
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
67
4
        Arc::new(Self {
68
4
            inner_store,
69
4
            existence_cache: EvictingMap::new(eviction_policy, anchor_time),
70
4
        })
71
4
    }
72
73
4
    pub async fn exists_in_cache(&self, digest: &DigestInfo) -> bool {
74
4
        let mut results = [None];
75
4
        self.existence_cache
76
4
            .sizes_for_keys([digest], &mut results[..], true /* peek */)
77
4
            .await;
78
4
        results[0].is_some()
79
4
    }
80
81
1
    pub async fn remove_from_cache(&self, digest: &DigestInfo) {
82
1
        self.existence_cache.remove(digest).await;
83
1
    }
84
85
8
    async fn inner_has_with_results(
86
8
        self: Pin<&Self>,
87
8
        keys: &[DigestInfo],
88
8
        results: &mut [Option<u64>],
89
8
    ) -> Result<(), Error> {
90
8
        self.existence_cache
91
8
            .sizes_for_keys(keys, results, true /* peek */)
92
8
            .await;
93
94
8
        let not_cached_keys: Vec<_> = keys
95
8
            .iter()
96
8
            .zip(results.iter())
97
8
            .filter_map(|(digest, result)| result.map_or_else(|| 
Some(digest.into())5
, |_|
None3
))
98
8
            .collect();
99
8
100
8
        // Hot path optimization when all keys are cached.
101
8
        if not_cached_keys.is_empty() {
  Branch (101:12): [True: 0, False: 0]
  Branch (101:12): [True: 0, False: 3]
  Branch (101:12): [True: 3, False: 2]
  Branch (101:12): [Folded - Ignored]
102
3
            return Ok(());
103
5
        }
104
5
105
5
        // Now query only the items not found in the cache.
106
5
        let mut inner_results = vec![None; not_cached_keys.len()];
107
5
        self.inner_store
108
5
            .has_with_results(&not_cached_keys, &mut inner_results)
109
5
            .await
110
5
            .err_tip(|| 
"In ExistenceCacheStore::inner_has_with_results"0
)
?0
;
111
112
        // Insert found from previous query into our cache.
113
        {
114
            // Note: Sadly due to some weird lifetime issues we need to collect here, but
115
            // in theory we don't actually need to collect.
116
5
            let inserts = not_cached_keys
117
5
                .iter()
118
5
                .zip(inner_results.iter())
119
5
                .filter_map(|(key, result)| {
120
5
                    result.map(|size| 
(key.borrow().into_digest(), ExistanceItem(size))2
)
121
5
                })
122
5
                .collect::<Vec<_>>();
123
5
            let _ = self.existence_cache.insert_many(inserts).await;
124
        }
125
126
        // Merge the results from the cache and the query.
127
        {
128
5
            let mut inner_results_iter = inner_results.into_iter();
129
            // We know at this point that any None in results was queried and will have
130
            // a result in inner_results_iter, so use this knowledge to fill in the results.
131
5
            for result in results.iter_mut() {
132
5
                if result.is_none() {
  Branch (132:20): [True: 0, False: 0]
  Branch (132:20): [True: 3, False: 0]
  Branch (132:20): [True: 2, False: 0]
  Branch (132:20): [Folded - Ignored]
133
5
                    *result = inner_results_iter
134
5
                        .next()
135
5
                        .expect("has_with_results returned less results than expected");
136
5
                
}0
137
            }
138
            // Ensure that there was no logic error by ensuring our iterator is not empty.
139
0
            error_if!(
140
5
                inner_results_iter.next().is_some(),
  Branch (140:17): [True: 0, False: 0]
  Branch (140:17): [True: 0, False: 3]
  Branch (140:17): [True: 0, False: 2]
  Branch (140:17): [Folded - Ignored]
141
                "has_with_results returned more results than expected"
142
            );
143
        }
144
145
5
        Ok(())
146
8
    }
147
}
148
149
#[async_trait]
150
impl<I: InstantWrapper> StoreDriver for ExistenceCacheStore<I> {
151
    async fn has_with_results(
152
        self: Pin<&Self>,
153
        digests: &[StoreKey<'_>],
154
        results: &mut [Option<u64>],
155
6
    ) -> Result<(), Error> {
156
        // TODO(allada) This is a bit of a hack to get around the lifetime issues with the
157
        // existence_cache. We need to convert the digests to owned values to be able to
158
        // insert them into the cache. In theory it should be able to elide this conversion
159
        // but it seems to be a bit tricky to get right.
160
6
        let digests: Vec<_> = digests
161
6
            .iter()
162
6
            .map(|key| key.borrow().into_digest())
163
6
            .collect();
164
6
        self.inner_has_with_results(&digests, results).await
165
12
    }
166
167
    async fn update(
168
        self: Pin<&Self>,
169
        key: StoreKey<'_>,
170
        mut reader: DropCloserReadHalf,
171
        size_info: UploadSizeInfo,
172
2
    ) -> Result<(), Error> {
173
2
        let digest = key.into_digest();
174
2
        let mut exists = [None];
175
2
        self.inner_has_with_results(&[digest], &mut exists)
176
2
            .await
177
2
            .err_tip(|| 
"In ExistenceCacheStore::update"0
)
?0
;
178
2
        if exists[0].is_some() {
  Branch (178:12): [True: 0, False: 0]
  Branch (178:12): [True: 0, False: 2]
  Branch (178:12): [True: 0, False: 0]
  Branch (178:12): [Folded - Ignored]
179
            // We need to drain the reader to avoid the writer complaining that we dropped
180
            // the connection prematurely.
181
0
            reader
182
0
                .drain()
183
0
                .await
184
0
                .err_tip(|| "In ExistenceCacheStore::update")?;
185
0
            return Ok(());
186
2
        }
187
2
        let result = self.inner_store.update(digest, reader, size_info).await;
188
2
        if result.is_ok() {
  Branch (188:12): [True: 0, False: 0]
  Branch (188:12): [True: 2, False: 0]
  Branch (188:12): [True: 0, False: 0]
  Branch (188:12): [Folded - Ignored]
189
2
            if let UploadSizeInfo::ExactSize(size) = size_info {
  Branch (189:20): [True: 0, False: 0]
  Branch (189:20): [True: 2, False: 0]
  Branch (189:20): [True: 0, False: 0]
  Branch (189:20): [Folded - Ignored]
190
2
                let _ = self
191
2
                    .existence_cache
192
2
                    .insert(digest, ExistanceItem(size))
193
2
                    .await;
194
0
            }
195
0
        }
196
2
        result
197
4
    }
198
199
    async fn get_part(
200
        self: Pin<&Self>,
201
        key: StoreKey<'_>,
202
        writer: &mut DropCloserWriteHalf,
203
        offset: u64,
204
        length: Option<u64>,
205
1
    ) -> Result<(), Error> {
206
1
        let digest = key.into_digest();
207
1
        let result = self
208
1
            .inner_store
209
1
            .get_part(digest, writer, offset, length)
210
1
            .await;
211
1
        if result.is_ok() {
  Branch (211:12): [True: 0, False: 0]
  Branch (211:12): [True: 1, False: 0]
  Branch (211:12): [True: 0, False: 0]
  Branch (211:12): [Folded - Ignored]
212
1
            let _ = self
213
1
                .existence_cache
214
1
                .insert(digest, ExistanceItem(digest.size_bytes()))
215
1
                .await;
216
0
        }
217
1
        result
218
2
    }
219
220
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
221
0
        self
222
0
    }
223
224
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
225
0
        self
226
0
    }
227
228
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
229
0
        self
230
0
    }
231
}
232
233
#[async_trait]
234
impl<I: InstantWrapper> HealthStatusIndicator for ExistenceCacheStore<I> {
235
0
    fn get_name(&self) -> &'static str {
236
0
        "ExistenceCacheStore"
237
0
    }
238
239
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
240
0
        StoreDriver::check_health(Pin::new(self), namespace).await
241
0
    }
242
}