mirror of
https://github.com/chatmail/core.git
synced 2026-05-05 06:16:30 +03:00
Send and receive backup over network using QR code
This adds functionality to send and receive a backup over the network using a QR code. The sender or provider prepares the backup, sets up a server that waits for clients. It provides a ticket in the form of a QR code which contains connection and authentication information. The receiver uses the QR code to connect to the provider and fetches backup, restoring it locally.
This commit is contained in:
@@ -160,9 +160,9 @@ impl<'a> BlobObject<'a> {
|
||||
pub fn from_path(context: &'a Context, path: &Path) -> Result<BlobObject<'a>> {
|
||||
let rel_path = path
|
||||
.strip_prefix(context.get_blobdir())
|
||||
.context("wrong blobdir")?;
|
||||
.with_context(|| format!("wrong blobdir: {}", path.display()))?;
|
||||
if !BlobObject::is_acceptible_blob_name(rel_path) {
|
||||
return Err(format_err!("wrong name"));
|
||||
return Err(format_err!("bad blob name: {}", rel_path.display()));
|
||||
}
|
||||
let name = rel_path.to_str().context("wrong name")?;
|
||||
BlobObject::from_name(context, name.to_string())
|
||||
|
||||
@@ -513,6 +513,13 @@ impl Context {
|
||||
|
||||
// Ongoing process allocation/free/check
|
||||
|
||||
/// Tries to acquire the global UI "ongoing" mutex.
|
||||
///
|
||||
/// This is for modal operations during which no other user actions are allowed. Only
|
||||
/// one such operation is allowed at any given time.
|
||||
///
|
||||
/// The return value is a cancel token, which will release the ongoing mutex when
|
||||
/// dropped.
|
||||
pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
|
||||
let mut s = self.running_state.write().await;
|
||||
ensure!(
|
||||
|
||||
296
src/imex.rs
296
src/imex.rs
@@ -4,14 +4,18 @@
|
||||
|
||||
use std::any::Any;
|
||||
use std::ffi::OsStr;
|
||||
use std::iter::FusedIterator;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use ::pgp::types::KeyTrait;
|
||||
use anyhow::{bail, ensure, format_err, Context as _, Result};
|
||||
use async_channel::Receiver;
|
||||
use futures::StreamExt;
|
||||
use futures_lite::FutureExt;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::task::JoinError;
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
use tokio_tar::Archive;
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
@@ -19,6 +23,7 @@ use crate::chat::{self, delete_and_reset_all_device_msgs, ChatId};
|
||||
use crate::config::Config;
|
||||
use crate::contact::ContactId;
|
||||
use crate::context::Context;
|
||||
use crate::e2ee;
|
||||
use crate::events::EventType;
|
||||
use crate::key::{self, DcKey, DcSecretKey, SignedPublicKey, SignedSecretKey};
|
||||
use crate::log::LogExt;
|
||||
@@ -26,13 +31,13 @@ use crate::message::{Message, MsgId, Viewtype};
|
||||
use crate::mimeparser::SystemMessage;
|
||||
use crate::param::Param;
|
||||
use crate::pgp;
|
||||
use crate::qr::Qr;
|
||||
use crate::sql;
|
||||
use crate::stock_str;
|
||||
use crate::tools::{
|
||||
create_folder, delete_file, get_filesuffix_lc, open_file_std, read_file, time, write_file,
|
||||
EmailAddress,
|
||||
};
|
||||
use crate::{e2ee, tools};
|
||||
|
||||
// Name of the database file in the backup.
|
||||
const DBFILE_BACKUP_NAME: &str = "dc_database_backup.sqlite";
|
||||
@@ -511,23 +516,9 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
|
||||
let _d1 = DeleteOnDrop(temp_db_path.clone());
|
||||
let _d2 = DeleteOnDrop(temp_path.clone());
|
||||
|
||||
context
|
||||
.sql
|
||||
.set_raw_config_int("backup_time", now as i32)
|
||||
.await?;
|
||||
sql::housekeeping(context).await.ok_or_log(context);
|
||||
|
||||
context
|
||||
.sql
|
||||
.execute("VACUUM;", paramsv![])
|
||||
export_database(context, &temp_db_path, passphrase)
|
||||
.await
|
||||
.map_err(|e| warn!(context, "Vacuum failed, exporting anyway {}", e))
|
||||
.ok();
|
||||
|
||||
ensure!(
|
||||
context.scheduler.read().await.is_none(),
|
||||
"cannot export backup, IO is running"
|
||||
);
|
||||
.context("could not export database")?;
|
||||
|
||||
info!(
|
||||
context,
|
||||
@@ -536,12 +527,6 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
|
||||
dest_path.display(),
|
||||
);
|
||||
|
||||
context
|
||||
.sql
|
||||
.export(&temp_db_path, passphrase)
|
||||
.await
|
||||
.with_context(|| format!("failed to backup plaintext database to {temp_db_path:?}"))?;
|
||||
|
||||
let res = export_backup_inner(context, &temp_db_path, &temp_path).await;
|
||||
|
||||
match &res {
|
||||
@@ -579,32 +564,11 @@ async fn export_backup_inner(
|
||||
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
|
||||
.await?;
|
||||
|
||||
let read_dir = tools::read_dir(context.get_blobdir()).await?;
|
||||
let count = read_dir.len();
|
||||
let mut written_files = 0;
|
||||
|
||||
let mut last_progress = 0;
|
||||
for entry in read_dir.into_iter() {
|
||||
let name = entry.file_name();
|
||||
if !entry.file_type().await?.is_file() {
|
||||
warn!(
|
||||
context,
|
||||
"Export: Found dir entry {} that is not a file, ignoring",
|
||||
name.to_string_lossy()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let mut file = File::open(entry.path()).await?;
|
||||
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(name);
|
||||
let blobdir = BlobDirContents::new(context).await?;
|
||||
for blob in blobdir.iter() {
|
||||
let mut file = File::open(blob.to_abs_path()).await?;
|
||||
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(blob.as_name());
|
||||
builder.append_file(path_in_archive, &mut file).await?;
|
||||
|
||||
written_files += 1;
|
||||
let progress = 1000 * written_files / count;
|
||||
if progress != last_progress && progress > 10 && progress < 1000 {
|
||||
// We already emitted ImexProgress(10) above
|
||||
context.emit_event(EventType::ImexProgress(progress));
|
||||
last_progress = progress;
|
||||
}
|
||||
}
|
||||
|
||||
builder.finish().await?;
|
||||
@@ -763,6 +727,242 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Exports the database to *file*, encrypted using *passphrase*.
|
||||
///
|
||||
/// The directory of *file* must already exist, if *file* itself exists it will be
|
||||
/// overwritten.
|
||||
///
|
||||
/// This also verifies that IO is not running during the export.
|
||||
async fn export_database(context: &Context, dest: &Path, passphrase: String) -> Result<()> {
|
||||
ensure!(
|
||||
context.scheduler.read().await.is_none(),
|
||||
"cannot export backup, IO is running"
|
||||
);
|
||||
let now = time().try_into().context("32-bit UNIX time overflow")?;
|
||||
|
||||
context.sql.set_raw_config_int("backup_time", now).await?;
|
||||
sql::housekeeping(context).await.ok_or_log(context);
|
||||
context
|
||||
.sql
|
||||
.execute("VACUUM;", paramsv![])
|
||||
.await
|
||||
.map_err(|e| warn!(context, "Vacuum failed, exporting anyway {}", e))
|
||||
.ok();
|
||||
context
|
||||
.sql
|
||||
.export(dest, passphrase)
|
||||
.await
|
||||
.with_context(|| format!("failed to backup database to {}", dest.display()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BackupSender {
|
||||
/// A handle to the running provider.
|
||||
provider: sendme::provider::Provider,
|
||||
/// The ticket to retrieve the backup collection.
|
||||
ticket: sendme::provider::Ticket,
|
||||
/// Token holding the "ongoing" mutex. When this completes the provider should shut
|
||||
/// down.
|
||||
cancel_token: Receiver<()>,
|
||||
}
|
||||
|
||||
impl BackupSender {
|
||||
// - [x] check i/o is not running
|
||||
// - [x] check we have secret key
|
||||
// - [x] alloc ongoing
|
||||
// - [ ] correctly cancel Provider when cancelled
|
||||
// - [x] create auth token
|
||||
// - [x] export backup with generated token as password
|
||||
// - needs a path to store the database
|
||||
// - [x] create the sendme database
|
||||
// - [x] start provider
|
||||
// - [ ] wait for one successful connection
|
||||
// - [ ] shutdown when a connection is closed, successful or not.
|
||||
//
|
||||
// - [ ] provide progress report
|
||||
/// Prepares for sending a backup to a second device.
|
||||
///
|
||||
/// Before calling this function all I/O must be stopped so that no changes to the blobs
|
||||
/// or database are happening, this is done by calling the `dc_accounts_stop_io` or
|
||||
/// `dc_stop_io` APIs first. TODO: Add the rust equivalents.
|
||||
///
|
||||
/// This will acquire the global "ongoing" mutex.
|
||||
pub async fn perpare(context: &Context, dir: &Path) -> Result<Self> {
|
||||
ensure!(
|
||||
// TODO: Should we worry about path normalisation?
|
||||
dir != context.get_blobdir(),
|
||||
"Temporary database export directory should not be in blobdir"
|
||||
);
|
||||
e2ee::ensure_secret_key_exists(context)
|
||||
.await
|
||||
.context("Private key not available, aborting backup export")?;
|
||||
|
||||
// Acquire global "ongoing" mutex.
|
||||
let cancel_token = context.alloc_ongoing().await?;
|
||||
let res = tokio::select! {
|
||||
biased;
|
||||
res = Self::prepare_inner(context, dir) => {
|
||||
match res {
|
||||
Ok(slf) => {
|
||||
// TODO: maybe this is the wrong place to log this
|
||||
// TODO: Also needs to log progress somehow.
|
||||
info!(context, "Waiting for remote to connect");
|
||||
Ok(slf)
|
||||
},
|
||||
Err(err) => {
|
||||
error!(context, "Failed to set up second device setup: {:#}", err);
|
||||
Err(err)
|
||||
},
|
||||
}
|
||||
},
|
||||
_ = cancel_token.recv() => Err(format_err!("cancelled")),
|
||||
};
|
||||
|
||||
// TODO: This is all wrong, too early to release
|
||||
context.free_ongoing().await;
|
||||
res
|
||||
}
|
||||
|
||||
async fn prepare_inner(context: &Context, dir: &Path) -> Result<Self> {
|
||||
// Generate the token up front: we also use it to encrypt the database.
|
||||
let token = sendme::protocol::AuthToken::generate();
|
||||
let dbfile = dir.join(DBFILE_BACKUP_NAME);
|
||||
export_database(context, &dbfile, token.to_string())
|
||||
.await
|
||||
.context("Database export failed")?;
|
||||
|
||||
// Now we can be sure IO is not running.
|
||||
let mut files = vec![sendme::provider::DataSource::from(dbfile)];
|
||||
let blobdir = BlobDirContents::new(context).await?;
|
||||
for blob in blobdir.iter() {
|
||||
files.push(blob.to_abs_path().into());
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
let (db, hash) = sendme::provider::create_collection(files).await?;
|
||||
let provider = sendme::provider::Provider::builder(db)
|
||||
.auth_token(token)
|
||||
.spawn()?;
|
||||
let ticket = provider.ticket(hash);
|
||||
Ok(Self { provider, ticket })
|
||||
}
|
||||
|
||||
pub fn qr(&self) -> Qr {
|
||||
Qr::Backup {
|
||||
ticket: self.ticket.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn join(self) -> Result<(), JoinError> {
|
||||
// TODO: should wait for 1 transfer to complete or abort
|
||||
self.provider.join().await
|
||||
}
|
||||
|
||||
pub fn abort(&self) {
|
||||
self.provider.abort()
|
||||
}
|
||||
}
|
||||
|
||||
/// All files in the blobdir.
|
||||
///
|
||||
/// This exists so we can have a [`BlobDirIter`] which needs something to own the data of
|
||||
/// it's `&Path`. Use [`BlobDirContents::iter`] to create the iterator.
|
||||
///
|
||||
/// Additionally pre-allocating this means we get a length for progress report.
|
||||
struct BlobDirContents<'a> {
|
||||
inner: Vec<PathBuf>,
|
||||
context: &'a Context,
|
||||
}
|
||||
|
||||
impl<'a> BlobDirContents<'a> {
|
||||
async fn new(context: &'a Context) -> Result<BlobDirContents<'a>> {
|
||||
let readdir = fs::read_dir(context.get_blobdir()).await?;
|
||||
let inner = ReadDirStream::new(readdir)
|
||||
.filter_map(|entry| async move {
|
||||
match entry {
|
||||
Ok(entry) => Some(entry),
|
||||
Err(err) => {
|
||||
error!(context, "Failed to read blob file: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.filter_map(|entry| async move {
|
||||
match entry.file_type().await.ok()?.is_file() {
|
||||
true => Some(entry.path()),
|
||||
false => {
|
||||
warn!(
|
||||
context,
|
||||
"Export: Found blob dir entry {} that is not a file, ignoring",
|
||||
entry.path().display()
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
Ok(Self { inner, context })
|
||||
}
|
||||
|
||||
fn iter(&self) -> BlobDirIter<'_> {
|
||||
BlobDirIter::new(self.context, &self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream for [`Blob`]s.
|
||||
///
|
||||
/// The stream emits [`EventType::ImexProgress`] events as it being consumed.
|
||||
///
|
||||
/// Because we like to know the total number of blobs to emit progress all the blobs are
|
||||
/// read up front and stored. Luckily this also makes our life easier, since we now only
|
||||
/// need to implement `Iterator` and not `Stream`.
|
||||
struct BlobDirIter<'a> {
|
||||
paths: &'a [PathBuf],
|
||||
offset: usize,
|
||||
last_progress: usize,
|
||||
context: &'a Context,
|
||||
}
|
||||
|
||||
impl<'a> BlobDirIter<'a> {
|
||||
fn new(context: &'a Context, paths: &'a [PathBuf]) -> BlobDirIter<'a> {
|
||||
Self {
|
||||
paths,
|
||||
offset: 0,
|
||||
last_progress: 0,
|
||||
context,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for BlobDirIter<'a> {
|
||||
type Item = BlobObject<'a>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
while let Some(path) = self.paths.get(self.offset) {
|
||||
self.offset += 1;
|
||||
|
||||
// In theory this can error but we'd have corrupted filenames in the blobdir, so
|
||||
// silently skipping them is fine.
|
||||
match BlobObject::from_path(self.context, path) {
|
||||
Ok(blob) => {
|
||||
let progress = 1000 * self.offset / self.paths.len();
|
||||
if progress != self.last_progress && progress > 10 && progress < 1000 {
|
||||
self.context.emit_event(EventType::ImexProgress(progress));
|
||||
self.last_progress = progress;
|
||||
}
|
||||
return Some(blob);
|
||||
}
|
||||
Err(err) => warn!(self.context, "{err}"),
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl FusedIterator for BlobDirIter<'_> {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ::pgp::armor::BlockType;
|
||||
|
||||
Reference in New Issue
Block a user