From 4dbcab9e6dde8a71d16f9e68dc6301bacce04fa4 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 26 Jan 2020 09:09:27 +0100 Subject: [PATCH] feat: implement minimal rust threads --- Cargo.lock | 15 ++++++ Cargo.toml | 2 +- deltachat-ffi/Cargo.toml | 2 +- rust-toolchain | 2 +- src/context.rs | 103 +++++++++++++++++++++++++++++++++++++-- src/lib.rs | 1 - 6 files changed, 117 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be04272f1..ddb455b60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -498,6 +498,19 @@ dependencies = [ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-channel" version = "0.4.0" @@ -641,6 +654,7 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "charset 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "debug_stub_derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "deltachat_derive 2.0.0", "email 0.0.21 (git+https://github.com/deltachat/rust-email)", @@ -3346,6 +3360,7 @@ dependencies = [ "checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" "checksum crc24 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "fd121741cf3eb82c08dd3023eb55bf2665e5f60ec20f89760cf836ae4562e6a0" "checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" "checksum crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c" "checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca" "checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac" diff --git a/Cargo.toml b/Cargo.toml index a1d283103..166dbdc5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,8 +58,8 @@ encoded-words = { git = "https://github.com/async-email/encoded-words", branch=" native-tls = "0.2.3" image = { version = "0.22.4", default-features=false, features = ["gif_codec", "jpeg", "ico", "png_codec", "pnm", "webp", "bmp"] } pretty_env_logger = "0.3.1" - rustyline = { version = "4.1.0", optional = true } +crossbeam = "0.7.3" [dev-dependencies] tempfile = "3.0" diff --git a/deltachat-ffi/Cargo.toml b/deltachat-ffi/Cargo.toml index e82037ee8..b297ca65e 100644 --- a/deltachat-ffi/Cargo.toml +++ b/deltachat-ffi/Cargo.toml @@ -12,7 +12,7 @@ categories = ["cryptography", "std", "email"] [lib] name = "deltachat" -crate-type = ["cdylib", "staticlib"] +crate-type = ["cdylib"] [dependencies] deltachat = { path = "../", default-features = false } diff --git a/rust-toolchain b/rust-toolchain index 22e904890..bb68f0857 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2019-11-06 +nightly-2020-01-26 diff --git a/src/context.rs b/src/context.rs index 4db0a65dc..212a35325 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,7 +3,9 @@ use std::collections::HashMap; use std::ffi::OsString; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::{Arc, Condvar, Mutex, RwLock, atomic::{Ordering, AtomicBool}}; + +use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; use crate::chat::*; use crate::config::Config; @@ -47,8 +49,6 @@ pub struct Context { pub smtp: Arc>, pub smtp_state: Arc<(Mutex, Condvar)>, pub oauth2_critical: Arc>, - #[debug_stub = "Callback"] - cb: Box, pub os_name: Option, pub cmdline_sel_chat_id: Arc>, pub bob: Arc>, @@ -57,6 +57,16 @@ pub struct Context { /// Mutex to avoid generating the key for the user more than once. pub generating_key_mutex: Mutex<()>, pub translated_stockstrings: RwLock>, + + #[debug_stub = "Callback"] + cb: Box, + + event_sender: Sender, + event_receiver: Receiver, + shutdown_sender: Sender<()>, + shutdown_receiver: Receiver<()>, + + is_running: AtomicBool, } #[derive(Debug, PartialEq, Eq)] @@ -80,6 +90,16 @@ pub fn get_info() -> HashMap<&'static str, String> { res } +macro_rules! while_running { + ($self:expr, $code:block) => { + if $self.is_running.load(Ordering::Relaxed) { + $code + } else { + break; + } + }; +} + impl Context { /// Creates new context. pub fn new(cb: Box, os_name: String, dbfile: PathBuf) -> Result { @@ -106,6 +126,10 @@ impl Context { "Blobdir does not exist: {}", blobdir.display() ); + + let (event_sender, event_receiver) = unbounded(); + let (shutdown_sender, shutdown_receiver) = bounded(0); + let ctx = Context { blobdir, dbfile, @@ -138,6 +162,11 @@ impl Context { perform_inbox_jobs_needed: Arc::new(RwLock::new(false)), generating_key_mutex: Mutex::new(()), translated_stockstrings: RwLock::new(HashMap::new()), + event_sender, + event_receiver, + shutdown_sender, + shutdown_receiver, + is_running: Default::default(), }; ensure!( @@ -159,7 +188,73 @@ impl Context { } pub fn call_cb(&self, event: Event) { - (*self.cb)(self, event); + self.event_sender.send(event).unwrap(); + } + + /// Start the run loop. + pub fn run(&self) { + use crossbeam::channel::select; + + self.is_running.store(true, Ordering::Relaxed); + + crossbeam::scope(|s| { + let imap_handle = s.spawn(|_| loop { + while_running!(self, { + perform_inbox_jobs(self); + while_running!(self, { + perform_inbox_fetch(self); + while_running!(self, { perform_inbox_idle(self) }); + }); + }); + }); + let mvbox_handle = s.spawn(|_| loop { + while_running!(self, { + perform_mvbox_fetch(self); + while_running!(self, { + perform_mvbox_idle(self); + }); + }); + }); + let sentbox_handle = s.spawn(|_| loop { + while_running!(self, { + perform_sentbox_fetch(self); + while_running!(self, { + perform_sentbox_idle(self); + }); + }); + }); + let smtp_handle = s.spawn(|_| loop { + while_running!(self, { + perform_smtp_jobs(self); + while_running!(self, { + perform_smtp_idle(self); + }); + }); + }); + + loop { + select! { + recv(self.event_receiver) -> event => { + // This gurantees that the callback is always called from the thread + // that called `run`. + (*self.cb)(self, event.unwrap()) + }, + recv(self.shutdown_receiver) -> _ => break, + } + } + + imap_handle.join().unwrap(); + mvbox_handle.join().unwrap(); + sentbox_handle.join().unwrap(); + smtp_handle.join().unwrap(); + }) + .unwrap() + } + + /// Stop the run loop. Blocks until all threads have shutdown. + pub fn shutdown(&self) { + self.is_running.store(false, Ordering::Relaxed); + self.shutdown_sender.send(()).unwrap(); } /******************************************************************************* diff --git a/src/lib.rs b/src/lib.rs index 06f676298..1ca1c8deb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#![forbid(unsafe_code)] #![deny(clippy::correctness, missing_debug_implementations, clippy::all)] #![allow(clippy::match_bool)]