unify deps and more strict start and stop

This commit is contained in:
dignifiedquire
2020-05-20 16:42:18 +02:00
parent 4855584de9
commit 3c7b3faa7f
7 changed files with 1389 additions and 1382 deletions

2601
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -20,12 +20,12 @@ smallvec = "1.0.0"
reqwest = { version = "0.10.0", features = ["blocking", "json"] } reqwest = { version = "0.10.0", features = ["blocking", "json"] }
num-derive = "0.3.0" num-derive = "0.3.0"
num-traits = "0.2.6" num-traits = "0.2.6"
async-smtp = "0.3" async-smtp = { git = "https://github.com/async-email/async-smtp", version = "0.3" }
email = { git = "https://github.com/deltachat/rust-email", branch = "master" } email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" } lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
async-imap = { git = "https://github.com/async-email/async-imap", branch = "feat/send" } async-imap = { git = "https://github.com/async-email/async-imap" }
async-native-tls = "0.3.1" async-native-tls = { git = "https://github.com/async-email/async-native-tls", version = "0.3.1" }
async-std = { git = "https://github.com/async-rs/async-std", features = ["unstable"] } async-std = { version = "1.6.0-beta.2", features = ["unstable"] }
base64 = "0.11" base64 = "0.11"
charset = "0.1" charset = "0.1"
percent-encoding = "2.0" percent-encoding = "2.0"
@@ -50,7 +50,7 @@ escaper = "0.1.0"
bitflags = "1.1.0" bitflags = "1.1.0"
debug_stub_derive = "0.3.0" debug_stub_derive = "0.3.0"
sanitize-filename = "0.2.1" sanitize-filename = "0.2.1"
stop-token = { version = "0.1.1", features = ["unstable"] } stop-token = { git = "https://github.com/dignifiedquire/stop-token", version = "0.1.1", features = ["unstable"] }
mailparse = "0.12.0" mailparse = "0.12.0"
encoded-words = { git = "https://github.com/async-email/encoded-words", branch="master" } encoded-words = { git = "https://github.com/async-email/encoded-words", branch="master" }
native-tls = "0.2.3" native-tls = "0.2.3"
@@ -71,7 +71,7 @@ tempfile = "3.0"
pretty_assertions = "0.6.1" pretty_assertions = "0.6.1"
pretty_env_logger = "0.3.0" pretty_env_logger = "0.3.0"
proptest = "0.9.4" proptest = "0.9.4"
async-std = { git = "https://github.com/async-rs/async-std", features = ["unstable", "attributes"] } async-std = { version = "1.6.0-beta.2", features = ["unstable", "attributes"] }
[workspace] [workspace]
members = [ members = [

View File

@@ -20,7 +20,7 @@ libc = "0.2"
human-panic = "1.0.1" human-panic = "1.0.1"
num-traits = "0.2.6" num-traits = "0.2.6"
serde_json = "1.0" serde_json = "1.0"
async-std = { git = "https://github.com/async-rs/async-std" } async-std = "1.6.0-beta.2"
anyhow = "1.0.28" anyhow = "1.0.28"
thiserror = "1.0.14" thiserror = "1.0.14"
@@ -28,3 +28,4 @@ thiserror = "1.0.14"
default = ["vendored", "nightly"] default = ["vendored", "nightly"]
vendored = ["deltachat/vendored"] vendored = ["deltachat/vendored"]
nightly = ["deltachat/nightly"] nightly = ["deltachat/nightly"]

View File

@@ -1875,7 +1875,6 @@ pub unsafe extern "C" fn dc_imex(
} }
}; };
// TODO: this is now blocking, figure out if that is okay
let ffi_context = &*context; let ffi_context = &*context;
let param1 = to_opt_string_lossy(param1); let param1 = to_opt_string_lossy(param1);

View File

@@ -587,6 +587,9 @@ class Account(object):
def stop_scheduler(self): def stop_scheduler(self):
""" stop core scheduler if it is running. """ """ stop core scheduler if it is running. """
self.ac_log_line("stop_ongoing")
self.stop_ongoing()
self.ac_log_line("context_shutdown (stop core scheduler)") self.ac_log_line("context_shutdown (stop core scheduler)")
self.stop_ongoing() self.stop_ongoing()
lib.dc_context_shutdown(self._dc_context) lib.dc_context_shutdown(self._dc_context)

View File

