improve error handling and fix sleeps

This commit is contained in:
dignifiedquire
2020-05-21 14:35:07 +02:00
parent 9817ccebcf
commit e8e82d9760
4 changed files with 124 additions and 91 deletions

View File

@@ -3209,11 +3209,11 @@ mod tests {
.await
.unwrap()
.chat_id;
std::thread::sleep(std::time::Duration::from_millis(1000));
async_std::task::sleep(std::time::Duration::from_millis(1000)).await;
let chat_id2 = create_by_contact_id(&t.ctx, DC_CONTACT_ID_SELF)
.await
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(1000));
async_std::task::sleep(std::time::Duration::from_millis(1000)).await;
let chat_id3 = create_group_chat(&t.ctx, VerifiedStatus::Unverified, "foo")
.await
.unwrap();

View File

@@ -172,25 +172,25 @@ impl Job {
}
/// Deletes the job from the database.
async fn delete(&self, context: &Context) -> bool {
async fn delete(self, context: &Context) -> Result<()> {
if self.job_id != 0 {
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", paramsv![self.job_id as i32])
.await
.is_ok()
} else {
// Already deleted.
true
.await?;
}
Ok(())
}
/// Saves the job to the database, creating a new entry if necessary.
///
/// The Job is consumed by this method.
async fn save(self, context: &Context) -> bool {
async fn save(self, context: &Context) -> Result<()> {
let thread: Thread = self.action.into();
info!(context, "saving job for {}-thread: {:?}", thread, self);
if self.job_id != 0 {
context
.sql
@@ -203,8 +203,7 @@ impl Job {
self.job_id as i32,
],
)
.await
.is_ok()
.await?;
} else {
context.sql.execute(
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
@@ -216,8 +215,10 @@ impl Job {
self.param.to_string(),
self.desired_timestamp
]
).await.is_ok()
).await?;
}
Ok(())
}
async fn smtp_send<F, Fut>(
@@ -896,7 +897,9 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
tries,
time_offset
);
job.save(context).await;
job.save(context).await.unwrap_or_else(|err| {
error!(context, "failed to save job: {}", err);
});
} else {
info!(
context,
@@ -905,7 +908,9 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
job,
JOB_RETRIES
);
job.delete(context).await;
job.delete(context).await.unwrap_or_else(|err| {
error!(context, "failed to delete job: {}", err);
});
}
}
Status::Finished(res) => {
@@ -921,7 +926,9 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
);
}
job.delete(context).await;
job.delete(context).await.unwrap_or_else(|err| {
error!(context, "failed to delete job: {}", err);
});
}
}
}
@@ -1019,7 +1026,9 @@ pub async fn add(
}
let job = Job::new(action, foreign_id as u32, param, delay_seconds);
job.save(context).await;
job.save(context).await.unwrap_or_else(|err| {
error!(context, "failed to save job: {}", err);
});
if delay_seconds == 0 {
match action {
@@ -1053,76 +1062,79 @@ pub(crate) async fn load_next(
thread: Thread,
probe_network: bool,
) -> Option<Job> {
info!(context, "loading job for {}-thread", thread);
let query = if !probe_network {
// processing for first-try and after backoff-timeouts:
// process jobs in the order they were added.
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;"
r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE thread=? AND desired_timestamp<=?
ORDER BY action DESC, added_timestamp
LIMIT 1;
"#
} else {
// processing after call to dc_maybe_network():
// process _all_ pending jobs that failed before
// in the order of their backoff-times.
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;"
r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE thread=? AND tries>0
ORDER BY desired_timestamp, action DESC
LIMIT 1;
"#
};
let thread_i = thread as i64;
let t = time();
let params_no_probe = paramsv![thread_i, t];
let params_probe = paramsv![thread_i];
let params = if !probe_network {
params_no_probe
paramsv![thread_i, t]
} else {
params_probe
paramsv![thread_i]
};
let job = context
.sql
.query_map(
query,
params,
|row| {
let job = Job {
job_id: row.get(0)?,
action: row.get(1)?,
foreign_id: row.get(2)?,
desired_timestamp: row.get(5)?,
added_timestamp: row.get(4)?,
tries: row.get(6)?,
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
pending_error: None,
};
.query_row_optional(query, params, |row| {
let job = Job {
job_id: row.get(0)?,
action: row.get(1)?,
foreign_id: row.get(2)?,
desired_timestamp: row.get(5)?,
added_timestamp: row.get(4)?,
tries: row.get(6)?,
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
pending_error: None,
};
Ok(job)
},
|jobs| {
for job in jobs {
match job {
Ok(j) => return Ok(Some(j)),
Err(e) => warn!(context, "Bad job from the database: {}", e),
Ok(job)
})
.await;
match job {
Ok(job) => {
if thread == Thread::Imap {
if let Some(job) = job {
if job.action < Action::DeleteMsgOnImap {
load_imap_deletion_job(context)
.await
.unwrap_or_default()
.or(Some(job))
} else {
Some(job)
}
} else {
load_imap_deletion_job(context).await.unwrap_or_default()
}
Ok(None)
},
)
.await
.unwrap_or_default();
if thread == Thread::Imap {
if let Some(job) = job {
if job.action < Action::DeleteMsgOnImap {
load_imap_deletion_job(context)
.await
.unwrap_or_default()
.or(Some(job))
} else {
Some(job)
job
}
} else {
load_imap_deletion_job(context).await.unwrap_or_default()
}
} else {
job
Err(err) => {
warn!(context, "Bad job from the database: {}", err);
None
}
}
}

View File

@@ -252,13 +252,13 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
ctx.scheduler.write().await.set_probe_network(false);
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
info!(ctx, "smpt fake idle");
info!(ctx, "smtp fake idle");
// Fake Idle
async_std::task::sleep(Duration::from_secs(5))
.race(idle_interrupt_receiver.recv().map(|_| {
info!(ctx, "smtp idle interrupt");
}))
.await;
idle_interrupt_receiver
.recv()
.timeout(Duration::from_secs(5))
.await
.ok();
}
}
}
@@ -384,28 +384,36 @@ impl Scheduler {
async fn interrupt_inbox(&self) {
match self {
Scheduler::Running { ref inbox, .. } => inbox.interrupt().await,
Scheduler::Running { ref inbox, .. } => {
inbox.interrupt().await.ok();
}
_ => {}
}
}
async fn interrupt_mvbox(&self) {
match self {
Scheduler::Running { ref mvbox, .. } => mvbox.interrupt().await,
Scheduler::Running { ref mvbox, .. } => {
mvbox.interrupt().await.ok();
}
_ => {}
}
}
async fn interrupt_sentbox(&self) {
match self {
Scheduler::Running { ref sentbox, .. } => sentbox.interrupt().await,
Scheduler::Running { ref sentbox, .. } => {
sentbox.interrupt().await.ok();
}
_ => {}
}
}
async fn interrupt_smtp(&self) {
match self {
Scheduler::Running { ref smtp, .. } => smtp.interrupt().await,
Scheduler::Running { ref smtp, .. } => {
smtp.interrupt().await.ok();
}
_ => {}
}
}
@@ -487,9 +495,9 @@ impl ConnectionState {
self.shutdown_receiver.recv().await.ok();
}
async fn interrupt(&self) {
async fn interrupt(&self) -> Result<(), async_std::sync::TrySendError<()>> {
// Use try_send to avoid blocking on interrupts.
self.idle_interrupt_sender.try_send(()).ok();
self.idle_interrupt_sender.try_send(())
}
}
@@ -523,8 +531,8 @@ impl SmtpConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self) {
self.state.interrupt().await;
async fn interrupt(&self) -> Result<(), async_std::sync::TrySendError<()>> {
self.state.interrupt().await
}
/// Shutdown this connection completely.
@@ -571,8 +579,8 @@ impl ImapConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self) {
self.state.interrupt().await;
async fn interrupt(&self) -> Result<(), async_std::sync::TrySendError<()>> {
self.state.interrupt().await
}
/// Shutdown this connection completely.

View File

@@ -258,7 +258,7 @@ async fn securejoin(context: &Context, qr: &str) -> ChatId {
let own_fingerprint = get_self_fingerprint(context).await.unwrap_or_default();
// Bob -> Alice
send_handshake_msg(
if let Err(err) = send_handshake_msg(
context,
contact_chat_id,
if join_vg {
@@ -274,12 +274,16 @@ async fn securejoin(context: &Context, qr: &str) -> ChatId {
"".to_string()
},
)
.await;
.await
{
error!(context, "failed to send handshake message: {}", err);
return cleanup(&context, contact_chat_id, true, join_vg).await;
}
} else {
context.bob.write().await.expects = DC_VC_AUTH_REQUIRED;
// Bob -> Alice
send_handshake_msg(
if let Err(err) = send_handshake_msg(
context,
contact_chat_id,
if join_vg { "vg-request" } else { "vc-request" },
@@ -287,13 +291,17 @@ async fn securejoin(context: &Context, qr: &str) -> ChatId {
None,
"",
)
.await;
.await
{
error!(context, "failed to send handshake message: {}", err);
return cleanup(&context, contact_chat_id, true, join_vg).await;
}
}
if join_vg {
// for a group-join, wait until the secure-join is done and the group is created
while !context.shall_stop_ongoing().await {
std::thread::sleep(std::time::Duration::from_millis(200));
async_std::task::sleep(std::time::Duration::from_millis(200)).await;
}
cleanup(&context, contact_chat_id, true, join_vg).await
} else {
@@ -311,7 +319,7 @@ async fn send_handshake_msg(
param2: impl AsRef<str>,
fingerprint: Option<String>,
grpid: impl AsRef<str>,
) {
) -> Result<(), HandshakeError> {
let mut msg = Message::default();
msg.viewtype = Viewtype::Text;
msg.text = Some(format!("Secure-Join: {}", step));
@@ -339,10 +347,12 @@ async fn send_handshake_msg(
} else {
msg.param.set_int(Param::GuaranteeE2ee, 1);
}
// TODO. handle cleanup on error
chat::send_msg(context, contact_chat_id, &mut msg)
.await
.unwrap_or_default();
.map_err(|err| HandshakeError::MsgSendFailed(err))?;
Ok(())
}
async fn chat_id_2_contact_id(context: &Context, contact_chat_id: ChatId) -> u32 {
@@ -391,6 +401,8 @@ pub(crate) enum HandshakeError {
ChatNotFound { group: String },
#[error("No configured self address found")]
NoSelfAddr,
#[error("Failed to send message")]
MsgSendFailed(#[source] Error),
}
/// What to do with a Secure-Join handshake message after it was handled.
@@ -487,7 +499,7 @@ pub(crate) async fn handle_securejoin_handshake(
None,
"",
)
.await;
.await?;
Ok(HandshakeMessage::Done)
}
"vg-auth-required" | "vc-auth-required" => {
@@ -559,7 +571,7 @@ pub(crate) async fn handle_securejoin_handshake(
"".to_string()
},
)
.await;
.await?;
Ok(HandshakeMessage::Done)
}
"vg-request-with-auth" | "vc-request-with-auth" => {
@@ -670,7 +682,8 @@ pub(crate) async fn handle_securejoin_handshake(
Some(fingerprint.clone()),
"",
)
.await;
.await?;
inviter_progress!(context, contact_id, 1000);
}
Ok(HandshakeMessage::Ignore) // "Done" would delete the message and break multi-device (the key from Autocrypt-header is needed)
@@ -779,7 +792,7 @@ pub(crate) async fn handle_securejoin_handshake(
Some(scanned_fingerprint_of_alice),
"",
)
.await;
.await?;
context.bob.write().await.status = 1;
context.stop_ongoing().await;