Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/s3_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::future::Future;
17
use std::pin::Pin;
18
use std::sync::Arc;
19
use std::task::{Context, Poll};
20
use std::time::Duration;
21
use std::{cmp, env};
22
23
use async_trait::async_trait;
24
use aws_config::default_provider::credentials;
25
use aws_config::{AppName, BehaviorVersion};
26
use aws_sdk_s3::config::Region;
27
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
28
use aws_sdk_s3::operation::get_object::GetObjectError;
29
use aws_sdk_s3::operation::head_object::HeadObjectError;
30
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
31
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
32
use aws_sdk_s3::Client;
33
use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
34
use bytes::Bytes;
35
use futures::future::FusedFuture;
36
use futures::stream::{unfold, FuturesUnordered};
37
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
38
use http_body::{Frame, SizeHint};
39
use hyper::client::connect::{Connected, Connection, HttpConnector};
40
use hyper::service::Service;
41
use hyper::Uri;
42
use hyper_rustls::{HttpsConnector, MaybeHttpsStream};
43
use nativelink_config::stores::S3Spec;
44
// Note: S3 store should be very careful about the error codes it returns
45
// when in a retryable wrapper. Always prefer Code::Aborted or another
46
// retryable code over Code::InvalidArgument or make_input_err!().
47
// ie: Don't import make_input_err!() to help prevent this.
48
use nativelink_error::{make_err, Code, Error, ResultExt};
49
use nativelink_metric::MetricsComponent;
50
use nativelink_util::buf_channel::{
51
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
52
};
53
use nativelink_util::fs;
54
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
55
use nativelink_util::instant_wrapper::InstantWrapper;
56
use nativelink_util::retry::{Retrier, RetryResult};
57
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
58
use rand::rngs::OsRng;
59
use rand::Rng;
60
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
61
use tokio::net::TcpStream;
62
use tokio::sync::{mpsc, SemaphorePermit};
63
use tokio::time::sleep;
64
use tracing::{event, Level};
65
66
use crate::cas_utils::is_zero_digest;
67
68
// Smallest size S3 accepts for any multipart part other than the last one.
// Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MIN_MULTIPART_SIZE: u64 = 5 << 20; // 5MiB.

// Largest size S3 accepts for a single multipart part.
// Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_MULTIPART_SIZE: u64 = 5 << 30; // 5GiB.

// Highest number of parts S3 accepts in one multipart upload.
// Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_UPLOAD_PARTS: usize = 10_000;

// Bytes buffered per request by default so a failed upload can be replayed.
// Note: If you change this, adjust the docs in the config.
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 << 20; // 5MiB.

