feat: use EventEmitter for events

This commit is contained in:
Friedel Ziegelmayer
2020-05-22 21:03:01 +02:00
committed by GitHub
parent 4b4e6e1732
commit 014d2946b2
26 changed files with 192 additions and 117 deletions

View File

@@ -63,7 +63,7 @@ impl<'a> BlobObject<'a> {
blobdir,
name: format!("$BLOBDIR/{}", name),
};
context.call_cb(Event::NewBlobFile(blob.as_name().to_string()));
context.emit_event(Event::NewBlobFile(blob.as_name().to_string()));
Ok(blob)
}
@@ -151,7 +151,7 @@ impl<'a> BlobObject<'a> {
blobdir: context.get_blobdir(),
name: format!("$BLOBDIR/{}", name),
};
context.call_cb(Event::NewBlobFile(blob.as_name().to_string()));
context.emit_event(Event::NewBlobFile(blob.as_name().to_string()));
Ok(blob)
}

View File

@@ -174,7 +174,7 @@ impl ChatId {
)
.await?;
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
msg_id: MsgId::new(0),
chat_id: ChatId::new(0),
});
@@ -231,7 +231,7 @@ impl ChatId {
.execute("DELETE FROM chats WHERE id=?;", paramsv![self])
.await?;
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
msg_id: MsgId::new(0),
chat_id: ChatId::new(0),
});
@@ -256,7 +256,7 @@ impl ChatId {
};
if changed {
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: self,
msg_id: MsgId::new(0),
});
@@ -1098,7 +1098,7 @@ pub async fn create_by_msg_id(context: &Context, msg_id: MsgId) -> Result<ChatId
chat.id.unblock(context).await;
// Sending with 0s as data since multiple messages may have changed.
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
@@ -1140,7 +1140,7 @@ pub async fn create_by_contact_id(context: &Context, contact_id: u32) -> Result<
}
};
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
@@ -1307,7 +1307,7 @@ pub async fn prepare_msg(
msg.state = MessageState::OutPreparing;
let msg_id = prepare_msg_common(context, chat_id, msg).await?;
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: msg.chat_id,
msg_id: msg.id,
});
@@ -1476,13 +1476,13 @@ async fn send_msg_inner(
}
job::send_msg(context, msg.id).await?;
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: msg.chat_id,
msg_id: msg.id,
});
if msg.param.exists(Param::SetLatitude) {
context.call_cb(Event::LocationChanged(Some(DC_CONTACT_ID_SELF)));
context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF)));
}
Ok(msg.id)
@@ -1514,7 +1514,7 @@ pub async fn get_chat_msgs(
Err(err) => warn!(context, "Failed to delete expired messages: {}", err),
Ok(messages_deleted) => {
if messages_deleted {
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
msg_id: MsgId::new(0),
chat_id: ChatId::new(0),
})
@@ -1635,7 +1635,7 @@ pub async fn marknoticed_chat(context: &Context, chat_id: ChatId) -> Result<(),
)
.await?;
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
@@ -1667,7 +1667,7 @@ pub async fn marknoticed_all_chats(context: &Context) -> Result<(), Error> {
)
.await?;
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
msg_id: MsgId::new(0),
chat_id: ChatId::new(0),
});
@@ -1884,7 +1884,7 @@ pub async fn create_group_chat(
chat_id.set_draft_raw(context, &mut draft_msg).await;
}
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
msg_id: MsgId::new(0),
chat_id: ChatId::new(0),
});
@@ -2043,7 +2043,7 @@ pub(crate) async fn add_contact_to_chat_ex(
msg.param.set_int(Param::Arg2, from_handshake.into());
msg.id = send_msg(context, chat_id, &mut msg).await?;
}
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
Ok(true)
}
@@ -2205,7 +2205,7 @@ pub async fn set_muted(
.await
.is_ok()
{
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
} else {
bail!("Failed to set mute duration, chat might not exist -");
}
@@ -2285,7 +2285,7 @@ pub async fn remove_contact_from_chat(
// removed it first, it would complicate the
// check/encryption logic.
success = remove_from_chat_contacts_table(context, chat_id, contact_id).await;
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
}
}
}
@@ -2375,12 +2375,12 @@ pub async fn set_chat_name(
msg.param.set(Param::Arg, &chat.name);
}
msg.id = send_msg(context, chat_id, &mut msg).await?;
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id,
msg_id: msg.id,
});
}
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
success = true;
}
}
@@ -2540,7 +2540,7 @@ pub async fn forward_msgs(
}
}
for (chat_id, msg_id) in created_chats.iter().zip(created_msgs.iter()) {
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: *chat_id,
msg_id: *msg_id,
});
@@ -2663,7 +2663,7 @@ pub async fn add_device_msg(
}
if !msg_id.is_unset() {
context.call_cb(Event::IncomingMsg { chat_id, msg_id });
context.emit_event(Event::IncomingMsg { chat_id, msg_id });
}
Ok(msg_id)
@@ -2733,7 +2733,7 @@ pub(crate) async fn add_info_msg(context: &Context, chat_id: ChatId, text: impl
.get_rowid(context, "msgs", "rfc724_mid", &rfc724_mid)
.await
.unwrap_or_default();
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id,
msg_id: MsgId::new(row_id),
});

