Coverage Report

Created: 2024-12-20 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/src/bin/nativelink.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, HashSet};
16
use std::net::SocketAddr;
17
use std::sync::Arc;
18
use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20
use async_lock::Mutex as AsyncMutex;
21
use axum::Router;
22
use clap::Parser;
23
use futures::future::{try_join_all, BoxFuture, Either, OptionFuture, TryFutureExt};
24
use futures::FutureExt;
25
use hyper::{Response, StatusCode};
26
use hyper_util::rt::tokio::TokioIo;
27
use hyper_util::server::conn::auto;
28
use hyper_util::service::TowerToHyperService;
29
use mimalloc::MiMalloc;
30
use nativelink_config::cas_server::{
31
    CasConfig, GlobalConfig, HttpCompressionAlgorithm, ListenerConfig, ServerConfig, WorkerConfig,
32
};
33
use nativelink_config::stores::ConfigDigestHashFunction;
34
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
35
use nativelink_metric::{
36
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, RootMetricsComponent,
37
};
38
use nativelink_metric_collector::{otel_export, MetricsCollectorLayer};
39
use nativelink_scheduler::default_scheduler_factory::scheduler_factory;
40
use nativelink_service::ac_server::AcServer;
41
use nativelink_service::bep_server::BepServer;
42
use nativelink_service::bytestream_server::ByteStreamServer;
43
use nativelink_service::capabilities_server::CapabilitiesServer;
44
use nativelink_service::cas_server::CasServer;
45
use nativelink_service::execution_server::ExecutionServer;
46
use nativelink_service::health_server::HealthServer;
47
use nativelink_service::worker_api_server::WorkerApiServer;
48
use nativelink_store::default_store_factory::store_factory;
49
use nativelink_store::store_manager::StoreManager;
50
use nativelink_util::action_messages::WorkerId;
51
use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
52
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
53
use nativelink_util::health_utils::HealthRegistryBuilder;
54
use nativelink_util::metrics_utils::{set_metrics_enabled_for_this_thread, Counter};
55
use nativelink_util::operation_state_manager::ClientStateManager;
56
use nativelink_util::origin_context::{ActiveOriginContext, OriginContext};
57
use nativelink_util::origin_event_middleware::OriginEventMiddlewareLayer;
58
use nativelink_util::origin_event_publisher::OriginEventPublisher;
59
use nativelink_util::shutdown_guard::{Priority, ShutdownGuard};
60
use nativelink_util::store_trait::{
61
    set_default_digest_size_health_check, DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
62
};
63
use nativelink_util::task::TaskExecutor;
64
use nativelink_util::{background_spawn, init_tracing, spawn, spawn_blocking};
65
use nativelink_worker::local_worker::new_local_worker;
66
use opentelemetry::metrics::MeterProvider;
67
use opentelemetry_sdk::metrics::SdkMeterProvider;
68
use parking_lot::{Mutex, RwLock};
69
use prometheus::{Encoder, TextEncoder};
70
use rustls_pemfile::{certs as extract_certs, crls as extract_crls};
71
use scopeguard::guard;
72
use tokio::net::TcpListener;
73
use tokio::select;
74
#[cfg(target_family = "unix")]
75
use tokio::signal::unix::{signal, SignalKind};
76
use tokio::sync::{broadcast, mpsc};
77
use tokio_rustls::rustls::pki_types::{CertificateDer, CertificateRevocationListDer};
78
use tokio_rustls::rustls::server::WebPkiClientVerifier;
79
use tokio_rustls::rustls::{RootCertStore, ServerConfig as TlsServerConfig};
80
use tokio_rustls::TlsAcceptor;
81
use tonic::codec::CompressionEncoding;
82
use tonic::transport::Server as TonicServer;
83
use tracing::{error_span, event, trace_span, Level};
84
use tracing_subscriber::layer::SubscriberExt;
85
86
#[global_allocator]
87
static GLOBAL: MiMalloc = MiMalloc;
88
89
/// Note: This must be kept in sync with the documentation in `PrometheusConfig::path`.
90
const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics";
91
92
/// Note: This must be kept in sync with the documentation in `AdminConfig::path`.
93
const DEFAULT_ADMIN_API_PATH: &str = "/admin";
94
95
// Note: This must be kept in sync with the documentation in `HealthConfig::path`.
96
const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status";
97
98
/// Name of environment variable to disable metrics.
99
const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS";
100
101
// Note: This must be kept in sync with the documentation in
102
// `OriginEventsConfig::max_event_queue_size`.
103
const DEFAULT_MAX_QUEUE_EVENTS: usize = 65536;
104
105
/// Broadcast Channel Capacity
106
/// Note: The actual capacity may be greater than the provided capacity.
107
const BROADCAST_CAPACITY: usize = 1;
108
109
/// Backend for bazel remote execution / cache API.
110
#[derive(Parser, Debug)]
111
#[clap(
112
    author = "Trace Machina, Inc. <nativelink@tracemachina.com>",
113
    version,
114
    about,
115
    long_about = None
