fix(imap): improve reconnection logic and ideling

This commit is contained in:
dignifiedquire
2019-05-14 22:43:04 +02:00
parent ed79e5256f
commit 7dd2cc660b

View File

@@ -34,6 +34,7 @@ pub struct Imap {
receive_imf: dc_receive_imf_t,
session: Arc<Mutex<(Option<Session>, Option<net::TcpStream>)>>,
connected: Arc<Mutex<bool>>,
}
struct OAuth2 {
@@ -303,12 +304,12 @@ impl Session {
}
pub struct ImapConfig {
pub addr: Option<String>,
pub imap_server: Option<String>,
pub imap_port: Option<usize>,
pub imap_user: Option<String>,
pub imap_pw: Option<String>,
pub server_flags: Option<usize>,
pub addr: String,
pub imap_server: String,
pub imap_port: u16,
pub imap_user: String,
pub imap_pw: String,
pub server_flags: usize,
pub selected_folder: Option<String>,
pub selected_mailbox: Option<imap::types::Mailbox>,
pub selected_folder_needs_expunge: bool,
@@ -322,12 +323,12 @@ pub struct ImapConfig {
impl Default for ImapConfig {
fn default() -> Self {
let cfg = ImapConfig {
addr: None,
imap_server: None,
imap_port: None,
imap_user: None,
imap_pw: None,
server_flags: None,
addr: "".into(),
imap_server: "".into(),
imap_port: 0,
imap_user: "".into(),
imap_pw: "".into(),
server_flags: 0,
selected_folder: None,
selected_mailbox: None,
selected_folder_needs_expunge: false,
@@ -357,39 +358,36 @@ impl Imap {
set_config,
precheck_imf,
receive_imf,
connected: Arc::new(Mutex::new(false)),
}
}
pub fn is_connected(&self) -> bool {
self.session.lock().unwrap().1.is_some()
*self.connected.lock().unwrap()
}
pub fn should_reconnect(&self) -> bool {
self.config.read().unwrap().should_reconnect
}
pub fn connect(&self, context: &dc_context_t, lp: *const dc_loginparam_t) -> libc::c_int {
if lp.is_null() {
return 0;
}
let lp = unsafe { *lp };
if lp.mail_server.is_null() || lp.mail_user.is_null() || lp.mail_pw.is_null() {
return 0;
fn setup_handle_if_needed(&self, context: &dc_context_t) -> libc::c_int {
if self.should_reconnect() {
self.unsetup_handle(context);
}
if self.is_connected() {
if self.is_connected() && self.session.lock().unwrap().1.is_some() {
self.config.write().unwrap().should_reconnect = false;
return 1;
}
let addr = to_str(lp.addr);
let imap_server = to_str(lp.mail_server);
let imap_port = lp.mail_port as u16;
let imap_user = to_str(lp.mail_user);
let imap_pw = to_str(lp.mail_pw);
let server_flags = lp.server_flags as usize;
let server_flags = self.config.read().unwrap().server_flags;
let connection_res: imap::error::Result<Client> =
if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 {
let config = self.config.read().unwrap();
let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port;
Client::connect_insecure((imap_server, imap_port)).and_then(|client| {
if (server_flags & DC_LP_IMAP_SOCKET_STARTTLS) != 0 {
client.secure(imap_server)
@@ -398,17 +396,27 @@ impl Imap {
}
})
} else {
let config = self.config.read().unwrap();
let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port;
Client::connect_secure((imap_server, imap_port), imap_server)
};
let login_res = match connection_res {
Ok(client) => {
let config = self.config.read().unwrap();
let imap_user: &str = config.imap_user.as_ref();
let imap_pw: &str = config.imap_pw.as_ref();
if (server_flags & DC_LP_AUTH_OAUTH2) != 0 {
let addr: &str = config.addr.as_ref();
let access_token = unsafe {
CStr::from_ptr(dc_get_oauth2_access_token(
context,
lp.addr,
lp.mail_pw,
CString::new(addr).unwrap().as_ptr(),
CString::new(imap_pw).unwrap().as_ptr(),
DC_REGENERATE as libc::c_int,
))
.to_str()
@@ -425,6 +433,10 @@ impl Imap {
}
}
Err(err) => {
let config = self.config.read().unwrap();
let imap_server: &str = config.imap_server.as_ref();
let imap_port = config.imap_port;
eprintln!("failed to connect: {:?}", err);
unsafe {
dc_log_event_seq(
@@ -442,39 +454,15 @@ impl Imap {
}
};
self.config.write().unwrap().should_reconnect = false;
match login_res {
Ok((mut session, stream)) => {
// TODO: error handling
let caps = session.capabilities().unwrap();
let can_idle = caps.has("IDLE");
let has_xlist = caps.has("XLIST");
let caps_list = caps.iter().fold(String::new(), |mut s, c| {
s += " ";
s += c;
s
});
let caps_list_c = std::ffi::CString::new(caps_list).unwrap();
info!(context, 0, "IMAP-capabilities:%s", caps_list_c.as_ptr());
let mut config = self.config.write().unwrap();
config.can_idle = can_idle;
config.has_xlist = has_xlist;
config.addr = Some(addr.into());
config.imap_server = Some(imap_server.into());
config.imap_port = Some(imap_port.into());
config.imap_user = Some(imap_user.into());
config.imap_pw = Some(imap_pw.into());
config.server_flags = Some(server_flags);
Ok((session, stream)) => {
*self.session.lock().unwrap() = (Some(session), Some(stream));
1
}
Err((err, _)) => {
eprintln!("failed to login: {:?}", err);
unsafe {
dc_log_event_seq(
context,
@@ -484,12 +472,14 @@ impl Imap {
)
};
self.unsetup_handle(context);
0
}
}
}
pub fn disconnect(&self, context: &dc_context_t) {
fn unsetup_handle(&self, context: &dc_context_t) {
let session = self.session.lock().unwrap().0.take();
if session.is_some() {
match session.unwrap().close() {
@@ -510,20 +500,101 @@ impl Imap {
}
let mut cfg = self.config.write().unwrap();
cfg.selected_folder = None;
cfg.selected_mailbox = None;
info!(context, 0, "IMAP disconnected.",);
}
cfg.addr = None;
cfg.imap_server = None;
cfg.imap_user = None;
cfg.imap_pw = None;
cfg.imap_port = None;
fn free_connect_params(&self) {
let mut cfg = self.config.write().unwrap();
cfg.addr = "".into();
cfg.imap_server = "".into();
cfg.imap_user = "".into();
cfg.imap_pw = "".into();
cfg.imap_port = 0;
cfg.can_idle = false;
cfg.has_xlist = false;
cfg.watch_folder = None;
cfg.selected_folder = None;
cfg.selected_mailbox = None;
info!(context, 0, "IMAP disconnected.",);
}
pub fn connect(&self, context: &dc_context_t, lp: *const dc_loginparam_t) -> libc::c_int {
if lp.is_null() {
return 0;
}
let lp = unsafe { *lp };
if lp.mail_server.is_null() || lp.mail_user.is_null() || lp.mail_pw.is_null() {
return 0;
}
if self.is_connected() {
return 1;
}
{
let addr = to_str(lp.addr);
let imap_server = to_str(lp.mail_server);
let imap_port = lp.mail_port as u16;
let imap_user = to_str(lp.mail_user);
let imap_pw = to_str(lp.mail_pw);
let server_flags = lp.server_flags as usize;
let mut config = self.config.write().unwrap();
config.addr = addr.into();
config.imap_server = imap_server.into();
config.imap_port = imap_port.into();
config.imap_user = imap_user.into();
config.imap_pw = imap_pw.into();
config.server_flags = server_flags;
}
if self.setup_handle_if_needed(context) == 0 {
return 0;
}
match self.session.lock().unwrap().0 {
Some(ref mut session) => {
if let Ok(caps) = session.capabilities() {
let can_idle = caps.has("IDLE");
let has_xlist = caps.has("XLIST");
let caps_list = caps.iter().fold(String::new(), |mut s, c| {
s += " ";
s += c;
s
});
let caps_list_c = std::ffi::CString::new(caps_list).unwrap();
info!(context, 0, "IMAP-capabilities:%s", caps_list_c.as_ptr());
let mut config = self.config.write().unwrap();
config.can_idle = can_idle;
config.has_xlist = has_xlist;
*self.connected.lock().unwrap() = true;
1
} else {
self.unsetup_handle(context);
self.free_connect_params();
0
}
}
None => {
self.unsetup_handle(context);
self.free_connect_params();
0
}
}
}
pub fn disconnect(&self, context: &dc_context_t) {
if self.is_connected() {
self.unsetup_handle(context);
self.free_connect_params();
*self.connected.lock().unwrap() = false;
}
}
pub fn set_watch_folder(&self, watch_folder: *const libc::c_char) {
@@ -531,25 +602,31 @@ impl Imap {
}
pub fn fetch(&self, context: &dc_context_t) -> libc::c_int {
let mut success = 0;
if !self.is_connected() {
return 0;
}
self.setup_handle_if_needed(context);
let watch_folder = self.config.read().unwrap().watch_folder.to_owned();
if self.is_connected() && watch_folder.is_some() {
let watch_folder = watch_folder.unwrap();
if let Some(ref watch_folder) = watch_folder {
// as during the fetch commands, new messages may arrive, we fetch until we do not
// get any more. if IDLE is called directly after, there is only a small chance that
// messages are missed and delayed until the next IDLE call
loop {
let cnt = self.fetch_from_single_folder(context, &watch_folder);
if cnt == 0 {
if self.fetch_from_single_folder(context, watch_folder) == 0 {
break;
}
}
success = 1;
1
} else {
0
}
success
}
fn select_folder<S: AsRef<str>>(&self, context: &dc_context_t, folder: Option<S>) -> usize {
if !self.is_connected() {
if self.session.lock().unwrap().0.is_none() {
let mut cfg = self.config.write().unwrap();
cfg.selected_folder = None;
cfg.selected_folder_needs_expunge = false;
@@ -576,26 +653,42 @@ impl Imap {
CString::new(folder.to_owned()).unwrap().as_ptr()
);
// a CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see https://tools.ietf.org/html/rfc3501#section-6.4.2
// A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see
// https://tools.ietf.org/html/rfc3501#section-6.4.2
if let Some(ref mut session) = self.session.lock().unwrap().0 {
session.close().expect("failed to expunge");
match session.close() {
Ok(_) => {}
Err(err) => {
eprintln!("failed to close session: {:?}", err);
}
}
} else {
return 0;
}
self.config.write().unwrap().selected_folder_needs_expunge = true;
}
}
// select new folder
if let Some(folder) = folder {
if let Some(ref folder) = folder {
if let Some(ref mut session) = self.session.lock().unwrap().0 {
match session.select(folder) {
Ok(mailbox) => {
self.config.write().unwrap().selected_mailbox = Some(mailbox);
let mut config = self.config.write().unwrap();
config.selected_folder = Some(folder.as_ref().to_string());
config.selected_mailbox = Some(mailbox);
}
Err(err) => {
eprintln!("select error: {:?}", err);
info!(context, 0, "Cannot select folder.");
self.config.write().unwrap().selected_folder = None;
info!(
context,
0,
format!("Cannot select folder: {}; {:?}.", folder.as_ref(), err)
);
let mut config = self.config.write().unwrap();
config.selected_folder = None;
config.should_reconnect = true;
return 0;
}
}
} else {
@@ -655,6 +748,7 @@ impl Imap {
return 0;
}
// compare last seen UIDVALIDITY against the current one
let (mut uid_validity, mut last_seen_uid) = self.get_config_last_seen_uid(context, &folder);
let config = self.config.read().unwrap();
@@ -696,6 +790,8 @@ impl Imap {
match session.fetch(set, PREFETCH_FLAGS) {
Ok(list) => list,
Err(err) => {
self.config.write().unwrap().should_reconnect = true;
eprintln!("fetch error: {:?}", err);
info!(
context,
@@ -750,7 +846,6 @@ impl Imap {
};
// go through all mails in folder (this is typically _fast_ as we already have the whole list)
for msg in &list {
let cur_uid = msg.uid.unwrap_or_else(|| 0);
if cur_uid > last_seen_uid {
@@ -857,22 +952,27 @@ impl Imap {
let mut retry_later = false;
let set = format!("{}", server_uid);
let msgs = if let Some(ref mut session) = self.session.lock().unwrap().0 {
let set = format!("{}", server_uid);
match session.uid_fetch(set, BODY_FLAGS) {
Ok(msgs) => msgs,
Err(err) => {
eprintln!("error fetch single: {:?}", err);
self.config.write().unwrap().should_reconnect = true;
warn!(
context,
0,
"Error on fetching message #%i from folder \"%s\"; retry=%i.",
format!(
"Error on fetching message #%i from folder \"%s\"; retry=%i; error={}.",
err
),
server_uid as libc::c_int,
CString::new(folder.as_ref().to_owned()).unwrap().as_ptr(),
self.should_reconnect() as libc::c_int,
);
if self.should_reconnect() {
// maybe we should also retry on other errors, however, we should check this carefully, as this may result in a dead lock!
retry_later = true;
}
@@ -940,10 +1040,7 @@ impl Imap {
return self.fake_idle(context);
}
// TODO: reconnect in all methods that need it
if !self.is_connected() {
return;
}
self.setup_handle_if_needed(context);
let watch_folder = self.config.read().unwrap().watch_folder.clone();
if self.select_folder(context, watch_folder.as_ref()) == 0 {
@@ -952,46 +1049,105 @@ impl Imap {
return self.fake_idle(context);
}
let mut session = self.session.lock().unwrap().0.take().unwrap();
let mut idle = match session.idle() {
Ok(idle) => idle,
Err(err) => {
eprintln!("imap idle error: {:?}", err);
warn!(context, 0, "IMAP-IDLE: Cannot start.",);
let session = self.session.clone();
let mut worker = Some({
let (sender, receiver) = std::sync::mpsc::channel();
let v = self.watch.clone();
return self.fake_idle(context);
std::thread::spawn(move || {
let &(ref lock, ref cvar) = &*v;
if let Some(ref mut session) = session.lock().unwrap().0 {
let mut idle = match session.idle() {
Ok(idle) => idle,
Err(err) => {
panic!("failed to setup idle: {:?}", err);
}
};
// most servers do not allow more than ~28 minutes; stay clearly below that.
// a good value that is also used by other MUAs is 23 minutes.
// if needed, the ui can call dc_imap_interrupt_idle() to trigger a reconnect.
idle.set_keepalive(Duration::from_secs(23 * 60));
let res = idle.wait_keepalive();
// Ignoring the error, as this happens when we try sending after the drop
let _send_res = sender.send(res);
// Trigger condvar
let mut watch = lock.lock().unwrap();
*watch = true;
cvar.notify_one();
}
});
receiver
});
let &(ref lock, ref cvar) = &*self.watch.clone();
let mut watch = lock.lock().unwrap();
let handle_res = |res| {
info!(context, 0, format!("IMAP-IDLE done: {:?}", res));
match res {
Ok(()) => {
info!(context, 0, "IMAP-IDLE has data.");
}
Err(err) => match err {
imap::error::Error::ConnectionLost => {
info!(
context,
0, "IMAP-IDLE wait cancelled, we will reconnect soon."
);
self.config.write().unwrap().should_reconnect = true;
}
_ => {
warn!(
context,
0,
format!("IMAP-IDLE returns unknown value: {:?}", err)
);
}
},
}
};
// most servers do not allow more than ~28 minutes; stay clearly below that.
// a good value that is also used by other MUAs is 23 minutes.
// if needed, the ui can call dc_imap_interrupt_idle() to trigger a reconnect.
idle.set_keepalive(Duration::from_secs(23 * 60));
loop {
if let Ok(res) = worker.as_ref().unwrap().try_recv() {
handle_res(res);
break;
} else {
let res = cvar.wait(watch).unwrap();
watch = res;
if *watch {
if let Ok(res) = worker.as_ref().unwrap().try_recv() {
handle_res(res);
} else {
info!(context, 0, "IMAP-IDLE interrupted");
}
// TODO: proper logging of different states
// TODO: reconnect if we timed out
match idle.wait_keepalive() {
Ok(_) => {}
Err(err) => {
eprintln!("idle error: {:?}", err);
drop(worker.take());
break;
}
}
}
// put session back
self.session.lock().unwrap().0 = Some(session);
*watch = false;
}
fn fake_idle(&self, context: &dc_context_t) {
// Idle using timeouts. This is also needed if we're not yet configured -
// in this case, we're waiting for a configure job
let fake_idle_start_time = SystemTime::now();
let mut wait_long = false;
info!(context, 0, "IMAP-fake-IDLEing...");
let mut do_fake_idle = true;
while do_fake_idle {
// wait a moment: every 5 seconds in the first 3 minutes after a new message, after that every 60 seconds.
let seconds_to_wait =
if fake_idle_start_time.elapsed().unwrap() < Duration::new(3 * 60, 0) {
if fake_idle_start_time.elapsed().unwrap() < Duration::new(3 * 60, 0) && !wait_long
{
Duration::new(5, 0)
} else {
Duration::new(60, 0)
@@ -1017,29 +1173,26 @@ impl Imap {
return;
}
// TODO: connect if needed
if let Some(ref watch_folder) = self.config.read().unwrap().watch_folder {
if 0 != self.fetch_from_single_folder(context, watch_folder) {
do_fake_idle = false;
// check for new messages. fetch_from_single_folder() has the side-effect that messages
// are also downloaded, however, typically this would take place in the FETCH command
// following IDLE otherwise, so this seems okay here.
if self.setup_handle_if_needed(context) != 0 {
if let Some(ref watch_folder) = self.config.read().unwrap().watch_folder {
if 0 != self.fetch_from_single_folder(context, watch_folder) {
do_fake_idle = false;
}
}
} else {
// if we cannot connect, set the starting time to a small value which will
// result in larger timeouts (60 instead of 5 seconds) for re-checking the availablility of network.
// to get the _exact_ moment of re-available network, the ui should call interrupt_idle()
wait_long = true;
}
}
}
pub fn interrupt_idle(&self) {
// only kill the connection, if we are actually ideling
if self.session.lock().unwrap().0.is_none() {
if let Some(ref mut stream) = self.session.lock().unwrap().1 {
match stream.shutdown(net::Shutdown::Both) {
Ok(_) => {}
Err(err) => {
eprintln!("failed to disconnect: {}", err);
}
}
}
}
// interrupt fake idle
// interrupt idle
let &(ref lock, ref cvar) = &*self.watch.clone();
let mut watch = lock.lock().unwrap();