From cfba76d60088edce6b642a71866d91d5a0517f34 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 11 Nov 2019 21:41:43 +0100 Subject: [PATCH] extract more --- examples/repl/cmdline.rs | 10 ++++- examples/repl/main.rs | 82 +++++++++++++++++++++++++--------------- src/config.rs | 4 +- src/configure/mod.rs | 8 ++-- src/context.rs | 10 ++++- src/imap.rs | 2 + src/job.rs | 50 ++++++++++++------------ src/job_thread.rs | 42 ++++++++++---------- src/lib.rs | 2 +- 9 files changed, 124 insertions(+), 86 deletions(-) diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index 4f365b8e1..8a7064eaa 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -1,5 +1,6 @@ use std::path::Path; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use deltachat::chat::{self, Chat}; use deltachat::chatlist::*; @@ -10,6 +11,7 @@ use deltachat::context::*; use deltachat::dc_receive_imf::*; use deltachat::dc_tools::*; use deltachat::error::Error; +use deltachat::imap::Imap; use deltachat::imex::*; use deltachat::job::*; use deltachat::location; @@ -304,7 +306,11 @@ fn chat_prefix(chat: &Chat) -> &'static str { chat.typ.into() } -pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::Error> { +pub unsafe fn dc_cmdline( + context: &Context, + inbox: Arc>, + line: &str, +) -> Result<(), failure::Error> { let chat_id = *context.cmdline_sel_chat_id.read().unwrap(); let mut sel_chat = if chat_id > 0 { Chat::load_from_db(context, chat_id).ok() @@ -496,7 +502,7 @@ pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::E println!("{:#?}", context.get_info()); } "interrupt" => { - // interrupt_imap_idle(context); + interrupt_imap_idle(context, &mut inbox.lock().unwrap()); unimplemented!() } "maybenetwork" => { diff --git a/examples/repl/main.rs b/examples/repl/main.rs index 9b8c47fd4..76b2db853 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -23,6 +23,7 @@ use std::sync::{Arc, Mutex, RwLock}; use deltachat::config; use deltachat::configure::*; use deltachat::context::*; +use deltachat::imap::Imap; use deltachat::job::*; use deltachat::oauth2::*; use deltachat::securejoin::*; @@ -139,44 +140,51 @@ macro_rules! while_running { }; } -fn start_threads(c: Arc>) { +fn start_threads(c: Arc>) -> Option>> { if HANDLE.clone().lock().unwrap().is_some() { - return; + return None; } println!("Starting threads"); IS_RUNNING.store(true, Ordering::Relaxed); - + let inbox = Arc::new(Mutex::new(c.read().unwrap().create_inbox())); + let inbox2 = inbox.clone(); let ctx = c.clone(); - let handle_imap = std::thread::spawn(move || loop { - let mut inbox = ctx.read().unwrap().create_inbox(); + let handle_imap = std::thread::spawn(move || { + let inbox = inbox2; - while_running!({ - perform_imap_jobs(&ctx.read().unwrap(), &mut inbox); - perform_imap_fetch(&ctx.read().unwrap(), &mut inbox); + loop { while_running!({ - let context = ctx.read().unwrap(); - perform_imap_idle(&context, &mut inbox); - }); - }); + perform_imap_jobs(&ctx.read().unwrap(), &mut inbox.lock().unwrap()); + perform_imap_fetch(&ctx.read().unwrap(), &mut inbox.lock().unwrap()); + while_running!({ + let context = ctx.read().unwrap(); + perform_imap_idle(&context, &mut inbox.lock().unwrap()); + }); + }) + } }); let ctx = c.clone(); let handle_mvbox = std::thread::spawn(move || loop { + let mut mvbox = ctx.read().unwrap().create_inbox(); + while_running!({ - perform_mvbox_fetch(&ctx.read().unwrap()); + perform_mvbox_fetch(&ctx.read().unwrap(), &mut mvbox); while_running!({ - perform_mvbox_idle(&ctx.read().unwrap()); + perform_mvbox_idle(&ctx.read().unwrap(), &mut mvbox); }); }); }); let ctx = c.clone(); let handle_sentbox = std::thread::spawn(move || loop { + let mut sentbox = ctx.read().unwrap().create_inbox(); + while_running!({ - perform_sentbox_fetch(&ctx.read().unwrap()); + perform_sentbox_fetch(&ctx.read().unwrap(), &mut sentbox); while_running!({ - perform_sentbox_idle(&ctx.read().unwrap()); + perform_sentbox_idle(&ctx.read().unwrap(), &mut sentbox); }); }); }); @@ -197,18 +205,15 @@ fn start_threads(c: Arc>) { handle_sentbox: Some(handle_sentbox), handle_smtp: Some(handle_smtp), }); + + Some(inbox) } -fn stop_threads(context: &Context) { +fn stop_threads(_context: &Context) { if let Some(ref mut handle) = *HANDLE.clone().lock().unwrap() { - println!("Stopping threads"); + !("Stopping threads"); IS_RUNNING.store(false, Ordering::Relaxed); - // interrupt_imap_idle(context); - interrupt_mvbox_idle(context); - interrupt_sentbox_idle(context); - interrupt_smtp_idle(context); - handle.handle_imap.take().unwrap().join().unwrap(); handle.handle_mvbox.take().unwrap().join().unwrap(); handle.handle_sentbox.take().unwrap().join().unwrap(); @@ -441,11 +446,16 @@ unsafe fn handle_cmd(line: &str, ctx: Arc>) -> Result { - start_threads(ctx); + if let Some(i) = start_threads(ctx.clone()) { + inbox = Some(i); + }; } "disconnect" => { + let _ = inbox.take(); stop_threads(&ctx.read().unwrap()); } "smtp-jobs" => { @@ -459,12 +469,16 @@ unsafe fn handle_cmd(line: &str, ctx: Arc>) -> Result { - start_threads(ctx.clone()); + if let Some(i) = start_threads(ctx.clone()) { + inbox = Some(i); + }; configure(&ctx.read().unwrap()); } "oauth2" => { @@ -488,7 +502,9 @@ unsafe fn handle_cmd(line: &str, ctx: Arc>) -> Result { - start_threads(ctx.clone()); + if let Some(i) = start_threads(ctx.clone()) { + inbox = Some(i); + }; if let Some(mut qr) = dc_get_securejoin_qr(&ctx.read().unwrap(), arg1.parse().unwrap_or_default()) { @@ -507,13 +523,19 @@ unsafe fn handle_cmd(line: &str, ctx: Arc>) -> Result { - start_threads(ctx.clone()); + if let Some(i) = start_threads(ctx.clone()) { + inbox = Some(i); + }; if !arg0.is_empty() { dc_join_securejoin(&ctx.read().unwrap(), arg1); } } "exit" | "quit" => return Ok(ExitResult::Exit), - _ => dc_cmdline(&ctx.read().unwrap(), line)?, + _ => dc_cmdline( + &ctx.read().unwrap(), + inbox.expect("not started").clone(), + line, + )?, } Ok(ExitResult::Continue) diff --git a/src/config.rs b/src/config.rs index cc4ca70cd..e0bb12415 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,7 +5,7 @@ use crate::constants::DC_VERSION_STR; use crate::context::Context; use crate::dc_tools::*; use crate::error::Error; -use crate::job::*; +// use crate::job::*; use crate::stock::StockMessage; /// The available configuration keys. @@ -129,7 +129,7 @@ impl Context { } Config::MvboxWatch => { let ret = self.sql.set_raw_config(self, key, value); - interrupt_mvbox_idle(self); + // interrupt_mvbox_idle(self); ret } Config::Selfstatus => { diff --git a/src/configure/mod.rs b/src/configure/mod.rs index 7a0d8e2ba..c4459be5f 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -62,9 +62,11 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, inbox: &mut Imap) { let mut param_autoconfig: Option = None; - inbox.disconnect(); - context.sentbox_thread.write().unwrap().imap.disconnect(); - context.mvbox_thread.write().unwrap().imap.disconnect(); + // TODO: these need to be disconnected manually now, before starting configure + // inbox.disconnect(); + // context.sentbox_thread.write().unwrap().imap.disconnect(); + // context.mvbox_thread.write().unwrap().imap.disconnect(); + context.smtp.clone().lock().unwrap().disconnect(); info!(context, "Configure ...",); diff --git a/src/context.rs b/src/context.rs index acc8182c0..571c7d8b1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -130,12 +130,10 @@ impl Context { sentbox_thread: Arc::new(RwLock::new(JobThread::new( "SENTBOX", "configured_sentbox_folder", - Imap::new(), ))), mvbox_thread: Arc::new(RwLock::new(JobThread::new( "MVBOX", "configured_mvbox_folder", - Imap::new(), ))), probe_imap_network: Arc::new(RwLock::new(false)), perform_inbox_jobs_needed: Arc::new(RwLock::new(false)), @@ -155,6 +153,14 @@ impl Context { Imap::new() } + pub fn create_mvbox(&self) -> Imap { + Imap::new() + } + + pub fn create_sentbox(&self) -> Imap { + Imap::new() + } + pub fn get_dbfile(&self) -> &Path { self.dbfile.as_path() } diff --git a/src/imap.rs b/src/imap.rs index ec31208a8..e43b8044b 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -369,6 +369,8 @@ impl Imap { pub fn disconnect(&mut self) { task::block_on(async move { + self.interrupt.take(); + if self.is_connected().await { self.unsetup_handle(None).await; self.free_connect_params().await; diff --git a/src/job.rs b/src/job.rs index 2f594e4ff..b6189901b 100644 --- a/src/job.rs +++ b/src/job.rs @@ -412,60 +412,60 @@ pub fn perform_imap_idle(context: &Context, inbox: &mut Imap) { info!(context, "INBOX-IDLE ended."); } -pub fn perform_mvbox_fetch(context: &Context) { +pub fn perform_mvbox_fetch(context: &Context, imap: &mut Imap) { let use_network = context.get_config_bool(Config::MvboxWatch); context .mvbox_thread - .write() + .read() .unwrap() - .fetch(context, use_network); + .fetch(context, use_network, imap); } -pub fn perform_mvbox_idle(context: &Context) { +pub fn perform_mvbox_idle(context: &Context, imap: &mut Imap) { let use_network = context.get_config_bool(Config::MvboxWatch); context .mvbox_thread - .write() + .read() .unwrap() - .idle(context, use_network); + .idle(context, use_network, imap); } -pub fn interrupt_mvbox_idle(context: &Context) { +pub fn interrupt_mvbox_idle(context: &Context, imap: &mut Imap) { context .mvbox_thread - .write() + .read() .unwrap() - .interrupt_idle(context); + .interrupt_idle(context, imap); } -pub fn perform_sentbox_fetch(context: &Context) { +pub fn perform_sentbox_fetch(context: &Context, imap: &mut Imap) { let use_network = context.get_config_bool(Config::SentboxWatch); context .sentbox_thread - .write() + .read() .unwrap() - .fetch(context, use_network); + .fetch(context, use_network, imap); } -pub fn perform_sentbox_idle(context: &Context) { +pub fn perform_sentbox_idle(context: &Context, imap: &mut Imap) { let use_network = context.get_config_bool(Config::SentboxWatch); context .sentbox_thread - .write() + .read() .unwrap() - .idle(context, use_network); + .idle(context, use_network, imap); } -pub fn interrupt_sentbox_idle(context: &Context) { +pub fn interrupt_sentbox_idle(context: &Context, imap: &mut Imap) { context .sentbox_thread - .write() + .read() .unwrap() - .interrupt_idle(context); + .interrupt_idle(context, imap); } pub fn perform_smtp_jobs(context: &Context) { @@ -550,7 +550,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration { wakeup_time } -pub fn maybe_network(context: &Context, inbox: &mut Imap) { +pub fn maybe_network(context: &Context, _inbox: &mut Imap) { { let &(ref lock, _) = &*context.smtp_state.clone(); let mut state = lock.lock().unwrap(); @@ -560,9 +560,10 @@ pub fn maybe_network(context: &Context, inbox: &mut Imap) { } interrupt_smtp_idle(context); - interrupt_imap_idle(context, inbox); - interrupt_mvbox_idle(context); - interrupt_sentbox_idle(context); + // TODO: manually + // interrupt_imap_idle(context, inbox); + // interrupt_mvbox_idle(context); + // interrupt_sentbox_idle(context); } pub fn job_action_exists(context: &Context, action: Action) -> bool { @@ -766,8 +767,9 @@ fn job_perform( // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution if Action::ConfigureImap == job.action || Action::ImexImap == job.action { job_kill_action(context, job.action); - context.sentbox_thread.write().unwrap().suspend(context); - context.mvbox_thread.write().unwrap().suspend(context); + // TODO: figure out better way + // context.sentbox_thread.write().unwrap().suspend(context); + // context.mvbox_thread.write().unwrap().suspend(context); suspend_smtp_thread(context, true); } diff --git a/src/job_thread.rs b/src/job_thread.rs index a35df068f..b73251d40 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -8,7 +8,6 @@ use crate::imap::Imap; pub struct JobThread { pub name: &'static str, pub folder_config_name: &'static str, - pub imap: Imap, pub state: Arc<(Mutex, Condvar)>, } @@ -21,21 +20,20 @@ pub struct JobState { } impl JobThread { - pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self { + pub fn new(name: &'static str, folder_config_name: &'static str) -> Self { JobThread { name, folder_config_name, - imap, state: Arc::new((Mutex::new(Default::default()), Condvar::new())), } } - pub fn suspend(&mut self, context: &Context) { + pub fn suspend(&self, context: &Context, imap: &mut Imap) { info!(context, "Suspending {}-thread.", self.name,); { self.state.0.lock().unwrap().suspended = true; } - self.interrupt_idle(context); + self.interrupt_idle(context, imap); loop { let using_handle = self.state.0.lock().unwrap().using_handle; if !using_handle { @@ -56,14 +54,14 @@ impl JobThread { cvar.notify_one(); } - pub fn interrupt_idle(&mut self, context: &Context) { + pub fn interrupt_idle(&self, context: &Context, imap: &mut Imap) { { self.state.0.lock().unwrap().jobs_needed = true; } info!(context, "Interrupting {}-IDLE...", self.name); - self.imap.interrupt_idle(); + imap.interrupt_idle(); let &(ref lock, ref cvar) = &*self.state.clone(); let mut state = lock.lock().unwrap(); @@ -72,7 +70,7 @@ impl JobThread { cvar.notify_one(); } - pub fn fetch(&mut self, context: &Context, use_network: bool) { + pub fn fetch(&self, context: &Context, use_network: bool, imap: &mut Imap) { { let &(ref lock, _) = &*self.state.clone(); let mut state = lock.lock().unwrap(); @@ -86,13 +84,13 @@ impl JobThread { if use_network { let start = std::time::Instant::now(); - if self.connect_to_imap(context) { + if self.connect_to_imap(context, imap) { info!(context, "{}-fetch started...", self.name); - self.imap.fetch(context); + imap.fetch(context); - if self.imap.should_reconnect() { + if imap.should_reconnect() { info!(context, "{}-fetch aborted, starting over...", self.name,); - self.imap.fetch(context); + imap.fetch(context); } info!( context, @@ -106,13 +104,13 @@ impl JobThread { self.state.0.lock().unwrap().using_handle = false; } - fn connect_to_imap(&mut self, context: &Context) -> bool { + fn connect_to_imap(&self, context: &Context, imap: &mut Imap) -> bool { async_std::task::block_on(async move { - if self.imap.is_connected().await { + if imap.is_connected().await { return true; } - let mut ret_connected = dc_connect_to_configured_imap(context, &mut self.imap) != 0; + let mut ret_connected = dc_connect_to_configured_imap(context, imap) != 0; if ret_connected { if context @@ -121,15 +119,15 @@ impl JobThread { .unwrap_or_default() < 3 { - self.imap.configure_folders(context, 0x1); + imap.configure_folders(context, 0x1); } if let Some(mvbox_name) = context.sql.get_raw_config(context, self.folder_config_name) { - self.imap.set_watch_folder(mvbox_name); + imap.set_watch_folder(mvbox_name); } else { - self.imap.disconnect(); + imap.disconnect(); ret_connected = false; } } @@ -138,7 +136,7 @@ impl JobThread { }) } - pub fn idle(&mut self, context: &Context, use_network: bool) { + pub fn idle(&self, context: &Context, use_network: bool, imap: &mut Imap) { { let &(ref lock, ref cvar) = &*self.state.clone(); let mut state = lock.lock().unwrap(); @@ -174,9 +172,9 @@ impl JobThread { } } - self.connect_to_imap(context); - info!(context, "{}-IDLE started...", self.name,); - self.imap.idle(context); + self.connect_to_imap(context, imap); + info!(context, "{}-IDLE started...", self.name); + imap.idle(context); info!(context, "{}-IDLE ended.", self.name); self.state.0.lock().unwrap().using_handle = false; diff --git a/src/lib.rs b/src/lib.rs index fd61d2821..511fb7c1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,7 @@ pub mod constants; pub mod contact; pub mod context; mod e2ee; -mod imap; +pub mod imap; mod imap_client; pub mod imex; pub mod job;