@@ -138,12 +138,10 @@ impl Context {
} }
pub async fn run(&self) { pub async fn run(&self) {
if self.is_running().await { assert!(!self.is_running().await, "context is already running");
return;
}
let l = &mut *self.inner.scheduler.write().await; let l = &mut *self.inner.scheduler.write().await;
l.run(self.clone()); l.run(self.clone()).await;
} }
pub async fn is_running(&self) -> bool { pub async fn is_running(&self) -> bool {
@@ -153,6 +151,7 @@ impl Context {
pub async fn stop(&self) { pub async fn stop(&self) {
info!(self, "stopping context"); info!(self, "stopping context");
self.inner.stop().await; self.inner.stop().await;
info!(self, "stopped context");
} }
/// Returns a reference to the underlying SQL instance. /// Returns a reference to the underlying SQL instance.
@@ -499,15 +498,14 @@ impl InnerContext {
} }
async fn stop(&self) { async fn stop(&self) {
if self.is_running().await { assert!(self.is_running().await, "context is already stopped");
let token = { let token = {
let lock = &*self.scheduler.read().await; let lock = &*self.scheduler.read().await;
lock.pre_stop().await lock.pre_stop().await
}; };
{ {
let lock = &mut *self.scheduler.write().await; let lock = &mut *self.scheduler.write().await;
lock.stop(token).await; lock.stop(token).await;
}
} }
} }
} }

View File