// Default cap on how many parts of one multipart upload are in flight at once.
// Note: If you change this, adjust the docs in the config.
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
87
88
pub struct ConnectionWithPermit<T: Connection + AsyncRead + AsyncWrite + Unpin> {
89
    connection: T,
90
    _permit: SemaphorePermit<'static>,
91
}
92
93
impl<T: Connection + AsyncRead + AsyncWrite + Unpin> Connection for ConnectionWithPermit<T> {
94
0
    fn connected(&self) -> Connected {
95
0
        self.connection.connected()
96
0
    }
97
}
98
99
impl<T: Connection + AsyncRead + AsyncWrite + Unpin> AsyncRead for ConnectionWithPermit<T> {
100
    #[inline]
101
0
    fn poll_read(
102
0
        self: Pin<&mut Self>,
103
0
        cx: &mut Context,
104
0
        buf: &mut ReadBuf<'_>,
105
0
    ) -> Poll<Result<(), tokio::io::Error>> {
106
0
        Pin::new(&mut Pin::get_mut(self).connection).poll_read(cx, buf)
107
0
    }
108
}
109
110
impl<T: Connection + AsyncWrite + AsyncRead + Unpin> AsyncWrite for ConnectionWithPermit<T> {
111
    #[inline]
112
0
    fn poll_write(
113
0
        self: Pin<&mut Self>,
114
0
        cx: &mut Context<'_>,
115
0
        buf: &[u8],
116
0
    ) -> Poll<Result<usize, tokio::io::Error>> {
117
0
        Pin::new(&mut Pin::get_mut(self).connection).poll_write(cx, buf)
118
0
    }
119
120
    #[inline]
121
0
    fn poll_flush(
122
0
        self: Pin<&mut Self>,
123
0
        cx: &mut Context<'_>,
124
0
    ) -> Poll<Result<(), tokio::io::Error>> {
125
0
        Pin::new(&mut Pin::get_mut(self).connection).poll_flush(cx)
126
0
    }
127
128
    #[inline]
129
0
    fn poll_shutdown(
130
0
        self: Pin<&mut Self>,
131
0
        cx: &mut Context<'_>,
132
0
    ) -> Poll<Result<(), tokio::io::Error>> {
133
0
        Pin::new(&mut Pin::get_mut(self).connection).poll_shutdown(cx)
134
0
    }
135
}
136
137
#[derive(Clone)]
138
pub struct TlsConnector {
139
    connector: HttpsConnector<HttpConnector>,
140
    retrier: Retrier,
141
}
142
143
impl TlsConnector {
144
    #[must_use]
145
0
    pub fn new(spec: &S3Spec, jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>) -> Self {
146
0
        let connector_with_roots = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots();
147
148
0
        let connector_with_schemes = if spec.insecure_allow_http {
  Branch (148:41): [True: 0, False: 0]
  Branch (148:41): [Folded - Ignored]
149
0
            connector_with_roots.https_or_http()
150
        } else {
151
0
            connector_with_roots.https_only()
152
        };
153
154
0
        let connector = if spec.disable_http2 {
  Branch (154:28): [True: 0, False: 0]
  Branch (154:28): [Folded - Ignored]
155
0
            connector_with_schemes.enable_http1().build()
156
        } else {
157
0
            connector_with_schemes.enable_http1().enable_http2().build()
158
        };
159
160
0
        Self {
161
0
            connector,
162
0
            retrier: Retrier::new(
163
0
                Arc::new(|duration| Box::pin(sleep(duration))),
164
0
                jitter_fn,
165
0
                spec.retry.clone(),
166
0
            ),
167
0
        }
168
0
    }
169
170
0
    async fn call_with_retry(
171
0
        &self,
172
0
        req: &Uri,
173
0
    ) -> Result<ConnectionWithPermit<MaybeHttpsStream<TcpStream>>, Error> {
174
0
        let retry_stream_fn = unfold(self.connector.clone(), move |mut connector| async move {
175
0
            let _permit = fs::get_permit().await.unwrap();
176
0
            match connector.call(req.clone()).await {
177
0
                Ok(connection) => Some((
178
0
                    RetryResult::Ok(ConnectionWithPermit {
179
0
                        connection,
180
0
                        _permit,
181
0
                    }),
182
0
                    connector,
183
0
                )),
184
0
                Err(e) => Some((
185
0
                    RetryResult::Retry(make_err!(
186
0
                        Code::Unavailable,
187
0
                        "Failed to call S3 connector: {e:?}"
188
0
                    )),
189
0
                    connector,
190
0
                )),
191
            }
192
0
        });
193
0
        self.retrier.retry(retry_stream_fn).await
194
0
    }
195
}
196
197
impl Service<Uri> for TlsConnector {
198
    type Response = ConnectionWithPermit<MaybeHttpsStream<TcpStream>>;
199
    type Error = Error;
200
    type Future =
201
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
202
203
0
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204
0
        self.connector
205
0
            .poll_ready(cx)
206
0
            .map_err(|e| make_err!(Code::Unavailable, "Failed poll in S3: {e}"))
207
0
    }
208
209
0
    fn call(&mut self, req: Uri) -> Self::Future {
210
0
        let connector_clone = self.clone();
211
0
        Box::pin(async move { connector_clone.call_with_retry(&req).await })
212
0
    }
