mirror of
https://github.com/chatmail/core.git
synced 2026-05-17 05:46:30 +03:00
Add progress for provider
Fix progress for getter. Maths. It's hard. Add test for progress.
This commit is contained in:
@@ -127,10 +127,12 @@ impl BackupProvider {
|
|||||||
async fn prepare_inner(context: &Context, dir: &Path) -> Result<(Provider, Ticket)> {
|
async fn prepare_inner(context: &Context, dir: &Path) -> Result<(Provider, Ticket)> {
|
||||||
// Generate the token up front: we also use it to encrypt the database.
|
// Generate the token up front: we also use it to encrypt the database.
|
||||||
let token = AuthToken::generate();
|
let token = AuthToken::generate();
|
||||||
|
context.emit_event(SendProgress::Started.into());
|
||||||
let dbfile = dir.join(DBFILE_BACKUP_NAME);
|
let dbfile = dir.join(DBFILE_BACKUP_NAME);
|
||||||
export_database(context, &dbfile, token.to_string())
|
export_database(context, &dbfile, token.to_string())
|
||||||
.await
|
.await
|
||||||
.context("Database export failed")?;
|
.context("Database export failed")?;
|
||||||
|
context.emit_event(SendProgress::DatabaseExported.into());
|
||||||
|
|
||||||
// Now we can be sure IO is not running.
|
// Now we can be sure IO is not running.
|
||||||
let mut files = vec![DataSource::with_name(
|
let mut files = vec![DataSource::with_name(
|
||||||
@@ -146,7 +148,9 @@ impl BackupProvider {
|
|||||||
|
|
||||||
// Start listening.
|
// Start listening.
|
||||||
let (db, hash) = sendme::provider::create_collection(files).await?;
|
let (db, hash) = sendme::provider::create_collection(files).await?;
|
||||||
|
context.emit_event(SendProgress::CollectionCreated.into());
|
||||||
let provider = Provider::builder(db).auth_token(token).spawn()?;
|
let provider = Provider::builder(db).auth_token(token).spawn()?;
|
||||||
|
context.emit_event(SendProgress::ProviderListening.into());
|
||||||
let ticket = provider.ticket(hash);
|
let ticket = provider.ticket(hash);
|
||||||
Ok((provider, ticket))
|
Ok((provider, ticket))
|
||||||
}
|
}
|
||||||
@@ -165,6 +169,7 @@ impl BackupProvider {
|
|||||||
mut provider: Provider,
|
mut provider: Provider,
|
||||||
cancel_token: Receiver<()>,
|
cancel_token: Receiver<()>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
context.emit_event(SendProgress::ProviderListening.into());
|
||||||
let mut events = provider.subscribe();
|
let mut events = provider.subscribe();
|
||||||
let res = loop {
|
let res = loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -176,14 +181,21 @@ impl BackupProvider {
|
|||||||
match maybe_event {
|
match maybe_event {
|
||||||
Ok(event) => {
|
Ok(event) => {
|
||||||
match event {
|
match event {
|
||||||
|
Event::ClientConnected { ..} => {
|
||||||
|
context.emit_event(SendProgress::ClientConnected.into());
|
||||||
|
}
|
||||||
|
Event::RequestReceived { .. } => {
|
||||||
|
context.emit_event(SendProgress::TransferStarted.into());
|
||||||
|
}
|
||||||
Event::TransferCompleted { .. } => {
|
Event::TransferCompleted { .. } => {
|
||||||
|
context.emit_event(SendProgress::TransferFinished.into());
|
||||||
provider.shutdown();
|
provider.shutdown();
|
||||||
}
|
}
|
||||||
Event::TransferAborted { .. } => {
|
Event::TransferAborted { .. } => {
|
||||||
|
context.emit_event(SendProgress::Failed.into());
|
||||||
provider.shutdown();
|
provider.shutdown();
|
||||||
break Err(anyhow!("BackupSender transfer aborted"));
|
break Err(anyhow!("BackupSender transfer aborted"));
|
||||||
}
|
}
|
||||||
_ => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(broadcast::error::RecvError::Closed) => {
|
Err(broadcast::error::RecvError::Closed) => {
|
||||||
@@ -204,6 +216,7 @@ impl BackupProvider {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
context.emit_event(SendProgress::Completed.into());
|
||||||
context.free_ongoing().await;
|
context.free_ongoing().await;
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
@@ -224,6 +237,29 @@ impl BackupProvider {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create [`EventType::ImexProgress`] events using readable names.
|
||||||
|
///
|
||||||
|
/// Plus you get warnings if you don't use all variants.
|
||||||
|
#[derive(Debug)]
|
||||||
|
#[repr(u16)]
|
||||||
|
enum SendProgress {
|
||||||
|
Failed = 0,
|
||||||
|
Started = 100,
|
||||||
|
DatabaseExported = 300,
|
||||||
|
CollectionCreated = 400,
|
||||||
|
ProviderListening = 500,
|
||||||
|
ClientConnected = 600,
|
||||||
|
TransferStarted = 650,
|
||||||
|
TransferFinished = 950,
|
||||||
|
Completed = 1000,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SendProgress> for EventType {
|
||||||
|
fn from(source: SendProgress) -> Self {
|
||||||
|
Self::ImexProgress((source as u16).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Contacts a backup provider and receives the backup from it.
|
/// Contacts a backup provider and receives the backup from it.
|
||||||
///
|
///
|
||||||
/// This uses a QR code to contact another instance of deltachat which is providing a backup
|
/// This uses a QR code to contact another instance of deltachat which is providing a backup
|
||||||
@@ -359,6 +395,7 @@ fn spawn_progress_proxy(context: Context, mut rx: broadcast::Receiver<u16>) {
|
|||||||
/// Create [`EventType::ImexProgress`] events using readable names.
|
/// Create [`EventType::ImexProgress`] events using readable names.
|
||||||
///
|
///
|
||||||
/// Plus you get warnings if you don't use all variants.
|
/// Plus you get warnings if you don't use all variants.
|
||||||
|
#[derive(Debug)]
|
||||||
enum ReceiveProgress {
|
enum ReceiveProgress {
|
||||||
Connected,
|
Connected,
|
||||||
CollectionRecieved,
|
CollectionRecieved,
|
||||||
@@ -456,7 +493,7 @@ impl InnerProgressEmitter {
|
|||||||
let prev_count = self.count.fetch_add(amount, Ordering::Relaxed);
|
let prev_count = self.count.fetch_add(amount, Ordering::Relaxed);
|
||||||
let count = prev_count + amount;
|
let count = prev_count + amount;
|
||||||
let total = self.total.load(Ordering::Relaxed);
|
let total = self.total.load(Ordering::Relaxed);
|
||||||
let step = (total * u64::from(self.steps) / std::cmp::min(count, total)) as u16;
|
let step = (std::cmp::min(count, total) * u64::from(self.steps) / total) as u16;
|
||||||
let last_step = self.last_step.swap(step, Ordering::Relaxed);
|
let last_step = self.last_step.swap(step, Ordering::Relaxed);
|
||||||
if step > last_step {
|
if step > last_step {
|
||||||
self.tx.send(step).ok();
|
self.tx.send(step).ok();
|
||||||
@@ -530,16 +567,10 @@ mod tests {
|
|||||||
send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
|
send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
|
||||||
|
|
||||||
// Prepare to transfer backup.
|
// Prepare to transfer backup.
|
||||||
ctx0.stop_io().await;
|
|
||||||
let provider = BackupProvider::prepare(&ctx0, &dir).await.unwrap();
|
let provider = BackupProvider::prepare(&ctx0, &dir).await.unwrap();
|
||||||
|
|
||||||
// Set up second device.
|
// Set up second device.
|
||||||
let ctx1 = tcm.bob().await;
|
let ctx1 = tcm.unconfigured().await;
|
||||||
ctx1.stop_io().await;
|
|
||||||
ctx1.sql
|
|
||||||
.set_raw_config_bool("configured", false)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
get_backup(&ctx1, provider.qr()).await.unwrap();
|
get_backup(&ctx1, provider.qr()).await.unwrap();
|
||||||
|
|
||||||
// Make sure the provider finishes without an error.
|
// Make sure the provider finishes without an error.
|
||||||
@@ -559,5 +590,13 @@ mod tests {
|
|||||||
let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
|
let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
|
||||||
let text = msg.get_text().unwrap();
|
let text = msg.get_text().unwrap();
|
||||||
assert_eq!(text, "hi there");
|
assert_eq!(text, "hi there");
|
||||||
|
|
||||||
|
// Check that both received the ImexProgress events.
|
||||||
|
ctx0.evtracker
|
||||||
|
.get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
|
||||||
|
.await;
|
||||||
|
ctx1.evtracker
|
||||||
|
.get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,6 +40,11 @@ pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avata
|
|||||||
static CONTEXT_NAMES: Lazy<std::sync::RwLock<BTreeMap<u32, String>>> =
|
static CONTEXT_NAMES: Lazy<std::sync::RwLock<BTreeMap<u32, String>>> =
|
||||||
Lazy::new(|| std::sync::RwLock::new(BTreeMap::new()));
|
Lazy::new(|| std::sync::RwLock::new(BTreeMap::new()));
|
||||||
|
|
||||||
|
/// Manage multiple [`TestContext`]s in one place.
|
||||||
|
///
|
||||||
|
/// The main advantage is that the log records of the contexts will appear in the order they
|
||||||
|
/// occurred rather than grouped by context like would happen when you use separate
|
||||||
|
/// [`TestContext`]s without managing your own [`LogSink`].
|
||||||
pub struct TestContextManager {
|
pub struct TestContextManager {
|
||||||
log_tx: Sender<LogEvent>,
|
log_tx: Sender<LogEvent>,
|
||||||
_log_sink: LogSink,
|
_log_sink: LogSink,
|
||||||
|
|||||||
Reference in New Issue
Block a user