diff --git a/src/blob.rs b/src/blob.rs index ef2869bed..1fe51f140 100644 --- a/src/blob.rs +++ b/src/blob.rs @@ -4,13 +4,16 @@ use core::cmp::max; use std::ffi::OsStr; use std::fmt; use std::io::Cursor; +use std::iter::FusedIterator; use std::path::{Path, PathBuf}; use anyhow::{format_err, Context as _, Result}; +use futures::StreamExt; use image::{DynamicImage, ImageFormat}; use num_traits::FromPrimitive; use tokio::io::AsyncWriteExt; use tokio::{fs, io}; +use tokio_stream::wrappers::ReadDirStream; use crate::config::Config; use crate::constants::{ @@ -468,6 +471,94 @@ impl<'a> fmt::Display for BlobObject<'a> { } } +/// All files in the blobdir. +/// +/// This exists so we can have a [`BlobDirIter`] which needs something to own the data of +/// its `&Path`. Use [`BlobDirContents::iter`] to create the iterator. +/// +/// Additionally pre-allocating this means we get a length for progress report. +pub(crate) struct BlobDirContents<'a> { + inner: Vec, + context: &'a Context, +} + +impl<'a> BlobDirContents<'a> { + pub(crate) async fn new(context: &'a Context) -> Result> { + let readdir = fs::read_dir(context.get_blobdir()).await?; + let inner = ReadDirStream::new(readdir) + .filter_map(|entry| async move { + match entry { + Ok(entry) => Some(entry), + Err(err) => { + error!(context, "Failed to read blob file: {err}"); + None + } + } + }) + .filter_map(|entry| async move { + match entry.file_type().await.ok()?.is_file() { + true => Some(entry.path()), + false => { + warn!( + context, + "Export: Found blob dir entry {} that is not a file, ignoring", + entry.path().display() + ); + None + } + } + }) + .collect() + .await; + Ok(Self { inner, context }) + } + + pub(crate) fn iter(&self) -> BlobDirIter<'_> { + BlobDirIter::new(self.context, &self.inner) + } + + pub(crate) fn len(&self) -> usize { + self.inner.len() + } +} + +/// An iterator over all the [`BlobObject`]s in the blobdir. 
+pub(crate) struct BlobDirIter<'a> { + paths: &'a [PathBuf], + offset: usize, + context: &'a Context, +} + +impl<'a> BlobDirIter<'a> { + fn new(context: &'a Context, paths: &'a [PathBuf]) -> BlobDirIter<'a> { + Self { + paths, + offset: 0, + context, + } + } +} + +impl<'a> Iterator for BlobDirIter<'a> { + type Item = BlobObject<'a>; + + fn next(&mut self) -> Option { + while let Some(path) = self.paths.get(self.offset) { + self.offset += 1; + + // In theory this can error but we'd have corrupted filenames in the blobdir, so + // silently skipping them is fine. + match BlobObject::from_path(self.context, path) { + Ok(blob) => return Some(blob), + Err(err) => warn!(self.context, "{err}"), + } + } + None + } +} + +impl FusedIterator for BlobDirIter<'_> {} + fn encode_img(img: &DynamicImage, encoded: &mut Vec) -> anyhow::Result<()> { encoded.clear(); let mut buf = Cursor::new(encoded); diff --git a/src/imex.rs b/src/imex.rs index c6187a208..df28fae63 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -2,7 +2,6 @@ use std::any::Any; use std::ffi::OsStr; -use std::iter::FusedIterator; use std::path::{Path, PathBuf}; use ::pgp::types::KeyTrait; @@ -11,10 +10,9 @@ use futures::StreamExt; use futures_lite::FutureExt; use rand::{thread_rng, Rng}; use tokio::fs::{self, File}; -use tokio_stream::wrappers::ReadDirStream; use tokio_tar::Archive; -use crate::blob::BlobObject; +use crate::blob::{BlobDirContents, BlobObject}; use crate::chat::{self, delete_and_reset_all_device_msgs, ChatId}; use crate::config::Config; use crate::contact::ContactId; @@ -570,10 +568,17 @@ async fn export_backup_inner( .await?; let blobdir = BlobDirContents::new(context).await?; - for blob in blobdir.iter() { + let mut last_progress = 0; + + for (i, blob) in blobdir.iter().enumerate() { let mut file = File::open(blob.to_abs_path()).await?; let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(blob.as_name()); builder.append_file(path_in_archive, &mut file).await?; + let progress = 1000 * i / 
blobdir.len(); + if progress != last_progress && progress > 10 && progress < 1000 { + context.emit_event(EventType::ImexProgress(progress)); + last_progress = progress; + } } builder.finish().await?; @@ -761,105 +766,6 @@ async fn export_database(context: &Context, dest: &Path, passphrase: String) -> Ok(()) } -/// All files in the blobdir. -/// -/// This exists so we can have a [`BlobDirIter`] which needs something to own the data of -/// it's `&Path`. Use [`BlobDirContents::iter`] to create the iterator. -/// -/// Additionally pre-allocating this means we get a length for progress report. -struct BlobDirContents<'a> { - inner: Vec, - context: &'a Context, -} - -impl<'a> BlobDirContents<'a> { - async fn new(context: &'a Context) -> Result> { - let readdir = fs::read_dir(context.get_blobdir()).await?; - let inner = ReadDirStream::new(readdir) - .filter_map(|entry| async move { - match entry { - Ok(entry) => Some(entry), - Err(err) => { - error!(context, "Failed to read blob file: {err}"); - None - } - } - }) - .filter_map(|entry| async move { - match entry.file_type().await.ok()?.is_file() { - true => Some(entry.path()), - false => { - warn!( - context, - "Export: Found blob dir entry {} that is not a file, ignoring", - entry.path().display() - ); - None - } - } - }) - .collect() - .await; - Ok(Self { inner, context }) - } - - fn iter(&self) -> BlobDirIter<'_> { - BlobDirIter::new(self.context, &self.inner) - } -} - -/// A stream for [`BlobObject`]s. -/// -/// The stream emits [`EventType::ImexProgress`] events as it being consumed. -/// -/// Because we like to know the total number of blobs to emit progress all the blobs are -/// read up front and stored. Luckily this also makes our life easier, since we now only -/// need to implement `Iterator` and not `Stream`. 
-struct BlobDirIter<'a> { - paths: &'a [PathBuf], - offset: usize, - last_progress: usize, - context: &'a Context, -} - -impl<'a> BlobDirIter<'a> { - fn new(context: &'a Context, paths: &'a [PathBuf]) -> BlobDirIter<'a> { - Self { - paths, - offset: 0, - last_progress: 0, - context, - } - } -} - -impl<'a> Iterator for BlobDirIter<'a> { - type Item = BlobObject<'a>; - - fn next(&mut self) -> Option { - while let Some(path) = self.paths.get(self.offset) { - self.offset += 1; - - // In theory this can error but we'd have corrupted filenames in the blobdir, so - // silently skipping them is fine. - match BlobObject::from_path(self.context, path) { - Ok(blob) => { - let progress = 1000 * self.offset / self.paths.len(); - if progress != self.last_progress && progress > 10 && progress < 1000 { - self.context.emit_event(EventType::ImexProgress(progress)); - self.last_progress = progress; - } - return Some(blob); - } - Err(err) => warn!(self.context, "{err}"), - } - } - None - } -} - -impl FusedIterator for BlobDirIter<'_> {} - #[cfg(test)] mod tests { use ::pgp::armor::BlockType; diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index d345c94a8..af7e644f9 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -39,12 +39,13 @@ use tokio::sync::broadcast::error::RecvError; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReadDirStream; +use crate::blob::BlobDirContents; use crate::chat::delete_and_reset_all_device_msgs; use crate::context::Context; use crate::qr::Qr; use crate::{e2ee, EventType}; -use super::{export_database, BlobDirContents, DeleteOnDrop, DBFILE_BACKUP_NAME}; +use super::{export_database, DeleteOnDrop, DBFILE_BACKUP_NAME}; /// Provide or send a backup of this device. ///