213
}
214
215
pub struct BodyWrapper {
216
    reader: DropCloserReadHalf,
217
    size: u64,
218
}
219
220
impl http_body::Body for BodyWrapper {
221
    type Data = Bytes;
222
    type Error = std::io::Error;
223
224
76
    fn poll_frame(
225
76
        self: Pin<&mut Self>,
226
76
        cx: &mut Context<'_>,
227
76
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
228
76
        let reader = Pin::new(&mut Pin::get_mut(self).reader);
229
76
        reader
230
76
            .poll_next(cx)
231
76
            .map(|maybe_bytes_res| 
maybe_bytes_res.map(51
|res|
res.map(Frame::data)50
)51
)
232
76
    }
233
234
2
    fn size_hint(&self) -> SizeHint {
235
2
        SizeHint::with_exact(self.size)
236
2
    }
237
}
238
239
#[derive(MetricsComponent)]
240
pub struct S3Store<NowFn> {
241
    s3_client: Arc<Client>,
242
    now_fn: NowFn,
243
    #[metric(help = "The bucket name for the S3 store")]
244
    bucket: String,
245
    #[metric(help = "The key prefix for the S3 store")]
246
    key_prefix: String,
247
    retrier: Retrier,
248
    #[metric(help = "The number of seconds to consider an object expired")]
249
    consider_expired_after_s: i64,
250
    #[metric(help = "The number of bytes to buffer for retrying requests")]
251
    max_retry_buffer_per_request: usize,
252
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
253
    multipart_max_concurrent_uploads: usize,
254
}
255
256
impl<I, NowFn> S3Store<NowFn>
257
where
258
    I: InstantWrapper,
259
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
260
{
261
0
    pub async fn new(spec: &S3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
262
0
        let jitter_amt = spec.retry.jitter;
263
0
        let jitter_fn = Arc::new(move |delay: Duration| {
264
0
            if jitter_amt == 0. {
  Branch (264:16): [True: 0, False: 0]
  Branch (264:16): [Folded - Ignored]
265
0
                return delay;
266
0
            }
267
0
            let min = 1. - (jitter_amt / 2.);
268
0
            let max = 1. + (jitter_amt / 2.);
269
0
            delay.mul_f32(OsRng.gen_range(min..max))
270
0
        });
271
0
        let s3_client = {
272
0
            let http_client =
273
0
                HyperClientBuilder::new().build(TlsConnector::new(spec, jitter_fn.clone()));
274
0
            let credential_provider = credentials::default_provider().await;
275
0
            let mut config_builder = aws_config::defaults(BehaviorVersion::v2024_03_28())
276
0
                .credentials_provider(credential_provider)
277
0
                .app_name(AppName::new("nativelink").expect("valid app name"))
278
0
                .timeout_config(
279
0
                    aws_config::timeout::TimeoutConfig::builder()
280
0
                        .connect_timeout(Duration::from_secs(15))
281
0
                        .build(),
282
0
                )
283
0
                .region(Region::new(Cow::Owned(spec.region.clone())))
284
0
                .http_client(http_client);
285
            // TODO(allada) When aws-sdk supports this env variable we should be able
286
            // to remove this.
287
            // See: https://github.com/awslabs/aws-sdk-rust/issues/932
288
0
            if let Ok(endpoint_url) = env::var("AWS_ENDPOINT_URL") {
  Branch (288:20): [True: 0, False: 0]
  Branch (288:20): [Folded - Ignored]
289
0
                config_builder = config_builder.endpoint_url(endpoint_url);
290
0
            }
291
0
            aws_sdk_s3::Client::new(&config_builder.load().await)
292
        };
293
0
        Self::new_with_client_and_jitter(spec, s3_client, jitter_fn, now_fn)
294
0
    }
295
296
12
    pub fn new_with_client_and_jitter(
297
12
        spec: &S3Spec,
298
12
        s3_client: Client,
299
12
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
300
12
        now_fn: NowFn,
301
12
    ) -> Result<Arc<Self>, Error> {
302
12
        Ok(Arc::new(Self {
303
12
            s3_client: Arc::new(s3_client),
304
12
            now_fn,
305
12
            bucket: spec.bucket.to_string(),
306
12
            key_prefix: spec.key_prefix.as_ref().unwrap_or(&String::new()).clone(),
307
12
            retrier: Retrier::new(
308
12
                Arc::new(|duration| 
Box::pin(sleep(duration))6
),
309
12
                jitter_fn,
310
12
                spec.retry.clone(),
311
12
            ),
312
12
            consider_expired_after_s: i64::from(spec.consider_expired_after_s),
313
12
            max_retry_buffer_per_request: spec
314
12
                .max_retry_buffer_per_request
315
12
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
316
12
            multipart_max_concurrent_uploads: spec
317
12
                .multipart_max_concurrent_uploads
318
12
                .map_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS, |v| 
v0
),
319
12
        }))
320
12
    }
321
322
14
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
323
14
        format!("{}{}", self.key_prefix, key.as_str(),)
