Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::ffi::{OsStr, OsString};
17
use std::fmt::{Debug, Formatter};
18
use std::pin::Pin;
19
use std::sync::atomic::{AtomicU64, Ordering};
20
use std::sync::{Arc, Weak};
21
use std::time::{Duration, SystemTime};
22
23
use async_lock::RwLock;
24
use async_trait::async_trait;
25
use bytes::BytesMut;
26
use filetime::{set_file_atime, FileTime};
27
use futures::stream::{StreamExt, TryStreamExt};
28
use futures::{Future, TryFutureExt};
29
use nativelink_config::stores::FilesystemSpec;
30
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
31
use nativelink_metric::MetricsComponent;
32
use nativelink_util::buf_channel::{
33
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
34
};
35
use nativelink_util::common::{fs, DigestInfo};
36
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
37
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
38
use nativelink_util::store_trait::{
39
    StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
40
};
41
use nativelink_util::{background_spawn, spawn_blocking};
42
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
43
use tokio::time::{sleep, timeout, Sleep};
44
use tokio_stream::wrappers::ReadDirStream;
45
use tracing::{event, Level};
46
47
use crate::cas_utils::is_zero_digest;
48
49
// Default size to allocate memory of the buffer when reading files.
50
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
51
// Default block size of all major filesystems is 4KB
52
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
53
54
pub const STR_FOLDER: &str = "s";
55
pub const DIGEST_FOLDER: &str = "d";
56
57
#[derive(Clone, Copy, Debug)]
58
pub enum FileType {
59
    Digest,
60
    String,
61
}
62
63
#[derive(Debug, MetricsComponent)]
64
pub struct SharedContext {
65
    // Used in testing to know how many active drop() spawns are running.
66
    // TODO(allada) It is probably a good idea to use a spin lock during
67
    // destruction of the store to ensure that all files are actually
68
    // deleted (similar to how it is done in tests).
69
    #[metric(help = "Number of active drop spawns")]
70
    pub active_drop_spawns: AtomicU64,
71
    #[metric(help = "Path to the configured temp path")]
72
    temp_path: String,
73
    #[metric(help = "Path to the configured content path")]
74
    content_path: String,
75
}
76
77
#[derive(Eq, PartialEq, Debug)]
78
enum PathType {
79
    Content,
80
    Temp,
81
    Custom(OsString),
82
}
83
84
/// [`EncodedFilePath`] stores the path to the file
85
/// including the context, path type and key to the file.
86
/// The whole [`StoreKey`] is stored as opposed to solely
87
/// the [`DigestInfo`] so that it is more usable for things
88
/// such as BEP -see Issue #1108
89
pub struct EncodedFilePath {
90
    shared_context: Arc<SharedContext>,
91
    path_type: PathType,
92
    key: StoreKey<'static>,
93
}
94
95
impl EncodedFilePath {
96
    #[inline]
97
298
    fn get_file_path(&self) -> Cow<'_, OsStr> {
98
298
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
99
298
    }
100
}
101
102
#[inline]
103
377
fn get_file_path_raw<'a>(
104
377
    path_type: &'a PathType,
105
377
    shared_context: &SharedContext,
106
377
    key: &StoreKey<'a>,
107
377
) -> Cow<'a, OsStr> {
108
377
    let 
folder372
= match path_type {
109
207
        PathType::Content => &shared_context.content_path,
110
165
        PathType::Temp => &shared_context.temp_path,
111
5
        PathType::Custom(path) => return Cow::Borrowed(path),
112
    };
113
372
    Cow::Owned(to_full_path_from_key(folder, key))
