diff --git a/Cargo.lock b/Cargo.lock index 453052d0..2476f6a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4693,12 +4693,13 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -5040,6 +5041,7 @@ dependencies = [ "syslog", "time", "tokio", + "tokio-util", "totp-lite", "tracing", "url", diff --git a/Cargo.toml b/Cargo.toml index 3e80d787..c78443cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ dashmap = "6.1.0" # Async futures futures = "0.3.31" tokio = { version = "1.43.0", features = ["rt-multi-thread", "fs", "io-util", "parking_lot", "time", "signal", "net"] } +tokio-util = { version = "0.7.14", features = ["compat"]} # A generic serialization/deserialization framework serde = { version = "1.0.218", features = ["derive"] } diff --git a/src/api/core/ciphers.rs b/src/api/core/ciphers.rs index 24304439..24842046 100644 --- a/src/api/core/ciphers.rs +++ b/src/api/core/ciphers.rs @@ -1265,7 +1265,7 @@ async fn save_attachment( attachment.save(&mut conn).await.expect("Error saving attachment"); } - save_temp_file(PathType::Attachments, &format!("{cipher_id}/{file_id}"), data.data).await?; + save_temp_file(PathType::Attachments, &format!("{cipher_id}/{file_id}"), data.data, true).await?; nt.send_cipher_update( UpdateType::SyncCipherUpdate, diff --git a/src/api/core/sends.rs b/src/api/core/sends.rs index e2beeba2..19df42c1 100644 --- a/src/api/core/sends.rs +++ b/src/api/core/sends.rs @@ -1,3 +1,4 @@ +use std::error::Error as _; use std::path::Path; use std::time::Duration; @@ -253,7 +254,7 @@ async fn post_send_file(data: Form>, headers: Headers, mut conn: let file_id = crate::crypto::generate_send_file_id(); - save_temp_file(PathType::Sends, &format!("{}/{file_id}", send.uuid), data).await?; + save_temp_file(PathType::Sends, &format!("{}/{file_id}", send.uuid), data, true).await?; let mut data_value: Value = serde_json::from_str(&send.data)?; if let Some(o) = data_value.as_object_mut() { @@ -403,20 +404,29 @@ async fn post_send_file_v2_data( err!("Send file size does not match.", format!("Expected a file size of {} got {size}", send_data.size)); } - let operator = CONFIG.opendal_operator_for_path_type(PathType::Sends)?; let file_path = format!("{send_id}/{file_id}"); - // Check if the file already exists, if that is the case do not overwrite it - if operator.exists(&file_path).await.map_err(|e| { + save_temp_file(PathType::Sends, &file_path, data.data, false).await.map_err(|e| { + let was_file_exists_error = e + .source() + .and_then(|e| e.downcast_ref::()) + .and_then(|e| e.get_ref()) + .and_then(|e| e.downcast_ref::()) + .map(|e| e.kind() == opendal::ErrorKind::ConditionNotMatch) + .unwrap_or(false); + + if was_file_exists_error { + return crate::Error::new( + "Send file has already been uploaded.", + format!("File {file_path:?} already exists"), + ); + } + crate::Error::new( "Unexpected error while creating send file", - format!("Error while checking existence of send file at path {file_path}: {e:?}"), + format!("Error while saving send file at path {file_path}: {e:?}"), ) - })? { - err!("Send file has already been uploaded.", format!("File {file_path:?} already exists")) - } - - save_temp_file(PathType::Sends, &file_path, data.data).await?; + })?; nt.send_send_update( UpdateType::SyncSendCreate, @@ -565,12 +575,7 @@ async fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Re Ok(format!("{}/api/sends/{}/{}?t={}", &host.host, send_id, file_id, token)) } else { - Ok(operator - .presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60)) - .await - .map_err(Into::::into)? - .uri() - .to_string()) + Ok(operator.presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60)).await?.uri().to_string()) } } diff --git a/src/api/icons.rs b/src/api/icons.rs index 850ebadb..187c635c 100644 --- a/src/api/icons.rs +++ b/src/api/icons.rs @@ -232,7 +232,7 @@ async fn icon_is_negcached(path: &str) -> bool { Ok(true) => { match CONFIG.opendal_operator_for_path_type(PathType::IconCache) { Ok(operator) => { - if let Err(e) = operator.delete_iter([miss_indicator]).await { + if let Err(e) = operator.delete(&miss_indicator).await { error!("Could not remove negative cache indicator for icon {:?}: {:?}", path, e); } } diff --git a/src/config.rs b/src/config.rs index ebbd7a7f..d0fe9029 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1178,7 +1178,7 @@ fn opendal_operator_for_path(path: &str) -> Result { opendal_s3_operator_for_path(path)? } else { let builder = opendal::services::Fs::default().root(path); - opendal::Operator::new(builder).map_err(Into::::into)?.finish() + opendal::Operator::new(builder)?.finish() }; operators_by_path.insert(path.to_string(), operator.clone()); @@ -1227,11 +1227,12 @@ fn opendal_s3_operator_for_path(path: &str) -> Result let builder = opendal::services::S3::default() .customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER)) + .enable_virtual_host_style() .bucket(bucket) .root(url.path()) .default_storage_class("INTELLIGENT_TIERING"); - Ok(opendal::Operator::new(builder).map_err(Into::::into)?.finish()) + Ok(opendal::Operator::new(builder)?.finish()) } pub enum PathType { diff --git a/src/db/models/attachment.rs b/src/db/models/attachment.rs index 6496fd9c..68a67565 100644 --- a/src/db/models/attachment.rs +++ b/src/db/models/attachment.rs @@ -51,12 +51,7 @@ impl Attachment { let token = encode_jwt(&generate_file_download_claims(self.cipher_uuid.clone(), self.id.clone())); Ok(format!("{}/attachments/{}/{}?token={}", host, self.cipher_uuid, self.id, token)) } else { - Ok(operator - .presign_read(&self.get_file_path(), Duration::from_secs(5 * 60)) - .await - .map_err(Into::::into)? - .uri() - .to_string()) + Ok(operator.presign_read(&self.get_file_path(), Duration::from_secs(5 * 60)).await?.uri().to_string()) } } @@ -126,7 +121,7 @@ impl Attachment { let operator = CONFIG.opendal_operator_for_path_type(PathType::Attachments)?; let file_path = self.get_file_path(); - if let Err(e) = operator.delete_iter([file_path.clone()]).await { + if let Err(e) = operator.delete(&file_path).await { if e.kind() == opendal::ErrorKind::NotFound { debug!("File '{file_path}' already deleted."); } else { diff --git a/src/util.rs b/src/util.rs index 491490af..8dc48621 100644 --- a/src/util.rs +++ b/src/util.rs @@ -817,24 +817,22 @@ pub fn is_global(ip: std::net::IpAddr) -> bool { } /// Saves a Rocket temporary file to the OpenDAL Operator at the given path. -/// -/// Ideally we would stream the Rocket TempFile directly to the OpenDAL -/// Operator, but Tempfile exposes a tokio ASyncBufRead trait, which OpenDAL -/// does not support. This could be reworked in the future to read and write -/// chunks to reduce copy overhead. pub async fn save_temp_file( path_type: PathType, path: &str, temp_file: rocket::fs::TempFile<'_>, + overwrite: bool, ) -> Result<(), crate::Error> { - use tokio::io::AsyncReadExt as _; + use futures::AsyncWriteExt as _; + use tokio_util::compat::TokioAsyncReadCompatExt as _; let operator = CONFIG.opendal_operator_for_path_type(path_type)?; - let mut read_stream = temp_file.open().await?; - let mut buf = Vec::with_capacity(temp_file.len() as usize); - read_stream.read_to_end(&mut buf).await?; - operator.write(path, buf).await?; + let mut read_stream = temp_file.open().await?.compat(); + let mut writer = operator.writer_with(path).if_not_exists(!overwrite).await?.into_futures_async_write(); + futures::io::copy(&mut read_stream, &mut writer).await?; + writer.close().await?; + Ok(()) }