diff --git a/src/chat.rs b/src/chat.rs index 15f77375c..546f9737b 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -3209,11 +3209,11 @@ mod tests { .await .unwrap() .chat_id; - std::thread::sleep(std::time::Duration::from_millis(1000)); + async_std::task::sleep(std::time::Duration::from_millis(1000)).await; let chat_id2 = create_by_contact_id(&t.ctx, DC_CONTACT_ID_SELF) .await .unwrap(); - std::thread::sleep(std::time::Duration::from_millis(1000)); + async_std::task::sleep(std::time::Duration::from_millis(1000)).await; let chat_id3 = create_group_chat(&t.ctx, VerifiedStatus::Unverified, "foo") .await .unwrap(); diff --git a/src/job.rs b/src/job.rs index 7da9b6186..b80bf9607 100644 --- a/src/job.rs +++ b/src/job.rs @@ -172,25 +172,25 @@ impl Job { } /// Deletes the job from the database. - async fn delete(&self, context: &Context) -> bool { + async fn delete(self, context: &Context) -> Result<()> { if self.job_id != 0 { context .sql .execute("DELETE FROM jobs WHERE id=?;", paramsv![self.job_id as i32]) - .await - .is_ok() - } else { - // Already deleted. - true + .await?; } + + Ok(()) } /// Saves the job to the database, creating a new entry if necessary. /// /// The Job is consumed by this method. - async fn save(self, context: &Context) -> bool { + async fn save(self, context: &Context) -> Result<()> { let thread: Thread = self.action.into(); + info!(context, "saving job for {}-thread: {:?}", thread, self); + if self.job_id != 0 { context .sql @@ -203,8 +203,7 @@ impl Job { self.job_id as i32, ], ) - .await - .is_ok() + .await?; } else { context.sql.execute( "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", @@ -216,8 +215,10 @@ impl Job { self.param.to_string(), self.desired_timestamp ] - ).await.is_ok() + ).await?; } + + Ok(()) } async fn smtp_send( @@ -896,7 +897,9 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_ tries, time_offset ); - job.save(context).await; + job.save(context).await.unwrap_or_else(|err| { + error!(context, "failed to save job: {}", err); + }); } else { info!( context, @@ -905,7 +908,9 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_ job, JOB_RETRIES ); - job.delete(context).await; + job.delete(context).await.unwrap_or_else(|err| { + error!(context, "failed to delete job: {}", err); + }); } } Status::Finished(res) => { @@ -921,7 +926,9 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_ ); } - job.delete(context).await; + job.delete(context).await.unwrap_or_else(|err| { + error!(context, "failed to delete job: {}", err); + }); } } } @@ -1019,7 +1026,9 @@ pub async fn add( } let job = Job::new(action, foreign_id as u32, param, delay_seconds); - job.save(context).await; + job.save(context).await.unwrap_or_else(|err| { + error!(context, "failed to save job: {}", err); + }); if delay_seconds == 0 { match action { @@ -1053,76 +1062,79 @@ pub(crate) async fn load_next( thread: Thread, probe_network: bool, ) -> Option { + info!(context, "loading job for {}-thread", thread); let query = if !probe_network { // processing for first-try and after backoff-timeouts: // process jobs in the order they were added. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" + r#" +SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries +FROM jobs +WHERE thread=? AND desired_timestamp<=? +ORDER BY action DESC, added_timestamp +LIMIT 1; +"# } else { // processing after call to dc_maybe_network(): // process _all_ pending jobs that failed before // in the order of their backoff-times. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" + r#" +SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries +FROM jobs +WHERE thread=? AND tries>0 +ORDER BY desired_timestamp, action DESC +LIMIT 1; +"# }; let thread_i = thread as i64; let t = time(); - let params_no_probe = paramsv![thread_i, t]; - let params_probe = paramsv![thread_i]; let params = if !probe_network { - params_no_probe + paramsv![thread_i, t] } else { - params_probe + paramsv![thread_i] }; let job = context .sql - .query_map( - query, - params, - |row| { - let job = Job { - job_id: row.get(0)?, - action: row.get(1)?, - foreign_id: row.get(2)?, - desired_timestamp: row.get(5)?, - added_timestamp: row.get(4)?, - tries: row.get(6)?, - param: row.get::<_, String>(3)?.parse().unwrap_or_default(), - pending_error: None, - }; + .query_row_optional(query, params, |row| { + let job = Job { + job_id: row.get(0)?, + action: row.get(1)?, + foreign_id: row.get(2)?, + desired_timestamp: row.get(5)?, + added_timestamp: row.get(4)?, + tries: row.get(6)?, + param: row.get::<_, String>(3)?.parse().unwrap_or_default(), + pending_error: None, + }; - Ok(job) - }, - |jobs| { - for job in jobs { - match job { - Ok(j) => return Ok(Some(j)), - Err(e) => warn!(context, "Bad job from the database: {}", e), + Ok(job) + }) + .await; + + match job { + Ok(job) => { + if thread == Thread::Imap { + if let Some(job) = job { + if job.action < Action::DeleteMsgOnImap { + load_imap_deletion_job(context) + .await + .unwrap_or_default() + .or(Some(job)) + } else { + Some(job) } + } else { + load_imap_deletion_job(context).await.unwrap_or_default() } - Ok(None) - }, - ) - .await - .unwrap_or_default(); - - if thread == Thread::Imap { - if let Some(job) = job { - if job.action < Action::DeleteMsgOnImap { - load_imap_deletion_job(context) - .await - .unwrap_or_default() - .or(Some(job)) } else { - Some(job) + job } - } else { - load_imap_deletion_job(context).await.unwrap_or_default() } - } else { - job + Err(err) => { + warn!(context, "Bad job from the database: {}", err); + None + } } } diff --git a/src/scheduler.rs b/src/scheduler.rs index f6444e386..7573f9c41 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -252,13 +252,13 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect ctx.scheduler.write().await.set_probe_network(false); } Ok(None) | Err(async_std::future::TimeoutError { .. }) => { - info!(ctx, "smpt fake idle"); + info!(ctx, "smtp fake idle"); // Fake Idle - async_std::task::sleep(Duration::from_secs(5)) - .race(idle_interrupt_receiver.recv().map(|_| { - info!(ctx, "smtp idle interrupt"); - })) - .await; + idle_interrupt_receiver + .recv() + .timeout(Duration::from_secs(5)) + .await + .ok(); } } } @@ -384,28 +384,36 @@ impl Scheduler { async fn interrupt_inbox(&self) { match self { - Scheduler::Running { ref inbox, .. } => inbox.interrupt().await, + Scheduler::Running { ref inbox, .. } => { + inbox.interrupt().await.ok(); + } _ => {} } } async fn interrupt_mvbox(&self) { match self { - Scheduler::Running { ref mvbox, .. } => mvbox.interrupt().await, + Scheduler::Running { ref mvbox, .. } => { + mvbox.interrupt().await.ok(); + } _ => {} } } async fn interrupt_sentbox(&self) { match self { - Scheduler::Running { ref sentbox, .. } => sentbox.interrupt().await, + Scheduler::Running { ref sentbox, .. } => { + sentbox.interrupt().await.ok(); + } _ => {} } } async fn interrupt_smtp(&self) { match self { - Scheduler::Running { ref smtp, .. } => smtp.interrupt().await, + Scheduler::Running { ref smtp, .. } => { + smtp.interrupt().await.ok(); + } _ => {} } } @@ -487,9 +495,9 @@ impl ConnectionState { self.shutdown_receiver.recv().await.ok(); } - async fn interrupt(&self) { + async fn interrupt(&self) -> Result<(), async_std::sync::TrySendError<()>> { // Use try_send to avoid blocking on interrupts. - self.idle_interrupt_sender.try_send(()).ok(); + self.idle_interrupt_sender.try_send(()) } } @@ -523,8 +531,8 @@ impl SmtpConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self) { - self.state.interrupt().await; + async fn interrupt(&self) -> Result<(), async_std::sync::TrySendError<()>> { + self.state.interrupt().await } /// Shutdown this connection completely. @@ -571,8 +579,8 @@ impl ImapConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self) { - self.state.interrupt().await; + async fn interrupt(&self) -> Result<(), async_std::sync::TrySendError<()>> { + self.state.interrupt().await } /// Shutdown this connection completely. diff --git a/src/securejoin.rs b/src/securejoin.rs index c0d14802c..2a46fad88 100644 --- a/src/securejoin.rs +++ b/src/securejoin.rs @@ -258,7 +258,7 @@ async fn securejoin(context: &Context, qr: &str) -> ChatId { let own_fingerprint = get_self_fingerprint(context).await.unwrap_or_default(); // Bob -> Alice - send_handshake_msg( + if let Err(err) = send_handshake_msg( context, contact_chat_id, if join_vg { @@ -274,12 +274,16 @@ async fn securejoin(context: &Context, qr: &str) -> ChatId { "".to_string() }, ) - .await; + .await + { + error!(context, "failed to send handshake message: {}", err); + return cleanup(&context, contact_chat_id, true, join_vg).await; + } } else { context.bob.write().await.expects = DC_VC_AUTH_REQUIRED; // Bob -> Alice - send_handshake_msg( + if let Err(err) = send_handshake_msg( context, contact_chat_id, if join_vg { "vg-request" } else { "vc-request" }, @@ -287,13 +291,17 @@ async fn securejoin(context: &Context, qr: &str) -> ChatId { None, "", ) - .await; + .await + { + error!(context, "failed to send handshake message: {}", err); + return cleanup(&context, contact_chat_id, true, join_vg).await; + } } if join_vg { // for a group-join, wait until the secure-join is done and the group is created while !context.shall_stop_ongoing().await { - std::thread::sleep(std::time::Duration::from_millis(200)); + async_std::task::sleep(std::time::Duration::from_millis(200)).await; } cleanup(&context, contact_chat_id, true, join_vg).await } else { @@ -311,7 +319,7 @@ async fn send_handshake_msg( param2: impl AsRef, fingerprint: Option, grpid: impl AsRef, -) { +) -> Result<(), HandshakeError> { let mut msg = Message::default(); msg.viewtype = Viewtype::Text; msg.text = Some(format!("Secure-Join: {}", step)); @@ -339,10 +347,12 @@ async fn send_handshake_msg( } else { msg.param.set_int(Param::GuaranteeE2ee, 1); } - // TODO. handle cleanup on error + chat::send_msg(context, contact_chat_id, &mut msg) .await - .unwrap_or_default(); + .map_err(|err| HandshakeError::MsgSendFailed(err))?; + + Ok(()) } async fn chat_id_2_contact_id(context: &Context, contact_chat_id: ChatId) -> u32 { @@ -391,6 +401,8 @@ pub(crate) enum HandshakeError { ChatNotFound { group: String }, #[error("No configured self address found")] NoSelfAddr, + #[error("Failed to send message")] + MsgSendFailed(#[source] Error), } /// What to do with a Secure-Join handshake message after it was handled. @@ -487,7 +499,7 @@ pub(crate) async fn handle_securejoin_handshake( None, "", ) - .await; + .await?; Ok(HandshakeMessage::Done) } "vg-auth-required" | "vc-auth-required" => { @@ -559,7 +571,7 @@ pub(crate) async fn handle_securejoin_handshake( "".to_string() }, ) - .await; + .await?; Ok(HandshakeMessage::Done) } "vg-request-with-auth" | "vc-request-with-auth" => { @@ -670,7 +682,8 @@ pub(crate) async fn handle_securejoin_handshake( Some(fingerprint.clone()), "", ) - .await; + .await?; + inviter_progress!(context, contact_id, 1000); } Ok(HandshakeMessage::Ignore) // "Done" would delete the message and break multi-device (the key from Autocrypt-header is needed) @@ -779,7 +792,7 @@ pub(crate) async fn handle_securejoin_handshake( Some(scanned_fingerprint_of_alice), "", ) - .await; + .await?; context.bob.write().await.status = 1; context.stop_ongoing().await;