116
)]
117
struct Args {
118
    /// Config file to use.
119
    #[clap(value_parser)]
120
0
    config_file: String,
121
}
122
123
/// The root metrics collector struct. All metrics will be
124
/// collected from this struct traversing down each child
125
/// component.
126
#[derive(MetricsComponent)]
127
struct RootMetrics {
128
    #[metric(group = "stores")]
129
    stores: Arc<dyn RootMetricsComponent>,
130
    #[metric(group = "servers")]
131
    servers: HashMap<String, Arc<dyn RootMetricsComponent>>,
132
    #[metric(group = "workers")]
133
    workers: HashMap<String, Arc<dyn RootMetricsComponent>>,
134
    // TODO(allada) We cannot upcast these to RootMetricsComponent because
135
    // of https://github.com/rust-lang/rust/issues/65991.
136
    // TODO(allada) To prevent output from being too verbose we only
137
    // print the action_schedulers.
138
    #[metric(group = "action_schedulers")]
139
    schedulers: HashMap<String, Arc<dyn ClientStateManager>>,
140
}
141
142
impl RootMetricsComponent for RootMetrics {}
143
144
/// Wrapper to allow us to hash `SocketAddr` for metrics.
145
#[derive(Hash, PartialEq, Eq)]
146
struct SocketAddrWrapper(SocketAddr);
147
148
impl MetricsComponent for SocketAddrWrapper {
149
0
    fn publish(
150
0
        &self,
151
0
        _kind: MetricKind,
152
0
        _field_metadata: MetricFieldData,
153
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
154
0
        Ok(MetricPublishKnownKindData::String(self.0.to_string()))
155
0
    }
156
}
157
158
impl RootMetricsComponent for SocketAddrWrapper {}
159
160
/// Simple wrapper to enable us to register the Hashmap so it can
161
/// report metrics about what clients are connected.
162
#[derive(MetricsComponent)]
163
struct ConnectedClientsMetrics {
164
    #[metric(group = "currently_connected_clients")]
165
    inner: Mutex<HashSet<SocketAddrWrapper>>,
166
    #[metric(help = "Total client connections since server started")]
167
    counter: Counter,
168
    #[metric(help = "Timestamp when the server started")]
169
    server_start_ts: u64,
170
}
171
172
impl RootMetricsComponent for ConnectedClientsMetrics {}
173
174
0
async fn inner_main(
175
0
    cfg: CasConfig,
176
0
    server_start_timestamp: u64,
177
0
    shutdown_tx: broadcast::Sender<ShutdownGuard>,
178
0
) -> Result<(), Error> {
179
0
    fn into_encoding(from: HttpCompressionAlgorithm) -> Option<CompressionEncoding> {
180
0
        match from {
181
0
            HttpCompressionAlgorithm::gzip => Some(CompressionEncoding::Gzip),
182
0
            HttpCompressionAlgorithm::none => None,
183
        }
184
0
    }
185
186
0
    let health_registry_builder =
187
0
        Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink")));
188
0
189
0
    let store_manager = Arc::new(StoreManager::new());
190
    {
191
0
        let mut health_registry_lock = health_registry_builder.lock().await;
192
193
0
        for (name, store_cfg) in cfg.stores {
194
0
            let health_component_name = format!("stores/{name}");
195
0
            let mut health_register_store =
196
0
                health_registry_lock.sub_builder(&health_component_name);
197
0
            let store = store_factory(&store_cfg, &store_manager, Some(&mut health_register_store))
198
0
                .await
199
0
                .err_tip(|| format!("Failed to create store '{name}'"))?;
200
0
            store_manager.add_store(&name, store);
201
        }
202
    }
203
204
0
    let mut action_schedulers = HashMap::new();
205
0
    let mut worker_schedulers = HashMap::new();
206
0
    if let Some(schedulers_cfg) = cfg.schedulers {
  Branch (206:12): [Folded - Ignored]
207
0
        for (name, scheduler_cfg) in schedulers_cfg {
208
0
            let (maybe_action_scheduler, maybe_worker_scheduler) =
209
0
                scheduler_factory(&scheduler_cfg, &store_manager)
210
0
                    .err_tip(|| format!("Failed to create scheduler '{name}'"))?;
211
0
            if let Some(action_scheduler) = maybe_action_scheduler {
  Branch (211:20): [Folded - Ignored]
212
0
                action_schedulers.insert(name.clone(), action_scheduler.clone());
213
0
            }
214
0
            if let Some(worker_scheduler) = maybe_worker_scheduler {
  Branch (214:20): [Folded - Ignored]
215
0
                worker_schedulers.insert(name.clone(), worker_scheduler.clone());
216
0
            }
217
        }
218
0
    }
219
220
0
    let mut server_metrics: HashMap<String, Arc<dyn RootMetricsComponent>> = HashMap::new();
221
0
    // Registers all the ConnectedClientsMetrics to the registries
222
0
    // and zips them in. It is done this way to get around the need
223
0
    // for `root_metrics_registry` to become immutable in the loop.
224
0
    let servers_and_clients: Vec<(ServerConfig, _)> = cfg
225
0
        .servers
226
0
        .into_iter()
227
0
        .enumerate()
228
0
        .map(|(i, server_cfg)| {
229
0
            let name = if server_cfg.name.is_empty() {
  Branch (229:27): [Folded - Ignored]
230
0
                format!("{i}")
231
            } else {
232
0
                server_cfg.name.clone()
233
            };
234
0
            let connected_clients_mux = Arc::new(ConnectedClientsMetrics {
235
0
                inner: Mutex::new(HashSet::new()),
236
0
                counter: Counter::default(),
237
0
                server_start_ts: server_start_timestamp,
238
0
            });
239
0
            server_metrics.insert(name.clone(), connected_clients_mux.clone());
240
0
241
0
            (server_cfg, connected_clients_mux)
242
0
        })
243
0
        .collect();
244
0
245
0
    let mut root_futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();
246
0
247
0
    let root_metrics = Arc::new(RwLock::new(RootMetrics {
248
0
        stores: store_manager.clone(),
249
0
        servers: server_metrics,
250
0
        workers: HashMap::new(), // Will be filled in later.
251
0
        schedulers: action_schedulers.clone(),
252
0
    }));
253
254
0
    let maybe_origin_event_tx = cfg
255
0
        .experimental_origin_events
256
0
        .as_ref()
257
0
        .map(|origin_events_cfg| {
258
0
            let mut max_queued_events = origin_events_cfg.max_event_queue_size;
259
0
            if max_queued_events == 0 {
  Branch (259:16): [Folded - Ignored]
260
0
                max_queued_events = DEFAULT_MAX_QUEUE_EVENTS;
261
0
            }
262
0
            let (tx, rx) = mpsc::channel(max_queued_events);
263
0
            let store_name = origin_events_cfg.publisher.store.as_str();
264
0
            let store = store_manager.get_store(store_name).err_tip(|| {
265
0
                format!("Could not get store {store_name} for origin event publisher")
266
0
            })?;
267
268
0
            root_futures.push(Box::pin(
269
0
                OriginEventPublisher::new(store, rx, shutdown_tx.clone())
270
0
                    .run()
271
0
                    .map(Ok),
272
0
            ));
273
0
274
0
            Ok::<_, Error>(tx)
275
0
        })
276
0
        .transpose()?;
277
278
0
    for (server_cfg, connected_clients_mux) in servers_and_clients {
279
0
        let services = server_cfg
280
0
            .services
281
0
            .err_tip(|| "'services' must be configured")?;
282
283
        // Currently we only support http as our socket type.
284
0
        let ListenerConfig::http(http_config) = server_cfg.listener;
285
286
0
        let tonic_services = TonicServer::builder()
287
0
            .add_optional_service(
288
0
                services
289
0
                    .ac
290
0
                    .map_or(Ok(None), |cfg| {
291
0
                        AcServer::new(&cfg, &store_manager).map(|v| {
292
0
                            let mut service = v.into_service();
293
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
294
0
                            if let Some(encoding) =
  Branch (294:36): [Folded - Ignored]
295
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
296
0
                            {
297
0
                                service = service.send_compressed(encoding);
298
0
                            }
299
0
                            for encoding in http_config
300
0
                                .compression
301
0
                                .accepted_compression_algorithms
302
0
                                .iter()
303
0
                                // Filter None values.
304
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
305
0
                            {
306
0
                                service = service.accept_compressed(encoding);
307
0
                            }
308
0
                            Some(service)
309
0
                        })
310
0
                    })
311
0
                    .err_tip(|| "Could not create AC service")?,
312
            )
313
            .add_optional_service(
314
0
                services
315
0
                    .cas
316
0
                    .map_or(Ok(None), |cfg| {
317
0
                        CasServer::new(&cfg, &store_manager).map(|v| {
318
0
                            let mut service = v.into_service();
319
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
320
0
                            if let Some(encoding) =
  Branch (320:36): [Folded - Ignored]
321
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
322
0
                            {
323
0
                                service = service.send_compressed(encoding);
324
0
                            }
325
0
                            for encoding in http_config
326
0
                                .compression
327
0
                                .accepted_compression_algorithms
328
0
                                .iter()
329
0
                                // Filter None values.
330
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
331
0
                            {
332
0
                                service = service.accept_compressed(encoding);
333
0
                            }
334
0
                            Some(service)
335
0
                        })
336
0
                    })
337
0
                    .err_tip(|| "Could not create CAS service")?,
338
            )
339
            .add_optional_service(
340
0
                services
341
0
                    .execution
342
0
                    .map_or(Ok(None), |cfg| {
343
0
                        ExecutionServer::new(&cfg, &action_schedulers, &store_manager).map(|v| {
344
0
                            let mut service = v.into_service();
345
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
346
0
                            if let Some(encoding) =
  Branch (346:36): [Folded - Ignored]
347
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
348
0
                            {
349
0
                                service = service.send_compressed(encoding);
350
0
                            }
351
0
                            for encoding in http_config
352
0
                                .compression
353
0
                                .accepted_compression_algorithms
354
0
                                .iter()
355
0
                                // Filter None values.
356
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
357
0
                            {
358
0
                                service = service.accept_compressed(encoding);
359
0
                            }
360
0
                            Some(service)
361
0
                        })
362
0
                    })
363
0
                    .err_tip(|| "Could not create Execution service")?,
364
            )
365
            .add_optional_service(
366
0
                services
367
0
                    .bytestream
368
0
                    .map_or(Ok(None), |cfg| {
369
0
                        ByteStreamServer::new(&cfg, &store_manager).map(|v| {
370
0
                            let mut service = v.into_service();
371
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
372
0
                            if let Some(encoding) =
  Branch (372:36): [Folded - Ignored]
373
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
374
0
                            {
375
0
                                service = service.send_compressed(encoding);
376
0
                            }
377
0
                            for encoding in http_config
378
0
                                .compression
379
0
                                .accepted_compression_algorithms
380
0
                                .iter()
381
0
                                // Filter None values.
382
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
383
0
                            {
384
0
                                service = service.accept_compressed(encoding);
385
0
                            }
386
0
                            Some(service)
387
0
                        })
388
0
                    })
389
0
                    .err_tip(|| "Could not create ByteStream service")?,
390
            )
391
            .add_optional_service(
392
0
                OptionFuture::from(
393
0
                    services
394
0
                        .capabilities
395
0
                        .as_ref()
396
0
                        // Borrow checker fighting here...
397
0
                        .map(|_| {
398
0
                            CapabilitiesServer::new(
399
0
                                services.capabilities.as_ref().unwrap(),
400
0
                                &action_schedulers,
401
0
                            )
402
0
                        }),
403
0
                )
404
0
                .await
405
0
                .map_or(Ok::<Option<CapabilitiesServer>, Error>(None), |server| {
406
0
                    Ok(Some(server?))
407
0
                })
408
0
                .err_tip(|| "Could not create Capabilities service")?
409
0
                .map(|v| {
410
0
                    let mut service = v.into_service();
411
0
                    let send_algo = &http_config.compression.send_compression_algorithm;
412
0
                    if let Some(encoding) =
  Branch (412:28): [Folded - Ignored]
413
0
                        into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
414
0
                    {
415
0
                        service = service.send_compressed(encoding);
416
0
                    }
417
0
                    for encoding in http_config
418
0
                        .compression
419
0
                        .accepted_compression_algorithms
420
0
                        .iter()
421
0
                        // Filter None values.
422
0
                        .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
423
0
                    {
424
0
                        service = service.accept_compressed(encoding);
425
0
                    }
426
0
                    service
427
0
                }),
428
0
            )
429
0
            .add_optional_service(
430
0
                services
431
0
                    .worker_api
432
0
                    .map_or(Ok(None), |cfg| {
433
0
                        WorkerApiServer::new(&cfg, &worker_schedulers).map(|v| {
434
0
                            let mut service = v.into_service();
435
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
436
0
                            if let Some(encoding) =
  Branch (436:36): [Folded - Ignored]
437
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
438
0
                            {
439
0
                                service = service.send_compressed(encoding);
440
0
                            }
441
0
                            for encoding in http_config
442
0
                                .compression
443
0
                                .accepted_compression_algorithms
444
0
                                .iter()
445
0
                                // Filter None values.
446
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
447
0
                            {
448
0
                                service = service.accept_compressed(encoding);
449
0
                            }
450
0
                            Some(service)
451
0
                        })
452
0
                    })
453
0
                    .err_tip(|| "Could not create WorkerApi service")?,
454
            )
455
            .add_optional_service(
456
0
                services
457
0
                    .experimental_bep
458
0
                    .map_or(Ok(None), |cfg| {
459
0
                        BepServer::new(&cfg, &store_manager).map(|v| {
460
0
                            let mut service = v.into_service();
461
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
462
0
                            if let Some(encoding) =
  Branch (462:36): [Folded - Ignored]
463
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
464
0
                            {
465
0
                                service = service.send_compressed(encoding);
466
0
                            }
467
0
                            for encoding in http_config
468
0
                                .compression
469
0
                                .accepted_compression_algorithms
470
0
                                .iter()
471
0
                                // Filter None values.
472
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
473
0
                            {
474
0
                                service = service.accept_compressed(encoding);
475
0
                            }
476
0
                            Some(service)
477
0
                        })
478
0
                    })
479
0
                    .err_tip(|| "Could not create BEP service")?,
480
            );
481
482
0
        let health_registry = health_registry_builder.lock().await.build();
483
0
484
0
        let mut svc = Router::new().merge(tonic_services.into_service().into_axum_router().layer(
485
0
            OriginEventMiddlewareLayer::new(
486
0
                maybe_origin_event_tx.clone(),
487
0
                server_cfg.experimental_identity_header.clone(),
488
0
            ),
489
0
        ));
490
491
0
        if let Some(health_cfg) = services.health {
  Branch (491:16): [Folded - Ignored]
492
0
            let path = if health_cfg.path.is_empty() {
  Branch (492:27): [Folded - Ignored]
493
0
                DEFAULT_HEALTH_STATUS_CHECK_PATH
494
            } else {
495
0
                &health_cfg.path
496
            };
497
0
            svc = svc.route_service(path, HealthServer::new(health_registry));
498
0
        }
499
500
0
        if let Some(prometheus_cfg) = services.experimental_prometheus {
  Branch (500:16): [Folded - Ignored]
501
0
            fn error_to_response<E: std::error::Error>(e: E) -> Response<axum::body::Body> {
502
0
                let mut response = Response::new(format!("Error: {e:?}").into());
503
0
                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
504
0
                response
505
0
            }
506
0
            let path = if prometheus_cfg.path.is_empty() {
  Branch (506:27): [Folded - Ignored]
507
0
                DEFAULT_PROMETHEUS_METRICS_PATH
508
            } else {
509
0
                &prometheus_cfg.path
510
            };
511
512
0
            let root_metrics_clone = root_metrics.clone();
513
0
514
0
            svc = svc.route_service(
515
0
                path,
516
0
                axum::routing::get(move |request: hyper::Request<axum::body::Body>| {
517
0
                    ActiveOriginContext::get()
518
0
                        .expect("OriginContext should be set here")
519
0
                        .wrap_async(trace_span!("prometheus_ctx"), async move {
520
0
                            // We spawn on a thread that can block to give more freedom to our metrics
521
0
                            // collection. This allows it to call functions like `tokio::block_in_place`
522
0
                            // if it needs to wait on a future.
523
0
                            spawn_blocking!("prometheus_metrics", move || {
524
0
                                let (layer, output_metrics) = MetricsCollectorLayer::new();
525
0
526
0
                                // Traverse all the MetricsComponent's. The `MetricsCollectorLayer` will
527
0
                                // collect all the metrics and store them in `output_metrics`.
528
0
                                tracing::subscriber::with_default(
529
0
                                    tracing_subscriber::registry().with(layer),
530
0
                                    || {
531
0
                                        let metrics_component = root_metrics_clone.read();
532
0
                                        MetricsComponent::publish(
533
0
                                            &*metrics_component,
534
0
                                            MetricKind::Component,
535
0
                                            MetricFieldData::default(),
536
0
                                        )
537
0
                                    },
538
0
                                )
539
0
                                .map_err(|e| make_err!(Code::Internal, "{e}"))
540
0
                                .err_tip(|| "While processing prometheus metrics")?;
541
542
                                // Convert the collected metrics into OpenTelemetry metrics then
543
                                // encode them into Prometheus format and populate them into a
544
                                // hyper::Response.
545
0
                                let response = {
546
0
                                    let registry = prometheus::Registry::new();
547
0
                                    let exporter = opentelemetry_prometheus::exporter()
548
0
                                        .with_registry(registry.clone())
549
0
                                        .without_counter_suffixes()
550
0
                                        .without_scope_info()
551
0
                                        .build()
552
0
                                        .map_err(|e| make_err!(Code::Internal, "{e}"))
553
0
                                        .err_tip(|| {
554
0
                                            "While creating OpenTelemetry Prometheus exporter"
555
0
                                        })?;
556
557
                                    // Prepare our OpenTelemetry collector/exporter.
558
0
                                    let provider =
559
0
                                        SdkMeterProvider::builder().with_reader(exporter).build();
560
0
                                    let meter = provider.meter("nativelink");
561
562
                                    // TODO(allada) We should put this as part of the config instead of a magic
563
                                    // request header.
564
0
                                    if let Some(json_type) =
  Branch (564:44): [Folded - Ignored]
565
0
                                        request.headers().get("x-nativelink-json")
566
                                    {
567
0
                                        let json_data = if json_type == "pretty" {
  Branch (567:60): [Folded - Ignored]
568
0
                                            serde_json::to_string_pretty(&*output_metrics.lock())
569
0
                                                .map_err(|e| {
570
0
                                                    make_err!(
571
0
                                                        Code::Internal,
572
0
                                                        "Could not convert to json {e:?}"
573
0
                                                    )
574
0
                                                })?
575
                                        } else {
576
0
                                            serde_json::to_string(&*output_metrics.lock()).map_err(
577
0
                                                |e| {
578
0
                                                    make_err!(
579
0
                                                        Code::Internal,
580
0
                                                        "Could not convert to json {e:?}"
581
0
                                                    )
582
0
                                                },
583
0
                                            )?
584
                                        };
585
0
                                        let mut response =
586
0
                                            Response::new(axum::body::Body::from(json_data));
587
0
                                        response.headers_mut().insert(
588
0
                                            hyper::header::CONTENT_TYPE,
589
0
                                            hyper::header::HeaderValue::from_static(
590
0
                                                "application/json",
591
0
                                            ),
592
0
                                        );
593
0
                                        return Ok(response);
594
0
                                    }
595
0
596
0
                                    // Export the metrics to OpenTelemetry.
597
0
                                    otel_export(
598
0
                                        "nativelink".to_string(),
599
0
                                        &meter,
600
0
                                        &output_metrics.lock(),
601
0
                                    );
602
0
603
0
                                    // Translate the OpenTelemetry metrics to Prometheus format and encode
604
0
                                    // them into a hyper::Response.
605
0
                                    let mut result = vec![];
606
0
                                    TextEncoder::new()
607
0
                                        .encode(&registry.gather(), &mut result)
608
0
                                        .unwrap();
609
0
                                    let mut response =
610
0
                                        Response::new(axum::body::Body::from(result));
611
0
                                    // Per spec we should probably use `application/openmetrics-text; version=1.0.0; charset=utf-8`
612
0
                                    // https://github.com/OpenObservability/OpenMetrics/blob/1386544931307dff279688f332890c31b6c5de36/specification/OpenMetrics.md#overall-structure
613
0
                                    // However, this makes debugging more difficult, so we use the old text/plain instead.
614
0
                                    response.headers_mut().insert(
615
0
                                        hyper::header::CONTENT_TYPE,
616
0
                                        hyper::header::HeaderValue::from_static(
617
0
                                            "text/plain; version=0.0.4; charset=utf-8",
618
0
                                        ),
619
0
                                    );
620
0
                                    Result::<_, Error>::Ok(response)
621
0
                                };
622
0
                                response
623
0
                            })
624
0
                            .await
625
0
                            .unwrap_or_else(|e| Ok(error_to_response(e)))
626
0
                            .unwrap_or_else(error_to_response)
627
0
                        })
628
0
                }),
629
0
            );
630
0
        }
631
632
0
        if let Some(admin_config) = services.admin {
  Branch (632:16): [Folded - Ignored]
633
0
            let path = if admin_config.path.is_empty() {
  Branch (633:27): [Folded - Ignored]
634
0
                DEFAULT_ADMIN_API_PATH
635
            } else {
636
0
                &admin_config.path
637
            };
638
0
            let worker_schedulers = Arc::new(worker_schedulers.clone());
639
0
            svc = svc.nest_service(
640
0
                path,
641
0
                Router::new().route(
642
0
                    "/scheduler/:instance_name/set_drain_worker/:worker_id/:is_draining",
643
0
                    axum::routing::post(
644
0
                        move |params: axum::extract::Path<(String, String, String)>| async move {
645
0
                            let (instance_name, worker_id, is_draining) = params.0;
646
0
                            (async move {
647
0
                                let is_draining = match is_draining.as_str() {
648
0
                                    "0" => false,
649
0
                                    "1" => true,
650
                                    _ => {
651
0
                                        return Err(make_err!(
652
0
                                            Code::Internal,
653
0
                                            "{} is neither 0 nor 1",
654
0
                                            is_draining
655
0
                                        ))
656
                                    }
657
                                };
658
0
                                worker_schedulers
659
0
                                    .get(&instance_name)
660
0
                                    .err_tip(|| {
661
0
                                        format!(
662
0
                                            "Can not get an instance with the name of '{}'",
663
0
                                            &instance_name
664
0
                                        )
665
0
                                    })?
666
0
                                    .clone()
667
0
                                    .set_drain_worker(
668
0
                                        &WorkerId::try_from(worker_id.clone())?,
669
0
                                        is_draining,
670
0
                                    )
671
0
                                    .await?;
672
0
                                Ok::<_, Error>(format!("Draining worker {worker_id}"))
673
0
                            })
674
0
                            .await
675
0
                            .map_err(|e| {
676
0
                                Err::<String, _>((
677
0
                                    axum::http::StatusCode::INTERNAL_SERVER_ERROR,
678
0
                                    format!("Error: {e:?}"),
679
0
                                ))
680
0
                            })
681
0
                        },
682
0
                    ),
683
0
                ),
684
0
            );
685
0
        }
686
687
0
        svc = svc
688
0
            // This is the default service that executes if no other endpoint matches.
689
0
            .fallback((StatusCode::NOT_FOUND, "Not Found"));
690
691
        // Configure our TLS acceptor if we have TLS configured.
692
0
        let maybe_tls_acceptor = http_config.tls.map_or(Ok(None), |tls_config| {
693
0
            fn read_cert(cert_file: &str) -> Result<Vec<CertificateDer<'static>>, Error> {
694
0
                let mut cert_reader = std::io::BufReader::new(
695
0
                    std::fs::File::open(cert_file)
696
0
                        .err_tip(|| format!("Could not open cert file {cert_file}"))?,
697
                );
698
0
                let certs = extract_certs(&mut cert_reader)
699
0
                    .map(|certificate| certificate.map(CertificateDer::from))
700
0
                    .collect::<Result<Vec<CertificateDer<'_>>, _>>()
701
0
                    .err_tip(|| format!("Could not extract certs from file {cert_file}"))?;
702
0
                Ok(certs)
703
0
            }
704
0
            let certs = read_cert(&tls_config.cert_file)?;
705
0
            let mut key_reader = std::io::BufReader::new(
706
0
                std::fs::File::open(&tls_config.key_file)
707
0
                    .err_tip(|| format!("Could not open key file {}", tls_config.key_file))?,
708
            );
709
0
            let key = match rustls_pemfile::read_one(&mut key_reader)
710
0
                .err_tip(|| format!("Could not extract key(s) from file {}", tls_config.key_file))?
711
            {
712
0
                Some(rustls_pemfile::Item::Pkcs8Key(key)) => key.into(),
713
0
                Some(rustls_pemfile::Item::Sec1Key(key)) => key.into(),
714
0
                Some(rustls_pemfile::Item::Pkcs1Key(key)) => key.into(),
715
                _ => {
716
0
                    return Err(make_err!(
717
0
                        Code::Internal,
718
0
                        "No keys found in file {}",
719
0
                        tls_config.key_file
720
0
                    ))
721
                }
722
            };
723
0
            if let Ok(Some(_)) = rustls_pemfile::read_one(&mut key_reader) {
  Branch (723:20): [Folded - Ignored]
724
0
                return Err(make_err!(
725
0
                    Code::InvalidArgument,
726
0
                    "Expected 1 key in file {}",
727
0
                    tls_config.key_file
728
0
                ));
729
0
            }
730
0
            let verifier = if let Some(client_ca_file) = &tls_config.client_ca_file {
  Branch (730:35): [Folded - Ignored]
731
0
                let mut client_auth_roots = RootCertStore::empty();
732
0
                for cert in read_cert(client_ca_file)? {
733
0
                    client_auth_roots.add(cert).map_err(|e| {
734
0
                        make_err!(Code::Internal, "Could not read client CA: {e:?}")
735
0
                    })?;
736
                }
737
0
                let crls = if let Some(client_crl_file) = &tls_config.client_crl_file {
  Branch (737:35): [Folded - Ignored]
738
0
                    let mut crl_reader = std::io::BufReader::new(
739
0
                        std::fs::File::open(client_crl_file)
740
0
                            .err_tip(|| format!("Could not open CRL file {client_crl_file}"))?,
741
                    );
742
0
                    extract_crls(&mut crl_reader)
743
0
                        .map(|crl| crl.map(CertificateRevocationListDer::from))
744
0
                        .collect::<Result<_, _>>()
745
0
                        .err_tip(|| format!("Could not extract CRLs from file {client_crl_file}"))?
746
                } else {
747
0
                    Vec::new()
748
                };
749
0
                WebPkiClientVerifier::builder(Arc::new(client_auth_roots))
750
0
                    .with_crls(crls)
751
0
                    .build()
752
0
                    .map_err(|e| {
753
0
                        make_err!(
754
0
                            Code::Internal,
755
0
                            "Could not create WebPkiClientVerifier: {e:?}"
756
0
                        )
757
0
                    })?
758
            } else {
759
0
                WebPkiClientVerifier::no_client_auth()
760
            };
761
0
            let mut config = TlsServerConfig::builder()
762
0
                .with_client_cert_verifier(verifier)
763
0
                .with_single_cert(certs, key)
764
0
                .map_err(|e| {
765
0
                    make_err!(Code::Internal, "Could not create TlsServerConfig : {e:?}")
766
0
                })?;
767
768
0
            config.alpn_protocols.push("h2".into());
769
0
            Ok(Some(TlsAcceptor::from(Arc::new(config))))
770
0
        })?;
771
772
0
        let socket_addr = http_config
773
0
            .socket_address
774
0
            .parse::<SocketAddr>()
775
0
            .map_err(|e| {
776
0
                make_input_err!("Invalid address '{}' - {e:?}", http_config.socket_address)
777
0
            })?;
778
0
        let tcp_listener = TcpListener::bind(&socket_addr).await?;
779
0
        let mut http = auto::Builder::new(TaskExecutor::default());
780
0
781
0
        let http_config = &http_config.advanced_http;
782
0
        if let Some(value) = http_config.http2_keep_alive_interval {
  Branch (782:16): [Folded - Ignored]
783
0
            http.http2()
784
0
                .keep_alive_interval(Duration::from_secs(u64::from(value)));
785
0
        }
786
787
0
        if let Some(value) = http_config.experimental_http2_max_pending_accept_reset_streams {
  Branch (787:16): [Folded - Ignored]
788
0
            http.http2()
789
0
                .max_pending_accept_reset_streams(usize::try_from(value).err_tip(|| {
790
0
                    "Could not convert experimental_http2_max_pending_accept_reset_streams"
791
0
                })?);
792
0
        }
793
0
        if let Some(value) = http_config.experimental_http2_initial_stream_window_size {
  Branch (793:16): [Folded - Ignored]
794
0
            http.http2().initial_stream_window_size(value);
795
0
        }
796
0
        if let Some(value) = http_config.experimental_http2_initial_connection_window_size {
  Branch (796:16): [Folded - Ignored]
797
0
            http.http2().initial_connection_window_size(value);
798
0
        }
799
0
        if let Some(value) = http_config.experimental_http2_adaptive_window {
  Branch (799:16): [Folded - Ignored]
800
0
            http.http2().adaptive_window(value);
801
0
        }
802
0
        if let Some(value) = http_config.experimental_http2_max_frame_size {
  Branch (802:16): [Folded - Ignored]
803
0
            http.http2().max_frame_size(value);
804
0
        }
805
0
        if let Some(value) = http_config.experimental_http2_max_concurrent_streams {
  Branch (805:16): [Folded - Ignored]
806
0
            http.http2().max_concurrent_streams(value);
807
0
        }
808
0
        if let Some(value) = http_config.experimental_http2_keep_alive_timeout {
  Branch (808:16): [Folded - Ignored]
809
0
            http.http2()
810
0
                .keep_alive_timeout(Duration::from_secs(u64::from(value)));
811
0
        }
812
0
        if let Some(value) = http_config.experimental_http2_max_send_buf_size {
  Branch (812:16): [Folded - Ignored]
813
0
            http.http2().max_send_buf_size(
814
0
                usize::try_from(value).err_tip(|| "Could not convert http2_max_send_buf_size")?,
815
            );
816
0
        }
817
0
        if let Some(true) = http_config.experimental_http2_enable_connect_protocol {
  Branch (817:16): [Folded - Ignored]
818
0
            http.http2().enable_connect_protocol();
819
0
        }
820
0
        if let Some(value) = http_config.experimental_http2_max_header_list_size {
  Branch (820:16): [Folded - Ignored]
821
0
            http.http2().max_header_list_size(value);
822
0
        }
823
0
        event!(Level::WARN, "Ready, listening on {socket_addr}",);
824
0
        root_futures.push(Box::pin(async move {
825
            loop {
826
0
                select! {
827
0
                    accept_result = tcp_listener.accept() => {
828
0
                        match accept_result {
829
0
                            Ok((tcp_stream, remote_addr)) => {
830
0
                                event!(
831
                                    target: "nativelink::services",
832
0
                                    Level::INFO,
833
                                    ?remote_addr,
834
                                    ?socket_addr,
835
0
                                    "Client connected"
836
                                );
837
0
                                connected_clients_mux
838
0
                                    .inner
839
0
                                    .lock()
840
0
                                    .insert(SocketAddrWrapper(remote_addr));
841
0
                                connected_clients_mux.counter.inc();
842
0
843
0
                                // This is the safest way to guarantee that if our future
844
0
                                // is ever dropped we will cleanup our data.
845
0
                                let scope_guard = guard(
846
0
                                    Arc::downgrade(&connected_clients_mux),
847
0
                                    move |weak_connected_clients_mux| {
848
0
                                        event!(
849
                                            target: "nativelink::services",
850
0
                                            Level::INFO,
851
                                            ?remote_addr,
852
                                            ?socket_addr,
853
0
                                            "Client disconnected"
854
                                        );
855
0
                                        if let Some(connected_clients_mux) = weak_connected_clients_mux.upgrade() {
  Branch (855:48): [Folded - Ignored]
856
0
                                            connected_clients_mux
857
0
                                                .inner
858
0
                                                .lock()
859
0
                                                .remove(&SocketAddrWrapper(remote_addr));
860
0
                                        }
861
0
                                    },
862
0
                                );
863
0
864
0
                                let (http, svc, maybe_tls_acceptor) =
865
0
                                    (http.clone(), svc.clone(), maybe_tls_acceptor.clone());
866
0
                                Arc::new(OriginContext::new()).background_spawn(
867
0
                                    error_span!(
868
0
                                        target: "nativelink::services",
869
0
                                        "http_connection",
870
0
                                        ?remote_addr,
871
0
                                        ?socket_addr
872
0
                                    ),
873
0
                                    async move {},
874
0
                                );
875
0
                                background_spawn!(
876
0
                                    name: "http_connection",
877
0
                                    fut: async move {
878
0
                                        // Move it into our spawn, so if our spawn dies the cleanup happens.
879
0
                                        let _guard = scope_guard;
880
0
                                        let serve_connection = if let Some(tls_acceptor) = maybe_tls_acceptor {
  Branch (880:71): [Folded - Ignored]
881
0
                                            match tls_acceptor.accept(tcp_stream).await {
882
0
                                                Ok(tls_stream) => Either::Left(http.serve_connection(
883
0
                                                    TokioIo::new(tls_stream),
884
0
                                                    TowerToHyperService::new(svc),
885
0
                                                )),
886
0
                                                Err(err) => {
887
0
                                                    event!(Level::ERROR, ?err, "Failed to accept tls stream");
888
0
                                                    return;
889
                                                }
890
                                            }
891
                                        } else {
892
0
                                            Either::Right(http.serve_connection(
893
0
                                                TokioIo::new(tcp_stream),
894
0
                                                TowerToHyperService::new(svc),
895
0
                                            ))
896
                                        };
897
898
0
                                        if let Err(err) = serve_connection.await {
  Branch (898:48): [Folded - Ignored]
899
0
                                            event!(
900
                                                target: "nativelink::services",
901
0
                                                Level::ERROR,
902
                                                ?err,
903
0
                                                "Failed running service"
904
                                            );
905
0
                                        }
906
0
                                    },
907
0
                                    target: "nativelink::services",
908
0
                                    ?remote_addr,
909
0
                                    ?socket_addr,
910
0
                                );
911
                            },
912
0
                            Err(err) => {
913
0
                                event!(Level::ERROR, ?err, "Failed to accept tcp connection");
914
0
                                continue;
915
                            }
916
                        }
917
                    },
918
                }
919
            }
920
            // Unreachable
921
0
        }));
922
0
    }
