mirror of
https://github.com/dani-garcia/vaultwarden.git
synced 2025-04-01 02:42:49 -05:00
PR improvements
This commit is contained in:
parent
d668402c8e
commit
53d7713d62
8 changed files with 41 additions and 39 deletions
6
Cargo.lock
generated
6
Cargo.lock
generated
|
@ -4693,12 +4693,13 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.13"
|
||||
version = "0.7.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078"
|
||||
checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
|
@ -5040,6 +5041,7 @@ dependencies = [
|
|||
"syslog",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"totp-lite",
|
||||
"tracing",
|
||||
"url",
|
||||
|
|
|
@ -73,6 +73,7 @@ dashmap = "6.1.0"
|
|||
# Async futures
|
||||
futures = "0.3.31"
|
||||
tokio = { version = "1.43.0", features = ["rt-multi-thread", "fs", "io-util", "parking_lot", "time", "signal", "net"] }
|
||||
tokio-util = { version = "0.7.14", features = ["compat"]}
|
||||
|
||||
# A generic serialization/deserialization framework
|
||||
serde = { version = "1.0.218", features = ["derive"] }
|
||||
|
|
|
@ -1265,7 +1265,7 @@ async fn save_attachment(
|
|||
attachment.save(&mut conn).await.expect("Error saving attachment");
|
||||
}
|
||||
|
||||
save_temp_file(PathType::Attachments, &format!("{cipher_id}/{file_id}"), data.data).await?;
|
||||
save_temp_file(PathType::Attachments, &format!("{cipher_id}/{file_id}"), data.data, true).await?;
|
||||
|
||||
nt.send_cipher_update(
|
||||
UpdateType::SyncCipherUpdate,
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use std::error::Error as _;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
|
@ -253,7 +254,7 @@ async fn post_send_file(data: Form<UploadData<'_>>, headers: Headers, mut conn:
|
|||
|
||||
let file_id = crate::crypto::generate_send_file_id();
|
||||
|
||||
save_temp_file(PathType::Sends, &format!("{}/{file_id}", send.uuid), data).await?;
|
||||
save_temp_file(PathType::Sends, &format!("{}/{file_id}", send.uuid), data, true).await?;
|
||||
|
||||
let mut data_value: Value = serde_json::from_str(&send.data)?;
|
||||
if let Some(o) = data_value.as_object_mut() {
|
||||
|
@ -403,20 +404,29 @@ async fn post_send_file_v2_data(
|
|||
err!("Send file size does not match.", format!("Expected a file size of {} got {size}", send_data.size));
|
||||
}
|
||||
|
||||
let operator = CONFIG.opendal_operator_for_path_type(PathType::Sends)?;
|
||||
let file_path = format!("{send_id}/{file_id}");
|
||||
|
||||
// Check if the file already exists, if that is the case do not overwrite it
|
||||
if operator.exists(&file_path).await.map_err(|e| {
|
||||
save_temp_file(PathType::Sends, &file_path, data.data, false).await.map_err(|e| {
|
||||
let was_file_exists_error = e
|
||||
.source()
|
||||
.and_then(|e| e.downcast_ref::<std::io::Error>())
|
||||
.and_then(|e| e.get_ref())
|
||||
.and_then(|e| e.downcast_ref::<opendal::Error>())
|
||||
.map(|e| e.kind() == opendal::ErrorKind::ConditionNotMatch)
|
||||
.unwrap_or(false);
|
||||
|
||||
if was_file_exists_error {
|
||||
return crate::Error::new(
|
||||
"Send file has already been uploaded.",
|
||||
format!("File {file_path:?} already exists"),
|
||||
);
|
||||
}
|
||||
|
||||
crate::Error::new(
|
||||
"Unexpected error while creating send file",
|
||||
format!("Error while checking existence of send file at path {file_path}: {e:?}"),
|
||||
format!("Error while saving send file at path {file_path}: {e:?}"),
|
||||
)
|
||||
})? {
|
||||
err!("Send file has already been uploaded.", format!("File {file_path:?} already exists"))
|
||||
}
|
||||
|
||||
save_temp_file(PathType::Sends, &file_path, data.data).await?;
|
||||
})?;
|
||||
|
||||
nt.send_send_update(
|
||||
UpdateType::SyncSendCreate,
|
||||
|
@ -565,12 +575,7 @@ async fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Re
|
|||
|
||||
Ok(format!("{}/api/sends/{}/{}?t={}", &host.host, send_id, file_id, token))
|
||||
} else {
|
||||
Ok(operator
|
||||
.presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60))
|
||||
.await
|
||||
.map_err(Into::<crate::Error>::into)?
|
||||
.uri()
|
||||
.to_string())
|
||||
Ok(operator.presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60)).await?.uri().to_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -232,7 +232,7 @@ async fn icon_is_negcached(path: &str) -> bool {
|
|||
Ok(true) => {
|
||||
match CONFIG.opendal_operator_for_path_type(PathType::IconCache) {
|
||||
Ok(operator) => {
|
||||
if let Err(e) = operator.delete_iter([miss_indicator]).await {
|
||||
if let Err(e) = operator.delete(&miss_indicator).await {
|
||||
error!("Could not remove negative cache indicator for icon {:?}: {:?}", path, e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1178,7 +1178,7 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
|
|||
opendal_s3_operator_for_path(path)?
|
||||
} else {
|
||||
let builder = opendal::services::Fs::default().root(path);
|
||||
opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish()
|
||||
opendal::Operator::new(builder)?.finish()
|
||||
};
|
||||
|
||||
operators_by_path.insert(path.to_string(), operator.clone());
|
||||
|
@ -1227,11 +1227,12 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
|
|||
|
||||
let builder = opendal::services::S3::default()
|
||||
.customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER))
|
||||
.enable_virtual_host_style()
|
||||
.bucket(bucket)
|
||||
.root(url.path())
|
||||
.default_storage_class("INTELLIGENT_TIERING");
|
||||
|
||||
Ok(opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish())
|
||||
Ok(opendal::Operator::new(builder)?.finish())
|
||||
}
|
||||
|
||||
pub enum PathType {
|
||||
|
|
|
@ -51,12 +51,7 @@ impl Attachment {
|
|||
let token = encode_jwt(&generate_file_download_claims(self.cipher_uuid.clone(), self.id.clone()));
|
||||
Ok(format!("{}/attachments/{}/{}?token={}", host, self.cipher_uuid, self.id, token))
|
||||
} else {
|
||||
Ok(operator
|
||||
.presign_read(&self.get_file_path(), Duration::from_secs(5 * 60))
|
||||
.await
|
||||
.map_err(Into::<crate::Error>::into)?
|
||||
.uri()
|
||||
.to_string())
|
||||
Ok(operator.presign_read(&self.get_file_path(), Duration::from_secs(5 * 60)).await?.uri().to_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,7 +121,7 @@ impl Attachment {
|
|||
let operator = CONFIG.opendal_operator_for_path_type(PathType::Attachments)?;
|
||||
let file_path = self.get_file_path();
|
||||
|
||||
if let Err(e) = operator.delete_iter([file_path.clone()]).await {
|
||||
if let Err(e) = operator.delete(&file_path).await {
|
||||
if e.kind() == opendal::ErrorKind::NotFound {
|
||||
debug!("File '{file_path}' already deleted.");
|
||||
} else {
|
||||
|
|
18
src/util.rs
18
src/util.rs
|
@ -817,24 +817,22 @@ pub fn is_global(ip: std::net::IpAddr) -> bool {
|
|||
}
|
||||
|
||||
/// Saves a Rocket temporary file to the OpenDAL Operator at the given path.
|
||||
///
|
||||
/// Ideally we would stream the Rocket TempFile directly to the OpenDAL
|
||||
/// Operator, but Tempfile exposes a tokio ASyncBufRead trait, which OpenDAL
|
||||
/// does not support. This could be reworked in the future to read and write
|
||||
/// chunks to reduce copy overhead.
|
||||
pub async fn save_temp_file(
|
||||
path_type: PathType,
|
||||
path: &str,
|
||||
temp_file: rocket::fs::TempFile<'_>,
|
||||
overwrite: bool,
|
||||
) -> Result<(), crate::Error> {
|
||||
use tokio::io::AsyncReadExt as _;
|
||||
use futures::AsyncWriteExt as _;
|
||||
use tokio_util::compat::TokioAsyncReadCompatExt as _;
|
||||
|
||||
let operator = CONFIG.opendal_operator_for_path_type(path_type)?;
|
||||
|
||||
let mut read_stream = temp_file.open().await?;
|
||||
let mut buf = Vec::with_capacity(temp_file.len() as usize);
|
||||
read_stream.read_to_end(&mut buf).await?;
|
||||
operator.write(path, buf).await?;
|
||||
let mut read_stream = temp_file.open().await?.compat();
|
||||
let mut writer = operator.writer_with(path).if_not_exists(!overwrite).await?.into_futures_async_write();
|
||||
futures::io::copy(&mut read_stream, &mut writer).await?;
|
||||
writer.close().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue