Do not emit ImexProgress events from BlobDirIter

We no longer need this in the transfer case, where it would give very
weird results. This also means there is nothing imex-specific about
the iterator anymore, so move it to blob.rs.
Floris Bruynooghe
2023-02-16 18:05:09 +01:00
parent 8ae0ee5a67
commit 8072f78058
3 changed files with 102 additions and 104 deletions
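After this change [`BlobDirIter`] only yields blobs; whether and how to report progress is up to the caller. A minimal sketch of the new calling pattern (illustrative only; assumes a `context: &Context` in scope):

let blobdir = BlobDirContents::new(context).await?;
for blob in blobdir.iter() {
    // Plain iteration: no ImexProgress events are emitted from in here anymore.
    println!("found blob {}", blob.as_name());
}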


@@ -4,13 +4,16 @@ use core::cmp::max;
use std::ffi::OsStr;
use std::fmt;
use std::io::Cursor;
use std::iter::FusedIterator;
use std::path::{Path, PathBuf};
use anyhow::{format_err, Context as _, Result};
use futures::StreamExt;
use image::{DynamicImage, ImageFormat};
use num_traits::FromPrimitive;
use tokio::io::AsyncWriteExt;
use tokio::{fs, io};
use tokio_stream::wrappers::ReadDirStream;
use crate::config::Config;
use crate::constants::{
@@ -468,6 +471,94 @@ impl<'a> fmt::Display for BlobObject<'a> {
}
}
/// All files in the blobdir.
///
/// This exists so we can have a [`BlobDirIter`], which needs something to own the data
/// behind its `&Path` items. Use [`BlobDirContents::iter`] to create the iterator.
///
/// Additionally, collecting the paths up front gives us a length for progress reporting.
pub(crate) struct BlobDirContents<'a> {
inner: Vec<PathBuf>,
context: &'a Context,
}
impl<'a> BlobDirContents<'a> {
pub(crate) async fn new(context: &'a Context) -> Result<BlobDirContents<'a>> {
let readdir = fs::read_dir(context.get_blobdir()).await?;
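// ReadDirStream wraps the tokio ReadDir in a Stream so the StreamExt
// combinators below (filter_map, collect) can be used.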
let inner = ReadDirStream::new(readdir)
.filter_map(|entry| async move {
match entry {
Ok(entry) => Some(entry),
Err(err) => {
error!(context, "Failed to read blob file: {err}");
None
}
}
})
.filter_map(|entry| async move {
match entry.file_type().await.ok()?.is_file() {
true => Some(entry.path()),
false => {
warn!(
context,
"Export: Found blob dir entry {} that is not a file, ignoring",
entry.path().display()
);
None
}
}
})
.collect()
.await;
Ok(Self { inner, context })
}
pub(crate) fn iter(&self) -> BlobDirIter<'_> {
BlobDirIter::new(self.context, &self.inner)
}
pub(crate) fn len(&self) -> usize {
self.inner.len()
}
}
/// An iterator over all the [`BlobObject`]s in the blobdir.
pub(crate) struct BlobDirIter<'a> {
paths: &'a [PathBuf],
offset: usize,
context: &'a Context,
}
impl<'a> BlobDirIter<'a> {
fn new(context: &'a Context, paths: &'a [PathBuf]) -> BlobDirIter<'a> {
Self {
paths,
offset: 0,
context,
}
}
}
impl<'a> Iterator for BlobDirIter<'a> {
type Item = BlobObject<'a>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(path) = self.paths.get(self.offset) {
self.offset += 1;
// In theory this can error, but that would mean a corrupted filename in the
// blobdir, so warning about it and skipping the entry is fine.
match BlobObject::from_path(self.context, path) {
Ok(blob) => return Some(blob),
Err(err) => warn!(self.context, "{err}"),
}
}
None
}
}
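// Once `offset` has advanced past the end of `paths`, `next` keeps returning
// `None`, so marking the iterator as fused is sound.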
impl FusedIterator for BlobDirIter<'_> {}
fn encode_img(img: &DynamicImage, encoded: &mut Vec<u8>) -> anyhow::Result<()> {
encoded.clear();
let mut buf = Cursor::new(encoded);


@@ -2,7 +2,6 @@
use std::any::Any;
use std::ffi::OsStr;
use std::iter::FusedIterator;
use std::path::{Path, PathBuf};
use ::pgp::types::KeyTrait;
@@ -11,10 +10,9 @@ use futures::StreamExt;
use futures_lite::FutureExt;
use rand::{thread_rng, Rng};
use tokio::fs::{self, File};
use tokio_stream::wrappers::ReadDirStream;
use tokio_tar::Archive;
use crate::blob::BlobObject;
use crate::blob::{BlobDirContents, BlobObject};
use crate::chat::{self, delete_and_reset_all_device_msgs, ChatId};
use crate::config::Config;
use crate::contact::ContactId;
@@ -570,10 +568,17 @@ async fn export_backup_inner(
.await?;
let blobdir = BlobDirContents::new(context).await?;
for blob in blobdir.iter() {
let mut last_progress = 0;
for (i, blob) in blobdir.iter().enumerate() {
let mut file = File::open(blob.to_abs_path()).await?;
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(blob.as_name());
builder.append_file(path_in_archive, &mut file).await?;
let progress = 1000 * i / blobdir.len();
if progress != last_progress && progress > 10 && progress < 1000 {
context.emit_event(EventType::ImexProgress(progress));
last_progress = progress;
}
}
builder.finish().await?;
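The emitted values are permille of completion: with, say, 400 blobs, iteration 200 reports 1000 * 200 / 400 = 500. The guard drops repeats via `last_progress` and skips values outside 11..=999; 0 and 1000 appear to be reserved for failure and completion respectively. The same logic as a standalone helper, purely as an illustrative sketch (the function name is invented here):

/// Illustrative helper (not part of this commit): map loop index `i` over
/// `total` items to a permille value, deduplicated against `last`.
fn imex_permille(i: usize, total: usize, last: &mut usize) -> Option<usize> {
    // Caller guarantees total > 0: the loop body only runs when blobs exist.
    let progress = 1000 * i / total;
    // Skip repeats and values outside 11..=999; 0 and 1000 appear reserved
    // for failure and completion.
    if progress != *last && progress > 10 && progress < 1000 {
        *last = progress;
        Some(progress)
    } else {
        None
    }
}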
@@ -761,105 +766,6 @@ async fn export_database(context: &Context, dest: &Path, passphrase: String) ->
Ok(())
}
/// All files in the blobdir.
///
/// This exists so we can have a [`BlobDirIter`], which needs something to own the data
/// behind its `&Path` items. Use [`BlobDirContents::iter`] to create the iterator.
///
/// Additionally, collecting the paths up front gives us a length for progress reporting.
struct BlobDirContents<'a> {
inner: Vec<PathBuf>,
context: &'a Context,
}
impl<'a> BlobDirContents<'a> {
async fn new(context: &'a Context) -> Result<BlobDirContents<'a>> {
let readdir = fs::read_dir(context.get_blobdir()).await?;
let inner = ReadDirStream::new(readdir)
.filter_map(|entry| async move {
match entry {
Ok(entry) => Some(entry),
Err(err) => {
error!(context, "Failed to read blob file: {err}");
None
}
}
})
.filter_map(|entry| async move {
match entry.file_type().await.ok()?.is_file() {
true => Some(entry.path()),
false => {
warn!(
context,
"Export: Found blob dir entry {} that is not a file, ignoring",
entry.path().display()
);
None
}
}
})
.collect()
.await;
Ok(Self { inner, context })
}
fn iter(&self) -> BlobDirIter<'_> {
BlobDirIter::new(self.context, &self.inner)
}
}
/// A stream for [`BlobObject`]s.
///
/// The stream emits [`EventType::ImexProgress`] events as it is being consumed.
///
/// Because we want to know the total number of blobs in order to emit progress, all the
/// blobs are read up front and stored. Luckily this also makes our life easier, since we
/// now only need to implement `Iterator` and not `Stream`.
struct BlobDirIter<'a> {
paths: &'a [PathBuf],
offset: usize,
last_progress: usize,
context: &'a Context,
}
impl<'a> BlobDirIter<'a> {
fn new(context: &'a Context, paths: &'a [PathBuf]) -> BlobDirIter<'a> {
Self {
paths,
offset: 0,
last_progress: 0,
context,
}
}
}
impl<'a> Iterator for BlobDirIter<'a> {
type Item = BlobObject<'a>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(path) = self.paths.get(self.offset) {
self.offset += 1;
// In theory this can error, but that would mean a corrupted filename in the
// blobdir, so warning about it and skipping the entry is fine.
match BlobObject::from_path(self.context, path) {
Ok(blob) => {
let progress = 1000 * self.offset / self.paths.len();
if progress != self.last_progress && progress > 10 && progress < 1000 {
self.context.emit_event(EventType::ImexProgress(progress));
self.last_progress = progress;
}
return Some(blob);
}
Err(err) => warn!(self.context, "{err}"),
}
}
None
}
}
impl FusedIterator for BlobDirIter<'_> {}
#[cfg(test)]
mod tests {
use ::pgp::armor::BlockType;


@@ -39,12 +39,13 @@ use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReadDirStream;
use crate::blob::BlobDirContents;
use crate::chat::delete_and_reset_all_device_msgs;
use crate::context::Context;
use crate::qr::Qr;
use crate::{e2ee, EventType};
use super::{export_database, BlobDirContents, DeleteOnDrop, DBFILE_BACKUP_NAME};
use super::{export_database, DeleteOnDrop, DBFILE_BACKUP_NAME};
/// Provide or send a backup of this device.
///
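With progress emission gone from the iterator, the transfer code in this third file can import [`BlobDirContents`] from `crate::blob` and walk the blobdir without side effects. A hedged sketch of the kind of helper this enables (hypothetical function, not part of the commit):

use anyhow::Result;

use crate::blob::BlobDirContents;
use crate::context::Context;

/// Hypothetical helper: total on-disk size of all blobs, e.g. to size a
/// transfer up front. Iterating the blobdir emits no ImexProgress events.
async fn blobdir_size(context: &Context) -> Result<u64> {
    let blobdir = BlobDirContents::new(context).await?;
    let mut total = 0;
    for blob in blobdir.iter() {
        total += tokio::fs::metadata(blob.to_abs_path()).await?.len();
    }
    Ok(total)
}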