mirror of
https://github.com/chatmail/core.git
synced 2026-05-05 06:16:30 +03:00
resultify fetch and simplify fake_idle
This commit is contained in:
291
src/imap.rs
291
src/imap.rs
@@ -10,11 +10,10 @@ use async_std::prelude::*;
|
||||
use async_std::sync::{Arc, Mutex, RwLock};
|
||||
use async_std::task;
|
||||
|
||||
use crate::configure::dc_connect_to_configured_imap;
|
||||
use crate::constants::*;
|
||||
use crate::context::Context;
|
||||
use crate::dc_receive_imf::dc_receive_imf;
|
||||
use crate::error::Error;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::events::Event;
|
||||
use crate::imap_client::*;
|
||||
use crate::job::{job_add, Action};
|
||||
@@ -35,12 +34,6 @@ pub enum ImapActionResult {
|
||||
Success,
|
||||
}
|
||||
|
||||
#[derive(Debug, Display, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum IdlePollMode {
|
||||
Often,
|
||||
Never,
|
||||
}
|
||||
|
||||
const PREFETCH_FLAGS: &str = "(UID ENVELOPE)";
|
||||
const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])";
|
||||
const SELECT_ALL: &str = "1:*";
|
||||
@@ -95,7 +88,6 @@ struct ImapConfig {
|
||||
pub can_idle: bool,
|
||||
pub has_xlist: bool,
|
||||
pub imap_delimiter: char,
|
||||
pub watch_folder: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for ImapConfig {
|
||||
@@ -114,7 +106,6 @@ impl Default for ImapConfig {
|
||||
can_idle: false,
|
||||
has_xlist: false,
|
||||
imap_delimiter: '.',
|
||||
watch_folder: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -143,17 +134,17 @@ impl Imap {
|
||||
self.should_reconnect.store(true, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn setup_handle_if_needed(&self, context: &Context) -> bool {
|
||||
fn setup_handle_if_needed(&self, context: &Context) -> Result<()> {
|
||||
task::block_on(async move {
|
||||
if self.config.read().await.imap_server.is_empty() {
|
||||
return false;
|
||||
return Err(Error::ImapInTeardown);
|
||||
}
|
||||
|
||||
if self.should_reconnect() {
|
||||
self.unsetup_handle(context).await;
|
||||
self.should_reconnect.store(false, Ordering::Relaxed);
|
||||
} else if self.is_connected().await {
|
||||
return true;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let server_flags = self.config.read().await.server_flags as i32;
|
||||
@@ -181,13 +172,12 @@ impl Imap {
|
||||
let imap_server: &str = config.imap_server.as_ref();
|
||||
let imap_port = config.imap_port;
|
||||
|
||||
let res = Client::connect_secure(
|
||||
Client::connect_secure(
|
||||
(imap_server, imap_port),
|
||||
imap_server,
|
||||
config.certificate_checks,
|
||||
)
|
||||
.await;
|
||||
res
|
||||
.await
|
||||
};
|
||||
|
||||
let login_res = match connection_res {
|
||||
@@ -209,7 +199,7 @@ impl Imap {
|
||||
let res = client.authenticate("XOAUTH2", &auth).await;
|
||||
res
|
||||
} else {
|
||||
return false;
|
||||
return Err(Error::ImapOauthError);
|
||||
}
|
||||
} else {
|
||||
let res = client.login(imap_user, imap_pw).await;
|
||||
@@ -217,18 +207,19 @@ impl Imap {
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let config = self.config.read().await;
|
||||
let imap_server: &str = config.imap_server.as_ref();
|
||||
let imap_port = config.imap_port;
|
||||
let message = context.stock_string_repl_str2(
|
||||
StockMessage::ServerResponse,
|
||||
format!("{}:{}", imap_server, imap_port),
|
||||
format!("{}", err),
|
||||
);
|
||||
|
||||
let message = {
|
||||
let config = self.config.read().await;
|
||||
let imap_server: &str = config.imap_server.as_ref();
|
||||
let imap_port = config.imap_port;
|
||||
context.stock_string_repl_str2(
|
||||
StockMessage::ServerResponse,
|
||||
format!("{}:{}", imap_server, imap_port),
|
||||
format!("{}", err),
|
||||
)
|
||||
};
|
||||
// IMAP connection failures are reported to users
|
||||
emit_event!(context, Event::ErrorNetwork(message));
|
||||
|
||||
return false;
|
||||
return Err(Error::ImapConnectionFailed(format!("{}", err)));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -237,7 +228,7 @@ impl Imap {
|
||||
match login_res {
|
||||
Ok(session) => {
|
||||
*self.session.lock().await = Some(session);
|
||||
true
|
||||
Ok(())
|
||||
}
|
||||
Err((err, _)) => {
|
||||
let imap_user = self.config.read().await.imap_user.to_owned();
|
||||
@@ -248,9 +239,11 @@ impl Imap {
|
||||
context,
|
||||
Event::ErrorNetwork(format!("{} ({})", message, err))
|
||||
);
|
||||
self.unsetup_handle(context).await;
|
||||
|
||||
false
|
||||
self.trigger_reconnect();
|
||||
Err(Error::ImapLoginFailed(format!(
|
||||
"cannot login as {}",
|
||||
imap_user
|
||||
)))
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -266,11 +259,12 @@ impl Imap {
|
||||
warn!(context, "failed to close connection: {:?}", err);
|
||||
}
|
||||
}
|
||||
*self.connected.lock().await = false;
|
||||
|
||||
info!(context, "IMAP unsetup_handle step 3 (clearing config).");
|
||||
self.config.write().await.selected_folder = None;
|
||||
self.config.write().await.selected_mailbox = None;
|
||||
info!(context, "IMAP unsetup_handle step 4 (disconnected).");
|
||||
info!(context, "IMAP unsetup_handle step 4 (disconnected)");
|
||||
}
|
||||
|
||||
async fn free_connect_params(&self) {
|
||||
@@ -284,8 +278,28 @@ impl Imap {
|
||||
|
||||
cfg.can_idle = false;
|
||||
cfg.has_xlist = false;
|
||||
}
|
||||
|
||||
cfg.watch_folder = None;
|
||||
/// Connects to configured account
|
||||
pub fn connect_configured(&self, context: &Context) -> Result<()> {
|
||||
if async_std::task::block_on(async move {
|
||||
self.is_connected().await && !self.should_reconnect()
|
||||
}) {
|
||||
return Ok(());
|
||||
}
|
||||
if !context.sql.get_raw_config_bool(context, "configured") {
|
||||
return Err(Error::ConnectWithoutConfigure);
|
||||
}
|
||||
|
||||
let param = LoginParam::from_database(context, "configured_");
|
||||
// the trailing underscore is correct
|
||||
|
||||
if self.connect(context, ¶m) {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(Error::ImapConnectionFailed(
|
||||
format!("{}", param).to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn connect(&self, context: &Context, lp: &LoginParam) -> bool {
|
||||
@@ -294,10 +308,6 @@ impl Imap {
|
||||
return false;
|
||||
}
|
||||
|
||||
if self.is_connected().await {
|
||||
return true;
|
||||
}
|
||||
|
||||
{
|
||||
let addr = &lp.addr;
|
||||
let imap_server = &lp.mail_server;
|
||||
@@ -316,7 +326,8 @@ impl Imap {
|
||||
config.server_flags = server_flags;
|
||||
}
|
||||
|
||||
if !self.setup_handle_if_needed(context) {
|
||||
if let Err(err) = self.setup_handle_if_needed(context) {
|
||||
warn!(context, "failed to setup imap handle: {}", err);
|
||||
self.free_connect_params().await;
|
||||
return false;
|
||||
}
|
||||
@@ -366,44 +377,26 @@ impl Imap {
|
||||
|
||||
pub fn disconnect(&self, context: &Context) {
|
||||
task::block_on(async move {
|
||||
if self.is_connected().await {
|
||||
self.unsetup_handle(context).await;
|
||||
self.free_connect_params().await;
|
||||
*self.connected.lock().await = false;
|
||||
}
|
||||
self.unsetup_handle(context).await;
|
||||
self.free_connect_params().await;
|
||||
});
|
||||
}
|
||||
|
||||
pub fn set_watch_folder(&self, watch_folder: String) {
|
||||
task::block_on(async move {
|
||||
self.config.write().await.watch_folder = Some(watch_folder);
|
||||
});
|
||||
}
|
||||
|
||||
pub fn fetch(&self, context: &Context) -> bool {
|
||||
pub fn fetch(&self, context: &Context, watch_folder: &str) -> Result<()> {
|
||||
task::block_on(async move {
|
||||
if !context.sql.is_open() {
|
||||
// probably shutdown
|
||||
return false;
|
||||
return Err(Error::ImapInTeardown);
|
||||
}
|
||||
|
||||
self.setup_handle_if_needed(context);
|
||||
|
||||
let watch_folder = self.config.read().await.watch_folder.to_owned();
|
||||
|
||||
if let Some(ref watch_folder) = watch_folder {
|
||||
// as during the fetch commands, new messages may arrive, we fetch until we do not
|
||||
// get any more. if IDLE is called directly after, there is only a small chance that
|
||||
while self
|
||||
.fetch_from_single_folder(context, &watch_folder)
|
||||
.await?
|
||||
{
|
||||
// During the fetch commands new messages may arrive. So we fetch until we do not
|
||||
// get any more. If IDLE is called directly after, there is only a small chance that
|
||||
// messages are missed and delayed until the next IDLE call
|
||||
loop {
|
||||
if self.fetch_from_single_folder(context, watch_folder).await == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -506,15 +499,14 @@ impl Imap {
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_from_single_folder<S: AsRef<str>>(&self, context: &Context, folder: S) -> usize {
|
||||
async fn fetch_from_single_folder<S: AsRef<str>>(
|
||||
&self,
|
||||
context: &Context,
|
||||
folder: S,
|
||||
) -> Result<bool> {
|
||||
match self.select_folder(context, Some(&folder)).await {
|
||||
ImapActionResult::Failed | ImapActionResult::RetryLater => {
|
||||
warn!(
|
||||
context,
|
||||
"Cannot select folder \"{}\" for fetching.",
|
||||
folder.as_ref()
|
||||
);
|
||||
return 0;
|
||||
bail!("Cannot select folder {:?} for fetching.", folder.as_ref());
|
||||
}
|
||||
ImapActionResult::Success | ImapActionResult::AlreadyDone => {}
|
||||
}
|
||||
@@ -525,23 +517,20 @@ impl Imap {
|
||||
let config = self.config.read().await;
|
||||
let mailbox = config.selected_mailbox.as_ref().expect("just selected");
|
||||
|
||||
if mailbox.uid_validity.is_none() {
|
||||
error!(
|
||||
context,
|
||||
"Cannot get UIDVALIDITY for folder \"{}\".",
|
||||
folder.as_ref(),
|
||||
);
|
||||
ensure!(
|
||||
mailbox.uid_validity.is_some(),
|
||||
"Cannot get UIDVALIDITY for folder {:?}",
|
||||
folder.as_ref()
|
||||
);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if mailbox.uid_validity.unwrap() != uid_validity {
|
||||
let new_uid_validity = mailbox.uid_validity.unwrap();
|
||||
if new_uid_validity != uid_validity {
|
||||
// First time this folder is selected or UIDVALIDITY has changed.
|
||||
// Init lastseenuid and save it to config.
|
||||
info!(
|
||||
context,
|
||||
"uid_validity={} local uid_validity={} lastseenuid={}",
|
||||
mailbox.uid_validity.unwrap(),
|
||||
"new_uid_validity={} current local uid_validity={} lastseenuid={}",
|
||||
new_uid_validity,
|
||||
uid_validity,
|
||||
last_seen_uid
|
||||
);
|
||||
@@ -553,13 +542,8 @@ impl Imap {
|
||||
// id we do not do this here, we'll miss the first message
|
||||
// as we will get in here again and fetch from lastseenuid+1 then
|
||||
|
||||
self.set_config_last_seen_uid(
|
||||
context,
|
||||
&folder,
|
||||
mailbox.uid_validity.unwrap_or_default(),
|
||||
0,
|
||||
);
|
||||
return 0;
|
||||
self.set_config_last_seen_uid(context, &folder, new_uid_validity, 0);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let list = if let Some(ref mut session) = &mut *self.session.lock().await {
|
||||
@@ -567,19 +551,12 @@ impl Imap {
|
||||
let set = format!("{}", mailbox.exists);
|
||||
match session.fetch(set, PREFETCH_FLAGS).await {
|
||||
Ok(list) => list,
|
||||
Err(_err) => {
|
||||
self.trigger_reconnect();
|
||||
info!(
|
||||
context,
|
||||
"No result returned for folder \"{}\".",
|
||||
folder.as_ref()
|
||||
);
|
||||
|
||||
return 0;
|
||||
Err(err) => {
|
||||
bail!("fetch failed: {}", err);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return 0;
|
||||
return Err(Error::ImapNoConnection);
|
||||
};
|
||||
|
||||
last_seen_uid = list[0].uid.unwrap_or_else(|| 0);
|
||||
@@ -589,7 +566,7 @@ impl Imap {
|
||||
last_seen_uid -= 1;
|
||||
}
|
||||
|
||||
uid_validity = mailbox.uid_validity.unwrap_or_default();
|
||||
uid_validity = new_uid_validity;
|
||||
self.set_config_last_seen_uid(context, &folder, uid_validity, last_seen_uid);
|
||||
info!(
|
||||
context,
|
||||
@@ -611,12 +588,11 @@ impl Imap {
|
||||
match session.uid_fetch(set, PREFETCH_FLAGS).await {
|
||||
Ok(list) => list,
|
||||
Err(err) => {
|
||||
warn!(context, "failed to fetch uids: {}", err);
|
||||
return 0;
|
||||
bail!("uid_fetch failed: {}", err);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return 0;
|
||||
return Err(Error::ImapNoConnection);
|
||||
};
|
||||
|
||||
// go through all mails in folder (this is typically _fast_ as we already have the whole list)
|
||||
@@ -677,7 +653,7 @@ impl Imap {
|
||||
);
|
||||
}
|
||||
|
||||
read_cnt
|
||||
Ok(read_cnt > 0)
|
||||
}
|
||||
|
||||
fn set_config_last_seen_uid<S: AsRef<str>>(
|
||||
@@ -764,26 +740,19 @@ impl Imap {
|
||||
1
|
||||
}
|
||||
|
||||
pub fn idle(&self, context: &Context) -> Result<(), Error> {
|
||||
pub fn idle(&self, context: &Context, watch_folder: Option<String>) -> Result<()> {
|
||||
task::block_on(async move {
|
||||
if !self.config.read().await.can_idle {
|
||||
return Err(Error::ImapMissesIdle);
|
||||
}
|
||||
|
||||
self.setup_handle_if_needed(context);
|
||||
self.setup_handle_if_needed(context)?;
|
||||
|
||||
let watch_folder = self.config.read().await.watch_folder.clone();
|
||||
if watch_folder.is_none() {
|
||||
return Err(Error::ImapInTeardown);
|
||||
}
|
||||
match self.select_folder(context, watch_folder.as_ref()).await {
|
||||
match self.select_folder(context, watch_folder.clone()).await {
|
||||
ImapActionResult::Success | ImapActionResult::AlreadyDone => {}
|
||||
|
||||
ImapActionResult::Failed | ImapActionResult::RetryLater => {
|
||||
return Err(Error::ImapSelectFailed(format!(
|
||||
"{:?}",
|
||||
watch_folder.as_ref()
|
||||
)));
|
||||
return Err(Error::ImapSelectFailed(format!("{:?}", watch_folder)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -811,16 +780,16 @@ impl Imap {
|
||||
info!(context, "Idle entering wait-on-remote state");
|
||||
match idle_wait.await {
|
||||
IdleResponse::NewData(_) => {
|
||||
info!(context, "Idle finished with NewData");
|
||||
info!(context, "Idle has NewData");
|
||||
}
|
||||
// TODO: idle_wait does not distinguish manual interrupts
|
||||
// from Timeouts if we would know it's a Timeout we could bail
|
||||
// directly and reconnect .
|
||||
IdleResponse::Timeout => {
|
||||
warn!(context, "Idle wait timed out");
|
||||
return Err(Error::ImapIdleProtocolFailed(
|
||||
"timeout".to_string(),
|
||||
));
|
||||
info!(context, "Idle-wait timeout or interruption");
|
||||
}
|
||||
IdleResponse::ManualInterrupt => {
|
||||
warn!(context, "Idle wait was interrupted");
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -853,8 +822,20 @@ impl Imap {
|
||||
info!(context, "Idle wait was skipped");
|
||||
} else {
|
||||
info!(context, "Idle entering wait-on-remote state");
|
||||
let res = idle_wait.await;
|
||||
info!(context, "Idle finished wait-on-remote: {:?}", res);
|
||||
match idle_wait.await {
|
||||
IdleResponse::NewData(_) => {
|
||||
info!(context, "Idle has NewData");
|
||||
}
|
||||
// TODO: idle_wait does not distinguish manual interrupts
|
||||
// from Timeouts if we would know it's a Timeout we could bail
|
||||
// directly and reconnect .
|
||||
IdleResponse::Timeout => {
|
||||
info!(context, "Idle-wait timeout or interruption");
|
||||
}
|
||||
IdleResponse::ManualInterrupt => {
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
}
|
||||
}
|
||||
match handle.done().await {
|
||||
Ok(session) => {
|
||||
@@ -876,7 +857,7 @@ impl Imap {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn fake_idle(&self, context: &Context, poll_mode: IdlePollMode) {
|
||||
pub(crate) fn fake_idle(&self, context: &Context, watch_folder: Option<String>) {
|
||||
// Idle using polling.
|
||||
task::block_on(async move {
|
||||
let fake_idle_start_time = SystemTime::now();
|
||||
@@ -885,45 +866,43 @@ impl Imap {
|
||||
|
||||
let interrupt = stop_token::StopSource::new();
|
||||
|
||||
// we use 1000 minutes if we are told to not try network
|
||||
// which can happen because the watch_folder is not defined
|
||||
// but clients are still calling us in a loop.
|
||||
// if we are to use network, we check every minute if there
|
||||
// is new mail -- TODO: make this more flexible
|
||||
let secs = match poll_mode {
|
||||
IdlePollMode::Never => 60000,
|
||||
IdlePollMode::Often => 60,
|
||||
};
|
||||
let interval = async_std::stream::interval(Duration::from_secs(secs));
|
||||
// check every minute if there are new messages
|
||||
// TODO: grow sleep durations / make them more flexible
|
||||
let interval = async_std::stream::interval(Duration::from_secs(60));
|
||||
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
|
||||
*self.interrupt.lock().await = Some(interrupt);
|
||||
|
||||
// loop until we are interrupted or if we fetched something
|
||||
while let Some(_) = interrupt_interval.next().await {
|
||||
if poll_mode == IdlePollMode::Never {
|
||||
// try to connect with proper login params
|
||||
// (setup_handle_if_needed might not know about them if we
|
||||
// never successfully connected)
|
||||
if let Err(err) = self.connect_configured(context) {
|
||||
warn!(context, "fake_idle: could not connect: {}", err);
|
||||
continue;
|
||||
}
|
||||
if !self.is_connected().await {
|
||||
// try to connect with proper login params
|
||||
// (setup_handle_if_needed might not know about them if we
|
||||
// never successfully connected)
|
||||
match dc_connect_to_configured_imap(context, &self) {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
warn!(context, "fake_idle: could not connect: {}", err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if self.config.read().await.can_idle {
|
||||
// we only fake-idled because network was gone during IDLE, probably
|
||||
break;
|
||||
}
|
||||
info!(context, "fake_idle is connected");
|
||||
// we are connected, let's see if fetching messages results
|
||||
// in anything. If so, we behave as if IDLE had data but
|
||||
// will have already fetched the messages so perform_*_fetch
|
||||
// will not find any new.
|
||||
|
||||
let watch_folder = self.config.read().await.watch_folder.clone();
|
||||
if let Some(watch_folder) = watch_folder {
|
||||
if 0 != self.fetch_from_single_folder(context, watch_folder).await {
|
||||
break;
|
||||
if let Some(ref watch_folder) = watch_folder {
|
||||
match self.fetch_from_single_folder(context, watch_folder).await {
|
||||
Ok(res) => {
|
||||
info!(context, "fetch_from_single_folder returned {:?}", res);
|
||||
if res {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(context, "could not fetch from folder: {}", err);
|
||||
self.trigger_reconnect()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1434,7 +1413,7 @@ fn precheck_imf(context: &Context, rfc724_mid: &str, server_folder: &str, server
|
||||
}
|
||||
}
|
||||
|
||||
fn prefetch_get_message_id(prefetch_msg: &Fetch) -> Result<String, Error> {
|
||||
fn prefetch_get_message_id(prefetch_msg: &Fetch) -> Result<String> {
|
||||
let message_id = prefetch_msg.envelope().unwrap().message_id.unwrap();
|
||||
wrapmime::parse_message_id(&message_id)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user