diff --git a/deltachat-jsonrpc/src/api.rs b/deltachat-jsonrpc/src/api.rs index 3d4fbf422..197262697 100644 --- a/deltachat-jsonrpc/src/api.rs +++ b/deltachat-jsonrpc/src/api.rs @@ -15,7 +15,6 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER; use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin}; use deltachat::context::get_info; use deltachat::ephemeral::Timer; -use deltachat::imex; use deltachat::location; use deltachat::message::get_msg_read_receipts; use deltachat::message::{ @@ -28,6 +27,7 @@ use deltachat::reaction::{get_msg_reactions, send_reaction}; use deltachat::securejoin; use deltachat::stock_str::StockMessage; use deltachat::webxdc::StatusUpdateSerial; +use deltachat::{imex, webxdc}; use sanitize_filename::is_sanitized; use tokio::fs; use tokio::sync::{watch, Mutex, RwLock}; @@ -1675,6 +1675,16 @@ impl CommandApi { .await } + async fn join_gossip_topic( + &self, + account_id: u32, + instance_msg_id: u32, + topic: String, + ) -> Result<()> { + let ctx = self.get_context(account_id).await?; + webxdc::join_gossip_topic(&ctx, MsgId::new(instance_msg_id), &topic).await + } + async fn get_webxdc_status_updates( &self, account_id: u32, diff --git a/src/debug_logging.rs b/src/debug_logging.rs index dccc393df..c1814ea02 100644 --- a/src/debug_logging.rs +++ b/src/debug_logging.rs @@ -63,7 +63,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver Result { - context - .sql - .query_get_value("SELECT rfc724_mid from msgs WHERE id=?", (self.0,)) - .await? - .context("Can't get rfc724_mid") - } } impl std::fmt::Display for MsgId { @@ -2196,31 +2187,6 @@ mod tests { assert_eq!(msg.get_text(), "hello".to_string()); } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_get_rfc724_mid() { - let alice = TestContext::new_alice().await; - receive_imf( - &alice, - b"From: Bob \n\ - To: alice@example.org\n\ - Chat-Version: 1.0\n\ - Message-ID: <123@example.com>\n\ - Date: Fri, 29 Jan 2021 21:37:55 +0000\n\ - \n\ - hello\n", - false, - ) - .await - .unwrap(); - - // check chat-id of this message - let msg = alice.get_last_msg().await; - assert_eq!( - msg.id.get_rfc724_mid(&alice).await.unwrap(), - "123@example.com".to_string() - ); - } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_set_override_sender_name() { // send message with overridden sender name diff --git a/src/peer_channels.rs b/src/peer_channels.rs index abfab1d4a..77ac977b0 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -4,7 +4,7 @@ use crate::config::Config; use crate::contact::ContactId; use crate::context::Context; use crate::message::{Message, MsgId}; -use crate::tools::{get_topic_from_msg_id, time}; +use crate::tools::time; use crate::webxdc::StatusUpdateItem; use anyhow::{anyhow, Context as _, Result}; use image::EncodableLayout; @@ -55,35 +55,25 @@ impl Context { } /// Join a topic and create the subscriber loop for it. - pub async fn join_and_subscribe_topic(&self, rfc724_mid: &str, msg_id: MsgId) -> Result<()> { + pub async fn join_and_subscribe_topic(&self, topic: TopicId, msg_id: MsgId) -> Result<()> { + info!(&self, "Joining topic {topic}."); + let Some(ref gossip) = *self.gossip.lock().await else { warn!( self, - "Not joining topic for {rfc724_mid:?} because there is no gossip." + "Not joining topic {topic} because there is no gossip." ); return Ok(()); }; - let topic = get_topic_from_msg_id(rfc724_mid)?; - info!(&self, "Joining topic {topic}."); - // restore old peers from db, if any let peers = self.get_peers_for_topic(&topic.to_string()).await?; + // TODO: add timeout as the returned future might be pending forever let connect_future = gossip.join(topic, peers).await?; tokio::spawn(connect_future); - tokio::spawn(subscribe_loop( - self.clone(), - self.gossip - .lock() - .await - .as_ref() - .context("can't get gossip")? - .clone(), - topic, - msg_id, - )); + tokio::spawn(subscribe_loop(self.clone(), gossip.clone(), topic, msg_id)); Ok(()) } @@ -116,11 +106,16 @@ impl Context { } /// Cache a peers [NodeId] for one topic. - pub async fn add_peer_for_topic(&self, topic: TopicId, peer: NodeId) -> Result<()> { + pub async fn add_peer_for_topic( + &self, + msg_id: MsgId, + topic: TopicId, + peer: NodeId, + ) -> Result<()> { self.sql .execute( - "INSERT INTO iroh_gossip_peers (public_key, topic) VALUES (?, ?)", - (peer.as_bytes(), topic.as_bytes()), + "INSERT INTO iroh_gossip_peers (msg_id, public_key, topic) VALUES (?, ?, ?)", + (msg_id, peer.as_bytes(), topic.as_bytes()), ) .await?; Ok(()) @@ -193,7 +188,7 @@ async fn subscribe_loop( let event = stream.recv().await?; match event { IrohEvent::NeighborUp(node) => { - context.add_peer_for_topic(topic, node).await?; + context.add_peer_for_topic(msg_id, topic, node).await?; } IrohEvent::NeighborDown(node) => { context.delete_peer_for_topic(topic, node).await?; diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 813064e20..7062c9cfa 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -39,9 +39,7 @@ use crate::simplify; use crate::sql; use crate::stock_str; use crate::sync::Sync::*; -use crate::tools::{ - buf_compress, extract_grpid_from_rfc724_mid, get_topic_from_msg_id, strip_rtlo_characters, -}; +use crate::tools::{buf_compress, extract_grpid_from_rfc724_mid, strip_rtlo_characters}; use crate::{contact, imap}; /// This is the struct that is returned after receiving one email (aka MIME message). @@ -436,11 +434,40 @@ pub(crate) async fn receive_imf_inner( } if let Some(ref status_update) = mime_parser.webxdc_status_update { - if let Err(err) = context + match context .receive_status_update(from_id, insert_msg_id, status_update) .await { - warn!(context, "receive_imf cannot update status: {err:#}."); + // join advertised gossip topics + Ok((topics, instance_id)) => { + if let Some(node_addr) = mime_parser.get_header(HeaderDef::IrohPublicGossip) { + match serde_json::from_str::(node_addr) + .context("Failed to parse node address") + { + Ok(node_addr) => { + context + .endpoint + .lock() + .await + .as_ref() + .context("Failed to get magic endpoint")? + .add_node_addr(node_addr.clone()) + .context("Failed to add node address")?; + + for topic in topics { + let node_id = node_addr.node_id; + context + .add_peer_for_topic(instance_id, topic, node_id) + .await?; + } + } + Err(err) => { + warn!(context, "couldn't parse NodeAddr: {err}"); + } + } + } + } + Err(err) => warn!(context, "receive_imf cannot update status: {err:#}."), } } @@ -1459,44 +1486,6 @@ RETURNING id created_db_entries.push(row_id); } - // Connect to iroh gossip group if it has been advertised. - if incoming { - if let Some(node_addr) = mime_parser.get_header(HeaderDef::IrohPublicGossip) { - info!(context, "Create connection with node_addr: {node_addr}."); - match serde_json::from_str::(node_addr) - .context("Failed to parse node address") - { - Ok(node_addr) => { - context - .endpoint - .lock() - .await - .as_ref() - .context("Failed to get magic endpoint")? - .add_node_addr(node_addr.clone()) - .context("Failed to add node address")?; - - let rfc724_mid = mime_parser - .get_rfc724_mid() - .context("Can't get Message-ID")?; - - let topic = get_topic_from_msg_id(&rfc724_mid)?; - - let node_id = node_addr.node_id; - context - .add_peer_for_topic(topic, node_id) - .await - .with_context(|| { - format!("Failed to add peer {node_id} for topic {topic}") - })?; - } - Err(err) => { - warn!(context, "{err:#}."); - } - } - } - } - // check all parts whether they contain a new logging webxdc for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) { maybe_set_logging_xdc_inner( diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index e1723e279..b97f2a708 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -895,13 +895,15 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); SET backward_verified_key_id=(SELECT value FROM config WHERE keyname='key_id') WHERE verified_key IS NOT NULL "#, - 109).await?; + 109, + ) + .await?; } if dbversion < 110 { sql.execute_migration( - "CREATE TABLE iroh_gossip_peers (topic TEXT NOT NULL, public_key TEXT NOT NULL)", - 108, + "CREATE TABLE iroh_gossip_peers (msg_id TEXT not NULL, topic TEXT NOT NULL, public_key TEXT NOT NULL)", + 110, ) .await?; } diff --git a/src/tools.rs b/src/tools.rs index 9a94b5b3e..20a73eff5 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -8,14 +8,13 @@ use std::fmt; use std::io::{Cursor, Write}; use std::mem; use std::path::{Path, PathBuf}; -use std::str::{from_utf8, FromStr}; +use std::str::from_utf8; use std::time::{Duration, SystemTime}; use anyhow::{bail, Context as _, Result}; use base64::Engine as _; use chrono::{Local, NaiveDateTime, NaiveTime, TimeZone}; use futures::{StreamExt, TryStreamExt}; -use iroh_gossip::proto::TopicId; use mailparse::dateparse; use mailparse::headers::Headers; use mailparse::MailHeaderMap; @@ -744,15 +743,6 @@ pub(crate) fn strip_rtlo_characters(input_str: &str) -> String { input_str.replace(|char| RTLO_CHARACTERS.contains(&char), "") } -/// Generates a [TopicId] from some rfc724_mid. -pub(crate) fn get_topic_from_msg_id(rfc724_mid: &str) -> Result { - TopicId::from_str(&iroh_base::base32::fmt( - rfc724_mid - .get(0..32) - .context("Can't get 32 bytes from rfc724_mid")?, - )) -} - #[cfg(test)] mod tests { #![allow(clippy::indexing_slicing)] diff --git a/src/webxdc.rs b/src/webxdc.rs index 226683a3d..fa5ec6067 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -16,10 +16,12 @@ //! - `descr` - text to send along with the updates use std::path::Path; +use std::str::FromStr; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; use deltachat_derive::FromSql; +use iroh_gossip::proto::TopicId; use lettre_email::mime; use lettre_email::PartBuilder; use serde::{Deserialize, Serialize}; @@ -38,7 +40,6 @@ use crate::mimeparser::SystemMessage; use crate::param::Param; use crate::param::Params; use crate::tools::create_id; -use crate::tools::get_topic_from_msg_id; use crate::tools::strip_rtlo_characters; use crate::tools::{create_smeared_timestamp, get_abs_path}; @@ -183,11 +184,11 @@ pub struct StatusUpdateItem { #[serde(skip_serializing_if = "Option::is_none")] pub uid: Option, - /// Wheter the message should be sent over an ephemeral channel. - /// This means it will only be received by the other side if they are currently online - /// and part of the gossip group. + /// If this update should only be gossiped and which topic to use. + /// Gossiped Updates will only be received by the other side if they + /// are currently online and part of the gossip topic. #[serde(default)] - pub ephemeral: bool, + pub gossip_topic: Option, } /// Update items as passed to the UIs. @@ -496,17 +497,34 @@ impl Context { MessageState::Undefined | MessageState::OutPreparing | MessageState::OutDraft ); + let ephemeral = status_update.gossip_topic.is_some(); if send_now { - if let Some(ref gossip) = *self.gossip.lock().await { - let topic = get_topic_from_msg_id(&instance.rfc724_mid)?; - gossip - .broadcast(topic, serde_json::to_string(&status_update)?.into()) + if let Some(ref topic) = status_update.gossip_topic { + let topic = TopicId::from_str(&iroh_base::base32::fmt( + topic + .get(0..32) + .context("Can't get 32 bytes from rfc724_mid")?, + ))?; + + self.join_and_subscribe_topic(topic, instance_msg_id) .await?; + + info!(self, "here3"); + + let gossip = self.gossip.lock().await; + if let Some(ref gossip) = *gossip { + gossip + .broadcast(topic, serde_json::to_string(&status_update)?.into()) + .await?; + } else { + warn!(self, "send_webxdc_status_update: no gossip available."); + } + } else { + warn!(self, "send_webxdc_status_update: no gossip topic given.") } } status_update.uid = Some(create_id()); - let ephemeral = status_update.ephemeral; let status_update_serial: StatusUpdateSerial = self .create_status_update_record( &mut instance, @@ -610,12 +628,14 @@ impl Context { /// /// `json` is an array containing one or more update items as created by send_webxdc_status_update(), /// the array is parsed using serde, the single payloads are used as is. + /// + /// Returns: List of topics that have been advertised in the updates and the [MsgId] of the instance. pub(crate) async fn receive_status_update( &self, from_id: ContactId, msg_id: MsgId, json: &str, - ) -> Result<()> { + ) -> Result<(Vec, MsgId)> { let msg = Message::load_from_db(self, msg_id).await?; let (timestamp, mut instance, can_info_msg) = if msg.viewtype == Viewtype::Webxdc { (msg.timestamp_sort, msg, false) @@ -644,7 +664,17 @@ impl Context { } let updates: StatusUpdates = serde_json::from_str(json)?; + let mut topics = Vec::new(); for update_item in updates.updates { + if let Some(ref topic) = update_item.gossip_topic { + let topic = TopicId::from_str(&iroh_base::base32::fmt( + topic + .get(0..32) + .context("Can't get 32 bytes from rfc724_mid")?, + ))?; + topics.push(topic); + } + self.create_status_update_record( &mut instance, update_item, @@ -655,7 +685,7 @@ impl Context { .await?; } - Ok(()) + Ok((topics, instance.id)) } /// Returns status updates as an JSON-array, ready to be consumed by a webxdc. @@ -670,10 +700,6 @@ impl Context { instance_msg_id: MsgId, last_known_serial: StatusUpdateSerial, ) -> Result { - let rfc724_mid = instance_msg_id.get_rfc724_mid(self).await?; - self.join_and_subscribe_topic(&rfc724_mid, instance_msg_id) - .await?; - let json = self .sql .query_map( @@ -890,11 +916,21 @@ impl Message { } } +/// Join a gossip topic and subscribe to it. +pub async fn join_gossip_topic(ctx: &Context, msg_id: MsgId, topic: &str) -> Result<()> { + let topic = TopicId::from_str(&iroh_base::base32::fmt( + topic + .get(0..32) + .context("Can't get 32 bytes from rfc724_mid")?, + ))?; + info!(ctx, "Received join request from frontend"); + ctx.join_and_subscribe_topic(topic, msg_id).await +} + #[cfg(test)] mod tests { - use std::time::Duration; - use serde_json::json; + use std::time::Duration; use super::*; use crate::chat::{ @@ -1396,7 +1432,7 @@ mod tests { document: None, summary: None, uid: Some("iecie2Ze".to_string()), - ephemeral: false, + gossip_topic: None, }, 1640178619, true, @@ -1421,7 +1457,7 @@ mod tests { document: None, summary: None, uid: Some("iecie2Ze".to_string()), - ephemeral: false, + gossip_topic: None, }, 1640178619, true, @@ -1455,7 +1491,7 @@ mod tests { document: None, summary: None, uid: None, - ephemeral: false, + gossip_topic: None, }, 1640178619, true, @@ -1475,7 +1511,7 @@ mod tests { document: None, summary: None, uid: None, - ephemeral: false, + gossip_topic: None, }, 1640178619, true,