923
924
    {
925
        // We start workers after our TcpListener is setup so if our worker connects to one
926
        // of these services it will be able to connect.
927
0
        let worker_cfgs = cfg.workers.unwrap_or_default();
928
0
        let mut worker_names = HashSet::with_capacity(worker_cfgs.len());
929
0
        let mut worker_metrics: HashMap<String, Arc<dyn RootMetricsComponent>> = HashMap::new();
930
0
        for (i, worker_cfg) in worker_cfgs.into_iter().enumerate() {
931
0
            let spawn_fut = match worker_cfg {
932
0
                WorkerConfig::local(local_worker_cfg) => {
933
0
                    let fast_slow_store = store_manager
934
0
                        .get_store(&local_worker_cfg.cas_fast_slow_store)
935
0
                        .err_tip(|| {
936
0
                            format!(
937
0
                                "Failed to find store for cas_store_ref in worker config : {}",
938
0
                                local_worker_cfg.cas_fast_slow_store
939
0
                            )
940
0
                        })?;
941
942
0
                    let maybe_ac_store = if let Some(ac_store_ref) =
  Branch (942:49): [Folded - Ignored]
943
0
                        &local_worker_cfg.upload_action_result.ac_store
944
                    {
945
0
                        Some(store_manager.get_store(ac_store_ref).err_tip(|| {
946
0
                            format!("Failed to find store for ac_store in worker config : {ac_store_ref}")
947
0
                        })?)
948
                    } else {
949
0
                        None
950
                    };
951
                    // Note: Defaults to fast_slow_store if not specified. If this ever changes it must
952
                    // be updated in config documentation for the `historical_results_store` field.
953
0
                    let historical_store = if let Some(cas_store_ref) = &local_worker_cfg
  Branch (953:51): [Folded - Ignored]
954
0
                        .upload_action_result
955
0
                        .historical_results_store
956
                    {
957
0
                        store_manager.get_store(cas_store_ref).err_tip(|| {
958
0
                                format!(
959
0
                                "Failed to find store for historical_results_store in worker config : {cas_store_ref}"
960
0
                            )
961
0
                            })?
962
                    } else {
963
0
                        fast_slow_store.clone()
964
                    };
965
0
                    let (local_worker, metrics) = new_local_worker(
966
0
                        Arc::new(local_worker_cfg),
967
0
                        fast_slow_store,
968
0
                        maybe_ac_store,
969
0
                        historical_store,
970
0
                    )
971
0
                    .await
972
0
                    .err_tip(|| "Could not make LocalWorker")?;
973
974
0
                    let name = if local_worker.name().is_empty() {
  Branch (974:35): [Folded - Ignored]
975
0
                        format!("worker_{i}")
976
                    } else {
977
0
                        local_worker.name().clone()
978
                    };
979
980
0
                    if worker_names.contains(&name) {
  Branch (980:24): [Folded - Ignored]
981
0
                        Err(make_input_err!(
982
0
                            "Duplicate worker name '{}' found in config",
983
0
                            name
984
0
                        ))?;
985
0
                    }
986
0
                    worker_names.insert(name.clone());
987
0
                    worker_metrics.insert(name.clone(), metrics);
988
0
                    let shutdown_rx = shutdown_tx.subscribe();
989
0
                    let fut = Arc::new(OriginContext::new())
990
0
                        .wrap_async(trace_span!("worker_ctx"), local_worker.run(shutdown_rx));
991
0
                    spawn!("worker", fut, ?name)
992
                }
993
            };
994
0
            root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
995
0
        }
996
0
        root_metrics.write().workers = worker_metrics;
997
    }
