* fix interrupt_idle by signalling "skip_next_idle_wait" to the potentially concurrently "fn idle" function

* fixes double-export issue
This commit is contained in:
holger krekel
2019-11-11 15:33:19 +01:00
parent 8723aa097e
commit c6f4d6d8bd
2 changed files with 45 additions and 28 deletions

View File

@@ -678,7 +678,6 @@ class TestOnlineAccount:
assert len(messages) == 1 assert len(messages) == 1
assert messages[0].text == "msg1" assert messages[0].text == "msg1"
pytest.xfail("cannot export twice yet, probably due to interrupt_idle failing")
# wait until a second passed since last backup # wait until a second passed since last backup
# because get_latest_backupfile() shall return the latest backup # because get_latest_backupfile() shall return the latest backup
# from a UI it's unlikely anyone manages to export two # from a UI it's unlikely anyone manages to export two
@@ -899,7 +898,7 @@ class TestOnlineConfigureFails:
ac1.start_threads() ac1.start_threads()
wait_configuration_progress(ac1, 500) wait_configuration_progress(ac1, 500)
ev1 = ac1._evlogger.get_matching("DC_EVENT_ERROR_NETWORK") ev1 = ac1._evlogger.get_matching("DC_EVENT_ERROR_NETWORK")
assert "authentication failed" in ev1[2].lower() assert "cannot login" in ev1[2].lower()
wait_configuration_progress(ac1, 0, 0) wait_configuration_progress(ac1, 0, 0)
def test_invalid_user(self, acfactory): def test_invalid_user(self, acfactory):
@@ -908,7 +907,7 @@ class TestOnlineConfigureFails:
ac1.start_threads() ac1.start_threads()
wait_configuration_progress(ac1, 500) wait_configuration_progress(ac1, 500)
ev1 = ac1._evlogger.get_matching("DC_EVENT_ERROR_NETWORK") ev1 = ac1._evlogger.get_matching("DC_EVENT_ERROR_NETWORK")
assert "authentication failed" in ev1[2].lower() assert "cannot login" in ev1[2].lower()
wait_configuration_progress(ac1, 0, 0) wait_configuration_progress(ac1, 0, 0)
def test_invalid_domain(self, acfactory): def test_invalid_domain(self, acfactory):

View File

@@ -45,7 +45,7 @@ pub struct Imap {
session: Arc<Mutex<Option<Session>>>, session: Arc<Mutex<Option<Session>>>,
connected: Arc<Mutex<bool>>, connected: Arc<Mutex<bool>>,
interrupt: Arc<Mutex<Option<stop_token::StopSource>>>, interrupt: Arc<Mutex<Option<stop_token::StopSource>>>,
skip_next_idle_wait: AtomicBool,
should_reconnect: AtomicBool, should_reconnect: AtomicBool,
} }
@@ -119,6 +119,7 @@ impl Imap {
config: Arc::new(RwLock::new(ImapConfig::default())), config: Arc::new(RwLock::new(ImapConfig::default())),
interrupt: Arc::new(Mutex::new(None)), interrupt: Arc::new(Mutex::new(None)),
connected: Arc::new(Mutex::new(false)), connected: Arc::new(Mutex::new(false)),
skip_next_idle_wait: AtomicBool::new(false),
should_reconnect: AtomicBool::new(false), should_reconnect: AtomicBool::new(false),
} }
} }
@@ -496,24 +497,16 @@ impl Imap {
} }
async fn fetch_from_single_folder<S: AsRef<str>>(&self, context: &Context, folder: S) -> usize { async fn fetch_from_single_folder<S: AsRef<str>>(&self, context: &Context, folder: S) -> usize {
if !self.is_connected().await { match self.select_folder(context, Some(&folder)).await {
info!( ImapActionResult::Failed | ImapActionResult::RetryLater => {
context, warn!(
"Cannot fetch from \"{}\" - not connected.", context,
folder.as_ref() "Cannot select folder \"{}\" for fetching.",
); folder.as_ref()
);
return 0; return 0;
} }
ImapActionResult::Success | ImapActionResult::AlreadyDone => {}
let res = self.select_folder(context, Some(&folder)).await;
if res == ImapActionResult::Failed || res == ImapActionResult::RetryLater {
info!(
context,
"Cannot select folder \"{}\" for fetching.",
folder.as_ref()
);
return 0;
} }
// compare last seen UIDVALIDITY against the current one // compare last seen UIDVALIDITY against the current one
@@ -789,8 +782,17 @@ impl Imap {
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); *self.interrupt.lock().await = Some(interrupt);
let res = idle_wait.await; if self.skip_next_idle_wait.load(Ordering::Relaxed) {
info!(context, "Idle finished: {:?}", res); // interrupt_idle has happened before we
// provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::Relaxed);
std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped");
} else {
info!(context, "Idle entering wait-on-remote state");
let res = idle_wait.await;
info!(context, "Idle finished wait-on-remote: {:?}", res);
}
match handle.done().await { match handle.done().await {
Ok(session) => { Ok(session) => {
*self.session.lock().await = Some(Session::Secure(session)); *self.session.lock().await = Some(Session::Secure(session));
@@ -808,8 +810,18 @@ impl Imap {
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); *self.interrupt.lock().await = Some(interrupt);
let res = idle_wait.await; if self.skip_next_idle_wait.load(Ordering::Relaxed) {
info!(context, "Idle finished: {:?}", res); // interrupt_idle has happened before we
// provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::Relaxed);
std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped");
} else {
info!(context, "Idle entering wait-on-remote state");
let res = idle_wait.await;
info!(context, "Idle finished wait-on-remote: {:?}", res);
}
match handle.done().await { match handle.done().await {
Ok(session) => { Ok(session) => {
*self.session.lock().await = Some(Session::Insecure(session)); *self.session.lock().await = Some(Session::Insecure(session));
@@ -877,7 +889,13 @@ impl Imap {
pub fn interrupt_idle(&self) { pub fn interrupt_idle(&self) {
task::block_on(async move { task::block_on(async move {
let _ = self.interrupt.lock().await.take(); if self.interrupt.lock().await.take().is_none() {
// idle wait is not running, signal it needs to skip
self.skip_next_idle_wait.store(true, Ordering::Relaxed);
// meanwhile idle-wait may have produced the interrupter
let _ = self.interrupt.lock().await.take();
}
}); });
} }
@@ -1241,7 +1259,7 @@ impl Imap {
task::block_on(async move { task::block_on(async move {
info!(context, "emptying folder {}", folder); info!(context, "emptying folder {}", folder);
if !folder.is_empty() { if folder.is_empty() {
warn!(context, "cannot perform empty, folder not set"); warn!(context, "cannot perform empty, folder not set");
return; return;
} }