/build/source/nativelink-util/src/fs.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::fs::Metadata; |
16 | | use std::io::IoSlice; |
17 | | use std::path::{Path, PathBuf}; |
18 | | use std::pin::Pin; |
19 | | use std::sync::atomic::{AtomicUsize, Ordering}; |
20 | | use std::sync::OnceLock; |
21 | | use std::task::{Context, Poll}; |
22 | | use std::time::Duration; |
23 | | |
24 | | use bytes::BytesMut; |
25 | | use futures::Future; |
26 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
27 | | /// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding |
28 | | /// open files at any given time. This will greatly reduce the chance we'll hit open file limit |
29 | | /// issues. |
30 | | pub use tokio::fs::DirEntry; |
31 | | use tokio::io::{ |
32 | | AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, ReadBuf, SeekFrom, Take, |
33 | | }; |
34 | | use tokio::sync::{Semaphore, SemaphorePermit}; |
35 | | use tokio::time::timeout; |
36 | | use tracing::{event, Level}; |
37 | | |
38 | | use crate::spawn_blocking; |
39 | | |
/// Default size, in bytes, of the buffer used when reading to/from disk.
pub const DEFAULT_READ_BUFF_SIZE: usize = 16 * 1024;

/// Byte offset measured from the start of a file.
type StreamPosition = u64;
/// Number of bytes a `Take` wrapper is still allowed to yield.
type BytesRemaining = u64;
45 | | |
46 | | #[derive(Debug)] |
47 | | enum MaybeFileSlot { |
48 | | Open(Take<FileSlot>), |
49 | | Closed((StreamPosition, BytesRemaining)), |
50 | | } |
51 | | |
52 | | /// A wrapper around a generic `FileSlot`. This gives us the ability to |
53 | | /// close a file and then resume it later. Specifically useful for cases |
54 | | /// piping data from one location to another and one side is slow at |
55 | | /// reading or writing the data, we can have a timeout, close the file |
56 | | /// and then reopen it later. |
57 | | /// |
58 | | /// Note: This wraps both files opened for read and write, so we always |
59 | | /// need to know how the original file was opened and the location of |
60 | | /// the file. To simplify the code significantly we always require the |
61 | | /// file to be a `Take<FileSlot>`. |
62 | | #[derive(Debug)] |
63 | | pub struct ResumeableFileSlot { |
64 | | maybe_file_slot: MaybeFileSlot, |
65 | | path: PathBuf, |
66 | | is_write: bool, |
67 | | } |
68 | | |
69 | | impl ResumeableFileSlot { |
70 | 83 | pub fn new(file: FileSlot, path: PathBuf, is_write: bool) -> Self { |
71 | 83 | Self { |
72 | 83 | maybe_file_slot: MaybeFileSlot::Open(file.take(u64::MAX)), |
73 | 83 | path, |
74 | 83 | is_write, |
75 | 83 | } |
76 | 83 | } |
77 | | |
78 | 41 | pub fn new_with_take(file: Take<FileSlot>, path: PathBuf, is_write: bool) -> Self { |
79 | 41 | Self { |
80 | 41 | maybe_file_slot: MaybeFileSlot::Open(file), |
81 | 41 | path, |
82 | 41 | is_write, |
83 | 41 | } |
84 | 41 | } |
85 | | |
86 | | /// Returns the path of the file. |
87 | 5 | pub fn get_path(&self) -> &Path { |
88 | 5 | Path::new(&self.path) |
89 | 5 | } |
90 | | |
91 | | /// Returns the current read position of a file. |
92 | 2 | pub async fn stream_position(&mut self) -> Result<u64, Error> { |
93 | 2 | let file_slot = match &mut self.maybe_file_slot { |
94 | 2 | MaybeFileSlot::Open(file_slot) => file_slot, |
95 | 0 | MaybeFileSlot::Closed((pos, _)) => return Ok(*pos), |
96 | | }; |
97 | 2 | file_slot |
98 | 2 | .get_mut() |
99 | 2 | .inner |
100 | 2 | .stream_position() |
101 | 2 | .await |
102 | 2 | .err_tip(|| "Failed to get file position in digest_for_file"0 ) |
103 | 2 | } |
104 | | |
105 | 8 | pub async fn close_file(&mut self) -> Result<(), Error> { |
106 | 8 | let MaybeFileSlot::Open(file_slot) = &mut self.maybe_file_slot else { Branch (106:13): [True: 1, False: 0]
Branch (106:13): [Folded - Ignored]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 7, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [Folded - Ignored]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
Branch (106:13): [True: 0, False: 0]
|
107 | 0 | return Ok(()); |
108 | | }; |
109 | 8 | let position = file_slot |
110 | 8 | .get_mut() |
111 | 8 | .inner |
112 | 8 | .stream_position() |
113 | 8 | .await |
114 | 8 | .err_tip(|| format!("Failed to get file position {:?}", self.path)0 )?0 ; |
115 | 8 | self.maybe_file_slot = MaybeFileSlot::Closed((position, file_slot.limit())); |
116 | 8 | Ok(()) |
117 | 8 | } |
118 | | |
119 | | #[inline] |
120 | 1.36k | pub async fn as_reader(&mut self) -> Result<&mut Take<FileSlot>, Error> { |
121 | 1.36k | let (stream_position, bytes_remaining7 ) = match self.maybe_file_slot { |
122 | 1.35k | MaybeFileSlot::Open(ref mut file_slot) => return Ok(file_slot), |
123 | 7 | MaybeFileSlot::Closed(pos) => pos, |
124 | | }; |
125 | 7 | let permit = OPEN_FILE_SEMAPHORE |
126 | 7 | .acquire() |
127 | 7 | .await |
128 | 7 | .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e)0 )?0 ; |
129 | 7 | let inner = tokio::fs::OpenOptions::new() |
130 | 7 | .write(self.is_write) |
131 | 7 | .read(!self.is_write) |
132 | 7 | .open(&self.path) |
133 | 7 | .await |
134 | 7 | .err_tip(|| format!("Could not open after resume {:?}", self.path)0 )?0 ; |
135 | 7 | let mut file_slot = FileSlot { |
136 | 7 | _permit: permit, |
137 | 7 | inner, |
138 | 7 | }; |
139 | 7 | file_slot |
140 | 7 | .inner |
141 | 7 | .seek(SeekFrom::Start(stream_position)) |
142 | 7 | .await |
143 | 7 | .err_tip(|| { |
144 | 0 | format!( |
145 | 0 | "Failed to seek to position {stream_position} {:?}", |
146 | 0 | self.path |
147 | 0 | ) |
148 | 7 | })?0 ; |
149 | | |
150 | 7 | self.maybe_file_slot = MaybeFileSlot::Open(file_slot.take(bytes_remaining)); |
151 | 7 | match &mut self.maybe_file_slot { |
152 | 7 | MaybeFileSlot::Open(file_slot) => Ok(file_slot), |
153 | 0 | MaybeFileSlot::Closed(_) => unreachable!(), |
154 | | } |
155 | 1.36k | } |
156 | | |
157 | | #[inline] |
158 | 149 | pub async fn as_writer(&mut self) -> Result<&mut FileSlot, Error> { |
159 | 149 | Ok(self.as_reader().await?0 .get_mut()) |
160 | 149 | } |
161 | | |
162 | | /// Utility function to read data from a handler and handles file descriptor |
163 | | /// timeouts. Chunk size is based on the `buf`'s capacity. |
164 | | /// Note: If the `handler` changes `buf`s capacity, it is responsible for reserving |
165 | | /// more before returning. |
166 | 4 | pub async fn read_buf_cb<'b, T, F, Fut>( |
167 | 4 | &'b mut self, |
168 | 4 | (mut buf, mut state): (BytesMut, T), |
169 | 4 | mut handler: F, |
170 | 4 | ) -> Result<(BytesMut, T), Error> |
171 | 4 | where |
172 | 4 | F: (FnMut((BytesMut, T)) -> Fut) + 'b, |
173 | 4 | Fut: Future<Output = Result<(BytesMut, T), Error>> + 'b, |
174 | 4 | { |
175 | | loop { |
176 | 71 | buf.clear(); |
177 | 71 | self.as_reader() |
178 | 71 | .await |
179 | 71 | .err_tip(|| "Could not get reader from file slot in read_buf_cb"0 )?0 |
180 | 71 | .read_buf(&mut buf) |
181 | 71 | .await |
182 | 71 | .err_tip(|| "Could not read chunk during read_buf_cb"0 )?0 ; |
183 | 71 | if buf.is_empty() { Branch (183:16): [True: 3, False: 3]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [Folded - Ignored]
Branch (183:16): [True: 1, False: 64]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [Folded - Ignored]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
Branch (183:16): [True: 0, False: 0]
|
184 | 4 | return Ok((buf, state)); |
185 | 67 | } |
186 | 67 | let handler_fut = handler((buf, state)); |
187 | 67 | tokio::pin!(handler_fut); |
188 | | loop { |
189 | 67 | match timeout(idle_file_descriptor_timeout(), &mut handler_fut).await { |
190 | 67 | Ok(Ok(output)) => { |
191 | 67 | (buf, state) = output; |
192 | 67 | break; |
193 | | } |
194 | 0 | Ok(Err(err)) => { |
195 | 0 | return Err(err).err_tip(|| "read_buf_cb's handler returned an error") |
196 | | } |
197 | | Err(_) => { |
198 | 0 | self.close_file() |
199 | 0 | .await |
200 | 0 | .err_tip(|| "Could not close file due to timeout in read_buf_cb")?; |
201 | 0 | continue; |
202 | | } |
203 | | } |
204 | | } |
205 | | } |
206 | 4 | } |
207 | | } |
208 | | |
209 | | #[derive(Debug)] |
210 | | pub struct FileSlot { |
211 | | // We hold the permit because once it is dropped it goes back into the queue. |
212 | | _permit: SemaphorePermit<'static>, |
213 | | inner: tokio::fs::File, |
214 | | } |
215 | | |
216 | | impl AsRef<tokio::fs::File> for FileSlot { |
217 | 78 | fn as_ref(&self) -> &tokio::fs::File { |
218 | 78 | &self.inner |
219 | 78 | } |
220 | | } |
221 | | |
222 | | impl AsMut<tokio::fs::File> for FileSlot { |
223 | 7 | fn as_mut(&mut self) -> &mut tokio::fs::File { |
224 | 7 | &mut self.inner |
225 | 7 | } |
226 | | } |
227 | | |
228 | | impl AsyncRead for FileSlot { |
229 | 2.39k | fn poll_read( |
230 | 2.39k | mut self: Pin<&mut Self>, |
231 | 2.39k | cx: &mut Context<'_>, |
232 | 2.39k | buf: &mut ReadBuf<'_>, |
233 | 2.39k | ) -> Poll<Result<(), tokio::io::Error>> { |
234 | 2.39k | Pin::new(&mut self.inner).poll_read(cx, buf) |
235 | 2.39k | } |
236 | | } |
237 | | |
238 | | impl AsyncSeek for FileSlot { |
239 | 37 | fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> { |
240 | 37 | Pin::new(&mut self.inner).start_seek(position) |
241 | 37 | } |
242 | | |
243 | 112 | fn poll_complete( |
244 | 112 | mut self: Pin<&mut Self>, |
245 | 112 | cx: &mut Context<'_>, |
246 | 112 | ) -> Poll<Result<u64, tokio::io::Error>> { |
247 | 112 | Pin::new(&mut self.inner).poll_complete(cx) |
248 | 112 | } |
249 | | } |
250 | | |
251 | | impl AsyncWrite for FileSlot { |
252 | 8 | fn poll_write( |
253 | 8 | mut self: Pin<&mut Self>, |
254 | 8 | cx: &mut Context<'_>, |
255 | 8 | buf: &[u8], |
256 | 8 | ) -> Poll<Result<usize, tokio::io::Error>> { |
257 | 8 | Pin::new(&mut self.inner).poll_write(cx, buf) |
258 | 8 | } |
259 | | |
260 | 0 | fn poll_flush( |
261 | 0 | mut self: Pin<&mut Self>, |
262 | 0 | cx: &mut Context<'_>, |
263 | 0 | ) -> Poll<Result<(), tokio::io::Error>> { |
264 | 0 | Pin::new(&mut self.inner).poll_flush(cx) |
265 | 0 | } |
266 | | |
267 | 0 | fn poll_shutdown( |
268 | 0 | mut self: Pin<&mut Self>, |
269 | 0 | cx: &mut Context<'_>, |
270 | 0 | ) -> Poll<Result<(), tokio::io::Error>> { |
271 | 0 | Pin::new(&mut self.inner).poll_shutdown(cx) |
272 | 0 | } |
273 | | |
274 | 58 | fn poll_write_vectored( |
275 | 58 | mut self: Pin<&mut Self>, |
276 | 58 | cx: &mut Context<'_>, |
277 | 58 | bufs: &[IoSlice<'_>], |
278 | 58 | ) -> Poll<Result<usize, tokio::io::Error>> { |
279 | 58 | Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) |
280 | 58 | } |
281 | | |
282 | 58 | fn is_write_vectored(&self) -> bool { |
283 | 58 | self.inner.is_write_vectored() |
284 | 58 | } |
285 | | } |
286 | | |
287 | | const DEFAULT_OPEN_FILE_PERMITS: usize = 10; |
288 | | static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS); |
289 | | pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS); |
290 | | |
291 | | /// Try to acquire a permit from the open file semaphore. |
292 | | #[inline] |
293 | 609 | pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> { |
294 | 609 | OPEN_FILE_SEMAPHORE |
295 | 609 | .acquire() |
296 | 609 | .await |
297 | 609 | .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e)0 ) |
298 | 609 | } |
299 | | /// Acquire a permit from the open file semaphore and call a raw function. |
300 | | #[inline] |
301 | 591 | pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error> |
302 | 591 | where |
303 | 591 | F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static, |
304 | 591 | T: Send + 'static, |
305 | 591 | { |
306 | 591 | let permit = get_permit().await?0 ; |
307 | 591 | spawn_blocking!("fs_call_with_permit", move || f(permit)590 ) |
308 | 591 | .await |
309 | 588 | .unwrap_or_else(|e| Err(make_err!(Code::Internal, "background task failed: {e:?}"))0 ) |
310 | 588 | } |
311 | | |
312 | 0 | pub fn set_open_file_limit(limit: usize) { |
313 | 0 | let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire); |
314 | 0 | if limit < current_total { Branch (314:8): [True: 0, False: 0]
Branch (314:8): [Folded - Ignored]
|
315 | 0 | event!( |
316 | 0 | Level::ERROR, |
317 | 0 | "set_open_file_limit({}) must be greater than {}", |
318 | | limit, |
319 | | current_total |
320 | | ); |
321 | 0 | return; |
322 | 0 | } |
323 | 0 | TOTAL_FILE_SEMAPHORES.fetch_add(limit - current_total, Ordering::Release); |
324 | 0 | OPEN_FILE_SEMAPHORE.add_permits(limit - current_total); |
325 | 0 | } |
326 | | |
327 | 14 | pub fn get_open_files_for_test() -> usize { |
328 | 14 | TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits() |
329 | 14 | } |
330 | | |
/// How long a file descriptor can be open without being used before it is closed.
static IDLE_FILE_DESCRIPTOR_TIMEOUT: OnceLock<Duration> = OnceLock::new();