998
999
0
    if let Err(e) = try_join_all(root_futures).await {
  Branch (999:12): [Folded - Ignored]
1000
0
        panic!("{e:?}");
1001
0
    };
1002
0
1003
0
    Ok(())
1004
0
}
1005
1006
0
async fn get_config() -> Result<CasConfig, Box<dyn std::error::Error>> {
1007
0
    let args = Args::parse();
1008
0
    let json_contents = String::from_utf8(
1009
0
        std::fs::read(&args.config_file)
1010
0
            .err_tip(|| format!("Could not open config file {}", args.config_file))?,
1011
0
    )?;
1012
0
    Ok(serde_json5::from_str(&json_contents)?)
1013
0
}
1014
1015
0
fn main() -> Result<(), Box<dyn std::error::Error>> {
1016
0
    init_tracing()?;
1017
1018
0
    let mut cfg = futures::executor::block_on(get_config())?;
1019
1020
0
    let (mut metrics_enabled, max_blocking_threads) = {
1021
0
        // Note: If the default changes make sure you update the documentation in
1022
0
        // `config/cas_server.rs`.
1023
0
        const DEFAULT_MAX_OPEN_FILES: usize = 512;
1024
0
        // Note: If the default changes make sure you update the documentation in
1025
0
        // `config/cas_server.rs`.
1026
0
        const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000;
1027
0
        let global_cfg = if let Some(global_cfg) = &mut cfg.global {
  Branch (1027:33): [Folded - Ignored]
1028
0
            if global_cfg.max_open_files == 0 {
  Branch (1028:16): [Folded - Ignored]
1029
0
                global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
1030
0
            }
1031
0
            if global_cfg.idle_file_descriptor_timeout_millis == 0 {
  Branch (1031:16): [Folded - Ignored]
1032
0
                global_cfg.idle_file_descriptor_timeout_millis =
1033
0
                    DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS;
1034
0
            }
1035
0
            if global_cfg.default_digest_size_health_check == 0 {
  Branch (1035:16): [Folded - Ignored]
1036
0
                global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG;
1037
0
            }
1038
1039
0
            *global_cfg
1040
        } else {
1041
0
            GlobalConfig {
1042
0
                max_open_files: DEFAULT_MAX_OPEN_FILES,
1043
0
                idle_file_descriptor_timeout_millis: DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS,
1044
0
                disable_metrics: cfg.servers.iter().all(|v| {
1045
0
                    let Some(service) = &v.services else {
  Branch (1045:25): [Folded - Ignored]
1046
0
                        return true;
1047
                    };
1048
0
                    service.experimental_prometheus.is_none()
1049
0
                }),
1050
0
                default_digest_hash_function: None,
1051
0
                default_digest_size_health_check: DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
1052
0
            }
1053
        };
1054
0
        set_open_file_limit(global_cfg.max_open_files);
1055
0
        set_idle_file_descriptor_timeout(Duration::from_millis(
1056
0
            global_cfg.idle_file_descriptor_timeout_millis,
1057
0
        ))?;
1058
0
        set_default_digest_hasher_func(DigestHasherFunc::from(
1059
0
            global_cfg
1060
0
                .default_digest_hash_function
1061
0
                .unwrap_or(ConfigDigestHashFunction::sha256),
1062
0
        ))?;
1063
0
        set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?;
1064
        // TODO (#513): prevent deadlocks by assigning max blocking threads number of open files * ten
1065
0
        (!global_cfg.disable_metrics, global_cfg.max_open_files * 10)
1066
0
    };
1067
0
    // Override metrics enabled if the environment variable is set.
1068
0
    if std::env::var(METRICS_DISABLE_ENV).is_ok() {
  Branch (1068:8): [Folded - Ignored]
1069
0
        metrics_enabled = false;
1070
0
    }
1071
0
    let server_start_time = SystemTime::now()
1072
0
        .duration_since(UNIX_EPOCH)
1073
0
        .unwrap()
1074
0
        .as_secs();
1075
    #[allow(clippy::disallowed_methods)]
1076
    {
1077
0
        let runtime = tokio::runtime::Builder::new_multi_thread()
1078
0
            .max_blocking_threads(max_blocking_threads)
1079
0
            .enable_all()
1080
0
            .on_thread_start(move || set_metrics_enabled_for_this_thread(metrics_enabled))
1081
0
            .build()?;
1082
1083
        // Initiates the shutdown process by broadcasting the shutdown signal via the `oneshot::Sender` to all listeners.
1084
        // Each listener will perform its cleanup and then drop its `oneshot::Sender`, signaling completion.
1085
        // Once all `oneshot::Sender` instances are dropped, the worker knows it can safely terminate.
1086
0
        let (shutdown_tx, _) = broadcast::channel::<ShutdownGuard>(BROADCAST_CAPACITY);
1087
0
        let shutdown_tx_clone = shutdown_tx.clone();
1088
0
        let mut shutdown_guard = ShutdownGuard::default();
1089
0
1090
0
        runtime.spawn(async move {
1091
0
            tokio::signal::ctrl_c()
1092
0
                .await
1093
0
                .expect("Failed to listen to SIGINT");
1094
0
            eprintln!("User terminated process via SIGINT");
1095
0
            std::process::exit(130);
1096
0
        });
1097
0
1098
0
        #[cfg(target_family = "unix")]
1099
0
        {
1100
0
            runtime.spawn(async move {
1101
0
                signal(SignalKind::terminate())
1102
0
                    .expect("Failed to listen to SIGTERM")
1103
0
                    .recv()
1104
0
                    .await;
1105
0
                event!(Level::WARN, "Process terminated via SIGTERM",);
1106
0
                let _ = shutdown_tx_clone.send(shutdown_guard.clone());
1107
0
                let () = shutdown_guard.wait_for(Priority::P0).await;
1108
0
                event!(Level::WARN, "Successfully shut down nativelink.",);
1109
0
                std::process::exit(143);
1110
0
            });
1111
0
        }
1112
0
1113
0
        runtime
1114
0
            .block_on(Arc::new(OriginContext::new()).wrap_async(
1115
0
                trace_span!("main"),
1116
0
                inner_main(cfg, server_start_time, shutdown_tx),
1117
0
            ))
1118
0
            .err_tip(|| "main() function failed")?;
1119
    }
1120
0
    Ok(())
1121
0
}