fix imap fetch loop and watch folders

This commit is contained in:
dignifiedquire
2020-03-21 16:26:27 +01:00
parent 1846f20f6e
commit 8a7eaba668
4 changed files with 146 additions and 151 deletions

View File

@@ -269,9 +269,13 @@ async fn start(args: Vec<String>) -> Result<(), failure::Error> {
let ctx = context.clone(); let ctx = context.clone();
std::thread::spawn(move || loop { std::thread::spawn(move || loop {
if ctx.has_next_event() {
if let Ok(event) = ctx.get_next_event() { if let Ok(event) = ctx.get_next_event() {
receive_event(event); receive_event(event);
} }
} else {
std::thread::sleep(std::time::Duration::from_millis(50));
}
}); });
println!("Delta Chat Core is awaiting your commands."); println!("Delta Chat Core is awaiting your commands.");

View File

@@ -423,9 +423,7 @@ async fn exec_step(
// "configured_" prefix; also write the "configured"-flag */ // "configured_" prefix; also write the "configured"-flag */
// the trailing underscore is correct // the trailing underscore is correct
param.save_to_database(ctx, "configured_").await?; param.save_to_database(ctx, "configured_").await?;
println!("storing configured val");
ctx.sql.set_raw_config_bool(ctx, "configured", true).await?; ctx.sql.set_raw_config_bool(ctx, "configured", true).await?;
println!("stored configured val");
} }
18 => { 18 => {
progress!(ctx, 920); progress!(ctx, 920);

View File

@@ -345,18 +345,14 @@ impl Imap {
} }
async fn unsetup_handle(&mut self, context: &Context) { async fn unsetup_handle(&mut self, context: &Context) {
info!(context, "IMAP unsetup_handle step 2");
if let Some(mut session) = self.session.take() { if let Some(mut session) = self.session.take() {
if let Err(err) = session.close().await { if let Err(err) = session.close().await {
warn!(context, "failed to close connection: {:?}", err); warn!(context, "failed to close connection: {:?}", err);
} }
} }
self.connected = false; self.connected = false;
info!(context, "IMAP unsetup_handle step 3 (clearing config).");
self.config.selected_folder = None; self.config.selected_folder = None;
self.config.selected_mailbox = None; self.config.selected_mailbox = None;
info!(context, "IMAP unsetup_handle step 4 (disconnected)");
} }
async fn free_connect_params(&mut self) { async fn free_connect_params(&mut self) {
@@ -613,13 +609,16 @@ impl Imap {
.await?; .await?;
let mut read_cnt: usize = 0; let mut read_cnt: usize = 0;
let mut read_errors = 0;
// prefetch info from all unfetched mails // prefetch info from all unfetched mails
let mut new_last_seen_uid = last_seen_uid; let mut new_last_seen_uid = last_seen_uid;
let mut read_errors: usize = 0;
let mut uids = Vec::new(); if self.session.is_none() {
if let Some(ref mut session) = &mut self.session { return Err(Error::NoConnection);
}
let session = self.session.as_mut().unwrap();
// fetch messages with larger UID than the last one seen // fetch messages with larger UID than the last one seen
// `(UID FETCH lastseenuid+1:*)`, see RFC 4549 // `(UID FETCH lastseenuid+1:*)`, see RFC 4549
let set = format!("{}:*", last_seen_uid + 1); let set = format!("{}:*", last_seen_uid + 1);
@@ -630,8 +629,16 @@ impl Imap {
} }
}; };
let mut msgs = Vec::new();
while let Some(fetch) = list.next().await { while let Some(fetch) = list.next().await {
let fetch = fetch.map_err(|err| Error::Other(err.to_string()))?; let fetch = fetch.map_err(|err| Error::Other(err.to_string()))?;
msgs.push(fetch);
}
drop(list);
msgs.sort_unstable_by_key(|msg| msg.uid.unwrap_or_default());
for fetch in msgs.into_iter() {
let cur_uid = fetch.uid.unwrap_or_default(); let cur_uid = fetch.uid.unwrap_or_default();
if cur_uid <= last_seen_uid { if cur_uid <= last_seen_uid {
// If the mailbox is not empty, results always include // If the mailbox is not empty, results always include
@@ -643,9 +650,7 @@ impl Imap {
// already seen messages and have to filter them out. // already seen messages and have to filter them out.
info!( info!(
context, context,
"fetch_new_messages: ignoring uid {}, last seen was {}", "fetch_new_messages: ignoring uid {}, last seen was {}", cur_uid, last_seen_uid
cur_uid,
last_seen_uid
); );
continue; continue;
} }
@@ -679,15 +684,6 @@ impl Imap {
); );
} else { } else {
// check passed, go fetch the rest // check passed, go fetch the rest
uids.push((cur_uid, message_id));
}
}
}
} else {
return Err(Error::NoConnection);
};
for (cur_uid, message_id) in uids.into_iter() {
if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await { if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await {
info!( info!(
context, context,
@@ -698,6 +694,8 @@ impl Imap {
); );
read_errors += 1; read_errors += 1;
} }
}
}
if read_errors == 0 { if read_errors == 0 {
new_last_seen_uid = cur_uid; new_last_seen_uid = cur_uid;
@@ -1114,6 +1112,7 @@ impl Imap {
.get_raw_config_int(context, "folders_configured") .get_raw_config_int(context, "folders_configured")
.await; .await;
if folders_configured.unwrap_or_default() >= 3 { if folders_configured.unwrap_or_default() >= 3 {
info!(context, "IMAP-folders already configured");
// the "3" here we increase if we have future updates to // the "3" here we increase if we have future updates to
// to folder configuration // to folder configuration
return Ok(()); return Ok(());
@@ -1139,18 +1138,20 @@ impl Imap {
while let Some(folder) = folders.next().await { while let Some(folder) = folders.next().await {
let folder = folder.map_err(|err| Error::Other(err.to_string()))?; let folder = folder.map_err(|err| Error::Other(err.to_string()))?;
info!(context, "Scanning folder: {:?}", folder);
if folder.name() == "DeltaChat" || folder.name() == fallback_folder { if mvbox_folder.is_none()
&& (folder.name() == "DeltaChat" || folder.name() == fallback_folder)
{
mvbox_folder = Some(folder.name().to_string()); mvbox_folder = Some(folder.name().to_string());
} }
let is_sentbox_folder = match get_folder_meaning(&folder) {
FolderMeaning::SentObjects => true, if sentbox_folder.is_none() {
_ => false, if let FolderMeaning::SentObjects = get_folder_meaning(&folder) {
};
if is_sentbox_folder {
info!(context, "sentbox folder is {:?}", folder); info!(context, "sentbox folder is {:?}", folder);
sentbox_folder = Some(folder); sentbox_folder = Some(folder);
} }
}
if mvbox_folder.is_some() && sentbox_folder.is_some() { if mvbox_folder.is_some() && sentbox_folder.is_some() {
break; break;
@@ -1227,23 +1228,6 @@ impl Imap {
Ok(()) Ok(())
} }
// async fn list_folders(&self, session: &mut Session, context: &Context) -> Option<Vec<Name>> {
// match session.list(Some(""), Some("*")).await {
// Ok(list) => {
// if list.is_empty() {
// warn!(context, "Folder list is empty.",);
// }
// Some(list)
// }
// Err(err) => {
// eprintln!("list error: {:?}", err);
// warn!(context, "Cannot get folder list.",);
// None
// }
// }
// }
pub async fn empty_folder(&mut self, context: &Context, folder: &str) { pub async fn empty_folder(&mut self, context: &Context, folder: &str) {
info!(context, "emptying folder {}", folder); info!(context, "emptying folder {}", folder);

View File

@@ -5,7 +5,6 @@ use async_std::task;
use std::time::Duration; use std::time::Duration;
use crate::context::Context; use crate::context::Context;
use crate::error::Error;
use crate::imap::Imap; use crate::imap::Imap;
use crate::job::{self, Thread}; use crate::job::{self, Thread};
use crate::smtp::Smtp; use crate::smtp::Smtp;
@@ -68,11 +67,8 @@ async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) {
ctx.scheduler.write().await.set_probe_network(false); ctx.scheduler.write().await.set_probe_network(false);
} }
Ok(None) | Err(async_std::future::TimeoutError { .. }) => { Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
let watch_folder = get_watch_folder(&ctx, "configured_inbox_folder") match get_watch_folder(&ctx, "configured_inbox_folder").await {
.await Some(watch_folder) => {
.ok_or_else(|| Error::WatchFolderNotFound("not-set".to_string()))
.unwrap();
// fetch // fetch
connection connection
.fetch(&ctx, &watch_folder) .fetch(&ctx, &watch_folder)
@@ -93,6 +89,12 @@ async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) {
connection.fake_idle(&ctx, Some(watch_folder)).await; connection.fake_idle(&ctx, Some(watch_folder)).await;
} }
} }
None => {
warn!(ctx, "Can not watch inbox folder, not set");
connection.fake_idle(&ctx, None).await;
}
}
}
} }
} }
}; };
@@ -106,7 +108,7 @@ async fn simple_imap_loop(
inbox_handlers: ImapConnectionHandlers, inbox_handlers: ImapConnectionHandlers,
folder: impl AsRef<str>, folder: impl AsRef<str>,
) { ) {
info!(ctx, "starting simple loop"); info!(ctx, "starting simple loop for {}", folder.as_ref());
let ImapConnectionHandlers { let ImapConnectionHandlers {
mut connection, mut connection,
stop_receiver, stop_receiver,
@@ -117,11 +119,8 @@ async fn simple_imap_loop(
connection.connect_configured(&ctx).await.unwrap(); connection.connect_configured(&ctx).await.unwrap();
loop { loop {
let watch_folder = get_watch_folder(&ctx, folder.as_ref()) match get_watch_folder(&ctx, folder.as_ref()).await {
.await Some(watch_folder) => {
.ok_or_else(|| Error::WatchFolderNotFound("not-set".to_string()))
.unwrap();
// fetch // fetch
connection connection
.fetch(&ctx, &watch_folder) .fetch(&ctx, &watch_folder)
@@ -142,6 +141,16 @@ async fn simple_imap_loop(
connection.fake_idle(&ctx, Some(watch_folder)).await; connection.fake_idle(&ctx, Some(watch_folder)).await;
} }
} }
None => {
warn!(
&ctx,
"No watch folder found for {}, skipping",
folder.as_ref()
);
connection.fake_idle(&ctx, None).await
}
}
}
}; };
fut.race(stop_receiver.recv()).await; fut.race(stop_receiver.recv()).await;