diff --git a/benches/contacts.rs b/benches/contacts.rs index 5e6010233..0c4b0a76e 100644 --- a/benches/contacts.rs +++ b/benches/contacts.rs @@ -2,13 +2,16 @@ use async_std::task::block_on; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use deltachat::contact::Contact; use deltachat::context::Context; +use deltachat::Events; use tempfile::tempdir; async fn address_book_benchmark(n: u32, read_count: u32) { let dir = tempdir().unwrap(); let dbfile = dir.path().join("db.sqlite"); let id = 100; - let context = Context::new(dbfile.into(), id).await.unwrap(); + let context = Context::new(dbfile.into(), id, Events::new()) + .await + .unwrap(); let book = (0..n) .map(|i| format!("Name {}\naddr{}@example.org\n", i, i)) diff --git a/benches/get_chat_msgs.rs b/benches/get_chat_msgs.rs index 3382cc1ce..252f8eab5 100644 --- a/benches/get_chat_msgs.rs +++ b/benches/get_chat_msgs.rs @@ -6,10 +6,13 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use deltachat::chat::{self, ChatId}; use deltachat::chatlist::Chatlist; use deltachat::context::Context; +use deltachat::Events; async fn get_chat_msgs_benchmark(dbfile: &Path, chats: &[ChatId]) { let id = 100; - let context = Context::new(dbfile.into(), id).await.unwrap(); + let context = Context::new(dbfile.into(), id, Events::new()) + .await + .unwrap(); for c in chats.iter().take(10) { black_box(chat::get_chat_msgs(&context, *c, 0).await.ok()); @@ -21,7 +24,9 @@ fn criterion_benchmark(c: &mut Criterion) { // messages, such as your primary account. if let Ok(path) = std::env::var("DELTACHAT_BENCHMARK_DATABASE") { let chats: Vec<_> = async_std::task::block_on(async { - let context = Context::new((&path).into(), 100).await.unwrap(); + let context = Context::new((&path).into(), 100, Events::new()) + .await + .unwrap(); let chatlist = Chatlist::try_load(&context, 0, None, None).await.unwrap(); let len = chatlist.len(); (0..len).map(|i| chatlist.get_chat_id(i).unwrap()).collect() diff --git a/benches/get_chatlist.rs b/benches/get_chatlist.rs index 507bca8e7..57e7579c5 100644 --- a/benches/get_chatlist.rs +++ b/benches/get_chatlist.rs @@ -3,6 +3,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use deltachat::chatlist::Chatlist; use deltachat::context::Context; +use deltachat::Events; async fn get_chat_list_benchmark(context: &Context) { Chatlist::try_load(context, 0, None, None).await.unwrap(); @@ -12,8 +13,9 @@ fn criterion_benchmark(c: &mut Criterion) { // To enable this benchmark, set `DELTACHAT_BENCHMARK_DATABASE` to some large database with many // messages, such as your primary account. if let Ok(path) = std::env::var("DELTACHAT_BENCHMARK_DATABASE") { - let context = - async_std::task::block_on(async { Context::new(path.into(), 100).await.unwrap() }); + let context = async_std::task::block_on(async { + Context::new(path.into(), 100, Events::new()).await.unwrap() + }); c.bench_function("chatlist:try_load (Get Chatlist)", |b| { b.to_async(AsyncStdExecutor) .iter(|| get_chat_list_benchmark(black_box(&context))) diff --git a/benches/receive_emails.rs b/benches/receive_emails.rs index 1d4af9fbb..11bf8cf83 100644 --- a/benches/receive_emails.rs +++ b/benches/receive_emails.rs @@ -8,6 +8,7 @@ use deltachat::{ context::Context, dc_receive_imf::dc_receive_imf, imex::{imex, ImexMode}, + Events, }; use tempfile::tempdir; @@ -42,7 +43,9 @@ async fn create_context() -> Context { let dir = tempdir().unwrap(); let dbfile = dir.path().join("db.sqlite"); let id = 100; - let context = Context::new(dbfile.into(), id).await.unwrap(); + let context = Context::new(dbfile.into(), id, Events::new()) + .await + .unwrap(); let backup: PathBuf = std::env::current_dir() .unwrap() diff --git a/benches/search_msgs.rs b/benches/search_msgs.rs index 0c75693eb..45215e11b 100644 --- a/benches/search_msgs.rs +++ b/benches/search_msgs.rs @@ -1,12 +1,15 @@ use async_std::task::block_on; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use deltachat::context::Context; +use deltachat::Events; use std::path::Path; async fn search_benchmark(path: impl AsRef) { let dbfile = path.as_ref(); let id = 100; - let context = Context::new(dbfile.into(), id).await.unwrap(); + let context = Context::new(dbfile.into(), id, Events::new()) + .await + .unwrap(); for _ in 0..10u32 { context.search_msgs(None, "hello").await.unwrap(); diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index c1f4287d3..5ca1877d5 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -77,7 +77,11 @@ pub unsafe extern "C" fn dc_context_new( let ctx = if blobdir.is_null() || *blobdir == 0 { // generate random ID as this functionality is not yet available on the C-api. let id = rand::thread_rng().gen(); - block_on(Context::new(as_path(dbfile).to_path_buf().into(), id)) + block_on(Context::new( + as_path(dbfile).to_path_buf().into(), + id, + Events::new(), + )) } else { eprintln!("blobdir can not be defined explicitly anymore"); return ptr::null_mut(); @@ -104,6 +108,7 @@ pub unsafe extern "C" fn dc_context_new_closed(dbfile: *const libc::c_char) -> * match block_on(Context::new_closed( as_path(dbfile).to_path_buf().into(), id, + Events::new(), )) { Ok(context) => Box::into_raw(Box::new(context)), Err(err) => { @@ -4376,7 +4381,7 @@ pub unsafe extern "C" fn dc_accounts_maybe_network_lost(accounts: *mut dc_accoun block_on(async move { accounts.write().await.maybe_network_lost().await }); } -pub type dc_accounts_event_emitter_t = deltachat::accounts::EventEmitter; +pub type dc_accounts_event_emitter_t = EventEmitter; #[no_mangle] pub unsafe extern "C" fn dc_accounts_get_event_emitter( diff --git a/examples/repl/main.rs b/examples/repl/main.rs index 6a82e5da1..3df75f4ed 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -19,7 +19,7 @@ use deltachat::config; use deltachat::context::*; use deltachat::oauth2::*; use deltachat::securejoin::*; -use deltachat::EventType; +use deltachat::{EventType, Events}; use log::{error, info, warn}; use rustyline::completion::{Completer, FilenameCompleter, Pair}; use rustyline::config::OutputStreamType; @@ -298,7 +298,7 @@ async fn start(args: Vec) -> Result<(), Error> { println!("Error: Bad arguments, expected [db-name]."); bail!("No db-name specified"); } - let context = Context::new(Path::new(&args[1]).to_path_buf(), 0).await?; + let context = Context::new(Path::new(&args[1]).to_path_buf(), 0, Events::new()).await?; let events = context.get_event_emitter(); async_std::task::spawn(async move { diff --git a/examples/simple.rs b/examples/simple.rs index fc875832c..b552585b9 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -6,7 +6,7 @@ use deltachat::config; use deltachat::contact::*; use deltachat::context::*; use deltachat::message::Message; -use deltachat::EventType; +use deltachat::{EventType, Events}; fn cb(event: EventType) { match event { @@ -36,7 +36,7 @@ async fn main() { let dir = tempdir().unwrap(); let dbfile = dir.path().join("db.sqlite"); log::info!("creating database {:?}", dbfile); - let ctx = Context::new(dbfile.into(), 0) + let ctx = Context::new(dbfile.into(), 0, Events::new()) .await .expect("Failed to create context"); let info = ctx.get_info().await; diff --git a/src/accounts.rs b/src/accounts.rs index 6d142d1a7..c41891296 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -2,18 +2,15 @@ use std::collections::BTreeMap; -use async_std::channel::{self, Receiver, Sender}; use async_std::fs; use async_std::path::PathBuf; -use async_std::prelude::*; -use async_std::sync::{Arc, RwLock}; use uuid::Uuid; use anyhow::{ensure, Context as _, Result}; use serde::{Deserialize, Serialize}; use crate::context::Context; -use crate::events::{Event, EventType, Events}; +use crate::events::{Event, EventEmitter, EventType, Events}; /// Account manager, that can handle multiple accounts in a single place. #[derive(Debug)] @@ -21,7 +18,6 @@ pub struct Accounts { dir: PathBuf, config: Config, accounts: BTreeMap, - emitter: EventEmitter, /// Event channel to emit account manager errors. events: Events, @@ -63,28 +59,16 @@ impl Accounts { let config = Config::from_file(config_file) .await .context("failed to load accounts config")?; + let events = Events::new(); let accounts = config - .load_accounts() + .load_accounts(&events) .await .context("failed to load accounts")?; - let emitter = EventEmitter::new(); - - let events = Events::default(); - - emitter.sender.send(events.get_emitter()).await?; - - for account in accounts.values() { - emitter.add_account(account).await.with_context(|| { - format!("failed to add account {} to event emitter", account.id) - })?; - } - Ok(Self { dir, config, accounts, - emitter, events, }) } @@ -121,8 +105,12 @@ impl Accounts { pub async fn add_account(&mut self) -> Result { let account_config = self.config.new_account(&self.dir).await?; - let ctx = Context::new(account_config.dbfile().into(), account_config.id).await?; - self.emitter.add_account(&ctx).await?; + let ctx = Context::new( + account_config.dbfile().into(), + account_config.id, + self.events.clone(), + ) + .await?; self.accounts.insert(account_config.id, ctx); Ok(account_config.id) @@ -132,8 +120,12 @@ impl Accounts { pub async fn add_closed_account(&mut self) -> Result { let account_config = self.config.new_account(&self.dir).await?; - let ctx = Context::new_closed(account_config.dbfile().into(), account_config.id).await?; - self.emitter.add_account(&ctx).await?; + let ctx = Context::new_closed( + account_config.dbfile().into(), + account_config.id, + self.events.clone(), + ) + .await?; self.accounts.insert(account_config.id, ctx); Ok(account_config.id) @@ -225,8 +217,7 @@ impl Accounts { match res { Ok(_) => { - let ctx = Context::new(new_dbfile, account_config.id).await?; - self.emitter.add_account(&ctx).await?; + let ctx = Context::new(new_dbfile, account_config.id, self.events.clone()).await?; self.accounts.insert(account_config.id, ctx); Ok(account_config.id) } @@ -302,74 +293,9 @@ impl Accounts { self.events.emit(Event { id: 0, typ: event }) } - /// Returns unified event emitter. + /// Returns event emitter. pub async fn get_event_emitter(&self) -> EventEmitter { - self.emitter.clone() - } -} - -/// Unified event emitter for multiple accounts. -#[derive(Debug, Clone)] -pub struct EventEmitter { - /// Aggregate stream of events from all accounts. - stream: Arc>>, - - /// Sender for the channel where new account emitters will be pushed. - sender: Sender, - - /// Receiver for the channel where new account emitters will be pushed. - receiver: Receiver, -} - -impl EventEmitter { - pub fn new() -> Self { - let (sender, receiver) = channel::unbounded(); - Self { - stream: Arc::new(RwLock::new(futures::stream::SelectAll::new())), - sender, - receiver, - } - } - - /// Blocking recv of an event. Return `None` if all `Sender`s have been droped. - pub fn recv_sync(&mut self) -> Option { - async_std::task::block_on(self.recv()).unwrap_or_default() - } - - /// Async recv of an event. Return `None` if all `Sender`s have been dropped. - pub async fn recv(&mut self) -> Result> { - let mut stream = self.stream.write().await; - loop { - match futures::future::select(self.receiver.recv(), stream.next()).await { - futures::future::Either::Left((emitter, _)) => { - stream.push(emitter?); - } - futures::future::Either::Right((ev, _)) => return Ok(ev), - } - } - } - - /// Add event emitter of a new account to the aggregate event emitter. - pub async fn add_account(&self, context: &Context) -> Result<()> { - self.sender.send(context.get_event_emitter()).await?; - Ok(()) - } -} - -impl Default for EventEmitter { - fn default() -> Self { - Self::new() - } -} - -impl async_std::stream::Stream for EventEmitter { - type Item = Event; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::pin::Pin::new(&mut self).poll_next(cx) + self.events.get_emitter() } } @@ -426,17 +352,21 @@ impl Config { Ok(Config { file, inner }) } - pub async fn load_accounts(&self) -> Result> { + pub async fn load_accounts(&self, events: &Events) -> Result> { let mut accounts = BTreeMap::new(); for account_config in &self.inner.accounts { - let ctx = Context::new(account_config.dbfile().into(), account_config.id) - .await - .with_context(|| { - format!( - "failed to create context from file {:?}", - account_config.dbfile() - ) - })?; + let ctx = Context::new( + account_config.dbfile().into(), + account_config.id, + events.clone(), + ) + .await + .with_context(|| { + format!( + "failed to create context from file {:?}", + account_config.dbfile() + ) + })?; accounts.insert(account_config.id, ctx); } @@ -610,7 +540,9 @@ mod tests { assert_eq!(accounts.config.get_selected_account().await, 0); let extern_dbfile: PathBuf = dir.path().join("other").into(); - let ctx = Context::new(extern_dbfile.clone(), 0).await.unwrap(); + let ctx = Context::new(extern_dbfile.clone(), 0, Events::new()) + .await + .unwrap(); ctx.set_config(crate::config::Config::Addr, Some("me@mail.com")) .await .unwrap(); @@ -746,7 +678,7 @@ mod tests { assert_eq!(accounts.accounts.len(), 0); // Create event emitter. - let mut event_emitter = accounts.get_event_emitter().await; + let event_emitter = accounts.get_event_emitter().await; // Test that event emitter does not return `None` immediately. let duration = std::time::Duration::from_millis(1); @@ -756,7 +688,7 @@ mod tests { // When account manager is dropped, event emitter is exhausted. drop(accounts); - assert_eq!(event_emitter.recv().await?, None); + assert_eq!(event_emitter.recv().await, None); Ok(()) } diff --git a/src/context.rs b/src/context.rs index 14947a3b8..787a522b0 100644 --- a/src/context.rs +++ b/src/context.rs @@ -115,8 +115,8 @@ pub fn get_info() -> BTreeMap<&'static str, String> { impl Context { /// Creates new context and opens the database. - pub async fn new(dbfile: PathBuf, id: u32) -> Result { - let context = Self::new_closed(dbfile, id).await?; + pub async fn new(dbfile: PathBuf, id: u32, events: Events) -> Result { + let context = Self::new_closed(dbfile, id, events).await?; // Open the database if is not encrypted. if context.check_passphrase("".to_string()).await? { @@ -126,7 +126,7 @@ impl Context { } /// Creates new context without opening the database. - pub async fn new_closed(dbfile: PathBuf, id: u32) -> Result { + pub async fn new_closed(dbfile: PathBuf, id: u32, events: Events) -> Result { let mut blob_fname = OsString::new(); blob_fname.push(dbfile.file_name().unwrap_or_default()); blob_fname.push("-blobs"); @@ -134,7 +134,7 @@ impl Context { if !blobdir.exists().await { async_std::fs::create_dir_all(&blobdir).await?; } - let context = Context::with_blobdir(dbfile, blobdir, id).await?; + let context = Context::with_blobdir(dbfile, blobdir, id, events).await?; Ok(context) } @@ -169,6 +169,7 @@ impl Context { dbfile: PathBuf, blobdir: PathBuf, id: u32, + events: Events, ) -> Result { ensure!( blobdir.is_dir().await, @@ -186,7 +187,7 @@ impl Context { oauth2_mutex: Mutex::new(()), wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: RwLock::new(HashMap::new()), - events: Events::default(), + events, scheduler: RwLock::new(None), ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 3.0)), // Allow to send 3 messages immediately, no more than once every 20 seconds. quota: RwLock::new(None), @@ -683,7 +684,7 @@ mod tests { let tmp = tempfile::tempdir()?; let dbfile = tmp.path().join("db.sqlite"); std::fs::write(&dbfile, b"123")?; - let res = Context::new(dbfile.into(), 1).await?; + let res = Context::new(dbfile.into(), 1, Events::new()).await?; // Broken database is indistinguishable from encrypted one. assert_eq!(res.is_open().await, false); @@ -829,7 +830,7 @@ mod tests { async fn test_blobdir_exists() { let tmp = tempfile::tempdir().unwrap(); let dbfile = tmp.path().join("db.sqlite"); - Context::new(dbfile.into(), 1).await.unwrap(); + Context::new(dbfile.into(), 1, Events::new()).await.unwrap(); let blobdir = tmp.path().join("db.sqlite-blobs"); assert!(blobdir.is_dir()); } @@ -840,7 +841,7 @@ mod tests { let dbfile = tmp.path().join("db.sqlite"); let blobdir = tmp.path().join("db.sqlite-blobs"); std::fs::write(&blobdir, b"123").unwrap(); - let res = Context::new(dbfile.into(), 1).await; + let res = Context::new(dbfile.into(), 1, Events::new()).await; assert!(res.is_err()); } @@ -850,7 +851,7 @@ mod tests { let subdir = tmp.path().join("subdir"); let dbfile = subdir.join("db.sqlite"); let dbfile2 = dbfile.clone(); - Context::new(dbfile.into(), 1).await.unwrap(); + Context::new(dbfile.into(), 1, Events::new()).await.unwrap(); assert!(subdir.is_dir()); assert!(dbfile2.is_file()); } @@ -860,7 +861,7 @@ mod tests { let tmp = tempfile::tempdir().unwrap(); let dbfile = tmp.path().join("db.sqlite"); let blobdir = PathBuf::new(); - let res = Context::with_blobdir(dbfile.into(), blobdir, 1).await; + let res = Context::with_blobdir(dbfile.into(), blobdir, 1, Events::new()).await; assert!(res.is_err()); } @@ -869,7 +870,7 @@ mod tests { let tmp = tempfile::tempdir().unwrap(); let dbfile = tmp.path().join("db.sqlite"); let blobdir = tmp.path().join("blobs"); - let res = Context::with_blobdir(dbfile.into(), blobdir.into(), 1).await; + let res = Context::with_blobdir(dbfile.into(), blobdir.into(), 1, Events::new()).await; assert!(res.is_err()); } @@ -1038,7 +1039,7 @@ mod tests { let dbfile = dir.path().join("db.sqlite"); let id = 1; - let context = Context::new_closed(dbfile.clone().into(), id) + let context = Context::new_closed(dbfile.clone().into(), id, Events::new()) .await .context("failed to create context")?; assert_eq!(context.open("foo".to_string()).await?, true); @@ -1046,7 +1047,7 @@ mod tests { drop(context); let id = 2; - let context = Context::new(dbfile.into(), id) + let context = Context::new(dbfile.into(), id, Events::new()) .await .context("failed to create context")?; assert_eq!(context.is_open().await, false); diff --git a/src/events.rs b/src/events.rs index 0f158f3bf..fd82f37a8 100644 --- a/src/events.rs +++ b/src/events.rs @@ -9,7 +9,8 @@ use crate::ephemeral::Timer as EphemeralTimer; use crate::message::MsgId; use crate::webxdc::StatusUpdateSerial; -#[derive(Debug)] +/// Event channel. +#[derive(Debug, Clone)] pub struct Events { receiver: Receiver, sender: Sender, @@ -17,13 +18,17 @@ pub struct Events { impl Default for Events { fn default() -> Self { - let (sender, receiver) = channel::bounded(1_000); - - Self { receiver, sender } + Self::new() } } impl Events { + pub fn new() -> Self { + let (sender, receiver) = channel::bounded(1_000); + + Self { receiver, sender } + } + pub fn emit(&self, event: Event) { match self.sender.try_send(event) { Ok(()) => {} diff --git a/src/test_utils.rs b/src/test_utils.rs index 2e270aa4a..dd70a97fd 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -27,7 +27,7 @@ use crate::contact::{Contact, ContactId, Modifier, Origin}; use crate::context::Context; use crate::dc_receive_imf::dc_receive_imf; use crate::dc_tools::EmailAddress; -use crate::events::{Event, EventType}; +use crate::events::{Event, EventType, Events}; use crate::key::{self, DcKey, KeyPair, KeyPairUse}; use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype}; use crate::mimeparser::MimeMessage; @@ -266,7 +266,7 @@ impl TestContext { let mut context_names = CONTEXT_NAMES.write().unwrap(); context_names.insert(id, name); } - let ctx = Context::new(dbfile.into(), id) + let ctx = Context::new(dbfile.into(), id, Events::new()) .await .expect("failed to create context");