diff --git a/src/configure/mod.rs b/src/configure/mod.rs index b73aff7f5..726883e66 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -52,402 +52,403 @@ impl Context { ******************************************************************************/ #[allow(clippy::cognitive_complexity)] pub(crate) async fn job_configure_imap(context: &Context) -> job::Status { - if !context.sql.is_open().await { - error!(context, "Cannot configure, database not opened.",); - progress!(context, 0); - return job::Status::Finished(Err(format_err!("Database not opened"))); - } - if !context.alloc_ongoing().await { - progress!(context, 0); - return job::Status::Finished(Err(format_err!("Cannot allocated ongoing process"))); - } - let mut success = false; - let mut imap_connected_here = false; - let mut smtp_connected_here = false; + unimplemented!() + // if !context.sql.is_open().await { + // error!(context, "Cannot configure, database not opened.",); + // progress!(context, 0); + // return job::Status::Finished(Err(format_err!("Database not opened"))); + // } + // if !context.alloc_ongoing().await { + // progress!(context, 0); + // return job::Status::Finished(Err(format_err!("Cannot allocated ongoing process"))); + // } + // let mut success = false; + // let mut imap_connected_here = false; + // let mut smtp_connected_here = false; - let mut param_autoconfig: Option = None; + // let mut param_autoconfig: Option = None; - context.inbox_thread.imap.disconnect(context).await; - context.sentbox_thread.imap.disconnect(context).await; - context.mvbox_thread.imap.disconnect(context).await; - context.smtp.disconnect().await; + // // context.inbox_thread.imap.disconnect(context).await; + // // context.sentbox_thread.imap.disconnect(context).await; + // // context.mvbox_thread.imap.disconnect(context).await; + // // context.smtp.disconnect().await; - info!(context, "Configure ...",); + // info!(context, "Configure ...",); - // Variables that are shared between steps: - let mut 
param = LoginParam::from_database(context, "").await; - // need all vars here to be mutable because rust thinks the same step could be called multiple times - // and also initialize, because otherwise rust thinks it's used while unitilized, even if thats not the case as the loop goes only forward - let mut param_domain = "undefined.undefined".to_owned(); - let mut param_addr_urlencoded: String = - "Internal Error: this value should never be used".to_owned(); - let mut keep_flags = 0; + // // Variables that are shared between steps: + // let mut param = LoginParam::from_database(context, "").await; + // // need all vars here to be mutable because rust thinks the same step could be called multiple times + // // and also initialize, because otherwise rust thinks it's used while unitilized, even if thats not the case as the loop goes only forward + // let mut param_domain = "undefined.undefined".to_owned(); + // let mut param_addr_urlencoded: String = + // "Internal Error: this value should never be used".to_owned(); + // let mut keep_flags = 0; - const STEP_12_USE_AUTOCONFIG: u8 = 12; - const STEP_13_AFTER_AUTOCONFIG: u8 = 13; + // const STEP_12_USE_AUTOCONFIG: u8 = 12; + // const STEP_13_AFTER_AUTOCONFIG: u8 = 13; - let mut step_counter: u8 = 0; - while !context.shall_stop_ongoing().await { - step_counter += 1; + // let mut step_counter: u8 = 0; + // while !context.shall_stop_ongoing().await { + // step_counter += 1; - let success = match step_counter { - // Read login parameters from the database - 1 => { - progress!(context, 1); - if param.addr.is_empty() { - error!(context, "Please enter an email address.",); - } - !param.addr.is_empty() - } - // Step 1: Load the parameters and check email-address and password - 2 => { - if 0 != param.server_flags & DC_LP_AUTH_OAUTH2 { - // the used oauth2 addr may differ, check this. - // if dc_get_oauth2_addr() is not available in the oauth2 implementation, - // just use the given one. 
- progress!(context, 10); - if let Some(oauth2_addr) = - dc_get_oauth2_addr(context, ¶m.addr, ¶m.mail_pw) - .await - .and_then(|e| e.parse().ok()) - { - info!(context, "Authorized address is {}", oauth2_addr); - param.addr = oauth2_addr; - context - .sql - .set_raw_config(context, "addr", Some(param.addr.as_str())) - .await - .ok(); - } - progress!(context, 20); - } - true // no oauth? - just continue it's no error - } - 3 => { - if let Ok(parsed) = param.addr.parse() { - let parsed: EmailAddress = parsed; - param_domain = parsed.domain; - param_addr_urlencoded = - utf8_percent_encode(¶m.addr, NON_ALPHANUMERIC).to_string(); - true - } else { - error!(context, "Bad email-address."); - false - } - } - // Step 2: Autoconfig - 4 => { - progress!(context, 200); + // let success = match step_counter { + // // Read login parameters from the database + // 1 => { + // progress!(context, 1); + // if param.addr.is_empty() { + // error!(context, "Please enter an email address.",); + // } + // !param.addr.is_empty() + // } + // // Step 1: Load the parameters and check email-address and password + // 2 => { + // if 0 != param.server_flags & DC_LP_AUTH_OAUTH2 { + // // the used oauth2 addr may differ, check this. + // // if dc_get_oauth2_addr() is not available in the oauth2 implementation, + // // just use the given one. + // progress!(context, 10); + // if let Some(oauth2_addr) = + // dc_get_oauth2_addr(context, ¶m.addr, ¶m.mail_pw) + // .await + // .and_then(|e| e.parse().ok()) + // { + // info!(context, "Authorized address is {}", oauth2_addr); + // param.addr = oauth2_addr; + // context + // .sql + // .set_raw_config(context, "addr", Some(param.addr.as_str())) + // .await + // .ok(); + // } + // progress!(context, 20); + // } + // true // no oauth? 
- just continue it's no error + // } + // 3 => { + // if let Ok(parsed) = param.addr.parse() { + // let parsed: EmailAddress = parsed; + // param_domain = parsed.domain; + // param_addr_urlencoded = + // utf8_percent_encode(¶m.addr, NON_ALPHANUMERIC).to_string(); + // true + // } else { + // error!(context, "Bad email-address."); + // false + // } + // } + // // Step 2: Autoconfig + // 4 => { + // progress!(context, 200); - if param.mail_server.is_empty() - && param.mail_port == 0 - /*&¶m.mail_user.is_empty() -- the user can enter a loginname which is used by autoconfig then */ - && param.send_server.is_empty() - && param.send_port == 0 - && param.send_user.is_empty() - /*&¶m.send_pw.is_empty() -- the password cannot be auto-configured and is no criterion for autoconfig or not */ - && (param.server_flags & !DC_LP_AUTH_OAUTH2) == 0 - { - // no advanced parameters entered by the user: query provider-database or do Autoconfig - keep_flags = param.server_flags & DC_LP_AUTH_OAUTH2; - if let Some(new_param) = get_offline_autoconfig(context, ¶m) { - // got parameters from our provider-database, skip Autoconfig, preserve the OAuth2 setting - param_autoconfig = Some(new_param); - step_counter = STEP_12_USE_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop - } - } else { - // advanced parameters entered by the user: skip Autoconfig - step_counter = STEP_13_AFTER_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop - } - true - } - /* A. 
Search configurations from the domain used in the email-address, prefer encrypted */ - 5 => { - if param_autoconfig.is_none() { - let url = format!( - "https://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}", - param_domain, param_addr_urlencoded - ); - param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); - } - true - } - 6 => { - progress!(context, 300); - if param_autoconfig.is_none() { - // the doc does not mention `emailaddress=`, however, Thunderbird adds it, see https://releases.mozilla.org/pub/thunderbird/ , which makes some sense - let url = format!( - "https://{}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={}", - param_domain, param_addr_urlencoded - ); - param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); - } - true - } - /* Outlook section start ------------- */ - /* Outlook uses always SSL but different domains (this comment describes the next two steps) */ - 7 => { - progress!(context, 310); - if param_autoconfig.is_none() { - let url = format!("https://{}/autodiscover/autodiscover.xml", param_domain); - param_autoconfig = outlk_autodiscover(context, &url, ¶m).ok(); - } - true - } - 8 => { - progress!(context, 320); - if param_autoconfig.is_none() { - let url = format!( - "https://{}{}/autodiscover/autodiscover.xml", - "autodiscover.", param_domain - ); - param_autoconfig = outlk_autodiscover(context, &url, ¶m).ok(); - } - true - } - /* ----------- Outlook section end */ - 9 => { - progress!(context, 330); - if param_autoconfig.is_none() { - let url = format!( - "http://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}", - param_domain, param_addr_urlencoded - ); - param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); - } - true - } - 10 => { - progress!(context, 340); - if param_autoconfig.is_none() { - // do not transfer the email-address unencrypted - let url = format!( - "http://{}/.well-known/autoconfig/mail/config-v1.1.xml", - param_domain - ); - param_autoconfig = moz_autoconfigure(context, &url, 
¶m).ok(); - } - true - } - /* B. If we have no configuration yet, search configuration in Thunderbird's centeral database */ - 11 => { - progress!(context, 350); - if param_autoconfig.is_none() { - /* always SSL for Thunderbird's database */ - let url = format!("https://autoconfig.thunderbird.net/v1.1/{}", param_domain); - param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); - } - true - } - /* C. Do we have any autoconfig result? - If you change the match-number here, also update STEP_12_COPY_AUTOCONFIG above - */ - STEP_12_USE_AUTOCONFIG => { - progress!(context, 500); - if let Some(ref cfg) = param_autoconfig { - info!(context, "Got autoconfig: {}", &cfg); - if !cfg.mail_user.is_empty() { - param.mail_user = cfg.mail_user.clone(); - } - param.mail_server = cfg.mail_server.clone(); /* all other values are always NULL when entering autoconfig */ - param.mail_port = cfg.mail_port; - param.send_server = cfg.send_server.clone(); - param.send_port = cfg.send_port; - param.send_user = cfg.send_user.clone(); - param.server_flags = cfg.server_flags; - /* although param_autoconfig's data are no longer needed from, - it is used to later to prevent trying variations of port/server/logins */ - } - param.server_flags |= keep_flags; - true - } - // Step 3: Fill missing fields with defaults - // If you change the match-number here, also update STEP_13_AFTER_AUTOCONFIG above - STEP_13_AFTER_AUTOCONFIG => { - if param.mail_server.is_empty() { - param.mail_server = format!("imap.{}", param_domain,) - } - if param.mail_port == 0 { - param.mail_port = if 0 != param.server_flags & (0x100 | 0x400) { - 143 - } else { - 993 - } - } - if param.mail_user.is_empty() { - param.mail_user = param.addr.clone(); - } - if param.send_server.is_empty() && !param.mail_server.is_empty() { - param.send_server = param.mail_server.clone(); - if param.send_server.starts_with("imap.") { - param.send_server = param.send_server.replacen("imap", "smtp", 1); - } - } - if param.send_port == 0 { - 
param.send_port = if 0 != param.server_flags & DC_LP_SMTP_SOCKET_STARTTLS as i32 - { - 587 - } else if 0 != param.server_flags & DC_LP_SMTP_SOCKET_PLAIN as i32 { - 25 - } else { - 465 - } - } - if param.send_user.is_empty() && !param.mail_user.is_empty() { - param.send_user = param.mail_user.clone(); - } - if param.send_pw.is_empty() && !param.mail_pw.is_empty() { - param.send_pw = param.mail_pw.clone() - } - if !dc_exactly_one_bit_set(param.server_flags & DC_LP_AUTH_FLAGS as i32) { - param.server_flags &= !(DC_LP_AUTH_FLAGS as i32); - param.server_flags |= DC_LP_AUTH_NORMAL as i32 - } - if !dc_exactly_one_bit_set(param.server_flags & DC_LP_IMAP_SOCKET_FLAGS as i32) { - param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS as i32); - param.server_flags |= if param.send_port == 143 { - DC_LP_IMAP_SOCKET_STARTTLS as i32 - } else { - DC_LP_IMAP_SOCKET_SSL as i32 - } - } - if !dc_exactly_one_bit_set(param.server_flags & (DC_LP_SMTP_SOCKET_FLAGS as i32)) { - param.server_flags &= !(DC_LP_SMTP_SOCKET_FLAGS as i32); - param.server_flags |= if param.send_port == 587 { - DC_LP_SMTP_SOCKET_STARTTLS as i32 - } else if param.send_port == 25 { - DC_LP_SMTP_SOCKET_PLAIN as i32 - } else { - DC_LP_SMTP_SOCKET_SSL as i32 - } - } - /* do we have a complete configuration? 
*/ - if param.mail_server.is_empty() - || param.mail_port == 0 - || param.mail_user.is_empty() - || param.mail_pw.is_empty() - || param.send_server.is_empty() - || param.send_port == 0 - || param.send_user.is_empty() - || param.send_pw.is_empty() - || param.server_flags == 0 - { - error!(context, "Account settings incomplete."); - false - } else { - true - } - } - 14 => { - progress!(context, 600); - /* try to connect to IMAP - if we did not got an autoconfig, - do some further tries with different settings and username variations */ - imap_connected_here = - try_imap_connections(context, &mut param, param_autoconfig.is_some()).await; - imap_connected_here - } - 15 => { - progress!(context, 800); - smtp_connected_here = - try_smtp_connections(context, &mut param, param_autoconfig.is_some()).await; - smtp_connected_here - } - 16 => { - progress!(context, 900); - let create_mvbox = context.get_config_bool(Config::MvboxWatch).await - || context.get_config_bool(Config::MvboxMove).await; - let imap = &context.inbox_thread.imap; - if let Err(err) = imap.ensure_configured_folders(context, create_mvbox).await { - warn!(context, "configuring folders failed: {:?}", err); - false - } else { - let res = imap.select_with_uidvalidity(context, "INBOX").await; - if let Err(err) = res { - error!(context, "could not read INBOX status: {:?}", err); - false - } else { - true - } - } - } - 17 => { - progress!(context, 910); - /* configuration success - write back the configured parameters with the "configured_" prefix; also write the "configured"-flag */ - param - .save_to_database( - context, - "configured_", /*the trailing underscore is correct*/ - ) - .await - .ok(); + // if param.mail_server.is_empty() + // && param.mail_port == 0 + // /*&¶m.mail_user.is_empty() -- the user can enter a loginname which is used by autoconfig then */ + // && param.send_server.is_empty() + // && param.send_port == 0 + // && param.send_user.is_empty() + // /*&¶m.send_pw.is_empty() -- the password cannot 
be auto-configured and is no criterion for autoconfig or not */ + // && (param.server_flags & !DC_LP_AUTH_OAUTH2) == 0 + // { + // // no advanced parameters entered by the user: query provider-database or do Autoconfig + // keep_flags = param.server_flags & DC_LP_AUTH_OAUTH2; + // if let Some(new_param) = get_offline_autoconfig(context, ¶m) { + // // got parameters from our provider-database, skip Autoconfig, preserve the OAuth2 setting + // param_autoconfig = Some(new_param); + // step_counter = STEP_12_USE_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop + // } + // } else { + // // advanced parameters entered by the user: skip Autoconfig + // step_counter = STEP_13_AFTER_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop + // } + // true + // } + // /* A. Search configurations from the domain used in the email-address, prefer encrypted */ + // 5 => { + // if param_autoconfig.is_none() { + // let url = format!( + // "https://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}", + // param_domain, param_addr_urlencoded + // ); + // param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); + // } + // true + // } + // 6 => { + // progress!(context, 300); + // if param_autoconfig.is_none() { + // // the doc does not mention `emailaddress=`, however, Thunderbird adds it, see https://releases.mozilla.org/pub/thunderbird/ , which makes some sense + // let url = format!( + // "https://{}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={}", + // param_domain, param_addr_urlencoded + // ); + // param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); + // } + // true + // } + // /* Outlook section start ------------- */ + // /* Outlook uses always SSL but different domains (this comment describes the next two steps) */ + // 7 => { + // progress!(context, 310); + // if param_autoconfig.is_none() { + // let url = format!("https://{}/autodiscover/autodiscover.xml", param_domain); + // param_autoconfig = 
outlk_autodiscover(context, &url, ¶m).ok(); + // } + // true + // } + // 8 => { + // progress!(context, 320); + // if param_autoconfig.is_none() { + // let url = format!( + // "https://{}{}/autodiscover/autodiscover.xml", + // "autodiscover.", param_domain + // ); + // param_autoconfig = outlk_autodiscover(context, &url, ¶m).ok(); + // } + // true + // } + // /* ----------- Outlook section end */ + // 9 => { + // progress!(context, 330); + // if param_autoconfig.is_none() { + // let url = format!( + // "http://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}", + // param_domain, param_addr_urlencoded + // ); + // param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); + // } + // true + // } + // 10 => { + // progress!(context, 340); + // if param_autoconfig.is_none() { + // // do not transfer the email-address unencrypted + // let url = format!( + // "http://{}/.well-known/autoconfig/mail/config-v1.1.xml", + // param_domain + // ); + // param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); + // } + // true + // } + // /* B. If we have no configuration yet, search configuration in Thunderbird's centeral database */ + // 11 => { + // progress!(context, 350); + // if param_autoconfig.is_none() { + // /* always SSL for Thunderbird's database */ + // let url = format!("https://autoconfig.thunderbird.net/v1.1/{}", param_domain); + // param_autoconfig = moz_autoconfigure(context, &url, ¶m).ok(); + // } + // true + // } + // /* C. Do we have any autoconfig result? 
+ // If you change the match-number here, also update STEP_12_COPY_AUTOCONFIG above + // */ + // STEP_12_USE_AUTOCONFIG => { + // progress!(context, 500); + // if let Some(ref cfg) = param_autoconfig { + // info!(context, "Got autoconfig: {}", &cfg); + // if !cfg.mail_user.is_empty() { + // param.mail_user = cfg.mail_user.clone(); + // } + // param.mail_server = cfg.mail_server.clone(); /* all other values are always NULL when entering autoconfig */ + // param.mail_port = cfg.mail_port; + // param.send_server = cfg.send_server.clone(); + // param.send_port = cfg.send_port; + // param.send_user = cfg.send_user.clone(); + // param.server_flags = cfg.server_flags; + // /* although param_autoconfig's data are no longer needed from, + // it is used to later to prevent trying variations of port/server/logins */ + // } + // param.server_flags |= keep_flags; + // true + // } + // // Step 3: Fill missing fields with defaults + // // If you change the match-number here, also update STEP_13_AFTER_AUTOCONFIG above + // STEP_13_AFTER_AUTOCONFIG => { + // if param.mail_server.is_empty() { + // param.mail_server = format!("imap.{}", param_domain,) + // } + // if param.mail_port == 0 { + // param.mail_port = if 0 != param.server_flags & (0x100 | 0x400) { + // 143 + // } else { + // 993 + // } + // } + // if param.mail_user.is_empty() { + // param.mail_user = param.addr.clone(); + // } + // if param.send_server.is_empty() && !param.mail_server.is_empty() { + // param.send_server = param.mail_server.clone(); + // if param.send_server.starts_with("imap.") { + // param.send_server = param.send_server.replacen("imap", "smtp", 1); + // } + // } + // if param.send_port == 0 { + // param.send_port = if 0 != param.server_flags & DC_LP_SMTP_SOCKET_STARTTLS as i32 + // { + // 587 + // } else if 0 != param.server_flags & DC_LP_SMTP_SOCKET_PLAIN as i32 { + // 25 + // } else { + // 465 + // } + // } + // if param.send_user.is_empty() && !param.mail_user.is_empty() { + // param.send_user = 
param.mail_user.clone(); + // } + // if param.send_pw.is_empty() && !param.mail_pw.is_empty() { + // param.send_pw = param.mail_pw.clone() + // } + // if !dc_exactly_one_bit_set(param.server_flags & DC_LP_AUTH_FLAGS as i32) { + // param.server_flags &= !(DC_LP_AUTH_FLAGS as i32); + // param.server_flags |= DC_LP_AUTH_NORMAL as i32 + // } + // if !dc_exactly_one_bit_set(param.server_flags & DC_LP_IMAP_SOCKET_FLAGS as i32) { + // param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS as i32); + // param.server_flags |= if param.send_port == 143 { + // DC_LP_IMAP_SOCKET_STARTTLS as i32 + // } else { + // DC_LP_IMAP_SOCKET_SSL as i32 + // } + // } + // if !dc_exactly_one_bit_set(param.server_flags & (DC_LP_SMTP_SOCKET_FLAGS as i32)) { + // param.server_flags &= !(DC_LP_SMTP_SOCKET_FLAGS as i32); + // param.server_flags |= if param.send_port == 587 { + // DC_LP_SMTP_SOCKET_STARTTLS as i32 + // } else if param.send_port == 25 { + // DC_LP_SMTP_SOCKET_PLAIN as i32 + // } else { + // DC_LP_SMTP_SOCKET_SSL as i32 + // } + // } + // /* do we have a complete configuration? 
*/ + // if param.mail_server.is_empty() + // || param.mail_port == 0 + // || param.mail_user.is_empty() + // || param.mail_pw.is_empty() + // || param.send_server.is_empty() + // || param.send_port == 0 + // || param.send_user.is_empty() + // || param.send_pw.is_empty() + // || param.server_flags == 0 + // { + // error!(context, "Account settings incomplete."); + // false + // } else { + // true + // } + // } + // 14 => { + // progress!(context, 600); + // /* try to connect to IMAP - if we did not got an autoconfig, + // do some further tries with different settings and username variations */ + // imap_connected_here = + // try_imap_connections(context, &mut param, param_autoconfig.is_some()).await; + // imap_connected_here + // } + // 15 => { + // progress!(context, 800); + // smtp_connected_here = + // try_smtp_connections(context, &mut param, param_autoconfig.is_some()).await; + // smtp_connected_here + // } + // 16 => { + // progress!(context, 900); + // let create_mvbox = context.get_config_bool(Config::MvboxWatch).await + // || context.get_config_bool(Config::MvboxMove).await; + // let imap = &context.inbox_thread.imap; + // if let Err(err) = imap.ensure_configured_folders(context, create_mvbox).await { + // warn!(context, "configuring folders failed: {:?}", err); + // false + // } else { + // let res = imap.select_with_uidvalidity(context, "INBOX").await; + // if let Err(err) = res { + // error!(context, "could not read INBOX status: {:?}", err); + // false + // } else { + // true + // } + // } + // } + // 17 => { + // progress!(context, 910); + // /* configuration success - write back the configured parameters with the "configured_" prefix; also write the "configured"-flag */ + // param + // .save_to_database( + // context, + // "configured_", /*the trailing underscore is correct*/ + // ) + // .await + // .ok(); - context - .sql - .set_raw_config_bool(context, "configured", true) - .await - .ok(); - true - } - 18 => { - progress!(context, 920); - // we 
generate the keypair just now - we could also postpone this until the first message is sent, however, - // this may result in a unexpected and annoying delay when the user sends his very first message - // (~30 seconds on a Moto G4 play) and might looks as if message sending is always that slow. - success = e2ee::ensure_secret_key_exists(context).await.is_ok(); - info!(context, "key generation completed"); - progress!(context, 940); - break; // We are done here - } - _ => { - error!(context, "Internal error: step counter out of bound",); - break; - } - }; + // context + // .sql + // .set_raw_config_bool(context, "configured", true) + // .await + // .ok(); + // true + // } + // 18 => { + // progress!(context, 920); + // // we generate the keypair just now - we could also postpone this until the first message is sent, however, + // // this may result in a unexpected and annoying delay when the user sends his very first message + // // (~30 seconds on a Moto G4 play) and might looks as if message sending is always that slow. + // success = e2ee::ensure_secret_key_exists(context).await.is_ok(); + // info!(context, "key generation completed"); + // progress!(context, 940); + // break; // We are done here + // } + // _ => { + // error!(context, "Internal error: step counter out of bound",); + // break; + // } + // }; - if !success { - break; - } - } - if imap_connected_here { - context.inbox_thread.imap.disconnect(context).await; - } - if smtp_connected_here { - context.smtp.disconnect().await; - } + // if !success { + // break; + // } + // } + // if imap_connected_here { + // context.inbox_thread.imap.disconnect(context).await; + // } + // if smtp_connected_here { + // context.smtp.disconnect().await; + // } - // remember the entered parameters on success - // and restore to last-entered on failure. - // this way, the parameters visible to the ui are always in-sync with the current configuration. 
- if success { - LoginParam::from_database(context, "") - .await - .save_to_database(context, "configured_raw_") - .await - .ok(); - } else { - LoginParam::from_database(context, "configured_raw_") - .await - .save_to_database(context, "") - .await - .ok(); - } + // // remember the entered parameters on success + // // and restore to last-entered on failure. + // // this way, the parameters visible to the ui are always in-sync with the current configuration. + // if success { + // LoginParam::from_database(context, "") + // .await + // .save_to_database(context, "configured_raw_") + // .await + // .ok(); + // } else { + // LoginParam::from_database(context, "configured_raw_") + // .await + // .save_to_database(context, "") + // .await + // .ok(); + // } - if let Some(provider) = provider::get_provider_info(¶m.addr) { - if !provider.after_login_hint.is_empty() { - let mut msg = Message::new(Viewtype::Text); - msg.text = Some(provider.after_login_hint.to_string()); - if chat::add_device_msg(context, Some("core-provider-info"), Some(&mut msg)) - .await - .is_err() - { - warn!(context, "cannot add after_login_hint as core-provider-info"); - } - } - } + // if let Some(provider) = provider::get_provider_info(¶m.addr) { + // if !provider.after_login_hint.is_empty() { + // let mut msg = Message::new(Viewtype::Text); + // msg.text = Some(provider.after_login_hint.to_string()); + // if chat::add_device_msg(context, Some("core-provider-info"), Some(&mut msg)) + // .await + // .is_err() + // { + // warn!(context, "cannot add after_login_hint as core-provider-info"); + // } + // } + // } - context.free_ongoing().await; - progress!(context, if success { 1000 } else { 0 }); - job::Status::Finished(Ok(())) + // context.free_ongoing().await; + // progress!(context, if success { 1000 } else { 0 }); + // job::Status::Finished(Ok(())) } #[allow(clippy::unnecessary_unwrap)] @@ -562,24 +563,25 @@ async fn try_imap_connection( } async fn try_imap_one_param(context: &Context, param: 
&LoginParam) -> Option { - let inf = format!( - "imap: {}@{}:{} flags=0x{:x} certificate_checks={}", - param.mail_user, - param.mail_server, - param.mail_port, - param.server_flags, - param.imap_certificate_checks - ); - info!(context, "Trying: {}", inf); - if context.inbox_thread.imap.connect(context, ¶m).await { - info!(context, "success: {}", inf); - return Some(true); - } - if context.shall_stop_ongoing().await { - return Some(false); - } - info!(context, "Could not connect: {}", inf); - None + unimplemented!(); + // let inf = format!( + // "imap: {}@{}:{} flags=0x{:x} certificate_checks={}", + // param.mail_user, + // param.mail_server, + // param.mail_port, + // param.server_flags, + // param.imap_certificate_checks + // ); + // info!(context, "Trying: {}", inf); + // if context.inbox_thread.imap.connect(context, ¶m).await { + // info!(context, "success: {}", inf); + // return Some(true); + // } + // if context.shall_stop_ongoing().await { + // return Some(false); + // } + // info!(context, "Could not connect: {}", inf); + // None } async fn try_smtp_connections( @@ -613,25 +615,26 @@ async fn try_smtp_connections( } async fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option { - let inf = format!( - "smtp: {}@{}:{} flags: 0x{:x}", - param.send_user, param.send_server, param.send_port, param.server_flags - ); - info!(context, "Trying: {}", inf); - match context.smtp.connect(context, ¶m).await { - Ok(()) => { - info!(context, "success: {}", inf); - Some(true) - } - Err(err) => { - if context.shall_stop_ongoing().await { - Some(false) - } else { - warn!(context, "could not connect: {}", err); - None - } - } - } + unimplemented!() + // let inf = format!( + // "smtp: {}@{}:{} flags: 0x{:x}", + // param.send_user, param.send_server, param.send_port, param.server_flags + // ); + // info!(context, "Trying: {}", inf); + // match context.smtp.connect(context, ¶m).await { + // Ok(()) => { + // info!(context, "success: {}", inf); + // Some(true) + // } 
+ // Err(err) => { + // if context.shall_stop_ongoing().await { + // Some(false) + // } else { + // warn!(context, "could not connect: {}", err); + // None + // } + // } + // } } #[cfg(test)] diff --git a/src/context.rs b/src/context.rs index b0678fe8c..ef3a77230 100644 --- a/src/context.rs +++ b/src/context.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::ffi::OsString; -use std::sync::atomic::AtomicBool; +use std::ops::Deref; use async_std::path::{Path, PathBuf}; use async_std::sync::{Arc, Mutex, RwLock}; @@ -22,33 +22,40 @@ use crate::login_param::LoginParam; use crate::lot::Lot; use crate::message::{self, Message, MessengerMessage, MsgId}; use crate::param::Params; +use crate::scheduler::Scheduler; use crate::smtp::Smtp; use crate::sql::Sql; -#[derive(DebugStub)] +#[derive(Debug)] pub struct Context { - /// Database file path - dbfile: PathBuf, - /// Blob directory path - blobdir: PathBuf, - pub sql: Sql, - pub perform_inbox_jobs_needed: AtomicBool, - pub probe_imap_network: AtomicBool, - pub inbox_thread: JobThread, - pub sentbox_thread: JobThread, - pub mvbox_thread: JobThread, - pub smtp: Smtp, - pub oauth2_critical: Arc>, - pub os_name: Option, - pub cmdline_sel_chat_id: Arc>, - pub(crate) bob: Arc>, - pub last_smeared_timestamp: RwLock, - pub running_state: Arc>, - /// Mutex to avoid generating the key for the user more than once. - pub generating_key_mutex: Mutex<()>, - pub translated_stockstrings: RwLock>, + pub(crate) inner: Arc, +} +impl Deref for Context { + type Target = InnerContext; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[derive(Debug)] +pub struct InnerContext { + /// Database file path + pub(crate) dbfile: PathBuf, + /// Blob directory path + pub(crate) blobdir: PathBuf, + pub(crate) sql: Sql, + pub(crate) os_name: Option, + pub(crate) bob: RwLock, + pub(crate) last_smeared_timestamp: RwLock, + pub(crate) running_state: RwLock, + /// Mutex to avoid generating the key for the user more than once. 
+ pub(crate) generating_key_mutex: Mutex<()>, + pub(crate) translated_stockstrings: RwLock>, pub(crate) logs: SegQueue, + + pub(crate) scheduler: RwLock, } #[derive(Debug, PartialEq, Eq)] @@ -97,27 +104,24 @@ impl Context { "Blobdir does not exist: {}", blobdir.display() ); - let ctx = Context { + + let inner = InnerContext { blobdir, dbfile, os_name: Some(os_name), - running_state: Arc::new(RwLock::new(Default::default())), + running_state: RwLock::new(Default::default()), sql: Sql::new(), - smtp: Smtp::new(), - oauth2_critical: Arc::new(Mutex::new(())), - bob: Arc::new(RwLock::new(Default::default())), + bob: RwLock::new(Default::default()), last_smeared_timestamp: RwLock::new(0), - cmdline_sel_chat_id: Arc::new(RwLock::new(ChatId::new(0))), - inbox_thread: JobThread::new("INBOX", "configured_inbox_folder", Imap::new()), - sentbox_thread: JobThread::new("SENTBOX", "configured_sentbox_folder", Imap::new()), - mvbox_thread: JobThread::new("MVBOX", "configured_mvbox_folder", Imap::new()), - probe_imap_network: Default::default(), - perform_inbox_jobs_needed: Default::default(), generating_key_mutex: Mutex::new(()), translated_stockstrings: RwLock::new(HashMap::new()), logs: SegQueue::new(), + scheduler: RwLock::new(Scheduler::Stopped), }; + let ctx = Context { + inner: Arc::new(inner), + }; ensure!( ctx.sql.open(&ctx, &ctx.dbfile, false).await, "Failed opening sqlite database" @@ -126,6 +130,16 @@ impl Context { Ok(ctx) } + pub async fn run(&self) { + self.inner.scheduler.write().await.run().await + } + + pub async fn stop(&self) { + if self.inner.scheduler.read().await.is_running() { + self.inner.scheduler.write().await.stop().await; + } + } + /// Returns database file path. 
pub fn get_dbfile(&self) -> &Path { self.dbfile.as_path() @@ -160,7 +174,7 @@ impl Context { false } else { - let s_a = self.running_state.clone(); + let s_a = &self.running_state; let mut s = s_a.write().await; s.ongoing_running = true; @@ -171,7 +185,7 @@ impl Context { } pub async fn free_ongoing(&self) { - let s_a = self.running_state.clone(); + let s_a = &self.running_state; let mut s = s_a.write().await; s.ongoing_running = false; @@ -179,7 +193,7 @@ impl Context { } pub async fn has_ongoing(&self) -> bool { - let s_a = self.running_state.clone(); + let s_a = &self.running_state; let s = s_a.read().await; s.ongoing_running || !s.shall_stop_ongoing @@ -187,7 +201,7 @@ impl Context { /// Signal an ongoing process to stop. pub async fn stop_ongoing(&self) { - let s_a = self.running_state.clone(); + let s_a = &self.running_state; let mut s = s_a.write().await; if s.ongoing_running && !s.shall_stop_ongoing { @@ -199,7 +213,7 @@ impl Context { } pub async fn shall_stop_ongoing(&self) -> bool { - self.running_state.clone().read().await.shall_stop_ongoing + self.running_state.read().await.shall_stop_ongoing } /******************************************************************************* @@ -456,15 +470,7 @@ impl Context { impl Drop for Context { fn drop(&mut self) { async_std::task::block_on(async move { - info!(self, "disconnecting inbox-thread"); - self.inbox_thread.imap.disconnect(self).await; - info!(self, "disconnecting sentbox-thread"); - self.sentbox_thread.imap.disconnect(self).await; - info!(self, "disconnecting mvbox-thread"); - self.mvbox_thread.imap.disconnect(self).await; - info!(self, "disconnecting SMTP"); - self.smtp.disconnect().await; - + self.stop().await; self.sql.close(self).await; }); } diff --git a/src/imap/idle.rs b/src/imap/idle.rs index e8e213104..1959bc8f8 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -60,12 +60,12 @@ impl Session { } impl Imap { - pub async fn can_idle(&self) -> bool { - self.config.read().await.can_idle + pub 
fn can_idle(&self) -> bool { + self.config.can_idle } - pub async fn idle(&self, context: &Context, watch_folder: Option) -> Result<()> { - if !self.can_idle().await { + pub async fn idle(&mut self, context: &Context, watch_folder: Option) -> Result<()> { + if !self.can_idle() { return Err(Error::IdleAbilityMissing); } @@ -75,7 +75,7 @@ impl Imap { self.select_folder(context, watch_folder.clone()).await?; - let session = self.session.lock().await.take(); + let session = self.session.take(); let timeout = Duration::from_secs(23 * 60); if let Some(session) = session { match session.idle() { @@ -87,12 +87,12 @@ impl Imap { } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); + self.interrupt = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::SeqCst) { + if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); + self.skip_next_idle_wait = false; std::mem::drop(idle_wait); info!(context, "Idle wait was skipped"); } else { @@ -126,7 +126,7 @@ impl Imap { match res { Ok(session) => { - *self.session.lock().await = Some(Session::Secure(session)); + self.session = Some(Session::Secure(session)); } Err(err) => { // if we cannot terminate IDLE it probably @@ -143,12 +143,12 @@ impl Imap { } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); + self.interrupt = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::SeqCst) { + if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); + self.skip_next_idle_wait = false; std::mem::drop(idle_wait); info!(context, "Idle wait was skipped"); } else { @@ -182,7 +182,7 @@ impl Imap { match res { Ok(session) => { - *self.session.lock().await = Some(Session::Insecure(session)); + self.session = 
Some(Session::Insecure(session)); } Err(err) => { // if we cannot terminate IDLE it probably @@ -199,7 +199,7 @@ impl Imap { Ok(()) } - pub(crate) async fn fake_idle(&self, context: &Context, watch_folder: Option) { + pub(crate) async fn fake_idle(&mut self, context: &Context, watch_folder: Option) { // Idle using polling. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job (and an interrupt). @@ -213,11 +213,11 @@ impl Imap { // TODO: grow sleep durations / make them more flexible let interval = async_std::stream::interval(Duration::from_secs(60)); let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); - *self.interrupt.lock().await = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::SeqCst) { + self.interrupt = Some(interrupt); + if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); + self.skip_next_idle_wait = false; info!(context, "fake-idle wait was skipped"); } else { // loop until we are interrupted or if we fetched something @@ -229,7 +229,7 @@ impl Imap { warn!(context, "fake_idle: could not connect: {}", err); continue; } - if self.config.read().await.can_idle { + if self.config.can_idle { // we only fake-idled because network was gone during IDLE, probably break; } @@ -255,7 +255,7 @@ impl Imap { } } } - self.interrupt.lock().await.take(); + self.interrupt.take(); info!( context, @@ -268,14 +268,14 @@ impl Imap { ); } - pub async fn interrupt_idle(&self, context: &Context) { - let mut interrupt: Option = self.interrupt.lock().await.take(); + pub async fn interrupt_idle(&mut self, context: &Context) { + let mut interrupt: Option = self.interrupt.take(); if interrupt.is_none() { // idle wait is not running, signal it needs to skip - self.skip_next_idle_wait.store(true, Ordering::SeqCst); + self.skip_next_idle_wait = true; // meanwhile idle-wait may have produced the
StopSource - interrupt = self.interrupt.lock().await.take(); + interrupt = self.interrupt.take(); } // let's manually drop the StopSource if interrupt.is_some() { diff --git a/src/imap/mod.rs b/src/imap/mod.rs index ee8154fce..a5f3fcda4 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -12,7 +12,7 @@ use async_imap::{ types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute}, }; use async_std::prelude::*; -use async_std::sync::{Mutex, RwLock}; +use async_std::sync::{Mutex, Receiver, RwLock}; use crate::config::*; use crate::constants::*; @@ -137,14 +137,15 @@ const JUST_UID: &str = "(UID)"; const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])"; const SELECT_ALL: &str = "1:*"; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct Imap { - config: RwLock, - session: Mutex>, - connected: Mutex, - interrupt: Mutex>, - skip_next_idle_wait: AtomicBool, - should_reconnect: AtomicBool, + idle_interrupt: Receiver<()>, + config: ImapConfig, + session: Option, + connected: bool, + interrupt: Option, + skip_next_idle_wait: bool, + should_reconnect: bool, } #[derive(Debug)] @@ -212,39 +213,47 @@ impl Default for ImapConfig { } impl Imap { - pub fn new() -> Self { - Default::default() + pub fn new(idle_interrupt: Receiver<()>) -> Self { + Imap { + idle_interrupt, + config: Default::default(), + session: Default::default(), + connected: Default::default(), + interrupt: Default::default(), + skip_next_idle_wait: Default::default(), + should_reconnect: Default::default(), + } } - pub async fn is_connected(&self) -> bool { - *self.connected.lock().await + pub fn is_connected(&self) -> bool { + self.connected } pub fn should_reconnect(&self) -> bool { - self.should_reconnect.load(Ordering::Relaxed) + self.should_reconnect } - pub fn trigger_reconnect(&self) { - self.should_reconnect.store(true, Ordering::Relaxed) + pub fn trigger_reconnect(&mut self) { + self.should_reconnect = true; } - async fn setup_handle_if_needed(&self, context: &Context) -> Result<()> { - if 
self.config.read().await.imap_server.is_empty() { + async fn setup_handle_if_needed(&mut self, context: &Context) -> Result<()> { + if self.config.imap_server.is_empty() { return Err(Error::InTeardown); } if self.should_reconnect() { self.unsetup_handle(context).await; - self.should_reconnect.store(false, Ordering::Relaxed); - } else if self.is_connected().await { + self.should_reconnect = false; + } else if self.is_connected() { return Ok(()); } - let server_flags = self.config.read().await.server_flags as i32; + let server_flags = self.config.server_flags as i32; let connection_res: ImapResult = if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 { - let config = self.config.read().await; + let config = &mut self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; @@ -259,7 +268,7 @@ impl Imap { Err(err) => Err(err), } } else { - let config = self.config.read().await; + let config = &self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; @@ -273,7 +282,7 @@ impl Imap { let login_res = match connection_res { Ok(client) => { - let config = self.config.read().await; + let config = &self.config; let imap_user: &str = config.imap_user.as_ref(); let imap_pw: &str = config.imap_pw.as_ref(); @@ -297,7 +306,7 @@ impl Imap { } Err(err) => { let message = { - let config = self.config.read().await; + let config = &self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; context @@ -314,15 +323,15 @@ impl Imap { } }; - self.should_reconnect.store(false, Ordering::Relaxed); + self.should_reconnect = false; match login_res { Ok(session) => { - *self.session.lock().await = Some(session); + self.session = Some(session); Ok(()) } Err((err, _)) => { - let imap_user = self.config.read().await.imap_user.to_owned(); + let imap_user = self.config.imap_user.to_owned(); let message = context 
.stock_string_repl_str(StockMessage::CannotLogin, &imap_user) .await; @@ -337,26 +346,23 @@ impl Imap { } } - async fn unsetup_handle(&self, context: &Context) { - info!( - context, - "IMAP unsetup_handle step 2 (acquiring session.lock)" - ); - if let Some(mut session) = self.session.lock().await.take() { + async fn unsetup_handle(&mut self, context: &Context) { + info!(context, "IMAP unsetup_handle step 2"); + if let Some(mut session) = self.session.take() { if let Err(err) = session.close().await { warn!(context, "failed to close connection: {:?}", err); } } - *self.connected.lock().await = false; + self.connected = false; info!(context, "IMAP unsetup_handle step 3 (clearing config)."); - self.config.write().await.selected_folder = None; - self.config.write().await.selected_mailbox = None; + self.config.selected_folder = None; + self.config.selected_mailbox = None; info!(context, "IMAP unsetup_handle step 4 (disconnected)"); } - async fn free_connect_params(&self) { - let mut cfg = self.config.write().await; + async fn free_connect_params(&mut self) { + let mut cfg = &mut self.config; cfg.addr = "".into(); cfg.imap_server = "".into(); @@ -369,8 +375,8 @@ impl Imap { } /// Connects to imap account using already-configured parameters. 
- pub async fn connect_configured(&self, context: &Context) -> Result<()> { - if self.is_connected().await && !self.should_reconnect() { + pub async fn connect_configured(&mut self, context: &Context) -> Result<()> { + if self.is_connected() && !self.should_reconnect() { return Ok(()); } if !context.sql.get_raw_config_bool(context, "configured").await { @@ -389,7 +395,7 @@ impl Imap { /// tries connecting to imap account using the specific login /// parameters - pub async fn connect(&self, context: &Context, lp: &LoginParam) -> bool { + pub async fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool { if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() { return false; } @@ -402,7 +408,7 @@ impl Imap { let imap_pw = &lp.mail_pw; let server_flags = lp.server_flags as usize; - let mut config = self.config.write().await; + let mut config = &mut self.config; config.addr = addr.to_string(); config.imap_server = imap_server.to_string(); config.imap_port = imap_port; @@ -418,7 +424,7 @@ impl Imap { return false; } - let teardown = match &mut *self.session.lock().await { + let teardown = match &mut self.session { Some(ref mut session) => match session.capabilities().await { Ok(caps) => { if !context.sql.is_open().await { @@ -435,9 +441,9 @@ impl Imap { } }); - self.config.write().await.can_idle = can_idle; - self.config.write().await.can_move = can_move; - *self.connected.lock().await = true; + self.config.can_idle = can_idle; + self.config.can_move = can_move; + self.connected = true; emit_event!( context, Event::ImapConnected(format!( @@ -465,12 +471,12 @@ impl Imap { } } - pub async fn disconnect(&self, context: &Context) { + pub async fn disconnect(&mut self, context: &Context) { self.unsetup_handle(context).await; self.free_connect_params().await; } - pub async fn fetch(&self, context: &Context, watch_folder: &str) -> Result<()> { + pub async fn fetch(&mut self, context: &Context, watch_folder: &str) -> Result<()> { if 
!context.sql.is_open().await { // probably shutdown return Err(Error::InTeardown); @@ -511,7 +517,7 @@ impl Imap { /// return Result with (uid_validity, last_seen_uid) tuple. pub(crate) async fn select_with_uidvalidity( - &self, + &mut self, context: &Context, folder: &str, ) -> Result<(u32, u32)> { @@ -520,7 +526,7 @@ impl Imap { // compare last seen UIDVALIDITY against the current one let (uid_validity, last_seen_uid) = self.get_config_last_seen_uid(context, &folder).await; - let config = self.config.read().await; + let config = &mut self.config; let mailbox = config .selected_mailbox .as_ref() @@ -561,7 +567,7 @@ impl Imap { context, "IMAP folder has no uid_next, fall back to fetching" ); - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { // note that we use fetch by sequence number // and thus we only need to get exactly the // last-index message. @@ -598,7 +604,7 @@ impl Imap { } async fn fetch_new_messages>( - &self, + &mut self, context: &Context, folder: S, ) -> Result { @@ -613,9 +619,10 @@ impl Imap { // prefetch info from all unfetched mails let mut new_last_seen_uid = last_seen_uid; - let mut read_errors = 0; + let mut read_errors: usize = 0; - if let Some(ref mut session) = &mut *self.session.lock().await { + let mut uids = Vec::new(); + if let Some(ref mut session) = &mut self.session { // fetch messages with larger UID than the last one seen // `(UID FETCH lastseenuid+1:*)`, see RFC 4549 let set = format!("{}:*", last_seen_uid + 1); @@ -646,9 +653,9 @@ impl Imap { continue; } read_cnt += 1; - let headers = get_fetch_headers(&fetch)?; let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); + if precheck_imf(context, &message_id, folder.as_ref(), cur_uid).await { // we know the message-id already or don't want the message otherwise. 
info!( @@ -675,26 +682,31 @@ impl Imap { ); } else { // check passed, go fetch the rest - if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await { - info!( - context, - "Read error for message {} from \"{}\", trying over later: {}.", - message_id, - folder.as_ref(), - err - ); - read_errors += 1; - } + uids.push((cur_uid, message_id)); } } - if read_errors == 0 { - new_last_seen_uid = cur_uid; - } } } else { return Err(Error::NoConnection); }; + for (cur_uid, message_id) in uids.into_iter() { + if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await { + info!( + context, + "Read error for message {} from \"{}\", trying over later: {}.", + message_id, + folder.as_ref(), + err + ); + read_errors += 1; + } + + if read_errors == 0 { + new_last_seen_uid = cur_uid; + } + } + if new_last_seen_uid > last_seen_uid { self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid) .await; @@ -743,25 +755,24 @@ impl Imap { /// if no database entries are created. If the function returns an /// error, the caller should try again later. async fn fetch_single_msg>( - &self, + &mut self, context: &Context, folder: S, server_uid: u32, ) -> Result<()> { - if !self.is_connected().await { + if !self.is_connected() { return Err(Error::Other("Not connected".to_string())); } let set = format!("{}", server_uid); - let mut session_lock = self.session.lock().await; - let mut msgs = if let Some(ref mut session) = &mut *session_lock { + let mut msgs = if let Some(ref mut session) = &mut self.session { match session.uid_fetch(set, BODY_FLAGS).await { Ok(msgs) => msgs, Err(err) => { // TODO maybe differentiate between IO and input/parsing problems // so we don't reconnect if we have a (rare) input/output parsing problem? 
- self.trigger_reconnect(); + self.should_reconnect = true; warn!( context, "Error on fetching message #{} from folder \"{}\"; error={}.", @@ -810,11 +821,11 @@ impl Imap { } pub async fn can_move(&self) -> bool { - self.config.read().await.can_move + self.config.can_move } pub async fn mv( - &self, + &mut self, context: &Context, folder: &str, uid: u32, @@ -843,7 +854,7 @@ impl Imap { let display_folder_id = format!("{}/{}", folder, uid); if self.can_move().await { - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.uid_mv(&set, &dest_folder).await { Ok(_) => { emit_event!( @@ -879,7 +890,7 @@ impl Imap { ); } - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { if let Err(err) = session.uid_copy(&set, &dest_folder).await { warn!(context, "Could not copy message: {}", err); return ImapActionResult::Failed; @@ -899,7 +910,7 @@ impl Imap { ); ImapActionResult::Failed } else { - self.config.write().await.selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; emit_event!( context, Event::ImapMessageMoved(format!( @@ -911,7 +922,7 @@ impl Imap { } } - async fn add_flag_finalized(&self, context: &Context, server_uid: u32, flag: &str) -> bool { + async fn add_flag_finalized(&mut self, context: &Context, server_uid: u32, flag: &str) -> bool { // return true if we successfully set the flag or we otherwise // think add_flag should not be retried: Disconnection during setting // the flag, or other imap-errors, returns true as well. 
@@ -925,7 +936,7 @@ impl Imap { } async fn add_flag_finalized_with_set( - &self, + &mut self, context: &Context, uid_set: &str, flag: &str, @@ -933,7 +944,7 @@ impl Imap { if self.should_reconnect() { return false; } - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { let query = format!("+FLAGS ({})", flag); match session.uid_store(uid_set, &query).await { Ok(_) => {} @@ -951,7 +962,7 @@ impl Imap { } pub async fn prepare_imap_operation_on_msg( - &self, + &mut self, context: &Context, folder: &str, uid: u32, @@ -959,7 +970,7 @@ impl Imap { if uid == 0 { return Some(ImapActionResult::Failed); } - if !self.is_connected().await { + if !self.is_connected() { // currently jobs are only performed on the INBOX thread // TODO: make INBOX/SENT/MVBOX perform the jobs on their // respective folders to avoid select_folder network traffic @@ -990,7 +1001,12 @@ impl Imap { } } - pub async fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { + pub async fn set_seen( + &mut self, + context: &Context, + folder: &str, + uid: u32, + ) -> ImapActionResult { if let Some(imapresult) = self .prepare_imap_operation_on_msg(context, folder, uid) .await @@ -1012,7 +1028,7 @@ impl Imap { } pub async fn delete_msg( - &self, + &mut self, context: &Context, message_id: &str, folder: &str, @@ -1031,7 +1047,7 @@ impl Imap { // double-check that we are deleting the correct message-id // this comes at the expense of another imap query - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.uid_fetch(set, DELETE_CHECK_FLAGS).await { Ok(mut msgs) => { let fetch = if let Some(Ok(fetch)) = msgs.next().await { @@ -1086,13 +1102,13 @@ impl Imap { display_imap_id, message_id )) ); - self.config.write().await.selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; ImapActionResult::Success } } pub 
async fn ensure_configured_folders( - &self, + &mut self, context: &Context, create_mvbox: bool, ) -> Result<()> { @@ -1106,12 +1122,12 @@ impl Imap { return Ok(()); } - if !self.is_connected().await { + if !self.is_connected() { return Err(Error::NoConnection); } info!(context, "Configuring IMAP-folders."); - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { let mut folders = match session.list(Some(""), Some("*")).await { Ok(f) => f, Err(err) => { @@ -1121,7 +1137,7 @@ impl Imap { let mut sentbox_folder = None; let mut mvbox_folder = None; - let delimiter = self.config.read().await.imap_delimiter; + let delimiter = self.config.imap_delimiter; let fallback_folder = format!("INBOX{}DeltaChat", delimiter); while let Some(folder) = folders.next().await { @@ -1231,7 +1247,7 @@ impl Imap { // } // } - pub async fn empty_folder(&self, context: &Context, folder: &str) { + pub async fn empty_folder(&mut self, context: &Context, folder: &str) { info!(context, "emptying folder {}", folder); // we want to report all error to the user @@ -1261,7 +1277,7 @@ impl Imap { } // we now trigger expunge to actually delete messages - self.config.write().await.selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; match self.select_folder::(context, None).await { Ok(()) => { emit_event!(context, Event::ImapFolderEmptied(folder.to_string())); diff --git a/src/imap/select_folder.rs b/src/imap/select_folder.rs index 36c2dbf6b..f14802113 100644 --- a/src/imap/select_folder.rs +++ b/src/imap/select_folder.rs @@ -26,14 +26,13 @@ impl Imap { /// select a folder, possibly update uid_validity and, if needed, /// expunge the folder to remove delete-marked messages. 
pub(super) async fn select_folder>( - &self, + &mut self, context: &Context, folder: Option, ) -> Result<()> { - if self.session.lock().await.is_none() { - let mut cfg = self.config.write().await; - cfg.selected_folder = None; - cfg.selected_folder_needs_expunge = false; + if self.session.is_none() { + self.config.selected_folder = None; + self.config.selected_folder_needs_expunge = false; self.trigger_reconnect(); return Err(Error::NoSession); } @@ -41,7 +40,7 @@ impl Imap { // if there is a new folder and the new folder is equal to the selected one, there's nothing to do. // if there is _no_ new folder, we continue as we might want to expunge below. if let Some(ref folder) = folder { - if let Some(ref selected_folder) = self.config.read().await.selected_folder { + if let Some(ref selected_folder) = self.config.selected_folder { if folder.as_ref() == selected_folder { return Ok(()); } @@ -49,14 +48,14 @@ impl Imap { } // deselect existing folder, if needed (it's also done implicitly by SELECT, however, without EXPUNGE then) - let needs_expunge = { self.config.read().await.selected_folder_needs_expunge }; + let needs_expunge = { self.config.selected_folder_needs_expunge }; if needs_expunge { - if let Some(ref folder) = self.config.read().await.selected_folder { + if let Some(ref folder) = self.config.selected_folder { info!(context, "Expunge messages in \"{}\".", folder); // A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see // https://tools.ietf.org/html/rfc3501#section-6.4.2 - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.close().await { Ok(_) => { info!(context, "close/expunge succeeded"); @@ -70,12 +69,12 @@ impl Imap { return Err(Error::NoSession); } } - self.config.write().await.selected_folder_needs_expunge = false; + self.config.selected_folder_needs_expunge = false; } // select new folder if let Some(ref folder) = folder { - if let Some(ref mut session) = 
&mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { let res = session.select(folder).await; // https://tools.ietf.org/html/rfc3501#section-6.3.1 @@ -84,21 +83,20 @@ impl Imap { match res { Ok(mailbox) => { - let mut config = self.config.write().await; - config.selected_folder = Some(folder.as_ref().to_string()); - config.selected_mailbox = Some(mailbox); + self.config.selected_folder = Some(folder.as_ref().to_string()); + self.config.selected_mailbox = Some(mailbox); Ok(()) } Err(async_imap::error::Error::ConnectionLost) => { self.trigger_reconnect(); - self.config.write().await.selected_folder = None; + self.config.selected_folder = None; Err(Error::ConnectionLost) } Err(async_imap::error::Error::Validate(_)) => { Err(Error::BadFolderName(folder.as_ref().to_string())) } Err(err) => { - self.config.write().await.selected_folder = None; + self.config.selected_folder = None; self.trigger_reconnect(); Err(Error::Other(err.to_string())) } diff --git a/src/job.rs b/src/job.rs index edc9c2342..92f3b08d5 100644 --- a/src/job.rs +++ b/src/job.rs @@ -185,124 +185,126 @@ impl Job { F: FnOnce() -> Fut, Fut: Future>, { - // hold the smtp lock during sending of a job and - // its ok/error response processing. Note that if a message - // was sent we need to mark it in the database ASAP as we - // otherwise might send it twice. - if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { - info!(context, "smtp-sending out mime message:"); - println!("{}", String::from_utf8_lossy(&message)); - } - match context - .smtp - .send(context, recipients, message, job_id) - .await - { - Err(crate::smtp::send::Error::SendError(err)) => { - // Remote error, retry later. - warn!(context, "SMTP failed to send: {}", err); - self.pending_error = Some(err.to_string()); + unimplemented!(); + // // hold the smtp lock during sending of a job and + // // its ok/error response processing. 
Note that if a message + // // was sent we need to mark it in the database ASAP as we + // // otherwise might send it twice. + // if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { + // info!(context, "smtp-sending out mime message:"); + // println!("{}", String::from_utf8_lossy(&message)); + // } + // match context + // .smtp + // .send(context, recipients, message, job_id) + // .await + // { + // Err(crate::smtp::send::Error::SendError(err)) => { + // // Remote error, retry later. + // warn!(context, "SMTP failed to send: {}", err); + // self.pending_error = Some(err.to_string()); - let res = match err { - async_smtp::smtp::error::Error::Permanent(_) => { - Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) - } - async_smtp::smtp::error::Error::Transient(_) => { - // We got a transient 4xx response from SMTP server. - // Give some time until the server-side error maybe goes away. - Status::RetryLater - } - _ => { - if context.smtp.has_maybe_stale_connection().await { - info!(context, "stale connection? immediately reconnecting"); - Status::RetryNow - } else { - Status::RetryLater - } - } - }; + // let res = match err { + // async_smtp::smtp::error::Error::Permanent(_) => { + // Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) + // } + // async_smtp::smtp::error::Error::Transient(_) => { + // // We got a transient 4xx response from SMTP server. + // // Give some time until the server-side error maybe goes away. + // Status::RetryLater + // } + // _ => { + // if context.smtp.has_maybe_stale_connection().await { + // info!(context, "stale connection? immediately reconnecting"); + // Status::RetryNow + // } else { + // Status::RetryLater + // } + // } + // }; - // this clears last_success info - context.smtp.disconnect().await; + // // this clears last_success info + // context.smtp.disconnect().await; - res - } - Err(crate::smtp::send::Error::EnvelopeError(err)) => { - // Local error, job is invalid, do not retry. 
- context.smtp.disconnect().await; - warn!(context, "SMTP job is invalid: {}", err); - Status::Finished(Err(Error::SmtpError(err))) - } - Err(crate::smtp::send::Error::NoTransport) => { - // Should never happen. - // It does not even make sense to disconnect here. - error!(context, "SMTP job failed because SMTP has no transport"); - Status::Finished(Err(format_err!("SMTP has not transport"))) - } - Ok(()) => { - job_try!(success_cb().await); - Status::Finished(Ok(())) - } - } + // res + // } + // Err(crate::smtp::send::Error::EnvelopeError(err)) => { + // // Local error, job is invalid, do not retry. + // context.smtp.disconnect().await; + // warn!(context, "SMTP job is invalid: {}", err); + // Status::Finished(Err(Error::SmtpError(err))) + // } + // Err(crate::smtp::send::Error::NoTransport) => { + // // Should never happen. + // // It does not even make sense to disconnect here. + // error!(context, "SMTP job failed because SMTP has no transport"); + // Status::Finished(Err(format_err!("SMTP has not transport"))) + // } + // Ok(()) => { + // job_try!(success_cb().await); + // Status::Finished(Ok(())) + // } + // } } async fn send_msg_to_smtp(&mut self, context: &Context) -> Status { - // connect to SMTP server, if not yet done - if !context.smtp.is_connected().await { - let loginparam = LoginParam::from_database(context, "configured_").await; - if let Err(err) = context.smtp.connect(context, &loginparam).await { - warn!(context, "SMTP connection failure: {:?}", err); - return Status::RetryLater; - } - } + unimplemented!(); + // // connect to SMTP server, if not yet done + // if !context.smtp.is_connected().await { + // let loginparam = LoginParam::from_database(context, "configured_").await; + // if let Err(err) = context.smtp.connect(context, &loginparam).await { + // warn!(context, "SMTP connection failure: {:?}", err); + // return Status::RetryLater; + // } + // } - let filename = job_try!(job_try!(self - .param - .get_path(Param::File, context) - .map_err(|_|
format_err!("Can't get filename"))) - .ok_or_else(|| format_err!("Can't get filename"))); - let body = job_try!(dc_read_file(context, &filename).await); - let recipients = job_try!(self.param.get(Param::Recipients).ok_or_else(|| { - warn!(context, "Missing recipients for job {}", self.job_id); - format_err!("Missing recipients") - })); + // let filename = job_try!(job_try!(self + // .param + // .get_path(Param::File, context) + // .map_err(|_| format_err!("Can't get filename"))) + // .ok_or_else(|| format_err!("Can't get filename"))); + // let body = job_try!(dc_read_file(context, &filename).await); + // let recipients = job_try!(self.param.get(Param::Recipients).ok_or_else(|| { + // warn!(context, "Missing recipients for job {}", self.job_id); + // format_err!("Missing recipients") + // })); - let recipients_list = recipients - .split('\x1e') - .filter_map( - |addr| match async_smtp::EmailAddress::new(addr.to_string()) { - Ok(addr) => Some(addr), - Err(err) => { - warn!(context, "invalid recipient: {} {:?}", addr, err); - None - } - }, - ) - .collect::>(); + // let recipients_list = recipients + // .split('\x1e') + // .filter_map( + // |addr| match async_smtp::EmailAddress::new(addr.to_string()) { + // Ok(addr) => Some(addr), + // Err(err) => { + // warn!(context, "invalid recipient: {} {:?}", addr, err); + // None + // } + // }, + // ) + // .collect::>(); - /* if there is a msg-id and it does not exist in the db, cancel sending. - this happends if dc_delete_msgs() was called - before the generated mime was sent out */ - if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)).await { - return Status::Finished(Err(format_err!( - "Not sending Message {} as it was deleted", - self.foreign_id - ))); - }; + // /* if there is a msg-id and it does not exist in the db, cancel sending. 
+ // this happends if dc_delete_msgs() was called + // before the generated mime was sent out */ + // if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)).await { + // return Status::Finished(Err(format_err!( + // "Not sending Message {} as it was deleted", + // self.foreign_id + // ))); + // }; - let foreign_id = self.foreign_id; - self.smtp_send(context, recipients_list, body, self.job_id, || { - async move { - // smtp success, update db ASAP, then delete smtp file - if 0 != foreign_id { - set_delivered(context, MsgId::new(foreign_id)).await; - } - // now also delete the generated file - dc_delete_file(context, filename).await; - Ok(()) - } - }) - .await + // let foreign_id = self.foreign_id; + // self.smtp_send(context, recipients_list, body, self.job_id, || { + // async move { + // // smtp success, update db ASAP, then delete smtp file + // if 0 != foreign_id { + // set_delivered(context, MsgId::new(foreign_id)).await; + // } + // // now also delete the generated file + // dc_delete_file(context, filename).await; + // Ok(()) + // } + // }) + // .await } /// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job. @@ -349,228 +351,234 @@ impl Job { } async fn send_mdn(&mut self, context: &Context) -> Status { - if !context.get_config_bool(Config::MdnsEnabled).await { - // User has disabled MDNs after job scheduling but before - // execution. - return Status::Finished(Err(format_err!("MDNs are disabled"))); - } + unimplemented!(); + // if !context.get_config_bool(Config::MdnsEnabled).await { + // // User has disabled MDNs after job scheduling but before + // // execution. 
+ // return Status::Finished(Err(format_err!("MDNs are disabled"))); + // } - let contact_id = self.foreign_id; - let contact = job_try!(Contact::load_from_db(context, contact_id).await); - if contact.is_blocked() { - return Status::Finished(Err(format_err!("Contact is blocked"))); - } + // let contact_id = self.foreign_id; + // let contact = job_try!(Contact::load_from_db(context, contact_id).await); + // if contact.is_blocked() { + // return Status::Finished(Err(format_err!("Contact is blocked"))); + // } - let msg_id = if let Some(msg_id) = self.param.get_msg_id() { - msg_id - } else { - return Status::Finished(Err(format_err!( - "SendMdn job has invalid parameters: {}", - self.param - ))); - }; + // let msg_id = if let Some(msg_id) = self.param.get_msg_id() { + // msg_id + // } else { + // return Status::Finished(Err(format_err!( + // "SendMdn job has invalid parameters: {}", + // self.param + // ))); + // }; - // Try to aggregate other SendMdn jobs and send a combined MDN. - let (additional_job_ids, additional_rfc724_mids) = self - .get_additional_mdn_jobs(context, contact_id) - .await - .unwrap_or_default(); + // // Try to aggregate other SendMdn jobs and send a combined MDN. 
+ // let (additional_job_ids, additional_rfc724_mids) = self + // .get_additional_mdn_jobs(context, contact_id) + // .await + // .unwrap_or_default(); - if !additional_rfc724_mids.is_empty() { - info!( - context, - "SendMdn job: aggregating {} additional MDNs", - additional_rfc724_mids.len() - ) - } + // if !additional_rfc724_mids.is_empty() { + // info!( + // context, + // "SendMdn job: aggregating {} additional MDNs", + // additional_rfc724_mids.len() + // ) + // } - let msg = job_try!(Message::load_from_db(context, msg_id).await); - let mimefactory = - job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await); - let rendered_msg = job_try!(mimefactory.render().await); - let body = rendered_msg.message; + // let msg = job_try!(Message::load_from_db(context, msg_id).await); + // let mimefactory = + // job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await); + // let rendered_msg = job_try!(mimefactory.render().await); + // let body = rendered_msg.message; - let addr = contact.get_addr(); - let recipient = job_try!(async_smtp::EmailAddress::new(addr.to_string()) - .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))); - let recipients = vec![recipient]; + // let addr = contact.get_addr(); + // let recipient = job_try!(async_smtp::EmailAddress::new(addr.to_string()) + // .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))); + // let recipients = vec![recipient]; - // connect to SMTP server, if not yet done - if !context.smtp.is_connected().await { - let loginparam = LoginParam::from_database(context, "configured_").await; - if let Err(err) = context.smtp.connect(context, &loginparam).await { - warn!(context, "SMTP connection failure: {:?}", err); - return Status::RetryLater; - } - } + // // connect to SMTP server, if not yet done + // if !context.smtp.is_connected().await { + // let loginparam = LoginParam::from_database(context, "configured_").await; + // if let Err(err) = 
context.smtp.connect(context, &loginparam).await { + // warn!(context, "SMTP connection failure: {:?}", err); + // return Status::RetryLater; + // } + // } - self.smtp_send(context, recipients, body, self.job_id, || { - async move { - // Remove additional SendMdn jobs we have aggregated into this one. - job::kill_ids(context, &additional_job_ids).await?; - Ok(()) - } - }) - .await + // self.smtp_send(context, recipients, body, self.job_id, || { + // async move { + // // Remove additional SendMdn jobs we have aggregated into this one. + // job::kill_ids(context, &additional_job_ids).await?; + // Ok(()) + // } + // }) + // .await } async fn move_msg(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.imap; + unimplemented!(); + // let imap_inbox = &context.inbox_thread.imap; - let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); + // let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { - warn!(context, "could not configure folders: {:?}", err); - return Status::RetryLater; - } - let dest_folder = context - .sql - .get_raw_config(context, "configured_mvbox_folder") - .await; + // if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { + // warn!(context, "could not configure folders: {:?}", err); + // return Status::RetryLater; + // } + // let dest_folder = context + // .sql + // .get_raw_config(context, "configured_mvbox_folder") + // .await; - if let Some(dest_folder) = dest_folder { - let server_folder = msg.server_folder.as_ref().unwrap(); - let mut dest_uid = 0; + // if let Some(dest_folder) = dest_folder { + // let server_folder = msg.server_folder.as_ref().unwrap(); + // let mut dest_uid = 0; - match imap_inbox - .mv( - context, - server_folder, - msg.server_uid, - &dest_folder, - &mut dest_uid, - ) - .await - { - ImapActionResult::RetryLater => 
Status::RetryLater, - ImapActionResult::Success => { - message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid) - .await; - Status::Finished(Ok(())) - } - ImapActionResult::Failed => { - Status::Finished(Err(format_err!("IMAP action failed"))) - } - ImapActionResult::AlreadyDone => Status::Finished(Ok(())), - } - } else { - Status::Finished(Err(format_err!("No mvbox folder configured"))) - } + // match imap_inbox + // .mv( + // context, + // server_folder, + // msg.server_uid, + // &dest_folder, + // &mut dest_uid, + // ) + // .await + // { + // ImapActionResult::RetryLater => Status::RetryLater, + // ImapActionResult::Success => { + // message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid) + // .await; + // Status::Finished(Ok(())) + // } + // ImapActionResult::Failed => { + // Status::Finished(Err(format_err!("IMAP action failed"))) + // } + // ImapActionResult::AlreadyDone => Status::Finished(Ok(())), + // } + // } else { + // Status::Finished(Err(format_err!("No mvbox folder configured"))) + // } } async fn delete_msg_on_imap(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.imap; + unimplemented!(); + // let imap_inbox = &context.inbox_thread.imap; - let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); + // let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - if !msg.rfc724_mid.is_empty() { - if message::rfc724_mid_cnt(context, &msg.rfc724_mid).await > 1 { - info!( - context, - "The message is deleted from the server when all parts are deleted.", - ); - } else { - /* if this is the last existing part of the message, - we delete the message from the server */ - let mid = msg.rfc724_mid; - let server_folder = msg.server_folder.as_ref().unwrap(); - let res = imap_inbox - .delete_msg(context, &mid, server_folder, &mut msg.server_uid) - .await; - if res == ImapActionResult::RetryLater { - // XXX RetryLater is 
converted to RetryNow here - return Status::RetryNow; - } - } - Message::delete_from_db(context, msg.id).await; - Status::Finished(Ok(())) - } else { - /* eg. device messages have no Message-ID */ - Status::Finished(Ok(())) - } + // if !msg.rfc724_mid.is_empty() { + // if message::rfc724_mid_cnt(context, &msg.rfc724_mid).await > 1 { + // info!( + // context, + // "The message is deleted from the server when all parts are deleted.", + // ); + // } else { + // /* if this is the last existing part of the message, + // we delete the message from the server */ + // let mid = msg.rfc724_mid; + // let server_folder = msg.server_folder.as_ref().unwrap(); + // let res = imap_inbox + // .delete_msg(context, &mid, server_folder, &mut msg.server_uid) + // .await; + // if res == ImapActionResult::RetryLater { + // // XXX RetryLater is converted to RetryNow here + // return Status::RetryNow; + // } + // } + // Message::delete_from_db(context, msg.id).await; + // Status::Finished(Ok(())) + // } else { + // /* eg. 
device messages have no Message-ID */ + // Status::Finished(Ok(())) + // } } async fn empty_server(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.imap; - if self.foreign_id & DC_EMPTY_MVBOX > 0 { - if let Some(mvbox_folder) = context - .sql - .get_raw_config(context, "configured_mvbox_folder") - .await - { - imap_inbox.empty_folder(context, &mvbox_folder).await; - } - } - if self.foreign_id & DC_EMPTY_INBOX > 0 { - imap_inbox.empty_folder(context, "INBOX").await; - } - Status::Finished(Ok(())) + unimplemented!(); + // let imap_inbox = &context.inbox_thread.imap; + // if self.foreign_id & DC_EMPTY_MVBOX > 0 { + // if let Some(mvbox_folder) = context + // .sql + // .get_raw_config(context, "configured_mvbox_folder") + // .await + // { + // imap_inbox.empty_folder(context, &mvbox_folder).await; + // } + // } + // if self.foreign_id & DC_EMPTY_INBOX > 0 { + // imap_inbox.empty_folder(context, "INBOX").await; + // } + // Status::Finished(Ok(())) } async fn markseen_msg_on_imap(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.imap; + unimplemented!(); + // let imap_inbox = &context.inbox_thread.imap; - let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); + // let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - let folder = msg.server_folder.as_ref().unwrap(); - match imap_inbox.set_seen(context, folder, msg.server_uid).await { - ImapActionResult::RetryLater => Status::RetryLater, - ImapActionResult::AlreadyDone => Status::Finished(Ok(())), - ImapActionResult::Success | ImapActionResult::Failed => { - // XXX the message might just have been moved - // we want to send out an MDN anyway - // The job will not be retried so locally - // there is no risk of double-sending MDNs. 
- if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() - && context.get_config_bool(Config::MdnsEnabled).await - { - if let Err(err) = send_mdn(context, &msg).await { - warn!(context, "could not send out mdn for {}: {}", msg.id, err); - return Status::Finished(Err(err)); - } - } - Status::Finished(Ok(())) - } - } + // let folder = msg.server_folder.as_ref().unwrap(); + // match imap_inbox.set_seen(context, folder, msg.server_uid).await { + // ImapActionResult::RetryLater => Status::RetryLater, + // ImapActionResult::AlreadyDone => Status::Finished(Ok(())), + // ImapActionResult::Success | ImapActionResult::Failed => { + // // XXX the message might just have been moved + // // we want to send out an MDN anyway + // // The job will not be retried so locally + // // there is no risk of double-sending MDNs. + // if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() + // && context.get_config_bool(Config::MdnsEnabled).await + // { + // if let Err(err) = send_mdn(context, &msg).await { + // warn!(context, "could not send out mdn for {}: {}", msg.id, err); + // return Status::Finished(Err(err)); + // } + // } + // Status::Finished(Ok(())) + // } + // } } async fn markseen_mdn_on_imap(&mut self, context: &Context) -> Status { - let folder = self - .param - .get(Param::ServerFolder) - .unwrap_or_default() - .to_string(); - let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let imap_inbox = &context.inbox_thread.imap; - if imap_inbox.set_seen(context, &folder, uid).await == ImapActionResult::RetryLater { - return Status::RetryLater; - } - if self.param.get_bool(Param::AlsoMove).unwrap_or_default() { - if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { - warn!(context, "configuring folders failed: {:?}", err); - return Status::RetryLater; - } - let dest_folder = context - .sql - .get_raw_config(context, "configured_mvbox_folder") - .await; - if let Some(dest_folder) = dest_folder { - let mut dest_uid = 0; - if 
ImapActionResult::RetryLater - == imap_inbox - .mv(context, &folder, uid, &dest_folder, &mut dest_uid) - .await - { - Status::RetryLater - } else { - Status::Finished(Ok(())) - } - } else { - Status::Finished(Err(format_err!("MVBOX is not configured"))) - } - } else { - Status::Finished(Ok(())) - } + unimplemented!(); + // let folder = self + // .param + // .get(Param::ServerFolder) + // .unwrap_or_default() + // .to_string(); + // let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; + // let imap_inbox = &context.inbox_thread.imap; + // if imap_inbox.set_seen(context, &folder, uid).await == ImapActionResult::RetryLater { + // return Status::RetryLater; + // } + // if self.param.get_bool(Param::AlsoMove).unwrap_or_default() { + // if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { + // warn!(context, "configuring folders failed: {:?}", err); + // return Status::RetryLater; + // } + // let dest_folder = context + // .sql + // .get_raw_config(context, "configured_mvbox_folder") + // .await; + // if let Some(dest_folder) = dest_folder { + // let mut dest_uid = 0; + // if ImapActionResult::RetryLater + // == imap_inbox + // .mv(context, &folder, uid, &dest_folder, &mut dest_uid) + // .await + // { + // Status::RetryLater + // } else { + // Status::Finished(Ok(())) + // } + // } else { + // Status::Finished(Err(format_err!("MVBOX is not configured"))) + // } + // } else { + // Status::Finished(Ok(())) + // } } } @@ -599,116 +607,126 @@ pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { } pub async fn perform_inbox_fetch(context: &Context) { - let use_network = context.get_config_bool(Config::InboxWatch).await; + unimplemented!(); + // let use_network = context.get_config_bool(Config::InboxWatch).await; - context.inbox_thread.fetch(context, use_network).await; + // context.inbox_thread.fetch(context, use_network).await; } pub async fn perform_mvbox_fetch(context: &Context) { - let use_network = 
context.get_config_bool(Config::MvboxWatch).await; + unimplemented!(); + // let use_network = context.get_config_bool(Config::MvboxWatch).await; - context.mvbox_thread.fetch(context, use_network).await; + // context.mvbox_thread.fetch(context, use_network).await; } pub async fn perform_sentbox_fetch(context: &Context) { - let use_network = context.get_config_bool(Config::SentboxWatch).await; + unimplemented!(); + // let use_network = context.get_config_bool(Config::SentboxWatch).await; - context.sentbox_thread.fetch(context, use_network).await; + // context.sentbox_thread.fetch(context, use_network).await; } pub async fn perform_inbox_idle(context: &Context) { - if context - .perform_inbox_jobs_needed - .load(std::sync::atomic::Ordering::Relaxed) - { - info!( - context, - "INBOX-IDLE will not be started because of waiting jobs." - ); - return; - } - let use_network = context.get_config_bool(Config::InboxWatch).await; + unimplemented!(); + // if context + // .perform_inbox_jobs_needed + // .load(std::sync::atomic::Ordering::Relaxed) + // { + // info!( + // context, + // "INBOX-IDLE will not be started because of waiting jobs." 
+ // ); + // return; + // } + // let use_network = context.get_config_bool(Config::InboxWatch).await; - context.inbox_thread.idle(context, use_network).await; + // context.inbox_thread.idle(context, use_network).await; } pub async fn perform_mvbox_idle(context: &Context) { - let use_network = context.get_config_bool(Config::MvboxWatch).await; + unimplemented!(); + // let use_network = context.get_config_bool(Config::MvboxWatch).await; - context.mvbox_thread.idle(context, use_network).await; -} + // context.mvbox_thread.idle(context, use_network).await; + // } -pub async fn perform_sentbox_idle(context: &Context) { - let use_network = context.get_config_bool(Config::SentboxWatch).await; + // pub async fn perform_sentbox_idle(context: &Context) { + // let use_network = context.get_config_bool(Config::SentboxWatch).await; - context.sentbox_thread.idle(context, use_network).await; + // context.sentbox_thread.idle(context, use_network).await; } pub async fn interrupt_inbox_idle(context: &Context) { - info!(context, "interrupt_inbox_idle called"); - // we do not block on trying to obtain the thread lock - // because we don't know in which state the thread is. - // If it's currently fetching then we can not get the lock - // but we flag it for checking jobs so that idle will be skipped. - if !context.inbox_thread.try_interrupt_idle(context).await { - context - .perform_inbox_jobs_needed - .store(true, std::sync::atomic::Ordering::Relaxed); - warn!(context, "could not interrupt idle"); - } + unimplemented!(); + // info!(context, "interrupt_inbox_idle called"); + // // we do not block on trying to obtain the thread lock + // // because we don't know in which state the thread is. + // // If it's currently fetching then we can not get the lock + // // but we flag it for checking jobs so that idle will be skipped. 
+ // if !context.inbox_thread.try_interrupt_idle(context).await { + // context + // .perform_inbox_jobs_needed + // .store(true, std::sync::atomic::Ordering::Relaxed); + // warn!(context, "could not interrupt idle"); + // } } pub async fn interrupt_mvbox_idle(context: &Context) { - context.mvbox_thread.interrupt_idle(context).await; + unimplemented!(); + // context.mvbox_thread.interrupt_idle(context).await; } pub async fn interrupt_sentbox_idle(context: &Context) { - context.sentbox_thread.interrupt_idle(context).await; + unimplemented!(); + // context.sentbox_thread.interrupt_idle(context).await; } pub async fn perform_smtp_jobs(context: &Context) { - let probe_smtp_network = { - let state = &mut *context.smtp.state.write().await; + unimplemented!(); + // let probe_smtp_network = { + // let state = &mut *context.smtp.state.write().await; - let probe_smtp_network = state.probe_network; - state.probe_network = false; - state.perform_jobs_needed = PerformJobsNeeded::Not; + // let probe_smtp_network = state.probe_network; + // state.probe_network = false; + // state.perform_jobs_needed = PerformJobsNeeded::Not; - if state.suspended { - info!(context, "SMTP-jobs suspended.",); - return; - } - state.doing_jobs = true; - probe_smtp_network - }; + // if state.suspended { + // info!(context, "SMTP-jobs suspended.",); + // return; + // } + // state.doing_jobs = true; + // probe_smtp_network + // }; - info!(context, "SMTP-jobs started...",); - job_perform(context, Thread::Smtp, probe_smtp_network).await; - info!(context, "SMTP-jobs ended."); + // info!(context, "SMTP-jobs started...",); + // job_perform(context, Thread::Smtp, probe_smtp_network).await; + // info!(context, "SMTP-jobs ended."); - context.smtp.state.write().await.doing_jobs = false; + // context.smtp.state.write().await.doing_jobs = false; } pub async fn perform_smtp_idle(context: &Context) { - info!(context, "SMTP-idle started..."); + unimplemented!(); + // info!(context, "SMTP-idle started..."); - let 
perform_jobs_needed = context.smtp.state.read().await.perform_jobs_needed.clone(); + // let perform_jobs_needed = context.smtp.state.read().await.perform_jobs_needed.clone(); - match perform_jobs_needed { - PerformJobsNeeded::AtOnce => { - info!( - context, - "SMTP-idle will not be started because of waiting jobs.", - ); - } - PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => { - let dur = get_next_wakeup_time(context, Thread::Smtp).await; + // match perform_jobs_needed { + // PerformJobsNeeded::AtOnce => { + // info!( + // context, + // "SMTP-idle will not be started because of waiting jobs.", + // ); + // } + // PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => { + // let dur = get_next_wakeup_time(context, Thread::Smtp).await; - context.smtp.notify_receiver.recv().timeout(dur).await.ok(); - } - } + // context.smtp.notify_receiver.recv().timeout(dur).await.ok(); + // } + // } - info!(context, "SMTP-idle ended.",); + // info!(context, "SMTP-idle ended.",); } async fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration { @@ -736,17 +754,18 @@ async fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Durati } pub async fn maybe_network(context: &Context) { - { - context.smtp.state.write().await.probe_network = true; - context - .probe_imap_network - .store(true, std::sync::atomic::Ordering::Relaxed); - } + unimplemented!(); + // { + // context.smtp.state.write().await.probe_network = true; + // context + // .probe_imap_network + // .store(true, std::sync::atomic::Ordering::Relaxed); + // } - interrupt_smtp_idle(context).await; - interrupt_inbox_idle(context).await; - interrupt_mvbox_idle(context).await; - interrupt_sentbox_idle(context).await; + // interrupt_smtp_idle(context).await; + // interrupt_inbox_idle(context).await; + // interrupt_mvbox_idle(context).await; + // interrupt_sentbox_idle(context).await; } pub async fn action_exists(context: &Context, action: Action) -> bool { @@ -879,123 +898,127 @@ pub async 
fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> { } pub async fn perform_inbox_jobs(context: &Context) { - info!(context, "dc_perform_inbox_jobs starting.",); + unimplemented!(); + // info!(context, "dc_perform_inbox_jobs starting.",); - let probe_imap_network = context - .probe_imap_network - .load(std::sync::atomic::Ordering::Relaxed); - context - .probe_imap_network - .store(false, std::sync::atomic::Ordering::Relaxed); - context - .perform_inbox_jobs_needed - .store(false, std::sync::atomic::Ordering::Relaxed); + // let probe_imap_network = context + // .probe_imap_network + // .load(std::sync::atomic::Ordering::Relaxed); + // context + // .probe_imap_network + // .store(false, std::sync::atomic::Ordering::Relaxed); + // context + // .perform_inbox_jobs_needed + // .store(false, std::sync::atomic::Ordering::Relaxed); - job_perform(context, Thread::Imap, probe_imap_network).await; - info!(context, "dc_perform_inbox_jobs ended.",); + // job_perform(context, Thread::Imap, probe_imap_network).await; + // info!(context, "dc_perform_inbox_jobs ended.",); } pub async fn perform_mvbox_jobs(context: &Context) { - info!(context, "dc_perform_mbox_jobs EMPTY (for now)."); + unimplemented!(); + // info!(context, "dc_perform_mbox_jobs EMPTY (for now)."); } pub async fn perform_sentbox_jobs(context: &Context) { - info!(context, "dc_perform_sentbox_jobs EMPTY (for now)."); + unimplemented!(); + // info!(context, "dc_perform_sentbox_jobs EMPTY (for now)."); } async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { - while let Some(mut job) = load_next_job(context, thread, probe_network).await { - info!(context, "{}-job {} started...", thread, job); + unimplemented!(); + // while let Some(mut job) = load_next_job(context, thread, probe_network).await { + // info!(context, "{}-job {} started...", thread, job); - // some configuration jobs are "exclusive": - // - they are always executed in the imap-thread and the smtp-thread is suspended during 
execution - // - they may change the database handle; we do not keep old pointers therefore - // - they can be re-executed one time AT_ONCE, but they are not saved in the database for later execution - if Action::ConfigureImap == job.action || Action::ImexImap == job.action { - job::kill_action(context, job.action).await; - context.sentbox_thread.suspend(context).await; - context.mvbox_thread.suspend(context).await; - suspend_smtp_thread(context, true).await; - } + // // some configuration jobs are "exclusive": + // // - they are always executed in the imap-thread and the smtp-thread is suspended during execution + // // - they may change the database handle; we do not keep old pointers therefore + // // - they can be re-executed one time AT_ONCE, but they are not saved in the database for later execution + // if Action::ConfigureImap == job.action || Action::ImexImap == job.action { + // job::kill_action(context, job.action).await; + // context.sentbox_thread.suspend(context).await; + // context.mvbox_thread.suspend(context).await; + // suspend_smtp_thread(context, true).await; + // } - let try_res = match perform_job_action(context, &mut job, thread, 0).await { - Status::RetryNow => perform_job_action(context, &mut job, thread, 1).await, - x => x, - }; + // let try_res = match perform_job_action(context, &mut job, thread, 0).await { + // Status::RetryNow => perform_job_action(context, &mut job, thread, 1).await, + // x => x, + // }; - if Action::ConfigureImap == job.action || Action::ImexImap == job.action { - context.sentbox_thread.unsuspend(context).await; - context.mvbox_thread.unsuspend(context).await; - suspend_smtp_thread(context, false).await; - break; - } + // if Action::ConfigureImap == job.action || Action::ImexImap == job.action { + // context.sentbox_thread.unsuspend(context).await; + // context.mvbox_thread.unsuspend(context).await; + // suspend_smtp_thread(context, false).await; + // break; + // } - match try_res { - Status::RetryNow | 
Status::RetryLater => { - let tries = job.tries + 1; + // match try_res { + // Status::RetryNow | Status::RetryLater => { + // let tries = job.tries + 1; - if tries < JOB_RETRIES { - info!( - context, - "{} thread increases job {} tries to {}", thread, job, tries - ); - job.tries = tries; - let time_offset = get_backoff_time_offset(tries); - job.desired_timestamp = time() + time_offset; - job.update(context).await; - info!( - context, - "{}-job #{} not succeeded on try #{}, retry in {} seconds.", - thread, - job.job_id as u32, - tries, - time_offset - ); - if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { - context.smtp.state.write().await.perform_jobs_needed = - PerformJobsNeeded::AvoidDos; - } - } else { - info!( - context, - "{} thread removes job {} as it exhausted {} retries", - thread, - job, - JOB_RETRIES - ); - if job.action == Action::SendMsgToSmtp { - message::set_msg_failed( - context, - MsgId::new(job.foreign_id), - job.pending_error.as_ref(), - ) - .await; - } - job.delete(context).await; - } - if !probe_network { - continue; - } - // on dc_maybe_network() we stop trying here; - // these jobs are already tried once. - // otherwise, we just continue with the next job - // to give other jobs a chance being tried at least once. 
- break; - } - Status::Finished(res) => { - if let Err(err) = res { - warn!( - context, - "{} removes job {} as it failed with error {:?}", thread, job, err - ); - } else { - info!(context, "{} removes job {} as it succeeded", thread, job); - } + // if tries < JOB_RETRIES { + // info!( + // context, + // "{} thread increases job {} tries to {}", thread, job, tries + // ); + // job.tries = tries; + // let time_offset = get_backoff_time_offset(tries); + // job.desired_timestamp = time() + time_offset; + // job.update(context).await; + // info!( + // context, + // "{}-job #{} not succeeded on try #{}, retry in {} seconds.", + // thread, + // job.job_id as u32, + // tries, + // time_offset + // ); + // if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { + // context.smtp.state.write().await.perform_jobs_needed = + // PerformJobsNeeded::AvoidDos; + // } + // } else { + // info!( + // context, + // "{} thread removes job {} as it exhausted {} retries", + // thread, + // job, + // JOB_RETRIES + // ); + // if job.action == Action::SendMsgToSmtp { + // message::set_msg_failed( + // context, + // MsgId::new(job.foreign_id), + // job.pending_error.as_ref(), + // ) + // .await; + // } + // job.delete(context).await; + // } + // if !probe_network { + // continue; + // } + // // on dc_maybe_network() we stop trying here; + // // these jobs are already tried once. + // // otherwise, we just continue with the next job + // // to give other jobs a chance being tried at least once. 
+ // break; + // } + // Status::Finished(res) => { + // if let Err(err) = res { + // warn!( + // context, + // "{} removes job {} as it failed with error {:?}", thread, job, err + // ); + // } else { + // info!(context, "{} removes job {} as it succeeded", thread, job); + // } - job.delete(context).await; - } - } - } + // job.delete(context).await; + // } + // } + // } } async fn perform_job_action( @@ -1056,15 +1079,16 @@ fn get_backoff_time_offset(tries: u32) -> i64 { } async fn suspend_smtp_thread(context: &Context, suspend: bool) { - context.smtp.state.write().await.suspended = suspend; - if suspend { - loop { - if !context.smtp.state.read().await.doing_jobs { - return; - } - async_std::task::sleep(time::Duration::from_micros(300 * 1000)).await; - } - } + unimplemented!(); + // context.smtp.state.write().await.suspended = suspend; + // if suspend { + // loop { + // if !context.smtp.state.read().await.doing_jobs { + // return; + // } + // async_std::task::sleep(time::Duration::from_micros(300 * 1000)).await; + // } + // } } async fn send_mdn(context: &Context, msg: &Message) -> Result<()> { @@ -1133,12 +1157,13 @@ pub async fn add( } pub async fn interrupt_smtp_idle(context: &Context) { - info!(context, "Interrupting SMTP-idle...",); + unimplemented!(); + // info!(context, "Interrupting SMTP-idle...",); - context.smtp.state.write().await.perform_jobs_needed = PerformJobsNeeded::AtOnce; - context.smtp.notify_sender.send(()).await; + // context.smtp.state.write().await.perform_jobs_needed = PerformJobsNeeded::AtOnce; + // context.smtp.notify_sender.send(()).await; - info!(context, "Interrupting SMTP-idle... ended",); + // info!(context, "Interrupting SMTP-idle... ended",); } /// Load jobs from the database. 
diff --git a/src/job_thread.rs b/src/job_thread.rs index 2d3f843f9..12d86bc50 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -35,7 +35,7 @@ impl JobThread { } } - pub async fn suspend(&self, context: &Context) { + pub async fn suspend(&mut self, context: &Context) { info!(context, "Suspending {}-thread.", self.name,); { self.state.lock().await.suspended = true; @@ -62,7 +62,7 @@ impl JobThread { self.notify_sender.send(()).await; } - pub async fn try_interrupt_idle(&self, context: &Context) -> bool { + pub async fn try_interrupt_idle(&mut self, context: &Context) -> bool { if self.state.lock().await.using_handle { self.interrupt_idle(context).await; return true; @@ -71,7 +71,7 @@ impl JobThread { false } - pub async fn interrupt_idle(&self, context: &Context) { + pub async fn interrupt_idle(&mut self, context: &Context) { { self.state.lock().await.jobs_needed = true; } @@ -85,7 +85,7 @@ impl JobThread { info!(context, "Interrupting {}-IDLE... finished", self.name); } - pub async fn fetch(&self, context: &Context, use_network: bool) { + pub async fn fetch(&mut self, context: &Context, use_network: bool) { { let lock = &*self.state.clone(); let mut state = lock.lock().await; @@ -111,7 +111,7 @@ impl JobThread { } } - async fn connect_and_fetch(&self, context: &Context) -> Result<()> { + async fn connect_and_fetch(&mut self, context: &Context) -> Result<()> { let prefix = format!("{}-fetch", self.name); match self.imap.connect_configured(context).await { Ok(()) => { @@ -153,7 +153,7 @@ impl JobThread { } } - pub async fn idle(&self, context: &Context, use_network: bool) { + pub async fn idle(&mut self, context: &Context, use_network: bool) { { let lock = &*self.state.clone(); let mut state = lock.lock().await; @@ -185,7 +185,7 @@ impl JobThread { let prefix = format!("{}-IDLE", self.name); let do_fake_idle = match self.imap.connect_configured(context).await { Ok(()) => { - if !self.imap.can_idle().await { + if !self.imap.can_idle() { true // we have to do 
fake_idle } else { let watch_folder = self.get_watch_folder(context).await; diff --git a/src/lib.rs b/src/lib.rs index c9f8aafe9..3d2dd376c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,7 @@ pub mod context; mod e2ee; mod imap; pub mod imex; +mod scheduler; #[macro_use] pub mod job; mod job_thread; diff --git a/src/oauth2.rs b/src/oauth2.rs index 8eb0fc286..6bb657882 100644 --- a/src/oauth2.rs +++ b/src/oauth2.rs @@ -84,8 +84,9 @@ pub async fn dc_get_oauth2_access_token( regenerate: bool, ) -> Option { if let Some(oauth2) = Oauth2::from_address(addr) { - let lock = context.oauth2_critical.clone(); - let _l = lock.lock().await; + // TODO: FIXME + // let lock = context.oauth2_critical.clone(); + // let _l = lock.lock().await; // read generated token if !regenerate && !is_expired(context).await { diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 000000000..f010b2359 --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,237 @@ +use async_std::prelude::*; +use async_std::sync::{channel, Receiver, Sender}; + +const MAX_JOBS_WAITING: usize = 50; + +use crate::imap::Imap; +use crate::smtp::Smtp; + +/// Job and connection scheduler. +#[derive(Debug)] +pub(crate) enum Scheduler { + Stopped, + Running { + inbox: ImapConnectionState, + mvbox: ImapConnectionState, + sentbox: ImapConnectionState, + smtp: SmtpConnectionState, + }, +} + +impl Scheduler { + /// Start the scheduler, panics if it is already running. + pub async fn run(&mut self) { + match self { + Scheduler::Stopped => { + let ( + ( + ((inbox, inbox_handlers), (mvbox, mvbox_handlers)), + (sentbox, sentbox_handlers), + ), + (smtp, smtp_handlers), + ) = ImapConnectionState::new() + .join(ImapConnectionState::new()) + .join(ImapConnectionState::new()) + .join(SmtpConnectionState::new()) + .await; + + *self = Scheduler::Running { + inbox, + mvbox, + sentbox, + smtp, + }; + } + Scheduler::Running { .. 
} => { + // TODO: return an error + panic!("WARN: already running"); + } + } + } + + /// Halt the scheduler, panics if it is already stopped. + pub async fn stop(&mut self) { + match self { + Scheduler::Stopped => { + panic!("WARN: already stopped"); + } + Scheduler::Running { + inbox, + mvbox, + sentbox, + smtp, + } => { + inbox + .stop() + .join(mvbox.stop()) + .join(sentbox.stop()) + .join(smtp.stop()) + .await; + } + } + } + + /// Check if the scheduler is running. + pub fn is_running(&self) -> bool { + match self { + Scheduler::Running { .. } => true, + _ => false, + } + } + + /// Check if the scheduler is stopped. + pub fn is_stopped(&self) -> bool { + match self { + Scheduler::Stopped => true, + _ => false, + } + } +} + +/// Connection state logic shared between imap and smtp connections. +#[derive(Debug)] +struct ConnectionState { + /// Channel to notify that shutdown has completed. + shutdown_receiver: Receiver<()>, + /// Channel to interrupt the whole connection. + stop_sender: Sender<()>, + /// Channel to receive new jobs. + jobs_receiver: Receiver, + /// Channel to schedule new jobs. + jobs_sender: Sender, +} + +impl ConnectionState { + /// Send a new job. + pub async fn send_job(&self, job: T) { + self.jobs_sender.send(job).await; + } + + /// Shutdown this connection completely. + pub async fn stop(&self) { + // Trigger shutdown of the run loop. + self.stop_sender.send(()).await; + // Wait for a notification that the run loop has been shutdown. 
+ self.shutdown_receiver.recv().await; + } +} + +#[derive(Debug)] +pub(crate) struct SmtpConnectionState { + state: ConnectionState, +} + +impl SmtpConnectionState { + async fn new() -> (Self, SmtpConnectionHandlers) { + let (jobs_sender, jobs_receiver) = channel(50); + let (stop_sender, stop_receiver) = channel(1); + let (shutdown_sender, shutdown_receiver) = channel(1); + + let handlers = SmtpConnectionHandlers { + connection: Smtp::new(), + stop_receiver, + shutdown_sender, + }; + + let state = ConnectionState { + shutdown_receiver, + stop_sender, + jobs_sender, + jobs_receiver, + }; + + let conn = SmtpConnectionState { state }; + + (conn, handlers) + } + + /// Send a new job. + async fn send_job(&self, job: SmtpJob) { + self.state.send_job(job).await; + } + + /// Shutdown this connection completely. + async fn stop(&self) { + self.state.stop().await; + } +} + +#[derive(Debug)] +struct SmtpConnectionHandlers { + connection: Smtp, + stop_receiver: Receiver<()>, + shutdown_sender: Sender<()>, +} + +#[derive(Debug)] +pub(crate) struct ImapConnectionState { + /// Channel to interrupt idle. + idle_interrupt_sender: Sender<()>, + state: ConnectionState, +} + +impl ImapConnectionState { + /// Construct a new connection. + async fn new() -> (Self, ImapConnectionHandlers) { + let (jobs_sender, jobs_receiver) = channel(MAX_JOBS_WAITING); + let (stop_sender, stop_receiver) = channel(1); + let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); + let (shutdown_sender, shutdown_receiver) = channel(1); + + let handlers = ImapConnectionHandlers { + connection: Imap::new(idle_interrupt_receiver), + stop_receiver, + shutdown_sender, + }; + + let state = ConnectionState { + shutdown_receiver, + stop_sender, + jobs_sender, + jobs_receiver, + }; + + let conn = ImapConnectionState { + idle_interrupt_sender, + state, + }; + + (conn, handlers) + } + + /// Send a new job. 
+ async fn send_job(&self, job: T) { + self.state + .send_job(job) + .join(self.idle_interrupt_sender.send(())) + .await; + } + + /// Shutdown this connection completely. + async fn stop(&self) { + self.state.stop().await; + } +} + +#[derive(Debug)] +struct ImapConnectionHandlers { + connection: Imap, + stop_receiver: Receiver<()>, + shutdown_sender: Sender<()>, +} + +/// Jobs handled by the inbox connection. +#[derive(Debug)] +pub enum InboxJob {} + +/// Jobs handled by the mvbox connection. +#[derive(Debug)] +pub enum MvboxJob {} + +/// Jobs handled by the sentbox connection. +#[derive(Debug)] +pub enum SentboxJob {} + +/// Jobs handled by the smtp connection. +#[derive(Debug)] +pub enum SmtpJob {} diff --git a/src/smtp/mod.rs b/src/smtp/mod.rs index 0734c717b..5491c763a 100644 --- a/src/smtp/mod.rs +++ b/src/smtp/mod.rs @@ -50,36 +50,8 @@ impl From for Error { pub type Result = std::result::Result; -#[derive(Debug)] -pub struct Smtp { - inner: RwLock, - pub(crate) state: RwLock, - pub(crate) notify_sender: Sender<()>, - pub(crate) notify_receiver: Receiver<()>, -} - -impl Default for Smtp { - fn default() -> Self { - let (notify_sender, notify_receiver) = channel(1); - Smtp { - inner: Default::default(), - state: Default::default(), - notify_sender, - notify_receiver, - } - } -} - -#[derive(Default, Debug)] -pub struct State { - pub(crate) suspended: bool, - pub(crate) doing_jobs: bool, - pub(crate) perform_jobs_needed: PerformJobsNeeded, - pub(crate) probe_network: bool, -} - #[derive(Default, DebugStub)] -struct SmtpInner { +pub struct Smtp { #[debug_stub(some = "SmtpTransport")] transport: Option, @@ -99,18 +71,17 @@ impl Smtp { } /// Disconnect the SMTP transport and drop it entirely. 
- pub async fn disconnect(&self) { - let inner = &mut *self.inner.write().await; - if let Some(mut transport) = inner.transport.take() { + pub async fn disconnect(&mut self) { + if let Some(mut transport) = self.transport.take() { transport.close().await.ok(); } - inner.last_success = None; + self.last_success = None; } /// Return true if smtp was connected but is not known to /// have been successfully used the last 60 seconds pub async fn has_maybe_stale_connection(&self) -> bool { - if let Some(last_success) = self.inner.read().await.last_success { + if let Some(last_success) = self.last_success { Instant::now().duration_since(last_success).as_secs() > 60 } else { false @@ -119,17 +90,14 @@ impl Smtp { /// Check whether we are connected. pub async fn is_connected(&self) -> bool { - self.inner - .read() - .await - .transport + self.transport .as_ref() .map(|t| t.is_connected()) .unwrap_or_default() } /// Connect using the provided login params. - pub async fn connect(&self, context: &Context, lp: &LoginParam) -> Result<()> { + pub async fn connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> { if self.is_connected().await { warn!(context, "SMTP already connected."); return Ok(()); @@ -146,8 +114,7 @@ impl Smtp { error: err, })?; - let inner = &mut *self.inner.write().await; - inner.from = Some(from); + self.from = Some(from); let domain = &lp.send_server; let port = lp.send_port as u16; @@ -208,8 +175,8 @@ impl Smtp { let mut trans = client.into_transport(); trans.connect().await.map_err(Error::ConnectionFailure)?; - inner.transport = Some(trans); - inner.last_success = Some(Instant::now()); + self.transport = Some(trans); + self.last_success = Some(Instant::now()); context.call_cb(Event::SmtpConnected(format!( "SMTP-LOGIN as {} ok", diff --git a/src/smtp/send.rs b/src/smtp/send.rs index 6305acce3..306ba917d 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -24,7 +24,7 @@ impl Smtp { /// Send a prepared mail to recipients. 
/// On successful send out Ok() is returned. pub async fn send( - &self, + &mut self, context: &Context, recipients: Vec, message: Vec, @@ -38,23 +38,22 @@ impl Smtp { .collect::>() .join(","); - let envelope = Envelope::new(self.inner.read().await.from.clone(), recipients) - .map_err(Error::EnvelopeError)?; + let envelope = + Envelope::new(self.from.clone(), recipients).map_err(Error::EnvelopeError)?; let mail = SendableEmail::new( envelope, format!("{}", job_id), // only used for internal logging message, ); - let inner = &mut *self.inner.write().await; - if let Some(ref mut transport) = inner.transport { + if let Some(ref mut transport) = self.transport { transport.send(mail).await.map_err(Error::SendError)?; context.call_cb(Event::SmtpMessageSent(format!( "Message len={} was smtp-sent to {}", message_len, recipients_display ))); - inner.last_success = Some(std::time::Instant::now()); + self.last_success = Some(std::time::Instant::now()); Ok(()) } else {