wip: stop sharing the inbox across threads

This commit is contained in:
dignifiedquire
2019-11-11 17:55:50 +01:00
parent 50aa68e047
commit 0e953d18d0
9 changed files with 301 additions and 295 deletions

View File

@@ -496,10 +496,12 @@ pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::E
println!("{:#?}", context.get_info());
}
"interrupt" => {
interrupt_imap_idle(context);
// interrupt_imap_idle(context);
unimplemented!()
}
"maybenetwork" => {
maybe_network(context);
// maybe_network(context);
unimplemented!()
}
"housekeeping" => {
sql::housekeeping(context);

View File

@@ -149,12 +149,14 @@ fn start_threads(c: Arc<RwLock<Context>>) {
let ctx = c.clone();
let handle_imap = std::thread::spawn(move || loop {
let mut inbox = ctx.read().unwrap().create_inbox();
while_running!({
perform_imap_jobs(&ctx.read().unwrap());
perform_imap_fetch(&ctx.read().unwrap());
perform_imap_jobs(&ctx.read().unwrap(), &mut inbox);
perform_imap_fetch(&ctx.read().unwrap(), &mut inbox);
while_running!({
let context = ctx.read().unwrap();
perform_imap_idle(&context);
perform_imap_idle(&context, &mut inbox);
});
});
});
@@ -202,7 +204,7 @@ fn stop_threads(context: &Context) {
println!("Stopping threads");
IS_RUNNING.store(false, Ordering::Relaxed);
interrupt_imap_idle(context);
// interrupt_imap_idle(context);
interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context);
interrupt_smtp_idle(context);
@@ -457,7 +459,8 @@ unsafe fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult
if HANDLE.clone().lock().unwrap().is_some() {
println!("imap-jobs are already running in a thread.");
} else {
perform_imap_jobs(&ctx.read().unwrap());
// perform_imap_jobs(&ctx.read().unwrap());
unimplemented!()
}
}
"configure" => {

View File

@@ -49,13 +49,15 @@ fn main() {
let ctx1 = ctx.clone();
let r1 = running.clone();
let t1 = thread::spawn(move || {
let mut inbox = ctx1.create_inbox();
while *r1.read().unwrap() {
perform_imap_jobs(&ctx1);
perform_imap_jobs(&ctx1, &mut inbox);
if *r1.read().unwrap() {
perform_imap_fetch(&ctx1);
perform_imap_fetch(&ctx1, &mut inbox);
if *r1.read().unwrap() {
perform_imap_idle(&ctx1);
perform_imap_idle(&ctx1, &mut inbox);
}
}
}
@@ -113,7 +115,7 @@ fn main() {
println!("stopping threads");
*running.clone().write().unwrap() = false;
deltachat::job::interrupt_imap_idle(&ctx);
// not needed anymore I believe. deltachat::job::interrupt_imap_idle(&ctx);
deltachat::job::interrupt_smtp_idle(&ctx);
println!("joining");

View File

@@ -119,12 +119,12 @@ impl Context {
}
Config::InboxWatch => {
let ret = self.sql.set_raw_config(self, key, value);
interrupt_imap_idle(self);
// interrupt_imap_idle(self);
ret
}
Config::SentboxWatch => {
let ret = self.sql.set_raw_config(self, key, value);
interrupt_sentbox_idle(self);
// interrupt_sentbox_idle(self);
ret
}
Config::MvboxWatch => {

View File

@@ -46,7 +46,7 @@ pub fn dc_is_configured(context: &Context) -> bool {
******************************************************************************/
// the other dc_job_do_DC_JOB_*() functions are declared static in the c-file
#[allow(non_snake_case, unused_must_use)]
pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, inbox: &mut Imap) {
if !context.sql.is_open() {
error!(context, "Cannot configure, database not opened.",);
progress!(context, 0);
@@ -62,19 +62,9 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
let mut param_autoconfig: Option<LoginParam> = None;
context.inbox.read().unwrap().disconnect(context);
context
.sentbox_thread
.read()
.unwrap()
.imap
.disconnect(context);
context
.mvbox_thread
.read()
.unwrap()
.imap
.disconnect(context);
inbox.disconnect();
context.sentbox_thread.write().unwrap().imap.disconnect();
context.mvbox_thread.write().unwrap().imap.disconnect();
context.smtp.clone().lock().unwrap().disconnect();
info!(context, "Configure ...",);
@@ -337,7 +327,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
/* try to connect to IMAP - if we did not got an autoconfig,
do some further tries with different settings and username variations */
imap_connected_here =
try_imap_connections(context, &mut param, param_autoconfig.is_some());
try_imap_connections(context, &mut param, param_autoconfig.is_some(), inbox);
imap_connected_here
}
15 => {
@@ -355,11 +345,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
} else {
0
};
context
.inbox
.read()
.unwrap()
.configure_folders(context, flags);
inbox.configure_folders(context, flags);
true
}
17 => {
@@ -398,7 +384,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
}
}
if imap_connected_here {
context.inbox.read().unwrap().disconnect(context);
inbox.disconnect();
}
if smtp_connected_here {
context.smtp.clone().lock().unwrap().disconnect();
@@ -439,9 +425,10 @@ fn try_imap_connections(
context: &Context,
mut param: &mut LoginParam,
was_autoconfig: bool,
inbox: &mut Imap,
) -> bool {
// progress 650 and 660
if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0) {
if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0, inbox) {
return res;
}
progress!(context, 670);
@@ -456,7 +443,7 @@ fn try_imap_connections(
param.send_user = param.send_user.split_at(at).0.to_string();
}
// progress 680 and 690
if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1) {
if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1, inbox) {
res
} else {
false
@@ -468,8 +455,9 @@ fn try_imap_connection(
param: &mut LoginParam,
was_autoconfig: bool,
variation: usize,
inbox: &mut Imap,
) -> Option<bool> {
if let Some(res) = try_imap_one_param(context, &param) {
if let Some(res) = try_imap_one_param(context, &param, inbox) {
return Some(res);
}
if was_autoconfig {
@@ -478,23 +466,23 @@ fn try_imap_connection(
progress!(context, 650 + variation * 30);
param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS);
param.server_flags |= DC_LP_IMAP_SOCKET_STARTTLS;
if let Some(res) = try_imap_one_param(context, &param) {
if let Some(res) = try_imap_one_param(context, &param, inbox) {
return Some(res);
}
progress!(context, 660 + variation * 30);
param.mail_port = 143;
try_imap_one_param(context, &param)
try_imap_one_param(context, &param, inbox)
}
fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
fn try_imap_one_param(context: &Context, param: &LoginParam, inbox: &mut Imap) -> Option<bool> {
let inf = format!(
"imap: {}@{}:{} flags=0x{:x}",
param.mail_user, param.mail_server, param.mail_port, param.server_flags
);
info!(context, "Trying: {}", inf);
if context.inbox.read().unwrap().connect(context, &param) {
if inbox.connect(context, &param) {
info!(context, "success: {}", inf);
return Some(true);
}
@@ -566,23 +554,25 @@ fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
/*******************************************************************************
* Connect to configured account
******************************************************************************/
pub fn dc_connect_to_configured_imap(context: &Context, imap: &Imap) -> libc::c_int {
let mut ret_connected = 0;
pub fn dc_connect_to_configured_imap(context: &Context, imap: &mut Imap) -> libc::c_int {
async_std::task::block_on(async move {
let mut ret_connected = 0;
if async_std::task::block_on(async move { imap.is_connected().await }) {
ret_connected = 1
} else if !context.sql.get_raw_config_bool(context, "configured") {
warn!(context, "Not configured, cannot connect.",);
} else {
let param = LoginParam::from_database(context, "configured_");
// the trailing underscore is correct
if imap.is_connected().await {
ret_connected = 1
} else if !context.sql.get_raw_config_bool(context, "configured") {
warn!(context, "Not configured, cannot connect.",);
} else {
let param = LoginParam::from_database(context, "configured_");
// the trailing underscore is correct
if imap.connect(context, &param) {
ret_connected = 2;
if imap.connect(context, &param) {
ret_connected = 2;
}
}
}
ret_connected
ret_connected
})
}
/*******************************************************************************
@@ -620,6 +610,8 @@ mod tests {
.set_config(Config::Addr, Some("probably@unexistant.addr"))
.unwrap();
t.ctx.set_config(Config::MailPw, Some("123456")).unwrap();
dc_job_do_DC_JOB_CONFIGURE_IMAP(&t.ctx);
let mut inbox = t.ctx.create_inbox();
dc_job_do_DC_JOB_CONFIGURE_IMAP(&t.ctx, &mut inbox);
}
}

View File

@@ -42,7 +42,6 @@ pub struct Context {
dbfile: PathBuf,
blobdir: PathBuf,
pub sql: Sql,
pub inbox: Arc<RwLock<Imap>>,
pub perform_inbox_jobs_needed: Arc<RwLock<bool>>,
pub probe_imap_network: Arc<RwLock<bool>>,
pub sentbox_thread: Arc<RwLock<JobThread>>,
@@ -118,7 +117,6 @@ impl Context {
let ctx = Context {
blobdir,
dbfile,
inbox: Arc::new(RwLock::new(Imap::new())),
cb,
os_name: Some(os_name),
running_state: Arc::new(RwLock::new(Default::default())),
@@ -153,6 +151,10 @@ impl Context {
Ok(ctx)
}
pub fn create_inbox(&self) -> Imap {
Imap::new()
}
pub fn get_dbfile(&self) -> &Path {
self.dbfile.as_path()
}
@@ -458,12 +460,6 @@ impl Context {
impl Drop for Context {
fn drop(&mut self) {
info!(self, "disconnecting INBOX-watch",);
self.inbox.read().unwrap().disconnect(self);
info!(self, "disconnecting sentbox-thread",);
self.sentbox_thread.read().unwrap().imap.disconnect(self);
info!(self, "disconnecting mvbox-thread",);
self.mvbox_thread.read().unwrap().imap.disconnect(self);
info!(self, "disconnecting SMTP");
self.smtp.clone().lock().unwrap().disconnect();
self.sql.close(self);

View File

@@ -1,4 +1,3 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
use async_imap::{
@@ -6,7 +5,6 @@ use async_imap::{
types::{Fetch, Flag, Mailbox, Name, NameAttribute},
};
use async_std::prelude::*;
use async_std::sync::{Arc, Mutex, RwLock};
use async_std::task;
use crate::configure::dc_connect_to_configured_imap;
@@ -40,13 +38,19 @@ const SELECT_ALL: &str = "1:*";
#[derive(Debug)]
pub struct Imap {
config: Arc<RwLock<ImapConfig>>,
config: ImapConfig,
session: Arc<Mutex<Option<Session>>>,
connected: Arc<Mutex<bool>>,
interrupt: Arc<Mutex<Option<stop_token::StopSource>>>,
skip_next_idle_wait: AtomicBool,
should_reconnect: AtomicBool,
session: Option<Session>,
connected: bool,
interrupt: Option<stop_token::StopSource>,
skip_next_idle_wait: bool,
should_reconnect: bool,
}
impl Drop for Imap {
fn drop(&mut self) {
self.disconnect()
}
}
#[derive(Debug)]
@@ -115,43 +119,43 @@ impl Default for ImapConfig {
impl Imap {
pub fn new() -> Self {
Imap {
session: Arc::new(Mutex::new(None)),
config: Arc::new(RwLock::new(ImapConfig::default())),
interrupt: Arc::new(Mutex::new(None)),
connected: Arc::new(Mutex::new(false)),
skip_next_idle_wait: AtomicBool::new(false),
should_reconnect: AtomicBool::new(false),
session: None,
config: ImapConfig::default(),
interrupt: None,
connected: false,
skip_next_idle_wait: false,
should_reconnect: false,
}
}
pub async fn is_connected(&self) -> bool {
*self.connected.lock().await
self.connected
}
pub fn should_reconnect(&self) -> bool {
self.should_reconnect.load(Ordering::Relaxed)
self.should_reconnect
}
fn setup_handle_if_needed(&self, context: &Context) -> bool {
fn setup_handle_if_needed(&mut self, context: &Context) -> bool {
task::block_on(async move {
if self.config.read().await.imap_server.is_empty() {
if self.config.imap_server.is_empty() {
return false;
}
if self.should_reconnect() {
self.unsetup_handle(context).await;
self.unsetup_handle(Some(context)).await;
}
if self.is_connected().await {
self.should_reconnect.store(false, Ordering::Relaxed);
self.should_reconnect = false;
return true;
}
let server_flags = self.config.read().await.server_flags as i32;
let server_flags = self.config.server_flags as i32;
let connection_res: ImapResult<Client> =
if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 {
let config = self.config.read().await;
let config = &mut self.config;
let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port;
@@ -168,7 +172,7 @@ impl Imap {
Err(err) => Err(err),
}
} else {
let config = self.config.read().await;
let config = &mut self.config;
let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port;
@@ -183,7 +187,7 @@ impl Imap {
let login_res = match connection_res {
Ok(client) => {
let config = self.config.read().await;
let config = &mut self.config;
let imap_user: &str = config.imap_user.as_ref();
let imap_pw: &str = config.imap_pw.as_ref();
@@ -208,7 +212,7 @@ impl Imap {
}
}
Err(err) => {
let config = self.config.read().await;
let config = &mut self.config;
let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port;
let message = context.stock_string_repl_str2(
@@ -223,15 +227,15 @@ impl Imap {
}
};
self.should_reconnect.store(false, Ordering::Relaxed);
self.should_reconnect = false;
match login_res {
Ok(session) => {
*self.session.lock().await = Some(session);
self.session = Some(session);
true
}
Err((err, _)) => {
let imap_user = self.config.read().await.imap_user.to_owned();
let imap_user = self.config.imap_user.to_owned();
let message =
context.stock_string_repl_str(StockMessage::CannotLogin, &imap_user);
@@ -239,7 +243,7 @@ impl Imap {
context,
Event::ErrorNetwork(format!("{} ({})", message, err))
);
self.unsetup_handle(context).await;
self.unsetup_handle(Some(context)).await;
false
}
@@ -247,25 +251,33 @@ impl Imap {
})
}
async fn unsetup_handle(&self, context: &Context) {
info!(
context,
"IMAP unsetup_handle step 2 (acquiring session.lock)"
);
if let Some(mut session) = self.session.lock().await.take() {
async fn unsetup_handle(&mut self, context: Option<&Context>) {
if let Some(context) = context {
info!(
context,
"IMAP unsetup_handle step 2 (acquiring session.lock)"
);
}
if let Some(mut session) = self.session.take() {
if let Err(err) = session.close().await {
warn!(context, "failed to close connection: {:?}", err);
if let Some(context) = context {
warn!(context, "failed to close connection: {:?}", err);
}
}
}
info!(context, "IMAP unsetup_handle step 3 (clearing config).");
self.config.write().await.selected_folder = None;
self.config.write().await.selected_mailbox = None;
info!(context, "IMAP unsetup_handle step 4 (disconnected).");
if let Some(context) = context {
info!(context, "IMAP unsetup_handle step 3 (clearing config).");
}
self.config.selected_folder = None;
self.config.selected_mailbox = None;
if let Some(context) = context {
info!(context, "IMAP unsetup_handle step 4 (disconnected).");
}
}
async fn free_connect_params(&self) {
let mut cfg = self.config.write().await;
async fn free_connect_params(&mut self) {
let mut cfg = &mut self.config;
cfg.addr = "".into();
cfg.imap_server = "".into();
@@ -279,7 +291,7 @@ impl Imap {
cfg.watch_folder = None;
}
pub fn connect(&self, context: &Context, lp: &LoginParam) -> bool {
pub fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool {
task::block_on(async move {
if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() {
return false;
@@ -297,7 +309,7 @@ impl Imap {
let imap_pw = &lp.mail_pw;
let server_flags = lp.server_flags as usize;
let mut config = self.config.write().await;
let mut config = &mut self.config;
config.addr = addr.to_string();
config.imap_server = imap_server.to_string();
config.imap_port = imap_port;
@@ -312,7 +324,7 @@ impl Imap {
return false;
}
let (teardown, can_idle, has_xlist) = match &mut *self.session.lock().await {
let (teardown, can_idle, has_xlist) = match &mut self.session {
Some(ref mut session) => match session.capabilities().await {
Ok(caps) => {
if !context.sql.is_open() {
@@ -343,35 +355,35 @@ impl Imap {
};
if teardown {
self.unsetup_handle(context).await;
self.unsetup_handle(Some(context)).await;
self.free_connect_params().await;
false
} else {
self.config.write().await.can_idle = can_idle;
self.config.write().await.has_xlist = has_xlist;
*self.connected.lock().await = true;
self.config.can_idle = can_idle;
self.config.has_xlist = has_xlist;
self.connected = true;
true
}
})
}
pub fn disconnect(&self, context: &Context) {
pub fn disconnect(&mut self) {
task::block_on(async move {
if self.is_connected().await {
self.unsetup_handle(context).await;
self.unsetup_handle(None).await;
self.free_connect_params().await;
*self.connected.lock().await = false;
self.connected = false;
}
});
}
pub fn set_watch_folder(&self, watch_folder: String) {
pub fn set_watch_folder(&mut self, watch_folder: String) {
task::block_on(async move {
self.config.write().await.watch_folder = Some(watch_folder);
self.config.watch_folder = Some(watch_folder);
});
}
pub fn fetch(&self, context: &Context) -> bool {
pub fn fetch(&mut self, context: &Context) -> bool {
task::block_on(async move {
if !self.is_connected().await || !context.sql.is_open() {
return false;
@@ -379,7 +391,7 @@ impl Imap {
self.setup_handle_if_needed(context);
let watch_folder = self.config.read().await.watch_folder.to_owned();
let watch_folder = self.config.watch_folder.to_owned();
if let Some(ref watch_folder) = watch_folder {
// as during the fetch commands, new messages may arrive, we fetch until we do not
@@ -398,12 +410,12 @@ impl Imap {
}
async fn select_folder<S: AsRef<str>>(
&self,
&mut self,
context: &Context,
folder: Option<S>,
) -> ImapActionResult {
if self.session.lock().await.is_none() {
let mut cfg = self.config.write().await;
if self.session.is_none() {
let mut cfg = &mut self.config;
cfg.selected_folder = None;
cfg.selected_folder_needs_expunge = false;
return ImapActionResult::Failed;
@@ -412,7 +424,7 @@ impl Imap {
// if there is a new folder and the new folder is equal to the selected one, there's nothing to do.
// if there is _no_ new folder, we continue as we might want to expunge below.
if let Some(ref folder) = folder {
if let Some(ref selected_folder) = self.config.read().await.selected_folder {
if let Some(ref selected_folder) = self.config.selected_folder {
if folder.as_ref() == selected_folder {
return ImapActionResult::AlreadyDone;
}
@@ -420,14 +432,14 @@ impl Imap {
}
// deselect existing folder, if needed (it's also done implicitly by SELECT, however, without EXPUNGE then)
let needs_expunge = { self.config.read().await.selected_folder_needs_expunge };
let needs_expunge = { self.config.selected_folder_needs_expunge };
if needs_expunge {
if let Some(ref folder) = self.config.read().await.selected_folder {
if let Some(ref folder) = self.config.selected_folder {
info!(context, "Expunge messages in \"{}\".", folder);
// A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see
// https://tools.ietf.org/html/rfc3501#section-6.4.2
if let Some(ref mut session) = &mut *self.session.lock().await {
if let Some(ref mut session) = &mut self.session {
match session.close().await {
Ok(_) => {
info!(context, "close/expunge succeeded");
@@ -441,15 +453,15 @@ impl Imap {
return ImapActionResult::Failed;
}
}
self.config.write().await.selected_folder_needs_expunge = false;
self.config.selected_folder_needs_expunge = false;
}
// select new folder
if let Some(ref folder) = folder {
if let Some(ref mut session) = &mut *self.session.lock().await {
if let Some(ref mut session) = &mut self.session {
match session.select(folder).await {
Ok(mailbox) => {
let mut config = self.config.write().await;
let mut config = &mut self.config;
config.selected_folder = Some(folder.as_ref().to_string());
config.selected_mailbox = Some(mailbox);
}
@@ -461,8 +473,8 @@ impl Imap {
err
);
self.config.write().await.selected_folder = None;
self.should_reconnect.store(true, Ordering::Relaxed);
self.config.selected_folder = None;
self.should_reconnect = true;
return ImapActionResult::Failed;
}
}
@@ -496,7 +508,11 @@ impl Imap {
}
}
async fn fetch_from_single_folder<S: AsRef<str>>(&self, context: &Context, folder: S) -> usize {
async fn fetch_from_single_folder<S: AsRef<str>>(
&mut self,
context: &Context,
folder: S,
) -> usize {
match self.select_folder(context, Some(&folder)).await {
ImapActionResult::Failed | ImapActionResult::RetryLater => {
warn!(
@@ -512,8 +528,11 @@ impl Imap {
// compare last seen UIDVALIDITY against the current one
let (mut uid_validity, mut last_seen_uid) = self.get_config_last_seen_uid(context, &folder);
let config = self.config.read().await;
let mailbox = config.selected_mailbox.as_ref().expect("just selected");
let mailbox = self
.config
.selected_mailbox
.as_ref()
.expect("just selected");
if mailbox.uid_validity.is_none() {
error!(
@@ -544,13 +563,13 @@ impl Imap {
return 0;
}
let list = if let Some(ref mut session) = &mut *self.session.lock().await {
let list = if let Some(ref mut session) = &mut self.session {
// `FETCH <message sequence number> (UID)`
let set = format!("{}", mailbox.exists);
match session.fetch(set, PREFETCH_FLAGS).await {
Ok(list) => list,
Err(_err) => {
self.should_reconnect.store(true, Ordering::Relaxed);
self.should_reconnect = true;
info!(
context,
"No result returned for folder \"{}\".",
@@ -586,7 +605,7 @@ impl Imap {
let mut read_errors = 0;
let mut new_last_seen_uid = 0;
let list = if let Some(ref mut session) = &mut *self.session.lock().await {
let list = if let Some(ref mut session) = &mut self.session {
// fetch messages with larger UID than the last one seen
// (`UID FETCH lastseenuid+1:*)`, see RFC 4549
let set = format!("{}:*", last_seen_uid + 1);
@@ -676,7 +695,7 @@ impl Imap {
}
async fn fetch_single_msg<S: AsRef<str>>(
&self,
&mut self,
context: &Context,
folder: S,
server_uid: u32,
@@ -690,11 +709,11 @@ impl Imap {
let set = format!("{}", server_uid);
let msgs = if let Some(ref mut session) = &mut *self.session.lock().await {
let msgs = if let Some(ref mut session) = &mut self.session {
match session.uid_fetch(set, BODY_FLAGS).await {
Ok(msgs) => msgs,
Err(err) => {
self.should_reconnect.store(true, Ordering::Relaxed);
self.should_reconnect = true;
warn!(
context,
"Error on fetching message #{} from folder \"{}\"; retry={}; error={}.",
@@ -743,19 +762,19 @@ impl Imap {
1
}
pub fn idle(&self, context: &Context) {
pub fn idle(&mut self, context: &Context) {
task::block_on(async move {
if self.config.read().await.selected_folder.is_none() {
if self.config.selected_folder.is_none() {
return;
}
if !self.config.read().await.can_idle {
if !self.config.can_idle {
self.fake_idle(context).await;
return;
}
self.setup_handle_if_needed(context);
let watch_folder = self.config.read().await.watch_folder.clone();
let watch_folder = self.config.watch_folder.clone();
match self.select_folder(context, watch_folder.as_ref()).await {
ImapActionResult::Success | ImapActionResult::AlreadyDone => {}
@@ -770,7 +789,7 @@ impl Imap {
}
}
let session = self.session.lock().await.take();
let session = self.session.take();
let timeout = Duration::from_secs(23 * 60);
if let Some(session) = session {
match session.idle() {
@@ -780,12 +799,12 @@ impl Imap {
return;
}
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt);
self.interrupt = Some(interrupt);
if self.skip_next_idle_wait.load(Ordering::Relaxed) {
if self.skip_next_idle_wait {
// interrupt_idle has happened before we
// provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::Relaxed);
self.skip_next_idle_wait = false;
std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped");
} else {
@@ -795,7 +814,7 @@ impl Imap {
}
match handle.done().await {
Ok(session) => {
*self.session.lock().await = Some(Session::Secure(session));
self.session = Some(Session::Secure(session));
}
Err(err) => {
warn!(context, "Failed to close IMAP IDLE connection: {:?}", err);
@@ -808,12 +827,12 @@ impl Imap {
return;
}
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt);
self.interrupt = Some(interrupt);
if self.skip_next_idle_wait.load(Ordering::Relaxed) {
if self.skip_next_idle_wait {
// interrupt_idle has happened before we
// provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::Relaxed);
self.skip_next_idle_wait = false;
std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped");
} else {
@@ -824,7 +843,7 @@ impl Imap {
match handle.done().await {
Ok(session) => {
*self.session.lock().await = Some(Session::Insecure(session));
self.session = Some(Session::Insecure(session));
}
Err(err) => {
warn!(context, "Failed to close IMAP IDLE connection: {:?}", err);
@@ -836,7 +855,7 @@ impl Imap {
});
}
async fn fake_idle(&self, context: &Context) {
async fn fake_idle(&mut self, context: &Context) {
// Idle using timeouts. This is also needed if we're not yet configured -
// in this case, we're waiting for a configure job
let fake_idle_start_time = SystemTime::now();
@@ -849,7 +868,7 @@ impl Imap {
// TODO: More flexible interval
let interval = async_std::stream::interval(Duration::from_secs(10));
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
*self.interrupt.lock().await = Some(interrupt);
self.interrupt = Some(interrupt);
while let Some(_) = interrupt_interval.next().await {
// check if we want to finish fake-idling.
@@ -857,8 +876,8 @@ impl Imap {
// try to connect with proper login params
// (setup_handle_if_needed might not know about them if we
// never successfully connected)
if dc_connect_to_configured_imap(context, &self) != 0 {
self.interrupt.lock().await.take();
if dc_connect_to_configured_imap(context, self) != 0 {
self.interrupt.take();
}
}
// we are connected, let's see if fetching messages results
@@ -866,10 +885,10 @@ impl Imap {
// will have already fetched the messages so perform_*_fetch
// will not find any new.
let watch_folder = self.config.read().await.watch_folder.clone();
let watch_folder = self.config.watch_folder.clone();
if let Some(watch_folder) = watch_folder {
if 0 != self.fetch_from_single_folder(context, watch_folder).await {
self.interrupt.lock().await.take();
self.interrupt.take();
break;
}
}
@@ -887,20 +906,17 @@ impl Imap {
);
}
pub fn interrupt_idle(&self) {
pub fn interrupt_idle(&mut self) {
task::block_on(async move {
if self.interrupt.lock().await.take().is_none() {
// idle wait is not running, signal it needs to skip
self.skip_next_idle_wait.store(true, Ordering::Relaxed);
// meanwhile idle-wait may have produced the interrupter
let _ = self.interrupt.lock().await.take();
if self.interrupt.take().is_none() {
// idle wait is not running
self.skip_next_idle_wait = true;
}
});
}
pub fn mv(
&self,
&mut self,
context: &Context,
folder: &str,
uid: u32,
@@ -928,7 +944,7 @@ impl Imap {
let set = format!("{}", uid);
let display_folder_id = format!("{}/{}", folder, uid);
if let Some(ref mut session) = &mut *self.session.lock().await {
if let Some(ref mut session) = &mut self.session {
match session.uid_mv(&set, &dest_folder).await {
Ok(_) => {
emit_event!(
@@ -955,14 +971,14 @@ impl Imap {
unreachable!();
};
if let Some(ref mut session) = &mut *self.session.lock().await {
if let Some(ref mut session) = &mut self.session {
match session.uid_copy(&set, &dest_folder).await {
Ok(_) => {
if !self.add_flag_finalized(context, uid, "\\Deleted").await {
warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid);
ImapActionResult::Failed
} else {
self.config.write().await.selected_folder_needs_expunge = true;
self.config.selected_folder_needs_expunge = true;
ImapActionResult::Success
}
}
@@ -977,7 +993,7 @@ impl Imap {
})
}
async fn add_flag_finalized(&self, context: &Context, server_uid: u32, flag: &str) -> bool {
async fn add_flag_finalized(&mut self, context: &Context, server_uid: u32, flag: &str) -> bool {
// return true if we successfully set the flag or we otherwise
// think add_flag should not be retried: Disconnection during setting
// the flag, or other imap-errors, returns true as well.
@@ -991,7 +1007,7 @@ impl Imap {
}
async fn add_flag_finalized_with_set(
&self,
&mut self,
context: &Context,
uid_set: &str,
flag: &str,
@@ -999,7 +1015,7 @@ impl Imap {
if self.should_reconnect() {
return false;
}
if let Some(ref mut session) = &mut *self.session.lock().await {
if let Some(ref mut session) = &mut self.session {
let query = format!("+FLAGS ({})", flag);
match session.uid_store(uid_set, &query).await {
Ok(_) => {}
@@ -1017,7 +1033,7 @@ impl Imap {
}
pub fn prepare_imap_operation_on_msg(
&self,
&mut self,
context: &Context,
folder: &str,
uid: u32,
@@ -1026,7 +1042,7 @@ impl Imap {
if uid == 0 {
return Some(ImapActionResult::Failed);
} else if !self.is_connected().await {
connect_to_inbox(context, &self);
connect_to_inbox(context, self);
if !self.is_connected().await {
return Some(ImapActionResult::RetryLater);
}
@@ -1045,7 +1061,7 @@ impl Imap {
})
}
pub fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult {
pub fn set_seen(&mut self, context: &Context, folder: &str, uid: u32) -> ImapActionResult {
task::block_on(async move {
if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, uid) {
return imapresult;
@@ -1067,7 +1083,7 @@ impl Imap {
// only returns 0 on connection problems; we should try later again in this case *
pub fn delete_msg(
&self,
&mut self,
context: &Context,
message_id: &str,
folder: &str,
@@ -1084,7 +1100,7 @@ impl Imap {
// double-check that we are deleting the correct message-id
// this comes at the expense of another imap query
if let Some(ref mut session) = &mut *self.session.lock().await {
if let Some(ref mut session) = &mut self.session {
match session.uid_fetch(set, PREFETCH_FLAGS).await {
Ok(msgs) => {
if msgs.is_empty() {
@@ -1135,13 +1151,13 @@ impl Imap {
display_imap_id, message_id
))
);
self.config.write().await.selected_folder_needs_expunge = true;
self.config.selected_folder_needs_expunge = true;
ImapActionResult::Success
}
})
}
pub fn configure_folders(&self, context: &Context, flags: libc::c_int) {
pub fn configure_folders(&mut self, context: &Context, flags: libc::c_int) {
task::block_on(async move {
if !self.is_connected().await {
return;
@@ -1149,27 +1165,23 @@ impl Imap {
info!(context, "Configuring IMAP-folders.");
if let Some(ref mut session) = &mut *self.session.lock().await {
let folders = self
.list_folders(session, context)
.await
.expect("no folders found");
let delimiter = self.config.read().await.imap_delimiter;
let fallback_folder = format!("INBOX{}DeltaChat", delimiter);
let folders = self.list_folders(context).await.expect("no folders found");
let delimiter = self.config.imap_delimiter;
let fallback_folder = format!("INBOX{}DeltaChat", delimiter);
let mut mvbox_folder = folders
.iter()
.find(|folder| folder.name() == "DeltaChat" || folder.name() == fallback_folder)
.map(|n| n.name().to_string());
let mut mvbox_folder = folders
.iter()
.find(|folder| folder.name() == "DeltaChat" || folder.name() == fallback_folder)
.map(|n| n.name().to_string());
let sentbox_folder =
folders
.iter()
.find(|folder| match get_folder_meaning(folder) {
FolderMeaning::SentObjects => true,
_ => false,
});
let sentbox_folder = folders
.iter()
.find(|folder| match get_folder_meaning(folder) {
FolderMeaning::SentObjects => true,
_ => false,
});
if let Some(ref mut session) = self.session {
if mvbox_folder.is_none() && 0 != (flags as usize & DC_CREATE_MVBOX) {
info!(context, "Creating MVBOX-folder \"DeltaChat\"...",);
@@ -1233,29 +1245,29 @@ impl Imap {
})
}
async fn list_folders<'a>(
&self,
session: &'a mut Session,
context: &Context,
) -> Option<Vec<Name>> {
// TODO: use xlist when available
match session.list(Some(""), Some("*")).await {
Ok(list) => {
if list.is_empty() {
warn!(context, "Folder list is empty.",);
async fn list_folders<'a>(&mut self, context: &Context) -> Option<Vec<Name>> {
if let Some(ref mut session) = self.session {
// TODO: use xlist when available
match session.list(Some(""), Some("*")).await {
Ok(list) => {
if list.is_empty() {
warn!(context, "Folder list is empty.",);
}
Some(list)
}
Some(list)
}
Err(err) => {
eprintln!("list error: {:?}", err);
warn!(context, "Cannot get folder list.",);
Err(err) => {
eprintln!("list error: {:?}", err);
warn!(context, "Cannot get folder list.",);
None
None
}
}
} else {
None
}
}
pub fn empty_folder(&self, context: &Context, folder: &str) {
pub fn empty_folder(&mut self, context: &Context, folder: &str) {
task::block_on(async move {
info!(context, "emptying folder {}", folder);
@@ -1272,7 +1284,7 @@ impl Imap {
warn!(context, "Cannot empty folder {}", folder);
} else {
// we now trigger expunge to actually delete messages
self.config.write().await.selected_folder_needs_expunge = true;
self.config.selected_folder_needs_expunge = true;
if self.select_folder::<String>(context, None).await
== ImapActionResult::Success
{

View File

@@ -218,9 +218,7 @@ impl Job {
}
#[allow(non_snake_case)]
fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap();
fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context, inbox: &mut Imap) {
if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
if context
.sql
@@ -263,9 +261,7 @@ impl Job {
}
#[allow(non_snake_case)]
fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap();
fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context, inbox: &mut Imap) {
if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
if !msg.rfc724_mid.is_empty() {
/* eg. device messages have no Message-ID */
@@ -291,8 +287,7 @@ impl Job {
}
#[allow(non_snake_case)]
fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap();
fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context, inbox: &mut Imap) {
if self.foreign_id & DC_EMPTY_MVBOX > 0 {
if let Some(mvbox_folder) = context
.sql
@@ -307,9 +302,7 @@ impl Job {
}
#[allow(non_snake_case)]
fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap();
fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context, inbox: &mut Imap) {
if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
let folder = msg.server_folder.as_ref().unwrap();
match inbox.set_seen(context, folder, msg.server_uid) {
@@ -335,14 +328,13 @@ impl Job {
}
#[allow(non_snake_case)]
fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context) {
fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context, inbox: &mut Imap) {
let folder = self
.param
.get(Param::ServerFolder)
.unwrap_or_default()
.to_string();
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
let inbox = context.inbox.read().unwrap();
if inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater {
self.try_again_later(3i32, None);
return;
@@ -382,11 +374,10 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool {
.is_ok()
}
pub fn perform_imap_fetch(context: &Context) {
let inbox = context.inbox.read().unwrap();
pub fn perform_imap_fetch(context: &Context, inbox: &mut Imap) {
let start = std::time::Instant::now();
if 0 == connect_to_inbox(context, &inbox) {
if 0 == connect_to_inbox(context, inbox) {
return;
}
if !context.get_config_bool(Config::InboxWatch) {
@@ -406,10 +397,8 @@ pub fn perform_imap_fetch(context: &Context) {
);
}
pub fn perform_imap_idle(context: &Context) {
let inbox = context.inbox.read().unwrap();
connect_to_inbox(context, &inbox);
pub fn perform_imap_idle(context: &Context, inbox: &mut Imap) {
connect_to_inbox(context, inbox);
if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
info!(
@@ -438,13 +427,17 @@ pub fn perform_mvbox_idle(context: &Context) {
context
.mvbox_thread
.read()
.write()
.unwrap()
.idle(context, use_network);
}
pub fn interrupt_mvbox_idle(context: &Context) {
context.mvbox_thread.read().unwrap().interrupt_idle(context);
context
.mvbox_thread
.write()
.unwrap()
.interrupt_idle(context);
}
pub fn perform_sentbox_fetch(context: &Context) {
@@ -462,7 +455,7 @@ pub fn perform_sentbox_idle(context: &Context) {
context
.sentbox_thread
.read()
.write()
.unwrap()
.idle(context, use_network);
}
@@ -470,7 +463,7 @@ pub fn perform_sentbox_idle(context: &Context) {
pub fn interrupt_sentbox_idle(context: &Context) {
context
.sentbox_thread
.read()
.write()
.unwrap()
.interrupt_idle(context);
}
@@ -493,7 +486,7 @@ pub fn perform_smtp_jobs(context: &Context) {
};
info!(context, "SMTP-jobs started...",);
job_perform(context, Thread::Smtp, probe_smtp_network);
job_perform(context, Thread::Smtp, probe_smtp_network, None);
info!(context, "SMTP-jobs ended.");
{
@@ -557,7 +550,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration {
wakeup_time
}
pub fn maybe_network(context: &Context) {
pub fn maybe_network(context: &Context, inbox: &mut Imap) {
{
let &(ref lock, _) = &*context.smtp_state.clone();
let mut state = lock.lock().unwrap();
@@ -567,7 +560,7 @@ pub fn maybe_network(context: &Context) {
}
interrupt_smtp_idle(context);
interrupt_imap_idle(context);
interrupt_imap_idle(context, inbox);
interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context);
}
@@ -681,14 +674,14 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<(), Error> {
Ok(())
}
pub fn perform_imap_jobs(context: &Context) {
pub fn perform_imap_jobs(context: &Context, inbox: &mut Imap) {
info!(context, "dc_perform_imap_jobs starting.",);
let probe_imap_network = *context.probe_imap_network.clone().read().unwrap();
*context.probe_imap_network.write().unwrap() = false;
*context.perform_inbox_jobs_needed.write().unwrap() = false;
job_perform(context, Thread::Imap, probe_imap_network);
job_perform(context, Thread::Imap, probe_imap_network, Some(inbox));
info!(context, "dc_perform_imap_jobs ended.",);
}
@@ -700,7 +693,12 @@ pub fn perform_sentbox_jobs(context: &Context) {
info!(context, "dc_perform_sentbox_jobs EMPTY (for now).",);
}
fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
fn job_perform(
context: &Context,
thread: Thread,
probe_network: bool,
mut inbox: Option<&mut Imap>,
) {
let query = if !probe_network {
// processing for first-try and after backoff-timeouts:
// process jobs in the order they were added.
@@ -768,18 +766,8 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
// - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
job_kill_action(context, job.action);
context
.sentbox_thread
.clone()
.read()
.unwrap()
.suspend(context);
context
.mvbox_thread
.clone()
.read()
.unwrap()
.suspend(context);
context.sentbox_thread.write().unwrap().suspend(context);
context.mvbox_thread.write().unwrap().suspend(context);
suspend_smtp_thread(context, true);
}
@@ -793,13 +781,21 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
warn!(context, "Unknown job id found");
}
Action::SendMsgToSmtp => job.do_DC_JOB_SEND(context),
Action::EmptyServer => job.do_DC_JOB_EMPTY_SERVER(context),
Action::DeleteMsgOnImap => job.do_DC_JOB_DELETE_MSG_ON_IMAP(context),
Action::MarkseenMsgOnImap => job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context),
Action::MarkseenMdnOnImap => job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context),
Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context),
Action::EmptyServer => job.do_DC_JOB_EMPTY_SERVER(context, inbox.as_mut().unwrap()),
Action::DeleteMsgOnImap => {
job.do_DC_JOB_DELETE_MSG_ON_IMAP(context, inbox.as_mut().unwrap())
}
Action::MarkseenMsgOnImap => {
job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, inbox.as_mut().unwrap())
}
Action::MarkseenMdnOnImap => {
job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, inbox.as_mut().unwrap())
}
Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context, inbox.as_mut().unwrap()),
Action::SendMdn => job.do_DC_JOB_SEND(context),
Action::ConfigureImap => dc_job_do_DC_JOB_CONFIGURE_IMAP(context),
Action::ConfigureImap => {
dc_job_do_DC_JOB_CONFIGURE_IMAP(context, inbox.as_mut().unwrap())
}
Action::ImexImap => match job_do_DC_JOB_IMEX_IMAP(context, &job) {
Ok(()) => {}
Err(err) => {
@@ -926,7 +922,7 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) {
}
}
pub fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int {
pub fn connect_to_inbox(context: &Context, inbox: &mut Imap) -> libc::c_int {
let ret_connected = dc_connect_to_configured_imap(context, inbox);
if 0 != ret_connected {
inbox.set_watch_folder("INBOX".into());
@@ -1004,7 +1000,7 @@ pub fn job_add(
).ok();
match thread {
Thread::Imap => interrupt_imap_idle(context),
Thread::Imap => {} //FIXME interrupt_imap_idle(context),
Thread::Smtp => interrupt_smtp_idle(context),
Thread::Unknown => {}
}
@@ -1021,10 +1017,9 @@ pub fn interrupt_smtp_idle(context: &Context) {
cvar.notify_one();
}
pub fn interrupt_imap_idle(context: &Context) {
pub fn interrupt_imap_idle(context: &Context, inbox: &mut Imap) {
info!(context, "Interrupting INBOX-IDLE...",);
*context.perform_inbox_jobs_needed.write().unwrap() = true;
context.inbox.read().unwrap().interrupt_idle();
inbox.interrupt_idle();
}

View File

@@ -30,7 +30,7 @@ impl JobThread {
}
}
pub fn suspend(&self, context: &Context) {
pub fn suspend(&mut self, context: &Context) {
info!(context, "Suspending {}-thread.", self.name,);
{
self.state.0.lock().unwrap().suspended = true;
@@ -56,7 +56,7 @@ impl JobThread {
cvar.notify_one();
}
pub fn interrupt_idle(&self, context: &Context) {
pub fn interrupt_idle(&mut self, context: &Context) {
{
self.state.0.lock().unwrap().jobs_needed = true;
}
@@ -106,35 +106,39 @@ impl JobThread {
self.state.0.lock().unwrap().using_handle = false;
}
fn connect_to_imap(&self, context: &Context) -> bool {
if async_std::task::block_on(async move { self.imap.is_connected().await }) {
return true;
}
let mut ret_connected = dc_connect_to_configured_imap(context, &self.imap) != 0;
if ret_connected {
if context
.sql
.get_raw_config_int(context, "folders_configured")
.unwrap_or_default()
< 3
{
self.imap.configure_folders(context, 0x1);
fn connect_to_imap(&mut self, context: &Context) -> bool {
async_std::task::block_on(async move {
if self.imap.is_connected().await {
return true;
}
if let Some(mvbox_name) = context.sql.get_raw_config(context, self.folder_config_name) {
self.imap.set_watch_folder(mvbox_name);
} else {
self.imap.disconnect(context);
ret_connected = false;
}
}
let mut ret_connected = dc_connect_to_configured_imap(context, &mut self.imap) != 0;
ret_connected
if ret_connected {
if context
.sql
.get_raw_config_int(context, "folders_configured")
.unwrap_or_default()
< 3
{
self.imap.configure_folders(context, 0x1);
}
if let Some(mvbox_name) =
context.sql.get_raw_config(context, self.folder_config_name)
{
self.imap.set_watch_folder(mvbox_name);
} else {
self.imap.disconnect();
ret_connected = false;
}
}
ret_connected
})
}
pub fn idle(&self, context: &Context, use_network: bool) {
pub fn idle(&mut self, context: &Context, use_network: bool) {
{
let &(ref lock, ref cvar) = &*self.state.clone();
let mut state = lock.lock().unwrap();