Construct event channel outside of Context

This allows account manager to construct a single event channel and
inject it into all created contexts instead of aggregating events from
separate event emitters.
This commit is contained in:
link2xt
2022-06-11 15:52:55 +00:00
parent 8033966a70
commit 0ed3480258
12 changed files with 95 additions and 136 deletions

View File

@@ -2,18 +2,15 @@
use std::collections::BTreeMap;
use async_std::channel::{self, Receiver, Sender};
use async_std::fs;
use async_std::path::PathBuf;
use async_std::prelude::*;
use async_std::sync::{Arc, RwLock};
use uuid::Uuid;
use anyhow::{ensure, Context as _, Result};
use serde::{Deserialize, Serialize};
use crate::context::Context;
use crate::events::{Event, EventType, Events};
use crate::events::{Event, EventEmitter, EventType, Events};
/// Account manager, that can handle multiple accounts in a single place.
#[derive(Debug)]
@@ -21,7 +18,6 @@ pub struct Accounts {
dir: PathBuf,
config: Config,
accounts: BTreeMap<u32, Context>,
emitter: EventEmitter,
/// Event channel to emit account manager errors.
events: Events,
@@ -63,28 +59,16 @@ impl Accounts {
let config = Config::from_file(config_file)
.await
.context("failed to load accounts config")?;
let events = Events::new();
let accounts = config
.load_accounts()
.load_accounts(&events)
.await
.context("failed to load accounts")?;
let emitter = EventEmitter::new();
let events = Events::default();
emitter.sender.send(events.get_emitter()).await?;
for account in accounts.values() {
emitter.add_account(account).await.with_context(|| {
format!("failed to add account {} to event emitter", account.id)
})?;
}
Ok(Self {
dir,
config,
accounts,
emitter,
events,
})
}
@@ -121,8 +105,12 @@ impl Accounts {
pub async fn add_account(&mut self) -> Result<u32> {
let account_config = self.config.new_account(&self.dir).await?;
let ctx = Context::new(account_config.dbfile().into(), account_config.id).await?;
self.emitter.add_account(&ctx).await?;
let ctx = Context::new(
account_config.dbfile().into(),
account_config.id,
self.events.clone(),
)
.await?;
self.accounts.insert(account_config.id, ctx);
Ok(account_config.id)
@@ -132,8 +120,12 @@ impl Accounts {
pub async fn add_closed_account(&mut self) -> Result<u32> {
let account_config = self.config.new_account(&self.dir).await?;
let ctx = Context::new_closed(account_config.dbfile().into(), account_config.id).await?;
self.emitter.add_account(&ctx).await?;
let ctx = Context::new_closed(
account_config.dbfile().into(),
account_config.id,
self.events.clone(),
)
.await?;
self.accounts.insert(account_config.id, ctx);
Ok(account_config.id)
@@ -225,8 +217,7 @@ impl Accounts {
match res {
Ok(_) => {
let ctx = Context::new(new_dbfile, account_config.id).await?;
self.emitter.add_account(&ctx).await?;
let ctx = Context::new(new_dbfile, account_config.id, self.events.clone()).await?;
self.accounts.insert(account_config.id, ctx);
Ok(account_config.id)
}
@@ -302,74 +293,9 @@ impl Accounts {
self.events.emit(Event { id: 0, typ: event })
}
/// Returns unified event emitter.
/// Returns event emitter.
pub async fn get_event_emitter(&self) -> EventEmitter {
self.emitter.clone()
}
}
/// Unified event emitter for multiple accounts.
#[derive(Debug, Clone)]
pub struct EventEmitter {
/// Aggregate stream of events from all accounts.
stream: Arc<RwLock<futures::stream::SelectAll<crate::events::EventEmitter>>>,
/// Sender for the channel where new account emitters will be pushed.
sender: Sender<crate::events::EventEmitter>,
/// Receiver for the channel where new account emitters will be pushed.
receiver: Receiver<crate::events::EventEmitter>,
}
impl EventEmitter {
pub fn new() -> Self {
let (sender, receiver) = channel::unbounded();
Self {
stream: Arc::new(RwLock::new(futures::stream::SelectAll::new())),
sender,
receiver,
}
}
/// Blocking recv of an event. Return `None` if all `Sender`s have been droped.
pub fn recv_sync(&mut self) -> Option<Event> {
async_std::task::block_on(self.recv()).unwrap_or_default()
}
/// Async recv of an event. Return `None` if all `Sender`s have been dropped.
pub async fn recv(&mut self) -> Result<Option<Event>> {
let mut stream = self.stream.write().await;
loop {
match futures::future::select(self.receiver.recv(), stream.next()).await {
futures::future::Either::Left((emitter, _)) => {
stream.push(emitter?);
}
futures::future::Either::Right((ev, _)) => return Ok(ev),
}
}
}
/// Add event emitter of a new account to the aggregate event emitter.
pub async fn add_account(&self, context: &Context) -> Result<()> {
self.sender.send(context.get_event_emitter()).await?;
Ok(())
}
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new()
}
}
impl async_std::stream::Stream for EventEmitter {
type Item = Event;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self).poll_next(cx)
self.events.get_emitter()
}
}
@@ -426,17 +352,21 @@ impl Config {
Ok(Config { file, inner })
}
pub async fn load_accounts(&self) -> Result<BTreeMap<u32, Context>> {
pub async fn load_accounts(&self, events: &Events) -> Result<BTreeMap<u32, Context>> {
let mut accounts = BTreeMap::new();
for account_config in &self.inner.accounts {
let ctx = Context::new(account_config.dbfile().into(), account_config.id)
.await
.with_context(|| {
format!(
"failed to create context from file {:?}",
account_config.dbfile()
)
})?;
let ctx = Context::new(
account_config.dbfile().into(),
account_config.id,
events.clone(),
)
.await
.with_context(|| {
format!(
"failed to create context from file {:?}",
account_config.dbfile()
)
})?;
accounts.insert(account_config.id, ctx);
}
@@ -610,7 +540,9 @@ mod tests {
assert_eq!(accounts.config.get_selected_account().await, 0);
let extern_dbfile: PathBuf = dir.path().join("other").into();
let ctx = Context::new(extern_dbfile.clone(), 0).await.unwrap();
let ctx = Context::new(extern_dbfile.clone(), 0, Events::new())
.await
.unwrap();
ctx.set_config(crate::config::Config::Addr, Some("me@mail.com"))
.await
.unwrap();
@@ -746,7 +678,7 @@ mod tests {
assert_eq!(accounts.accounts.len(), 0);
// Create event emitter.
let mut event_emitter = accounts.get_event_emitter().await;
let event_emitter = accounts.get_event_emitter().await;
// Test that event emitter does not return `None` immediately.
let duration = std::time::Duration::from_millis(1);
@@ -756,7 +688,7 @@ mod tests {
// When account manager is dropped, event emitter is exhausted.
drop(accounts);
assert_eq!(event_emitter.recv().await?, None);
assert_eq!(event_emitter.recv().await, None);
Ok(())
}

View File

@@ -115,8 +115,8 @@ pub fn get_info() -> BTreeMap<&'static str, String> {
impl Context {
/// Creates new context and opens the database.
pub async fn new(dbfile: PathBuf, id: u32) -> Result<Context> {
let context = Self::new_closed(dbfile, id).await?;
pub async fn new(dbfile: PathBuf, id: u32, events: Events) -> Result<Context> {
let context = Self::new_closed(dbfile, id, events).await?;
// Open the database if is not encrypted.
if context.check_passphrase("".to_string()).await? {
@@ -126,7 +126,7 @@ impl Context {
}
/// Creates new context without opening the database.
pub async fn new_closed(dbfile: PathBuf, id: u32) -> Result<Context> {
pub async fn new_closed(dbfile: PathBuf, id: u32, events: Events) -> Result<Context> {
let mut blob_fname = OsString::new();
blob_fname.push(dbfile.file_name().unwrap_or_default());
blob_fname.push("-blobs");
@@ -134,7 +134,7 @@ impl Context {
if !blobdir.exists().await {
async_std::fs::create_dir_all(&blobdir).await?;
}
let context = Context::with_blobdir(dbfile, blobdir, id).await?;
let context = Context::with_blobdir(dbfile, blobdir, id, events).await?;
Ok(context)
}
@@ -169,6 +169,7 @@ impl Context {
dbfile: PathBuf,
blobdir: PathBuf,
id: u32,
events: Events,
) -> Result<Context> {
ensure!(
blobdir.is_dir().await,
@@ -186,7 +187,7 @@ impl Context {
oauth2_mutex: Mutex::new(()),
wrong_pw_warning_mutex: Mutex::new(()),
translated_stockstrings: RwLock::new(HashMap::new()),
events: Events::default(),
events,
scheduler: RwLock::new(None),
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 3.0)), // Allow to send 3 messages immediately, no more than once every 20 seconds.
quota: RwLock::new(None),
@@ -683,7 +684,7 @@ mod tests {
let tmp = tempfile::tempdir()?;
let dbfile = tmp.path().join("db.sqlite");
std::fs::write(&dbfile, b"123")?;
let res = Context::new(dbfile.into(), 1).await?;
let res = Context::new(dbfile.into(), 1, Events::new()).await?;
// Broken database is indistinguishable from encrypted one.
assert_eq!(res.is_open().await, false);
@@ -829,7 +830,7 @@ mod tests {
async fn test_blobdir_exists() {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
Context::new(dbfile.into(), 1).await.unwrap();
Context::new(dbfile.into(), 1, Events::new()).await.unwrap();
let blobdir = tmp.path().join("db.sqlite-blobs");
assert!(blobdir.is_dir());
}
@@ -840,7 +841,7 @@ mod tests {
let dbfile = tmp.path().join("db.sqlite");
let blobdir = tmp.path().join("db.sqlite-blobs");
std::fs::write(&blobdir, b"123").unwrap();
let res = Context::new(dbfile.into(), 1).await;
let res = Context::new(dbfile.into(), 1, Events::new()).await;
assert!(res.is_err());
}
@@ -850,7 +851,7 @@ mod tests {
let subdir = tmp.path().join("subdir");
let dbfile = subdir.join("db.sqlite");
let dbfile2 = dbfile.clone();
Context::new(dbfile.into(), 1).await.unwrap();
Context::new(dbfile.into(), 1, Events::new()).await.unwrap();
assert!(subdir.is_dir());
assert!(dbfile2.is_file());
}
@@ -860,7 +861,7 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
let blobdir = PathBuf::new();
let res = Context::with_blobdir(dbfile.into(), blobdir, 1).await;
let res = Context::with_blobdir(dbfile.into(), blobdir, 1, Events::new()).await;
assert!(res.is_err());
}
@@ -869,7 +870,7 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
let blobdir = tmp.path().join("blobs");
let res = Context::with_blobdir(dbfile.into(), blobdir.into(), 1).await;
let res = Context::with_blobdir(dbfile.into(), blobdir.into(), 1, Events::new()).await;
assert!(res.is_err());
}
@@ -1038,7 +1039,7 @@ mod tests {
let dbfile = dir.path().join("db.sqlite");
let id = 1;
let context = Context::new_closed(dbfile.clone().into(), id)
let context = Context::new_closed(dbfile.clone().into(), id, Events::new())
.await
.context("failed to create context")?;
assert_eq!(context.open("foo".to_string()).await?, true);
@@ -1046,7 +1047,7 @@ mod tests {
drop(context);
let id = 2;
let context = Context::new(dbfile.into(), id)
let context = Context::new(dbfile.into(), id, Events::new())
.await
.context("failed to create context")?;
assert_eq!(context.is_open().await, false);

View File

@@ -9,7 +9,8 @@ use crate::ephemeral::Timer as EphemeralTimer;
use crate::message::MsgId;
use crate::webxdc::StatusUpdateSerial;
#[derive(Debug)]
/// Event channel.
#[derive(Debug, Clone)]
pub struct Events {
receiver: Receiver<Event>,
sender: Sender<Event>,
@@ -17,13 +18,17 @@ pub struct Events {
impl Default for Events {
fn default() -> Self {
let (sender, receiver) = channel::bounded(1_000);
Self { receiver, sender }
Self::new()
}
}
impl Events {
pub fn new() -> Self {
let (sender, receiver) = channel::bounded(1_000);
Self { receiver, sender }
}
pub fn emit(&self, event: Event) {
match self.sender.try_send(event) {
Ok(()) => {}

View File

@@ -27,7 +27,7 @@ use crate::contact::{Contact, ContactId, Modifier, Origin};
use crate::context::Context;
use crate::dc_receive_imf::dc_receive_imf;
use crate::dc_tools::EmailAddress;
use crate::events::{Event, EventType};
use crate::events::{Event, EventType, Events};
use crate::key::{self, DcKey, KeyPair, KeyPairUse};
use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype};
use crate::mimeparser::MimeMessage;
@@ -266,7 +266,7 @@ impl TestContext {
let mut context_names = CONTEXT_NAMES.write().unwrap();
context_names.insert(id, name);
}
let ctx = Context::new(dbfile.into(), id)
let ctx = Context::new(dbfile.into(), id, Events::new())
.await
.expect("failed to create context");