324
14
    }
325
326
5
    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
327
5
        self.retrier
328
8
            .retry(unfold((), move |state| async move {
329
8
                let result = self
330
8
                    .s3_client
331
8
                    .head_object()
332
8
                    .bucket(&self.bucket)
333
8
                    .key(self.make_s3_path(&digest.borrow()))
334
8
                    .send()
335
8
                    .await;
336
337
8
                match result {
338
4
                    Ok(head_object_output) => {
339
4
                        if self.consider_expired_after_s != 0 {
  Branch (339:28): [True: 0, False: 0]
  Branch (339:28): [Folded - Ignored]
  Branch (339:28): [True: 2, False: 2]
340
2
                            if let Some(last_modified) = head_object_output.last_modified {
  Branch (340:36): [True: 0, False: 0]
  Branch (340:36): [Folded - Ignored]
  Branch (340:36): [True: 2, False: 0]
341
2
                                let now_s = (self.now_fn)().unix_timestamp() as i64;
342
2
                                if last_modified.secs() + self.consider_expired_after_s <= now_s {
  Branch (342:36): [True: 0, False: 0]
  Branch (342:36): [Folded - Ignored]
  Branch (342:36): [True: 1, False: 1]
343
1
                                    return Some((RetryResult::Ok(None), state));
344
1
                                }
345
0
                            }
346
2
                        }
347
3
                        let Some(length) = head_object_output.content_length else {
  Branch (347:29): [True: 0, False: 0]
  Branch (347:29): [Folded - Ignored]
  Branch (347:29): [True: 3, False: 0]
348
0
                            return Some((RetryResult::Ok(None), state));
349
                        };
350
3
                        if length >= 0 {
  Branch (350:28): [True: 0, False: 0]
  Branch (350:28): [Folded - Ignored]
  Branch (350:28): [True: 3, False: 0]
351
3
                            return Some((RetryResult::Ok(Some(length as u64)), state));
352
0
                        }
353
0
                        Some((
354
0
                            RetryResult::Err(make_err!(
355
0
                                Code::InvalidArgument,
356
0
                                "Negative content length in S3: {length:?}",
357
0
                            )),
358
0
                            state,
359
0
                        ))
360
                    }
361
4
                    Err(sdk_error) => match sdk_error.into_service_error() {
362
1
                        HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
363
3
                        other => Some((
364
3
                            RetryResult::Retry(make_err!(
365
3
                                Code::Unavailable,
366
3
                                "Unhandled HeadObjectError in S3: {other:?}"
367
3
                            )),
368
3
                            state,
369
3
                        )),
370
                    },
371
                }
372
16
            }))
373
5
            .await
374
5
    }
375
}
376
377
#[async_trait]
378
impl<I, NowFn> StoreDriver for S3Store<NowFn>
379
where
380
    I: InstantWrapper,
