refactor: Remove BlobObject::create(), use create_and_deduplicate_from_bytes() instead (#6467)

Part of #6332
This commit is contained in:
Hocuri
2025-01-24 20:04:02 +01:00
committed by GitHub
parent 6430977670
commit b0c8d46762
2 changed files with 55 additions and 84 deletions

View File

@@ -48,36 +48,6 @@ enum ImageOutputFormat {
} }
impl<'a> BlobObject<'a> { impl<'a> BlobObject<'a> {
/// Creates a new blob object with a unique name.
///
/// Creates a new file in the blob directory. The name will be
/// derived from the platform-agnostic basename of the suggested
/// name, followed by a random number and followed by a possible
/// extension. The `data` will be written into the file without
/// race-conditions.
pub async fn create(
context: &'a Context,
suggested_name: &str,
data: &[u8],
) -> Result<BlobObject<'a>> {
let blobdir = context.get_blobdir();
let (stem, ext) = BlobObject::sanitise_name(suggested_name);
let (name, mut file) = BlobObject::create_new_file(context, blobdir, &stem, &ext).await?;
file.write_all(data).await.context("file write failure")?;
// workaround a bug in async-std
// (the executor does not handle blocking operation in Drop correctly,
// see <https://github.com/async-rs/async-std/issues/900>)
let _ = file.flush().await;
let blob = BlobObject {
blobdir,
name: format!("$BLOBDIR/{name}"),
};
context.emit_event(EventType::NewBlobFile(blob.as_name().to_string()));
Ok(blob)
}
/// Creates a new file, returning a tuple of the name and the handle. /// Creates a new file, returning a tuple of the name and the handle.
async fn create_new_file( async fn create_new_file(
context: &Context, context: &Context,
@@ -115,8 +85,8 @@ impl<'a> BlobObject<'a> {
/// Creates a new blob object with unique name by copying an existing file. /// Creates a new blob object with unique name by copying an existing file.
/// ///
/// This creates a new blob as described in [BlobObject::create] /// This creates a new blob
/// but also copies an existing file into it. This is done in a /// and copies an existing file into it. This is done in a
/// way which avoids race-conditions when multiple files are /// way which avoids race-conditions when multiple files are
/// concurrently created. /// concurrently created.
pub async fn create_and_copy(context: &'a Context, src: &Path) -> Result<BlobObject<'a>> { pub async fn create_and_copy(context: &'a Context, src: &Path) -> Result<BlobObject<'a>> {
@@ -134,8 +104,8 @@ impl<'a> BlobObject<'a> {
return Err(err).context("failed to copy file"); return Err(err).context("failed to copy file");
} }
// workaround, see create() for details // Ensure that all buffered bytes are written
let _ = dst_file.flush().await; dst_file.flush().await?;
let blob = BlobObject { let blob = BlobObject {
blobdir: context.get_blobdir(), blobdir: context.get_blobdir(),
@@ -158,7 +128,7 @@ impl<'a> BlobObject<'a> {
pub fn create_and_deduplicate( pub fn create_and_deduplicate(
context: &'a Context, context: &'a Context,
src: &Path, src: &Path,
original_name: &str, original_name: &Path,
) -> Result<BlobObject<'a>> { ) -> Result<BlobObject<'a>> {
// `create_and_deduplicate{_from_bytes}()` do blocking I/O, but can still be called // `create_and_deduplicate{_from_bytes}()` do blocking I/O, but can still be called
// from an async context thanks to `block_in_place()`. // from an async context thanks to `block_in_place()`.
@@ -188,10 +158,8 @@ impl<'a> BlobObject<'a> {
let hash = file_hash(src_in_blobdir)?.to_hex(); let hash = file_hash(src_in_blobdir)?.to_hex();
let hash = hash.as_str(); let hash = hash.as_str();
let hash = hash.get(0..31).unwrap_or(hash); let hash = hash.get(0..31).unwrap_or(hash);
let new_file = if let Some(extension) = Path::new(original_name) let new_file =
.extension() if let Some(extension) = original_name.extension().filter(|e| e.len() <= 32) {
.filter(|e| e.len() <= 32)
{
format!( format!(
"$BLOBDIR/{hash}.{}", "$BLOBDIR/{hash}.{}",
extension.to_string_lossy().to_lowercase() extension.to_string_lossy().to_lowercase()
@@ -238,7 +206,7 @@ impl<'a> BlobObject<'a> {
std::fs::write(&temp_path, data).context("writing new blobfile failed")?; std::fs::write(&temp_path, data).context("writing new blobfile failed")?;
}; };
BlobObject::create_and_deduplicate(context, &temp_path, original_name) BlobObject::create_and_deduplicate(context, &temp_path, Path::new(original_name))
}) })
} }
@@ -902,21 +870,26 @@ mod tests {
}) })
} }
const FILE_BYTES: &[u8] = b"hello";
const FILE_DEDUPLICATED: &str = "ea8f163db38682925e4491c5e58d4bb.txt";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_create() { async fn test_create() {
let t = TestContext::new().await; let t = TestContext::new().await;
let blob = BlobObject::create(&t, "foo", b"hello").await.unwrap(); let blob =
let fname = t.get_blobdir().join("foo"); BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "foo.txt").unwrap();
let fname = t.get_blobdir().join(FILE_DEDUPLICATED);
let data = fs::read(fname).await.unwrap(); let data = fs::read(fname).await.unwrap();
assert_eq!(data, b"hello"); assert_eq!(data, FILE_BYTES);
assert_eq!(blob.as_name(), "$BLOBDIR/foo"); assert_eq!(blob.as_name(), format!("$BLOBDIR/{FILE_DEDUPLICATED}"));
assert_eq!(blob.to_abs_path(), t.get_blobdir().join("foo")); assert_eq!(blob.to_abs_path(), t.get_blobdir().join(FILE_DEDUPLICATED));
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_lowercase_ext() { async fn test_lowercase_ext() {
let t = TestContext::new().await; let t = TestContext::new().await;
let blob = BlobObject::create_and_deduplicate_from_bytes(&t, b"hello", "foo.TXT").unwrap(); let blob =
BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "foo.TXT").unwrap();
assert!( assert!(
blob.as_name().ends_with(".txt"), blob.as_name().ends_with(".txt"),
"Blob {blob:?} should end with .txt" "Blob {blob:?} should end with .txt"
@@ -926,70 +899,66 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_as_file_name() { async fn test_as_file_name() {
let t = TestContext::new().await; let t = TestContext::new().await;
let blob = BlobObject::create_and_deduplicate_from_bytes(&t, b"hello", "foo.txt").unwrap(); let blob =
assert_eq!(blob.as_file_name(), "ea8f163db38682925e4491c5e58d4bb.txt"); BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "foo.txt").unwrap();
assert_eq!(blob.as_file_name(), FILE_DEDUPLICATED);
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_as_rel_path() { async fn test_as_rel_path() {
let t = TestContext::new().await; let t = TestContext::new().await;
let blob = BlobObject::create_and_deduplicate_from_bytes(&t, b"hello", "foo.txt").unwrap(); let blob =
assert_eq!( BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "foo.txt").unwrap();
blob.as_rel_path(), assert_eq!(blob.as_rel_path(), Path::new(FILE_DEDUPLICATED));
Path::new("ea8f163db38682925e4491c5e58d4bb.txt")
);
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_suffix() { async fn test_suffix() {
let t = TestContext::new().await; let t = TestContext::new().await;
let blob = BlobObject::create(&t, "foo.txt", b"hello").await.unwrap(); let blob =
BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "foo.txt").unwrap();
assert_eq!(blob.suffix(), Some("txt")); assert_eq!(blob.suffix(), Some("txt"));
let blob = BlobObject::create(&t, "bar", b"world").await.unwrap(); let blob = BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "bar").unwrap();
assert_eq!(blob.suffix(), None); assert_eq!(blob.suffix(), None);
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_create_dup() { async fn test_create_dup() {
let t = TestContext::new().await; let t = TestContext::new().await;
BlobObject::create(&t, "foo.txt", b"hello").await.unwrap(); BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "foo.txt").unwrap();
let foo_path = t.get_blobdir().join("foo.txt"); let foo_path = t.get_blobdir().join(FILE_DEDUPLICATED);
assert!(foo_path.exists()); assert!(foo_path.exists());
BlobObject::create(&t, "foo.txt", b"world").await.unwrap(); BlobObject::create_and_deduplicate_from_bytes(&t, b"world", "foo.txt").unwrap();
let mut dir = fs::read_dir(t.get_blobdir()).await.unwrap(); let mut dir = fs::read_dir(t.get_blobdir()).await.unwrap();
while let Ok(Some(dirent)) = dir.next_entry().await { while let Ok(Some(dirent)) = dir.next_entry().await {
let fname = dirent.file_name(); let fname = dirent.file_name();
if fname == foo_path.file_name().unwrap() { if fname == foo_path.file_name().unwrap() {
assert_eq!(fs::read(&foo_path).await.unwrap(), b"hello"); assert_eq!(fs::read(&foo_path).await.unwrap(), FILE_BYTES);
} else { } else {
let name = fname.to_str().unwrap(); let name = fname.to_str().unwrap();
assert!(name.starts_with("foo"));
assert!(name.ends_with(".txt")); assert!(name.ends_with(".txt"));
} }
} }
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_double_ext_preserved() { async fn test_double_ext() {
let t = TestContext::new().await; let t = TestContext::new().await;
BlobObject::create(&t, "foo.tar.gz", b"hello") BlobObject::create_and_deduplicate_from_bytes(&t, FILE_BYTES, "foo.tar.gz").unwrap();
.await let foo_path = t.get_blobdir().join(FILE_DEDUPLICATED).with_extension("gz");
.unwrap();
let foo_path = t.get_blobdir().join("foo.tar.gz");
assert!(foo_path.exists()); assert!(foo_path.exists());
BlobObject::create(&t, "foo.tar.gz", b"world") BlobObject::create_and_deduplicate_from_bytes(&t, b"world", "foo.tar.gz").unwrap();
.await
.unwrap();
let mut dir = fs::read_dir(t.get_blobdir()).await.unwrap(); let mut dir = fs::read_dir(t.get_blobdir()).await.unwrap();
while let Ok(Some(dirent)) = dir.next_entry().await { while let Ok(Some(dirent)) = dir.next_entry().await {
let fname = dirent.file_name(); let fname = dirent.file_name();
if fname == foo_path.file_name().unwrap() { if fname == foo_path.file_name().unwrap() {
assert_eq!(fs::read(&foo_path).await.unwrap(), b"hello"); assert_eq!(fs::read(&foo_path).await.unwrap(), FILE_BYTES);
} else { } else {
let name = fname.to_str().unwrap(); let name = fname.to_str().unwrap();
println!("{name}"); println!("{name}");
assert!(name.starts_with("foo")); assert_eq!(name.starts_with("foo"), false);
assert!(name.ends_with(".tar.gz")); assert_eq!(name.ends_with(".tar.gz"), false);
assert!(name.ends_with(".gz"));
} }
} }
} }
@@ -1635,28 +1604,30 @@ mod tests {
let path = t.get_blobdir().join("anyfile.dat"); let path = t.get_blobdir().join("anyfile.dat");
fs::write(&path, b"bla").await?; fs::write(&path, b"bla").await?;
let blob = BlobObject::create_and_deduplicate(&t, &path, "anyfile.dat")?; let blob = BlobObject::create_and_deduplicate(&t, &path, &path)?;
assert_eq!(blob.name, "$BLOBDIR/ce940175885d7b78f7b7e9f1396611f.dat"); assert_eq!(blob.name, "$BLOBDIR/ce940175885d7b78f7b7e9f1396611f.dat");
assert_eq!(path.exists(), false); assert_eq!(path.exists(), false);
assert_eq!(fs::read(&blob.to_abs_path()).await?, b"bla"); assert_eq!(fs::read(&blob.to_abs_path()).await?, b"bla");
fs::write(&path, b"bla").await?; fs::write(&path, b"bla").await?;
let blob2 = BlobObject::create_and_deduplicate(&t, &path, "anyfile.dat")?; let blob2 = BlobObject::create_and_deduplicate(&t, &path, &path)?;
assert_eq!(blob2.name, blob.name); assert_eq!(blob2.name, blob.name);
let path_outside_blobdir = t.dir.path().join("anyfile.dat"); let path_outside_blobdir = t.dir.path().join("anyfile.dat");
fs::write(&path_outside_blobdir, b"bla").await?; fs::write(&path_outside_blobdir, b"bla").await?;
let blob3 = BlobObject::create_and_deduplicate(&t, &path_outside_blobdir, "anyfile.dat")?; let blob3 =
BlobObject::create_and_deduplicate(&t, &path_outside_blobdir, &path_outside_blobdir)?;
assert!(path_outside_blobdir.exists()); assert!(path_outside_blobdir.exists());
assert_eq!(blob3.name, blob.name); assert_eq!(blob3.name, blob.name);
fs::write(&path, b"blabla").await?; fs::write(&path, b"blabla").await?;
let blob4 = BlobObject::create_and_deduplicate(&t, &path, "anyfile.dat")?; let blob4 = BlobObject::create_and_deduplicate(&t, &path, &path)?;
assert_ne!(blob4.name, blob.name); assert_ne!(blob4.name, blob.name);
fs::remove_dir_all(t.get_blobdir()).await?; fs::remove_dir_all(t.get_blobdir()).await?;
let blob5 = BlobObject::create_and_deduplicate(&t, &path_outside_blobdir, "anyfile.dat")?; let blob5 =
BlobObject::create_and_deduplicate(&t, &path_outside_blobdir, &path_outside_blobdir)?;
assert_eq!(blob5.name, blob.name); assert_eq!(blob5.name, blob.name);
Ok(()) Ok(())

View File

@@ -1114,7 +1114,7 @@ impl Message {
.unwrap_or_else(|| "unknown_file".to_string()) .unwrap_or_else(|| "unknown_file".to_string())
}; };
let blob = BlobObject::create_and_deduplicate(context, file, &name)?; let blob = BlobObject::create_and_deduplicate(context, file, Path::new(&name))?;
self.param.set(Param::File, blob.as_name()); self.param.set(Param::File, blob.as_name());
self.param.set(Param::Filename, name); self.param.set(Param::Filename, name);