fix scheduler shutdown

This commit is contained in:
dignifiedquire
2021-01-03 18:18:39 +01:00
committed by Floris Bruynooghe
parent b3fe74e0f0
commit 2407fbd1f0

View File

@@ -56,7 +56,10 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started.send(()).await.unwrap(); started
.send(())
.await
.expect("inbox loop, missing started receiver");
let ctx = ctx1; let ctx = ctx1;
// track number of continously executed jobs // track number of continously executed jobs
@@ -105,7 +108,10 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
}) })
.race(fut) .race(fut)
.await; .await;
shutdown_sender.send(()).await.unwrap(); shutdown_sender
.send(())
.await
.expect("inbox loop, missing shutdown receiver");
} }
async fn fetch(ctx: &Context, connection: &mut Imap) { async fn fetch(ctx: &Context, connection: &mut Imap) {
@@ -189,7 +195,10 @@ async fn simple_imap_loop(
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started.send(()).await.unwrap(); started
.send(())
.await
.expect("simple imap loop, missing started receive");
let ctx = ctx1; let ctx = ctx1;
loop { loop {
@@ -204,7 +213,10 @@ async fn simple_imap_loop(
}) })
.race(fut) .race(fut)
.await; .await;
shutdown_sender.send(()).await.unwrap(); shutdown_sender
.send(())
.await
.expect("simple imap loop, missing shutdown receiver");
} }
async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) { async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) {
@@ -220,7 +232,10 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started.send(()).await.unwrap(); started
.send(())
.await
.expect("smtp loop, missing started receiver");
let ctx = ctx1; let ctx = ctx1;
let mut interrupt_info = Default::default(); let mut interrupt_info = Default::default();
@@ -248,7 +263,10 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
}) })
.race(fut) .race(fut)
.await; .await;
shutdown_sender.send(()).await.unwrap(); shutdown_sender
.send(())
.await
.expect("smtp loop, missing shutdown receiver");
} }
impl Scheduler { impl Scheduler {
@@ -285,7 +303,10 @@ impl Scheduler {
.await .await
})); }));
} else { } else {
mvbox_start_send.send(()).await.unwrap(); mvbox_start_send
.send(())
.await
.expect("mvbox start send, missing receiver");
} }
if ctx.get_config_bool(Config::SentboxWatch).await { if ctx.get_config_bool(Config::SentboxWatch).await {
@@ -300,7 +321,10 @@ impl Scheduler {
.await .await
})); }));
} else { } else {
sentbox_start_send.send(()).await.unwrap(); sentbox_start_send
.send(())
.await
.expect("sentbox start send, missing receiver");
} }
let smtp_handle = { let smtp_handle = {
@@ -379,17 +403,27 @@ impl Scheduler {
} }
Scheduler::Running { Scheduler::Running {
inbox, inbox,
inbox_handle,
mvbox, mvbox,
mvbox_handle,
sentbox, sentbox,
sentbox_handle,
smtp, smtp,
smtp_handle,
.. ..
} => { } => {
inbox if inbox_handle.is_some() {
.stop() inbox.stop().await;
.join(mvbox.stop()) }
.join(sentbox.stop()) if mvbox_handle.is_some() {
.join(smtp.stop()) mvbox.stop().await;
.await; }
if sentbox_handle.is_some() {
sentbox.stop().await;
}
if smtp_handle.is_some() {
smtp.stop().await;
}
StopToken StopToken
} }
@@ -448,7 +482,10 @@ impl ConnectionState {
/// Shutdown this connection completely. /// Shutdown this connection completely.
async fn stop(&self) { async fn stop(&self) {
// Trigger shutdown of the run loop. // Trigger shutdown of the run loop.
self.stop_sender.send(()).await.unwrap(); self.stop_sender
.send(())
.await
.expect("stop, missing receiver");
// Wait for a notification that the run loop has been shutdown. // Wait for a notification that the run loop has been shutdown.
self.shutdown_receiver.recv().await.ok(); self.shutdown_receiver.recv().await.ok();
} }