start setting up new scheduler

This commit is contained in:
dignifiedquire
2020-03-17 13:48:03 +01:00
parent efc17983c3
commit ce5b95f8e5
12 changed files with 1400 additions and 1147 deletions

View File

@@ -52,402 +52,403 @@ impl Context {
******************************************************************************/ ******************************************************************************/
#[allow(clippy::cognitive_complexity)] #[allow(clippy::cognitive_complexity)]
pub(crate) async fn job_configure_imap(context: &Context) -> job::Status { pub(crate) async fn job_configure_imap(context: &Context) -> job::Status {
if !context.sql.is_open().await { unimplemented!()
error!(context, "Cannot configure, database not opened.",); // if !context.sql.is_open().await {
progress!(context, 0); // error!(context, "Cannot configure, database not opened.",);
return job::Status::Finished(Err(format_err!("Database not opened"))); // progress!(context, 0);
} // return job::Status::Finished(Err(format_err!("Database not opened")));
if !context.alloc_ongoing().await { // }
progress!(context, 0); // if !context.alloc_ongoing().await {
return job::Status::Finished(Err(format_err!("Cannot allocated ongoing process"))); // progress!(context, 0);
} // return job::Status::Finished(Err(format_err!("Cannot allocated ongoing process")));
let mut success = false; // }
let mut imap_connected_here = false; // let mut success = false;
let mut smtp_connected_here = false; // let mut imap_connected_here = false;
// let mut smtp_connected_here = false;
let mut param_autoconfig: Option<LoginParam> = None; // let mut param_autoconfig: Option<LoginParam> = None;
context.inbox_thread.imap.disconnect(context).await; // // context.inbox_thread.imap.disconnect(context).await;
context.sentbox_thread.imap.disconnect(context).await; // // context.sentbox_thread.imap.disconnect(context).await;
context.mvbox_thread.imap.disconnect(context).await; // // context.mvbox_thread.imap.disconnect(context).await;
context.smtp.disconnect().await; // // context.smtp.disconnect().await;
info!(context, "Configure ...",); // info!(context, "Configure ...",);
// Variables that are shared between steps: // // Variables that are shared between steps:
let mut param = LoginParam::from_database(context, "").await; // let mut param = LoginParam::from_database(context, "").await;
// need all vars here to be mutable because rust thinks the same step could be called multiple times // // need all vars here to be mutable because rust thinks the same step could be called multiple times
// and also initialize, because otherwise rust thinks it's used while unitilized, even if thats not the case as the loop goes only forward // // and also initialize, because otherwise rust thinks it's used while unitilized, even if thats not the case as the loop goes only forward
let mut param_domain = "undefined.undefined".to_owned(); // let mut param_domain = "undefined.undefined".to_owned();
let mut param_addr_urlencoded: String = // let mut param_addr_urlencoded: String =
"Internal Error: this value should never be used".to_owned(); // "Internal Error: this value should never be used".to_owned();
let mut keep_flags = 0; // let mut keep_flags = 0;
const STEP_12_USE_AUTOCONFIG: u8 = 12; // const STEP_12_USE_AUTOCONFIG: u8 = 12;
const STEP_13_AFTER_AUTOCONFIG: u8 = 13; // const STEP_13_AFTER_AUTOCONFIG: u8 = 13;
let mut step_counter: u8 = 0; // let mut step_counter: u8 = 0;
while !context.shall_stop_ongoing().await { // while !context.shall_stop_ongoing().await {
step_counter += 1; // step_counter += 1;
let success = match step_counter { // let success = match step_counter {
// Read login parameters from the database // // Read login parameters from the database
1 => { // 1 => {
progress!(context, 1); // progress!(context, 1);
if param.addr.is_empty() { // if param.addr.is_empty() {
error!(context, "Please enter an email address.",); // error!(context, "Please enter an email address.",);
} // }
!param.addr.is_empty() // !param.addr.is_empty()
} // }
// Step 1: Load the parameters and check email-address and password // // Step 1: Load the parameters and check email-address and password
2 => { // 2 => {
if 0 != param.server_flags & DC_LP_AUTH_OAUTH2 { // if 0 != param.server_flags & DC_LP_AUTH_OAUTH2 {
// the used oauth2 addr may differ, check this. // // the used oauth2 addr may differ, check this.
// if dc_get_oauth2_addr() is not available in the oauth2 implementation, // // if dc_get_oauth2_addr() is not available in the oauth2 implementation,
// just use the given one. // // just use the given one.
progress!(context, 10); // progress!(context, 10);
if let Some(oauth2_addr) = // if let Some(oauth2_addr) =
dc_get_oauth2_addr(context, &param.addr, &param.mail_pw) // dc_get_oauth2_addr(context, &param.addr, &param.mail_pw)
.await // .await
.and_then(|e| e.parse().ok()) // .and_then(|e| e.parse().ok())
{ // {
info!(context, "Authorized address is {}", oauth2_addr); // info!(context, "Authorized address is {}", oauth2_addr);
param.addr = oauth2_addr; // param.addr = oauth2_addr;
context // context
.sql // .sql
.set_raw_config(context, "addr", Some(param.addr.as_str())) // .set_raw_config(context, "addr", Some(param.addr.as_str()))
.await // .await
.ok(); // .ok();
} // }
progress!(context, 20); // progress!(context, 20);
} // }
true // no oauth? - just continue it's no error // true // no oauth? - just continue it's no error
} // }
3 => { // 3 => {
if let Ok(parsed) = param.addr.parse() { // if let Ok(parsed) = param.addr.parse() {
let parsed: EmailAddress = parsed; // let parsed: EmailAddress = parsed;
param_domain = parsed.domain; // param_domain = parsed.domain;
param_addr_urlencoded = // param_addr_urlencoded =
utf8_percent_encode(&param.addr, NON_ALPHANUMERIC).to_string(); // utf8_percent_encode(&param.addr, NON_ALPHANUMERIC).to_string();
true // true
} else { // } else {
error!(context, "Bad email-address."); // error!(context, "Bad email-address.");
false // false
} // }
} // }
// Step 2: Autoconfig // // Step 2: Autoconfig
4 => { // 4 => {
progress!(context, 200); // progress!(context, 200);
if param.mail_server.is_empty() // if param.mail_server.is_empty()
&& param.mail_port == 0 // && param.mail_port == 0
/*&&param.mail_user.is_empty() -- the user can enter a loginname which is used by autoconfig then */ // /*&&param.mail_user.is_empty() -- the user can enter a loginname which is used by autoconfig then */
&& param.send_server.is_empty() // && param.send_server.is_empty()
&& param.send_port == 0 // && param.send_port == 0
&& param.send_user.is_empty() // && param.send_user.is_empty()
/*&&param.send_pw.is_empty() -- the password cannot be auto-configured and is no criterion for autoconfig or not */ // /*&&param.send_pw.is_empty() -- the password cannot be auto-configured and is no criterion for autoconfig or not */
&& (param.server_flags & !DC_LP_AUTH_OAUTH2) == 0 // && (param.server_flags & !DC_LP_AUTH_OAUTH2) == 0
{ // {
// no advanced parameters entered by the user: query provider-database or do Autoconfig // // no advanced parameters entered by the user: query provider-database or do Autoconfig
keep_flags = param.server_flags & DC_LP_AUTH_OAUTH2; // keep_flags = param.server_flags & DC_LP_AUTH_OAUTH2;
if let Some(new_param) = get_offline_autoconfig(context, &param) { // if let Some(new_param) = get_offline_autoconfig(context, &param) {
// got parameters from our provider-database, skip Autoconfig, preserve the OAuth2 setting // // got parameters from our provider-database, skip Autoconfig, preserve the OAuth2 setting
param_autoconfig = Some(new_param); // param_autoconfig = Some(new_param);
step_counter = STEP_12_USE_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop // step_counter = STEP_12_USE_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop
} // }
} else { // } else {
// advanced parameters entered by the user: skip Autoconfig // // advanced parameters entered by the user: skip Autoconfig
step_counter = STEP_13_AFTER_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop // step_counter = STEP_13_AFTER_AUTOCONFIG - 1; // minus one as step_counter is increased on next loop
} // }
true // true
} // }
/* A. Search configurations from the domain used in the email-address, prefer encrypted */ // /* A. Search configurations from the domain used in the email-address, prefer encrypted */
5 => { // 5 => {
if param_autoconfig.is_none() { // if param_autoconfig.is_none() {
let url = format!( // let url = format!(
"https://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}", // "https://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}",
param_domain, param_addr_urlencoded // param_domain, param_addr_urlencoded
); // );
param_autoconfig = moz_autoconfigure(context, &url, &param).ok(); // param_autoconfig = moz_autoconfigure(context, &url, &param).ok();
} // }
true // true
} // }
6 => { // 6 => {
progress!(context, 300); // progress!(context, 300);
if param_autoconfig.is_none() { // if param_autoconfig.is_none() {
// the doc does not mention `emailaddress=`, however, Thunderbird adds it, see https://releases.mozilla.org/pub/thunderbird/ , which makes some sense // // the doc does not mention `emailaddress=`, however, Thunderbird adds it, see https://releases.mozilla.org/pub/thunderbird/ , which makes some sense
let url = format!( // let url = format!(
"https://{}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={}", // "https://{}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={}",
param_domain, param_addr_urlencoded // param_domain, param_addr_urlencoded
); // );
param_autoconfig = moz_autoconfigure(context, &url, &param).ok(); // param_autoconfig = moz_autoconfigure(context, &url, &param).ok();
} // }
true // true
} // }
/* Outlook section start ------------- */ // /* Outlook section start ------------- */
/* Outlook uses always SSL but different domains (this comment describes the next two steps) */ // /* Outlook uses always SSL but different domains (this comment describes the next two steps) */
7 => { // 7 => {
progress!(context, 310); // progress!(context, 310);
if param_autoconfig.is_none() { // if param_autoconfig.is_none() {
let url = format!("https://{}/autodiscover/autodiscover.xml", param_domain); // let url = format!("https://{}/autodiscover/autodiscover.xml", param_domain);
param_autoconfig = outlk_autodiscover(context, &url, &param).ok(); // param_autoconfig = outlk_autodiscover(context, &url, &param).ok();
} // }
true // true
} // }
8 => { // 8 => {
progress!(context, 320); // progress!(context, 320);
if param_autoconfig.is_none() { // if param_autoconfig.is_none() {
let url = format!( // let url = format!(
"https://{}{}/autodiscover/autodiscover.xml", // "https://{}{}/autodiscover/autodiscover.xml",
"autodiscover.", param_domain // "autodiscover.", param_domain
); // );
param_autoconfig = outlk_autodiscover(context, &url, &param).ok(); // param_autoconfig = outlk_autodiscover(context, &url, &param).ok();
} // }
true // true
} // }
/* ----------- Outlook section end */ // /* ----------- Outlook section end */
9 => { // 9 => {
progress!(context, 330); // progress!(context, 330);
if param_autoconfig.is_none() { // if param_autoconfig.is_none() {
let url = format!( // let url = format!(
"http://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}", // "http://autoconfig.{}/mail/config-v1.1.xml?emailaddress={}",
param_domain, param_addr_urlencoded // param_domain, param_addr_urlencoded
); // );
param_autoconfig = moz_autoconfigure(context, &url, &param).ok(); // param_autoconfig = moz_autoconfigure(context, &url, &param).ok();
} // }
true // true
} // }
10 => { // 10 => {
progress!(context, 340); // progress!(context, 340);
if param_autoconfig.is_none() { // if param_autoconfig.is_none() {
// do not transfer the email-address unencrypted // // do not transfer the email-address unencrypted
let url = format!( // let url = format!(
"http://{}/.well-known/autoconfig/mail/config-v1.1.xml", // "http://{}/.well-known/autoconfig/mail/config-v1.1.xml",
param_domain // param_domain
); // );
param_autoconfig = moz_autoconfigure(context, &url, &param).ok(); // param_autoconfig = moz_autoconfigure(context, &url, &param).ok();
} // }
true // true
} // }
/* B. If we have no configuration yet, search configuration in Thunderbird's centeral database */ // /* B. If we have no configuration yet, search configuration in Thunderbird's centeral database */
11 => { // 11 => {
progress!(context, 350); // progress!(context, 350);
if param_autoconfig.is_none() { // if param_autoconfig.is_none() {
/* always SSL for Thunderbird's database */ // /* always SSL for Thunderbird's database */
let url = format!("https://autoconfig.thunderbird.net/v1.1/{}", param_domain); // let url = format!("https://autoconfig.thunderbird.net/v1.1/{}", param_domain);
param_autoconfig = moz_autoconfigure(context, &url, &param).ok(); // param_autoconfig = moz_autoconfigure(context, &url, &param).ok();
} // }
true // true
} // }
/* C. Do we have any autoconfig result? // /* C. Do we have any autoconfig result?
If you change the match-number here, also update STEP_12_COPY_AUTOCONFIG above // If you change the match-number here, also update STEP_12_COPY_AUTOCONFIG above
*/ // */
STEP_12_USE_AUTOCONFIG => { // STEP_12_USE_AUTOCONFIG => {
progress!(context, 500); // progress!(context, 500);
if let Some(ref cfg) = param_autoconfig { // if let Some(ref cfg) = param_autoconfig {
info!(context, "Got autoconfig: {}", &cfg); // info!(context, "Got autoconfig: {}", &cfg);
if !cfg.mail_user.is_empty() { // if !cfg.mail_user.is_empty() {
param.mail_user = cfg.mail_user.clone(); // param.mail_user = cfg.mail_user.clone();
} // }
param.mail_server = cfg.mail_server.clone(); /* all other values are always NULL when entering autoconfig */ // param.mail_server = cfg.mail_server.clone(); /* all other values are always NULL when entering autoconfig */
param.mail_port = cfg.mail_port; // param.mail_port = cfg.mail_port;
param.send_server = cfg.send_server.clone(); // param.send_server = cfg.send_server.clone();
param.send_port = cfg.send_port; // param.send_port = cfg.send_port;
param.send_user = cfg.send_user.clone(); // param.send_user = cfg.send_user.clone();
param.server_flags = cfg.server_flags; // param.server_flags = cfg.server_flags;
/* although param_autoconfig's data are no longer needed from, // /* although param_autoconfig's data are no longer needed from,
it is used to later to prevent trying variations of port/server/logins */ // it is used to later to prevent trying variations of port/server/logins */
} // }
param.server_flags |= keep_flags; // param.server_flags |= keep_flags;
true // true
} // }
// Step 3: Fill missing fields with defaults // // Step 3: Fill missing fields with defaults
// If you change the match-number here, also update STEP_13_AFTER_AUTOCONFIG above // // If you change the match-number here, also update STEP_13_AFTER_AUTOCONFIG above
STEP_13_AFTER_AUTOCONFIG => { // STEP_13_AFTER_AUTOCONFIG => {
if param.mail_server.is_empty() { // if param.mail_server.is_empty() {
param.mail_server = format!("imap.{}", param_domain,) // param.mail_server = format!("imap.{}", param_domain,)
} // }
if param.mail_port == 0 { // if param.mail_port == 0 {
param.mail_port = if 0 != param.server_flags & (0x100 | 0x400) { // param.mail_port = if 0 != param.server_flags & (0x100 | 0x400) {
143 // 143
} else { // } else {
993 // 993
} // }
} // }
if param.mail_user.is_empty() { // if param.mail_user.is_empty() {
param.mail_user = param.addr.clone(); // param.mail_user = param.addr.clone();
} // }
if param.send_server.is_empty() && !param.mail_server.is_empty() { // if param.send_server.is_empty() && !param.mail_server.is_empty() {
param.send_server = param.mail_server.clone(); // param.send_server = param.mail_server.clone();
if param.send_server.starts_with("imap.") { // if param.send_server.starts_with("imap.") {
param.send_server = param.send_server.replacen("imap", "smtp", 1); // param.send_server = param.send_server.replacen("imap", "smtp", 1);
} // }
} // }
if param.send_port == 0 { // if param.send_port == 0 {
param.send_port = if 0 != param.server_flags & DC_LP_SMTP_SOCKET_STARTTLS as i32 // param.send_port = if 0 != param.server_flags & DC_LP_SMTP_SOCKET_STARTTLS as i32
{ // {
587 // 587
} else if 0 != param.server_flags & DC_LP_SMTP_SOCKET_PLAIN as i32 { // } else if 0 != param.server_flags & DC_LP_SMTP_SOCKET_PLAIN as i32 {
25 // 25
} else { // } else {
465 // 465
} // }
} // }
if param.send_user.is_empty() && !param.mail_user.is_empty() { // if param.send_user.is_empty() && !param.mail_user.is_empty() {
param.send_user = param.mail_user.clone(); // param.send_user = param.mail_user.clone();
} // }
if param.send_pw.is_empty() && !param.mail_pw.is_empty() { // if param.send_pw.is_empty() && !param.mail_pw.is_empty() {
param.send_pw = param.mail_pw.clone() // param.send_pw = param.mail_pw.clone()
} // }
if !dc_exactly_one_bit_set(param.server_flags & DC_LP_AUTH_FLAGS as i32) { // if !dc_exactly_one_bit_set(param.server_flags & DC_LP_AUTH_FLAGS as i32) {
param.server_flags &= !(DC_LP_AUTH_FLAGS as i32); // param.server_flags &= !(DC_LP_AUTH_FLAGS as i32);
param.server_flags |= DC_LP_AUTH_NORMAL as i32 // param.server_flags |= DC_LP_AUTH_NORMAL as i32
} // }
if !dc_exactly_one_bit_set(param.server_flags & DC_LP_IMAP_SOCKET_FLAGS as i32) { // if !dc_exactly_one_bit_set(param.server_flags & DC_LP_IMAP_SOCKET_FLAGS as i32) {
param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS as i32); // param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS as i32);
param.server_flags |= if param.send_port == 143 { // param.server_flags |= if param.send_port == 143 {
DC_LP_IMAP_SOCKET_STARTTLS as i32 // DC_LP_IMAP_SOCKET_STARTTLS as i32
} else { // } else {
DC_LP_IMAP_SOCKET_SSL as i32 // DC_LP_IMAP_SOCKET_SSL as i32
} // }
} // }
if !dc_exactly_one_bit_set(param.server_flags & (DC_LP_SMTP_SOCKET_FLAGS as i32)) { // if !dc_exactly_one_bit_set(param.server_flags & (DC_LP_SMTP_SOCKET_FLAGS as i32)) {
param.server_flags &= !(DC_LP_SMTP_SOCKET_FLAGS as i32); // param.server_flags &= !(DC_LP_SMTP_SOCKET_FLAGS as i32);
param.server_flags |= if param.send_port == 587 { // param.server_flags |= if param.send_port == 587 {
DC_LP_SMTP_SOCKET_STARTTLS as i32 // DC_LP_SMTP_SOCKET_STARTTLS as i32
} else if param.send_port == 25 { // } else if param.send_port == 25 {
DC_LP_SMTP_SOCKET_PLAIN as i32 // DC_LP_SMTP_SOCKET_PLAIN as i32
} else { // } else {
DC_LP_SMTP_SOCKET_SSL as i32 // DC_LP_SMTP_SOCKET_SSL as i32
} // }
} // }
/* do we have a complete configuration? */ // /* do we have a complete configuration? */
if param.mail_server.is_empty() // if param.mail_server.is_empty()
|| param.mail_port == 0 // || param.mail_port == 0
|| param.mail_user.is_empty() // || param.mail_user.is_empty()
|| param.mail_pw.is_empty() // || param.mail_pw.is_empty()
|| param.send_server.is_empty() // || param.send_server.is_empty()
|| param.send_port == 0 // || param.send_port == 0
|| param.send_user.is_empty() // || param.send_user.is_empty()
|| param.send_pw.is_empty() // || param.send_pw.is_empty()
|| param.server_flags == 0 // || param.server_flags == 0
{ // {
error!(context, "Account settings incomplete."); // error!(context, "Account settings incomplete.");
false // false
} else { // } else {
true // true
} // }
} // }
14 => { // 14 => {
progress!(context, 600); // progress!(context, 600);
/* try to connect to IMAP - if we did not got an autoconfig, // /* try to connect to IMAP - if we did not got an autoconfig,
do some further tries with different settings and username variations */ // do some further tries with different settings and username variations */
imap_connected_here = // imap_connected_here =
try_imap_connections(context, &mut param, param_autoconfig.is_some()).await; // try_imap_connections(context, &mut param, param_autoconfig.is_some()).await;
imap_connected_here // imap_connected_here
} // }
15 => { // 15 => {
progress!(context, 800); // progress!(context, 800);
smtp_connected_here = // smtp_connected_here =
try_smtp_connections(context, &mut param, param_autoconfig.is_some()).await; // try_smtp_connections(context, &mut param, param_autoconfig.is_some()).await;
smtp_connected_here // smtp_connected_here
} // }
16 => { // 16 => {
progress!(context, 900); // progress!(context, 900);
let create_mvbox = context.get_config_bool(Config::MvboxWatch).await // let create_mvbox = context.get_config_bool(Config::MvboxWatch).await
|| context.get_config_bool(Config::MvboxMove).await; // || context.get_config_bool(Config::MvboxMove).await;
let imap = &context.inbox_thread.imap; // let imap = &context.inbox_thread.imap;
if let Err(err) = imap.ensure_configured_folders(context, create_mvbox).await { // if let Err(err) = imap.ensure_configured_folders(context, create_mvbox).await {
warn!(context, "configuring folders failed: {:?}", err); // warn!(context, "configuring folders failed: {:?}", err);
false // false
} else { // } else {
let res = imap.select_with_uidvalidity(context, "INBOX").await; // let res = imap.select_with_uidvalidity(context, "INBOX").await;
if let Err(err) = res { // if let Err(err) = res {
error!(context, "could not read INBOX status: {:?}", err); // error!(context, "could not read INBOX status: {:?}", err);
false // false
} else { // } else {
true // true
} // }
} // }
} // }
17 => { // 17 => {
progress!(context, 910); // progress!(context, 910);
/* configuration success - write back the configured parameters with the "configured_" prefix; also write the "configured"-flag */ // /* configuration success - write back the configured parameters with the "configured_" prefix; also write the "configured"-flag */
param // param
.save_to_database( // .save_to_database(
context, // context,
"configured_", /*the trailing underscore is correct*/ // "configured_", /*the trailing underscore is correct*/
) // )
.await // .await
.ok(); // .ok();
context // context
.sql // .sql
.set_raw_config_bool(context, "configured", true) // .set_raw_config_bool(context, "configured", true)
.await // .await
.ok(); // .ok();
true // true
} // }
18 => { // 18 => {
progress!(context, 920); // progress!(context, 920);
// we generate the keypair just now - we could also postpone this until the first message is sent, however, // // we generate the keypair just now - we could also postpone this until the first message is sent, however,
// this may result in a unexpected and annoying delay when the user sends his very first message // // this may result in a unexpected and annoying delay when the user sends his very first message
// (~30 seconds on a Moto G4 play) and might looks as if message sending is always that slow. // // (~30 seconds on a Moto G4 play) and might looks as if message sending is always that slow.
success = e2ee::ensure_secret_key_exists(context).await.is_ok(); // success = e2ee::ensure_secret_key_exists(context).await.is_ok();
info!(context, "key generation completed"); // info!(context, "key generation completed");
progress!(context, 940); // progress!(context, 940);
break; // We are done here // break; // We are done here
} // }
_ => { // _ => {
error!(context, "Internal error: step counter out of bound",); // error!(context, "Internal error: step counter out of bound",);
break; // break;
} // }
}; // };
if !success { // if !success {
break; // break;
} // }
} // }
if imap_connected_here { // if imap_connected_here {
context.inbox_thread.imap.disconnect(context).await; // context.inbox_thread.imap.disconnect(context).await;
} // }
if smtp_connected_here { // if smtp_connected_here {
context.smtp.disconnect().await; // context.smtp.disconnect().await;
} // }
// remember the entered parameters on success // // remember the entered parameters on success
// and restore to last-entered on failure. // // and restore to last-entered on failure.
// this way, the parameters visible to the ui are always in-sync with the current configuration. // // this way, the parameters visible to the ui are always in-sync with the current configuration.
if success { // if success {
LoginParam::from_database(context, "") // LoginParam::from_database(context, "")
.await // .await
.save_to_database(context, "configured_raw_") // .save_to_database(context, "configured_raw_")
.await // .await
.ok(); // .ok();
} else { // } else {
LoginParam::from_database(context, "configured_raw_") // LoginParam::from_database(context, "configured_raw_")
.await // .await
.save_to_database(context, "") // .save_to_database(context, "")
.await // .await
.ok(); // .ok();
} // }
if let Some(provider) = provider::get_provider_info(&param.addr) { // if let Some(provider) = provider::get_provider_info(&param.addr) {
if !provider.after_login_hint.is_empty() { // if !provider.after_login_hint.is_empty() {
let mut msg = Message::new(Viewtype::Text); // let mut msg = Message::new(Viewtype::Text);
msg.text = Some(provider.after_login_hint.to_string()); // msg.text = Some(provider.after_login_hint.to_string());
if chat::add_device_msg(context, Some("core-provider-info"), Some(&mut msg)) // if chat::add_device_msg(context, Some("core-provider-info"), Some(&mut msg))
.await // .await
.is_err() // .is_err()
{ // {
warn!(context, "cannot add after_login_hint as core-provider-info"); // warn!(context, "cannot add after_login_hint as core-provider-info");
} // }
} // }
} // }
context.free_ongoing().await; // context.free_ongoing().await;
progress!(context, if success { 1000 } else { 0 }); // progress!(context, if success { 1000 } else { 0 });
job::Status::Finished(Ok(())) // job::Status::Finished(Ok(()))
} }
#[allow(clippy::unnecessary_unwrap)] #[allow(clippy::unnecessary_unwrap)]
@@ -562,24 +563,25 @@ async fn try_imap_connection(
} }
async fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> { async fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
let inf = format!( unimplemented!();
"imap: {}@{}:{} flags=0x{:x} certificate_checks={}", // let inf = format!(
param.mail_user, // "imap: {}@{}:{} flags=0x{:x} certificate_checks={}",
param.mail_server, // param.mail_user,
param.mail_port, // param.mail_server,
param.server_flags, // param.mail_port,
param.imap_certificate_checks // param.server_flags,
); // param.imap_certificate_checks
info!(context, "Trying: {}", inf); // );
if context.inbox_thread.imap.connect(context, &param).await { // info!(context, "Trying: {}", inf);
info!(context, "success: {}", inf); // if context.inbox_thread.imap.connect(context, &param).await {
return Some(true); // info!(context, "success: {}", inf);
} // return Some(true);
if context.shall_stop_ongoing().await { // }
return Some(false); // if context.shall_stop_ongoing().await {
} // return Some(false);
info!(context, "Could not connect: {}", inf); // }
None // info!(context, "Could not connect: {}", inf);
// None
} }
async fn try_smtp_connections( async fn try_smtp_connections(
@@ -613,25 +615,26 @@ async fn try_smtp_connections(
} }
async fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option<bool> { async fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
let inf = format!( unimplemented!()
"smtp: {}@{}:{} flags: 0x{:x}", // let inf = format!(
param.send_user, param.send_server, param.send_port, param.server_flags // "smtp: {}@{}:{} flags: 0x{:x}",
); // param.send_user, param.send_server, param.send_port, param.server_flags
info!(context, "Trying: {}", inf); // );
match context.smtp.connect(context, &param).await { // info!(context, "Trying: {}", inf);
Ok(()) => { // match context.smtp.connect(context, &param).await {
info!(context, "success: {}", inf); // Ok(()) => {
Some(true) // info!(context, "success: {}", inf);
} // Some(true)
Err(err) => { // }
if context.shall_stop_ongoing().await { // Err(err) => {
Some(false) // if context.shall_stop_ongoing().await {
} else { // Some(false)
warn!(context, "could not connect: {}", err); // } else {
None // warn!(context, "could not connect: {}", err);
} // None
} // }
} // }
// }
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -2,7 +2,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::OsString; use std::ffi::OsString;
use std::sync::atomic::AtomicBool; use std::ops::Deref;
use async_std::path::{Path, PathBuf}; use async_std::path::{Path, PathBuf};
use async_std::sync::{Arc, Mutex, RwLock}; use async_std::sync::{Arc, Mutex, RwLock};
@@ -22,33 +22,40 @@ use crate::login_param::LoginParam;
use crate::lot::Lot; use crate::lot::Lot;
use crate::message::{self, Message, MessengerMessage, MsgId}; use crate::message::{self, Message, MessengerMessage, MsgId};
use crate::param::Params; use crate::param::Params;
use crate::scheduler::Scheduler;
use crate::smtp::Smtp; use crate::smtp::Smtp;
use crate::sql::Sql; use crate::sql::Sql;
#[derive(DebugStub)] #[derive(Debug)]
pub struct Context { pub struct Context {
/// Database file path pub(crate) inner: Arc<InnerContext>,
dbfile: PathBuf, }
/// Blob directory path
blobdir: PathBuf,
pub sql: Sql,
pub perform_inbox_jobs_needed: AtomicBool,
pub probe_imap_network: AtomicBool,
pub inbox_thread: JobThread,
pub sentbox_thread: JobThread,
pub mvbox_thread: JobThread,
pub smtp: Smtp,
pub oauth2_critical: Arc<Mutex<()>>,
pub os_name: Option<String>,
pub cmdline_sel_chat_id: Arc<RwLock<ChatId>>,
pub(crate) bob: Arc<RwLock<BobStatus>>,
pub last_smeared_timestamp: RwLock<i64>,
pub running_state: Arc<RwLock<RunningState>>,
/// Mutex to avoid generating the key for the user more than once.
pub generating_key_mutex: Mutex<()>,
pub translated_stockstrings: RwLock<HashMap<usize, String>>,
impl Deref for Context {
type Target = InnerContext;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[derive(Debug)]
pub struct InnerContext {
/// Database file path
pub(crate) dbfile: PathBuf,
/// Blob directory path
pub(crate) blobdir: PathBuf,
pub(crate) sql: Sql,
pub(crate) os_name: Option<String>,
pub(crate) bob: RwLock<BobStatus>,
pub(crate) last_smeared_timestamp: RwLock<i64>,
pub(crate) running_state: RwLock<RunningState>,
/// Mutex to avoid generating the key for the user more than once.
pub(crate) generating_key_mutex: Mutex<()>,
pub(crate) translated_stockstrings: RwLock<HashMap<usize, String>>,
pub(crate) logs: SegQueue<Event>, pub(crate) logs: SegQueue<Event>,
pub(crate) scheduler: RwLock<Scheduler>,
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@@ -97,27 +104,24 @@ impl Context {
"Blobdir does not exist: {}", "Blobdir does not exist: {}",
blobdir.display() blobdir.display()
); );
let ctx = Context {
let inner = InnerContext {
blobdir, blobdir,
dbfile, dbfile,
os_name: Some(os_name), os_name: Some(os_name),
running_state: Arc::new(RwLock::new(Default::default())), running_state: RwLock::new(Default::default()),
sql: Sql::new(), sql: Sql::new(),
smtp: Smtp::new(), bob: RwLock::new(Default::default()),
oauth2_critical: Arc::new(Mutex::new(())),
bob: Arc::new(RwLock::new(Default::default())),
last_smeared_timestamp: RwLock::new(0), last_smeared_timestamp: RwLock::new(0),
cmdline_sel_chat_id: Arc::new(RwLock::new(ChatId::new(0))),
inbox_thread: JobThread::new("INBOX", "configured_inbox_folder", Imap::new()),
sentbox_thread: JobThread::new("SENTBOX", "configured_sentbox_folder", Imap::new()),
mvbox_thread: JobThread::new("MVBOX", "configured_mvbox_folder", Imap::new()),
probe_imap_network: Default::default(),
perform_inbox_jobs_needed: Default::default(),
generating_key_mutex: Mutex::new(()), generating_key_mutex: Mutex::new(()),
translated_stockstrings: RwLock::new(HashMap::new()), translated_stockstrings: RwLock::new(HashMap::new()),
logs: SegQueue::new(), logs: SegQueue::new(),
scheduler: RwLock::new(Scheduler::Stopped),
}; };
let ctx = Context {
inner: Arc::new(inner),
};
ensure!( ensure!(
ctx.sql.open(&ctx, &ctx.dbfile, false).await, ctx.sql.open(&ctx, &ctx.dbfile, false).await,
"Failed opening sqlite database" "Failed opening sqlite database"
@@ -126,6 +130,16 @@ impl Context {
Ok(ctx) Ok(ctx)
} }
pub async fn run(&self) {
self.inner.scheduler.write().await.run().await
}
pub async fn stop(&self) {
if self.inner.scheduler.read().await.is_running() {
self.inner.scheduler.write().await.stop().await;
}
}
/// Returns database file path. /// Returns database file path.
pub fn get_dbfile(&self) -> &Path { pub fn get_dbfile(&self) -> &Path {
self.dbfile.as_path() self.dbfile.as_path()
@@ -160,7 +174,7 @@ impl Context {
false false
} else { } else {
let s_a = self.running_state.clone(); let s_a = &self.running_state;
let mut s = s_a.write().await; let mut s = s_a.write().await;
s.ongoing_running = true; s.ongoing_running = true;
@@ -171,7 +185,7 @@ impl Context {
} }
pub async fn free_ongoing(&self) { pub async fn free_ongoing(&self) {
let s_a = self.running_state.clone(); let s_a = &self.running_state;
let mut s = s_a.write().await; let mut s = s_a.write().await;
s.ongoing_running = false; s.ongoing_running = false;
@@ -179,7 +193,7 @@ impl Context {
} }
pub async fn has_ongoing(&self) -> bool { pub async fn has_ongoing(&self) -> bool {
let s_a = self.running_state.clone(); let s_a = &self.running_state;
let s = s_a.read().await; let s = s_a.read().await;
s.ongoing_running || !s.shall_stop_ongoing s.ongoing_running || !s.shall_stop_ongoing
@@ -187,7 +201,7 @@ impl Context {
/// Signal an ongoing process to stop. /// Signal an ongoing process to stop.
pub async fn stop_ongoing(&self) { pub async fn stop_ongoing(&self) {
let s_a = self.running_state.clone(); let s_a = &self.running_state;
let mut s = s_a.write().await; let mut s = s_a.write().await;
if s.ongoing_running && !s.shall_stop_ongoing { if s.ongoing_running && !s.shall_stop_ongoing {
@@ -199,7 +213,7 @@ impl Context {
} }
pub async fn shall_stop_ongoing(&self) -> bool { pub async fn shall_stop_ongoing(&self) -> bool {
self.running_state.clone().read().await.shall_stop_ongoing self.running_state.read().await.shall_stop_ongoing
} }
/******************************************************************************* /*******************************************************************************
@@ -456,15 +470,7 @@ impl Context {
impl Drop for Context { impl Drop for Context {
fn drop(&mut self) { fn drop(&mut self) {
async_std::task::block_on(async move { async_std::task::block_on(async move {
info!(self, "disconnecting inbox-thread"); self.stop().await;
self.inbox_thread.imap.disconnect(self).await;
info!(self, "disconnecting sentbox-thread");
self.sentbox_thread.imap.disconnect(self).await;
info!(self, "disconnecting mvbox-thread");
self.mvbox_thread.imap.disconnect(self).await;
info!(self, "disconnecting SMTP");
self.smtp.disconnect().await;
self.sql.close(self).await; self.sql.close(self).await;
}); });
} }

View File

@@ -60,12 +60,12 @@ impl Session {
} }
impl Imap { impl Imap {
pub async fn can_idle(&self) -> bool { pub fn can_idle(&self) -> bool {
self.config.read().await.can_idle self.config.can_idle
} }
pub async fn idle(&self, context: &Context, watch_folder: Option<String>) -> Result<()> { pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<()> {
if !self.can_idle().await { if !self.can_idle() {
return Err(Error::IdleAbilityMissing); return Err(Error::IdleAbilityMissing);
} }
@@ -75,7 +75,7 @@ impl Imap {
self.select_folder(context, watch_folder.clone()).await?; self.select_folder(context, watch_folder.clone()).await?;
let session = self.session.lock().await.take(); let session = self.session.take();
let timeout = Duration::from_secs(23 * 60); let timeout = Duration::from_secs(23 * 60);
if let Some(session) = session { if let Some(session) = session {
match session.idle() { match session.idle() {
@@ -87,12 +87,12 @@ impl Imap {
} }
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); self.interrupt = Some(interrupt);
if self.skip_next_idle_wait.load(Ordering::SeqCst) { if self.skip_next_idle_wait {
// interrupt_idle has happened before we // interrupt_idle has happened before we
// provided self.interrupt // provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::SeqCst); self.skip_next_idle_wait = false;
std::mem::drop(idle_wait); std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped"); info!(context, "Idle wait was skipped");
} else { } else {
@@ -126,7 +126,7 @@ impl Imap {
match res { match res {
Ok(session) => { Ok(session) => {
*self.session.lock().await = Some(Session::Secure(session)); self.session = Some(Session::Secure(session));
} }
Err(err) => { Err(err) => {
// if we cannot terminate IDLE it probably // if we cannot terminate IDLE it probably
@@ -143,12 +143,12 @@ impl Imap {
} }
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); self.interrupt = Some(interrupt);
if self.skip_next_idle_wait.load(Ordering::SeqCst) { if self.skip_next_idle_wait {
// interrupt_idle has happened before we // interrupt_idle has happened before we
// provided self.interrupt // provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::SeqCst); self.skip_next_idle_wait = false;
std::mem::drop(idle_wait); std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped"); info!(context, "Idle wait was skipped");
} else { } else {
@@ -182,7 +182,7 @@ impl Imap {
match res { match res {
Ok(session) => { Ok(session) => {
*self.session.lock().await = Some(Session::Insecure(session)); self.session = Some(Session::Insecure(session));
} }
Err(err) => { Err(err) => {
// if we cannot terminate IDLE it probably // if we cannot terminate IDLE it probably
@@ -199,7 +199,7 @@ impl Imap {
Ok(()) Ok(())
} }
pub(crate) async fn fake_idle(&self, context: &Context, watch_folder: Option<String>) { pub(crate) async fn fake_idle(&mut self, context: &Context, watch_folder: Option<String>) {
// Idle using polling. This is also needed if we're not yet configured - // Idle using polling. This is also needed if we're not yet configured -
// in this case, we're waiting for a configure job (and an interrupt). // in this case, we're waiting for a configure job (and an interrupt).
@@ -213,11 +213,11 @@ impl Imap {
// TODO: grow sleep durations / make them more flexible // TODO: grow sleep durations / make them more flexible
let interval = async_std::stream::interval(Duration::from_secs(60)); let interval = async_std::stream::interval(Duration::from_secs(60));
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
*self.interrupt.lock().await = Some(interrupt); self.interrupt = Some(interrupt);
if self.skip_next_idle_wait.load(Ordering::SeqCst) { if self.skip_next_idle_wait {
// interrupt_idle has happened before we // interrupt_idle has happened before we
// provided self.interrupt // provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::SeqCst); self.skip_next_idle_wait = false;
info!(context, "fake-idle wait was skipped"); info!(context, "fake-idle wait was skipped");
} else { } else {
// loop until we are interrupted or if we fetched something // loop until we are interrupted or if we fetched something
@@ -229,7 +229,7 @@ impl Imap {
warn!(context, "fake_idle: could not connect: {}", err); warn!(context, "fake_idle: could not connect: {}", err);
continue; continue;
} }
if self.config.read().await.can_idle { if self.config.can_idle {
// we only fake-idled because network was gone during IDLE, probably // we only fake-idled because network was gone during IDLE, probably
break; break;
} }
@@ -255,7 +255,7 @@ impl Imap {
} }
} }
} }
self.interrupt.lock().await.take(); self.interrupt.take();
info!( info!(
context, context,
@@ -268,14 +268,14 @@ impl Imap {
); );
} }
pub async fn interrupt_idle(&self, context: &Context) { pub async fn interrupt_idle(&mut self, context: &Context) {
let mut interrupt: Option<stop_token::StopSource> = self.interrupt.lock().await.take(); let mut interrupt: Option<stop_token::StopSource> = self.interrupt.take();
if interrupt.is_none() { if interrupt.is_none() {
// idle wait is not running, signal it needs to skip // idle wait is not running, signal it needs to skip
self.skip_next_idle_wait.store(true, Ordering::SeqCst); self.skip_next_idle_wait = false;
// meanwhile idle-wait may have produced the StopSource // meanwhile idle-wait may have produced the StopSource
interrupt = self.interrupt.lock().await.take(); interrupt = self.interrupt.take();
} }
// let's manually drop the StopSource // let's manually drop the StopSource
if interrupt.is_some() { if interrupt.is_some() {

View File

@@ -12,7 +12,7 @@ use async_imap::{
types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute}, types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute},
}; };
use async_std::prelude::*; use async_std::prelude::*;
use async_std::sync::{Mutex, RwLock}; use async_std::sync::{Mutex, Receiver, RwLock};
use crate::config::*; use crate::config::*;
use crate::constants::*; use crate::constants::*;
@@ -137,14 +137,15 @@ const JUST_UID: &str = "(UID)";
const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])"; const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])";
const SELECT_ALL: &str = "1:*"; const SELECT_ALL: &str = "1:*";
#[derive(Debug, Default)] #[derive(Debug)]
pub struct Imap { pub struct Imap {
config: RwLock<ImapConfig>, idle_interrupt: Receiver<()>,
session: Mutex<Option<Session>>, config: ImapConfig,
connected: Mutex<bool>, session: Option<Session>,
interrupt: Mutex<Option<stop_token::StopSource>>, connected: bool,
skip_next_idle_wait: AtomicBool, interrupt: Option<stop_token::StopSource>,
should_reconnect: AtomicBool, skip_next_idle_wait: bool,
should_reconnect: bool,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -212,39 +213,47 @@ impl Default for ImapConfig {
} }
impl Imap { impl Imap {
pub fn new() -> Self { pub fn new(idle_interrupt: Receiver<()>) -> Self {
Default::default() Imap {
idle_interrupt,
config: Default::default(),
session: Default::default(),
connected: Default::default(),
interrupt: Default::default(),
skip_next_idle_wait: Default::default(),
should_reconnect: Default::default(),
}
} }
pub async fn is_connected(&self) -> bool { pub fn is_connected(&self) -> bool {
*self.connected.lock().await self.connected
} }
pub fn should_reconnect(&self) -> bool { pub fn should_reconnect(&self) -> bool {
self.should_reconnect.load(Ordering::Relaxed) self.should_reconnect
} }
pub fn trigger_reconnect(&self) { pub fn trigger_reconnect(&mut self) {
self.should_reconnect.store(true, Ordering::Relaxed) self.should_reconnect = true;
} }
async fn setup_handle_if_needed(&self, context: &Context) -> Result<()> { async fn setup_handle_if_needed(&mut self, context: &Context) -> Result<()> {
if self.config.read().await.imap_server.is_empty() { if self.config.imap_server.is_empty() {
return Err(Error::InTeardown); return Err(Error::InTeardown);
} }
if self.should_reconnect() { if self.should_reconnect() {
self.unsetup_handle(context).await; self.unsetup_handle(context).await;
self.should_reconnect.store(false, Ordering::Relaxed); self.should_reconnect = false;
} else if self.is_connected().await { } else if self.is_connected() {
return Ok(()); return Ok(());
} }
let server_flags = self.config.read().await.server_flags as i32; let server_flags = self.config.server_flags as i32;
let connection_res: ImapResult<Client> = let connection_res: ImapResult<Client> =
if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 { if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 {
let config = self.config.read().await; let config = &mut self.config;
let imap_server: &str = config.imap_server.as_ref(); let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port; let imap_port = config.imap_port;
@@ -259,7 +268,7 @@ impl Imap {
Err(err) => Err(err), Err(err) => Err(err),
} }
} else { } else {
let config = self.config.read().await; let config = &self.config;
let imap_server: &str = config.imap_server.as_ref(); let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port; let imap_port = config.imap_port;
@@ -273,7 +282,7 @@ impl Imap {
let login_res = match connection_res { let login_res = match connection_res {
Ok(client) => { Ok(client) => {
let config = self.config.read().await; let config = &self.config;
let imap_user: &str = config.imap_user.as_ref(); let imap_user: &str = config.imap_user.as_ref();
let imap_pw: &str = config.imap_pw.as_ref(); let imap_pw: &str = config.imap_pw.as_ref();
@@ -297,7 +306,7 @@ impl Imap {
} }
Err(err) => { Err(err) => {
let message = { let message = {
let config = self.config.read().await; let config = &self.config;
let imap_server: &str = config.imap_server.as_ref(); let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port; let imap_port = config.imap_port;
context context
@@ -314,15 +323,15 @@ impl Imap {
} }
}; };
self.should_reconnect.store(false, Ordering::Relaxed); self.should_reconnect = false;
match login_res { match login_res {
Ok(session) => { Ok(session) => {
*self.session.lock().await = Some(session); self.session = Some(session);
Ok(()) Ok(())
} }
Err((err, _)) => { Err((err, _)) => {
let imap_user = self.config.read().await.imap_user.to_owned(); let imap_user = self.config.imap_user.to_owned();
let message = context let message = context
.stock_string_repl_str(StockMessage::CannotLogin, &imap_user) .stock_string_repl_str(StockMessage::CannotLogin, &imap_user)
.await; .await;
@@ -337,26 +346,23 @@ impl Imap {
} }
} }
async fn unsetup_handle(&self, context: &Context) { async fn unsetup_handle(&mut self, context: &Context) {
info!( info!(context, "IMAP unsetup_handle step 2");
context, if let Some(mut session) = self.session.take() {
"IMAP unsetup_handle step 2 (acquiring session.lock)"
);
if let Some(mut session) = self.session.lock().await.take() {
if let Err(err) = session.close().await { if let Err(err) = session.close().await {
warn!(context, "failed to close connection: {:?}", err); warn!(context, "failed to close connection: {:?}", err);
} }
} }
*self.connected.lock().await = false; self.connected = false;
info!(context, "IMAP unsetup_handle step 3 (clearing config)."); info!(context, "IMAP unsetup_handle step 3 (clearing config).");
self.config.write().await.selected_folder = None; self.config.selected_folder = None;
self.config.write().await.selected_mailbox = None; self.config.selected_mailbox = None;
info!(context, "IMAP unsetup_handle step 4 (disconnected)"); info!(context, "IMAP unsetup_handle step 4 (disconnected)");
} }
async fn free_connect_params(&self) { async fn free_connect_params(&mut self) {
let mut cfg = self.config.write().await; let mut cfg = &mut self.config;
cfg.addr = "".into(); cfg.addr = "".into();
cfg.imap_server = "".into(); cfg.imap_server = "".into();
@@ -369,8 +375,8 @@ impl Imap {
} }
/// Connects to imap account using already-configured parameters. /// Connects to imap account using already-configured parameters.
pub async fn connect_configured(&self, context: &Context) -> Result<()> { pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
if self.is_connected().await && !self.should_reconnect() { if self.is_connected() && !self.should_reconnect() {
return Ok(()); return Ok(());
} }
if !context.sql.get_raw_config_bool(context, "configured").await { if !context.sql.get_raw_config_bool(context, "configured").await {
@@ -389,7 +395,7 @@ impl Imap {
/// tries connecting to imap account using the specific login /// tries connecting to imap account using the specific login
/// parameters /// parameters
pub async fn connect(&self, context: &Context, lp: &LoginParam) -> bool { pub async fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool {
if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() { if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() {
return false; return false;
} }
@@ -402,7 +408,7 @@ impl Imap {
let imap_pw = &lp.mail_pw; let imap_pw = &lp.mail_pw;
let server_flags = lp.server_flags as usize; let server_flags = lp.server_flags as usize;
let mut config = self.config.write().await; let mut config = &mut self.config;
config.addr = addr.to_string(); config.addr = addr.to_string();
config.imap_server = imap_server.to_string(); config.imap_server = imap_server.to_string();
config.imap_port = imap_port; config.imap_port = imap_port;
@@ -418,7 +424,7 @@ impl Imap {
return false; return false;
} }
let teardown = match &mut *self.session.lock().await { let teardown = match &mut self.session {
Some(ref mut session) => match session.capabilities().await { Some(ref mut session) => match session.capabilities().await {
Ok(caps) => { Ok(caps) => {
if !context.sql.is_open().await { if !context.sql.is_open().await {
@@ -435,9 +441,9 @@ impl Imap {
} }
}); });
self.config.write().await.can_idle = can_idle; self.config.can_idle = can_idle;
self.config.write().await.can_move = can_move; self.config.can_move = can_move;
*self.connected.lock().await = true; self.connected = true;
emit_event!( emit_event!(
context, context,
Event::ImapConnected(format!( Event::ImapConnected(format!(
@@ -465,12 +471,12 @@ impl Imap {
} }
} }
pub async fn disconnect(&self, context: &Context) { pub async fn disconnect(&mut self, context: &Context) {
self.unsetup_handle(context).await; self.unsetup_handle(context).await;
self.free_connect_params().await; self.free_connect_params().await;
} }
pub async fn fetch(&self, context: &Context, watch_folder: &str) -> Result<()> { pub async fn fetch(&mut self, context: &Context, watch_folder: &str) -> Result<()> {
if !context.sql.is_open().await { if !context.sql.is_open().await {
// probably shutdown // probably shutdown
return Err(Error::InTeardown); return Err(Error::InTeardown);
@@ -511,7 +517,7 @@ impl Imap {
/// return Result with (uid_validity, last_seen_uid) tuple. /// return Result with (uid_validity, last_seen_uid) tuple.
pub(crate) async fn select_with_uidvalidity( pub(crate) async fn select_with_uidvalidity(
&self, &mut self,
context: &Context, context: &Context,
folder: &str, folder: &str,
) -> Result<(u32, u32)> { ) -> Result<(u32, u32)> {
@@ -520,7 +526,7 @@ impl Imap {
// compare last seen UIDVALIDITY against the current one // compare last seen UIDVALIDITY against the current one
let (uid_validity, last_seen_uid) = self.get_config_last_seen_uid(context, &folder).await; let (uid_validity, last_seen_uid) = self.get_config_last_seen_uid(context, &folder).await;
let config = self.config.read().await; let config = &mut self.config;
let mailbox = config let mailbox = config
.selected_mailbox .selected_mailbox
.as_ref() .as_ref()
@@ -561,7 +567,7 @@ impl Imap {
context, context,
"IMAP folder has no uid_next, fall back to fetching" "IMAP folder has no uid_next, fall back to fetching"
); );
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
// note that we use fetch by sequence number // note that we use fetch by sequence number
// and thus we only need to get exactly the // and thus we only need to get exactly the
// last-index message. // last-index message.
@@ -598,7 +604,7 @@ impl Imap {
} }
async fn fetch_new_messages<S: AsRef<str>>( async fn fetch_new_messages<S: AsRef<str>>(
&self, &mut self,
context: &Context, context: &Context,
folder: S, folder: S,
) -> Result<bool> { ) -> Result<bool> {
@@ -613,9 +619,10 @@ impl Imap {
// prefetch info from all unfetched mails // prefetch info from all unfetched mails
let mut new_last_seen_uid = last_seen_uid; let mut new_last_seen_uid = last_seen_uid;
let mut read_errors = 0; let mut read_errors: usize = 0;
if let Some(ref mut session) = &mut *self.session.lock().await { let mut uids = Vec::new();
if let Some(ref mut session) = &mut self.session {
// fetch messages with larger UID than the last one seen // fetch messages with larger UID than the last one seen
// `(UID FETCH lastseenuid+1:*)`, see RFC 4549 // `(UID FETCH lastseenuid+1:*)`, see RFC 4549
let set = format!("{}:*", last_seen_uid + 1); let set = format!("{}:*", last_seen_uid + 1);
@@ -646,9 +653,9 @@ impl Imap {
continue; continue;
} }
read_cnt += 1; read_cnt += 1;
let headers = get_fetch_headers(&fetch)?; let headers = get_fetch_headers(&fetch)?;
let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); let message_id = prefetch_get_message_id(&headers).unwrap_or_default();
if precheck_imf(context, &message_id, folder.as_ref(), cur_uid).await { if precheck_imf(context, &message_id, folder.as_ref(), cur_uid).await {
// we know the message-id already or don't want the message otherwise. // we know the message-id already or don't want the message otherwise.
info!( info!(
@@ -675,26 +682,31 @@ impl Imap {
); );
} else { } else {
// check passed, go fetch the rest // check passed, go fetch the rest
if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await { uids.push((cur_uid, message_id));
info!(
context,
"Read error for message {} from \"{}\", trying over later: {}.",
message_id,
folder.as_ref(),
err
);
read_errors += 1;
}
} }
} }
if read_errors == 0 {
new_last_seen_uid = cur_uid;
}
} }
} else { } else {
return Err(Error::NoConnection); return Err(Error::NoConnection);
}; };
for (cur_uid, message_id) in uids.into_iter() {
if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await {
info!(
context,
"Read error for message {} from \"{}\", trying over later: {}.",
message_id,
folder.as_ref(),
err
);
read_errors += 1;
}
if read_errors == 0 {
new_last_seen_uid = cur_uid;
}
}
if new_last_seen_uid > last_seen_uid { if new_last_seen_uid > last_seen_uid {
self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid) self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid)
.await; .await;
@@ -743,25 +755,24 @@ impl Imap {
/// if no database entries are created. If the function returns an /// if no database entries are created. If the function returns an
/// error, the caller should try again later. /// error, the caller should try again later.
async fn fetch_single_msg<S: AsRef<str>>( async fn fetch_single_msg<S: AsRef<str>>(
&self, &mut self,
context: &Context, context: &Context,
folder: S, folder: S,
server_uid: u32, server_uid: u32,
) -> Result<()> { ) -> Result<()> {
if !self.is_connected().await { if !self.is_connected() {
return Err(Error::Other("Not connected".to_string())); return Err(Error::Other("Not connected".to_string()));
} }
let set = format!("{}", server_uid); let set = format!("{}", server_uid);
let mut session_lock = self.session.lock().await; let mut msgs = if let Some(ref mut session) = &mut self.session {
let mut msgs = if let Some(ref mut session) = &mut *session_lock {
match session.uid_fetch(set, BODY_FLAGS).await { match session.uid_fetch(set, BODY_FLAGS).await {
Ok(msgs) => msgs, Ok(msgs) => msgs,
Err(err) => { Err(err) => {
// TODO maybe differentiate between IO and input/parsing problems // TODO maybe differentiate between IO and input/parsing problems
// so we don't reconnect if we have a (rare) input/output parsing problem? // so we don't reconnect if we have a (rare) input/output parsing problem?
self.trigger_reconnect(); self.should_reconnect = true;
warn!( warn!(
context, context,
"Error on fetching message #{} from folder \"{}\"; error={}.", "Error on fetching message #{} from folder \"{}\"; error={}.",
@@ -810,11 +821,11 @@ impl Imap {
} }
pub async fn can_move(&self) -> bool { pub async fn can_move(&self) -> bool {
self.config.read().await.can_move self.config.can_move
} }
pub async fn mv( pub async fn mv(
&self, &mut self,
context: &Context, context: &Context,
folder: &str, folder: &str,
uid: u32, uid: u32,
@@ -843,7 +854,7 @@ impl Imap {
let display_folder_id = format!("{}/{}", folder, uid); let display_folder_id = format!("{}/{}", folder, uid);
if self.can_move().await { if self.can_move().await {
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
match session.uid_mv(&set, &dest_folder).await { match session.uid_mv(&set, &dest_folder).await {
Ok(_) => { Ok(_) => {
emit_event!( emit_event!(
@@ -879,7 +890,7 @@ impl Imap {
); );
} }
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
if let Err(err) = session.uid_copy(&set, &dest_folder).await { if let Err(err) = session.uid_copy(&set, &dest_folder).await {
warn!(context, "Could not copy message: {}", err); warn!(context, "Could not copy message: {}", err);
return ImapActionResult::Failed; return ImapActionResult::Failed;
@@ -899,7 +910,7 @@ impl Imap {
); );
ImapActionResult::Failed ImapActionResult::Failed
} else { } else {
self.config.write().await.selected_folder_needs_expunge = true; self.config.selected_folder_needs_expunge = true;
emit_event!( emit_event!(
context, context,
Event::ImapMessageMoved(format!( Event::ImapMessageMoved(format!(
@@ -911,7 +922,7 @@ impl Imap {
} }
} }
async fn add_flag_finalized(&self, context: &Context, server_uid: u32, flag: &str) -> bool { async fn add_flag_finalized(&mut self, context: &Context, server_uid: u32, flag: &str) -> bool {
// return true if we successfully set the flag or we otherwise // return true if we successfully set the flag or we otherwise
// think add_flag should not be retried: Disconnection during setting // think add_flag should not be retried: Disconnection during setting
// the flag, or other imap-errors, returns true as well. // the flag, or other imap-errors, returns true as well.
@@ -925,7 +936,7 @@ impl Imap {
} }
async fn add_flag_finalized_with_set( async fn add_flag_finalized_with_set(
&self, &mut self,
context: &Context, context: &Context,
uid_set: &str, uid_set: &str,
flag: &str, flag: &str,
@@ -933,7 +944,7 @@ impl Imap {
if self.should_reconnect() { if self.should_reconnect() {
return false; return false;
} }
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
let query = format!("+FLAGS ({})", flag); let query = format!("+FLAGS ({})", flag);
match session.uid_store(uid_set, &query).await { match session.uid_store(uid_set, &query).await {
Ok(_) => {} Ok(_) => {}
@@ -951,7 +962,7 @@ impl Imap {
} }
pub async fn prepare_imap_operation_on_msg( pub async fn prepare_imap_operation_on_msg(
&self, &mut self,
context: &Context, context: &Context,
folder: &str, folder: &str,
uid: u32, uid: u32,
@@ -959,7 +970,7 @@ impl Imap {
if uid == 0 { if uid == 0 {
return Some(ImapActionResult::Failed); return Some(ImapActionResult::Failed);
} }
if !self.is_connected().await { if !self.is_connected() {
// currently jobs are only performed on the INBOX thread // currently jobs are only performed on the INBOX thread
// TODO: make INBOX/SENT/MVBOX perform the jobs on their // TODO: make INBOX/SENT/MVBOX perform the jobs on their
// respective folders to avoid select_folder network traffic // respective folders to avoid select_folder network traffic
@@ -990,7 +1001,12 @@ impl Imap {
} }
} }
pub async fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { pub async fn set_seen(
&mut self,
context: &Context,
folder: &str,
uid: u32,
) -> ImapActionResult {
if let Some(imapresult) = self if let Some(imapresult) = self
.prepare_imap_operation_on_msg(context, folder, uid) .prepare_imap_operation_on_msg(context, folder, uid)
.await .await
@@ -1012,7 +1028,7 @@ impl Imap {
} }
pub async fn delete_msg( pub async fn delete_msg(
&self, &mut self,
context: &Context, context: &Context,
message_id: &str, message_id: &str,
folder: &str, folder: &str,
@@ -1031,7 +1047,7 @@ impl Imap {
// double-check that we are deleting the correct message-id // double-check that we are deleting the correct message-id
// this comes at the expense of another imap query // this comes at the expense of another imap query
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
match session.uid_fetch(set, DELETE_CHECK_FLAGS).await { match session.uid_fetch(set, DELETE_CHECK_FLAGS).await {
Ok(mut msgs) => { Ok(mut msgs) => {
let fetch = if let Some(Ok(fetch)) = msgs.next().await { let fetch = if let Some(Ok(fetch)) = msgs.next().await {
@@ -1086,13 +1102,13 @@ impl Imap {
display_imap_id, message_id display_imap_id, message_id
)) ))
); );
self.config.write().await.selected_folder_needs_expunge = true; self.config.selected_folder_needs_expunge = true;
ImapActionResult::Success ImapActionResult::Success
} }
} }
pub async fn ensure_configured_folders( pub async fn ensure_configured_folders(
&self, &mut self,
context: &Context, context: &Context,
create_mvbox: bool, create_mvbox: bool,
) -> Result<()> { ) -> Result<()> {
@@ -1106,12 +1122,12 @@ impl Imap {
return Ok(()); return Ok(());
} }
if !self.is_connected().await { if !self.is_connected() {
return Err(Error::NoConnection); return Err(Error::NoConnection);
} }
info!(context, "Configuring IMAP-folders."); info!(context, "Configuring IMAP-folders.");
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
let mut folders = match session.list(Some(""), Some("*")).await { let mut folders = match session.list(Some(""), Some("*")).await {
Ok(f) => f, Ok(f) => f,
Err(err) => { Err(err) => {
@@ -1121,7 +1137,7 @@ impl Imap {
let mut sentbox_folder = None; let mut sentbox_folder = None;
let mut mvbox_folder = None; let mut mvbox_folder = None;
let delimiter = self.config.read().await.imap_delimiter; let delimiter = self.config.imap_delimiter;
let fallback_folder = format!("INBOX{}DeltaChat", delimiter); let fallback_folder = format!("INBOX{}DeltaChat", delimiter);
while let Some(folder) = folders.next().await { while let Some(folder) = folders.next().await {
@@ -1231,7 +1247,7 @@ impl Imap {
// } // }
// } // }
pub async fn empty_folder(&self, context: &Context, folder: &str) { pub async fn empty_folder(&mut self, context: &Context, folder: &str) {
info!(context, "emptying folder {}", folder); info!(context, "emptying folder {}", folder);
// we want to report all error to the user // we want to report all error to the user
@@ -1261,7 +1277,7 @@ impl Imap {
} }
// we now trigger expunge to actually delete messages // we now trigger expunge to actually delete messages
self.config.write().await.selected_folder_needs_expunge = true; self.config.selected_folder_needs_expunge = true;
match self.select_folder::<String>(context, None).await { match self.select_folder::<String>(context, None).await {
Ok(()) => { Ok(()) => {
emit_event!(context, Event::ImapFolderEmptied(folder.to_string())); emit_event!(context, Event::ImapFolderEmptied(folder.to_string()));

View File

@@ -26,14 +26,13 @@ impl Imap {
/// select a folder, possibly update uid_validity and, if needed, /// select a folder, possibly update uid_validity and, if needed,
/// expunge the folder to remove delete-marked messages. /// expunge the folder to remove delete-marked messages.
pub(super) async fn select_folder<S: AsRef<str>>( pub(super) async fn select_folder<S: AsRef<str>>(
&self, &mut self,
context: &Context, context: &Context,
folder: Option<S>, folder: Option<S>,
) -> Result<()> { ) -> Result<()> {
if self.session.lock().await.is_none() { if self.session.is_none() {
let mut cfg = self.config.write().await; self.config.selected_folder = None;
cfg.selected_folder = None; self.config.selected_folder_needs_expunge = false;
cfg.selected_folder_needs_expunge = false;
self.trigger_reconnect(); self.trigger_reconnect();
return Err(Error::NoSession); return Err(Error::NoSession);
} }
@@ -41,7 +40,7 @@ impl Imap {
// if there is a new folder and the new folder is equal to the selected one, there's nothing to do. // if there is a new folder and the new folder is equal to the selected one, there's nothing to do.
// if there is _no_ new folder, we continue as we might want to expunge below. // if there is _no_ new folder, we continue as we might want to expunge below.
if let Some(ref folder) = folder { if let Some(ref folder) = folder {
if let Some(ref selected_folder) = self.config.read().await.selected_folder { if let Some(ref selected_folder) = self.config.selected_folder {
if folder.as_ref() == selected_folder { if folder.as_ref() == selected_folder {
return Ok(()); return Ok(());
} }
@@ -49,14 +48,14 @@ impl Imap {
} }
// deselect existing folder, if needed (it's also done implicitly by SELECT, however, without EXPUNGE then) // deselect existing folder, if needed (it's also done implicitly by SELECT, however, without EXPUNGE then)
let needs_expunge = { self.config.read().await.selected_folder_needs_expunge }; let needs_expunge = { self.config.selected_folder_needs_expunge };
if needs_expunge { if needs_expunge {
if let Some(ref folder) = self.config.read().await.selected_folder { if let Some(ref folder) = self.config.selected_folder {
info!(context, "Expunge messages in \"{}\".", folder); info!(context, "Expunge messages in \"{}\".", folder);
// A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see // A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see
// https://tools.ietf.org/html/rfc3501#section-6.4.2 // https://tools.ietf.org/html/rfc3501#section-6.4.2
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
match session.close().await { match session.close().await {
Ok(_) => { Ok(_) => {
info!(context, "close/expunge succeeded"); info!(context, "close/expunge succeeded");
@@ -70,12 +69,12 @@ impl Imap {
return Err(Error::NoSession); return Err(Error::NoSession);
} }
} }
self.config.write().await.selected_folder_needs_expunge = false; self.config.selected_folder_needs_expunge = false;
} }
// select new folder // select new folder
if let Some(ref folder) = folder { if let Some(ref folder) = folder {
if let Some(ref mut session) = &mut *self.session.lock().await { if let Some(ref mut session) = &mut self.session {
let res = session.select(folder).await; let res = session.select(folder).await;
// https://tools.ietf.org/html/rfc3501#section-6.3.1 // https://tools.ietf.org/html/rfc3501#section-6.3.1
@@ -84,21 +83,20 @@ impl Imap {
match res { match res {
Ok(mailbox) => { Ok(mailbox) => {
let mut config = self.config.write().await; self.config.selected_folder = Some(folder.as_ref().to_string());
config.selected_folder = Some(folder.as_ref().to_string()); self.config.selected_mailbox = Some(mailbox);
config.selected_mailbox = Some(mailbox);
Ok(()) Ok(())
} }
Err(async_imap::error::Error::ConnectionLost) => { Err(async_imap::error::Error::ConnectionLost) => {
self.trigger_reconnect(); self.trigger_reconnect();
self.config.write().await.selected_folder = None; self.config.selected_folder = None;
Err(Error::ConnectionLost) Err(Error::ConnectionLost)
} }
Err(async_imap::error::Error::Validate(_)) => { Err(async_imap::error::Error::Validate(_)) => {
Err(Error::BadFolderName(folder.as_ref().to_string())) Err(Error::BadFolderName(folder.as_ref().to_string()))
} }
Err(err) => { Err(err) => {
self.config.write().await.selected_folder = None; self.config.selected_folder = None;
self.trigger_reconnect(); self.trigger_reconnect();
Err(Error::Other(err.to_string())) Err(Error::Other(err.to_string()))
} }

1005
src/job.rs

File diff suppressed because it is too large Load Diff

View File

@@ -35,7 +35,7 @@ impl JobThread {
} }
} }
pub async fn suspend(&self, context: &Context) { pub async fn suspend(&mut self, context: &Context) {
info!(context, "Suspending {}-thread.", self.name,); info!(context, "Suspending {}-thread.", self.name,);
{ {
self.state.lock().await.suspended = true; self.state.lock().await.suspended = true;
@@ -62,7 +62,7 @@ impl JobThread {
self.notify_sender.send(()).await; self.notify_sender.send(()).await;
} }
pub async fn try_interrupt_idle(&self, context: &Context) -> bool { pub async fn try_interrupt_idle(&mut self, context: &Context) -> bool {
if self.state.lock().await.using_handle { if self.state.lock().await.using_handle {
self.interrupt_idle(context).await; self.interrupt_idle(context).await;
return true; return true;
@@ -71,7 +71,7 @@ impl JobThread {
false false
} }
pub async fn interrupt_idle(&self, context: &Context) { pub async fn interrupt_idle(&mut self, context: &Context) {
{ {
self.state.lock().await.jobs_needed = true; self.state.lock().await.jobs_needed = true;
} }
@@ -85,7 +85,7 @@ impl JobThread {
info!(context, "Interrupting {}-IDLE... finished", self.name); info!(context, "Interrupting {}-IDLE... finished", self.name);
} }
pub async fn fetch(&self, context: &Context, use_network: bool) { pub async fn fetch(&mut self, context: &Context, use_network: bool) {
{ {
let lock = &*self.state.clone(); let lock = &*self.state.clone();
let mut state = lock.lock().await; let mut state = lock.lock().await;
@@ -111,7 +111,7 @@ impl JobThread {
} }
} }
async fn connect_and_fetch(&self, context: &Context) -> Result<()> { async fn connect_and_fetch(&mut self, context: &Context) -> Result<()> {
let prefix = format!("{}-fetch", self.name); let prefix = format!("{}-fetch", self.name);
match self.imap.connect_configured(context).await { match self.imap.connect_configured(context).await {
Ok(()) => { Ok(()) => {
@@ -153,7 +153,7 @@ impl JobThread {
} }
} }
pub async fn idle(&self, context: &Context, use_network: bool) { pub async fn idle(&mut self, context: &Context, use_network: bool) {
{ {
let lock = &*self.state.clone(); let lock = &*self.state.clone();
let mut state = lock.lock().await; let mut state = lock.lock().await;
@@ -185,7 +185,7 @@ impl JobThread {
let prefix = format!("{}-IDLE", self.name); let prefix = format!("{}-IDLE", self.name);
let do_fake_idle = match self.imap.connect_configured(context).await { let do_fake_idle = match self.imap.connect_configured(context).await {
Ok(()) => { Ok(()) => {
if !self.imap.can_idle().await { if !self.imap.can_idle() {
true // we have to do fake_idle true // we have to do fake_idle
} else { } else {
let watch_folder = self.get_watch_folder(context).await; let watch_folder = self.get_watch_folder(context).await;

View File

@@ -51,6 +51,7 @@ pub mod context;
mod e2ee; mod e2ee;
mod imap; mod imap;
pub mod imex; pub mod imex;
mod scheduler;
#[macro_use] #[macro_use]
pub mod job; pub mod job;
mod job_thread; mod job_thread;

View File

@@ -84,8 +84,9 @@ pub async fn dc_get_oauth2_access_token(
regenerate: bool, regenerate: bool,
) -> Option<String> { ) -> Option<String> {
if let Some(oauth2) = Oauth2::from_address(addr) { if let Some(oauth2) = Oauth2::from_address(addr) {
let lock = context.oauth2_critical.clone(); // TODO: FIXME
let _l = lock.lock().await; // let lock = context.oauth2_critical.clone();
// let _l = lock.lock().await;
// read generated token // read generated token
if !regenerate && !is_expired(context).await { if !regenerate && !is_expired(context).await {

237
src/scheduler.rs Normal file
View File

@@ -0,0 +1,237 @@
use async_std::prelude::*;
use async_std::sync::{channel, Receiver, Sender};
/// Maximum number of jobs that may be queued for a single connection.
const MAX_JOBS_WAITING: usize = 50;
use crate::imap::Imap;
use crate::smtp::Smtp;
/// Job and connection scheduler.
///
/// Holds one connection-state handle per IMAP folder plus one for SMTP
/// while running; carries no state at all while stopped.
#[derive(Debug)]
pub(crate) enum Scheduler {
    /// Not running; no connection state is held.
    Stopped,
    /// Active; owns the scheduler-side handle of every connection.
    Running {
        /// Handle for the inbox IMAP connection.
        inbox: ImapConnectionState<InboxJob>,
        /// Handle for the mvbox IMAP connection.
        mvbox: ImapConnectionState<MvboxJob>,
        /// Handle for the sentbox IMAP connection.
        sentbox: ImapConnectionState<SentboxJob>,
        /// Handle for the SMTP connection.
        smtp: SmtpConnectionState,
    },
}
impl Scheduler {
    /// Start the scheduler.
    ///
    /// Sets up all connection states concurrently and transitions to
    /// [`Scheduler::Running`].
    ///
    /// # Panics
    ///
    /// Panics if the scheduler is already running.
    pub async fn run(&mut self) {
        match self {
            Scheduler::Stopped => {
                // Establish all four connection states concurrently.
                //
                // NOTE(review): the `*_handlers` halves are dropped at the end
                // of this scope — presumably they should be moved into spawned
                // connection tasks once those exist; confirm.
                let (
                    (
                        ((inbox, inbox_handlers), (mvbox, mvbox_handlers)),
                        (sentbox, sentbox_handlers),
                    ),
                    (smtp, smtp_handlers),
                ) = ImapConnectionState::new()
                    .join(ImapConnectionState::new())
                    .join(ImapConnectionState::new())
                    .join(SmtpConnectionState::new())
                    .await;

                *self = Scheduler::Running {
                    inbox,
                    mvbox,
                    sentbox,
                    smtp,
                };
            }
            Scheduler::Running { .. } => {
                // TODO: return an error instead of panicking.
                panic!("WARN: already running");
            }
        }
    }

    /// Halt the scheduler, waiting until every connection has shut down,
    /// and transition back to [`Scheduler::Stopped`].
    ///
    /// # Panics
    ///
    /// Panics if the scheduler is already stopped.
    pub async fn stop(&mut self) {
        // Move the running state out so we can (a) consume the connection
        // handles and (b) leave `self` in `Stopped` afterwards. Without this
        // transition, `is_stopped()` stayed `false` after a successful stop
        // and a later `run()` would panic with "already running".
        match std::mem::replace(self, Scheduler::Stopped) {
            Scheduler::Stopped => {
                // TODO: return an error instead of panicking.
                panic!("WARN: already stopped");
            }
            Scheduler::Running {
                inbox,
                mvbox,
                sentbox,
                smtp,
            } => {
                // Shut all connections down concurrently.
                inbox
                    .stop()
                    .join(mvbox.stop())
                    .join(sentbox.stop())
                    .join(smtp.stop())
                    .await;
            }
        }
    }

    /// Check if the scheduler is running.
    pub fn is_running(&self) -> bool {
        matches!(self, Scheduler::Running { .. })
    }

    /// Check if the scheduler is stopped.
    pub fn is_stopped(&self) -> bool {
        matches!(self, Scheduler::Stopped)
    }
}
/// Connection state logic shared between imap and smtp connections.
///
/// `T` is the job type this connection accepts.
#[derive(Debug)]
struct ConnectionState<T> {
    /// Channel to notify that shutdown has completed.
    shutdown_receiver: Receiver<()>,
    /// Channel to interrupt the whole connection.
    stop_sender: Sender<()>,
    /// Channel to receive new jobs.
    jobs_receiver: Receiver<T>,
    /// Channel to schedule new jobs.
    jobs_sender: Sender<T>,
}
impl<T> ConnectionState<T> {
    /// Tear this connection down completely: signal the run loop to
    /// exit, then wait for it to confirm that it has finished.
    pub async fn stop(&self) {
        // First ask the run loop to shut itself down …
        self.stop_sender.send(()).await;
        // … then block until it acknowledges completion.
        self.shutdown_receiver.recv().await;
    }

    /// Enqueue a job for this connection.
    pub async fn send_job(&self, job: T) {
        self.jobs_sender.send(job).await;
    }
}
/// Scheduler-side handle for the SMTP connection.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Shared channel plumbing; see [`ConnectionState`].
    state: ConnectionState<SmtpJob>,
}
impl SmtpConnectionState {
async fn new() -> (Self, SmtpConnectionHandlers) {
let (jobs_sender, jobs_receiver) = channel(50);
let (stop_sender, stop_receiver) = channel(1);
let (shutdown_sender, shutdown_receiver) = channel(1);
let handlers = SmtpConnectionHandlers {
connection: Smtp::new(),
stop_receiver,
shutdown_sender,
};
let state = ConnectionState {
shutdown_receiver,
stop_sender,
jobs_sender,
jobs_receiver,
};
let conn = SmtpConnectionState { state };
(conn, handlers)
}
/// Send a new job.
async fn send_job(&self, job: SmtpJob) {
self.state.send_job(job).await;
}
/// Shutdown this connection completely.
async fn stop(&self) {
self.state.stop().await;
}
}
/// Connection-side half of the SMTP state, owned by the task driving
/// the actual SMTP transport.
#[derive(Debug)]
struct SmtpConnectionHandlers {
    /// The SMTP transport itself.
    connection: Smtp,
    /// Receives the signal to stop the run loop.
    stop_receiver: Receiver<()>,
    /// Used to confirm that shutdown has completed.
    shutdown_sender: Sender<()>,
}
/// Scheduler-side handle for one IMAP connection.
///
/// `T` is the job type this connection processes.
#[derive(Debug)]
pub(crate) struct ImapConnectionState<T> {
    /// Channel to interrupt idle.
    idle_interrupt_sender: Sender<()>,
    /// Shared channel plumbing; see [`ConnectionState`].
    state: ConnectionState<T>,
}
impl<T> ImapConnectionState<T> {
    /// Create a fresh IMAP connection state plus the handler half that
    /// the connection task itself will own.
    async fn new() -> (Self, ImapConnectionHandlers) {
        let (shutdown_sender, shutdown_receiver) = channel(1);
        let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
        let (stop_sender, stop_receiver) = channel(1);
        let (jobs_sender, jobs_receiver) = channel(MAX_JOBS_WAITING);

        (
            ImapConnectionState {
                idle_interrupt_sender,
                state: ConnectionState {
                    shutdown_receiver,
                    stop_sender,
                    jobs_sender,
                    jobs_receiver,
                },
            },
            ImapConnectionHandlers {
                connection: Imap::new(idle_interrupt_receiver),
                stop_receiver,
                shutdown_sender,
            },
        )
    }

    /// Queue a job and wake the connection from IDLE so it gets picked up.
    async fn send_job(&self, job: T) {
        // Enqueue and interrupt concurrently; both must complete.
        self.state
            .send_job(job)
            .join(self.idle_interrupt_sender.send(()))
            .await;
    }

    /// Shutdown this connection completely.
    async fn stop(&self) {
        self.state.stop().await;
    }
}
/// Connection-side half of an IMAP state, owned by the task driving
/// the actual IMAP connection.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection itself.
    connection: Imap,
    /// Receives the signal to stop the run loop.
    stop_receiver: Receiver<()>,
    /// Used to confirm that shutdown has completed.
    shutdown_sender: Sender<()>,
}
/// Jobs handled by the inbox connection.
///
/// No variants yet — concrete job types are added as the scheduler is
/// fleshed out; the same applies to the other job enums below.
#[derive(Debug)]
pub enum InboxJob {}

/// Jobs handled by the mvbox connection.
#[derive(Debug)]
pub enum MvboxJob {}

/// Jobs handled by the sentbox connection.
#[derive(Debug)]
pub enum SentboxJob {}

/// Jobs handled by the smtp connection.
#[derive(Debug)]
pub enum SmtpJob {}

View File

@@ -50,36 +50,8 @@ impl From<async_native_tls::Error> for Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub struct Smtp {
inner: RwLock<SmtpInner>,
pub(crate) state: RwLock<State>,
pub(crate) notify_sender: Sender<()>,
pub(crate) notify_receiver: Receiver<()>,
}
impl Default for Smtp {
fn default() -> Self {
let (notify_sender, notify_receiver) = channel(1);
Smtp {
inner: Default::default(),
state: Default::default(),
notify_sender,
notify_receiver,
}
}
}
#[derive(Default, Debug)]
pub struct State {
pub(crate) suspended: bool,
pub(crate) doing_jobs: bool,
pub(crate) perform_jobs_needed: PerformJobsNeeded,
pub(crate) probe_network: bool,
}
#[derive(Default, DebugStub)] #[derive(Default, DebugStub)]
struct SmtpInner { pub struct Smtp {
#[debug_stub(some = "SmtpTransport")] #[debug_stub(some = "SmtpTransport")]
transport: Option<smtp::SmtpTransport>, transport: Option<smtp::SmtpTransport>,
@@ -99,18 +71,17 @@ impl Smtp {
} }
/// Disconnect the SMTP transport and drop it entirely. /// Disconnect the SMTP transport and drop it entirely.
pub async fn disconnect(&self) { pub async fn disconnect(&mut self) {
let inner = &mut *self.inner.write().await; if let Some(mut transport) = self.transport.take() {
if let Some(mut transport) = inner.transport.take() {
transport.close().await.ok(); transport.close().await.ok();
} }
inner.last_success = None; self.last_success = None;
} }
/// Return true if smtp was connected but is not known to /// Return true if smtp was connected but is not known to
/// have been successfully used the last 60 seconds /// have been successfully used the last 60 seconds
pub async fn has_maybe_stale_connection(&self) -> bool { pub async fn has_maybe_stale_connection(&self) -> bool {
if let Some(last_success) = self.inner.read().await.last_success { if let Some(last_success) = self.last_success {
Instant::now().duration_since(last_success).as_secs() > 60 Instant::now().duration_since(last_success).as_secs() > 60
} else { } else {
false false
@@ -119,17 +90,14 @@ impl Smtp {
/// Check whether we are connected. /// Check whether we are connected.
pub async fn is_connected(&self) -> bool { pub async fn is_connected(&self) -> bool {
self.inner self.transport
.read()
.await
.transport
.as_ref() .as_ref()
.map(|t| t.is_connected()) .map(|t| t.is_connected())
.unwrap_or_default() .unwrap_or_default()
} }
/// Connect using the provided login params. /// Connect using the provided login params.
pub async fn connect(&self, context: &Context, lp: &LoginParam) -> Result<()> { pub async fn connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> {
if self.is_connected().await { if self.is_connected().await {
warn!(context, "SMTP already connected."); warn!(context, "SMTP already connected.");
return Ok(()); return Ok(());
@@ -146,8 +114,7 @@ impl Smtp {
error: err, error: err,
})?; })?;
let inner = &mut *self.inner.write().await; self.from = Some(from);
inner.from = Some(from);
let domain = &lp.send_server; let domain = &lp.send_server;
let port = lp.send_port as u16; let port = lp.send_port as u16;
@@ -208,8 +175,8 @@ impl Smtp {
let mut trans = client.into_transport(); let mut trans = client.into_transport();
trans.connect().await.map_err(Error::ConnectionFailure)?; trans.connect().await.map_err(Error::ConnectionFailure)?;
inner.transport = Some(trans); self.transport = Some(trans);
inner.last_success = Some(Instant::now()); self.last_success = Some(Instant::now());
context.call_cb(Event::SmtpConnected(format!( context.call_cb(Event::SmtpConnected(format!(
"SMTP-LOGIN as {} ok", "SMTP-LOGIN as {} ok",

View File

@@ -24,7 +24,7 @@ impl Smtp {
/// Send a prepared mail to recipients. /// Send a prepared mail to recipients.
/// On successful send out Ok() is returned. /// On successful send out Ok() is returned.
pub async fn send( pub async fn send(
&self, &mut self,
context: &Context, context: &Context,
recipients: Vec<EmailAddress>, recipients: Vec<EmailAddress>,
message: Vec<u8>, message: Vec<u8>,
@@ -38,23 +38,22 @@ impl Smtp {
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join(","); .join(",");
let envelope = Envelope::new(self.inner.read().await.from.clone(), recipients) let envelope =
.map_err(Error::EnvelopeError)?; Envelope::new(self.from.clone(), recipients).map_err(Error::EnvelopeError)?;
let mail = SendableEmail::new( let mail = SendableEmail::new(
envelope, envelope,
format!("{}", job_id), // only used for internal logging format!("{}", job_id), // only used for internal logging
message, message,
); );
let inner = &mut *self.inner.write().await; if let Some(ref mut transport) = self.transport {
if let Some(ref mut transport) = inner.transport {
transport.send(mail).await.map_err(Error::SendError)?; transport.send(mail).await.map_err(Error::SendError)?;
context.call_cb(Event::SmtpMessageSent(format!( context.call_cb(Event::SmtpMessageSent(format!(
"Message len={} was smtp-sent to {}", "Message len={} was smtp-sent to {}",
message_len, recipients_display message_len, recipients_display
))); )));
inner.last_success = Some(std::time::Instant::now()); self.last_success = Some(std::time::Instant::now());
Ok(()) Ok(())
} else { } else {