cleanup interrupt and exit of imap idle

This commit is contained in:
dignifiedquire
2020-07-07 12:54:24 +02:00
committed by link2xt
parent 3df0ef50a4
commit 7d08397b48
2 changed files with 83 additions and 118 deletions

View File

@@ -48,11 +48,10 @@ impl Imap {
self.select_folder(context, watch_folder.clone()).await?; self.select_folder(context, watch_folder.clone()).await?;
let session = self.session.take();
let timeout = Duration::from_secs(23 * 60); let timeout = Duration::from_secs(23 * 60);
let mut info = Default::default(); let mut info = Default::default();
if let Some(session) = session { if let Some(session) = self.session.take() {
let mut handle = session.idle(); let mut handle = session.idle();
if let Err(err) = handle.init().await { if let Err(err) = handle.init().await {
return Err(Error::IdleProtocolFailed(err)); return Err(Error::IdleProtocolFailed(err));
@@ -65,68 +64,43 @@ impl Imap {
Interrupt(InterruptInfo), Interrupt(InterruptInfo),
} }
if self.skip_next_idle_wait { info!(context, "Idle entering wait-on-remote state");
// interrupt_idle has happened before we let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async {
// provided self.interrupt let probe_network = self.idle_interrupt.recv().await;
self.skip_next_idle_wait = false;
drop(idle_wait); // cancel imap idle connection properly
drop(interrupt); drop(interrupt);
info!(context, "Idle wait was skipped"); Ok(Event::Interrupt(probe_network.unwrap_or_default()))
} else { });
info!(context, "Idle entering wait-on-remote state");
let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(
self.idle_interrupt.recv().map(|probe_network| {
Ok(Event::Interrupt(probe_network.unwrap_or_default()))
}),
);
match fut.await { match fut.await {
Ok(Event::IdleResponse(IdleResponse::NewData(_))) => { Ok(Event::IdleResponse(IdleResponse::NewData(_))) => {
info!(context, "Idle has NewData"); info!(context, "Idle has NewData");
} }
// TODO: idle_wait does not distinguish manual interrupts Ok(Event::IdleResponse(IdleResponse::Timeout)) => {
// from Timeouts if we would know it's a Timeout we could bail info!(context, "Idle-wait timeout or interruption");
// directly and reconnect . }
Ok(Event::IdleResponse(IdleResponse::Timeout)) => { Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => {
info!(context, "Idle-wait timeout or interruption"); info!(context, "Idle wait was interrupted");
} }
Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { Ok(Event::Interrupt(i)) => {
info!(context, "Idle wait was interrupted"); info = i;
} info!(context, "Idle wait was interrupted");
Ok(Event::Interrupt(i)) => { }
info = i; Err(err) => {
info!(context, "Idle wait was interrupted"); warn!(context, "Idle wait errored: {:?}", err);
}
Err(err) => {
warn!(context, "Idle wait errored: {:?}", err);
}
} }
} }
// if we can't properly terminate the idle let session = handle
// protocol let's break the connection.
let res = handle
.done() .done()
.timeout(Duration::from_secs(15)) .timeout(Duration::from_secs(15))
.await .await
.map_err(|err| { .map_err(Error::IdleTimeout)??;
self.trigger_reconnect(); self.session = Some(Session { inner: session });
Error::IdleTimeout(err) } else {
})?; warn!(context, "Attempted to idle without a session");
match res {
Ok(session) => {
self.session = Some(Session { inner: session });
}
Err(err) => {
// if we cannot terminate IDLE it probably
// means that we waited long (with idle_wait)
// but the network went away/changed
self.trigger_reconnect();
return Err(Error::IdleProtocolFailed(err));
}
}
} }
Ok(info) Ok(info)
@@ -148,73 +122,66 @@ impl Imap {
return self.idle_interrupt.recv().await.unwrap_or_default(); return self.idle_interrupt.recv().await.unwrap_or_default();
} }
let mut info: InterruptInfo = Default::default(); // check every minute if there are new messages
if self.skip_next_idle_wait { // TODO: grow sleep durations / make them more flexible
// interrupt_idle has happened before we let mut interval = async_std::stream::interval(Duration::from_secs(60));
// provided self.interrupt
self.skip_next_idle_wait = false;
info!(context, "fake-idle wait was skipped");
} else {
// check every minute if there are new messages
// TODO: grow sleep durations / make them more flexible
let mut interval = async_std::stream::interval(Duration::from_secs(60));
enum Event { enum Event {
Tick, Tick,
Interrupt(InterruptInfo), Interrupt(InterruptInfo),
} }
// loop until we are interrupted or if we fetched something // loop until we are interrupted or if we fetched something
info = let info = loop {
loop { use futures::future::FutureExt;
use futures::future::FutureExt; match interval
match interval .next()
.next() .map(|_| Event::Tick)
.map(|_| Event::Tick) .race(
.race(self.idle_interrupt.recv().map(|probe_network| { self.idle_interrupt
Event::Interrupt(probe_network.unwrap_or_default()) .recv()
})) .map(|probe_network| Event::Interrupt(probe_network.unwrap_or_default())),
.await )
{ .await
Event::Tick => { {
// try to connect with proper login params Event::Tick => {
// (setup_handle_if_needed might not know about them if we // try to connect with proper login params
// never successfully connected) // (setup_handle_if_needed might not know about them if we
if let Err(err) = self.connect_configured(context).await { // never successfully connected)
warn!(context, "fake_idle: could not connect: {}", err); if let Err(err) = self.connect_configured(context).await {
continue; warn!(context, "fake_idle: could not connect: {}", err);
} continue;
if self.config.can_idle { }
// we only fake-idled because network was gone during IDLE, probably if self.config.can_idle {
break InterruptInfo::new(false, None); // we only fake-idled because network was gone during IDLE, probably
} break InterruptInfo::new(false, None);
info!(context, "fake_idle is connected"); }
// we are connected, let's see if fetching messages results info!(context, "fake_idle is connected");
// in anything. If so, we behave as if IDLE had data but // we are connected, let's see if fetching messages results
// will have already fetched the messages so perform_*_fetch // in anything. If so, we behave as if IDLE had data but
// will not find any new. // will have already fetched the messages so perform_*_fetch
// will not find any new.
if let Some(ref watch_folder) = watch_folder { if let Some(ref watch_folder) = watch_folder {
match self.fetch_new_messages(context, watch_folder).await { match self.fetch_new_messages(context, watch_folder).await {
Ok(res) => { Ok(res) => {
info!(context, "fetch_new_messages returned {:?}", res); info!(context, "fetch_new_messages returned {:?}", res);
if res { if res {
break InterruptInfo::new(false, None); break InterruptInfo::new(false, None);
}
}
Err(err) => {
error!(context, "could not fetch from folder: {}", err);
self.trigger_reconnect()
}
} }
} }
} Err(err) => {
Event::Interrupt(info) => { error!(context, "could not fetch from folder: {}", err);
// Interrupt self.trigger_reconnect()
break info; }
} }
} }
}; }
} Event::Interrupt(info) => {
// Interrupt
break info;
}
}
};
info!( info!(
context, context,

View File

@@ -117,7 +117,6 @@ pub struct Imap {
session: Option<Session>, session: Option<Session>,
connected: bool, connected: bool,
interrupt: Option<stop_token::StopSource>, interrupt: Option<stop_token::StopSource>,
skip_next_idle_wait: bool,
should_reconnect: bool, should_reconnect: bool,
} }
@@ -191,7 +190,6 @@ impl Imap {
session: Default::default(), session: Default::default(),
connected: Default::default(), connected: Default::default(),
interrupt: Default::default(), interrupt: Default::default(),
skip_next_idle_wait: Default::default(),
should_reconnect: Default::default(), should_reconnect: Default::default(),
} }
} }