114
377
}
115
116
impl Drop for EncodedFilePath {
117
80
    fn drop(&mut self) {
118
80
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
119
80
        // file actually needs to be deleted.
120
80
        if self.path_type == PathType::Content {
  Branch (120:12): [True: 64, False: 16]
  Branch (120:12): [Folded - Ignored]
121
64
            return;
122
16
        }
123
16
124
16
        let file_path = self.get_file_path().to_os_string();
125
16
        let shared_context = self.shared_context.clone();
126
16
        shared_context
127
16
            .active_drop_spawns
128
16
            .fetch_add(1, Ordering::Relaxed);
129
16
        background_spawn!("filesystem_delete_file", async move {
130
15
            event!(Level::INFO, ?file_path, 
"File deleted"0
,);
131
15
            let 
result12
= fs::remove_file(&file_path)
132
15
                .await
133
12
                .err_tip(|| 
format!("Failed to remove file {file_path:?}")0
);
134
12
            if let Err(
err0
) = result {
  Branch (134:20): [True: 0, False: 12]
  Branch (134:20): [Folded - Ignored]
135
0
                event!(Level::ERROR, ?file_path, ?err, "Failed to delete file",);
136
12
            }
137
12
            shared_context
138
12
                .active_drop_spawns
139
12
                .fetch_sub(1, Ordering::Relaxed);
140
16
        });
141
80
    }
142
}
143
144
/// This creates the file path from the [`StoreKey`]. If
145
/// it is a string, the string, prefixed with [`STR_PREFIX`]
146
/// for backwards compatibility, is stored.
147
///
148
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_PREFIX`]
149
/// followed by the string representation of a digest - the hash in hex,
150
/// a hyphen then the size in bytes
151
///
152
/// Previously, only the string representation of the [`DigestInfo`] was
153
/// used with no prefix
154
#[inline]
155
389
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
156
389
    match key {
157
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
158
386
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
159
    }
160
389
    .into()
161
389
}
162
163
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
164
    /// Responsible for creating the underlying `FileEntry`.
165
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
166
167
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
168
    fn make_and_open_file(
169
        block_size: u64,
170
        encoded_file_path: EncodedFilePath,
171
    ) -> impl Future<Output = Result<(Self, fs::ResumeableFileSlot, OsString), Error>> + Send
172
    where
173
        Self: Sized;
174
175
    /// Returns the underlying reference to the size of the data in bytes
176
    fn data_size_mut(&mut self) -> &mut u64;
177
178
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
179
    fn size_on_disk(&self) -> u64;
180
181
    /// Gets the underlying `EncodedfilePath`.
182
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
183
184
    /// Returns a reader that will read part of the underlying file.
185
    fn read_file_part(
186
        &self,
187
        offset: u64,
188
        length: u64,
189
    ) -> impl Future<Output = Result<fs::ResumeableFileSlot, Error>> + Send;
190
191
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
192
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
193
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
194
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
195
    /// the callback the file is always guaranteed to exist and immutable.
196
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
197
    fn get_file_path_locked<
198
        T,
199
        Fut: Future<Output = Result<T, Error>> + Send,
200
        F: FnOnce(OsString) -> Fut + Send,
201
    >(
202
        &self,
203
        handler: F,
204
    ) -> impl Future<Output = Result<T, Error>> + Send;
205
}
206
207
pub struct FileEntryImpl {
208
    data_size: u64,
209
    block_size: u64,
210
    encoded_file_path: RwLock<EncodedFilePath>,
211
}
212
213
impl FileEntryImpl {
214
9
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
215
9
        self.encoded_file_path.get_mut().shared_context.clone()
216
9
    }
217
}
218
219
impl FileEntry for FileEntryImpl {
220
80
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
221
80
        Self {
222
80
            data_size,
223
80
            block_size,
224
80
            encoded_file_path,
225
80
        }
226
80
    }
227
228
    /// This encapsulates the logic for the edge case of if the file fails to create
229
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
230
    /// try to cleanup the file as well during `drop()`.
231
74
    async fn make_and_open_file(
232
74
        block_size: u64,
233
74
        encoded_file_path: EncodedFilePath,
234
74
    ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
235
74
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
236
74
        let temp_file_result = fs::create_file(temp_full_path.clone())
237
74
            .or_else(|mut err| async 
{0
238
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
239
0
                    format!("Failed to remove file {temp_full_path:?} in filesystem store")
240
0
                });
241
0
                if let Err(remove_err) = remove_result {
  Branch (241:24): [True: 0, False: 0]
  Branch (241:24): [True: 0, False: 0]
  Branch (241:24): [True: 0, False: 0]
  Branch (241:24): [Folded - Ignored]
  Branch (241:24): [True: 0, False: 0]
242
0
                    err = err.merge(remove_err);
243
0
                }
244
0
                event!(
245
0
                    Level::WARN,
246
                    ?err,
247
                    ?block_size,
248
                    ?temp_full_path,
249
0
                    "Failed to create file",
250
                );
251
0
                Err(err)
252
0
                    .err_tip(|| format!("Failed to create {temp_full_path:?} in filesystem store"))
253
74
            
}0
)
254
74
            .await
?0
;
255
256
74
        Ok((
257
74
            <FileEntryImpl as FileEntry>::create(
258
74
                0, /* Unknown yet, we will fill it in later */
259
74
                block_size,
260
74
                RwLock::new(encoded_file_path),
261
74
            ),
262
74
            temp_file_result,
263
74
            temp_full_path,
264
74
        ))
265
74
    }
266
267
74
    fn data_size_mut(&mut self) -> &mut u64 {
268
74
        &mut self.data_size
269
74
    }
270
271
153
    fn size_on_disk(&self) -> u64 {
272
153
        self.data_size.div_ceil(self.block_size) * self.block_size
273
153
    }
274
275
191
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
276
191
        &self.encoded_file_path
277
191
    }
278
279
28
    async fn read_file_part(
280
28
        &self,
281
28
        offset: u64,
282
28
        length: u64,
283
28
    ) -> Result<fs::ResumeableFileSlot, Error> {
284
28
        let (mut file, full_content_path_for_debug_only) = self
285
28
            .get_file_path_locked(|full_content_path| async move {
286
28
                let file = fs::open_file(full_content_path.clone(), length)
287
28
                    .await
288
28
                    .err_tip(|| {
289
0
                        format!("Failed to open file in filesystem store {full_content_path:?}")
290
28
                    })
?0
;
291
28
                Ok((file, full_content_path))
292
56
            })
293
28
            .await
?0
;
294
295
28
        file.as_reader()
296
28
            .await
297
28
            .err_tip(|| 
"Could not seek file in read_file_part()"0
)
?0
298
28
            .get_mut()
299
28
            .seek(SeekFrom::Start(offset))
300
28
            .await
301
28
            .err_tip(|| 
format!("Failed to seek file: {full_content_path_for_debug_only:?}")0
)
?0
;
302
28
        Ok(file)
303
28
    }
304
305
112
    async fn get_file_path_locked<
306
112
        T,
307
112
        Fut: Future<Output = Result<T, Error>> + Send,
308
112
        F: FnOnce(OsString) -> Fut + Send,
309
112
    >(
310
112
        &self,
311
112
        handler: F,
312
112
    ) -> Result<T, Error> {
313
112
        let encoded_file_path = self.get_encoded_file_path().read().await;
314
112
        handler(encoded_file_path.get_file_path().to_os_string()).await
315
112
    }
316
}
317
318
impl Debug for FileEntryImpl {
319
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
320
0
        f.debug_struct("FileEntryImpl")
321
0
            .field("data_size", &self.data_size)
322
0
            .field("encoded_file_path", &"<behind mutex>")
323
0
            .finish()
324
0
    }
325
}
326
327
91
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
328
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
329
91
    let mut hash = *digest.packed_hash();
330
91
    hash[24..].clone_from_slice(
331
91
        &DELETE_FILE_COUNTER
332
91
            .fetch_add(1, Ordering::Relaxed)
333
91
            .to_le_bytes(),
334
91
    );
335
91
    digest.set_packed_hash(*hash);
336
91
    digest
337
91
}
338
339
91
fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
340
91
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
341
91
}
342
343
impl LenEntry for FileEntryImpl {
344
    #[inline]
345
151
    fn len(&self) -> u64 {
346
151
        self.size_on_disk()
347
151
    }
348
349
0
    fn is_empty(&self) -> bool {
350
0
        self.data_size == 0
351
0
    }
352
353
    #[inline]
354
74
    async fn touch(&self) -> bool {
355
74
        let result = self
356
74
            .get_file_path_locked(move |full_content_path| async move {
357
74
                let full_content_path = full_content_path.clone();
358
74
                spawn_blocking!("filesystem_touch_set_mtime", move || {
359
74
                    set_file_atime(&full_content_path, FileTime::now()).err_tip(|| {
360
2
                        format!("Failed to touch file in filesystem store {full_content_path:?}")
361
74
                    })
362
74
                })
363
74
                .await
364
74
                .map_err(|e| {
365
0
                    make_err!(
366
0
                        Code::Internal,
367
0
                        "Failed to change atime of file due to spawn failing {:?}",
368
0
                        e
369
0
                    )
370
74
                })
?0
371
148
            })
372
74
            .await;
373
74
        if let Err(
err2
) = result {
  Branch (373:16): [True: 0, False: 0]
  Branch (373:16): [True: 2, False: 14]
  Branch (373:16): [True: 0, False: 0]
  Branch (373:16): [True: 0, False: 4]
  Branch (373:16): [Folded - Ignored]
  Branch (373:16): [True: 0, False: 54]
374
2
            event!(Level::ERROR, ?err, "Failed to touch file",);
375
2
            return false;
376
72
        }
377
72
        true
378
74
    }
379
380
    // unref() only triggers when an item is removed from the eviction_map. It is possible
381
    // that another place in code has a reference to `FileEntryImpl` and may later read the
382
    // file. To support this edge case, we first move the file to a temp file and point
383
    // target file location to the new temp file. `unref()` should only ever be called once.
384
    #[inline]
385
18
    async fn unref(&self) {
386
        {
387
18
            let mut encoded_file_path = self.encoded_file_path.write().await;
388
18
            if encoded_file_path.path_type == PathType::Temp {
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 1, False: 11]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [True: 0, False: 0]
  Branch (388:16): [Folded - Ignored]
  Branch (388:16): [True: 0, False: 6]
389
                // We are already a temp file that is now marked for deletion on drop.
390
                // This is very rare, but most likely the rename into the content path failed.
391
1
                return;
392
17
            }
393
17
            let from_path = encoded_file_path.get_file_path();
394
17
            let new_key = make_temp_key(&encoded_file_path.key);
395
17
396
17
            let to_path =
397
17
                to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
398
399
17
            if let Err(
err2
) = fs::rename(&from_path, &to_path).await {
  Branch (399:20): [True: 0, False: 0]
  Branch (399:20): [True: 2, False: 9]
  Branch (399:20): [True: 0, False: 0]
  Branch (399:20): [True: 0, False: 0]
  Branch (399:20): [Folded - Ignored]
  Branch (399:20): [True: 0, False: 6]
400
2
                event!(
401
2
                    Level::WARN,
402
2
                    key = ?encoded_file_path.key,
403
2
                    ?from_path,
404
2
                    ?to_path,
405
2
                    ?err,
406
2
                    "Failed to rename file",
407
                );
408
            } else {
409
15
                event!(
410
15
                    Level::INFO,
411
0
                    key = ?encoded_file_path.key,
412
0
                    ?from_path,
413
0
                    ?to_path,
414
0
                    "Renamed file",
415
                );
416
15
                encoded_file_path.path_type = PathType::Temp;
417
15
                encoded_file_path.key = new_key;
418
            }
419
        }
420
18
    }
421
}
422
423
#[inline]
424
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
425
2
    let (hash, size) = file_name.split_once('-').err_tip(|| 
""0
)
?0
;
426
2
    let size = size.parse::<i64>()
?0
;
427
2
    DigestInfo::try_new(hash, size)
428
2
}
429
430
2
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
431
2
    match file_type {
432
0
        FileType::String => Ok(StoreKey::new_str(file_name)),
433
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
434
    }
435
2
}
436
437
/// The number of files to read the metadata for at the same time when running
438
/// `add_files_to_cache`.
439
const SIMULTANEOUS_METADATA_READS: usize = 200;
440
441
38
async fn add_files_to_cache<Fe: FileEntry>(
442
38
    evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
443
38
    anchor_time: &SystemTime,
444
38
    shared_context: &Arc<SharedContext>,
445
38
    block_size: u64,
446
38
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
447
38
) -> Result<(), Error> {
448
    #[expect(clippy::too_many_arguments)]
449
1
    async fn process_entry<Fe: FileEntry>(
450
1
        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
451
1
        file_name: &str,
452
1
        file_type: FileType,
453
1
        atime: SystemTime,
454
1
        data_size: u64,
455
1
        block_size: u64,
456
1
        anchor_time: &SystemTime,
457
1
        shared_context: &Arc<SharedContext>,
458
1
    ) -> Result<(), Error> {
459
1
        let key = key_from_file(file_name, file_type)
?0
;
460
461
1
        let file_entry = Fe::create(
462
1
            data_size,
463
1
            block_size,
464
1
            RwLock::new(EncodedFilePath {
465
1
                shared_context: shared_context.clone(),
466
1
                path_type: PathType::Content,
467
1
                key: key.borrow().into_owned(),
468
1
            }),
469
1
        );
470
1
        let time_since_anchor = anchor_time
471
1
            .duration_since(atime)
472
1
            .map_err(|_| 
make_input_err!("File access time newer than now")0
)
?0
;
473
1
        evicting_map
474
1
            .insert_with_time(
475
1
                key.into_owned().into(),
476
1
                Arc::new(file_entry),
477
1
                time_since_anchor.as_secs() as i32,
478
1
            )
479
1
            .await;
480
1
        Ok(())
481
1
    }
482
483
114
    async fn read_files(
484
114
        folder: Option<&str>,
485
114
        shared_context: &SharedContext,
486
114
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
487
        // Note: In Dec 2024 this is for backwards compatibility with the old
488
        // way files were stored on disk. Previously all files were in a single
489
        // folder regardless of the StoreKey type. This allows old versions of
490
        // nativelink file layout to be upgraded at startup time.
491
        // This logic can be removed once more time has passed.
492
114
        let read_dir = if let Some(
folder76
) = folder {
  Branch (492:31): [True: 0, False: 0]
  Branch (492:31): [True: 40, False: 20]
  Branch (492:31): [True: 4, False: 2]
  Branch (492:31): [Folded - Ignored]
  Branch (492:31): [True: 32, False: 16]
493
76
            format!("{}/{folder}/", shared_context.content_path)
494
        } else {
495
38
            format!("{}/", shared_context.content_path)
496
        };
497
114
        let (_permit, dir_handle) = fs::read_dir(read_dir)
498
114
            .await
499
114
            .err_tip(|| 
"Failed opening content directory for iterating in filesystem store"0
)
?0
500
114
            .into_inner();
501
114
502
114
        let read_dir_stream = ReadDirStream::new(dir_handle);
503
114
        read_dir_stream
504
114
            .map(|dir_entry| async move {
505
77
                let dir_entry = dir_entry.unwrap();
506
77
                let file_name = dir_entry.file_name().into_string().unwrap();
507
77
                let metadata = dir_entry
508
77
                    .metadata()
509
77
                    .await
510
77
                    .err_tip(|| 
"Failed to get metadata in filesystem store"0
)
?0
;
511
                // We need to filter out folders - we do not want to try to cache the s and d folders.
512
77
                let is_file =
513
77
                    metadata.is_file() || !(
file_name == STR_FOLDER76
||
file_name == DIGEST_FOLDER38
);
  Branch (513:21): [True: 0, False: 0]
  Branch (513:45): [True: 0, False: 0]
  Branch (513:21): [True: 1, False: 40]
  Branch (513:45): [True: 20, False: 20]
  Branch (513:21): [True: 0, False: 4]
  Branch (513:45): [True: 2, False: 2]
  Branch (513:21): [Folded - Ignored]
  Branch (513:45): [Folded - Ignored]
  Branch (513:21): [True: 0, False: 32]
  Branch (513:45): [True: 16, False: 16]
514
77
                let atime = match metadata.accessed() {
515
77
                    Ok(atime) => atime,
516
0
                    Err(err) => {
517
0
                        panic!(
518
0
                            "{}{}{} : {} {:?}",
519
0
                            "It appears this filesystem does not support access time. ",
520
0
                            "Please configure this program to run on a drive that supports ",
521
0
                            "atime",
522
0
                            file_name,
523
0
                            err
524
0
                        );
525
                    }
526
                };
527
77
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
528
77
                    file_name,
529
77
                    atime,
530
77
                    metadata.len(),
531
77
                    is_file,
532
77
                ))
533
154
            })
534
114
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
535
114
            .try_collect()
536
114
            .await
537
114
    }
538
539
    /// Note: In Dec 2024 this is for backwards compatibility with the old
540
    /// way files were stored on disk. Previously all files were in a single
541
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
542
    /// location to the new cache location, under [`DIGEST_FOLDER`].
543
38
    async fn move_old_cache(
544
38
        shared_context: &Arc<SharedContext>,
545
38
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
546
38
    ) -> Result<(), Error> {
547
38
        let file_infos = read_files(None, shared_context).await
?0
;
548
549
38
        let from_path = shared_context.content_path.to_string();
550
38
551
38
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
552
553
76
        for (
file_name0
, _, _, _) in
file_infos.into_iter().filter(38
|x| x.3
)38
{
554
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
555
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
556
557
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
  Branch (557:20): [True: 0, False: 0]
  Branch (557:20): [True: 0, False: 0]
  Branch (557:20): [True: 0, False: 0]
  Branch (557:20): [Folded - Ignored]
  Branch (557:20): [True: 0, False: 0]
558
0
                event!(
559
0
                    Level::WARN,
560
                    ?from_file,
561
                    ?to_file,
562
                    ?err,
563
0
                    "Failed to rename file",
564
                );
565
            } else {
566
0
                event!(Level::INFO, ?from_file, ?to_file, "Renamed file",);
567
            }
568
        }
569
38
        Ok(())
570
38
    }
571
572
76
    async fn add_files_to_cache<Fe: FileEntry>(
573
76
        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
574
76
        anchor_time: &SystemTime,
575
76
        shared_context: &Arc<SharedContext>,
576
76
        block_size: u64,
577
76
        folder: &str,
578
76
    ) -> Result<(), Error> {
579
76
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
580
76
        let file_type = match folder {
581
76
            STR_FOLDER => 
FileType::String38
,
582
38
            DIGEST_FOLDER => FileType::Digest,
583
0
            _ => panic!("Invalid folder type"),
584
        };
585
586
76
        let path_root = format!("{}/{folder}", shared_context.content_path);
587
588
76
        for (
file_name, atime, data_size1
, _) in file_infos.into_iter().filter(|x|
x.31
) {
589
1
            let result = process_entry(
590
1
                evicting_map,
591
1
                &file_name,
592
1
                file_type,
593
1
                atime,
594
1
                data_size,
595
1
                block_size,
596
1
                anchor_time,
597
1
                shared_context,
598
1
            )
599
1
            .await;
600
1
            if let Err(
err0
) = result {
  Branch (600:20): [True: 0, False: 0]
  Branch (600:20): [True: 0, False: 0]
  Branch (600:20): [True: 0, False: 0]
  Branch (600:20): [True: 0, False: 0]
  Branch (600:20): [True: 0, False: 0]
  Branch (600:20): [True: 0, False: 0]
  Branch (600:20): [True: 0, False: 1]
  Branch (600:20): [True: 0, False: 0]
  Branch (600:20): [Folded - Ignored]
  Branch (600:20): [True: 0, False: 0]
601
0
                event!(
602
0
                    Level::WARN,
603
                    ?file_name,
604
                    ?err,
605
0
                    "Failed to add file to eviction cache",
606
                );
607
                // Ignore result.
608
0
                let _ = fs::remove_file(format!("{path_root}/{file_name}")).await;
609
1
            }
610
        }
611
76
        Ok(())
612
76
    }
613
614
38
    move_old_cache(shared_context, rename_fn).await
?0
;
615
616
38
    add_files_to_cache(
617
38
        evicting_map,
618
38
        anchor_time,
619
38
        shared_context,
620
38
        block_size,
621
38
        DIGEST_FOLDER,
622
38
    )
623
38
    .await
?0
;
624
625
38
    add_files_to_cache(
626
38
        evicting_map,
627
38
        anchor_time,
628
38
        shared_context,
629
38
        block_size,
630
38
        STR_FOLDER,
631
38
    )
632
38
    .await
?0
;
633
38
    Ok(())
634
38
}
635
636
38
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
637
76
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
638
76
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
639
76
            .await
640
76
            .err_tip(|| {
641
0
                "Failed opening temp directory to prune partial downloads in filesystem store"
642
76
            })
?0
643
76
            .into_inner();
644
76
645
76
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
646
76
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
  Branch (646:19): [True: 0, False: 0]
  Branch (646:19): [True: 0, False: 40]
  Branch (646:19): [True: 0, False: 4]
  Branch (646:19): [Folded - Ignored]
  Branch (646:19): [True: 0, False: 32]
647
0
            let path = dir_entry?.path();
648
0
            if let Err(err) = fs::remove_file(&path).await {
  Branch (648:20): [True: 0, False: 0]
  Branch (648:20): [True: 0, False: 0]
  Branch (648:20): [True: 0, False: 0]
  Branch (648:20): [Folded - Ignored]
  Branch (648:20): [True: 0, False: 0]
649
0
                event!(Level::WARN, ?path, ?err, "Failed to delete file",);
650
0
            }
651
        }
652
76
        Ok(())
653
76
    }
654
655
38
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
656
38
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
657
38
    Ok(())
658
38
}
659
660
#[derive(MetricsComponent)]
661
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
662
    #[metric]
663
    shared_context: Arc<SharedContext>,
664
    #[metric(group = "evicting_map")]
665
    evicting_map: Arc<EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>>,
666
    #[metric(help = "Block size of the configured filesystem")]
667
    block_size: u64,
668
    #[metric(help = "Size of the configured read buffer size")]
669
    read_buffer_size: usize,
670
    weak_self: Weak<Self>,
671
    sleep_fn: fn(Duration) -> Sleep,
672
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
673
}
674
675
impl<Fe: FileEntry> FilesystemStore<Fe> {
676
31
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
677
69
        Self::new_with_timeout_and_rename_fn(spec, sleep, |from, to| std::fs::rename(from, to))
678
31
            .await
679
31
    }
680
681
38
    pub async fn new_with_timeout_and_rename_fn(
682
38
        spec: &FilesystemSpec,
683
38
        sleep_fn: fn(Duration) -> Sleep,
684
38
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
685
38
    ) -> Result<Arc<Self>, Error> {
686
76
        async fn create_subdirs(path: &str) -> Result<(), Error> {
687
76
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
688
76
                .await
689
76
                .err_tip(|| 
format!("Failed to create directory {path}/{STR_FOLDER}")0
)
?0
;
690
76
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
691
76
                .await
692
76
                .err_tip(|| 
format!("Failed to create directory {path}/{DIGEST_FOLDER}")0
)
693
76
        }
694
695
38
        let now = SystemTime::now();
696
38
697
38
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
698
38
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
699
38
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
700
38
701
38
        // Create temp and content directories and the s and d subdirectories.
702
38
703
38
        create_subdirs(&spec.temp_path).await
?0
;
704
38
        create_subdirs(&spec.content_path).await
?0
;
705
706
38
        let shared_context = Arc::new(SharedContext {
707
38
            active_drop_spawns: AtomicU64::new(0),
708
38
            temp_path: spec.temp_path.clone(),
709
38
            content_path: spec.content_path.clone(),
710
38
        });
711
712
38
        let block_size = if spec.block_size == 0 {
  Branch (712:29): [True: 0, False: 0]
  Branch (712:29): [True: 0, False: 1]
  Branch (712:29): [True: 0, False: 1]
  Branch (712:29): [True: 1, False: 0]
  Branch (712:29): [True: 0, False: 1]
  Branch (712:29): [True: 1, False: 0]
  Branch (712:29): [True: 14, False: 1]
  Branch (712:29): [True: 2, False: 0]
  Branch (712:29): [Folded - Ignored]
  Branch (712:29): [True: 16, False: 0]
713
34
            DEFAULT_BLOCK_SIZE
714
        } else {
715
4
            spec.block_size
716
        };
717
38
        add_files_to_cache(
718
38
            evicting_map.as_ref(),
719
38
            &now,
720
38
            &shared_context,
721
38
            block_size,
722
38
            rename_fn,
723
38
        )
724
38
        .await
?0
;
725
38
        prune_temp_path(&shared_context.temp_path).await
?0
;
726
727
38
        let read_buffer_size = if spec.read_buffer_size == 0 {
  Branch (727:35): [True: 0, False: 0]
  Branch (727:35): [True: 1, False: 0]
  Branch (727:35): [True: 0, False: 1]
  Branch (727:35): [True: 1, False: 0]
  Branch (727:35): [True: 0, False: 1]
  Branch (727:35): [True: 1, False: 0]
  Branch (727:35): [True: 6, False: 9]
  Branch (727:35): [True: 2, False: 0]
  Branch (727:35): [Folded - Ignored]
  Branch (727:35): [True: 16, False: 0]
728
27
            DEFAULT_BUFF_SIZE
729
        } else {
730
11
            spec.read_buffer_size as usize
731
        };
732
38
        Ok(Arc::new_cyclic(|weak_self| Self {
733
38
            shared_context,
734
38
            evicting_map,
735
38
            block_size,
736
38
            read_buffer_size,
737
38
            weak_self: weak_self.clone(),
738
38
            sleep_fn,
739
38
            rename_fn,
740
38
        }))
741
38
    }
742
743
15
    pub fn get_arc(&self) -> Option<Arc<Self>> {
744
15
        self.weak_self.upgrade()
745
15
    }
746
747
11
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
748
11
        self.evicting_map
749
11
            .get::<StoreKey<'static>>(&digest.into())
750
11
            .await
751
11
            .ok_or_else(|| 
make_err!(Code::NotFound, "{digest} not found in filesystem store")0
)
752
11
    }
753
754
74
    async fn update_file<'a>(
755
74
        self: Pin<&'a Self>,
756
74
        mut entry: Fe,
757
74
        mut resumeable_temp_file: fs::ResumeableFileSlot,
758
74
        final_key: StoreKey<'static>,
759
74
        mut reader: DropCloserReadHalf,
760
74
    ) -> Result<(), Error> {
761
74
        let mut data_size = 0;
762
        loop {
763
132
            let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await
  Branch (763:17): [True: 0, False: 0]
  Branch (763:17): [True: 4, False: 0]
  Branch (763:17): [True: 4, False: 0]
  Branch (763:17): [True: 4, False: 0]
  Branch (763:17): [True: 4, False: 0]
  Branch (763:17): [True: 2, False: 0]
  Branch (763:17): [True: 34, False: 0]
  Branch (763:17): [True: 0, False: 0]
  Branch (763:17): [Folded - Ignored]
  Branch (763:17): [True: 80, False: 0]
764
            else {
765
                // In the event we timeout, we want to close the writing file, to prevent
766
                // the file descriptor from being left open for long periods of time.
767
                // This is needed because we wrap `fs` so only a fixed number of file
768
                // descriptors may be open at any given time. If we are streaming from
769
                // File -> File, it can cause a deadlock if the Write file is not sending
770
                // data because it is waiting for a file descriptor to open before sending data.
771
0
                resumeable_temp_file.close_file().await.err_tip(|| {
772
0
                    "Could not close file due to timeout in FileSystemStore::update_file"
773
0
                })?;
774
0
                continue;
775
            };
776
132
            let mut data = data_result.err_tip(|| 
"Failed to receive data in filesystem store"0
)
?0
;
777
132
            let data_len = data.len();
778
132
            if data_len == 0 {
  Branch (778:16): [True: 0, False: 0]
  Branch (778:16): [True: 2, False: 2]
  Branch (778:16): [True: 2, False: 2]
  Branch (778:16): [True: 2, False: 2]
  Branch (778:16): [True: 2, False: 2]
  Branch (778:16): [True: 1, False: 1]
  Branch (778:16): [True: 18, False: 16]
  Branch (778:16): [True: 0, False: 0]
  Branch (778:16): [Folded - Ignored]
  Branch (778:16): [True: 47, False: 33]
779
74
                break; // EOF.
780
58
            }
781
58
            resumeable_temp_file
782
58
                .as_writer()
783
58
                .await
784
58
                .err_tip(|| 
"in filesystem_store::update_file"0
)
?0
785
58
                .write_all_buf(&mut data)
786
58
                .await
787
58
                .err_tip(|| 
"Failed to write data into filesystem store"0
)
?0
;
788
58
            data_size += data_len as u64;
789
        }
790
791
74
        resumeable_temp_file
792
74
            .as_writer()
793
74
            .await
794
74
            .err_tip(|| 
"in filesystem_store::update_file"0
)
?0
795
74
            .as_ref()
796
74
            .sync_all()
797
74
            .await
798
74
            .err_tip(|| 
"Failed to sync_data in filesystem store"0
)
?0
;
799
800
74
        drop(resumeable_temp_file);
801
74
802
74
        *entry.data_size_mut() = data_size;
803
74
        self.emplace_file(final_key, Arc::new(entry)).await
804
73
    }
805
806
79
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
807
79
        // This sequence of events is quite tricky to understand due to the amount of triggers that
808
79
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
809
79
        // 1. Here we will hold a write lock on any file operations of this FileEntry.
810
79
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
811
79
        //    entries.
812
79
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntrys
813
79
        //    during the rename.
814
79
        // 4. It should be impossible for items to be added while eviction is happening, so there
815
79
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
816
79
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
817
79
        //    item is not possible within the `insert()` call because of the write lock inside the
818
79
        //    eviction map. If an eviction of new item happens after `insert()` but before
819
79
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
820
79
        //    will be blocked on us because we currently have the lock.
821
79
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
822
79
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
823
79
        //    contents until we release the lock.
824
79
        let evicting_map = self.evicting_map.clone();
825
79
        let rename_fn = self.rename_fn;
826
79
827
79
        // We need to guarantee that this will get to the end even if the parent future is dropped.
828
79
        // See: https://github.com/TraceMachina/nativelink/issues/495
829
79
        background_spawn!("filesystem_store_emplace_file", async move {
830
79
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
831
79
            let final_path = get_file_path_raw(
832
79
                &PathType::Content,
833
79
                encoded_file_path.shared_context.as_ref(),
834
79
                &key,
835
79
            );
836
79
837
79
            evicting_map
838
79
                .insert(key.borrow().into_owned().into(), entry.clone())
839
79
                .await;
840
841
79
            let from_path = encoded_file_path.get_file_path();
842
79
            // Internally tokio spawns fs commands onto a blocking thread anyways.
843
79
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
844
79
            // an open-file permit (ensure we don't open too many files at once).
845
79
            let result = (rename_fn)(&from_path, &final_path)
846
79
                .err_tip(|| 
format!("Failed to rename temp file to final path {final_path:?}")1
);
847
848
            // In the event our move from temp file to final file fails we need to ensure we remove
849
            // the entry from our map.
850
            // Remember: At this point it is possible for another thread to have a reference to
851
            // `entry`, so we can't delete the file, only drop() should ever delete files.
852
79
            if let Err(
err1
) = result {
  Branch (852:20): [True: 0, False: 0]
  Branch (852:20): [True: 0, False: 2]
  Branch (852:20): [True: 0, False: 2]
  Branch (852:20): [True: 0, False: 2]
  Branch (852:20): [True: 0, False: 2]
  Branch (852:20): [True: 1, False: 0]
  Branch (852:20): [True: 0, False: 21]
  Branch (852:20): [True: 0, False: 0]
  Branch (852:20): [Folded - Ignored]
  Branch (852:20): [True: 0, False: 49]
853
1
                event!(
854
1
                    Level::ERROR,
855
                    ?err,
856
                    ?from_path,
857
                    ?final_path,
858
1
                    "Failed to rename file",
859
                );
860
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
861
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
862
1
                drop(encoded_file_path);
863
1
                // It is possible that the item in our map is no longer the item we inserted,
864
1
                // so we need to conditionally remove it only if the pointers are the same.
865
1
866
1
                evicting_map
867
1
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
868
1
                    .await;
869
1
                return Err(err);
870
78
            }
871
78
            encoded_file_path.path_type = PathType::Content;
872
78
            encoded_file_path.key = key;
873
78
            Ok(())
874
79
        })
875
79
        .await
876
78
        .err_tip(|| 
"Failed to create spawn in filesystem store update_file"0
)
?0
877
78
    }
878
}
879
880
#[async_trait]
881
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
882
    async fn has_with_results(
883
        self: Pin<&Self>,
884
        keys: &[StoreKey<'_>],
885
        results: &mut [Option<u64>],
886
53
    ) -> Result<(), Error> {
887
53
        self.evicting_map
888
53
            .sizes_for_keys::<_, StoreKey<'_>, &StoreKey<'_>>(
889
53
                keys.iter(),
890
53
                results,
891
53
                false, /* peek */
892
53
            )
893
53
            .await;
894
        // We need to do a special pass to ensure our zero files exist.
895
        // If our results failed and the result was a zero file, we need to
896
        // create the file by spec.
897
53
        for (key, result) in keys.iter().zip(results.iter_mut()) {
898
53
            if result.is_some() || 
!is_zero_digest(key.borrow())18
{
  Branch (898:16): [True: 0, False: 0]
  Branch (898:36): [True: 0, False: 0]
  Branch (898:16): [True: 0, False: 0]
  Branch (898:36): [True: 0, False: 0]
  Branch (898:16): [True: 0, False: 0]
  Branch (898:36): [True: 0, False: 0]
  Branch (898:16): [True: 0, False: 0]
  Branch (898:36): [True: 0, False: 0]
  Branch (898:16): [True: 0, False: 0]
  Branch (898:36): [True: 0, False: 0]
  Branch (898:16): [True: 0, False: 1]
  Branch (898:36): [True: 1, False: 0]
  Branch (898:16): [True: 1, False: 4]
  Branch (898:36): [True: 2, False: 2]
  Branch (898:16): [True: 0, False: 0]
  Branch (898:36): [True: 0, False: 0]
  Branch (898:16): [Folded - Ignored]
  Branch (898:36): [Folded - Ignored]
  Branch (898:16): [True: 34, False: 13]
  Branch (898:36): [True: 13, False: 0]
899
51
                continue;
900
2
            }
901
2
            let (mut tx, rx) = make_buf_channel_pair();
902
2
            let send_eof_result = tx.send_eof();
903
2
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
904
2
                .await
905
2
                .err_tip(|| 
format!("Failed to create zero file for key {}", key.as_str())0
)
906
2
                .merge(
907
2
                    send_eof_result
908
2
                        .err_tip(|| 
"Failed to send zero file EOF in filesystem store has"0
),
909
2
                )
?0
;
910
911
2
            *result = Some(0);
912
        }
913
53
        Ok(())
914
106
    }
915
916
    async fn update(
917
        self: Pin<&Self>,
918
        key: StoreKey<'_>,
919
        reader: DropCloserReadHalf,
920
        _upload_size: UploadSizeInfo,
921
74
    ) -> Result<(), Error> {
922
74
        let temp_key = make_temp_key(&key);
923
74
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
924
74
            self.block_size,
925
74
            EncodedFilePath {
926
74
                shared_context: self.shared_context.clone(),
927
74
                path_type: PathType::Temp,
928
74
                key: temp_key,
929
74
            },
930
74
        )
931
74
        .await
?0
;
932
933
74
        self.update_file(entry, temp_file, key.into_owned(), reader)
934
74
            .await
935
73
            .err_tip(|| 
format!("While processing with temp file {temp_full_path:?}")1
)
936
147
    }
937
938
44
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
939
44
        optimization == StoreOptimizations::FileUpdates
940
44
    }
941
942
    async fn update_with_whole_file(
943
        self: Pin<&Self>,
944
        key: StoreKey<'_>,
945
        mut file: fs::ResumeableFileSlot,
946
        upload_size: UploadSizeInfo,
947
5
    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
948
5
        let path = file.get_path().as_os_str().to_os_string();
949
5
        let file_size = match upload_size {
950
5
            UploadSizeInfo::ExactSize(size) => size,
951
0
            UploadSizeInfo::MaxSize(_) => file
952
0
                .as_reader()
953
0
                .await
954
0
                .err_tip(|| {
955
0
                    format!("While getting metadata for {path:?} in update_with_whole_file")
956
0
                })?
957
0
                .get_ref()
958
0
                .as_ref()
959
0
                .metadata()
960
0
                .await
961
0
                .err_tip(|| format!("While reading metadata for {path:?}"))?
962
0
                .len(),
963
        };
964
5
        let entry = Fe::create(
965
5
            file_size,
966
5
            self.block_size,
967
5
            RwLock::new(EncodedFilePath {
968
5
                shared_context: self.shared_context.clone(),
969
5
                path_type: PathType::Custom(path),
970
5
                key: key.borrow().into_owned(),
971
5
            }),
972
5
        );
973
5
        // We are done with the file, if we hold a reference to the file here, it could
974
5
        // result in a deadlock if `emplace_file()` also needs file descriptors.
975
5
        drop(file);
976
5
        self.emplace_file(key.into_owned(), Arc::new(entry))
977
5
            .await
978
5
            .err_tip(|| 
"Could not move file into store in upload_file_to_store, maybe dest is on different volume?"0
)
?0
;
979
5
        return Ok(None);
980
10
    }
981
982
    async fn get_part(
983
        self: Pin<&Self>,
984
        key: StoreKey<'_>,
985
        writer: &mut DropCloserWriteHalf,
986
        offset: u64,
987
        length: Option<u64>,
988
34
    ) -> Result<(), Error> {
989
34
        if is_zero_digest(key.borrow()) {
  Branch (989:12): [True: 0, False: 0]
  Branch (989:12): [True: 0, False: 0]
  Branch (989:12): [True: 0, False: 1]
  Branch (989:12): [True: 0, False: 0]
  Branch (989:12): [True: 0, False: 1]
  Branch (989:12): [True: 0, False: 0]
  Branch (989:12): [True: 1, False: 4]
  Branch (989:12): [True: 0, False: 0]
  Branch (989:12): [Folded - Ignored]
  Branch (989:12): [True: 7, False: 20]
990
8
            self.has(key.borrow())
991
8
                .await
992
8
                .err_tip(|| 
"Failed to check if zero digest exists in filesystem store"0
)
?0
;
993
8
            writer
994
8
                .send_eof()
995
8
                .err_tip(|| 
"Failed to send zero EOF in filesystem store get_part"0
)
?0
;
996
8
            return Ok(());
997
26
        }
998
999
26
        let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
1000
0
            make_err!(
1001
0
                Code::NotFound,
1002
0
                "{} not found in filesystem store here",
1003
0
                key.as_str()
1004
0
            )
1005
26
        })
?0
;
1006
26
        let read_limit = length.unwrap_or(u64::MAX);
1007
26
        let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await
?0
;
1008
1009
        loop {
1010
1.09k
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
1011
1.09k
            resumeable_temp_file
1012
1.09k
                .as_reader()
1013
1.09k
                .await
1014
1.09k
                .err_tip(|| 
"In FileSystemStore::get_part()"0
)
?0
1015
1.09k
                .read_buf(&mut buf)
1016
1.09k
                .await
1017
1.09k
                .err_tip(|| 
"Failed to read data in filesystem store"0
)
?0
;
1018
1.09k
            if buf.is_empty() {
  Branch (1018:16): [True: 0, False: 0]
  Branch (1018:16): [True: 0, False: 0]
  Branch (1018:16): [True: 1, False: 10]
  Branch (1018:16): [True: 0, False: 0]
  Branch (1018:16): [True: 1, False: 10]
  Branch (1018:16): [True: 0, False: 0]
  Branch (1018:16): [True: 3, False: 1.02k]
  Branch (1018:16): [True: 0, False: 0]
  Branch (1018:16): [Folded - Ignored]
  Branch (1018:16): [True: 20, False: 20]
1019
25
                break; // EOF.
1020
1.06k
            }
1021
1.06k
            // In the event it takes a while to send the data to the client, we want to close the
1022
1.06k
            // reading file, to prevent the file descriptor from being left open for long periods of time.
1023
1.06k
            // Failing to do so might cause deadlocks if the receiver is unable to receive data
1024
1.06k
            // because it is waiting for a file descriptor to open before receiving data.
1025
1.06k
            // Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
1026
1.06k
            // next iteration.
1027
1.06k
            let buf_content = buf.freeze();
1028
            loop {
1029
1.06k
                let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
1030
1.06k
                tokio::pin!(sleep_fn);
1031
1.06k
                tokio::select! {
1032
1.06k
                    () = & mut (sleep_fn) => {
1033
0
                        resumeable_temp_file
1034
0
                            .close_file()
1035
0
                            .await
1036
0
                            .err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?;
1037
0
                        continue;
1038
                    }
1039
1.06k
                    res = writer.send(buf_content.clone()) => {
1040
1.06k
                        match res {
1041
1.06k
                            Ok(()) => break,
1042
0
                            Err(err) => {
1043
0
                                return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part");
1044
                            }
1045
                        }
1046
                    }
1047
                }
1048
            }
1049
        }
1050
25
        writer
1051
25
            .send_eof()
1052
25
            .err_tip(|| 
"Filed to send EOF in filesystem store get_part"0
)
?0
;
1053
1054
25
        Ok(())
1055
67
    }
1056
1057
55
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
1058
55
        self
1059
55
    }
1060
1061
15
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
1062
15
        self
1063
15
    }
1064
1065
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
1066
0
        self
1067
0
    }
1068
1069
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
1070
0
        registry.register_indicator(self);
1071
0
    }
1072
}
1073
1074
#[async_trait]
1075
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
1076
0
    fn get_name(&self) -> &'static str {
1077
0
        "FilesystemStore"
1078
0
    }
1079
1080
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
1081
0
        StoreDriver::check_health(Pin::new(self), namespace).await
1082
0
    }
1083
}