381
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
382
{
383
    async fn has_with_results(
384
        self: Pin<&Self>,
385
        keys: &[StoreKey<'_>],
386
        results: &mut [Option<u64>],
387
6
    ) -> Result<(), Error> {
388
6
        keys.iter()
389
6
            .zip(results.iter_mut())
390
6
            .map(|(key, result)| async move {
391
6
                // We need to do a special pass to ensure our zero key exist.
392
6
                if is_zero_digest(key.borrow()) {
  Branch (392:20): [True: 0, False: 0]
  Branch (392:20): [Folded - Ignored]
  Branch (392:20): [True: 1, False: 5]
393
1
                    *result = Some(0);
394
1
                    return Ok::<_, Error>(());
395
5
                }
396
5
                *result = self.has(key).await
?0
;
397
5
                Ok::<_, Error>(())
398
12
            })
399
6
            .collect::<FuturesUnordered<_>>()
400
6
            .try_collect()
401
6
            .await
402
12
    }
403
404
    async fn update(
405
        self: Pin<&Self>,
406
        digest: StoreKey<'_>,
407
        mut reader: DropCloserReadHalf,
408
        upload_size: UploadSizeInfo,
409
2
    ) -> Result<(), Error> {
410
2
        let s3_path = &self.make_s3_path(&digest.borrow());
411
412
2
        let max_size = match upload_size {
413
2
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(
sz0
) => sz,
414
2
        };
415
2
416
2
        // Note(allada) It might be more optimal to use a different
417
2
        // heuristic here, but for simplicity we use a hard coded value.
418
2
        // Anything going down this if-statement will have the advantage of only
419
2
        // 1 network request for the upload instead of minimum of 3 required for
420
2
        // multipart upload requests.
421
2
        //
422
2
        // Note(allada) If the upload size is not known, we go down the multipart upload path.
423
2
        // This is not very efficient, but it greatly reduces the complexity of the code.
424
2
        if max_size < MIN_MULTIPART_SIZE && 
matches!0
(
upload_size1
, UploadSizeInfo::ExactSize(_)) {
  Branch (424:12): [True: 0, False: 0]
  Branch (424:12): [Folded - Ignored]
  Branch (424:12): [True: 1, False: 1]
425
1
            let UploadSizeInfo::ExactSize(sz) = upload_size else {
  Branch (425:17): [True: 0, False: 0]
  Branch (425:17): [Folded - Ignored]
  Branch (425:17): [True: 1, False: 0]
426
0
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
427
            };
428
1
            reader.set_max_recent_data_size(
429
1
                u64::try_from(self.max_retry_buffer_per_request)
430
1
                    .err_tip(|| 
"Could not convert max_retry_buffer_per_request to u64"0
)
?0
,
431
            );
432
1
            return self
433
1
                .retrier
434
1
                .retry(unfold(reader, move |mut reader| async move {
435
1
                    // We need to make a new pair here because the aws sdk does not give us
436
1
                    // back the body after we send it in order to retry.
437
1
                    let (mut tx, rx) = make_buf_channel_pair();
438
439
                    // Upload the data to the S3 backend.
440
1
                    let result = {
441
1
                        let reader_ref = &mut reader;
442
1
                        let (upload_res, bind_res) = tokio::join!(
443
1
                            self.s3_client
444
1
                                .put_object()
445
1
                                .bucket(&self.bucket)
446
1
                                .key(s3_path.clone())
447
1
                                .content_length(sz as i64)
448
1
                                .body(ByteStream::from_body_1_x(BodyWrapper {
449
1
                                    reader: rx,
450
1
                                    size: sz,
451
1
                                }))
452
1
                                .send()
453
1
                                .map_ok_or_else(|e| 
Err(make_err!(Code::Aborted, "{e:?}"))0
, |_| Ok(())),
454
1
                            // Stream all data from the reader channel to the writer channel.
455
1
                            tx.bind_buffered(reader_ref)
456
1
                        );
457
1
                        upload_res
458
1
                            .merge(bind_res)
459
1
                            .err_tip(|| 
"Failed to upload file to s3 in single chunk"0
)
460
1
                    };
461
1
462
1
                    // If we failed to upload the file, check to see if we can retry.
463
1
                    let retry_result = result.map_or_else(|mut err| {
464
0
                        // Ensure our code is Code::Aborted, so the client can retry if possible.
465
0
                        err.code = Code::Aborted;
466
0
                        let bytes_received = reader.get_bytes_received();
467
0
                        if let Err(try_reset_err) = reader.try_reset_stream() {
  Branch (467:32): [True: 0, False: 0]
  Branch (467:32): [Folded - Ignored]
  Branch (467:32): [True: 0, False: 0]
468
0
                            event!(
469
0
                                Level::ERROR,
470
                                ?bytes_received,
471
                                err = ?try_reset_err,
472
0
                                "Unable to reset stream after failed upload in S3Store::update"
473
                            );
474
0
                            return RetryResult::Err(err
475
0
                                .merge(try_reset_err)
476
0
                                .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")));
477
0
                        }
478
0
                        let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update"));
479
0
                        event!(
480
0
                            Level::INFO,
481
                            ?err,
482
                            ?bytes_received,
483
0
                            "Retryable S3 error"
484
                        );
485
0
                        RetryResult::Retry(err)
486
1
                    
}0
, |()| RetryResult::Ok(()));
487
1
                    Some((retry_result, reader))
488
1
                }))
489
1
                .await;
490
1
        }
491
492
1
        let upload_id = &self
493
1
            .retrier
494
1
            .retry(unfold((), move |()| async move {
495
1
                let retry_result = self
496
1
                    .s3_client
497
1
                    .create_multipart_upload()
498
1
                    .bucket(&self.bucket)
499
1
                    .key(s3_path)
500
1
                    .send()
501
1
                    .await
502
1
                    .map_or_else(
503
1
                        |e| {
504
0
                            RetryResult::Retry(make_err!(
505
0
                                Code::Aborted,
506
0
                                "Failed to create multipart upload to s3: {e:?}"
507
0
                            ))
508
1
                        },
509
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
510
1
                            upload_id.map_or_else(
511
1
                                || {
512
0
                                    RetryResult::Err(make_err!(
513
0
                                        Code::Internal,
514
0
                                        "Expected upload_id to be set by s3 response"
515
0
                                    ))
516
1
                                },
517
1
                                RetryResult::Ok,
518
1
                            )
519
1
                        },
520
1
                    );
521
1
                Some((retry_result, ()))
522
1
            }))
523
1
            .await
?0
;
524
525
        // S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
526
        // 5mb (except last part) and can have up to 10,000 parts.
527
1
        let bytes_per_upload_part =
528
1
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
529
1
530
1
        let upload_parts = move || async move {
531
1
            // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
532
1
            // bytes in memory at any given time waiting to be uploaded.
533
1
            let (tx, mut rx) = mpsc::channel(self.multipart_max_concurrent_uploads);
534
1
535
1
            let read_stream_fut = async move {
536
1
                let retrier = &Pin::get_ref(self).retrier;
537
                // Note: Our break condition is when we reach EOF.
538
4
                for part_number in 1..i32::MAX {
539
4
                    let write_buf = reader
540
4
                        .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(|| 
"Could not convert bytes_per_upload_part to usize"0
)
?0
))
541
4
                        .await
542
4
                        .err_tip(|| 
"Failed to read chunk in s3_store"0
)
?0
;
543
4
                    if write_buf.is_empty() {
  Branch (543:24): [True: 0, False: 0]
  Branch (543:24): [Folded - Ignored]
  Branch (543:24): [True: 1, False: 3]
544
1
                        break; // Reached EOF.
545
3
                    }
546
3
547
3
                    tx.send(retrier.retry(unfold(
548
3
                        write_buf,
549
3
                        move |write_buf| {
550
3
                            async move {
551
3
                                let retry_result = self
552
3
                                    .s3_client
553
3
                                    .upload_part()
554
3
                                    .bucket(&self.bucket)
555
3
                                    .key(s3_path)
556
3
                                    .upload_id(upload_id)
557
3
                                    .body(ByteStream::new(SdkBody::from(write_buf.clone())))
558
3
                                    .part_number(part_number)
559
3
                                    .send()
560
3
                                    .await
561
3
                                    .map_or_else(
562
3
                                        |e| {
563
0
                                            RetryResult::Retry(make_err!(
564
0
                                                Code::Aborted,
565
0
                                                "Failed to upload part {part_number} in S3 store: {e:?}"
566
0
                                            ))
567
3
                                        },
568
3
                                        |mut response| {
569
3
                                            RetryResult::Ok(
570
3
                                                CompletedPartBuilder::default()
571
3
                                                    // Only set an entity tag if it exists. This saves
572
3
                                                    // 13 bytes per part on the final request if it can
573
3
                                                    // omit the `<ETAG><ETAG/>` string.
574
3
                                                    .set_e_tag(response.e_tag.take())
575
3
                                                    .part_number(part_number)
576
3
                                                    .build(),
577
3
                                            )
578
3
                                        },
579
3
                                    );
580
3
                                Some((retry_result, write_buf))
581
3
                            }
582
3
                        }
583
3
                    ))).await.map_err(|_| 
make_err!(Code::Internal, "Failed to send part to channel in s3_store")0
)
?0
;
584
                }
585
1
                Result::<_, Error>::Ok(())
586
1
            }.fuse();
587
1
588
1
            let mut upload_futures = FuturesUnordered::new();
589
590
1
            let mut completed_parts = Vec::with_capacity(
591
1
                usize::try_from(cmp::min(
592
1
                    MAX_UPLOAD_PARTS as u64,
593
1
                    (max_size / bytes_per_upload_part) + 1,
594
1
                ))
595
1
                .err_tip(|| 
"Could not convert u64 to usize"0
)
?0
,
596
            );
597
1
            tokio::pin!(read_stream_fut);
598
            loop {
599
8
                if read_stream_fut.is_terminated() && 
rx.is_empty()7
&&
upload_futures.is_empty()2
{
  Branch (599:20): [True: 0, False: 0]
  Branch (599:55): [True: 0, False: 0]
  Branch (599:72): [True: 0, False: 0]
  Branch (599:20): [Folded - Ignored]
  Branch (599:55): [Folded - Ignored]
  Branch (599:72): [Folded - Ignored]
  Branch (599:20): [True: 7, False: 1]
  Branch (599:55): [True: 2, False: 5]
  Branch (599:72): [True: 1, False: 1]
600
1
                    break; // No more data to process.
601
7
                }
602
7
                tokio::select! {
603
7
                    
result1
= &mut read_stream_fut =>
result1
?0
, // Return error or wait for other futures.
604
7
                    Some(
upload_result3
) = upload_futures.next() =>
completed_parts.push(upload_result3
?0
),
605
7
                    Some(
fut3
) = rx.recv() =>
upload_futures.push(fut)3
,
606
                }
607
            }
608
609
            // Even though the spec does not require parts to be sorted by number, we do it just in case
610
            // there's an S3 implementation that requires it.
611
4
            
completed_parts.sort_unstable_by_key(1
|part| part.part_number);
612
1
613
1
            self.retrier
614
1
                .retry(unfold(completed_parts, move |completed_parts| async move {
615
1
                    Some((
616
1
                        self.s3_client
617
1
                            .complete_multipart_upload()
618
1
                            .bucket(&self.bucket)
619
1
                            .key(s3_path)
620
1
                            .multipart_upload(
621
1
                                CompletedMultipartUploadBuilder::default()
622
1
                                    .set_parts(Some(completed_parts.clone()))
623
1
                                    .build(),
624
1
                            )
625
1
                            .upload_id(upload_id)
626
1
                            .send()
627
1
                            .await
628
1
                            .map_or_else(
629
1
                                |e| {
630
0
                                    RetryResult::Retry(make_err!(
631
0
                                        Code::Aborted,
632
0
                                        "Failed to complete multipart upload in S3 store: {e:?}"
633
0
                                    ))
634
1
                                },
635
1
                                |_| RetryResult::Ok(()),
636
1
                            ),
637
1
                        completed_parts,
638
1
                    ))
639
1
                }))
640
1
                .await
641
2
        };
642
        // Upload our parts and complete the multipart upload.
643
        // If we fail attempt to abort the multipart upload (cleanup).
644
1
        upload_parts()
645
1
            .or_else(move |e| async move {
646
0
                Result::<(), _>::Err(e).merge(
647
0
                    // Note: We don't retry here because this is just a best attempt.
648
0
                    self.s3_client
649
0
                        .abort_multipart_upload()
650
0
                        .bucket(&self.bucket)
651
0
                        .key(s3_path)
652
0
                        .upload_id(upload_id)
653
0
                        .send()
654
0
                        .await
655
0
                        .map_or_else(
656
0
                            |e| {
657
0
                                let err = make_err!(
658
0
                                    Code::Aborted,
659
0
                                    "Failed to abort multipart upload in S3 store : {e:?}"
660
0
                                );
661
0
                                event!(Level::INFO, ?err, "Multipart upload error");
662
0
                                Err(err)
663
0
                            },
664
0
                            |_| Ok(()),
665
0
                        ),
666
0
                )
667
1
            
}0
)
668
1
            .await
669
4
    }
