fix: update message ids correctly

Fixes #1495
This commit is contained in:
Hocuri
2020-06-05 16:27:22 +02:00
committed by GitHub
parent ca95f25639
commit 05e1c00cd1
11 changed files with 401 additions and 170 deletions

View File

@@ -5,7 +5,7 @@ use async_std::task;
use crate::context::Context;
use crate::imap::Imap;
use crate::job::{self, Thread};
use crate::smtp::Smtp;
use crate::{config::Config, message::MsgId, smtp::Smtp};
pub(crate) struct StopToken;
@@ -32,36 +32,20 @@ impl Context {
self.scheduler.read().await.maybe_network().await;
}
pub(crate) async fn interrupt_inbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_inbox(probe_network)
.await;
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_inbox(info).await;
}
pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_sentbox(probe_network)
.await;
pub(crate) async fn interrupt_sentbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_sentbox(info).await;
}
pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_mvbox(probe_network)
.await;
pub(crate) async fn interrupt_mvbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_mvbox(info).await;
}
pub(crate) async fn interrupt_smtp(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_smtp(probe_network)
.await;
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_smtp(info).await;
}
}
@@ -86,14 +70,14 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
started.send(()).await;
// track number of continously executed jobs
let mut jobs_loaded = 0;
let mut probe_network = false;
let mut jobs_loaded: i32 = 0;
let mut info: InterruptInfo = Default::default();
loop {
match job::load_next(&ctx, Thread::Imap, probe_network).await {
match job::load_next(&ctx, Thread::Imap, &info).await {
Some(job) if jobs_loaded <= 20 => {
jobs_loaded += 1;
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
probe_network = false;
info = Default::default();
}
Some(job) => {
// Let the fetch run, but return back to the job afterwards.
@@ -103,8 +87,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
}
None => {
jobs_loaded = 0;
probe_network =
fetch_idle(&ctx, &mut connection, "configured_inbox_folder").await;
info = fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await;
}
}
}
@@ -121,7 +104,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
}
async fn fetch(ctx: &Context, connection: &mut Imap) {
match get_watch_folder(&ctx, "configured_inbox_folder").await {
match ctx.get_config(Config::ConfiguredInboxFolder).await {
Some(watch_folder) => {
// fetch
if let Err(err) = connection.fetch(&ctx, &watch_folder).await {
@@ -136,8 +119,8 @@ async fn fetch(ctx: &Context, connection: &mut Imap) {
}
}
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool {
match get_watch_folder(&ctx, folder).await {
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo {
match ctx.get_config(folder).await {
Some(watch_folder) => {
// fetch
if let Err(err) = connection.fetch(&ctx, &watch_folder).await {
@@ -153,7 +136,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool
.unwrap_or_else(|err| {
connection.trigger_reconnect();
error!(ctx, "{}", err);
false
InterruptInfo::new(false, None)
})
} else {
connection.fake_idle(&ctx, Some(watch_folder)).await
@@ -170,7 +153,7 @@ async fn simple_imap_loop(
ctx: Context,
started: Sender<()>,
inbox_handlers: ImapConnectionHandlers,
folder: impl AsRef<str>,
folder: Config,
) {
use futures::future::FutureExt;
@@ -193,7 +176,7 @@ async fn simple_imap_loop(
started.send(()).await;
loop {
fetch_idle(&ctx, &mut connection, folder.as_ref()).await;
fetch_idle(&ctx, &mut connection, folder).await;
}
};
@@ -223,18 +206,18 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
started.send(()).await;
let ctx = ctx1;
let mut probe_network = false;
let mut interrupt_info = Default::default();
loop {
match job::load_next(&ctx, Thread::Smtp, probe_network).await {
match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
Some(job) => {
info!(ctx, "executing smtp job");
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
probe_network = false;
interrupt_info = Default::default();
}
None => {
// Fake Idle
info!(ctx, "smtp fake idle - started");
probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default();
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
info!(ctx, "smtp fake idle - interrupted")
}
}
@@ -286,7 +269,7 @@ impl Scheduler {
ctx1,
mvbox_start_send,
mvbox_handlers,
"configured_mvbox_folder",
Config::ConfiguredMvboxFolder,
)
.await
}));
@@ -300,7 +283,7 @@ impl Scheduler {
ctx1,
sentbox_start_send,
sentbox_handlers,
"configured_sentbox_folder",
Config::ConfiguredSentboxFolder,
)
.await
}));
@@ -333,34 +316,34 @@ impl Scheduler {
return;
}
self.interrupt_inbox(true)
.join(self.interrupt_mvbox(true))
.join(self.interrupt_sentbox(true))
.join(self.interrupt_smtp(true))
self.interrupt_inbox(InterruptInfo::new(true, None))
.join(self.interrupt_mvbox(InterruptInfo::new(true, None)))
.join(self.interrupt_sentbox(InterruptInfo::new(true, None)))
.join(self.interrupt_smtp(InterruptInfo::new(true, None)))
.await;
}
async fn interrupt_inbox(&self, probe_network: bool) {
async fn interrupt_inbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref inbox, .. } = self {
inbox.interrupt(probe_network).await;
inbox.interrupt(info).await;
}
}
async fn interrupt_mvbox(&self, probe_network: bool) {
async fn interrupt_mvbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref mvbox, .. } = self {
mvbox.interrupt(probe_network).await;
mvbox.interrupt(info).await;
}
}
async fn interrupt_sentbox(&self, probe_network: bool) {
async fn interrupt_sentbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref sentbox, .. } = self {
sentbox.interrupt(probe_network).await;
sentbox.interrupt(info).await;
}
}
async fn interrupt_smtp(&self, probe_network: bool) {
async fn interrupt_smtp(&self, info: InterruptInfo) {
if let Scheduler::Running { ref smtp, .. } = self {
smtp.interrupt(probe_network).await;
smtp.interrupt(info).await;
}
}
@@ -429,7 +412,7 @@ struct ConnectionState {
/// Channel to interrupt the whole connection.
stop_sender: Sender<()>,
/// Channel to interrupt idle.
idle_interrupt_sender: Sender<bool>,
idle_interrupt_sender: Sender<InterruptInfo>,
}
impl ConnectionState {
@@ -441,9 +424,9 @@ impl ConnectionState {
self.shutdown_receiver.recv().await.ok();
}
async fn interrupt(&self, probe_network: bool) {
async fn interrupt(&self, info: InterruptInfo) {
// Use try_send to avoid blocking on interrupts.
self.idle_interrupt_sender.try_send(probe_network).ok();
self.idle_interrupt_sender.try_send(info).ok();
}
}
@@ -477,8 +460,8 @@ impl SmtpConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self, probe_network: bool) {
self.state.interrupt(probe_network).await;
async fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info).await;
}
/// Shutdown this connection completely.
@@ -492,7 +475,7 @@ struct SmtpConnectionHandlers {
connection: Smtp,
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
idle_interrupt_receiver: Receiver<bool>,
idle_interrupt_receiver: Receiver<InterruptInfo>,
}
#[derive(Debug)]
@@ -525,8 +508,8 @@ impl ImapConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self, probe_network: bool) {
self.state.interrupt(probe_network).await;
async fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info).await;
}
/// Shutdown this connection completely.
@@ -542,20 +525,17 @@ struct ImapConnectionHandlers {
shutdown_sender: Sender<()>,
}
async fn get_watch_folder(context: &Context, config_name: impl AsRef<str>) -> Option<String> {
match context
.sql
.get_raw_config(context, config_name.as_ref())
.await
{
Some(name) => Some(name),
None => {
if config_name.as_ref() == "configured_inbox_folder" {
// initialized with old version, so has not set configured_inbox_folder
Some("INBOX".to_string())
} else {
None
}
#[derive(Default, Debug)]
pub struct InterruptInfo {
pub probe_network: bool,
pub msg_id: Option<MsgId>,
}
impl InterruptInfo {
pub fn new(probe_network: bool, msg_id: Option<MsgId>) -> Self {
Self {
probe_network,
msg_id,
}
}
}