/// Returns the idle file descriptor timeout.
///
/// If no timeout has been set yet, this initializes it to `Duration::MAX`;
/// a later call to `set_idle_file_descriptor_timeout` will then fail.
pub fn idle_file_descriptor_timeout() -> Duration {
    let configured = IDLE_FILE_DESCRIPTOR_TIMEOUT.get_or_init(|| Duration::MAX);
    *configured
}
337 | | |
338 | | /// Set the idle file descriptor timeout. This is the amount of time |
339 | | /// a file descriptor can be open without being used before it is closed. |
340 | 301 | pub fn set_idle_file_descriptor_timeout(timeout: Duration) -> Result<(), Error> { |
341 | 301 | IDLE_FILE_DESCRIPTOR_TIMEOUT |
342 | 301 | .set(timeout) |
343 | 301 | .map_err(|_| make_err!(Code::Internal, "idle_file_descriptor_timeout already set")263 ) |
344 | 301 | } |
345 | | |
346 | 41 | pub async fn open_file(path: impl AsRef<Path>, limit: u64) -> Result<ResumeableFileSlot, Error> { |
347 | 41 | let path = path.as_ref().to_owned(); |
348 | 41 | let (permit, os_file, path) = call_with_permit(move |permit| { |
349 | 41 | Ok(( |
350 | 41 | permit, |
351 | 41 | std::fs::File::open(&path).err_tip(|| format!("Could not open {path:?}")0 )?0 , |
352 | 41 | path, |
353 | | )) |
354 | 41 | }) |
355 | 41 | .await?0 ; |
356 | 41 | Ok(ResumeableFileSlot::new_with_take( |
357 | 41 | FileSlot { |
358 | 41 | _permit: permit, |
359 | 41 | inner: tokio::fs::File::from_std(os_file), |
360 | 41 | } |
361 | 41 | .take(limit), |
362 | 41 | path, |
363 | 41 | false, /* is_write */ |
364 | 41 | )) |
365 | 41 | } |
366 | | |
367 | 83 | pub async fn create_file(path: impl AsRef<Path>) -> Result<ResumeableFileSlot, Error> { |
368 | 83 | let path = path.as_ref().to_owned(); |
369 | 83 | let (permit, os_file, path) = call_with_permit(move |permit| { |
370 | 83 | Ok(( |
371 | 83 | permit, |
372 | 83 | std::fs::File::options() |
373 | 83 | .read(true) |
374 | 83 | .write(true) |
375 | 83 | .create(true) |
376 | 83 | .truncate(true) |
377 | 83 | .open(&path) |
378 | 83 | .err_tip(|| format!("Could not open {path:?}")0 )?0 , |
379 | 83 | path, |
380 | | )) |
381 | 83 | }) |
382 | 83 | .await?0 ; |
383 | 83 | Ok(ResumeableFileSlot::new( |
384 | 83 | FileSlot { |
385 | 83 | _permit: permit, |
386 | 83 | inner: tokio::fs::File::from_std(os_file), |
387 | 83 | }, |
388 | 83 | path, |
389 | 83 | true, /* is_write */ |
390 | 83 | )) |
391 | 83 | } |
392 | | |
393 | 4 | pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> { |
394 | 4 | let src = src.as_ref().to_owned(); |
395 | 4 | let dst = dst.as_ref().to_owned(); |
396 | 4 | call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await |
397 | 4 | } |
398 | | |
399 | 1 | pub async fn set_permissions( |
400 | 1 | src: impl AsRef<Path>, |
401 | 1 | perm: std::fs::Permissions, |
402 | 1 | ) -> Result<(), Error> { |
403 | 1 | let src = src.as_ref().to_owned(); |
404 | 1 | call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into)) |
405 | 1 | .await |
406 | 1 | } |
407 | | |
408 | 20 | pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> { |
409 | 20 | let path = path.as_ref().to_owned(); |
410 | 20 | call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await |
411 | 20 | } |
412 | | |
413 | 177 | pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> { |
414 | 177 | let path = path.as_ref().to_owned(); |
415 | 177 | call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await |
416 | 177 | } |
417 | | |
418 | | #[cfg(target_family = "unix")] |
419 | 1 | pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> { |
420 | 1 | let src = src.as_ref().to_owned(); |
421 | 1 | let dst = dst.as_ref().to_owned(); |
422 | 1 | call_with_permit(move |_| { |
423 | 1 | tokio::runtime::Handle::current() |
424 | 1 | .block_on(tokio::fs::symlink(src, dst)) |
425 | 1 | .map_err(Into::<Error>::into) |
426 | 1 | }) |
427 | 1 | .await |
428 | 1 | } |
429 | | |
430 | 0 | pub async fn read_link(path: impl AsRef<Path>) -> Result<std::path::PathBuf, Error> { |
431 | 0 | let path = path.as_ref().to_owned(); |
432 | 0 | call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await |
433 | 0 | } |
434 | | |
435 | | pub struct ReadDir { |
436 | | // We hold the permit because once it is dropped it goes back into the queue. |
437 | | permit: SemaphorePermit<'static>, |
438 | | inner: tokio::fs::ReadDir, |
439 | | } |
440 | | |
441 | | impl ReadDir { |
442 | 202 | pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) { |
443 | 202 | (self.permit, self.inner) |
444 | 202 | } |
445 | | } |
446 | | |
447 | | impl AsRef<tokio::fs::ReadDir> for ReadDir { |
448 | 0 | fn as_ref(&self) -> &tokio::fs::ReadDir { |
449 | 0 | &self.inner |
450 | 0 | } |
451 | | } |
452 | | |
453 | | impl AsMut<tokio::fs::ReadDir> for ReadDir { |
454 | 2 | fn as_mut(&mut self) -> &mut tokio::fs::ReadDir { |
455 | 2 | &mut self.inner |
456 | 2 | } |
457 | | } |
458 | | |
459 | 204 | pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> { |
460 | 204 | let path = path.as_ref().to_owned(); |
461 | 204 | let (permit, inner) = call_with_permit(move |permit| { |
462 | 204 | Ok(( |
463 | 204 | permit, |
464 | 204 | tokio::runtime::Handle::current() |
465 | 204 | .block_on(tokio::fs::read_dir(path)) |
466 | 204 | .map_err(Into::<Error>::into)?0 , |
467 | | )) |
468 | 204 | }) |
469 | 204 | .await?0 ; |
470 | 204 | Ok(ReadDir { permit, inner }) |
471 | 204 | } |
472 | | |
473 | 17 | pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> { |
474 | 17 | let from = from.as_ref().to_owned(); |
475 | 17 | let to = to.as_ref().to_owned(); |
476 | 17 | call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await |
477 | 17 | } |
478 | | |
479 | 15 | pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> { |
480 | 15 | let path = path.as_ref().to_owned(); |
481 | 15 | call_with_permit(move |_| std::fs::remove_file(path).map_err(Into::<Error>::into)14 ).await |
482 | 12 | } |
483 | | |
484 | 2 | pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> { |
485 | 2 | let path = path.as_ref().to_owned(); |
486 | 2 | call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await |
487 | 2 | } |
488 | | |
489 | 10 | pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> { |
490 | 10 | let path = path.as_ref().to_owned(); |
491 | 10 | call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await |
492 | 10 | } |
493 | | |
494 | 4 | pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> { |
495 | 4 | let path = path.as_ref().to_owned(); |
496 | 4 | call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await |
497 | 4 | } |
498 | | |
499 | 3 | pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> { |
500 | 3 | let path = path.as_ref().to_owned(); |
501 | 3 | call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await |
502 | 3 | } |
503 | | |
504 | 9 | pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> { |
505 | 9 | let path = path.as_ref().to_owned(); |
506 | 9 | call_with_permit(move |_| std::fs::remove_dir_all(path).map_err(Into::<Error>::into)).await |
507 | 9 | } |