mirror of
https://github.com/chatmail/core.git
synced 2026-05-07 08:56:30 +03:00
make setup_handle_if_needed async, call it ahead of select_folder and fetch_new_messages (renamed from fetch_from_single_folder), and be more eager triggering reconnect on error conditions
This commit is contained in:
210
src/imap/mod.rs
210
src/imap/mod.rs
@@ -201,112 +201,107 @@ impl Imap {
|
|||||||
self.should_reconnect.store(true, Ordering::Relaxed)
|
self.should_reconnect.store(true, Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn setup_handle_if_needed(&self, context: &Context) -> Result<()> {
|
async fn setup_handle_if_needed(&self, context: &Context) -> Result<()> {
|
||||||
task::block_on(async move {
|
if self.config.read().await.imap_server.is_empty() {
|
||||||
if self.config.read().await.imap_server.is_empty() {
|
return Err(Error::InTeardown);
|
||||||
return Err(Error::InTeardown);
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if self.should_reconnect() {
|
if self.should_reconnect() {
|
||||||
self.unsetup_handle(context).await;
|
self.unsetup_handle(context).await;
|
||||||
self.should_reconnect.store(false, Ordering::Relaxed);
|
self.should_reconnect.store(false, Ordering::Relaxed);
|
||||||
} else if self.is_connected().await {
|
} else if self.is_connected().await {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let server_flags = self.config.read().await.server_flags as i32;
|
let server_flags = self.config.read().await.server_flags as i32;
|
||||||
|
|
||||||
let connection_res: ImapResult<Client> =
|
let connection_res: ImapResult<Client> =
|
||||||
if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 {
|
if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 {
|
||||||
let config = self.config.read().await;
|
let config = self.config.read().await;
|
||||||
let imap_server: &str = config.imap_server.as_ref();
|
let imap_server: &str = config.imap_server.as_ref();
|
||||||
let imap_port = config.imap_port;
|
let imap_port = config.imap_port;
|
||||||
|
|
||||||
match Client::connect_insecure((imap_server, imap_port)).await {
|
match Client::connect_insecure((imap_server, imap_port)).await {
|
||||||
Ok(client) => {
|
Ok(client) => {
|
||||||
if (server_flags & DC_LP_IMAP_SOCKET_STARTTLS) != 0 {
|
if (server_flags & DC_LP_IMAP_SOCKET_STARTTLS) != 0 {
|
||||||
client.secure(imap_server, config.certificate_checks).await
|
client.secure(imap_server, config.certificate_checks).await
|
||||||
} else {
|
|
||||||
Ok(client)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => Err(err),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let config = self.config.read().await;
|
|
||||||
let imap_server: &str = config.imap_server.as_ref();
|
|
||||||
let imap_port = config.imap_port;
|
|
||||||
|
|
||||||
Client::connect_secure(
|
|
||||||
(imap_server, imap_port),
|
|
||||||
imap_server,
|
|
||||||
config.certificate_checks,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
};
|
|
||||||
|
|
||||||
let login_res = match connection_res {
|
|
||||||
Ok(client) => {
|
|
||||||
let config = self.config.read().await;
|
|
||||||
let imap_user: &str = config.imap_user.as_ref();
|
|
||||||
let imap_pw: &str = config.imap_pw.as_ref();
|
|
||||||
|
|
||||||
if (server_flags & DC_LP_AUTH_OAUTH2) != 0 {
|
|
||||||
let addr: &str = config.addr.as_ref();
|
|
||||||
|
|
||||||
if let Some(token) =
|
|
||||||
dc_get_oauth2_access_token(context, addr, imap_pw, true)
|
|
||||||
{
|
|
||||||
let auth = OAuth2 {
|
|
||||||
user: imap_user.into(),
|
|
||||||
access_token: token,
|
|
||||||
};
|
|
||||||
client.authenticate("XOAUTH2", &auth).await
|
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::OauthError);
|
Ok(client)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
client.login(imap_user, imap_pw).await
|
|
||||||
}
|
}
|
||||||
|
Err(err) => Err(err),
|
||||||
}
|
}
|
||||||
Err(err) => {
|
} else {
|
||||||
let message = {
|
let config = self.config.read().await;
|
||||||
let config = self.config.read().await;
|
let imap_server: &str = config.imap_server.as_ref();
|
||||||
let imap_server: &str = config.imap_server.as_ref();
|
let imap_port = config.imap_port;
|
||||||
let imap_port = config.imap_port;
|
|
||||||
context.stock_string_repl_str2(
|
Client::connect_secure(
|
||||||
StockMessage::ServerResponse,
|
(imap_server, imap_port),
|
||||||
format!("{}:{}", imap_server, imap_port),
|
imap_server,
|
||||||
err.to_string(),
|
config.certificate_checks,
|
||||||
)
|
)
|
||||||
};
|
.await
|
||||||
// IMAP connection failures are reported to users
|
|
||||||
emit_event!(context, Event::ErrorNetwork(message));
|
|
||||||
return Err(Error::ConnectionFailed(err.to_string()));
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
self.should_reconnect.store(false, Ordering::Relaxed);
|
let login_res = match connection_res {
|
||||||
|
Ok(client) => {
|
||||||
|
let config = self.config.read().await;
|
||||||
|
let imap_user: &str = config.imap_user.as_ref();
|
||||||
|
let imap_pw: &str = config.imap_pw.as_ref();
|
||||||
|
|
||||||
match login_res {
|
if (server_flags & DC_LP_AUTH_OAUTH2) != 0 {
|
||||||
Ok(session) => {
|
let addr: &str = config.addr.as_ref();
|
||||||
*self.session.lock().await = Some(session);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err((err, _)) => {
|
|
||||||
let imap_user = self.config.read().await.imap_user.to_owned();
|
|
||||||
let message =
|
|
||||||
context.stock_string_repl_str(StockMessage::CannotLogin, &imap_user);
|
|
||||||
|
|
||||||
emit_event!(
|
if let Some(token) = dc_get_oauth2_access_token(context, addr, imap_pw, true) {
|
||||||
context,
|
let auth = OAuth2 {
|
||||||
Event::ErrorNetwork(format!("{} ({})", message, err))
|
user: imap_user.into(),
|
||||||
);
|
access_token: token,
|
||||||
self.trigger_reconnect();
|
};
|
||||||
Err(Error::LoginFailed(format!("cannot login as {}", imap_user)))
|
client.authenticate("XOAUTH2", &auth).await
|
||||||
|
} else {
|
||||||
|
return Err(Error::OauthError);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
client.login(imap_user, imap_pw).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
Err(err) => {
|
||||||
|
let message = {
|
||||||
|
let config = self.config.read().await;
|
||||||
|
let imap_server: &str = config.imap_server.as_ref();
|
||||||
|
let imap_port = config.imap_port;
|
||||||
|
context.stock_string_repl_str2(
|
||||||
|
StockMessage::ServerResponse,
|
||||||
|
format!("{}:{}", imap_server, imap_port),
|
||||||
|
err.to_string(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
// IMAP connection failures are reported to users
|
||||||
|
emit_event!(context, Event::ErrorNetwork(message));
|
||||||
|
return Err(Error::ConnectionFailed(err.to_string()));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.should_reconnect.store(false, Ordering::Relaxed);
|
||||||
|
|
||||||
|
match login_res {
|
||||||
|
Ok(session) => {
|
||||||
|
*self.session.lock().await = Some(session);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err((err, _)) => {
|
||||||
|
let imap_user = self.config.read().await.imap_user.to_owned();
|
||||||
|
let message = context.stock_string_repl_str(StockMessage::CannotLogin, &imap_user);
|
||||||
|
|
||||||
|
emit_event!(
|
||||||
|
context,
|
||||||
|
Event::ErrorNetwork(format!("{} ({})", message, err))
|
||||||
|
);
|
||||||
|
self.trigger_reconnect();
|
||||||
|
Err(Error::LoginFailed(format!("cannot login as {}", imap_user)))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn unsetup_handle(&self, context: &Context) {
|
async fn unsetup_handle(&self, context: &Context) {
|
||||||
@@ -387,7 +382,7 @@ impl Imap {
|
|||||||
config.server_flags = server_flags;
|
config.server_flags = server_flags;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(err) = self.setup_handle_if_needed(context) {
|
if let Err(err) = self.setup_handle_if_needed(context).await {
|
||||||
warn!(context, "failed to setup imap handle: {}", err);
|
warn!(context, "failed to setup imap handle: {}", err);
|
||||||
self.free_connect_params().await;
|
self.free_connect_params().await;
|
||||||
return false;
|
return false;
|
||||||
@@ -449,10 +444,9 @@ impl Imap {
|
|||||||
// probably shutdown
|
// probably shutdown
|
||||||
return Err(Error::InTeardown);
|
return Err(Error::InTeardown);
|
||||||
}
|
}
|
||||||
while self
|
self.setup_handle_if_needed(context).await?;
|
||||||
.fetch_from_single_folder(context, &watch_folder)
|
|
||||||
.await?
|
while self.fetch_new_messages(context, &watch_folder).await? {
|
||||||
{
|
|
||||||
// We fetch until no more new messages are there.
|
// We fetch until no more new messages are there.
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -560,7 +554,7 @@ impl Imap {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_from_single_folder<S: AsRef<str>>(
|
async fn fetch_new_messages<S: AsRef<str>>(
|
||||||
&self,
|
&self,
|
||||||
context: &Context,
|
context: &Context,
|
||||||
folder: S,
|
folder: S,
|
||||||
@@ -593,9 +587,11 @@ impl Imap {
|
|||||||
for msg in &list {
|
for msg in &list {
|
||||||
let cur_uid = msg.uid.unwrap_or_default();
|
let cur_uid = msg.uid.unwrap_or_default();
|
||||||
if cur_uid <= last_seen_uid {
|
if cur_uid <= last_seen_uid {
|
||||||
warn!(
|
// seems that at least dovecot sends the last available UID
|
||||||
|
// even if we asked for higher UID+N:*
|
||||||
|
info!(
|
||||||
context,
|
context,
|
||||||
"unexpected uid {}, last seen was {}", cur_uid, last_seen_uid
|
"fetch_new_messages: ignoring uid {}, last seen was {}", cur_uid, last_seen_uid
|
||||||
);
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -740,7 +736,7 @@ impl Imap {
|
|||||||
return Err(Error::IdleAbilityMissing);
|
return Err(Error::IdleAbilityMissing);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.setup_handle_if_needed(context)?;
|
self.setup_handle_if_needed(context).await?;
|
||||||
|
|
||||||
self.select_folder(context, watch_folder.clone()).await?;
|
self.select_folder(context, watch_folder.clone()).await?;
|
||||||
|
|
||||||
@@ -886,9 +882,9 @@ impl Imap {
|
|||||||
// will not find any new.
|
// will not find any new.
|
||||||
|
|
||||||
if let Some(ref watch_folder) = watch_folder {
|
if let Some(ref watch_folder) = watch_folder {
|
||||||
match self.fetch_from_single_folder(context, watch_folder).await {
|
match self.fetch_new_messages(context, watch_folder).await {
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
info!(context, "fetch_from_single_folder returned {:?}", res);
|
info!(context, "fetch_new_messages returned {:?}", res);
|
||||||
if res {
|
if res {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1322,13 +1318,17 @@ impl Imap {
|
|||||||
task::block_on(async move {
|
task::block_on(async move {
|
||||||
info!(context, "emptying folder {}", folder);
|
info!(context, "emptying folder {}", folder);
|
||||||
|
|
||||||
|
// we want to report all error to the user
|
||||||
|
// (no retry should be attempted)
|
||||||
if folder.is_empty() {
|
if folder.is_empty() {
|
||||||
error!(context, "cannot perform empty, folder not set");
|
error!(context, "cannot perform empty, folder not set");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if let Err(err) = self.setup_handle_if_needed(context).await {
|
||||||
|
error!(context, "could not setup imap connection: {:?}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if let Err(err) = self.select_folder(context, Some(&folder)).await {
|
if let Err(err) = self.select_folder(context, Some(&folder)).await {
|
||||||
// we want to report all error to the user
|
|
||||||
// (no retry should be attempted)
|
|
||||||
error!(
|
error!(
|
||||||
context,
|
context,
|
||||||
"Could not select {} for expunging: {:?}", folder, err
|
"Could not select {} for expunging: {:?}", folder, err
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ impl Imap {
|
|||||||
let mut cfg = self.config.write().await;
|
let mut cfg = self.config.write().await;
|
||||||
cfg.selected_folder = None;
|
cfg.selected_folder = None;
|
||||||
cfg.selected_folder_needs_expunge = false;
|
cfg.selected_folder_needs_expunge = false;
|
||||||
|
self.trigger_reconnect();
|
||||||
return Err(Error::NoSession);
|
return Err(Error::NoSession);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -61,6 +62,7 @@ impl Imap {
|
|||||||
info!(context, "close/expunge succeeded");
|
info!(context, "close/expunge succeeded");
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
self.trigger_reconnect();
|
||||||
return Err(Error::CloseExpungeFailed(err));
|
return Err(Error::CloseExpungeFailed(err));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user