@@ -17,9 +17,13 @@ pub(crate) enum Scheduler {
Stopped, Stopped,
Running { Running {
inbox: ImapConnectionState, inbox: ImapConnectionState,
inbox_handle: Option<task::JoinHandle<()>>,
mvbox: ImapConnectionState, mvbox: ImapConnectionState,
mvbox_handle: Option<task::JoinHandle<()>>,
sentbox: ImapConnectionState, sentbox: ImapConnectionState,
sentbox_handle: Option<task::JoinHandle<()>>,
smtp: SmtpConnectionState, smtp: SmtpConnectionState,
smtp_handle: Option<task::JoinHandle<()>>,
probe_network: bool, probe_network: bool,
}, },
} }
@@ -47,7 +51,7 @@ impl Context {
} }
} }
async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) { async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
use futures::future::FutureExt; use futures::future::FutureExt;
info!(ctx, "starting inbox loop"); info!(ctx, "starting inbox loop");
@@ -59,6 +63,7 @@ async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) {
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started.send(()).await;
let ctx = ctx1; let ctx = ctx1;
if let Err(err) = connection.connect_configured(&ctx).await { if let Err(err) = connection.connect_configured(&ctx).await {
error!(ctx, "{}", err); error!(ctx, "{}", err);
@@ -92,8 +97,13 @@ async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) {
} }
}; };
info!(ctx, "Shutting down inbox loop"); stop_receiver
fut.race(stop_receiver.recv().map(|_| ())).await; .recv()
.map(|_| {
info!(ctx, "shutting down inbox loop");
})
.race(fut)
.await;
shutdown_sender.send(()).await; shutdown_sender.send(()).await;
} }
@@ -147,6 +157,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap) {
async fn simple_imap_loop( async fn simple_imap_loop(
ctx: Context, ctx: Context,
started: Sender<()>,
inbox_handlers: ImapConnectionHandlers, inbox_handlers: ImapConnectionHandlers,
folder: impl AsRef<str>, folder: impl AsRef<str>,
) { ) {
@@ -159,7 +170,11 @@ async fn simple_imap_loop(
shutdown_sender, shutdown_sender,
} = inbox_handlers; } = inbox_handlers;
let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started.send(()).await;
let ctx = ctx1;
if let Err(err) = connection.connect_configured(&ctx).await { if let Err(err) = connection.connect_configured(&ctx).await {
error!(ctx, "{}", err); error!(ctx, "{}", err);
return; return;
@@ -200,11 +215,19 @@ async fn simple_imap_loop(
} }
}; };
fut.race(stop_receiver.recv().map(|_| ())).await; stop_receiver
.recv()
.map(|_| {
info!(ctx, "shutting down simple loop");
})
.race(fut)
.await;
shutdown_sender.send(()).await; shutdown_sender.send(()).await;
} }
async fn smtp_loop(ctx: Context, smtp_handlers: SmtpConnectionHandlers) { async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) {
use futures::future::FutureExt;
info!(ctx, "starting smtp loop"); info!(ctx, "starting smtp loop");
let SmtpConnectionHandlers { let SmtpConnectionHandlers {
mut connection, mut connection,
@@ -213,7 +236,10 @@ async fn smtp_loop(ctx: Context, smtp_handlers: SmtpConnectionHandlers) {
idle_interrupt_receiver, idle_interrupt_receiver,
} = smtp_handlers; } = smtp_handlers;
let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started.send(()).await;
let ctx = ctx1;
loop { loop {
let probe_network = ctx.scheduler.read().await.get_probe_network(); let probe_network = ctx.scheduler.read().await.get_probe_network();
match job::load_next(&ctx, Thread::Smtp, probe_network) match job::load_next(&ctx, Thread::Smtp, probe_network)
@@ -225,8 +251,6 @@ async fn smtp_loop(ctx: Context, smtp_handlers: SmtpConnectionHandlers) {
ctx.scheduler.write().await.set_probe_network(false); ctx.scheduler.write().await.set_probe_network(false);
} }
Ok(None) | Err(async_std::future::TimeoutError { .. }) => { Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
use futures::future::FutureExt;
// Fake Idle // Fake Idle
async_std::task::sleep(Duration::from_millis(500)) async_std::task::sleep(Duration::from_millis(500))
.race(idle_interrupt_receiver.recv().map(|_| ())) .race(idle_interrupt_receiver.recv().map(|_| ()))
@@ -236,13 +260,19 @@ async fn smtp_loop(ctx: Context, smtp_handlers: SmtpConnectionHandlers) {
} }
}; };
fut.race(stop_receiver.recv()).await.ok(); stop_receiver
.recv()
.map(|_| {
info!(ctx, "shutting down smtp loop");
})
.race(fut)
.await;
shutdown_sender.send(()).await; shutdown_sender.send(()).await;
} }
impl Scheduler { impl Scheduler {
/// Start the scheduler, panics if it is already running. /// Start the scheduler, panics if it is already running.
pub fn run(&mut self, ctx: Context) { pub async fn run(&mut self, ctx: Context) {
let (mvbox, mvbox_handlers) = ImapConnectionState::new(); let (mvbox, mvbox_handlers) = ImapConnectionState::new();
let (sentbox, sentbox_handlers) = ImapConnectionState::new(); let (sentbox, sentbox_handlers) = ImapConnectionState::new();
let (smtp, smtp_handlers) = SmtpConnectionState::new(); let (smtp, smtp_handlers) = SmtpConnectionState::new();
@@ -254,23 +284,65 @@ impl Scheduler {
sentbox, sentbox,
smtp, smtp,
probe_network: false, probe_network: false,
inbox_handle: None,
mvbox_handle: None,
sentbox_handle: None,
smtp_handle: None,
}; };
let ctx1 = ctx.clone(); let (inbox_start_send, inbox_start_recv) = channel(1);
task::spawn(async move { inbox_loop(ctx1, inbox_handlers).await }); if let Scheduler::Running { inbox_handle, .. } = self {
let ctx1 = ctx.clone();
*inbox_handle = Some(task::spawn(async move {
inbox_loop(ctx1, inbox_start_send, inbox_handlers).await
}));
}
let ctx1 = ctx.clone(); let (mvbox_start_send, mvbox_start_recv) = channel(1);
task::spawn(async move { if let Scheduler::Running { mvbox_handle, .. } = self {
simple_imap_loop(ctx1, mvbox_handlers, "configured_mvbox_folder").await let ctx1 = ctx.clone();
}); *mvbox_handle = Some(task::spawn(async move {
simple_imap_loop(
ctx1,
mvbox_start_send,
mvbox_handlers,
"configured_mvbox_folder",
)
.await
}));
}
let ctx1 = ctx.clone(); let (sentbox_start_send, sentbox_start_recv) = channel(1);
task::spawn(async move { if let Scheduler::Running { sentbox_handle, .. } = self {
simple_imap_loop(ctx1, sentbox_handlers, "configured_sentbox_folder").await let ctx1 = ctx.clone();
}); *sentbox_handle = Some(task::spawn(async move {
simple_imap_loop(
ctx1,
sentbox_start_send,
sentbox_handlers,
"configured_sentbox_folder",
)
.await
}));
}
let ctx1 = ctx.clone(); let (smtp_start_send, smtp_start_recv) = channel(1);
task::spawn(async move { smtp_loop(ctx1, smtp_handlers).await }); if let Scheduler::Running { smtp_handle, .. } = self {
let ctx1 = ctx.clone();
*smtp_handle = Some(task::spawn(async move {
smtp_loop(ctx1, smtp_start_send, smtp_handlers).await
}));
}
// wait for all loops to be started
inbox_start_recv
.recv()
.try_join(mvbox_start_recv.recv())
.try_join(sentbox_start_recv.recv())
.try_join(smtp_start_recv.recv())
.await
.map(|_| ())
.unwrap_or_else(|err| error!(ctx, "failed to start scheduler: {}", err));
info!(ctx, "scheduler is running"); info!(ctx, "scheduler is running");
} }
@@ -365,7 +437,18 @@ impl Scheduler {
Scheduler::Stopped => { Scheduler::Stopped => {
panic!("WARN: already stopped"); panic!("WARN: already stopped");
} }
Scheduler::Running { .. } => { Scheduler::Running {
inbox_handle,
mvbox_handle,
sentbox_handle,
smtp_handle,
..
} => {
inbox_handle.take().expect("inbox not started").await;
mvbox_handle.take().expect("mvbox not started").await;
sentbox_handle.take().expect("sentbox not started").await;
smtp_handle.take().expect("smtp not started").await;
*self = Scheduler::Stopped; *self = Scheduler::Stopped;
} }
} }