diff --git a/examples/simple.rs b/examples/simple.rs index c4d96f50f..56b836b03 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -9,10 +9,6 @@ use deltachat::chatlist::*; use deltachat::config; use deltachat::contact::*; use deltachat::context::*; -use deltachat::job::{ - perform_inbox_fetch, perform_inbox_idle, perform_inbox_jobs, perform_smtp_idle, - perform_smtp_jobs, -}; use deltachat::Event; fn cb(_ctx: &Context, event: Event) { @@ -39,38 +35,11 @@ async fn main() { let ctx = Context::new("FakeOs".into(), dbfile.into()) .await .expect("Failed to create context"); - let running = Arc::new(RwLock::new(true)); let info = ctx.get_info().await; let duration = time::Duration::from_millis(4000); println!("info: {:#?}", info); - let ctx = Arc::new(ctx); - let r1 = running.clone(); - - let ctx1 = ctx.clone(); - let t1 = async_std::task::spawn(async move { - while *r1.read().await { - perform_inbox_jobs(&ctx1).await; - if *r1.read().await { - perform_inbox_fetch(&ctx1).await; - - if *r1.read().await { - perform_inbox_idle(&ctx1).await; - } - } - } - }); - - let r1 = running.clone(); - let ctx1 = ctx.clone(); - let t2 = async_std::task::spawn(async move { - while *r1.read().await { - perform_smtp_jobs(&ctx1).await; - if *r1.read().await { - perform_smtp_idle(&ctx1).await; - } - } - }); + ctx.run().await; println!("configuring"); let args = std::env::args().collect::>(); @@ -107,15 +76,7 @@ async fn main() { async_std::task::sleep(duration).await; - println!("stopping threads"); - - *running.write().await = false; - deltachat::job::interrupt_inbox_idle(&ctx).await; - deltachat::job::interrupt_smtp_idle(&ctx).await; - - println!("joining"); - t1.await; - t2.await; - + println!("stopping"); + ctx.stop().await; println!("closing"); } diff --git a/src/config.rs b/src/config.rs index 66197c970..76832d014 100644 --- a/src/config.rs +++ b/src/config.rs @@ -152,17 +152,17 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - interrupt_inbox_idle(self).await; + self.interrupt_inbox().await; ret } Config::SentboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - interrupt_sentbox_idle(self).await; + self.interrupt_sentbox().await; ret } Config::MvboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - interrupt_mvbox_idle(self).await; + self.interrupt_mvbox().await; ret } Config::Selfstatus => { diff --git a/src/configure/mod.rs b/src/configure/mod.rs index 726883e66..1fcf132b2 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -37,8 +37,8 @@ impl Context { warn!(self, "There is already another ongoing process running.",); return; } - job::kill_action(self, job::Action::ConfigureImap).await; - job::add(self, job::Action::ConfigureImap, 0, Params::new(), 0).await; + // job::kill_action(self, job::Action::ConfigureImap).await; + // job::add(self, job::Action::ConfigureImap, 0, Params::new(), 0).await; } /// Checks if the context is already configured. diff --git a/src/context.rs b/src/context.rs index ef3a77230..6499c2a26 100644 --- a/src/context.rs +++ b/src/context.rs @@ -26,7 +26,7 @@ use crate::scheduler::Scheduler; use crate::smtp::Smtp; use crate::sql::Sql; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Context { pub(crate) inner: Arc, } @@ -131,7 +131,7 @@ impl Context { } pub async fn run(&self) { - self.inner.scheduler.write().await.run().await + self.inner.scheduler.write().await.run(self.clone()).await } pub async fn stop(&self) { diff --git a/src/imex.rs b/src/imex.rs index ae5589321..209ea856b 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -77,8 +77,8 @@ pub async fn imex(context: &Context, what: ImexMode, param1: Option for Thread { MarkseenMdnOnImap => Thread::Imap, MarkseenMsgOnImap => Thread::Imap, MoveMsg => Thread::Imap, - ConfigureImap => Thread::Imap, - ImexImap => Thread::Imap, MaybeSendLocations => Thread::Smtp, MaybeSendLocationsEnded => Thread::Smtp, @@ -179,132 +176,127 @@ impl Job { recipients: Vec, message: Vec, job_id: u32, + smtp: &mut Smtp, success_cb: F, ) -> Status where F: FnOnce() -> Fut, Fut: Future>, { - unimplemented!(); - // // hold the smtp lock during sending of a job and - // // its ok/error response processing. Note that if a message - // // was sent we need to mark it in the database ASAP as we - // // otherwise might send it twice. - // if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { - // info!(context, "smtp-sending out mime message:"); - // println!("{}", String::from_utf8_lossy(&message)); - // } - // match context - // .smtp - // .send(context, recipients, message, job_id) - // .await - // { - // Err(crate::smtp::send::Error::SendError(err)) => { - // // Remote error, retry later. - // warn!(context, "SMTP failed to send: {}", err); - // self.pending_error = Some(err.to_string()); + // hold the smtp lock during sending of a job and + // its ok/error response processing. Note that if a message + // was sent we need to mark it in the database ASAP as we + // otherwise might send it twice. + if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { + info!(context, "smtp-sending out mime message:"); + println!("{}", String::from_utf8_lossy(&message)); + } + match smtp.send(context, recipients, message, job_id).await { + Err(crate::smtp::send::Error::SendError(err)) => { + // Remote error, retry later. + warn!(context, "SMTP failed to send: {}", err); + self.pending_error = Some(err.to_string()); - // let res = match err { - // async_smtp::smtp::error::Error::Permanent(_) => { - // Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) - // } - // async_smtp::smtp::error::Error::Transient(_) => { - // // We got a transient 4xx response from SMTP server. - // // Give some time until the server-side error maybe goes away. - // Status::RetryLater - // } - // _ => { - // if context.smtp.has_maybe_stale_connection().await { - // info!(context, "stale connection? immediately reconnecting"); - // Status::RetryNow - // } else { - // Status::RetryLater - // } - // } - // }; + let res = match err { + async_smtp::smtp::error::Error::Permanent(_) => { + Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) + } + async_smtp::smtp::error::Error::Transient(_) => { + // We got a transient 4xx response from SMTP server. + // Give some time until the server-side error maybe goes away. + Status::RetryLater + } + _ => { + if smtp.has_maybe_stale_connection().await { + info!(context, "stale connection? immediately reconnecting"); + Status::RetryNow + } else { + Status::RetryLater + } + } + }; - // // this clears last_success info - // context.smtp.disconnect().await; + // this clears last_success info + smtp.disconnect().await; - // res - // } - // Err(crate::smtp::send::Error::EnvelopeError(err)) => { - // // Local error, job is invalid, do not retry. - // context.smtp.disconnect().await; - // warn!(context, "SMTP job is invalid: {}", err); - // Status::Finished(Err(Error::SmtpError(err))) - // } - // Err(crate::smtp::send::Error::NoTransport) => { - // // Should never happen. - // // It does not even make sense to disconnect here. - // error!(context, "SMTP job failed because SMTP has no transport"); - // Status::Finished(Err(format_err!("SMTP has not transport"))) - // } - // Ok(()) => { - // job_try!(success_cb().await); - // Status::Finished(Ok(())) - // } - // } + res + } + Err(crate::smtp::send::Error::EnvelopeError(err)) => { + // Local error, job is invalid, do not retry. + smtp.disconnect().await; + warn!(context, "SMTP job is invalid: {}", err); + Status::Finished(Err(Error::SmtpError(err))) + } + Err(crate::smtp::send::Error::NoTransport) => { + // Should never happen. + // It does not even make sense to disconnect here. + error!(context, "SMTP job failed because SMTP has no transport"); + Status::Finished(Err(format_err!("SMTP has not transport"))) + } + Ok(()) => { + job_try!(success_cb().await); + Status::Finished(Ok(())) + } + } } - async fn send_msg_to_smtp(&mut self, context: &Context) -> Status { - unimplemented!(); - // connect to// SMTP server, if not yet done - // if !context.smtp.is_connected().await { - // let loginparam = LoginParam::from_database(context, "configured_").await; - // if let Err(err) = context.smtp.connect(context, &loginparam).await { - // warn!(context, "SMTP connection failure: {:?}", err); - // return Status::RetryLater; - // } - // } + async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status { + // SMTP server, if not yet done + if !smtp.is_connected().await { + let loginparam = LoginParam::from_database(context, "configured_").await; + if let Err(err) = smtp.connect(context, &loginparam).await { + warn!(context, "SMTP connection failure: {:?}", err); + return Status::RetryLater; + } + } - // let filename = job_try!(job_try!(self - // .param - // .get_path(Param::File, context) - // .map_err(|_| format_err!("Can't get filename"))) - // .ok_or_else(|| format_err!("Can't get filename"))); - // let body = job_try!(dc_read_file(context, &filename).await); - // let recipients = job_try!(self.param.get(Param::Recipients).ok_or_else(|| { - // warn!(context, "Missing recipients for job {}", self.job_id); - // format_err!("Missing recipients") - // })); + let filename = job_try!(job_try!(self + .param + .get_path(Param::File, context) + .map_err(|_| format_err!("Can't get filename"))) + .ok_or_else(|| format_err!("Can't get filename"))); + let body = job_try!(dc_read_file(context, &filename).await); + let recipients = job_try!(self.param.get(Param::Recipients).ok_or_else(|| { + warn!(context, "Missing recipients for job {}", self.job_id); + format_err!("Missing recipients") + })); - // let recipients_list = recipients - // .split('\x1e') - // .filter_map( - // |addr| match async_smtp::EmailAddress::new(addr.to_string()) { - // Ok(addr) => Some(addr), - // Err(err) => { - // warn!(context, "invalid recipient: {} {:?}", addr, err); - // None - // } - // }, - // ) - // .collect::>(); + let recipients_list = recipients + .split('\x1e') + .filter_map( + |addr| match async_smtp::EmailAddress::new(addr.to_string()) { + Ok(addr) => Some(addr), + Err(err) => { + warn!(context, "invalid recipient: {} {:?}", addr, err); + None + } + }, + ) + .collect::>(); - // /* if there is a msg-id and it does not exist in the db, cancel sending. - // this happends if dc_delete_msgs() was called - // before the generated mime was sent out */ - // if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)).await { - // return Status::Finished(Err(format_err!( - // "Not sending Message {} as it was deleted", - // self.foreign_id - // ))); - // }; + /* if there is a msg-id and it does not exist in the db, cancel sending. + this happends if dc_delete_msgs() was called + before the generated mime was sent out */ + if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)).await { + return Status::Finished(Err(format_err!( + "Not sending Message {} as it was deleted", + self.foreign_id + ))); + }; - // let foreign_id = self.foreign_id; - // self.smtp_send(context, recipients_list, body, self.job_id, || { - // async move { - // // smtp success, update db ASAP, then delete smtp file - // if 0 != foreign_id { - // set_delivered(context, MsgId::new(foreign_id)).await; - // } - // // now also delete the generated file - // dc_delete_file(context, filename).await; - // Ok(()) - // } - // }) - // .await + let foreign_id = self.foreign_id; + self.smtp_send(context, recipients_list, body, self.job_id, smtp, || { + async move { + // smtp success, update db ASAP, then delete smtp file + if 0 != foreign_id { + set_delivered(context, MsgId::new(foreign_id)).await; + } + // now also delete the generated file + dc_delete_file(context, filename).await; + Ok(()) + } + }) + .await } /// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job. @@ -350,235 +342,223 @@ impl Job { Ok((job_ids, rfc724_mids)) } - async fn send_mdn(&mut self, context: &Context) -> Status { - unimplemented!(); - // if !context.get_config_bool(Config::MdnsEnabled).await { - // // User has disabled MDNs after job scheduling but before - // // execution. - // return Status::Finished(Err(format_err!("MDNs are disabled"))); - // } + async fn send_mdn(&mut self, context: &Context, smtp: &mut Smtp) -> Status { + if !context.get_config_bool(Config::MdnsEnabled).await { + // User has disabled MDNs after job scheduling but before + // execution. + return Status::Finished(Err(format_err!("MDNs are disabled"))); + } - // let contact_id = self.foreign_id; - // let contact = job_try!(Contact::load_from_db(context, contact_id).await); - // if contact.is_blocked() { - // return Status::Finished(Err(format_err!("Contact is blocked"))); - // } + let contact_id = self.foreign_id; + let contact = job_try!(Contact::load_from_db(context, contact_id).await); + if contact.is_blocked() { + return Status::Finished(Err(format_err!("Contact is blocked"))); + } - // let msg_id = if let Some(msg_id) = self.param.get_msg_id() { - // msg_id - // } else { - // return Status::Finished(Err(format_err!( - // "SendMdn job has invalid parameters: {}", - // self.param - // ))); - // }; + let msg_id = if let Some(msg_id) = self.param.get_msg_id() { + msg_id + } else { + return Status::Finished(Err(format_err!( + "SendMdn job has invalid parameters: {}", + self.param + ))); + }; - // // Try to aggregate other SendMdn jobs and send a combined MDN. - // let (additional_job_ids, additional_rfc724_mids) = self - // .get_additional_mdn_jobs(context, contact_id) - // .await - // .unwrap_or_default(); + // Try to aggregate other SendMdn jobs and send a combined MDN. + let (additional_job_ids, additional_rfc724_mids) = self + .get_additional_mdn_jobs(context, contact_id) + .await + .unwrap_or_default(); - // if !additional_rfc724_mids.is_empty() { - // info!( - // context, - // "SendMdn job: aggregating {} additional MDNs", - // additional_rfc724_mids.len() - // ) - // } + if !additional_rfc724_mids.is_empty() { + info!( + context, + "SendMdn job: aggregating {} additional MDNs", + additional_rfc724_mids.len() + ) + } - // let msg = job_try!(Message::load_from_db(context, msg_id).await); - // let mimefactory = - // job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await); - // let rendered_msg = job_try!(mimefactory.render().await); - // let body = rendered_msg.message; + let msg = job_try!(Message::load_from_db(context, msg_id).await); + let mimefactory = + job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await); + let rendered_msg = job_try!(mimefactory.render().await); + let body = rendered_msg.message; - // let addr = contact.get_addr(); - // let recipient = job_try!(async_smtp::EmailAddress::new(addr.to_string()) - // .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))); - // let recipients = vec![recipient]; + let addr = contact.get_addr(); + let recipient = job_try!(async_smtp::EmailAddress::new(addr.to_string()) + .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))); + let recipients = vec![recipient]; - // // connect to SMTP server, if not yet done - // if !context.smtp.is_connected().await { - // let loginparam = LoginParam::from_database(context, "configured_").await; - // if let Err(err) = context.smtp.connect(context, &loginparam).await { - // warn!(context, "SMTP connection failure: {:?}", err); - // return Status::RetryLater; - // } - // } + // connect to SMTP server, if not yet done + if !smtp.is_connected().await { + let loginparam = LoginParam::from_database(context, "configured_").await; + if let Err(err) = smtp.connect(context, &loginparam).await { + warn!(context, "SMTP connection failure: {:?}", err); + return Status::RetryLater; + } + } - // self.smtp_send(context, recipients, body, self.job_id, || { - // async move { - // // Remove additional SendMdn jobs we have aggregated into this one. - // job::kill_ids(context, &additional_job_ids).await?; - // Ok(()) - // } - // }) - // .await + self.smtp_send(context, recipients, body, self.job_id, smtp, || { + async move { + // Remove additional SendMdn jobs we have aggregated into this one. + kill_ids(context, &additional_job_ids).await?; + Ok(()) + } + }) + .await } - async fn move_msg(&mut self, context: &Context) -> Status { - unimplemented!(); - // let imap_inbox = &context.inbox_thread.imap; + async fn move_msg(&mut self, context: &Context, imap: &mut Imap) -> Status { + let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - // let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); + if let Err(err) = imap.ensure_configured_folders(context, true).await { + warn!(context, "could not configure folders: {:?}", err); + return Status::RetryLater; + } + let dest_folder = context + .sql + .get_raw_config(context, "configured_mvbox_folder") + .await; - // if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { - // warn!(context, "could not configure folders: {:?}", err); - // return Status::RetryLater; - // } - // let dest_folder = context - // .sql - // .get_raw_config(context, "configured_mvbox_folder") - // .await; + if let Some(dest_folder) = dest_folder { + let server_folder = msg.server_folder.as_ref().unwrap(); + let mut dest_uid = 0; - // if let Some(dest_folder) = dest_folder { - // let server_folder = msg.server_folder.as_ref().unwrap(); - // let mut dest_uid = 0; - - // match imap_inbox - // .mv( - // context, - // server_folder, - // msg.server_uid, - // &dest_folder, - // &mut dest_uid, - // ) - // .await - // { - // ImapActionResult::RetryLater => Status::RetryLater, - // ImapActionResult::Success => { - // message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid) - // .await; - // Status::Finished(Ok(())) - // } - // ImapActionResult::Failed => { - // Status::Finished(Err(format_err!("IMAP action failed"))) - // } - // ImapActionResult::AlreadyDone => Status::Finished(Ok(())), - // } - // } else { - // Status::Finished(Err(format_err!("No mvbox folder configured"))) - // } + match imap + .mv( + context, + server_folder, + msg.server_uid, + &dest_folder, + &mut dest_uid, + ) + .await + { + ImapActionResult::RetryLater => Status::RetryLater, + ImapActionResult::Success => { + message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid) + .await; + Status::Finished(Ok(())) + } + ImapActionResult::Failed => { + Status::Finished(Err(format_err!("IMAP action failed"))) + } + ImapActionResult::AlreadyDone => Status::Finished(Ok(())), + } + } else { + Status::Finished(Err(format_err!("No mvbox folder configured"))) + } } - async fn delete_msg_on_imap(&mut self, context: &Context) -> Status { - unimplemented!(); - // let imap_inbox = &context.inbox_thread.imap; + async fn delete_msg_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status { + let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - // let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - - // if !msg.rfc724_mid.is_empty() { - // if message::rfc724_mid_cnt(context, &msg.rfc724_mid).await > 1 { - // info!( - // context, - // "The message is deleted from the server when all parts are deleted.", - // ); - // } else { - // /* if this is the last existing part of the message, - // we delete the message from the server */ - // let mid = msg.rfc724_mid; - // let server_folder = msg.server_folder.as_ref().unwrap(); - // let res = imap_inbox - // .delete_msg(context, &mid, server_folder, &mut msg.server_uid) - // .await; - // if res == ImapActionResult::RetryLater { - // // XXX RetryLater is converted to RetryNow here - // return Status::RetryNow; - // } - // } - // Message::delete_from_db(context, msg.id).await; - // Status::Finished(Ok(())) - // } else { - // /* eg. device messages have no Message-ID */ - // Status::Finished(Ok(())) - // } + if !msg.rfc724_mid.is_empty() { + if message::rfc724_mid_cnt(context, &msg.rfc724_mid).await > 1 { + info!( + context, + "The message is deleted from the server when all parts are deleted.", + ); + } else { + /* if this is the last existing part of the message, + we delete the message from the server */ + let mid = msg.rfc724_mid; + let server_folder = msg.server_folder.as_ref().unwrap(); + let res = imap + .delete_msg(context, &mid, server_folder, &mut msg.server_uid) + .await; + if res == ImapActionResult::RetryLater { + // XXX RetryLater is converted to RetryNow here + return Status::RetryNow; + } + } + Message::delete_from_db(context, msg.id).await; + Status::Finished(Ok(())) + } else { + /* eg. device messages have no Message-ID */ + Status::Finished(Ok(())) + } } - async fn empty_server(&mut self, context: &Context) -> Status { - unimplemented!(); - // let imap_inbox = &context.inbox_thread.imap; - // if self.foreign_id & DC_EMPTY_MVBOX > 0 { - // if let Some(mvbox_folder) = context - // .sql - // .get_raw_config(context, "configured_mvbox_folder") - // .await - // { - // imap_inbox.empty_folder(context, &mvbox_folder).await; - // } - // } - // if self.foreign_id & DC_EMPTY_INBOX > 0 { - // imap_inbox.empty_folder(context, "INBOX").await; - // } - // Status::Finished(Ok(())) + async fn empty_server(&mut self, context: &Context, imap: &mut Imap) -> Status { + if self.foreign_id & DC_EMPTY_MVBOX > 0 { + if let Some(mvbox_folder) = context + .sql + .get_raw_config(context, "configured_mvbox_folder") + .await + { + imap.empty_folder(context, &mvbox_folder).await; + } + } + if self.foreign_id & DC_EMPTY_INBOX > 0 { + imap.empty_folder(context, "INBOX").await; + } + Status::Finished(Ok(())) } - async fn markseen_msg_on_imap(&mut self, context: &Context) -> Status { - unimplemented!(); - // let imap_inbox = &context.inbox_thread.imap; + async fn markseen_msg_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status { + let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - // let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - - // let folder = msg.server_folder.as_ref().unwrap(); - // match imap_inbox.set_seen(context, folder, msg.server_uid).await { - // ImapActionResult::RetryLater => Status::RetryLater, - // ImapActionResult::AlreadyDone => Status::Finished(Ok(())), - // ImapActionResult::Success | ImapActionResult::Failed => { - // // XXX the message might just have been moved - // // we want to send out an MDN anyway - // // The job will not be retried so locally - // // there is no risk of double-sending MDNs. - // if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() - // && context.get_config_bool(Config::MdnsEnabled).await - // { - // if let Err(err) = send_mdn(context, &msg).await { - // warn!(context, "could not send out mdn for {}: {}", msg.id, err); - // return Status::Finished(Err(err)); - // } - // } - // Status::Finished(Ok(())) - // } - // } + let folder = msg.server_folder.as_ref().unwrap(); + match imap.set_seen(context, folder, msg.server_uid).await { + ImapActionResult::RetryLater => Status::RetryLater, + ImapActionResult::AlreadyDone => Status::Finished(Ok(())), + ImapActionResult::Success | ImapActionResult::Failed => { + // XXX the message might just have been moved + // we want to send out an MDN anyway + // The job will not be retried so locally + // there is no risk of double-sending MDNs. + if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() + && context.get_config_bool(Config::MdnsEnabled).await + { + if let Err(err) = send_mdn(context, &msg).await { + warn!(context, "could not send out mdn for {}: {}", msg.id, err); + return Status::Finished(Err(err)); + } + } + Status::Finished(Ok(())) + } + } } - async fn markseen_mdn_on_imap(&mut self, context: &Context) -> Status { - unimplemented!(); - // let folder = self - // .param - // .get(Param::ServerFolder) - // .unwrap_or_default() - // .to_string(); - // let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - // let imap_inbox = &context.inbox_thread.imap; - // if imap_inbox.set_seen(context, &folder, uid).await == ImapActionResult::RetryLater { - // return Status::RetryLater; - // } - // if self.param.get_bool(Param::AlsoMove).unwrap_or_default() { - // if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { - // warn!(context, "configuring folders failed: {:?}", err); - // return Status::RetryLater; - // } - // let dest_folder = context - // .sql - // .get_raw_config(context, "configured_mvbox_folder") - // .await; - // if let Some(dest_folder) = dest_folder { - // let mut dest_uid = 0; - // if ImapActionResult::RetryLater - // == imap_inbox - // .mv(context, &folder, uid, &dest_folder, &mut dest_uid) - // .await - // { - // Status::RetryLater - // } else { - // Status::Finished(Ok(())) - // } - // } else { - // Status::Finished(Err(format_err!("MVBOX is not configured"))) - // } - // } else { - // Status::Finished(Ok(())) - // } + async fn markseen_mdn_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status { + let folder = self + .param + .get(Param::ServerFolder) + .unwrap_or_default() + .to_string(); + let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; + + if imap.set_seen(context, &folder, uid).await == ImapActionResult::RetryLater { + return Status::RetryLater; + } + + if self.param.get_bool(Param::AlsoMove).unwrap_or_default() { + if let Err(err) = imap.ensure_configured_folders(context, true).await { + warn!(context, "configuring folders failed: {:?}", err); + return Status::RetryLater; + } + let dest_folder = context + .sql + .get_raw_config(context, "configured_mvbox_folder") + .await; + if let Some(dest_folder) = dest_folder { + let mut dest_uid = 0; + if ImapActionResult::RetryLater + == imap + .mv(context, &folder, uid, &dest_folder, &mut dest_uid) + .await + { + Status::RetryLater + } else { + Status::Finished(Ok(())) + } + } else { + Status::Finished(Err(format_err!("MVBOX is not configured"))) + } + } else { + Status::Finished(Ok(())) + } } } @@ -592,7 +572,7 @@ pub async fn kill_action(context: &Context, action: Action) -> bool { } /// Remove jobs with specified IDs. -pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { +async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { context .sql .execute( @@ -606,129 +586,6 @@ pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { Ok(()) } -pub async fn perform_inbox_fetch(context: &Context) { - unimplemented!(); - // let use_network = context.get_config_bool(Config::InboxWatch).await; - - // context.inbox_thread.fetch(context, use_network).await; -} - -pub async fn perform_mvbox_fetch(context: &Context) { - unimplemented!(); - // let use_network = context.get_config_bool(Config::MvboxWatch).await; - - // context.mvbox_thread.fetch(context, use_network).await; -} - -pub async fn perform_sentbox_fetch(context: &Context) { - unimplemented!(); - // let use_network = context.get_config_bool(Config::SentboxWatch).await; - - // context.sentbox_thread.fetch(context, use_network).await; -} - -pub async fn perform_inbox_idle(context: &Context) { - unimplemented!(); - // if context - // .perform_inbox_jobs_needed - // .load(std::sync::atomic::Ordering::Relaxed) - // { - // info!( - // context, - // "INBOX-IDLE will not be started because of waiting jobs." - // ); - // return; - // } - // let use_network = context.get_config_bool(Config::InboxWatch).await; - - // context.inbox_thread.idle(context, use_network).await; -} - -pub async fn perform_mvbox_idle(context: &Context) { - unimplemented!(); - // let use_network = context.get_config_bool(Config::MvboxWatch).await; - - // context.mvbox_thread.idle(context, use_network).await; - // } - - // pub async fn perform_sentbox_idle(context: &Context) { - // let use_network = context.get_config_bool(Config::SentboxWatch).await; - - // context.sentbox_thread.idle(context, use_network).await; -} - -pub async fn interrupt_inbox_idle(context: &Context) { - unimplemented!(); - // info!(context, "interrupt_inbox_idle called"); - // // we do not block on trying to obtain the thread lock - // // because we don't know in which state the thread is. - // // If it's currently fetching then we can not get the lock - // // but we flag it for checking jobs so that idle will be skipped. - // if !context.inbox_thread.try_interrupt_idle(context).await { - // context - // .perform_inbox_jobs_needed - // .store(true, std::sync::atomic::Ordering::Relaxed); - // warn!(context, "could not interrupt idle"); - // } -} - -pub async fn interrupt_mvbox_idle(context: &Context) { - unimplemented!(); - // context.mvbox_thread.interrupt_idle(context).await; -} - -pub async fn interrupt_sentbox_idle(context: &Context) { - unimplemented!(); - // context.sentbox_thread.interrupt_idle(context).await; -} - -pub async fn perform_smtp_jobs(context: &Context) { - unimplemented!(); - // let probe_smtp_network = { - // let state = &mut *context.smtp.state.write().await; - - // let probe_smtp_network = state.probe_network; - // state.probe_network = false; - // state.perform_jobs_needed = PerformJobsNeeded::Not; - - // if state.suspended { - // info!(context, "SMTP-jobs suspended.",); - // return; - // } - // state.doing_jobs = true; - // probe_smtp_network - // }; - - // info!(context, "SMTP-jobs started...",); - // job_perform(context, Thread::Smtp, probe_smtp_network).await; - // info!(context, "SMTP-jobs ended."); - - // context.smtp.state.write().await.doing_jobs = false; -} - -pub async fn perform_smtp_idle(context: &Context) { - unimplemented!(); - // info!(context, "SMTP-idle started..."); - - // let perform_jobs_needed = context.smtp.state.read().await.perform_jobs_needed.clone(); - - // match perform_jobs_needed { - // PerformJobsNeeded::AtOnce => { - // info!( - // context, - // "SMTP-idle will not be started because of waiting jobs.", - // ); - // } - // PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => { - // let dur = get_next_wakeup_time(context, Thread::Smtp).await; - - // context.smtp.notify_receiver.recv().timeout(dur).await.ok(); - // } - // } - - // info!(context, "SMTP-idle ended.",); -} - async fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration { let t: i64 = context .sql @@ -897,162 +754,122 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> { Ok(()) } -pub async fn perform_inbox_jobs(context: &Context) { - unimplemented!(); - // info!(context, "dc_perform_inbox_jobs starting.",); - - // let probe_imap_network = context - // .probe_imap_network - // .load(std::sync::atomic::Ordering::Relaxed); - // context - // .probe_imap_network - // .store(false, std::sync::atomic::Ordering::Relaxed); - // context - // .perform_inbox_jobs_needed - // .store(false, std::sync::atomic::Ordering::Relaxed); - - // job_perform(context, Thread::Imap, probe_imap_network).await; - // info!(context, "dc_perform_inbox_jobs ended.",); +#[derive(Debug)] +pub enum Connection<'a> { + Inbox(&'a mut Imap), + Smtp(&'a mut Smtp), } -pub async fn perform_mvbox_jobs(context: &Context) { - unimplemented!(); - // info!(context, "dc_perform_mbox_jobs EMPTY (for now)."); +impl<'a> fmt::Display for Connection<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Connection::Inbox(_) => write!(f, "Inbox"), + Connection::Smtp(_) => write!(f, "Smtp"), + } + } } -pub async fn perform_sentbox_jobs(context: &Context) { - unimplemented!(); - // info!(context, "dc_perform_sentbox_jobs EMPTY (for now)."); +impl<'a> Connection<'a> { + fn inbox(&mut self) -> &mut Imap { + match self { + Connection::Inbox(imap) => imap, + _ => panic!("Not an inbox"), + } + } + + fn smtp(&mut self) -> &mut Smtp { + match self { + Connection::Smtp(smtp) => smtp, + _ => panic!("Not a smtp"), + } + } } -async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { - unimplemented!(); - // while let Some(mut job) = load_next_job(context, thread, probe_network).await { - // info!(context, "{}-job {} started...", thread, job); +pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_>, mut job: Job) { + info!(context, "{}-job {} started...", &connection, &job); - // // some configuration jobs are "exclusive": - // // - they are always executed in the imap-thread and the smtp-thread is suspended during execution - // // - they may change the database handle; we do not keep old pointers therefore - // // - they can be re-executed one time AT_ONCE, but they are not saved in the database for later execution - // if Action::ConfigureImap == job.action || Action::ImexImap == job.action { - // job::kill_action(context, job.action).await; - // context.sentbox_thread.suspend(context).await; - // context.mvbox_thread.suspend(context).await; - // suspend_smtp_thread(context, true).await; - // } + let try_res = match perform_job_action(context, &mut job, &mut connection, 0).await { + Status::RetryNow => perform_job_action(context, &mut job, &mut connection, 1).await, + x => x, + }; - // let try_res = match perform_job_action(context, &mut job, thread, 0).await { - // Status::RetryNow => perform_job_action(context, &mut job, thread, 1).await, - // x => x, - // }; + match try_res { + Status::RetryNow | Status::RetryLater => { + let tries = job.tries + 1; - // if Action::ConfigureImap == job.action || Action::ImexImap == job.action { - // context.sentbox_thread.unsuspend(context).await; - // context.mvbox_thread.unsuspend(context).await; - // suspend_smtp_thread(context, false).await; - // break; - // } + if tries < JOB_RETRIES { + info!( + context, + "{} thread increases job {} tries to {}", &connection, job, tries + ); + job.tries = tries; + let time_offset = get_backoff_time_offset(tries); + job.desired_timestamp = time() + time_offset; + job.update(context).await; + info!( + context, + "{}-job #{} not succeeded on try #{}, retry in {} seconds.", + &connection, + job.job_id as u32, + tries, + time_offset + ); + } else { + info!( + context, + "{} thread removes job {} as it exhausted {} retries", + &connection, + job, + JOB_RETRIES + ); + job.delete(context).await; + } + } + Status::Finished(res) => { + if let Err(err) = res { + warn!( + context, + "{} removes job {} as it failed with error {:?}", &connection, job, err + ); + } else { + info!( + context, + "{} removes job {} as it succeeded", &connection, job + ); + } - // match try_res { - // Status::RetryNow | Status::RetryLater => { - // let tries = job.tries + 1; - - // if tries < JOB_RETRIES { - // info!( - // context, - // "{} thread increases job {} tries to {}", thread, job, tries - // ); - // job.tries = tries; - // let time_offset = get_backoff_time_offset(tries); - // job.desired_timestamp = time() + time_offset; - // job.update(context).await; - // info!( - // context, - // "{}-job #{} not succeeded on try #{}, retry in {} seconds.", - // thread, - // job.job_id as u32, - // tries, - // time_offset - // ); - // if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { - // context.smtp.state.write().await.perform_jobs_needed = - // PerformJobsNeeded::AvoidDos; - // } - // } else { - // info!( - // context, - // "{} thread removes job {} as it exhausted {} retries", - // thread, - // job, - // JOB_RETRIES - // ); - // if job.action == Action::SendMsgToSmtp { - // message::set_msg_failed( - // context, - // MsgId::new(job.foreign_id), - // job.pending_error.as_ref(), - // ) - // .await; - // } - // job.delete(context).await; - // } - // if !probe_network { - // continue; - // } - // // on dc_maybe_network() we stop trying here; - // // these jobs are already tried once. - // // otherwise, we just continue with the next job - // // to give other jobs a chance being tried at least once. - // break; - // } - // Status::Finished(res) => { - // if let Err(err) = res { - // warn!( - // context, - // "{} removes job {} as it failed with error {:?}", thread, job, err - // ); - // } else { - // info!(context, "{} removes job {} as it succeeded", thread, job); - // } - - // job.delete(context).await; - // } - // } - // } + job.delete(context).await; + } + } } async fn perform_job_action( context: &Context, - mut job: &mut Job, - thread: Thread, + job: &mut Job, + connection: &mut Connection<'_>, tries: u32, ) -> Status { info!( context, - "{} begin immediate try {} of job {}", thread, tries, job + "{} begin immediate try {} of job {}", &connection, tries, job ); let try_res = match job.action { - Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))), - Action::SendMsgToSmtp => job.send_msg_to_smtp(context).await, - Action::EmptyServer => job.empty_server(context).await, - Action::DeleteMsgOnImap => job.delete_msg_on_imap(context).await, - Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context).await, - Action::MarkseenMdnOnImap => job.markseen_mdn_on_imap(context).await, - Action::MoveMsg => job.move_msg(context).await, - Action::SendMdn => job.send_mdn(context).await, - Action::ConfigureImap => job_configure_imap(context).await, - Action::ImexImap => match job_imex_imap(context, &job).await { - Ok(()) => Status::Finished(Ok(())), - Err(err) => { - error!(context, "{}", err); - Status::Finished(Err(err)) - } - }, - Action::MaybeSendLocations => location::job_maybe_send_locations(context, &job).await, - Action::MaybeSendLocationsEnded => { - location::job_maybe_send_locations_ended(context, &mut job).await + Action::Unknown => { + warn!(context, "ignoring unknown job"); + Status::Finished(Ok(())) } + Action::SendMsgToSmtp => job.send_msg_to_smtp(context, connection.smtp()).await, + Action::SendMdn => job.send_mdn(context, connection.smtp()).await, + Action::MaybeSendLocations => location::job_maybe_send_locations(context, job).await, + Action::MaybeSendLocationsEnded => { + location::job_maybe_send_locations_ended(context, job).await + } + Action::EmptyServer => job.empty_server(context, connection.inbox()).await, + Action::DeleteMsgOnImap => job.delete_msg_on_imap(context, connection.inbox()).await, + Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context, connection.inbox()).await, + Action::MarkseenMdnOnImap => job.markseen_mdn_on_imap(context, connection.inbox()).await, + Action::MoveMsg => job.move_msg(context, connection.inbox()).await, Action::Housekeeping => { sql::housekeeping(context).await; Status::Finished(Ok(())) @@ -1061,7 +878,7 @@ async fn perform_job_action( info!( context, - "{} finished immediate try {} of job {}", thread, tries, job + "Inbox finished immediate try {} of job {}", tries, job ); try_res @@ -1078,24 +895,11 @@ fn get_backoff_time_offset(tries: u32) -> i64 { seconds as i64 } -async fn suspend_smtp_thread(context: &Context, suspend: bool) { - unimplemented!(); - // context.smtp.state.write().await.suspended = suspend; - // if suspend { - // loop { - // if !context.smtp.state.read().await.doing_jobs { - // return; - // } - // async_std::task::sleep(time::Duration::from_micros(300 * 1000)).await; - // } - // } -} - async fn send_mdn(context: &Context, msg: &Message) -> Result<()> { let mut param = Params::new(); param.set(Param::MsgId, msg.id.to_u32().to_string()); - job::add(context, Action::SendMdn, msg.from_id as i32, param, 0).await; + add(context, Action::SendMdn, msg.from_id as i32, param, 0).await; Ok(()) } @@ -1149,30 +953,36 @@ pub async fn add( ] ).await.ok(); - match thread { - Thread::Imap => interrupt_inbox_idle(context).await, - Thread::Smtp => interrupt_smtp_idle(context).await, - Thread::Unknown => {} + match action { + Action::Unknown => unreachable!(), + Action::Housekeeping + | Action::EmptyServer + | Action::DeleteMsgOnImap + | Action::MarkseenMdnOnImap + | Action::MarkseenMsgOnImap + | Action::MoveMsg => { + context.interrupt_inbox().await; + } + Action::MaybeSendLocations + | Action::MaybeSendLocationsEnded + | Action::SendMdn + | Action::SendMsgToSmtp => { + context.interrupt_smtp().await; + } } } -pub async fn interrupt_smtp_idle(context: &Context) { - unimplemented!(); - // info!(context, "Interrupting SMTP-idle...",); - - // context.smtp.state.write().await.perform_jobs_needed = PerformJobsNeeded::AtOnce; - // context.smtp.notify_sender.send(()).await; - - // info!(context, "Interrupting SMTP-idle... ended",); -} - /// Load jobs from the database. /// /// Load jobs for this "[Thread]", i.e. either load SMTP jobs or load /// IMAP jobs. The `probe_network` parameter decides how to query /// jobs, this is tricky and probably wrong currently. Look at the /// SQL queries for details. -async fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Option { +pub(crate) async fn load_next( + context: &Context, + thread: Thread, + probe_network: bool, +) -> Option { let query = if !probe_network { // processing for first-try and after backoff-timeouts: // process jobs in the order they were added. @@ -1263,11 +1073,11 @@ mod tests { // all jobs. let t = dummy_context().await; insert_job(&t.ctx, -1).await; // This can not be loaded into Job struct. - let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; assert!(jobs.is_none()); insert_job(&t.ctx, 1).await; - let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; assert!(jobs.is_some()); } } diff --git a/src/scheduler.rs b/src/scheduler.rs index f010b2359..63ebedf7e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,9 +1,12 @@ use async_std::prelude::*; use async_std::sync::{channel, Receiver, Sender}; +use async_std::task; -const MAX_JOBS_WAITING: usize = 50; +use std::time::Duration; +use crate::context::Context; use crate::imap::Imap; +use crate::job::{self, Thread}; use crate::smtp::Smtp; /// Job and connection scheduler. @@ -11,16 +14,34 @@ use crate::smtp::Smtp; pub(crate) enum Scheduler { Stopped, Running { - inbox: ImapConnectionState, - mvbox: ImapConnectionState, - sentbox: ImapConnectionState, + inbox: ImapConnectionState, + mvbox: ImapConnectionState, + sentbox: ImapConnectionState, smtp: SmtpConnectionState, }, } +impl Context { + pub(crate) async fn interrupt_inbox(&self) { + self.scheduler.read().await.interrupt_inbox().await; + } + + pub(crate) async fn interrupt_sentbox(&self) { + self.scheduler.read().await.interrupt_sentbox().await; + } + + pub(crate) async fn interrupt_mvbox(&self) { + self.scheduler.read().await.interrupt_mvbox().await; + } + + pub(crate) async fn interrupt_smtp(&self) { + self.scheduler.read().await.interrupt_smtp().await; + } +} + impl Scheduler { /// Start the scheduler, panics if it is already running. - pub async fn run(&mut self) { + pub async fn run(&mut self, ctx: Context) { match self { Scheduler::Stopped => { let ( @@ -34,13 +55,96 @@ impl Scheduler { .join(ImapConnectionState::new()) .join(SmtpConnectionState::new()) .await; - *self = Scheduler::Running { inbox, mvbox, sentbox, smtp, }; + + let ctx1 = ctx.clone(); + task::spawn(async move { + let ImapConnectionHandlers { + mut connection, + stop_receiver, + shutdown_sender, + } = inbox_handlers; + + let fut = async move { + loop { + // TODO: correct value + let probe_network = false; + match job::load_next(&ctx1, Thread::Imap, probe_network) + .timeout(Duration::from_millis(200)) + .await + { + Ok(Some(job)) => { + job::perform_job( + &ctx1, + job::Connection::Inbox(&mut connection), + job, + ) + .await; + } + Ok(None) | Err(async_std::future::TimeoutError { .. }) => { + // fetch + connection.fetch(&ctx1, "TODO").await; + + // idle + connection.idle(&ctx1, Some("TODO".into())).await; + } + } + } + }; + + fut.race(stop_receiver.recv()).await; + shutdown_sender.send(()).await; + }); + + // TODO: mvbox + + // TODO: sentbox + + let ctx1 = ctx.clone(); + task::spawn(async move { + let SmtpConnectionHandlers { + mut connection, + stop_receiver, + shutdown_sender, + idle_interrupt_receiver, + } = smtp_handlers; + + let fut = async move { + loop { + // TODO: correct value + let probe_network = false; + match job::load_next(&ctx1, Thread::Smtp, probe_network) + .timeout(Duration::from_millis(200)) + .await + { + Ok(Some(job)) => { + job::perform_job( + &ctx1, + job::Connection::Smtp(&mut connection), + job, + ) + .await; + } + Ok(None) | Err(async_std::future::TimeoutError { .. }) => { + use futures::future::FutureExt; + + // Fake Idle + async_std::task::sleep(Duration::from_millis(500)) + .race(idle_interrupt_receiver.recv().map(|_| ())) + .await; + } + } + } + }; + + fut.race(stop_receiver.recv()).await; + shutdown_sender.send(()).await; + }); } Scheduler::Running { .. } => { // TODO: return an error @@ -49,6 +153,41 @@ impl Scheduler { } } + fn inbox(&self) -> Option<&ImapConnectionState> { + match self { + Scheduler::Running { ref inbox, .. } => Some(inbox), + _ => None, + } + } + + async fn interrupt_inbox(&self) { + match self { + Scheduler::Running { ref inbox, .. } => inbox.interrupt().await, + _ => panic!("interrupt_imap must be called in running mode"), + } + } + + async fn interrupt_mvbox(&self) { + match self { + Scheduler::Running { ref mvbox, .. } => mvbox.interrupt().await, + _ => panic!("interrupt_mvbox must be called in running mode"), + } + } + + async fn interrupt_sentbox(&self) { + match self { + Scheduler::Running { ref sentbox, .. } => sentbox.interrupt().await, + _ => panic!("interrupt_sentbox must be called in running mode"), + } + } + + async fn interrupt_smtp(&self) { + match self { + Scheduler::Running { ref smtp, .. } => smtp.interrupt().await, + _ => panic!("interrupt_smtp must be called in running mode"), + } + } + /// Halt the scheduler, panics if it is already stopped. pub async fn stop(&mut self) { match self { @@ -90,54 +229,51 @@ impl Scheduler { /// Connection state logic shared between imap and smtp connections. #[derive(Debug)] -struct ConnectionState { +struct ConnectionState { /// Channel to notify that shutdown has completed. shutdown_receiver: Receiver<()>, /// Channel to interrupt the whole connection. stop_sender: Sender<()>, - /// Channel to receive new jobs. - jobs_receiver: Receiver, - /// Channel to schedule new jobs. - jobs_sender: Sender, + /// Channel to interrupt idle. + idle_interrupt_sender: Sender<()>, } -impl ConnectionState { - /// Send a new job. - pub async fn send_job(&self, job: T) { - self.jobs_sender.send(job).await; - } - +impl ConnectionState { /// Shutdown this connection completely. - pub async fn stop(&self) { + async fn stop(&self) { // Trigger shutdown of the run loop. self.stop_sender.send(()).await; // Wait for a notification that the run loop has been shutdown. self.shutdown_receiver.recv().await; } + + async fn interrupt(&self) { + self.idle_interrupt_sender.send(()).await; + } } #[derive(Debug)] pub(crate) struct SmtpConnectionState { - state: ConnectionState, + state: ConnectionState, } impl SmtpConnectionState { async fn new() -> (Self, SmtpConnectionHandlers) { - let (jobs_sender, jobs_receiver) = channel(50); let (stop_sender, stop_receiver) = channel(1); let (shutdown_sender, shutdown_receiver) = channel(1); + let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); let handlers = SmtpConnectionHandlers { connection: Smtp::new(), stop_receiver, shutdown_sender, + idle_interrupt_receiver, }; let state = ConnectionState { + idle_interrupt_sender, shutdown_receiver, stop_sender, - jobs_sender, - jobs_receiver, }; let conn = SmtpConnectionState { state }; @@ -145,9 +281,9 @@ impl SmtpConnectionState { (conn, handlers) } - /// Send a new job. - async fn send_job(&self, job: SmtpJob) { - self.state.send_job(job).await; + /// Interrupt any form of idle. + async fn interrupt(&self) { + self.state.interrupt().await; } /// Shutdown this connection completely. @@ -161,19 +297,17 @@ struct SmtpConnectionHandlers { connection: Smtp, stop_receiver: Receiver<()>, shutdown_sender: Sender<()>, + idle_interrupt_receiver: Receiver<()>, } #[derive(Debug)] -pub(crate) struct ImapConnectionState { - /// Channel to interrupt idle. - idle_interrupt_sender: Sender<()>, - state: ConnectionState, +pub(crate) struct ImapConnectionState { + state: ConnectionState, } -impl ImapConnectionState { +impl ImapConnectionState { /// Construct a new connection. async fn new() -> (Self, ImapConnectionHandlers) { - let (jobs_sender, jobs_receiver) = channel(MAX_JOBS_WAITING); let (stop_sender, stop_receiver) = channel(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); let (shutdown_sender, shutdown_receiver) = channel(1); @@ -185,26 +319,19 @@ impl ImapConnectionState { }; let state = ConnectionState { + idle_interrupt_sender, shutdown_receiver, stop_sender, - jobs_sender, - jobs_receiver, }; - let conn = ImapConnectionState { - idle_interrupt_sender, - state, - }; + let conn = ImapConnectionState { state }; (conn, handlers) } - /// Send a new job. - async fn send_job(&self, job: T) { - self.state - .send_job(job) - .join(self.idle_interrupt_sender.send(())) - .await; + /// Interrupt any form of idle. + async fn interrupt(&self) { + self.state.interrupt().await; } /// Shutdown this connection completely. @@ -219,19 +346,3 @@ struct ImapConnectionHandlers { stop_receiver: Receiver<()>, shutdown_sender: Sender<()>, } - -/// Jobs handled by the inbox connection. -#[derive(Debug)] -pub enum InboxJob {} - -/// Jobs handled by the mvbox connection. -#[derive(Debug)] -pub enum MvboxJob {} - -/// Jobs handled by the sentbox connection. -#[derive(Debug)] -pub enum SentboxJob {} - -/// Jobs handled by the smtp connection. -#[derive(Debug)] -pub enum SmtpJob {}