670
671
    async fn get_part(
672
        self: Pin<&Self>,
673
        key: StoreKey<'_>,
674
        writer: &mut DropCloserWriteHalf,
675
        offset: u64,
676
        length: Option<u64>,
677
5
    ) -> Result<(), Error> {
678
5
        if is_zero_digest(key.borrow()) {
  Branch (678:12): [True: 0, False: 0]
  Branch (678:12): [Folded - Ignored]
  Branch (678:12): [True: 1, False: 4]
679
1
            writer
680
1
                .send_eof()
681
1
                .err_tip(|| 
"Failed to send zero EOF in filesystem store get_part"0
)
?0
;
682
1
            return Ok(());
683
4
        }
684
4
685
4
        let s3_path = &self.make_s3_path(&key);
686
4
        let end_read_byte = length
687
4
            .map_or(Some(None), |length| 
Some(offset.checked_add(length))2
)
688
4
            .err_tip(|| 
"Integer overflow protection triggered"0
)
?0
;
689
690
4
        self.retrier
691
7
            .retry(unfold(writer, move |writer| async move {
692
7
                let result = self
693
7
                    .s3_client
694
7
                    .get_object()
695
7
                    .bucket(&self.bucket)
696
7
                    .key(s3_path)
697
7
                    .range(format!(
698
7
                        "bytes={}-{}",
699
7
                        offset + writer.get_bytes_written(),
700
7
                        end_read_byte.map_or_else(String::new, |v| 
v.to_string()2
)
701
7
                    ))
702
7
                    .send()
703
7
                    .await;
704
705
7
                let 
mut s3_in_stream4
= match result {
706
4
                    Ok(head_object_output) => head_object_output.body,
707
3
                    Err(sdk_error) => match sdk_error.into_service_error() {
708
0
                        GetObjectError::NoSuchKey(e) => {
709
0
                            return Some((
710
0
                                RetryResult::Err(make_err!(
711
0
                                    Code::NotFound,
712
0
                                    "No such key in S3: {e}"
713
0
                                )),
714
0
                                writer,
715
0
                            ));
716
                        }
717
3
                        other => {
718
3
                            return Some((
719
3
                                RetryResult::Retry(make_err!(
720
3
                                    Code::Unavailable,
721
3
                                    "Unhandled GetObjectError in S3: {other:?}",
722
3
                                )),
723
3
                                writer,
724
3
                            ));
725
                        }
726
                    },
727
                };
728
729
                // Copy data from s3 input stream to the writer stream.
730
8
                while let Some(
maybe_bytes4
) = s3_in_stream.next().await {
  Branch (730:27): [True: 0, False: 0]
  Branch (730:27): [Folded - Ignored]
  Branch (730:27): [True: 4, False: 4]
731
4
                    match maybe_bytes {
732
4
                        Ok(bytes) => {
733
4
                            if bytes.is_empty() {
  Branch (733:32): [True: 0, False: 0]
  Branch (733:32): [Folded - Ignored]
  Branch (733:32): [True: 1, False: 3]
734
                                // Ignore possible EOF. Different implimentations of S3 may or may not
735
                                // send EOF this way.
736
1
                                continue;
737
3
                            }
738
3
                            if let Err(
e0
) = writer.send(bytes).await {
  Branch (738:36): [True: 0, False: 0]
  Branch (738:36): [Folded - Ignored]
  Branch (738:36): [True: 0, False: 3]
739
0
                                return Some((
740
0
                                    RetryResult::Err(make_err!(
741
0
                                        Code::Aborted,
742
0
                                        "Error sending bytes to consumer in S3: {e}"
743
0
                                    )),
744
0
                                    writer,
745
0
                                ));
746
3
                            }
747
                        }
748
0
                        Err(e) => {
749
0
                            return Some((
750
0
                                RetryResult::Retry(make_err!(
751
0
                                    Code::Aborted,
752
0
                                    "Bad bytestream element in S3: {e}"
753
0
                                )),
754
0
                                writer,
755
0
                            ));
756
                        }
757
                    }
758
                }
759
4
                if let Err(
e0
) = writer.send_eof() {
  Branch (759:24): [True: 0, False: 0]
  Branch (759:24): [Folded - Ignored]
  Branch (759:24): [True: 0, False: 4]
760
0
                    return Some((
761
0
                        RetryResult::Err(make_err!(
762
0
                            Code::Aborted,
763
0
                            "Failed to send EOF to consumer in S3: {e}"
764
0
                        )),
765
0
                        writer,
766
0
                    ));
767
4
                }
768
4
                Some((RetryResult::Ok(()), writer))
769
14
            }))
770
4
            .await
771
10
    }
772
773
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
774
0
        self
775
0
    }
776
777
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
778
0
        self
779
0
    }
780
781
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
782
0
        self
783
0
    }
784
}
785
786
#[async_trait]
787
impl<I, NowFn> HealthStatusIndicator for S3Store<NowFn>
788
where
789
    I: InstantWrapper,
790
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
791
{
792
0
    fn get_name(&self) -> &'static str {
793
0
        "S3Store"
794
0
    }
795
796
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
797
0
        StoreDriver::check_health(Pin::new(self), namespace).await
798
0
    }
799
}