From 014d2946b2ca9c4627551aff12eb745ac7a5f6ed Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Fri, 22 May 2020 21:03:01 +0200 Subject: [PATCH] feat: use EventEmitter for events --- Cargo.toml | 8 +++-- deltachat-ffi/deltachat.h | 6 +++- deltachat-ffi/src/lib.rs | 39 +++++++++++++++++++----- examples/repl/cmdline.rs | 6 ++-- examples/repl/main.rs | 4 +-- examples/simple.rs | 4 +-- python/src/deltachat/account.py | 17 +++++------ python/src/deltachat/events.py | 33 ++++++++------------ src/blob.rs | 4 +-- src/chat.rs | 40 ++++++++++++------------ src/config.rs | 2 +- src/configure/mod.rs | 2 +- src/contact.rs | 12 ++++---- src/context.rs | 20 ++++++------ src/dc_receive_imf.rs | 12 ++++---- src/dc_tools.rs | 2 +- src/events.rs | 54 +++++++++++++++++++++++++++++++++ src/imex.rs | 14 ++++----- src/job.rs | 2 +- src/location.rs | 8 ++--- src/log.rs | 2 +- src/message.rs | 6 ++-- src/mimeparser.rs | 2 +- src/securejoin.rs | 4 +-- src/smtp/mod.rs | 4 +-- src/smtp/send.rs | 2 +- 26 files changed, 192 insertions(+), 117 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e209a069b..6591225e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,14 +57,15 @@ image = { version = "0.22.4", default-features=false, features = ["gif_codec", " futures = "0.3.4" thiserror = "1.0.14" anyhow = "1.0.28" +async-trait = "0.1.31" +url = "2.1.1" +crossbeam-channel = "0.4.2" pretty_env_logger = { version = "0.3.1", optional = true } log = {version = "0.4.8", optional = true } rustyline = { version = "4.1.0", optional = true } ansi_term = { version = "0.12.1", optional = true } -async-trait = "0.1.31" -crossbeam-channel = "0.4.2" -url = "2.1.1" + [dev-dependencies] tempfile = "3.0" @@ -95,3 +96,4 @@ internals = [] repl = ["internals", "rustyline", "log", "pretty_env_logger", "ansi_term"] vendored = ["async-native-tls/vendored", "async-smtp/native-tls-vendored"] nightly = ["pgp/nightly"] + diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 0556a6184..de84d9699 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -20,6 +20,7 @@ typedef struct _dc_contact dc_contact_t; typedef struct _dc_lot dc_lot_t; typedef struct _dc_provider dc_provider_t; typedef struct _dc_event dc_event_t; + typedef struct _dc_event_emitter dc_event_emitter_t; /** @@ -193,7 +194,10 @@ typedef struct _dc_event dc_event_t; /** * TODO: document */ -dc_event_t* dc_get_next_event(dc_context_t* context); +dc_event_t* dc_get_next_event(dc_event_emitter_t* emitter); + +dc_event_emitter_t* dc_get_event_emitter(dc_context_t* context); +void dc_event_emitter_unref(dc_event_emitter_t* emitter); int dc_event_get_id (dc_event_t* event); int dc_event_get_data1_int(dc_event_t* event); diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 739cdc0ec..17d1aed7f 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -91,7 +91,7 @@ impl ContextWrapper { /// logfile rather than being shown directly to the user. unsafe fn warning(&self, msg: &str) { self.with_inner(|ctx| { - ctx.call_cb(Event::Warning(msg.to_string())); + ctx.emit_event(Event::Warning(msg.to_string())); }) .unwrap(); } @@ -650,20 +650,45 @@ pub unsafe extern "C" fn dc_event_get_data3_str(event: *mut dc_event_t) -> *mut } #[no_mangle] -pub unsafe extern "C" fn dc_get_next_event(context: *mut dc_context_t) -> *mut dc_event_t { +pub type dc_event_emitter_t = EventEmitter; + +#[no_mangle] +pub unsafe extern "C" fn dc_get_event_emitter( + context: *mut dc_context_t, +) -> *mut dc_event_emitter_t { if context.is_null() { + eprintln!("ignoring careless call to dc_get_event_emitter()"); return ptr::null_mut(); } let ffi_context = &*context; - ffi_context - .with_inner(|ctx| match ctx.get_next_event() { - Ok(ev) => Box::into_raw(Box::new(ev)), - Err(_) => ptr::null_mut(), - }) + .with_inner(|ctx| Box::into_raw(Box::new(ctx.get_event_emitter()))) .unwrap_or_else(|_| ptr::null_mut()) } +#[no_mangle] +pub unsafe extern "C" fn dc_event_emitter_unref(emitter: *mut dc_event_emitter_t) { + if emitter.is_null() { + eprintln!("ignoring careless call to dc_event_mitter_unref()"); + return; + } + + Box::from_raw(emitter); +} + +#[no_mangle] +pub unsafe extern "C" fn dc_get_next_event(events: *mut dc_event_emitter_t) -> *mut dc_event_t { + if events.is_null() { + return ptr::null_mut(); + } + let events = &*events; + + events + .recv_sync() + .map(|ev| Box::into_raw(Box::new(ev))) + .unwrap_or_else(|| ptr::null_mut()) +} + #[no_mangle] pub unsafe extern "C" fn dc_stop_io(context: *mut dc_context_t) { if context.is_null() { diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index dc4659311..619988837 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -86,7 +86,7 @@ async fn reset_tables(context: &Context, bits: i32) { println!("(8) Rest but server config reset."); } - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); @@ -157,7 +157,7 @@ async fn poke_spec(context: &Context, spec: Option<&str>) -> bool { } println!("Import: {} items read from \"{}\".", read_cnt, &real_spec); if read_cnt > 0 { - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); @@ -1030,7 +1030,7 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu // ensure!(!arg1.is_empty(), "Argument missing."); // let event = arg1.parse()?; // let event = Event::from_u32(event).ok_or(format_err!("Event::from_u32({})", event))?; - // let r = context.call_cb(event, 0 as libc::uintptr_t, 0 as libc::uintptr_t); + // let r = context.emit_event(event, 0 as libc::uintptr_t, 0 as libc::uintptr_t); // println!( // "Sending event {:?}({}), received value {}.", // event, event as usize, r as libc::c_int, diff --git a/examples/repl/main.rs b/examples/repl/main.rs index 04d0fe105..86a2088f6 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -275,10 +275,10 @@ async fn start(args: Vec) -> Result<(), Error> { } let context = Context::new("CLI".into(), Path::new(&args[1]).to_path_buf()).await?; - let ctx = context.clone(); + let events = context.get_event_emitter(); async_std::task::spawn(async move { loop { - if let Ok(event) = ctx.get_next_event() { + if let Some(event) = events.recv().await { receive_event(event); } } diff --git a/examples/simple.rs b/examples/simple.rs index 2542c89cd..694d901f0 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -38,10 +38,10 @@ async fn main() { let duration = time::Duration::from_millis(4000); println!("info: {:#?}", info); - let ctx1 = ctx.clone(); + let events = ctx.get_event_emitter(); async_std::task::spawn(async move { loop { - if let Ok(event) = ctx1.get_next_event() { + if let Some(event) = events.recv().await { cb(event); } } diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index fba5b6186..3925d3292 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -613,24 +613,23 @@ class Account(object): else: self.log("stop_scheduler called on non-running context") - def shutdown(self, wait=True): + def shutdown(self): """ shutdown and destroy account (stop callback thread, close and remove - underlying dc_context.""" + underlying dc_context).""" dc_context = self._dc_context if dc_context is None: return - if self._event_thread.is_alive(): - self.log("stop threads") - self._event_thread.stop(wait=False) - self.stop_scheduler() self.log("dc_close") + # the dc_close triggers get_next_event to return ffi.NULL + # which in turns makes the event thread finish execution lib.dc_close(dc_context) - self.log("wait threads for real") - if wait: - self._event_thread.stop(wait=wait) + + self.log("wait for event thread to finish") + self._event_thread.wait() + self._dc_context = None atexit.unregister(self.shutdown) self._shutdown_event.set() diff --git a/python/src/deltachat/events.py b/python/src/deltachat/events.py index 173c6f983..10a59fc2c 100644 --- a/python/src/deltachat/events.py +++ b/python/src/deltachat/events.py @@ -134,7 +134,6 @@ class EventThread(threading.Thread): def __init__(self, account): self.account = account self._dc_context = account._dc_context - self._thread_quitflag = False super(EventThread, self).__init__(name="events") self.start() @@ -144,15 +143,12 @@ class EventThread(threading.Thread): yield self.account.log(message + " FINISHED") - def stop(self, wait=False): - self._thread_quitflag = True - - if wait: - if self == threading.current_thread(): - # we are in the callback thread and thus cannot - # wait for the thread-loop to finish. - return - self.join() + def wait(self): + if self == threading.current_thread(): + # we are in the callback thread and thus cannot + # wait for the thread-loop to finish. + return + self.join() def run(self): """ get and run events until shutdown. """ @@ -160,8 +156,12 @@ class EventThread(threading.Thread): self._inner_run() def _inner_run(self): - while lib.dc_is_open(self._dc_context) and not self._thread_quitflag: - event = lib.dc_get_next_event(self._dc_context) + event_emitter = ffi.gc( + lib.dc_get_event_emitter(self._dc_context), + lib.dc_event_emitter_unref, + ) + while 1: + event = lib.dc_get_next_event(event_emitter) if event == ffi.NULL: break evt = lib.dc_event_get_id(event) @@ -180,14 +180,7 @@ class EventThread(threading.Thread): for name, kwargs in self._map_ffi_event(ffi_event): # self.account.log("calling hook name={} kwargs={}".format(name, kwargs)) hook = getattr(self.account._pm.hook, name) - try: - hook(**kwargs) - except Exception: - # don't bother logging this error - # if dc_close() was concurrently called - # (note: core API starts failing after that) - if not self._thread_quitflag: - raise + hook(**kwargs) def _map_ffi_event(self, ffi_event): name = ffi_event.name diff --git a/src/blob.rs b/src/blob.rs index 919c42f9b..bbb5635c9 100644 --- a/src/blob.rs +++ b/src/blob.rs @@ -63,7 +63,7 @@ impl<'a> BlobObject<'a> { blobdir, name: format!("$BLOBDIR/{}", name), }; - context.call_cb(Event::NewBlobFile(blob.as_name().to_string())); + context.emit_event(Event::NewBlobFile(blob.as_name().to_string())); Ok(blob) } @@ -151,7 +151,7 @@ impl<'a> BlobObject<'a> { blobdir: context.get_blobdir(), name: format!("$BLOBDIR/{}", name), }; - context.call_cb(Event::NewBlobFile(blob.as_name().to_string())); + context.emit_event(Event::NewBlobFile(blob.as_name().to_string())); Ok(blob) } diff --git a/src/chat.rs b/src/chat.rs index 4caa095ab..00921bc6c 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -174,7 +174,7 @@ impl ChatId { ) .await?; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { msg_id: MsgId::new(0), chat_id: ChatId::new(0), }); @@ -231,7 +231,7 @@ impl ChatId { .execute("DELETE FROM chats WHERE id=?;", paramsv![self]) .await?; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { msg_id: MsgId::new(0), chat_id: ChatId::new(0), }); @@ -256,7 +256,7 @@ impl ChatId { }; if changed { - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: self, msg_id: MsgId::new(0), }); @@ -1098,7 +1098,7 @@ pub async fn create_by_msg_id(context: &Context, msg_id: MsgId) -> Result Result< } }; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); @@ -1307,7 +1307,7 @@ pub async fn prepare_msg( msg.state = MessageState::OutPreparing; let msg_id = prepare_msg_common(context, chat_id, msg).await?; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: msg.chat_id, msg_id: msg.id, }); @@ -1476,13 +1476,13 @@ async fn send_msg_inner( } job::send_msg(context, msg.id).await?; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: msg.chat_id, msg_id: msg.id, }); if msg.param.exists(Param::SetLatitude) { - context.call_cb(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); + context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); } Ok(msg.id) @@ -1514,7 +1514,7 @@ pub async fn get_chat_msgs( Err(err) => warn!(context, "Failed to delete expired messages: {}", err), Ok(messages_deleted) => { if messages_deleted { - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { msg_id: MsgId::new(0), chat_id: ChatId::new(0), }) @@ -1635,7 +1635,7 @@ pub async fn marknoticed_chat(context: &Context, chat_id: ChatId) -> Result<(), ) .await?; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); @@ -1667,7 +1667,7 @@ pub async fn marknoticed_all_chats(context: &Context) -> Result<(), Error> { ) .await?; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { msg_id: MsgId::new(0), chat_id: ChatId::new(0), }); @@ -1884,7 +1884,7 @@ pub async fn create_group_chat( chat_id.set_draft_raw(context, &mut draft_msg).await; } - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { msg_id: MsgId::new(0), chat_id: ChatId::new(0), }); @@ -2043,7 +2043,7 @@ pub(crate) async fn add_contact_to_chat_ex( msg.param.set_int(Param::Arg2, from_handshake.into()); msg.id = send_msg(context, chat_id, &mut msg).await?; } - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); Ok(true) } @@ -2205,7 +2205,7 @@ pub async fn set_muted( .await .is_ok() { - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); } else { bail!("Failed to set mute duration, chat might not exist -"); } @@ -2285,7 +2285,7 @@ pub async fn remove_contact_from_chat( // removed it first, it would complicate the // check/encryption logic. success = remove_from_chat_contacts_table(context, chat_id, contact_id).await; - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); } } } @@ -2375,12 +2375,12 @@ pub async fn set_chat_name( msg.param.set(Param::Arg, &chat.name); } msg.id = send_msg(context, chat_id, &mut msg).await?; - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id, msg_id: msg.id, }); } - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); success = true; } } @@ -2540,7 +2540,7 @@ pub async fn forward_msgs( } } for (chat_id, msg_id) in created_chats.iter().zip(created_msgs.iter()) { - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: *chat_id, msg_id: *msg_id, }); @@ -2663,7 +2663,7 @@ pub async fn add_device_msg( } if !msg_id.is_unset() { - context.call_cb(Event::IncomingMsg { chat_id, msg_id }); + context.emit_event(Event::IncomingMsg { chat_id, msg_id }); } Ok(msg_id) @@ -2733,7 +2733,7 @@ pub(crate) async fn add_info_msg(context: &Context, chat_id: ChatId, text: impl .get_rowid(context, "msgs", "rfc724_mid", &rfc724_mid) .await .unwrap_or_default(); - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id, msg_id: MsgId::new(row_id), }); diff --git a/src/config.rs b/src/config.rs index 34a5f69b9..9cc1f4edd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -225,7 +225,7 @@ impl Context { Config::DeleteDeviceAfter => { let ret = self.sql.set_raw_config(self, key, value).await; // Force chatlist reload to delete old messages immediately. - self.call_cb(Event::MsgsChanged { + self.emit_event(Event::MsgsChanged { msg_id: MsgId::new(0), chat_id: ChatId::new(0), }); diff --git a/src/configure/mod.rs b/src/configure/mod.rs index bb981f6d4..94b58ab3c 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -28,7 +28,7 @@ macro_rules! progress { $progress <= 1000, "value in range 0..1000 expected with: 0=error, 1..999=progress, 1000=success" ); - $context.call_cb($crate::events::Event::ConfigureProgress($progress)); + $context.emit_event($crate::events::Event::ConfigureProgress($progress)); }; } diff --git a/src/contact.rs b/src/contact.rs index 9b9dd1c97..0fff01025 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -241,7 +241,7 @@ impl Contact { let (contact_id, sth_modified) = Contact::add_or_lookup(context, name, addr, Origin::ManuallyCreated).await?; let blocked = Contact::is_blocked_load(context, contact_id).await; - context.call_cb(Event::ContactsChanged( + context.emit_event(Event::ContactsChanged( if sth_modified == Modifier::Created { Some(contact_id) } else { @@ -269,7 +269,7 @@ impl Contact { .await .is_ok() { - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); @@ -528,7 +528,7 @@ impl Contact { } } if modify_cnt > 0 { - context.call_cb(Event::ContactsChanged(None)); + context.emit_event(Event::ContactsChanged(None)); } Ok(modify_cnt) @@ -777,7 +777,7 @@ impl Contact { .await { Ok(_) => { - context.call_cb(Event::ContactsChanged(None)); + context.emit_event(Event::ContactsChanged(None)); return Ok(()); } Err(err) => { @@ -1054,7 +1054,7 @@ async fn set_block_contact(context: &Context, contact_id: u32, new_blocking: boo paramsv![new_blocking, 100, contact_id as i32], ).await.is_ok() { Contact::mark_noticed(context, contact_id).await; - context.call_cb(Event::ContactsChanged(None)); + context.emit_event(Event::ContactsChanged(None)); } } } @@ -1080,7 +1080,7 @@ pub(crate) async fn set_profile_image( }; if changed { contact.update_param(context).await?; - context.call_cb(Event::ContactsChanged(Some(contact_id))); + context.emit_event(Event::ContactsChanged(Some(contact_id))); } Ok(()) } diff --git a/src/context.rs b/src/context.rs index c3b614e16..57067df8a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -4,10 +4,8 @@ use std::collections::{BTreeMap, HashMap}; use std::ffi::OsString; use std::ops::Deref; -use anyhow::anyhow; use async_std::path::{Path, PathBuf}; use async_std::sync::{channel, Arc, Mutex, Receiver, RwLock, Sender}; -use crossbeam_channel::{unbounded, Receiver as SyncReceiver, Sender as SyncSender}; use crate::chat::*; use crate::config::Config; @@ -15,7 +13,7 @@ use crate::constants::*; use crate::contact::*; use crate::dc_tools::duration_to_str; use crate::error::*; -use crate::events::Event; +use crate::events::{Event, EventEmitter, Events}; use crate::job::{self, Action}; use crate::key::{DcKey, Key, SignedPublicKey}; use crate::login_param::LoginParam; @@ -55,7 +53,7 @@ pub struct InnerContext { /// Mutex to enforce only a single running oauth2 is running. pub(crate) oauth2_mutex: Mutex<()>, pub(crate) translated_stockstrings: RwLock>, - pub(crate) logs: (SyncSender, SyncReceiver), + pub(crate) events: Events, pub(crate) scheduler: RwLock, @@ -121,7 +119,7 @@ impl Context { generating_key_mutex: Mutex::new(()), oauth2_mutex: Mutex::new(()), translated_stockstrings: RwLock::new(HashMap::new()), - logs: unbounded(), + events: Events::default(), scheduler: RwLock::new(Scheduler::Stopped), creation_time: std::time::SystemTime::now(), }; @@ -172,14 +170,14 @@ impl Context { self.blobdir.as_path() } - pub fn call_cb(&self, event: Event) { - self.logs.0.send(event).expect("failed to send event"); + /// Emits a single event. + pub fn emit_event(&self, event: Event) { + self.events.emit(event); } - pub fn get_next_event(&self) -> Result { - let event = self.logs.1.recv().map_err(|err| anyhow!("{}", err))?; - - Ok(event) + /// Get the next queued event. + pub fn get_event_emitter(&self) -> EventEmitter { + self.events.get_emitter() } // Ongoing process allocation/free/check diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index 3219e908b..bcb6e39a2 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -94,7 +94,7 @@ pub async fn dc_receive_imf( CreateEvent::MsgsChanged => Event::MsgsChanged { msg_id, chat_id }, CreateEvent::IncomingMsg => Event::IncomingMsg { msg_id, chat_id }, }; - context.call_cb(event); + context.emit_event(event); } } }; @@ -197,7 +197,7 @@ pub async fn dc_receive_imf( if let Some(avatar_action) = &mime_parser.user_avatar { match contact::set_profile_image(&context, from_id, avatar_action).await { Ok(()) => { - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); } Err(err) => { warn!(context, "reveive_imf cannot update profile image: {}", err); @@ -806,7 +806,7 @@ async fn save_locations( } } if send_event { - context.call_cb(Event::LocationChanged(Some(from_id))); + context.emit_event(Event::LocationChanged(Some(from_id))); } } @@ -1120,7 +1120,7 @@ async fn create_or_lookup_group( .await .is_ok() { - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); } } } @@ -1179,7 +1179,7 @@ async fn create_or_lookup_group( } if send_EVENT_CHAT_MODIFIED { - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); } Ok((chat_id, chat_id_blocked)) } @@ -1300,7 +1300,7 @@ async fn create_or_lookup_adhoc_group( chat::add_to_chat_contacts_table(context, new_chat_id, member_id).await; } - context.call_cb(Event::ChatModified(new_chat_id)); + context.emit_event(Event::ChatModified(new_chat_id)); Ok((new_chat_id, create_blocked)) } diff --git a/src/dc_tools.rs b/src/dc_tools.rs index 14286b9fd..63fe7626f 100644 --- a/src/dc_tools.rs +++ b/src/dc_tools.rs @@ -283,7 +283,7 @@ pub(crate) async fn dc_delete_file(context: &Context, path: impl AsRef) -> let dpath = format!("{}", path.as_ref().to_string_lossy()); match fs::remove_file(path_abs).await { Ok(_) => { - context.call_cb(Event::DeletedBlobFile(dpath)); + context.emit_event(Event::DeletedBlobFile(dpath)); true } Err(err) => { diff --git a/src/events.rs b/src/events.rs index bcab73e8b..c0e1e5901 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,12 +1,66 @@ //! # Events specification use async_std::path::PathBuf; +use crossbeam_channel::{bounded as channel, Receiver, Sender, TrySendError}; use strum::EnumProperty; use crate::chat::ChatId; use crate::message::MsgId; +#[derive(Debug)] +pub struct Events { + receiver: Receiver, + sender: Sender, +} + +impl Default for Events { + fn default() -> Self { + let (sender, receiver) = channel(1_000); + + Self { receiver, sender } + } +} + +impl Events { + pub fn emit(&self, event: Event) { + match self.sender.try_send(event) { + Ok(()) => {} + Err(TrySendError::Full(event)) => { + // when we are full, we pop remove the oldest event and push on the new one + let _ = self.receiver.try_recv(); + + // try again + self.emit(event); + } + Err(TrySendError::Disconnected(_)) => { + unreachable!("unable to emit event, channel disconnected"); + } + } + } + + /// Retrieve the event emitter. + pub fn get_emitter(&self) -> EventEmitter { + EventEmitter(self.receiver.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct EventEmitter(Receiver); + +impl EventEmitter { + /// Blocking recv of an event. Return `None` if the `Sender` has been droped. + pub fn recv_sync(&self) -> Option { + self.0.recv().ok() + } + + /// Blocking async recv of an event. Return `None` if the `Sender` has been droped. + pub async fn recv(&self) -> Option { + // TODO: change once we can use async channels internally. + self.0.recv().ok() + } +} + impl Event { /// Returns the corresponding Event id. pub fn as_id(&self) -> i32 { diff --git a/src/imex.rs b/src/imex.rs index 15eb7ab54..2c443e846 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -377,7 +377,7 @@ async fn imex_inner( ensure!(param.is_some(), "No Import/export dir/file given."); info!(context, "Import/export process started."); - context.call_cb(Event::ImexProgress(10)); + context.emit_event(Event::ImexProgress(10)); ensure!(context.sql.is_open().await, "Database not opened."); @@ -401,11 +401,11 @@ async fn imex_inner( match success { Ok(()) => { info!(context, "IMEX successfully completed"); - context.call_cb(Event::ImexProgress(1000)); + context.emit_event(Event::ImexProgress(1000)); Ok(()) } Err(err) => { - context.call_cb(Event::ImexProgress(0)); + context.emit_event(Event::ImexProgress(0)); bail!("IMEX FAILED to complete: {}", err); } } @@ -489,7 +489,7 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef) -> if permille > 990 { permille = 990 } - context.call_cb(Event::ImexProgress(permille)); + context.emit_event(Event::ImexProgress(permille)); if file_blob.is_empty() { continue; } @@ -565,7 +565,7 @@ async fn export_backup(context: &Context, dir: impl AsRef) -> Result<()> { dest_sql .set_raw_config_int(context, "backup_time", now as i32) .await?; - context.call_cb(Event::ImexFileWritten(dest_path_filename)); + context.emit_event(Event::ImexFileWritten(dest_path_filename)); Ok(()) } }; @@ -604,7 +604,7 @@ async fn add_files_to_export(context: &Context, sql: &Sql) -> Result<()> { } processed_files_cnt += 1; let permille = max(min(processed_files_cnt * 1000 / total_files_cnt, 990), 10); - context.call_cb(Event::ImexProgress(permille)); + context.emit_event(Event::ImexProgress(permille)); let name_f = entry.file_name(); let name = name_f.to_string_lossy(); @@ -761,7 +761,7 @@ async fn export_key_to_asc_file( if res.is_err() { error!(context, "Cannot write key to {}", file_name.display()); } else { - context.call_cb(Event::ImexFileWritten(file_name)); + context.emit_event(Event::ImexFileWritten(file_name)); } res } diff --git a/src/job.rs b/src/job.rs index fd4839b4d..98a7a3ad4 100644 --- a/src/job.rs +++ b/src/job.rs @@ -691,7 +691,7 @@ async fn set_delivered(context: &Context, msg_id: MsgId) { ) .await .unwrap_or_default(); - context.call_cb(Event::MsgDelivered { chat_id, msg_id }); + context.emit_event(Event::MsgDelivered { chat_id, msg_id }); } // special case for DC_JOB_SEND_MSG_TO_SMTP diff --git a/src/location.rs b/src/location.rs index fee937438..d36450bbf 100644 --- a/src/location.rs +++ b/src/location.rs @@ -227,7 +227,7 @@ pub async fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds: .await; chat::add_info_msg(context, chat_id, stock_str).await; } - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); if 0 != seconds { schedule_maybe_send_locations(context, false).await; job::add( @@ -302,7 +302,7 @@ pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64 } } if continue_streaming { - context.call_cb(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); + context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); }; schedule_maybe_send_locations(context, false).await; } @@ -382,7 +382,7 @@ pub async fn delete_all(context: &Context) -> Result<(), Error> { .sql .execute("DELETE FROM locations;", paramsv![]) .await?; - context.call_cb(Event::LocationChanged(None)); + context.emit_event(Event::LocationChanged(None)); Ok(()) } @@ -714,7 +714,7 @@ pub(crate) async fn job_maybe_send_locations_ended( .stock_system_msg(StockMessage::MsgLocationDisabled, "", "", 0) .await; chat::add_info_msg(context, chat_id, stock_str).await; - context.call_cb(Event::ChatModified(chat_id)); + context.emit_event(Event::ChatModified(chat_id)); } } job::Status::Finished(Ok(())) diff --git a/src/log.rs b/src/log.rs index db56c1f70..38e3c2f9e 100644 --- a/src/log.rs +++ b/src/log.rs @@ -44,6 +44,6 @@ macro_rules! error { #[macro_export] macro_rules! emit_event { ($ctx:expr, $event:expr) => { - $ctx.call_cb($event); + $ctx.emit_event($event); }; } diff --git a/src/message.rs b/src/message.rs index 8e63fc602..59951b265 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1030,7 +1030,7 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) { } if !msg_ids.is_empty() { - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); @@ -1112,7 +1112,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> bool { } if send_event { - context.call_cb(Event::MsgsChanged { + context.emit_event(Event::MsgsChanged { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); @@ -1270,7 +1270,7 @@ pub async fn set_msg_failed(context: &Context, msg_id: MsgId, error: Option= 0 && $progress <= 1000, "value in range 0..1000 expected with: 0=error, 1..999=progress, 1000=success" ); - $context.call_cb($crate::events::Event::SecurejoinJoinerProgress { + $context.emit_event($crate::events::Event::SecurejoinJoinerProgress { contact_id: $contact_id, progress: $progress, }); @@ -45,7 +45,7 @@ macro_rules! inviter_progress { $progress >= 0 && $progress <= 1000, "value in range 0..1000 expected with: 0=error, 1..999=progress, 1000=success" ); - $context.call_cb($crate::events::Event::SecurejoinInviterProgress { + $context.emit_event($crate::events::Event::SecurejoinInviterProgress { contact_id: $contact_id, progress: $progress, }); diff --git a/src/smtp/mod.rs b/src/smtp/mod.rs index cf57df20e..3a4188d6a 100644 --- a/src/smtp/mod.rs +++ b/src/smtp/mod.rs @@ -98,7 +98,7 @@ impl Smtp { } if lp.send_server.is_empty() || lp.send_port == 0 { - context.call_cb(Event::ErrorNetwork("SMTP bad parameters.".into())); + context.emit_event(Event::ErrorNetwork("SMTP bad parameters.".into())); return Err(Error::BadParameters); } @@ -183,7 +183,7 @@ impl Smtp { self.transport = Some(trans); self.last_success = Some(Instant::now()); - context.call_cb(Event::SmtpConnected(format!( + context.emit_event(Event::SmtpConnected(format!( "SMTP-LOGIN as {} ok", lp.send_user, ))); diff --git a/src/smtp/send.rs b/src/smtp/send.rs index cade602f2..350d705e5 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -49,7 +49,7 @@ impl Smtp { if let Some(ref mut transport) = self.transport { transport.send(mail).await.map_err(Error::SendError)?; - context.call_cb(Event::SmtpMessageSent(format!( + context.emit_event(Event::SmtpMessageSent(format!( "Message len={} was smtp-sent to {}", message_len, recipients_display )));