File deduplication (#6332)

When receiving messages, blobs will be deduplicated with the new
function `create_and_deduplicate_from_bytes()`. For sending files, this
adds a new function `set_file_and_deduplicate()` instead of
deduplicating by default.

This is for
https://github.com/deltachat/deltachat-core-rust/issues/6265; read the
issue description there for more details.

TODO:
- [x] Set files as read-only
- [x] Don't do a write when the file is already identical
- [x] The first 32 chars or so of the 64-character hash are enough. I
calculated that if 10b people (i.e. all of humanity) use DC, and each of
them has 200k distinct blob files (I have 4k in my day-to-day account),
and we used 20 chars, then the expected value for the number of name
collisions would be ~0.0002 (and the probability that there is a least
one name collision is lower than that) [^1]. I added 12 more characters
to be on the super safe side, but this wouldn't be necessary and I could
also make it 20 instead of 32.
- Not 100% sure whether that's necessary at all - it would mainly be
necessary if we might hit a length limit on some file systems (the
blobdir is usually sth like
`accounts/2ff9fc096d2f46b6832b24a1ed99c0d6/dc.db-blobs` (53 chars), plus
64 chars for the filename would be 117).
- [x] "touch" the files to prevent them from being deleted
- [x] TODOs in the code

For later PRs:
- Replace `BlobObject::create(…)` with
`BlobObject::create_and_deduplicate(…)` in order to deduplicate
everytime core creates a file
- Modify JsonRPC to deduplicate blob files
- Possibly rename BlobObject.name to BlobObject.file in order to prevent
confusion (because `name` usually means "user-visible-name", not "name
of the file on disk").

[^1]: Calculated with both https://printfn.github.io/fend/ and
https://www.geogebra.org/calculator, both of which came to the same
result
([1](https://github.com/user-attachments/assets/bbb62550-3781-48b5-88b1-ba0e29c28c0d),

[2](https://github.com/user-attachments/assets/82171212-b797-4117-a39f-0e132eac7252))

---------

Co-authored-by: l <link2xt@testrun.org>
This commit is contained in:
Hocuri
2025-01-21 19:42:19 +01:00
committed by GitHub
parent 22a7cfe9c3
commit 65a9c4b79b
23 changed files with 583 additions and 240 deletions

View File

@@ -16,7 +16,7 @@ use image::ImageReader;
use image::{DynamicImage, GenericImage, GenericImageView, ImageFormat, Pixel, Rgba};
use num_traits::FromPrimitive;
use tokio::io::AsyncWriteExt;
use tokio::{fs, io};
use tokio::{fs, io, task};
use tokio_stream::wrappers::ReadDirStream;
use crate::config::Config;
@@ -34,6 +34,10 @@ use crate::log::LogExt;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlobObject<'a> {
blobdir: &'a Path,
/// The name of the file on the disc.
/// Note that this is NOT the user-visible filename,
/// which is only stored in Param::Filename on the message.
name: String,
}
@@ -74,7 +78,7 @@ impl<'a> BlobObject<'a> {
Ok(blob)
}
// Creates a new file, returning a tuple of the name and the handle.
/// Creates a new file, returning a tuple of the name and the handle.
async fn create_new_file(
context: &Context,
dir: &Path,
@@ -88,6 +92,8 @@ impl<'a> BlobObject<'a> {
attempt += 1;
let path = dir.join(&name);
match fs::OpenOptions::new()
// Using `create_new(true)` in order to avoid race conditions
// when creating multiple files with the same name.
.create_new(true)
.write(true)
.open(&path)
@@ -139,6 +145,88 @@ impl<'a> BlobObject<'a> {
Ok(blob)
}
/// Creates a blob object by copying or renaming an existing file.
/// If the source file is already in the blobdir, it will be renamed,
/// otherwise it will be copied to the blobdir first.
///
/// In order to deduplicate files that contain the same data,
/// the file will be named as the hash of the file data.
///
/// This is done in a in way which avoids race-conditions when multiple files are
/// concurrently created.
pub fn create_and_deduplicate(context: &'a Context, src: &Path) -> Result<BlobObject<'a>> {
// `create_and_deduplicate{_from_bytes}()` do blocking I/O, but can still be called
// from an async context thanks to `block_in_place()`.
// Tokio's "async" I/O functions are also just thin wrappers around the blocking I/O syscalls,
// so we are doing essentially the same here.
task::block_in_place(|| {
let temp_path;
let src_in_blobdir: &Path;
let blobdir = context.get_blobdir();
if src.starts_with(blobdir) || src.starts_with("$BLOBDIR/") {
src_in_blobdir = src;
} else {
info!(
context,
"Source file not in blobdir. Copying instead of moving in order to prevent moving a file that was still needed."
);
temp_path = blobdir.join(format!("tmp-{}", rand::random::<u64>()));
if std::fs::copy(src, &temp_path).is_err() {
// Maybe the blobdir didn't exist
std::fs::create_dir_all(blobdir).log_err(context).ok();
std::fs::copy(src, &temp_path).context("Copying new blobfile failed")?;
};
src_in_blobdir = &temp_path;
}
let blob = BlobObject::from_hash(blobdir, file_hash(src_in_blobdir)?);
let new_path = blob.to_abs_path();
// This will also replace an already-existing file.
// Renaming is atomic, so this will avoid race conditions.
std::fs::rename(src_in_blobdir, &new_path)?;
context.emit_event(EventType::NewBlobFile(blob.as_name().to_string()));
Ok(blob)
})
}
/// Creates a new blob object with the file contents in `data`.
/// In order to deduplicate files that contain the same data,
/// the file will be renamed to a hash of the file data.
///
/// The `data` will be written into the file without race-conditions.
///
/// This function does blocking I/O, but it can still be called from an async context
/// because `block_in_place()` is used to leave the async runtime if necessary.
pub fn create_and_deduplicate_from_bytes(
context: &'a Context,
data: &[u8],
) -> Result<BlobObject<'a>> {
task::block_in_place(|| {
let blobdir = context.get_blobdir();
let temp_path = blobdir.join(format!("tmp-{}", rand::random::<u64>()));
if std::fs::write(&temp_path, data).is_err() {
// Maybe the blobdir didn't exist
std::fs::create_dir_all(blobdir).log_err(context).ok();
std::fs::write(&temp_path, data).context("writing new blobfile failed")?;
};
BlobObject::create_and_deduplicate(context, &temp_path)
})
}
fn from_hash(blobdir: &Path, hash: blake3::Hash) -> BlobObject<'_> {
let hash = hash.to_hex();
let hash = hash.as_str();
let hash = hash.get(0..31).unwrap_or(hash);
BlobObject {
blobdir,
name: format!("$BLOBDIR/{hash}"),
}
}
/// Creates a blob from a file, possibly copying it to the blobdir.
///
/// If the source file is not a path to into the blob directory
@@ -210,6 +298,9 @@ impl<'a> BlobObject<'a> {
/// this string in the database or [Params]. Eventually even
/// those conversions should be handled by the type system.
///
/// Note that this is NOT the user-visible filename,
/// which is only stored in Param::Filename on the message.
///
/// [Params]: crate::param::Params
pub fn as_name(&self) -> &str {
&self.name
@@ -356,8 +447,6 @@ impl<'a> BlobObject<'a> {
}
pub async fn recode_to_avatar_size(&mut self, context: &Context) -> Result<()> {
let blob_abs = self.to_abs_path();
let img_wh =
match MediaQuality::from_i32(context.get_config_int(Config::MediaQuality).await?)
.unwrap_or_default()
@@ -370,16 +459,15 @@ impl<'a> BlobObject<'a> {
let strict_limits = true;
// max_bytes is 20_000 bytes: Outlook servers don't allow headers larger than 32k.
// 32 / 4 * 3 = 24k if you account for base64 encoding. To be safe, we reduced this to 20k.
if let Some(new_name) = self.recode_to_size(
self.recode_to_size(
context,
blob_abs,
"".to_string(), // The name of an avatar doesn't matter
maybe_sticker,
img_wh,
20_000,
strict_limits,
)? {
self.name = new_name;
}
)?;
Ok(())
}
@@ -393,9 +481,9 @@ impl<'a> BlobObject<'a> {
pub async fn recode_to_image_size(
&mut self,
context: &Context,
name: String,
maybe_sticker: &mut bool,
) -> Result<()> {
let blob_abs = self.to_abs_path();
) -> Result<String> {
let (img_wh, max_bytes) =
match MediaQuality::from_i32(context.get_config_int(Config::MediaQuality).await?)
.unwrap_or_default()
@@ -407,35 +495,43 @@ impl<'a> BlobObject<'a> {
MediaQuality::Worse => (constants::WORSE_IMAGE_SIZE, constants::WORSE_IMAGE_BYTES),
};
let strict_limits = false;
if let Some(new_name) = self.recode_to_size(
let new_name = self.recode_to_size(
context,
blob_abs,
name,
maybe_sticker,
img_wh,
max_bytes,
strict_limits,
)? {
self.name = new_name;
}
Ok(())
)?;
Ok(new_name)
}
/// If `!strict_limits`, then if `max_bytes` is exceeded, reduce the image to `img_wh` and just
/// proceed with the result.
///
/// This modifies the blob object in-place.
///
/// Additionally, if you pass the user-visible filename as `name`
/// then the updated user-visible filename will be returned;
/// this may be necessary because the format may be changed to JPG,
/// i.e. "image.png" -> "image.jpg".
/// Pass an empty string if you don't care.
fn recode_to_size(
&mut self,
context: &Context,
mut blob_abs: PathBuf,
mut name: String,
maybe_sticker: &mut bool,
mut img_wh: u32,
max_bytes: usize,
strict_limits: bool,
) -> Result<Option<String>> {
) -> Result<String> {
// Add white background only to avatars to spare the CPU.
let mut add_white_bg = img_wh <= constants::BALANCED_AVATAR_SIZE;
let mut no_exif = false;
let no_exif_ref = &mut no_exif;
let res = tokio::task::block_in_place(move || {
let original_name = name.clone();
let res: Result<String> = tokio::task::block_in_place(move || {
let mut file = std::fs::File::open(self.to_abs_path())?;
let (nr_bytes, exif) = image_metadata(&file)?;
*no_exif_ref = exif.is_none();
@@ -449,7 +545,7 @@ impl<'a> BlobObject<'a> {
file.rewind()?;
ImageReader::with_format(
std::io::BufReader::new(&file),
ImageFormat::from_path(&blob_abs)?,
ImageFormat::from_path(self.to_abs_path())?,
)
}
};
@@ -457,7 +553,6 @@ impl<'a> BlobObject<'a> {
let mut img = imgreader.decode().context("image decode failure")?;
let orientation = exif.as_ref().map(|exif| exif_orientation(exif, context));
let mut encoded = Vec::new();
let mut changed_name = None;
if *maybe_sticker {
let x_max = img.width().saturating_sub(1);
@@ -469,7 +564,7 @@ impl<'a> BlobObject<'a> {
|| img.get_pixel(x_max, y_max).0[3] == 0);
}
if *maybe_sticker && exif.is_none() {
return Ok(None);
return Ok(name);
}
img = match orientation {
@@ -566,10 +661,10 @@ impl<'a> BlobObject<'a> {
if !matches!(fmt, ImageFormat::Jpeg)
&& matches!(ofmt, ImageOutputFormat::Jpeg { .. })
{
blob_abs = blob_abs.with_extension("jpg");
let file_name = blob_abs.file_name().context("No image file name (???)")?;
let file_name = file_name.to_str().context("Filename is no UTF-8 (???)")?;
changed_name = Some(format!("$BLOBDIR/{file_name}"));
name = Path::new(&name)
.with_extension("jpg")
.to_string_lossy()
.into_owned();
}
if encoded.is_empty() {
@@ -579,11 +674,12 @@ impl<'a> BlobObject<'a> {
encode_img(&img, ofmt, &mut encoded)?;
}
std::fs::write(&blob_abs, &encoded)
.context("failed to write recoded blob to file")?;
self.name = BlobObject::create_and_deduplicate_from_bytes(context, &encoded)
.context("failed to write recoded blob to file")?
.name;
}
Ok(changed_name)
Ok(name)
});
match res {
Ok(_) => res,
@@ -593,7 +689,7 @@ impl<'a> BlobObject<'a> {
context,
"Cannot recode image, using original data: {err:#}.",
);
Ok(None)
Ok(original_name)
} else {
Err(err)
}
@@ -602,6 +698,17 @@ impl<'a> BlobObject<'a> {
}
}
fn file_hash(src: &Path) -> Result<blake3::Hash> {
let mut hasher = blake3::Hasher::new();
let mut src_file = std::fs::File::open(src)
.with_context(|| format!("Failed to open file {}", src.display()))?;
hasher
.update_reader(&mut src_file)
.context("update_reader")?;
let hash = hasher.finalize();
Ok(hash)
}
/// Returns image file size and Exif.
pub fn image_metadata(file: &std::fs::File) -> Result<(u64, Option<exif::Exif>)> {
let len = file.metadata()?.len();
@@ -762,15 +869,22 @@ fn add_white_bg(img: &mut DynamicImage) {
#[cfg(test)]
mod tests {
use fs::File;
use std::time::Duration;
use super::*;
use crate::message::{Message, Viewtype};
use crate::sql;
use crate::test_utils::{self, TestContext};
use crate::tools::SystemTime;
fn check_image_size(path: impl AsRef<Path>, width: u32, height: u32) -> image::DynamicImage {
tokio::task::block_in_place(move || {
let img = image::open(path).expect("failed to open image");
let img = ImageReader::open(path)
.expect("failed to open image")
.with_guessed_format()
.expect("failed to guess format")
.decode()
.expect("failed to decode image");
assert_eq!(img.width(), width, "invalid width");
assert_eq!(img.height(), height, "invalid height");
img
@@ -1008,7 +1122,7 @@ mod tests {
let strict_limits = true;
blob.recode_to_size(
&t,
blob.to_abs_path(),
"avatar.png".to_string(),
maybe_sticker,
img_wh,
20_000,
@@ -1016,7 +1130,12 @@ mod tests {
)
.unwrap();
tokio::task::block_in_place(move || {
let img = image::open(blob.to_abs_path()).unwrap();
let img = ImageReader::open(blob.to_abs_path())
.unwrap()
.with_guessed_format()
.unwrap()
.decode()
.unwrap();
assert!(img.width() == img_wh);
assert!(img.height() == img_wh);
assert_eq!(img.get_pixel(0, 0), Rgba(color));
@@ -1026,19 +1145,25 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_selfavatar_outside_blobdir() {
async fn file_size(path_buf: &Path) -> u64 {
fs::metadata(path_buf).await.unwrap().len()
}
let t = TestContext::new().await;
let avatar_src = t.dir.path().join("avatar.jpg");
let avatar_bytes = include_bytes!("../test-data/image/avatar1000x1000.jpg");
fs::write(&avatar_src, avatar_bytes).await.unwrap();
let avatar_blob = t.get_blobdir().join("avatar.jpg");
assert!(!avatar_blob.exists());
t.set_config(Config::Selfavatar, Some(avatar_src.to_str().unwrap()))
.await
.unwrap();
assert!(avatar_blob.exists());
assert!(fs::metadata(&avatar_blob).await.unwrap().len() < avatar_bytes.len() as u64);
let avatar_cfg = t.get_config(Config::Selfavatar).await.unwrap();
assert_eq!(avatar_cfg, avatar_blob.to_str().map(|s| s.to_string()));
let avatar_blob = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
let avatar_path = Path::new(&avatar_blob);
assert!(
avatar_blob.ends_with("d98cd30ed8f2129bf3968420208849d"),
"The avatar filename should be its hash, put instead it's {avatar_blob}"
);
let scaled_avatar_size = file_size(avatar_path).await;
assert!(scaled_avatar_size < avatar_bytes.len() as u64);
check_image_size(avatar_src, 1000, 1000);
check_image_size(
@@ -1047,27 +1172,32 @@ mod tests {
constants::BALANCED_AVATAR_SIZE,
);
async fn file_size(path_buf: &Path) -> u64 {
let file = File::open(path_buf).await.unwrap();
file.metadata().await.unwrap().len()
}
let mut blob = BlobObject::new_from_path(&t, &avatar_blob).await.unwrap();
let mut blob = BlobObject::new_from_path(&t, avatar_path).await.unwrap();
let maybe_sticker = &mut false;
let strict_limits = true;
blob.recode_to_size(
&t,
blob.to_abs_path(),
"avatar.jpg".to_string(),
maybe_sticker,
1000,
3000,
strict_limits,
)
.unwrap();
assert!(file_size(&avatar_blob).await <= 3000);
assert!(file_size(&avatar_blob).await > 2000);
let new_file_size = file_size(&blob.to_abs_path()).await;
assert!(new_file_size <= 3000);
assert!(new_file_size > 2000);
// The new file should be smaller:
assert!(new_file_size < scaled_avatar_size);
// And the original file should not be touched:
assert_eq!(file_size(avatar_path).await, scaled_avatar_size);
tokio::task::block_in_place(move || {
let img = image::open(avatar_blob).unwrap();
let img = ImageReader::open(blob.to_abs_path())
.unwrap()
.with_guessed_format()
.unwrap()
.decode()
.unwrap();
assert!(img.width() > 130);
assert_eq!(img.width(), img.height());
});
@@ -1087,9 +1217,9 @@ mod tests {
.await
.unwrap();
let avatar_cfg = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(
avatar_cfg,
avatar_src.with_extension("png").to_str().unwrap()
assert!(
avatar_cfg.ends_with("9e7f409ac5c92b942cc4f31cee2770a"),
"Avatar file name {avatar_cfg} should end with its hash"
);
check_image_size(
@@ -1373,6 +1503,7 @@ mod tests {
.set_config(Config::MediaQuality, Some(media_quality_config))
.await?;
let file = alice.get_blobdir().join("file").with_extension(extension);
let file_name = format!("file.{extension}");
fs::write(&file, &bytes)
.await
@@ -1388,7 +1519,7 @@ mod tests {
}
let mut msg = Message::new(viewtype);
msg.set_file(file.to_str().unwrap(), None);
msg.set_file_and_deduplicate(&alice, &file, Some(&file_name), None)?;
let chat = alice.create_chat(&bob).await;
if set_draft {
chat.id.set_draft(&alice, Some(&mut msg)).await.unwrap();
@@ -1444,7 +1575,7 @@ mod tests {
.await
.context("failed to write file")?;
let mut msg = Message::new(Viewtype::Image);
msg.set_file(file.to_str().unwrap(), None);
msg.set_file_and_deduplicate(&alice, &file, Some("file.gif"), None)?;
let chat = alice.create_chat(&bob).await;
let sent = alice.send_msg(chat.id, &mut msg).await;
let bob_msg = bob.recv_msg(&sent).await;
@@ -1480,4 +1611,83 @@ mod tests {
assert_eq!(msg.get_viewtype(), Viewtype::Sticker);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_create_and_deduplicate() -> Result<()> {
let t = TestContext::new().await;
let path = t.get_blobdir().join("anyfile.dat");
fs::write(&path, b"bla").await?;
let blob = BlobObject::create_and_deduplicate(&t, &path)?;
assert_eq!(blob.name, "$BLOBDIR/ce940175885d7b78f7b7e9f1396611f");
assert_eq!(path.exists(), false);
assert_eq!(fs::read(&blob.to_abs_path()).await?, b"bla");
fs::write(&path, b"bla").await?;
let blob2 = BlobObject::create_and_deduplicate(&t, &path)?;
assert_eq!(blob2.name, blob.name);
let path_outside_blobdir = t.dir.path().join("anyfile.dat");
fs::write(&path_outside_blobdir, b"bla").await?;
let blob3 = BlobObject::create_and_deduplicate(&t, &path_outside_blobdir)?;
assert!(path_outside_blobdir.exists());
assert_eq!(blob3.name, blob.name);
fs::write(&path, b"blabla").await?;
let blob4 = BlobObject::create_and_deduplicate(&t, &path)?;
assert_ne!(blob4.name, blob.name);
fs::remove_dir_all(t.get_blobdir()).await?;
let blob5 = BlobObject::create_and_deduplicate(&t, &path_outside_blobdir)?;
assert_eq!(blob5.name, blob.name);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_create_and_deduplicate_from_bytes() -> Result<()> {
let t = TestContext::new().await;
fs::remove_dir(t.get_blobdir()).await?;
let blob = BlobObject::create_and_deduplicate_from_bytes(&t, b"bla")?;
assert_eq!(blob.name, "$BLOBDIR/ce940175885d7b78f7b7e9f1396611f");
assert_eq!(fs::read(&blob.to_abs_path()).await?, b"bla");
let modified1 = blob.to_abs_path().metadata()?.modified()?;
// Test that the modification time of the file is updated when a new file is created
// so that it's not deleted during housekeeping.
// We can't use SystemTime::shift() here because file creation uses the actual OS time,
// which we can't mock from our code.
tokio::time::sleep(Duration::from_millis(1100)).await;
let blob2 = BlobObject::create_and_deduplicate_from_bytes(&t, b"bla")?;
assert_eq!(blob2.name, blob.name);
let modified2 = blob.to_abs_path().metadata()?.modified()?;
assert_ne!(modified1, modified2);
sql::housekeeping(&t).await?;
assert!(blob2.to_abs_path().exists());
// If we do shift the time by more than 1h, the blob file will be deleted during housekeeping:
SystemTime::shift(Duration::from_secs(65 * 60));
sql::housekeeping(&t).await?;
assert_eq!(blob2.to_abs_path().exists(), false);
let blob3 = BlobObject::create_and_deduplicate_from_bytes(&t, b"blabla")?;
assert_ne!(blob3.name, blob.name);
{
// If something goes wrong and the blob file is overwritten,
// the correct content should be restored:
fs::write(blob3.to_abs_path(), b"bloblo").await?;
let blob4 = BlobObject::create_and_deduplicate_from_bytes(&t, b"blabla")?;
let blob4_content = fs::read(blob4.to_abs_path()).await?;
assert_eq!(blob4_content, b"blabla");
}
Ok(())
}
}