View File

@@ -225,7 +225,7 @@ impl Context {
Config::DeleteDeviceAfter => {
let ret = self.sql.set_raw_config(self, key, value).await;
// Force chatlist reload to delete old messages immediately.
self.call_cb(Event::MsgsChanged {
self.emit_event(Event::MsgsChanged {
msg_id: MsgId::new(0),
chat_id: ChatId::new(0),
});

View File

@@ -28,7 +28,7 @@ macro_rules! progress {
$progress <= 1000,
"value in range 0..1000 expected with: 0=error, 1..999=progress, 1000=success"
);
$context.call_cb($crate::events::Event::ConfigureProgress($progress));
$context.emit_event($crate::events::Event::ConfigureProgress($progress));
};
}

View File

@@ -241,7 +241,7 @@ impl Contact {
let (contact_id, sth_modified) =
Contact::add_or_lookup(context, name, addr, Origin::ManuallyCreated).await?;
let blocked = Contact::is_blocked_load(context, contact_id).await;
context.call_cb(Event::ContactsChanged(
context.emit_event(Event::ContactsChanged(
if sth_modified == Modifier::Created {
Some(contact_id)
} else {
@@ -269,7 +269,7 @@ impl Contact {
.await
.is_ok()
{
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
@@ -528,7 +528,7 @@ impl Contact {
}
}
if modify_cnt > 0 {
context.call_cb(Event::ContactsChanged(None));
context.emit_event(Event::ContactsChanged(None));
}
Ok(modify_cnt)
@@ -777,7 +777,7 @@ impl Contact {
.await
{
Ok(_) => {
context.call_cb(Event::ContactsChanged(None));
context.emit_event(Event::ContactsChanged(None));
return Ok(());
}
Err(err) => {
@@ -1054,7 +1054,7 @@ async fn set_block_contact(context: &Context, contact_id: u32, new_blocking: boo
paramsv![new_blocking, 100, contact_id as i32],
).await.is_ok() {
Contact::mark_noticed(context, contact_id).await;
context.call_cb(Event::ContactsChanged(None));
context.emit_event(Event::ContactsChanged(None));
}
}
}
@@ -1080,7 +1080,7 @@ pub(crate) async fn set_profile_image(
};
if changed {
contact.update_param(context).await?;
context.call_cb(Event::ContactsChanged(Some(contact_id)));
context.emit_event(Event::ContactsChanged(Some(contact_id)));
}
Ok(())
}

View File

@@ -4,10 +4,8 @@ use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString;
use std::ops::Deref;
use anyhow::anyhow;
use async_std::path::{Path, PathBuf};
use async_std::sync::{channel, Arc, Mutex, Receiver, RwLock, Sender};
use crossbeam_channel::{unbounded, Receiver as SyncReceiver, Sender as SyncSender};
use crate::chat::*;
use crate::config::Config;
@@ -15,7 +13,7 @@ use crate::constants::*;
use crate::contact::*;
use crate::dc_tools::duration_to_str;
use crate::error::*;
use crate::events::Event;
use crate::events::{Event, EventEmitter, Events};
use crate::job::{self, Action};
use crate::key::{DcKey, Key, SignedPublicKey};
use crate::login_param::LoginParam;
@@ -55,7 +53,7 @@ pub struct InnerContext {
/// Mutex to enforce only a single running oauth2 is running.
pub(crate) oauth2_mutex: Mutex<()>,
pub(crate) translated_stockstrings: RwLock<HashMap<usize, String>>,
pub(crate) logs: (SyncSender<Event>, SyncReceiver<Event>),
pub(crate) events: Events,
pub(crate) scheduler: RwLock<Scheduler>,
@@ -121,7 +119,7 @@ impl Context {
generating_key_mutex: Mutex::new(()),
oauth2_mutex: Mutex::new(()),
translated_stockstrings: RwLock::new(HashMap::new()),
logs: unbounded(),
events: Events::default(),
scheduler: RwLock::new(Scheduler::Stopped),
creation_time: std::time::SystemTime::now(),
};
@@ -172,14 +170,14 @@ impl Context {
self.blobdir.as_path()
}
pub fn call_cb(&self, event: Event) {
self.logs.0.send(event).expect("failed to send event");
/// Emits a single event.
pub fn emit_event(&self, event: Event) {
self.events.emit(event);
}
pub fn get_next_event(&self) -> Result<Event> {
let event = self.logs.1.recv().map_err(|err| anyhow!("{}", err))?;
Ok(event)
/// Get the next queued event.
pub fn get_event_emitter(&self) -> EventEmitter {
self.events.get_emitter()
}
// Ongoing process allocation/free/check

View File

@@ -94,7 +94,7 @@ pub async fn dc_receive_imf(
CreateEvent::MsgsChanged => Event::MsgsChanged { msg_id, chat_id },
CreateEvent::IncomingMsg => Event::IncomingMsg { msg_id, chat_id },
};
context.call_cb(event);
context.emit_event(event);
}
}
};
@@ -197,7 +197,7 @@ pub async fn dc_receive_imf(
if let Some(avatar_action) = &mime_parser.user_avatar {
match contact::set_profile_image(&context, from_id, avatar_action).await {
Ok(()) => {
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
}
Err(err) => {
warn!(context, "reveive_imf cannot update profile image: {}", err);
@@ -806,7 +806,7 @@ async fn save_locations(
}
}
if send_event {
context.call_cb(Event::LocationChanged(Some(from_id)));
context.emit_event(Event::LocationChanged(Some(from_id)));
}
}
@@ -1120,7 +1120,7 @@ async fn create_or_lookup_group(
.await
.is_ok()
{
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
}
}
}
@@ -1179,7 +1179,7 @@ async fn create_or_lookup_group(
}
if send_EVENT_CHAT_MODIFIED {
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
}
Ok((chat_id, chat_id_blocked))
}
@@ -1300,7 +1300,7 @@ async fn create_or_lookup_adhoc_group(
chat::add_to_chat_contacts_table(context, new_chat_id, member_id).await;
}
context.call_cb(Event::ChatModified(new_chat_id));
context.emit_event(Event::ChatModified(new_chat_id));
Ok((new_chat_id, create_blocked))
}

View File

@@ -283,7 +283,7 @@ pub(crate) async fn dc_delete_file(context: &Context, path: impl AsRef<Path>) ->
let dpath = format!("{}", path.as_ref().to_string_lossy());
match fs::remove_file(path_abs).await {
Ok(_) => {
context.call_cb(Event::DeletedBlobFile(dpath));
context.emit_event(Event::DeletedBlobFile(dpath));
true
}
Err(err) => {

View File

@@ -1,12 +1,66 @@
//! # Events specification
use async_std::path::PathBuf;
use crossbeam_channel::{bounded as channel, Receiver, Sender, TrySendError};
use strum::EnumProperty;
use crate::chat::ChatId;
use crate::message::MsgId;
#[derive(Debug)]
pub struct Events {
receiver: Receiver<Event>,
sender: Sender<Event>,
}
impl Default for Events {
fn default() -> Self {
let (sender, receiver) = channel(1_000);
Self { receiver, sender }
}
}
impl Events {
pub fn emit(&self, event: Event) {
match self.sender.try_send(event) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
// when we are full, we pop remove the oldest event and push on the new one
let _ = self.receiver.try_recv();
// try again
self.emit(event);
}
Err(TrySendError::Disconnected(_)) => {
unreachable!("unable to emit event, channel disconnected");
}
}
}
/// Retrieve the event emitter.
pub fn get_emitter(&self) -> EventEmitter {
EventEmitter(self.receiver.clone())
}
}
#[derive(Debug, Clone)]
pub struct EventEmitter(Receiver<Event>);
impl EventEmitter {
/// Blocking recv of an event. Return `None` if the `Sender` has been droped.
pub fn recv_sync(&self) -> Option<Event> {
self.0.recv().ok()
}
/// Blocking async recv of an event. Return `None` if the `Sender` has been droped.
pub async fn recv(&self) -> Option<Event> {
// TODO: change once we can use async channels internally.
self.0.recv().ok()
}
}
impl Event {
/// Returns the corresponding Event id.
pub fn as_id(&self) -> i32 {

View File

@@ -377,7 +377,7 @@ async fn imex_inner(
ensure!(param.is_some(), "No Import/export dir/file given.");
info!(context, "Import/export process started.");
context.call_cb(Event::ImexProgress(10));
context.emit_event(Event::ImexProgress(10));
ensure!(context.sql.is_open().await, "Database not opened.");
@@ -401,11 +401,11 @@ async fn imex_inner(
match success {
Ok(()) => {
info!(context, "IMEX successfully completed");
context.call_cb(Event::ImexProgress(1000));
context.emit_event(Event::ImexProgress(1000));
Ok(())
}
Err(err) => {
context.call_cb(Event::ImexProgress(0));
context.emit_event(Event::ImexProgress(0));
bail!("IMEX FAILED to complete: {}", err);
}
}
@@ -489,7 +489,7 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef<Path>) ->
if permille > 990 {
permille = 990
}
context.call_cb(Event::ImexProgress(permille));
context.emit_event(Event::ImexProgress(permille));
if file_blob.is_empty() {
continue;
}
@@ -565,7 +565,7 @@ async fn export_backup(context: &Context, dir: impl AsRef<Path>) -> Result<()> {
dest_sql
.set_raw_config_int(context, "backup_time", now as i32)
.await?;
context.call_cb(Event::ImexFileWritten(dest_path_filename));
context.emit_event(Event::ImexFileWritten(dest_path_filename));
Ok(())
}
};
@@ -604,7 +604,7 @@ async fn add_files_to_export(context: &Context, sql: &Sql) -> Result<()> {
}
processed_files_cnt += 1;
let permille = max(min(processed_files_cnt * 1000 / total_files_cnt, 990), 10);
context.call_cb(Event::ImexProgress(permille));
context.emit_event(Event::ImexProgress(permille));
let name_f = entry.file_name();
let name = name_f.to_string_lossy();
@@ -761,7 +761,7 @@ async fn export_key_to_asc_file(
if res.is_err() {
error!(context, "Cannot write key to {}", file_name.display());
} else {
context.call_cb(Event::ImexFileWritten(file_name));
context.emit_event(Event::ImexFileWritten(file_name));
}
res
}

View File

@@ -691,7 +691,7 @@ async fn set_delivered(context: &Context, msg_id: MsgId) {
)
.await
.unwrap_or_default();
context.call_cb(Event::MsgDelivered { chat_id, msg_id });
context.emit_event(Event::MsgDelivered { chat_id, msg_id });
}
// special case for DC_JOB_SEND_MSG_TO_SMTP

View File

@@ -227,7 +227,7 @@ pub async fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds:
.await;
chat::add_info_msg(context, chat_id, stock_str).await;
}
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
if 0 != seconds {
schedule_maybe_send_locations(context, false).await;
job::add(
@@ -302,7 +302,7 @@ pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64
}
}
if continue_streaming {
context.call_cb(Event::LocationChanged(Some(DC_CONTACT_ID_SELF)));
context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF)));
};
schedule_maybe_send_locations(context, false).await;
}
@@ -382,7 +382,7 @@ pub async fn delete_all(context: &Context) -> Result<(), Error> {
.sql
.execute("DELETE FROM locations;", paramsv![])
.await?;
context.call_cb(Event::LocationChanged(None));
context.emit_event(Event::LocationChanged(None));
Ok(())
}
@@ -714,7 +714,7 @@ pub(crate) async fn job_maybe_send_locations_ended(
.stock_system_msg(StockMessage::MsgLocationDisabled, "", "", 0)
.await;
chat::add_info_msg(context, chat_id, stock_str).await;
context.call_cb(Event::ChatModified(chat_id));
context.emit_event(Event::ChatModified(chat_id));
}
}
job::Status::Finished(Ok(()))

View File

@@ -44,6 +44,6 @@ macro_rules! error {
#[macro_export]
macro_rules! emit_event {
($ctx:expr, $event:expr) => {
$ctx.call_cb($event);
$ctx.emit_event($event);
};
}

View File

@@ -1030,7 +1030,7 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) {
}
if !msg_ids.is_empty() {
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
@@ -1112,7 +1112,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> bool {
}
if send_event {
context.call_cb(Event::MsgsChanged {
context.emit_event(Event::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
@@ -1270,7 +1270,7 @@ pub async fn set_msg_failed(context: &Context, msg_id: MsgId, error: Option<impl
.await
.is_ok()
{
context.call_cb(Event::MsgFailed {
context.emit_event(Event::MsgFailed {
chat_id: msg.chat_id,
msg_id,
});

View File

@@ -852,7 +852,7 @@ impl MimeMessage {
message::mdn_from_ext(context, from_id, original_message_id, sent_timestamp)
.await
{
context.call_cb(Event::MsgRead { chat_id, msg_id });
context.emit_event(Event::MsgRead { chat_id, msg_id });
}
}
}

View File

@@ -32,7 +32,7 @@ macro_rules! joiner_progress {
$progress >= 0 && $progress <= 1000,
"value in range 0..1000 expected with: 0=error, 1..999=progress, 1000=success"
);
$context.call_cb($crate::events::Event::SecurejoinJoinerProgress {
$context.emit_event($crate::events::Event::SecurejoinJoinerProgress {
contact_id: $contact_id,
progress: $progress,
});
@@ -45,7 +45,7 @@ macro_rules! inviter_progress {
$progress >= 0 && $progress <= 1000,
"value in range 0..1000 expected with: 0=error, 1..999=progress, 1000=success"
);
$context.call_cb($crate::events::Event::SecurejoinInviterProgress {
$context.emit_event($crate::events::Event::SecurejoinInviterProgress {
contact_id: $contact_id,
progress: $progress,
});

View File

@@ -98,7 +98,7 @@ impl Smtp {
}
if lp.send_server.is_empty() || lp.send_port == 0 {
context.call_cb(Event::ErrorNetwork("SMTP bad parameters.".into()));
context.emit_event(Event::ErrorNetwork("SMTP bad parameters.".into()));
return Err(Error::BadParameters);
}
@@ -183,7 +183,7 @@ impl Smtp {
self.transport = Some(trans);
self.last_success = Some(Instant::now());
context.call_cb(Event::SmtpConnected(format!(
context.emit_event(Event::SmtpConnected(format!(
"SMTP-LOGIN as {} ok",
lp.send_user,
)));

View File

@@ -49,7 +49,7 @@ impl Smtp {
if let Some(ref mut transport) = self.transport {
transport.send(mail).await.map_err(Error::SendError)?;
context.call_cb(Event::SmtpMessageSent(format!(
context.emit_event(Event::SmtpMessageSent(format!(
"Message len={} was smtp-sent to {}",
message_len, recipients_display
)));