From 6e05039a107be8a4cd53b86f42b10439024afcf9 Mon Sep 17 00:00:00 2001 From: Hocuri Date: Sun, 31 May 2020 18:07:40 +0200 Subject: [PATCH] Try to fix it. I dont like the code and it does not work. --- src/config.rs | 9 ++-- src/imap/idle.rs | 36 ++++++++------- src/imap/mod.rs | 10 +++-- src/job.rs | 70 ++++++++++++++++++++--------- src/scheduler.rs | 112 +++++++++++++++++++++++------------------------ 5 files changed, 137 insertions(+), 100 deletions(-) diff --git a/src/config.rs b/src/config.rs index a76554ecf..644093a97 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,7 +11,7 @@ use crate::dc_tools::*; use crate::events::Event; use crate::message::MsgId; use crate::mimefactory::RECOMMENDED_FILE_SIZE; -use crate::stock::StockMessage; +use crate::{scheduler::InterruptInfo, stock::StockMessage}; /// The available configuration keys. #[derive( @@ -203,17 +203,18 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_inbox(false).await; + self.interrupt_inbox(InterruptInfo::new(false, None)).await; ret } Config::SentboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_sentbox(false).await; + self.interrupt_sentbox(InterruptInfo::new(false, None)) + .await; ret } Config::MvboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_mvbox(false).await; + self.interrupt_mvbox(InterruptInfo::new(false, None)).await; ret } Config::Selfstatus => { diff --git a/src/imap/idle.rs b/src/imap/idle.rs index e6b3ea621..a029f74e3 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -4,7 +4,7 @@ use async_imap::extensions::idle::IdleResponse; use async_std::prelude::*; use std::time::{Duration, SystemTime}; -use crate::context::Context; +use crate::{context::Context, scheduler::InterruptInfo}; use super::select_folder; use super::session::Session; @@ -34,7 +34,11 @@ impl Imap { self.config.can_idle } - pub async fn idle(&mut self, context: &Context, watch_folder: Option) -> Result { + pub async fn idle( + &mut self, + context: &Context, + watch_folder: Option, + ) -> Result { use futures::future::FutureExt; if !self.can_idle() { @@ -46,7 +50,7 @@ impl Imap { let session = self.session.take(); let timeout = Duration::from_secs(23 * 60); - let mut probe_network = false; + let mut info = Default::default(); if let Some(session) = session { let mut handle = session.idle(); @@ -58,7 +62,7 @@ impl Imap { enum Event { IdleResponse(IdleResponse), - Interrupt(bool), + Interrupt(InterruptInfo), } if self.skip_next_idle_wait { @@ -90,8 +94,8 @@ impl Imap { Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { info!(context, "Idle wait was interrupted"); } - Ok(Event::Interrupt(probe)) => { - probe_network = probe; + Ok(Event::Interrupt(i)) => { + info = i; info!(context, "Idle wait was interrupted"); } Err(err) => { @@ -125,14 +129,14 @@ impl Imap { } } - Ok(probe_network) + Ok(info) } pub(crate) async fn fake_idle( &mut self, context: &Context, watch_folder: Option, - ) -> bool { + ) -> InterruptInfo { // Idle using polling. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job (and an interrupt). @@ -144,7 +148,7 @@ impl Imap { return self.idle_interrupt.recv().await.unwrap_or_default(); } - let mut probe_network = false; + let mut info: InterruptInfo = Default::default(); if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt @@ -157,10 +161,10 @@ impl Imap { enum Event { Tick, - Interrupt(bool), + Interrupt(InterruptInfo), } // loop until we are interrupted or if we fetched something - probe_network = + info = loop { use futures::future::FutureExt; match interval @@ -181,7 +185,7 @@ impl Imap { } if self.config.can_idle { // we only fake-idled because network was gone during IDLE, probably - break false; + break InterruptInfo::new(false, None); } info!(context, "fake_idle is connected"); // we are connected, let's see if fetching messages results @@ -194,7 +198,7 @@ impl Imap { Ok(res) => { info!(context, "fetch_new_messages returned {:?}", res); if res { - break false; + break InterruptInfo::new(false, None); } } Err(err) => { @@ -204,9 +208,9 @@ impl Imap { } } } - Event::Interrupt(probe_network) => { + Event::Interrupt(info) => { // Interrupt - break probe_network; + break info; } } }; @@ -222,6 +226,6 @@ impl Imap { / 1000., ); - probe_network + info } } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 96722e115..3bf9a706a 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -27,7 +27,7 @@ use crate::message::{self, update_server_uid}; use crate::mimeparser; use crate::oauth2::dc_get_oauth2_access_token; use crate::param::Params; -use crate::stock::StockMessage; +use crate::{scheduler::InterruptInfo, stock::StockMessage}; mod client; mod idle; @@ -109,7 +109,7 @@ const SELECT_ALL: &str = "1:*"; #[derive(Debug)] pub struct Imap { - idle_interrupt: Receiver, + idle_interrupt: Receiver, config: ImapConfig, session: Option, connected: bool, @@ -181,7 +181,7 @@ impl Default for ImapConfig { } impl Imap { - pub fn new(idle_interrupt: Receiver) -> Self { + pub fn new(idle_interrupt: Receiver) -> Self { Imap { idle_interrupt, config: Default::default(), @@ -1391,6 +1391,10 @@ async fn precheck_imf( if old_server_folder != server_folder || old_server_uid != server_uid { update_server_uid(context, rfc724_mid, server_folder, server_uid).await; + context + .interrupt_inbox(InterruptInfo::new(false, Some(msg_id))) + .await; + info!(context, "Updating server_uid and interrupting") } Ok(true) } else { diff --git a/src/job.rs b/src/job.rs index 271508104..81c97b1f0 100644 --- a/src/job.rs +++ b/src/job.rs @@ -31,7 +31,7 @@ use crate::message::{self, Message, MessageState}; use crate::mimefactory::MimeFactory; use crate::param::*; use crate::smtp::Smtp; -use crate::sql; +use crate::{scheduler::InterruptInfo, sql}; // results in ~3 weeks for the last backoff timespan const JOB_RETRIES: u32 = 17; @@ -1022,14 +1022,18 @@ pub async fn add(context: &Context, job: Job) { | Action::MarkseenMsgOnImap | Action::MoveMsg => { info!(context, "interrupt: imap"); - context.interrupt_inbox(false).await; + context + .interrupt_inbox(InterruptInfo::new(false, None)) + .await; } Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn | Action::SendMsgToSmtp => { info!(context, "interrupt: smtp"); - context.interrupt_smtp(false).await; + context + .interrupt_smtp(InterruptInfo::new(false, None)) + .await; } } } @@ -1044,38 +1048,49 @@ pub async fn add(context: &Context, job: Job) { pub(crate) async fn load_next( context: &Context, thread: Thread, - probe_network: bool, + info: &InterruptInfo, ) -> Option { info!(context, "loading job for {}-thread", thread); - let query = if !probe_network { + + let query; + let params; + let t = time(); + let m; + let thread_i = thread as i64; + + if let Some(msg_id) = info.msg_id { + query = r#" +SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries +FROM jobs +WHERE foreign_id=? +ORDER BY action DESC, added_timestamp +LIMIT 1; +"#; + m = msg_id; + params = paramsv![m]; + } else if !info.probe_network { // processing for first-try and after backoff-timeouts: // process jobs in the order they were added. - r#" + query = r#" SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp LIMIT 1; -"# +"#; + params = paramsv![thread_i, t]; } else { // processing after call to dc_maybe_network(): // process _all_ pending jobs that failed before // in the order of their backoff-times. - r#" + query = r#" SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC LIMIT 1; -"# - }; - - let thread_i = thread as i64; - let t = time(); - let params = if !probe_network { - paramsv![thread_i, t] - } else { - paramsv![thread_i] +"#; + params = paramsv![thread_i]; }; let job = loop { @@ -1182,11 +1197,21 @@ mod tests { // all jobs. let t = dummy_context().await; insert_job(&t.ctx, -1).await; // This can not be loaded into Job struct. - let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next( + &t.ctx, + Thread::from(Action::MoveMsg), + &InterruptInfo::new(false, None), + ) + .await; assert!(jobs.is_none()); insert_job(&t.ctx, 1).await; - let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next( + &t.ctx, + Thread::from(Action::MoveMsg), + &InterruptInfo::new(false, None), + ) + .await; assert!(jobs.is_some()); } @@ -1196,7 +1221,12 @@ mod tests { insert_job(&t.ctx, 1).await; - let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next( + &t.ctx, + Thread::from(Action::MoveMsg), + &InterruptInfo::new(false, None), + ) + .await; assert!(jobs.is_some()); } } diff --git a/src/scheduler.rs b/src/scheduler.rs index ee4a0b4c8..fd685f9e7 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,7 +5,7 @@ use async_std::task; use crate::context::Context; use crate::imap::Imap; use crate::job::{self, Thread}; -use crate::{config::Config, smtp::Smtp}; +use crate::{config::Config, message::MsgId, smtp::Smtp}; pub(crate) struct StopToken; @@ -32,36 +32,20 @@ impl Context { self.scheduler.read().await.maybe_network().await; } - pub(crate) async fn interrupt_inbox(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_inbox(probe_network) - .await; + pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_inbox(info).await; } - pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_sentbox(probe_network) - .await; + pub(crate) async fn interrupt_sentbox(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_sentbox(info).await; } - pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_mvbox(probe_network) - .await; + pub(crate) async fn interrupt_mvbox(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_mvbox(info).await; } - pub(crate) async fn interrupt_smtp(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_smtp(probe_network) - .await; + pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_smtp(info).await; } } @@ -86,14 +70,14 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne started.send(()).await; // track number of continously executed jobs - let mut jobs_loaded = 0; - let mut probe_network = false; + let mut jobs_loaded: i32 = 0; + let mut info: InterruptInfo = Default::default(); loop { - match job::load_next(&ctx, Thread::Imap, probe_network).await { + match job::load_next(&ctx, Thread::Imap, &info).await { Some(job) if jobs_loaded <= 20 => { jobs_loaded += 1; job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await; - probe_network = false; + info = Default::default(); } Some(job) => { // Let the fetch run, but return back to the job afterwards. @@ -103,8 +87,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne } None => { jobs_loaded = 0; - probe_network = - fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await; + info = fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await; } } } @@ -136,7 +119,7 @@ async fn fetch(ctx: &Context, connection: &mut Imap) { } } -async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> bool { +async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo { match ctx.get_config(folder).await { Some(watch_folder) => { // fetch @@ -153,7 +136,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> boo .unwrap_or_else(|err| { connection.trigger_reconnect(); error!(ctx, "{}", err); - false + InterruptInfo::new(false, None) }) } else { connection.fake_idle(&ctx, Some(watch_folder)).await @@ -223,18 +206,18 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect started.send(()).await; let ctx = ctx1; - let mut probe_network = false; + let mut interrupt_info = Default::default(); loop { - match job::load_next(&ctx, Thread::Smtp, probe_network).await { + match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await { Some(job) => { info!(ctx, "executing smtp job"); job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await; - probe_network = false; + interrupt_info = Default::default(); } None => { // Fake Idle info!(ctx, "smtp fake idle - started"); - probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default(); + interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default(); info!(ctx, "smtp fake idle - interrupted") } } @@ -333,34 +316,34 @@ impl Scheduler { return; } - self.interrupt_inbox(true) - .join(self.interrupt_mvbox(true)) - .join(self.interrupt_sentbox(true)) - .join(self.interrupt_smtp(true)) + self.interrupt_inbox(InterruptInfo::new(true, None)) + .join(self.interrupt_mvbox(InterruptInfo::new(true, None))) + .join(self.interrupt_sentbox(InterruptInfo::new(true, None))) + .join(self.interrupt_smtp(InterruptInfo::new(true, None))) .await; } - async fn interrupt_inbox(&self, probe_network: bool) { + async fn interrupt_inbox(&self, info: InterruptInfo) { if let Scheduler::Running { ref inbox, .. } = self { - inbox.interrupt(probe_network).await; + inbox.interrupt(info).await; } } - async fn interrupt_mvbox(&self, probe_network: bool) { + async fn interrupt_mvbox(&self, info: InterruptInfo) { if let Scheduler::Running { ref mvbox, .. } = self { - mvbox.interrupt(probe_network).await; + mvbox.interrupt(info).await; } } - async fn interrupt_sentbox(&self, probe_network: bool) { + async fn interrupt_sentbox(&self, info: InterruptInfo) { if let Scheduler::Running { ref sentbox, .. } = self { - sentbox.interrupt(probe_network).await; + sentbox.interrupt(info).await; } } - async fn interrupt_smtp(&self, probe_network: bool) { + async fn interrupt_smtp(&self, info: InterruptInfo) { if let Scheduler::Running { ref smtp, .. } = self { - smtp.interrupt(probe_network).await; + smtp.interrupt(info).await; } } @@ -429,7 +412,7 @@ struct ConnectionState { /// Channel to interrupt the whole connection. stop_sender: Sender<()>, /// Channel to interrupt idle. - idle_interrupt_sender: Sender, + idle_interrupt_sender: Sender, } impl ConnectionState { @@ -441,9 +424,9 @@ impl ConnectionState { self.shutdown_receiver.recv().await.ok(); } - async fn interrupt(&self, probe_network: bool) { + async fn interrupt(&self, info: InterruptInfo) { // Use try_send to avoid blocking on interrupts. - self.idle_interrupt_sender.try_send(probe_network).ok(); + self.idle_interrupt_sender.try_send(info).ok(); } } @@ -477,8 +460,8 @@ impl SmtpConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self, probe_network: bool) { - self.state.interrupt(probe_network).await; + async fn interrupt(&self, info: InterruptInfo) { + self.state.interrupt(info).await; } /// Shutdown this connection completely. @@ -492,7 +475,7 @@ struct SmtpConnectionHandlers { connection: Smtp, stop_receiver: Receiver<()>, shutdown_sender: Sender<()>, - idle_interrupt_receiver: Receiver, + idle_interrupt_receiver: Receiver, } #[derive(Debug)] @@ -525,8 +508,8 @@ impl ImapConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self, probe_network: bool) { - self.state.interrupt(probe_network).await; + async fn interrupt(&self, info: InterruptInfo) { + self.state.interrupt(info).await; } /// Shutdown this connection completely. @@ -541,3 +524,18 @@ struct ImapConnectionHandlers { stop_receiver: Receiver<()>, shutdown_sender: Sender<()>, } + +#[derive(Default, Debug)] +pub struct InterruptInfo { + pub probe_network: bool, + pub msg_id: Option, +} + +impl InterruptInfo { + pub fn new(probe_network: bool, msg_id: Option) -> Self { + Self { + probe_network, + msg_id, + } + } +}