Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/chunked_stream.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::VecDeque;
16
use std::ops::Bound;
17
use std::pin::Pin;
18
use std::task::{Context, Poll};
19
20
use futures::{Future, Stream};
21
use pin_project::pin_project;
22
23
0
#[pin_project(project = StreamStateProj)]
24
enum StreamState<Fut> {
25
    Future(#[pin] Fut),
26
    Next,
27
}
28
29
/// Takes a range of keys and a function that returns a future that yields
30
/// an iterator of key-value pairs. The stream will yield all key-value pairs
31
/// in the range, in order and buffered. A great use case is where you need
32
/// to implement Stream, but to access the underlying data requires a lock,
33
/// but API does not require the data to be in sync with data already received.
34
127
#[pin_project]
35
pub struct ChunkedStream<K, T, F, E, Fut>
36
where
37
    K: Ord,
38
    F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut,
39
    Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>,
40
{
41
    chunk_fn: F,
42
    buffer: VecDeque<T>,
43
    start_key: Option<Bound<K>>,
44
    end_key: Option<Bound<K>>,
45
    #[pin]
46
    stream_state: StreamState<Fut>,
47
}
48
49
impl<K, T, F, E, Fut> ChunkedStream<K, T, F, E, Fut>
50
where
51
    K: Ord,
52
    F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut,
53
    Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>,
54
{
55
87
    pub fn new(start_key: Bound<K>, end_key: Bound<K>, chunk_fn: F) -> Self {
56
87
        Self {
57
87
            chunk_fn,
58
87
            buffer: VecDeque::new(),
59
87
            start_key: Some(start_key),
60
87
            end_key: Some(end_key),
61
87
            stream_state: StreamState::Next,
62
87
        }
63
87
    }
64
}
65
66
impl<K, T, F, E, Fut> Stream for ChunkedStream<K, T, F, E, Fut>
67
where
68
    K: Ord,
69
    F: FnMut(Bound<K>, Bound<K>, VecDeque<T>) -> Fut,
70
    Fut: Future<Output = Result<Option<((Bound<K>, Bound<K>), VecDeque<T>)>, E>>,
71
{
72
    type Item = Result<T, E>;
73
74
127
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75
127
        let mut this = self.project();
76
        loop {
77
288
            if let Some(
item40
) = this.buffer.pop_front() {
  Branch (77:20): [True: 0, False: 0]
  Branch (77:20): [True: 0, False: 0]
  Branch (77:20): [Folded - Ignored]
  Branch (77:20): [Folded - Ignored]
  Branch (77:20): [True: 0, False: 0]
  Branch (77:20): [True: 40, False: 248]
78
40
                return Poll::Ready(Some(Ok(item)));
79
248
            }
80
248
            match this.stream_state.as_mut().project() {
81
124
                StreamStateProj::Future(fut) => {
82
124
                    match 
futures::ready!0
(fut.poll(cx)) {
83
37
                        Ok(Some(((start, end), mut buffer))) => {
84
37
                            *this.start_key = Some(start);
85
37
                            *this.end_key = Some(end);
86
37
                            std::mem::swap(&mut buffer, this.buffer);
87
37
                        }
88
87
                        Ok(None) => return Poll::Ready(None), // End of stream.
89
0
                        Err(err) => return Poll::Ready(Some(Err(err))),
90
                    }
91
37
                    this.stream_state.set(StreamState::Next);
92
                    // Loop again.
93
                }
94
124
                StreamStateProj::Next => {
95
124
                    this.buffer.clear();
96
124
                    // This trick is used to recycle capacity.
97
124
                    let buffer = std::mem::take(this.buffer);
98
124
                    let start_key = this
99
124
                        .start_key
100
124
                        .take()
101
124
                        .expect("start_key should never be None");
102
124
                    let end_key = this.end_key.take().expect("end_key should never be None");
103
124
                    let fut = (this.chunk_fn)(start_key, end_key, buffer);
104
124
                    this.stream_state.set(StreamState::Future(fut));
105
124
                    // Loop again.
106
124
                }
107
            }
108
        }
109
127
    }
110
}