extract more

This commit is contained in:
dignifiedquire
2019-11-11 21:41:43 +01:00
parent 0e953d18d0
commit cfba76d600
9 changed files with 124 additions and 86 deletions

View File

@@ -1,5 +1,6 @@
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex};
use deltachat::chat::{self, Chat}; use deltachat::chat::{self, Chat};
use deltachat::chatlist::*; use deltachat::chatlist::*;
@@ -10,6 +11,7 @@ use deltachat::context::*;
use deltachat::dc_receive_imf::*; use deltachat::dc_receive_imf::*;
use deltachat::dc_tools::*; use deltachat::dc_tools::*;
use deltachat::error::Error; use deltachat::error::Error;
use deltachat::imap::Imap;
use deltachat::imex::*; use deltachat::imex::*;
use deltachat::job::*; use deltachat::job::*;
use deltachat::location; use deltachat::location;
@@ -304,7 +306,11 @@ fn chat_prefix(chat: &Chat) -> &'static str {
chat.typ.into() chat.typ.into()
} }
pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::Error> { pub unsafe fn dc_cmdline(
context: &Context,
inbox: Arc<Mutex<Imap>>,
line: &str,
) -> Result<(), failure::Error> {
let chat_id = *context.cmdline_sel_chat_id.read().unwrap(); let chat_id = *context.cmdline_sel_chat_id.read().unwrap();
let mut sel_chat = if chat_id > 0 { let mut sel_chat = if chat_id > 0 {
Chat::load_from_db(context, chat_id).ok() Chat::load_from_db(context, chat_id).ok()
@@ -496,7 +502,7 @@ pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::E
println!("{:#?}", context.get_info()); println!("{:#?}", context.get_info());
} }
"interrupt" => { "interrupt" => {
// interrupt_imap_idle(context); interrupt_imap_idle(context, &mut inbox.lock().unwrap());
unimplemented!() unimplemented!()
} }
"maybenetwork" => { "maybenetwork" => {

View File

@@ -23,6 +23,7 @@ use std::sync::{Arc, Mutex, RwLock};
use deltachat::config; use deltachat::config;
use deltachat::configure::*; use deltachat::configure::*;
use deltachat::context::*; use deltachat::context::*;
use deltachat::imap::Imap;
use deltachat::job::*; use deltachat::job::*;
use deltachat::oauth2::*; use deltachat::oauth2::*;
use deltachat::securejoin::*; use deltachat::securejoin::*;
@@ -139,44 +140,51 @@ macro_rules! while_running {
}; };
} }
fn start_threads(c: Arc<RwLock<Context>>) { fn start_threads(c: Arc<RwLock<Context>>) -> Option<Arc<Mutex<Imap>>> {
if HANDLE.clone().lock().unwrap().is_some() { if HANDLE.clone().lock().unwrap().is_some() {
return; return None;
} }
println!("Starting threads"); println!("Starting threads");
IS_RUNNING.store(true, Ordering::Relaxed); IS_RUNNING.store(true, Ordering::Relaxed);
let inbox = Arc::new(Mutex::new(c.read().unwrap().create_inbox()));
let inbox2 = inbox.clone();
let ctx = c.clone(); let ctx = c.clone();
let handle_imap = std::thread::spawn(move || loop { let handle_imap = std::thread::spawn(move || {
let mut inbox = ctx.read().unwrap().create_inbox(); let inbox = inbox2;
while_running!({ loop {
perform_imap_jobs(&ctx.read().unwrap(), &mut inbox);
perform_imap_fetch(&ctx.read().unwrap(), &mut inbox);
while_running!({ while_running!({
let context = ctx.read().unwrap(); perform_imap_jobs(&ctx.read().unwrap(), &mut inbox.lock().unwrap());
perform_imap_idle(&context, &mut inbox); perform_imap_fetch(&ctx.read().unwrap(), &mut inbox.lock().unwrap());
}); while_running!({
}); let context = ctx.read().unwrap();
perform_imap_idle(&context, &mut inbox.lock().unwrap());
});
})
}
}); });
let ctx = c.clone(); let ctx = c.clone();
let handle_mvbox = std::thread::spawn(move || loop { let handle_mvbox = std::thread::spawn(move || loop {
let mut mvbox = ctx.read().unwrap().create_inbox();
while_running!({ while_running!({
perform_mvbox_fetch(&ctx.read().unwrap()); perform_mvbox_fetch(&ctx.read().unwrap(), &mut mvbox);
while_running!({ while_running!({
perform_mvbox_idle(&ctx.read().unwrap()); perform_mvbox_idle(&ctx.read().unwrap(), &mut mvbox);
}); });
}); });
}); });
let ctx = c.clone(); let ctx = c.clone();
let handle_sentbox = std::thread::spawn(move || loop { let handle_sentbox = std::thread::spawn(move || loop {
let mut sentbox = ctx.read().unwrap().create_inbox();
while_running!({ while_running!({
perform_sentbox_fetch(&ctx.read().unwrap()); perform_sentbox_fetch(&ctx.read().unwrap(), &mut sentbox);
while_running!({ while_running!({
perform_sentbox_idle(&ctx.read().unwrap()); perform_sentbox_idle(&ctx.read().unwrap(), &mut sentbox);
}); });
}); });
}); });
@@ -197,18 +205,15 @@ fn start_threads(c: Arc<RwLock<Context>>) {
handle_sentbox: Some(handle_sentbox), handle_sentbox: Some(handle_sentbox),
handle_smtp: Some(handle_smtp), handle_smtp: Some(handle_smtp),
}); });
Some(inbox)
} }
fn stop_threads(context: &Context) { fn stop_threads(_context: &Context) {
if let Some(ref mut handle) = *HANDLE.clone().lock().unwrap() { if let Some(ref mut handle) = *HANDLE.clone().lock().unwrap() {
println!("Stopping threads"); !("Stopping threads");
IS_RUNNING.store(false, Ordering::Relaxed); IS_RUNNING.store(false, Ordering::Relaxed);
// interrupt_imap_idle(context);
interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context);
interrupt_smtp_idle(context);
handle.handle_imap.take().unwrap().join().unwrap(); handle.handle_imap.take().unwrap().join().unwrap();
handle.handle_mvbox.take().unwrap().join().unwrap(); handle.handle_mvbox.take().unwrap().join().unwrap();
handle.handle_sentbox.take().unwrap().join().unwrap(); handle.handle_sentbox.take().unwrap().join().unwrap();
@@ -441,11 +446,16 @@ unsafe fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult
let arg0 = args.next().unwrap_or_default(); let arg0 = args.next().unwrap_or_default();
let arg1 = args.next().unwrap_or_default(); let arg1 = args.next().unwrap_or_default();
let mut inbox = None;
match arg0 { match arg0 {
"connect" => { "connect" => {
start_threads(ctx); if let Some(i) = start_threads(ctx.clone()) {
inbox = Some(i);
};
} }
"disconnect" => { "disconnect" => {
let _ = inbox.take();
stop_threads(&ctx.read().unwrap()); stop_threads(&ctx.read().unwrap());
} }
"smtp-jobs" => { "smtp-jobs" => {
@@ -459,12 +469,16 @@ unsafe fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult
if HANDLE.clone().lock().unwrap().is_some() { if HANDLE.clone().lock().unwrap().is_some() {
println!("imap-jobs are already running in a thread."); println!("imap-jobs are already running in a thread.");
} else { } else {
// perform_imap_jobs(&ctx.read().unwrap()); perform_imap_jobs(
unimplemented!() &ctx.read().unwrap(),
&mut inbox.expect("not connected").lock().unwrap(),
);
} }
} }
"configure" => { "configure" => {
start_threads(ctx.clone()); if let Some(i) = start_threads(ctx.clone()) {
inbox = Some(i);
};
configure(&ctx.read().unwrap()); configure(&ctx.read().unwrap());
} }
"oauth2" => { "oauth2" => {
@@ -488,7 +502,9 @@ unsafe fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult
print!("\x1b[1;1H\x1b[2J"); print!("\x1b[1;1H\x1b[2J");
} }
"getqr" | "getbadqr" => { "getqr" | "getbadqr" => {
start_threads(ctx.clone()); if let Some(i) = start_threads(ctx.clone()) {
inbox = Some(i);
};
if let Some(mut qr) = if let Some(mut qr) =
dc_get_securejoin_qr(&ctx.read().unwrap(), arg1.parse().unwrap_or_default()) dc_get_securejoin_qr(&ctx.read().unwrap(), arg1.parse().unwrap_or_default())
{ {
@@ -507,13 +523,19 @@ unsafe fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult
} }
} }
"joinqr" => { "joinqr" => {
start_threads(ctx.clone()); if let Some(i) = start_threads(ctx.clone()) {
inbox = Some(i);
};
if !arg0.is_empty() { if !arg0.is_empty() {
dc_join_securejoin(&ctx.read().unwrap(), arg1); dc_join_securejoin(&ctx.read().unwrap(), arg1);
} }
} }
"exit" | "quit" => return Ok(ExitResult::Exit), "exit" | "quit" => return Ok(ExitResult::Exit),
_ => dc_cmdline(&ctx.read().unwrap(), line)?, _ => dc_cmdline(
&ctx.read().unwrap(),
inbox.expect("not started").clone(),
line,
)?,
} }
Ok(ExitResult::Continue) Ok(ExitResult::Continue)

View File

@@ -5,7 +5,7 @@ use crate::constants::DC_VERSION_STR;
use crate::context::Context; use crate::context::Context;
use crate::dc_tools::*; use crate::dc_tools::*;
use crate::error::Error; use crate::error::Error;
use crate::job::*; // use crate::job::*;
use crate::stock::StockMessage; use crate::stock::StockMessage;
/// The available configuration keys. /// The available configuration keys.
@@ -129,7 +129,7 @@ impl Context {
} }
Config::MvboxWatch => { Config::MvboxWatch => {
let ret = self.sql.set_raw_config(self, key, value); let ret = self.sql.set_raw_config(self, key, value);
interrupt_mvbox_idle(self); // interrupt_mvbox_idle(self);
ret ret
} }
Config::Selfstatus => { Config::Selfstatus => {

View File

@@ -62,9 +62,11 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, inbox: &mut Imap) {
let mut param_autoconfig: Option<LoginParam> = None; let mut param_autoconfig: Option<LoginParam> = None;
inbox.disconnect(); // TODO: these need to be disconnected manually now, before starting configure
context.sentbox_thread.write().unwrap().imap.disconnect(); // inbox.disconnect();
context.mvbox_thread.write().unwrap().imap.disconnect(); // context.sentbox_thread.write().unwrap().imap.disconnect();
// context.mvbox_thread.write().unwrap().imap.disconnect();
context.smtp.clone().lock().unwrap().disconnect(); context.smtp.clone().lock().unwrap().disconnect();
info!(context, "Configure ...",); info!(context, "Configure ...",);

View File

@@ -130,12 +130,10 @@ impl Context {
sentbox_thread: Arc::new(RwLock::new(JobThread::new( sentbox_thread: Arc::new(RwLock::new(JobThread::new(
"SENTBOX", "SENTBOX",
"configured_sentbox_folder", "configured_sentbox_folder",
Imap::new(),
))), ))),
mvbox_thread: Arc::new(RwLock::new(JobThread::new( mvbox_thread: Arc::new(RwLock::new(JobThread::new(
"MVBOX", "MVBOX",
"configured_mvbox_folder", "configured_mvbox_folder",
Imap::new(),
))), ))),
probe_imap_network: Arc::new(RwLock::new(false)), probe_imap_network: Arc::new(RwLock::new(false)),
perform_inbox_jobs_needed: Arc::new(RwLock::new(false)), perform_inbox_jobs_needed: Arc::new(RwLock::new(false)),
@@ -155,6 +153,14 @@ impl Context {
Imap::new() Imap::new()
} }
pub fn create_mvbox(&self) -> Imap {
Imap::new()
}
pub fn create_sentbox(&self) -> Imap {
Imap::new()
}
pub fn get_dbfile(&self) -> &Path { pub fn get_dbfile(&self) -> &Path {
self.dbfile.as_path() self.dbfile.as_path()
} }

View File

@@ -369,6 +369,8 @@ impl Imap {
pub fn disconnect(&mut self) { pub fn disconnect(&mut self) {
task::block_on(async move { task::block_on(async move {
self.interrupt.take();
if self.is_connected().await { if self.is_connected().await {
self.unsetup_handle(None).await; self.unsetup_handle(None).await;
self.free_connect_params().await; self.free_connect_params().await;

View File

@@ -412,60 +412,60 @@ pub fn perform_imap_idle(context: &Context, inbox: &mut Imap) {
info!(context, "INBOX-IDLE ended."); info!(context, "INBOX-IDLE ended.");
} }
pub fn perform_mvbox_fetch(context: &Context) { pub fn perform_mvbox_fetch(context: &Context, imap: &mut Imap) {
let use_network = context.get_config_bool(Config::MvboxWatch); let use_network = context.get_config_bool(Config::MvboxWatch);
context context
.mvbox_thread .mvbox_thread
.write() .read()
.unwrap() .unwrap()
.fetch(context, use_network); .fetch(context, use_network, imap);
} }
pub fn perform_mvbox_idle(context: &Context) { pub fn perform_mvbox_idle(context: &Context, imap: &mut Imap) {
let use_network = context.get_config_bool(Config::MvboxWatch); let use_network = context.get_config_bool(Config::MvboxWatch);
context context
.mvbox_thread .mvbox_thread
.write() .read()
.unwrap() .unwrap()
.idle(context, use_network); .idle(context, use_network, imap);
} }
pub fn interrupt_mvbox_idle(context: &Context) { pub fn interrupt_mvbox_idle(context: &Context, imap: &mut Imap) {
context context
.mvbox_thread .mvbox_thread
.write() .read()
.unwrap() .unwrap()
.interrupt_idle(context); .interrupt_idle(context, imap);
} }
pub fn perform_sentbox_fetch(context: &Context) { pub fn perform_sentbox_fetch(context: &Context, imap: &mut Imap) {
let use_network = context.get_config_bool(Config::SentboxWatch); let use_network = context.get_config_bool(Config::SentboxWatch);
context context
.sentbox_thread .sentbox_thread
.write() .read()
.unwrap() .unwrap()
.fetch(context, use_network); .fetch(context, use_network, imap);
} }
pub fn perform_sentbox_idle(context: &Context) { pub fn perform_sentbox_idle(context: &Context, imap: &mut Imap) {
let use_network = context.get_config_bool(Config::SentboxWatch); let use_network = context.get_config_bool(Config::SentboxWatch);
context context
.sentbox_thread .sentbox_thread
.write() .read()
.unwrap() .unwrap()
.idle(context, use_network); .idle(context, use_network, imap);
} }
pub fn interrupt_sentbox_idle(context: &Context) { pub fn interrupt_sentbox_idle(context: &Context, imap: &mut Imap) {
context context
.sentbox_thread .sentbox_thread
.write() .read()
.unwrap() .unwrap()
.interrupt_idle(context); .interrupt_idle(context, imap);
} }
pub fn perform_smtp_jobs(context: &Context) { pub fn perform_smtp_jobs(context: &Context) {
@@ -550,7 +550,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration {
wakeup_time wakeup_time
} }
pub fn maybe_network(context: &Context, inbox: &mut Imap) { pub fn maybe_network(context: &Context, _inbox: &mut Imap) {
{ {
let &(ref lock, _) = &*context.smtp_state.clone(); let &(ref lock, _) = &*context.smtp_state.clone();
let mut state = lock.lock().unwrap(); let mut state = lock.lock().unwrap();
@@ -560,9 +560,10 @@ pub fn maybe_network(context: &Context, inbox: &mut Imap) {
} }
interrupt_smtp_idle(context); interrupt_smtp_idle(context);
interrupt_imap_idle(context, inbox); // TODO: manually
interrupt_mvbox_idle(context); // interrupt_imap_idle(context, inbox);
interrupt_sentbox_idle(context); // interrupt_mvbox_idle(context);
// interrupt_sentbox_idle(context);
} }
pub fn job_action_exists(context: &Context, action: Action) -> bool { pub fn job_action_exists(context: &Context, action: Action) -> bool {
@@ -766,8 +767,9 @@ fn job_perform(
// - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution
if Action::ConfigureImap == job.action || Action::ImexImap == job.action { if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
job_kill_action(context, job.action); job_kill_action(context, job.action);
context.sentbox_thread.write().unwrap().suspend(context); // TODO: figure out better way
context.mvbox_thread.write().unwrap().suspend(context); // context.sentbox_thread.write().unwrap().suspend(context);
// context.mvbox_thread.write().unwrap().suspend(context);
suspend_smtp_thread(context, true); suspend_smtp_thread(context, true);
} }

View File

@@ -8,7 +8,6 @@ use crate::imap::Imap;
pub struct JobThread { pub struct JobThread {
pub name: &'static str, pub name: &'static str,
pub folder_config_name: &'static str, pub folder_config_name: &'static str,
pub imap: Imap,
pub state: Arc<(Mutex<JobState>, Condvar)>, pub state: Arc<(Mutex<JobState>, Condvar)>,
} }
@@ -21,21 +20,20 @@ pub struct JobState {
} }
impl JobThread { impl JobThread {
pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self { pub fn new(name: &'static str, folder_config_name: &'static str) -> Self {
JobThread { JobThread {
name, name,
folder_config_name, folder_config_name,
imap,
state: Arc::new((Mutex::new(Default::default()), Condvar::new())), state: Arc::new((Mutex::new(Default::default()), Condvar::new())),
} }
} }
pub fn suspend(&mut self, context: &Context) { pub fn suspend(&self, context: &Context, imap: &mut Imap) {
info!(context, "Suspending {}-thread.", self.name,); info!(context, "Suspending {}-thread.", self.name,);
{ {
self.state.0.lock().unwrap().suspended = true; self.state.0.lock().unwrap().suspended = true;
} }
self.interrupt_idle(context); self.interrupt_idle(context, imap);
loop { loop {
let using_handle = self.state.0.lock().unwrap().using_handle; let using_handle = self.state.0.lock().unwrap().using_handle;
if !using_handle { if !using_handle {
@@ -56,14 +54,14 @@ impl JobThread {
cvar.notify_one(); cvar.notify_one();
} }
pub fn interrupt_idle(&mut self, context: &Context) { pub fn interrupt_idle(&self, context: &Context, imap: &mut Imap) {
{ {
self.state.0.lock().unwrap().jobs_needed = true; self.state.0.lock().unwrap().jobs_needed = true;
} }
info!(context, "Interrupting {}-IDLE...", self.name); info!(context, "Interrupting {}-IDLE...", self.name);
self.imap.interrupt_idle(); imap.interrupt_idle();
let &(ref lock, ref cvar) = &*self.state.clone(); let &(ref lock, ref cvar) = &*self.state.clone();
let mut state = lock.lock().unwrap(); let mut state = lock.lock().unwrap();
@@ -72,7 +70,7 @@ impl JobThread {
cvar.notify_one(); cvar.notify_one();
} }
pub fn fetch(&mut self, context: &Context, use_network: bool) { pub fn fetch(&self, context: &Context, use_network: bool, imap: &mut Imap) {
{ {
let &(ref lock, _) = &*self.state.clone(); let &(ref lock, _) = &*self.state.clone();
let mut state = lock.lock().unwrap(); let mut state = lock.lock().unwrap();
@@ -86,13 +84,13 @@ impl JobThread {
if use_network { if use_network {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
if self.connect_to_imap(context) { if self.connect_to_imap(context, imap) {
info!(context, "{}-fetch started...", self.name); info!(context, "{}-fetch started...", self.name);
self.imap.fetch(context); imap.fetch(context);
if self.imap.should_reconnect() { if imap.should_reconnect() {
info!(context, "{}-fetch aborted, starting over...", self.name,); info!(context, "{}-fetch aborted, starting over...", self.name,);
self.imap.fetch(context); imap.fetch(context);
} }
info!( info!(
context, context,
@@ -106,13 +104,13 @@ impl JobThread {
self.state.0.lock().unwrap().using_handle = false; self.state.0.lock().unwrap().using_handle = false;
} }
fn connect_to_imap(&mut self, context: &Context) -> bool { fn connect_to_imap(&self, context: &Context, imap: &mut Imap) -> bool {
async_std::task::block_on(async move { async_std::task::block_on(async move {
if self.imap.is_connected().await { if imap.is_connected().await {
return true; return true;
} }
let mut ret_connected = dc_connect_to_configured_imap(context, &mut self.imap) != 0; let mut ret_connected = dc_connect_to_configured_imap(context, imap) != 0;
if ret_connected { if ret_connected {
if context if context
@@ -121,15 +119,15 @@ impl JobThread {
.unwrap_or_default() .unwrap_or_default()
< 3 < 3
{ {
self.imap.configure_folders(context, 0x1); imap.configure_folders(context, 0x1);
} }
if let Some(mvbox_name) = if let Some(mvbox_name) =
context.sql.get_raw_config(context, self.folder_config_name) context.sql.get_raw_config(context, self.folder_config_name)
{ {
self.imap.set_watch_folder(mvbox_name); imap.set_watch_folder(mvbox_name);
} else { } else {
self.imap.disconnect(); imap.disconnect();
ret_connected = false; ret_connected = false;
} }
} }
@@ -138,7 +136,7 @@ impl JobThread {
}) })
} }
pub fn idle(&mut self, context: &Context, use_network: bool) { pub fn idle(&self, context: &Context, use_network: bool, imap: &mut Imap) {
{ {
let &(ref lock, ref cvar) = &*self.state.clone(); let &(ref lock, ref cvar) = &*self.state.clone();
let mut state = lock.lock().unwrap(); let mut state = lock.lock().unwrap();
@@ -174,9 +172,9 @@ impl JobThread {
} }
} }
self.connect_to_imap(context); self.connect_to_imap(context, imap);
info!(context, "{}-IDLE started...", self.name,); info!(context, "{}-IDLE started...", self.name);
self.imap.idle(context); imap.idle(context);
info!(context, "{}-IDLE ended.", self.name); info!(context, "{}-IDLE ended.", self.name);
self.state.0.lock().unwrap().using_handle = false; self.state.0.lock().unwrap().using_handle = false;

View File

@@ -39,7 +39,7 @@ pub mod constants;
pub mod contact; pub mod contact;
pub mod context; pub mod context;
mod e2ee; mod e2ee;
mod imap; pub mod imap;
mod imap_client; mod imap_client;
pub mod imex; pub mod imex;
pub mod job; pub mod job;