update deps

and switch to new channels  in async-std@1.8
This commit is contained in:
dignifiedquire
2020-12-12 23:17:02 +01:00
committed by Floris Bruynooghe
parent 93bd9422e7
commit b3fe74e0f0
7 changed files with 646 additions and 524 deletions

View File

@@ -1,6 +1,8 @@
use async_std::prelude::*;
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
use async_std::{
channel::{bounded as channel, Receiver, Sender},
task,
};
use crate::context::Context;
use crate::dc_tools::maybe_add_time_based_warnings;
@@ -52,46 +54,45 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
shutdown_sender,
} = inbox_handlers;
let fut = {
let ctx = ctx.clone();
async move {
started.send(()).await;
let ctx1 = ctx.clone();
let fut = async move {
started.send(()).await.unwrap();
let ctx = ctx1;
// track number of continously executed jobs
let mut jobs_loaded = 0;
let mut info = InterruptInfo::default();
loop {
match job::load_next(&ctx, Thread::Imap, &info).await {
Some(job) if jobs_loaded <= 20 => {
jobs_loaded += 1;
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
info = Default::default();
// track number of continously executed jobs
let mut jobs_loaded = 0;
let mut info = InterruptInfo::default();
loop {
match job::load_next(&ctx, Thread::Imap, &info).await {
Some(job) if jobs_loaded <= 20 => {
jobs_loaded += 1;
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
info = Default::default();
}
Some(job) => {
// Let the fetch run, but return back to the job afterwards.
jobs_loaded = 0;
if ctx.get_config_bool(Config::InboxWatch).await {
info!(ctx, "postponing imap-job {} to run fetch...", job);
fetch(&ctx, &mut connection).await;
}
Some(job) => {
// Let the fetch run, but return back to the job afterwards.
jobs_loaded = 0;
if ctx.get_config_bool(Config::InboxWatch).await {
info!(ctx, "postponing imap-job {} to run fetch...", job);
fetch(&ctx, &mut connection).await;
}
}
None => {
jobs_loaded = 0;
// Expunge folder if needed, e.g. if some jobs have
// deleted messages on the server.
if let Err(err) = connection.maybe_close_folder(&ctx).await {
warn!(ctx, "failed to close folder: {:?}", err);
}
None => {
jobs_loaded = 0;
// Expunge folder if needed, e.g. if some jobs have
// deleted messages on the server.
if let Err(err) = connection.maybe_close_folder(&ctx).await {
warn!(ctx, "failed to close folder: {:?}", err);
}
maybe_add_time_based_warnings(&ctx).await;
maybe_add_time_based_warnings(&ctx).await;
info = if ctx.get_config_bool(Config::InboxWatch).await {
fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await
} else {
connection.fake_idle(&ctx, None).await
};
}
info = if ctx.get_config_bool(Config::InboxWatch).await {
fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await
} else {
connection.fake_idle(&ctx, None).await
};
}
}
}
@@ -104,7 +105,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
})
.race(fut)
.await;
shutdown_sender.send(()).await;
shutdown_sender.send(()).await.unwrap();
}
async fn fetch(ctx: &Context, connection: &mut Imap) {
@@ -185,14 +186,14 @@ async fn simple_imap_loop(
shutdown_sender,
} = inbox_handlers;
let fut = {
let ctx = ctx.clone();
async move {
started.send(()).await;
let ctx1 = ctx.clone();
loop {
fetch_idle(&ctx, &mut connection, folder).await;
}
let fut = async move {
started.send(()).await.unwrap();
let ctx = ctx1;
loop {
fetch_idle(&ctx, &mut connection, folder).await;
}
};
@@ -203,7 +204,7 @@ async fn simple_imap_loop(
})
.race(fut)
.await;
shutdown_sender.send(()).await;
shutdown_sender.send(()).await.unwrap();
}
async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) {
@@ -217,25 +218,24 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
idle_interrupt_receiver,
} = smtp_handlers;
let fut = {
let ctx = ctx.clone();
async move {
started.send(()).await;
let ctx1 = ctx.clone();
let fut = async move {
started.send(()).await.unwrap();
let ctx = ctx1;
let mut interrupt_info = Default::default();
loop {
match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
Some(job) => {
info!(ctx, "executing smtp job");
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
interrupt_info = Default::default();
}
None => {
// Fake Idle
info!(ctx, "smtp fake idle - started");
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
info!(ctx, "smtp fake idle - interrupted")
}
let mut interrupt_info = Default::default();
loop {
match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
Some(job) => {
info!(ctx, "executing smtp job");
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
interrupt_info = Default::default();
}
None => {
// Fake Idle
info!(ctx, "smtp fake idle - started");
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
info!(ctx, "smtp fake idle - interrupted")
}
}
}
@@ -248,7 +248,7 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
})
.race(fut)
.await;
shutdown_sender.send(()).await;
shutdown_sender.send(()).await.unwrap();
}
impl Scheduler {
@@ -285,7 +285,7 @@ impl Scheduler {
.await
}));
} else {
mvbox_start_send.send(()).await;
mvbox_start_send.send(()).await.unwrap();
}
if ctx.get_config_bool(Config::SentboxWatch).await {
@@ -300,7 +300,7 @@ impl Scheduler {
.await
}));
} else {
sentbox_start_send.send(()).await;
sentbox_start_send.send(()).await.unwrap();
}
let smtp_handle = {
@@ -448,7 +448,7 @@ impl ConnectionState {
/// Shutdown this connection completely.
async fn stop(&self) {
// Trigger shutdown of the run loop.
self.stop_sender.send(()).await;
self.stop_sender.send(()).await.unwrap();
// Wait for a notification that the run loop has been shutdown.
self.shutdown_receiver.recv().await.ok();
}