diff --git a/CHANGELOG.md b/CHANGELOG.md index 89ce0603c..f975dc6d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ - jsonrpc: add verified-by information to `Contact`-Object - Remove `attach_selfavatar` config #3951 +### Changes +- add debug logging support for webxdcs #3296 ## 1.106.0 diff --git a/Cargo.toml b/Cargo.toml index bbd777a48..779178c2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,6 +141,10 @@ harness = false name = "get_chatlist" harness = false +[[bench]] +name = "send_events" +harness = false + [features] default = ["vendored"] internals = [] diff --git a/benches/contacts.rs b/benches/contacts.rs index aef0b7e3f..48dab7c77 100644 --- a/benches/contacts.rs +++ b/benches/contacts.rs @@ -14,7 +14,7 @@ async fn address_book_benchmark(n: u32, read_count: u32) { .unwrap(); let book = (0..n) - .map(|i| format!("Name {}\naddr{}@example.org\n", i, i)) + .map(|i| format!("Name {i}\naddr{i}@example.org\n")) .collect::>() .join(""); diff --git a/benches/send_events.rs b/benches/send_events.rs new file mode 100644 index 000000000..ec83f7a86 --- /dev/null +++ b/benches/send_events.rs @@ -0,0 +1,47 @@ +use criterion::{criterion_group, criterion_main, Criterion}; + +use deltachat::context::Context; +use deltachat::stock_str::StockStrings; +use deltachat::{info, Event, EventType, Events}; +use tempfile::tempdir; + +async fn send_events_benchmark(context: &Context) { + let emitter = context.get_event_emitter(); + for _i in 0..1_000_000 { + info!(context, "interesting event..."); + } + info!(context, "DONE"); + + loop { + match emitter.recv().await.unwrap() { + Event { + typ: EventType::Info(info), + .. + } if info.contains("DONE") => { + break; + } + _ => {} + } + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let dir = tempdir().unwrap(); + let dbfile = dir.path().join("db.sqlite"); + let rt = tokio::runtime::Runtime::new().unwrap(); + + let context = rt.block_on(async { + Context::new(&dbfile, 100, Events::new(), StockStrings::new()) + .await + .expect("failed to create context") + }); + let executor = tokio::runtime::Runtime::new().unwrap(); + + c.bench_function("Sending 1.000.000 events", |b| { + b.to_async(&executor) + .iter(|| send_events_benchmark(&context)) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/src/chat.rs b/src/chat.rs index 384884075..57e83af48 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -23,6 +23,7 @@ use crate::constants::{ }; use crate::contact::{Contact, ContactId, Origin, VerifiedStatus}; use crate::context::Context; +use crate::debug_logging::maybe_set_logging_xdc; use crate::ephemeral::Timer as EphemeralTimer; use crate::events::EventType; use crate::html::new_html_mimepart; @@ -1192,6 +1193,7 @@ impl Chat { Ok(chat) } + /// Returns whether this is the `saved messages` chat pub fn is_self_talk(&self) -> bool { self.param.exists(Param::Selftalk) } @@ -1498,7 +1500,6 @@ impl Chat { } // add independent location to database - if msg.param.exists(Param::SetLatitude) { if let Ok(row_id) = context .sql @@ -1542,7 +1543,6 @@ impl Chat { }; // add message to the database - if let Some(update_msg_id) = update_msg_id { context .sql @@ -1624,6 +1624,8 @@ impl Chat { ) .await?; msg.id = MsgId::new(u32::try_from(raw_id)?); + + maybe_set_logging_xdc(context, msg, self.id).await?; } context.interrupt_ephemeral_task().await; Ok(msg.id) @@ -2030,6 +2032,8 @@ async fn prepare_msg_blob(context: &Context, msg: &mut Message) -> Result<()> { Ok(()) } +/// Prepares a message to be send out +/// - Checks if chat can be sent to async fn prepare_msg_common( context: &Context, chat_id: ChatId, diff --git a/src/config.rs b/src/config.rs index 0bca875f1..90ce185b2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ #![allow(missing_docs)] use anyhow::{ensure, Context as _, Result}; -use strum::{EnumProperty as EnumPropertyTrait, IntoEnumIterator}; +use strum::{EnumProperty, IntoEnumIterator}; use strum_macros::{AsRefStr, Display, EnumIter, EnumProperty, EnumString}; use crate::blob::BlobObject; @@ -192,6 +192,11 @@ pub enum Config { /// /// See `crate::authres::update_authservid_candidates`. AuthservIdCandidates, + + /// Let the core save all events to the database. + /// This value is used internally to remember the MsgId of the logging xdc + #[strum(props(default = "0"))] + DebugLogging, } impl Context { diff --git a/src/context.rs b/src/context.rs index 9e0db4fc5..e58d742d6 100644 --- a/src/context.rs +++ b/src/context.rs @@ -13,11 +13,13 @@ use anyhow::{ensure, Result}; use async_channel::{self as channel, Receiver, Sender}; use ratelimit::Ratelimit; use tokio::sync::{Mutex, RwLock}; +use tokio::task; use crate::chat::{get_chat_cnt, ChatId}; use crate::config::Config; use crate::constants::DC_VERSION_STR; use crate::contact::Contact; +use crate::debug_logging::DebugEventLogData; use crate::events::{Event, EventEmitter, EventType, Events}; use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; @@ -233,6 +235,20 @@ pub struct InnerContext { /// If the ui wants to display an error after a failure, /// `last_error` should be used to avoid races with the event thread. pub(crate) last_error: std::sync::RwLock, + + /// If debug logging is enabled, this contains all neccesary information + pub(crate) debug_logging: RwLock>, +} + +#[derive(Debug)] +pub(crate) struct DebugLogging { + /// The message containing the logging xdc + pub(crate) msg_id: MsgId, + /// Handle to the background task responisble for sending + pub(crate) loop_handle: task::JoinHandle<()>, + /// Channel that log events should be send to + /// A background loop will receive and handle them + pub(crate) sender: Sender, } /// The state of ongoing process. @@ -363,6 +379,7 @@ impl Context { creation_time: std::time::SystemTime::now(), last_full_folder_scan: Mutex::new(None), last_error: std::sync::RwLock::new("".to_string()), + debug_logging: RwLock::new(None), }; let ctx = Context { @@ -397,7 +414,9 @@ impl Context { // to terminate on receiving the next event and then call stop_io() // which will emit the below event(s) info!(self, "stopping IO"); - + if let Some(debug_logging) = self.debug_logging.read().await.as_ref() { + debug_logging.loop_handle.abort(); + } if let Some(scheduler) = self.inner.scheduler.write().await.take() { scheduler.stop(self).await; } @@ -434,12 +453,41 @@ impl Context { /// Emits a single event. pub fn emit_event(&self, event: EventType) { + if self + .debug_logging + .try_read() + .ok() + .map(|inner| inner.is_some()) + == Some(true) + { + self.send_log_event(event.clone()).ok(); + }; self.events.emit(Event { id: self.id, typ: event, }); } + pub(crate) fn send_log_event(&self, event: EventType) -> anyhow::Result<()> { + if let Ok(lock) = self.debug_logging.try_read() { + if let Some(DebugLogging { + msg_id: xdc_id, + sender, + .. + }) = &*lock + { + let event_data = DebugEventLogData { + time: time(), + msg_id: *xdc_id, + event, + }; + + sender.try_send(event_data).ok(); + } + } + Ok(()) + } + /// Emits a generic MsgsChanged event (without chat or message id) pub fn emit_msgs_changed_without_ids(&self) { self.emit_event(EventType::MsgsChanged { @@ -708,6 +756,11 @@ impl Context { .unwrap_or_default(), ); + res.insert( + "debug_logging", + self.get_config_int(Config::DebugLogging).await?.to_string(), + ); + let elapsed = self.creation_time.elapsed(); res.insert("uptime", duration_to_str(elapsed.unwrap_or_default())); diff --git a/src/debug_logging.rs b/src/debug_logging.rs new file mode 100644 index 000000000..4755004bb --- /dev/null +++ b/src/debug_logging.rs @@ -0,0 +1,147 @@ +//! Forward log messages to logging webxdc +use crate::{ + chat::ChatId, + config::Config, + context::{Context, DebugLogging}, + message::{Message, MsgId, Viewtype}, + param::Param, + webxdc::StatusUpdateItem, + Event, EventType, +}; +use async_channel::{self as channel, Receiver}; +use serde_json::json; +use std::path::PathBuf; +use tokio::task; + +/// Store all information needed to log an event to a webxdc. +pub struct DebugEventLogData { + pub time: i64, + pub msg_id: MsgId, + pub event: EventType, +} + +/// Creates a loop which forwards all log messages send into the channel to the associated +/// logging xdc. +pub async fn debug_logging_loop(context: &Context, events: Receiver) { + while let Ok(DebugEventLogData { + time, + msg_id, + event, + }) = events.recv().await + { + match context + .write_status_update_inner( + &msg_id, + StatusUpdateItem { + payload: json!({ + "event": event, + "time": time, + }), + info: None, + summary: None, + document: None, + }, + ) + .await + { + Err(err) => { + eprintln!("Can't log event to webxdc status update: {:#}", err); + } + Ok(serial) => { + context.events.emit(Event { + id: context.id, + typ: EventType::WebxdcStatusUpdate { + msg_id, + status_update_serial: serial, + }, + }); + } + } + } +} + +/// Set message as new logging webxdc if filename and chat_id fit +pub async fn maybe_set_logging_xdc( + context: &Context, + msg: &Message, + chat_id: ChatId, +) -> anyhow::Result<()> { + maybe_set_logging_xdc_inner( + context, + msg.get_viewtype(), + chat_id, + msg.param.get_path(Param::File, context), + msg.get_id(), + ) + .await?; + + Ok(()) +} + +/// Set message as new logging webxdc if filename and chat_id fit +pub async fn maybe_set_logging_xdc_inner( + context: &Context, + viewtype: Viewtype, + chat_id: ChatId, + file: anyhow::Result>, + msg_id: MsgId, +) -> anyhow::Result<()> { + if viewtype == Viewtype::Webxdc { + if let Ok(Some(file)) = file { + if let Some(file_name) = file.file_name().and_then(|name| name.to_str()) { + if file_name.starts_with("debug_logging") + && file_name.ends_with(".xdc") + && chat_id.is_self_talk(context).await? + { + set_debug_logging_xdc(context, Some(msg_id)).await?; + } + } + } + } + Ok(()) +} + +/// Set the webxdc contained in the msg as the current logging xdc on the context and save it to db +/// If id is a `None` value, disable debug logging +pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option) -> anyhow::Result<()> { + match id { + Some(msg_id) => { + ctx.sql + .set_raw_config( + Config::DebugLogging.as_ref(), + Some(msg_id.to_string().as_ref()), + ) + .await?; + let debug_logging = &mut *ctx.debug_logging.write().await; + match debug_logging { + // Switch logging xdc + Some(debug_logging) => debug_logging.msg_id = msg_id, + // Bootstrap background loop for message forwarding + None => { + let (sender, debug_logging_recv) = channel::bounded(1000); + let loop_handle = { + let ctx = ctx.clone(); + task::spawn( + async move { debug_logging_loop(&ctx, debug_logging_recv).await }, + ) + }; + *debug_logging = Some(DebugLogging { + msg_id, + loop_handle, + sender, + }); + } + } + info!(ctx, "replacing logging webxdc"); + } + // Delete current debug logging + None => { + ctx.sql + .set_raw_config(Config::DebugLogging.as_ref(), None) + .await?; + *ctx.debug_logging.write().await = None; + info!(ctx, "removing logging webxdc"); + } + } + Ok(()) +} diff --git a/src/events.rs b/src/events.rs index 6d2962341..378dab814 100644 --- a/src/events.rs +++ b/src/events.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use async_channel::{self as channel, Receiver, Sender, TrySendError}; +use serde::Serialize; use crate::chat::ChatId; use crate::contact::ContactId; @@ -108,7 +109,7 @@ pub struct Event { pub typ: EventType, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub enum EventType { /// The library-user may write an informational string to the log. /// diff --git a/src/lib.rs b/src/lib.rs index c941db562..db26a5dde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,7 @@ mod net; pub mod plaintext; pub mod summary; +mod debug_logging; pub mod receive_imf; pub mod tools; diff --git a/src/message.rs b/src/message.rs index 0bde53961..6911c5c3f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -15,6 +15,7 @@ use crate::constants::{ }; use crate::contact::{Contact, ContactId, Origin}; use crate::context::Context; +use crate::debug_logging::set_debug_logging_xdc; use crate::download::DownloadState; use crate::ephemeral::{start_ephemeral_timers_msgids, Timer as EphemeralTimer}; use crate::events::EventType; @@ -398,6 +399,7 @@ impl Message { self.param.get_path(Param::File, context).unwrap_or(None) } + /// If message is an image or gif, set Param::Width and Param::Height pub(crate) async fn try_calc_and_set_dimensions(&mut self, context: &Context) -> Result<()> { if self.viewtype.has_file() { let file_param = self.param.get_path(Param::File, context)?; @@ -1375,12 +1377,27 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { paramsv![msg.rfc724_mid], ) .await?; + + let logging_xdc_id = context + .debug_logging + .read() + .await + .as_ref() + .map(|dl| dl.msg_id); + + if let Some(id) = logging_xdc_id { + if id == *msg_id { + set_debug_logging_xdc(context, None).await?; + } + } } if !msg_ids.is_empty() { context.emit_msgs_changed_without_ids(); // Run housekeeping to delete unused blobs. + // We need to use set_raw_config() here since with set_config() it + // wouldn't compile ("recursion in an `async fn`") context.set_config(Config::LastHousekeeping, None).await?; } diff --git a/src/param.rs b/src/param.rs index 55bf5f930..a2bf08ff8 100644 --- a/src/param.rs +++ b/src/param.rs @@ -129,6 +129,7 @@ pub enum Param { ProfileImage = b'i', /// For Chats + /// Signals wheter the chat is the `saved messages` chat Selftalk = b'K', /// For Chats: On sending a new message we set the subject to `Re: `. diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 5ede0f424..14d58607a 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -19,6 +19,7 @@ use crate::contact::{ may_be_valid_addr, normalize_name, Contact, ContactAddress, ContactId, Origin, VerifiedStatus, }; use crate::context::Context; +use crate::debug_logging::maybe_set_logging_xdc_inner; use crate::download::DownloadState; use crate::ephemeral::{stock_ephemeral_timer_changed, Timer as EphemeralTimer}; use crate::events::EventType; @@ -1230,6 +1231,18 @@ SET rfc724_mid=excluded.rfc724_mid, chat_id=excluded.chat_id, } drop(conn); + // check all parts wheter they contain a new logging webxdc + for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) { + maybe_set_logging_xdc_inner( + context, + part.typ, + chat_id, + part.param.get_path(Param::File, context), + *msg_id, + ) + .await?; + } + if let Some(replace_msg_id) = replace_msg_id { // "Replace" placeholder with a message that has no parts. replace_msg_id.delete_from_db(context).await?; diff --git a/src/sql.rs b/src/sql.rs index 778a79107..493948cf9 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -17,9 +17,10 @@ use crate::chat::{add_device_msg, update_device_icon, update_saved_messages_icon use crate::config::Config; use crate::constants::DC_CHAT_ID_TRASH; use crate::context::Context; +use crate::debug_logging::set_debug_logging_xdc; use crate::ephemeral::start_ephemeral_timers; use crate::log::LogExt; -use crate::message::{Message, Viewtype}; +use crate::message::{Message, MsgId, Viewtype}; use crate::param::{Param, Params}; use crate::peerstate::{deduplicate_peerstates, Peerstate}; use crate::stock_str; @@ -343,6 +344,15 @@ impl Sql { } else { info!(context, "Opened database {:?}.", self.dbfile); *self.is_encrypted.write().await = Some(passphrase_nonempty); + + // setup debug logging if there is an entry containing its id + if let Some(xdc_id) = self + .get_raw_config_u32(Config::DebugLogging.as_ref()) + .await? + { + set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?; + } + Ok(()) } } @@ -594,6 +604,12 @@ impl Sql { .map(|s| s.and_then(|s| s.parse().ok())) } + pub async fn get_raw_config_u32(&self, key: &str) -> Result> { + self.get_raw_config(key) + .await + .map(|s| s.and_then(|s| s.parse().ok())) + } + pub async fn get_raw_config_bool(&self, key: &str) -> Result { // Not the most obvious way to encode bool as string, but it is matter // of backward compatibility. diff --git a/src/webxdc.rs b/src/webxdc.rs index 992597736..bb71553f4 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -143,16 +143,16 @@ struct StatusUpdates { /// Update items as sent on the wire and as stored in the database. #[derive(Debug, Serialize, Deserialize)] pub(crate) struct StatusUpdateItem { - payload: Value, + pub(crate) payload: Value, #[serde(skip_serializing_if = "Option::is_none")] - info: Option, + pub(crate) info: Option, #[serde(skip_serializing_if = "Option::is_none")] - document: Option, + pub(crate) document: Option, #[serde(skip_serializing_if = "Option::is_none")] - summary: Option, + pub(crate) summary: Option, } /// Update items as passed to the UIs. @@ -348,16 +348,10 @@ impl Context { self.emit_msgs_changed(instance.chat_id, instance.id); } - let rowid = self - .sql - .insert( - "INSERT INTO msgs_status_updates (msg_id, update_item) VALUES(?, ?);", - paramsv![instance.id, serde_json::to_string(&status_update_item)?], - ) + let status_update_serial = self + .write_status_update_inner(&instance.id, status_update_item) .await?; - let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?); - if instance.viewtype == Viewtype::Webxdc { self.emit_event(EventType::WebxdcStatusUpdate { msg_id: instance.id, @@ -368,6 +362,22 @@ impl Context { Ok(status_update_serial) } + pub(crate) async fn write_status_update_inner( + &self, + instance_id: &MsgId, + status_update_item: StatusUpdateItem, + ) -> Result { + let rowid = self + .sql + .insert( + "INSERT INTO msgs_status_updates (msg_id, update_item) VALUES(?, ?);", + paramsv![instance_id, serde_json::to_string(&status_update_item)?], + ) + .await?; + let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?); + Ok(status_update_serial) + } + /// Sends a status update for an webxdc instance. /// /// If the instance is a draft, @@ -2415,4 +2425,42 @@ sth_for_the = "future""# .await; Ok(()) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn change_logging_webxdc() -> Result<()> { + let alice = TestContext::new_alice().await; + let chat_id = ChatId::create_for_contact(&alice, ContactId::SELF).await?; + + assert_eq!( + alice + .sql + .count("SELECT COUNT(*) FROM msgs_status_updates;", paramsv![],) + .await?, + 0 + ); + + let mut instance = create_webxdc_instance( + &alice, + "debug_logging.xdc", + include_bytes!("../test-data/webxdc/minimal.xdc"), + ) + .await?; + assert!(alice.debug_logging.read().await.is_none()); + send_msg(&alice, chat_id, &mut instance).await?; + assert!(alice.debug_logging.read().await.is_some()); + + alice.emit_event(EventType::Info("hi".to_string())); + alice + .evtracker + .get_matching(|ev| matches!(*ev, EventType::WebxdcStatusUpdate { .. })) + .await; + assert!( + alice + .sql + .count("SELECT COUNT(*) FROM msgs_status_updates;", paramsv![],) + .await? + > 0 + ); + Ok(()) + } }