feat: add iroh gossip peer channel

This commit is contained in:
Septias
2023-11-22 13:42:10 +01:00
committed by Sebastian Klähn
parent c5f31c3d03
commit 70181493b0
16 changed files with 1943 additions and 126 deletions

View File

@@ -347,6 +347,9 @@ pub enum Config {
/// Row ID of the key in the `keypairs` table
/// used for signatures, encryption to self and included in `Autocrypt` header.
KeyId,
/// Iroh secret key.
IrohSecretKey,
}
impl Config {

View File

@@ -10,6 +10,8 @@ use std::time::{Duration, Instant, SystemTime};
use anyhow::{bail, ensure, Context as _, Result};
use async_channel::{self as channel, Receiver, Sender};
use iroh_gossip::net::Gossip;
use iroh_net::MagicEndpoint;
use ratelimit::Ratelimit;
use tokio::sync::{Mutex, Notify, RwLock};
@@ -244,6 +246,11 @@ pub struct InnerContext {
/// Standard RwLock instead of [`tokio::sync::RwLock`] is used
/// because the lock is used from synchronous [`Context::emit_event`].
pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,
/// [MagicEndpoint] needed for iroh peer channels.
pub(crate) endpoint: Mutex<Option<MagicEndpoint>>,
/// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Mutex<Option<Gossip>>,
}
/// The state of ongoing process.
@@ -388,6 +395,8 @@ impl Context {
last_full_folder_scan: Mutex::new(None),
last_error: std::sync::RwLock::new("".to_string()),
debug_logging: std::sync::RwLock::new(None),
endpoint: Mutex::new(None),
gossip: Mutex::new(None),
};
let ctx = Context {
@@ -417,11 +426,17 @@ impl Context {
*lock = Ratelimit::new(Duration::new(3, 0), 3.0);
}
}
if let Err(e) = self.create_gossip().await {
warn!(self, "{e}");
}
self.scheduler.start(self.clone()).await;
}
/// Stops the IO scheduler.
pub async fn stop_io(&self) {
    // Drop the iroh endpoint and gossip first so peer channels shut
    // down together with the rest of the I/O.
    self.endpoint.lock().await.take();
    self.gossip.lock().await.take();
    self.scheduler.stop(self).await;
}
@@ -431,8 +446,11 @@ impl Context {
self.scheduler.restart(self).await;
}
/// Indicates that the network likely has come back.
pub async fn maybe_network(&self) {
    // Notify the iroh magic endpoint (if peer channels are active) so it
    // can re-establish its connections after the network change.
    if let Some(ref mut endpoint) = *self.endpoint.lock().await {
        endpoint.network_change().await;
    }
    self.scheduler.maybe_network().await;
}
@@ -1319,6 +1337,7 @@ mod tests {
"socks5_user",
"socks5_password",
"key_id",
"iroh_secret_key",
];
let t = TestContext::new().await;
let info = t.get_info().await.unwrap();
@@ -1605,4 +1624,15 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_keypair_saving() -> Result<()> {
    let alice = TestContext::new_alice().await;
    // The first call generates a fresh iroh secret key and persists it in
    // the config; the second call must load the very same key back.
    let key = alice.get_or_create_iroh_keypair().await?;
    let loaded_key = alice.get_or_create_iroh_keypair().await?;
    assert_eq!(key.to_bytes(), loaded_key.to_bytes());
    Ok(())
}
}

View File

@@ -63,6 +63,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
summary: None,
document: None,
uid: None,
ephemeral: false,
},
)
.await
@@ -72,12 +73,10 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
}
Ok(serial) => {
if let Some(serial) = serial {
if !matches!(event, EventType::WebxdcStatusUpdate { .. }) {
context.emit_event(EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial: serial,
});
}
context.emit_event(EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial: serial,
});
} else {
// This should not happen as the update has no `uid`.
error!(context, "Debug logging update is not created.");

View File

@@ -88,6 +88,9 @@ pub enum HeaderDef {
/// See <https://datatracker.ietf.org/doc/html/rfc8601>
AuthenticationResults,
/// Public key to join gossip network.
IrohPublicGossip,
#[cfg(test)]
TestHeader,
}

View File

@@ -105,6 +105,7 @@ pub mod receive_imf;
pub mod tools;
pub mod accounts;
pub mod peer_channels;
pub mod reaction;
/// If set IMAP/incoming and SMTP/outgoing MIME messages will be printed.

View File

@@ -326,6 +326,15 @@ WHERE id=?;
Ok(ret)
}
/// Get the rfc724 message id from the database and return it.
pub async fn get_rfc724_mid(&self, context: &Context) -> Result<String> {
context
.sql
.query_get_value("SELECT rfc724_mid from msgs WHERE id=?", (self.0,))
.await?
.context("Can't get rfc724_mid")
}
}
impl std::fmt::Display for MsgId {
@@ -2187,6 +2196,31 @@ mod tests {
assert_eq!(msg.get_text(), "hello".to_string());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_rfc724_mid() {
    // Receive a message with a known Message-ID header.
    let alice = TestContext::new_alice().await;
    receive_imf(
        &alice,
        b"From: Bob <bob@example.com>\n\
To: alice@example.org\n\
Chat-Version: 1.0\n\
Message-ID: <123@example.com>\n\
Date: Fri, 29 Jan 2021 21:37:55 +0000\n\
\n\
hello\n",
        false,
    )
    .await
    .unwrap();

    // Check that the rfc724_mid of the received message is returned
    // without the surrounding angle brackets.
    let msg = alice.get_last_msg().await;
    assert_eq!(
        msg.id.get_rfc724_mid(&alice).await.unwrap(),
        "123@example.com".to_string()
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_set_override_sender_name() {
// send message with overridden sender name

View File

@@ -18,6 +18,7 @@ use crate::contact::Contact;
use crate::context::Context;
use crate::e2ee::EncryptHelper;
use crate::ephemeral::Timer as EphemeralTimer;
use crate::headerdef::HeaderDef;
use crate::html::new_html_mimepart;
use crate::location;
use crate::message::{self, Message, MsgId, Viewtype};
@@ -1285,6 +1286,15 @@ impl<'a> MimeFactory<'a> {
let json = self.msg.param.get(Param::Arg).unwrap_or_default();
parts.push(context.build_status_update_part(json));
} else if self.msg.viewtype == Viewtype::Webxdc {
if let Some(ref endpoint) = *context.endpoint.lock().await {
// Add iroh NodeAddr to headers so peers can connect to us.
let node_addr = endpoint.my_addr().await.unwrap();
headers.protected.push(Header::new(
HeaderDef::IrohPublicGossip.get_headername().to_string(),
serde_json::to_string(&node_addr)?,
));
}
if let Some(json) = context
.render_webxdc_status_update_object(self.msg.id, None)
.await?

217
src/peer_channels.rs Normal file
View File

@@ -0,0 +1,217 @@
//! Peer channels for webxdc updates using iroh.
use crate::config::Config;
use crate::contact::ContactId;
use crate::context::Context;
use crate::message::{Message, MsgId};
use crate::tools::{get_topic_from_msg_id, time};
use crate::webxdc::StatusUpdateItem;
use anyhow::{anyhow, Context as _, Result};
use image::EncodableLayout;
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
use iroh_gossip::proto::{Event as IrohEvent, TopicId};
use iroh_net::magic_endpoint::accept_conn;
use iroh_net::NodeId;
use iroh_net::{derp::DerpMode, key::SecretKey, MagicEndpoint};
impl Context {
    /// Creates the magic endpoint and gossip instance for this context.
    ///
    /// Does nothing (apart from logging a warning) when an endpoint
    /// already exists.
    pub async fn create_gossip(&self) -> Result<()> {
        let secret_key: SecretKey = self.get_or_create_iroh_keypair().await?;

        if self.endpoint.lock().await.is_some() {
            warn!(
                self,
                "Tried to create gossip even though there still exists an instance"
            );
            return Ok(());
        }

        // Build the magic endpoint. Peer data is persisted next to the
        // blobdir so known peers survive restarts.
        let endpoint = MagicEndpoint::builder()
            .secret_key(secret_key)
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            .derp_mode(DerpMode::Default)
            .peers_data_path(
                self.blobdir
                    .parent()
                    .context("Can't get parent of blob dir")?
                    .to_path_buf(),
            )
            .bind(0)
            .await?;

        // Create the gossip instance on top of the endpoint.
        let my_addr = endpoint.my_addr().await?;
        let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &my_addr.info);

        // Spawn the endpoint loop that forwards incoming connections to the gossiper.
        let context = self.clone();
        tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));

        *self.gossip.lock().await = Some(gossip);
        *self.endpoint.lock().await = Some(endpoint);

        Ok(())
    }

    /// Joins the gossip topic derived from `rfc724_mid` and spawns the
    /// subscriber loop for it.
    ///
    /// Does nothing when gossip has not been created (e.g. I/O is stopped).
    pub async fn join_and_subscribe_topic(&self, rfc724_mid: &str, msg_id: MsgId) -> Result<()> {
        // Clone the gossip handle so the mutex guard is dropped right away.
        // The previous `let Some(ref gossip) = *self.gossip.lock().await`
        // kept the guard alive for the whole function and then locked the
        // same non-reentrant tokio mutex again below, which deadlocks.
        let Some(gossip) = self.gossip.lock().await.clone() else {
            warn!(
                &self,
                "Not joining topic for {rfc724_mid:?} because there is no gossip."
            );
            return Ok(());
        };

        let topic = get_topic_from_msg_id(rfc724_mid)?;
        info!(&self, "Joining topic {topic}.");

        // Restore old peers from the database, if any.
        let peers = self.get_peers_for_topic(&topic.to_string()).await?;
        let connect_future = gossip.join(topic, peers).await?;
        tokio::spawn(connect_future);
        tokio::spawn(subscribe_loop(self.clone(), gossip, topic, msg_id));
        Ok(())
    }

    /// Gets the list of [NodeId]s cached for one topic.
    ///
    /// This is used to rejoin a gossip group when reopening the xdc.
    /// Only [NodeId] is needed because the magic endpoint caches region and
    /// derp server for [NodeId]s.
    pub async fn get_peers_for_topic(&self, topic: &str) -> Result<Vec<NodeId>> {
        self.sql
            .query_map(
                "SELECT public_key FROM iroh_gossip_peers WHERE topic = ?",
                (topic,),
                |row| {
                    let data = row.get::<_, Vec<u8>>(0)?;
                    Ok(data)
                },
                |g| {
                    g.map(|data| {
                        // Stored keys are the raw 32 key bytes.
                        Ok::<NodeId, anyhow::Error>(NodeId::from_bytes(
                            &data?
                                .try_into()
                                .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?,
                        )?)
                    })
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(Into::into)
                },
            )
            .await
    }

    /// Caches a peer's [NodeId] for one topic.
    // NOTE(review): `NeighborUp` may fire repeatedly for the same peer, so
    // this can insert duplicate rows — consider a UNIQUE(topic, public_key)
    // constraint or INSERT-if-not-exists.
    pub async fn add_peer_for_topic(&self, topic: TopicId, peer: NodeId) -> Result<()> {
        self.sql
            .execute(
                "INSERT INTO iroh_gossip_peers (public_key, topic) VALUES (?, ?)",
                (peer.as_bytes(), topic.as_bytes()),
            )
            .await?;
        Ok(())
    }

    /// Removes one cached peer from a topic.
    pub async fn delete_peer_for_topic(&self, topic: TopicId, peer: NodeId) -> Result<()> {
        self.sql
            .execute(
                // The original statement was missing the `AND` between the
                // two conditions and therefore invalid SQL.
                "DELETE FROM iroh_gossip_peers WHERE public_key = ? AND topic = ?",
                (peer.as_bytes(), topic.as_bytes()),
            )
            .await?;
        Ok(())
    }

    /// Returns the iroh gossip secret key from the database, generating and
    /// persisting a fresh one on first use.
    pub async fn get_or_create_iroh_keypair(&self) -> Result<SecretKey> {
        match self.get_config_parsed(Config::IrohSecretKey).await? {
            Some(key) => Ok(key),
            None => {
                let key = SecretKey::generate();
                self.set_config(Config::IrohSecretKey, Some(&key.to_string()))
                    .await?;
                Ok(key)
            }
        }
    }
}
async fn endpoint_loop(context: Context, endpoint: MagicEndpoint, gossip: Gossip) {
while let Some(conn) = endpoint.accept().await {
info!(context, "accepting connection with {:?}", conn);
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "iroh connection error: {err}");
}
});
}
}
async fn handle_connection(
context: &Context,
conn: quinn::Connecting,
gossip: Gossip,
) -> anyhow::Result<()> {
let (peer_id, alpn, conn) = accept_conn(conn).await?;
match alpn.as_bytes() {
GOSSIP_ALPN => gossip
.handle_connection(conn)
.await
.context(format!("Connection to {peer_id} with ALPN {alpn} failed"))?,
_ => info!(
context,
"Ignoring connection from {peer_id}: unsupported ALPN protocol"
),
}
Ok(())
}
async fn subscribe_loop(
context: Context,
gossip: Gossip,
topic: TopicId,
msg_id: MsgId,
) -> Result<()> {
let mut stream = gossip.subscribe(topic).await?;
loop {
let event = stream.recv().await?;
match event {
IrohEvent::NeighborUp(node) => {
context.add_peer_for_topic(topic, node).await?;
}
IrohEvent::NeighborDown(node) => {
context.delete_peer_for_topic(topic, node).await?;
}
IrohEvent::Received(event) => {
let payload = String::from_utf8_lossy(event.content.as_bytes());
let mut instance = Message::load_from_db(&context, msg_id).await?;
let update: StatusUpdateItem = serde_json::from_str(&payload)?;
context
.create_status_update_record(
&mut instance,
update,
time(),
false,
ContactId::SELF,
)
.await?;
}
};
}
}

View File

@@ -4,6 +4,7 @@ use std::collections::HashSet;
use std::convert::TryFrom;
use anyhow::{Context as _, Result};
use iroh_net::NodeAddr;
use mailparse::{parse_mail, SingleInfo};
use num_traits::FromPrimitive;
use once_cell::sync::Lazy;
@@ -38,7 +39,10 @@ use crate::simplify;
use crate::sql;
use crate::stock_str;
use crate::sync::Sync::*;
use crate::tools::{buf_compress, extract_grpid_from_rfc724_mid, strip_rtlo_characters};
use crate::tools::{
buf_compress, extract_grpid_from_rfc724_mid, get_topic_from_msg_id, smeared_time,
strip_rtlo_characters,
};
use crate::{contact, imap};
/// This is the struct that is returned after receiving one email (aka MIME message).
@@ -1456,6 +1460,44 @@ RETURNING id
created_db_entries.push(row_id);
}
// Connect to iroh gossip group if it has been advertised.
if incoming {
if let Some(node_addr) = mime_parser.get_header(HeaderDef::IrohPublicGossip) {
info!(context, "Create connection with node_addr: {node_addr}.");
match serde_json::from_str::<NodeAddr>(node_addr)
.context("Failed to parse node address")
{
Ok(node_addr) => {
context
.endpoint
.lock()
.await
.as_ref()
.context("Failed to get magic endpoint")?
.add_node_addr(node_addr.clone())
.context("Failed to add node address")?;
let rfc724_mid = mime_parser
.get_rfc724_mid()
.context("Can't get Message-ID")?;
let topic = get_topic_from_msg_id(&rfc724_mid)?;
let node_id = node_addr.node_id;
context
.add_peer_for_topic(topic, node_id)
.await
.with_context(|| {
format!("Failed to add peer {node_id} for topic {topic}")
})?;
}
Err(err) => {
warn!(context, "{err:#}.");
}
}
}
}
// check all parts whether they contain a new logging webxdc
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
maybe_set_logging_xdc_inner(

View File

@@ -895,7 +895,13 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
SET backward_verified_key_id=(SELECT value FROM config WHERE keyname='key_id')
WHERE verified_key IS NOT NULL
"#,
109,
109)
}
if dbversion < 110 {
    // Cache of gossip peers per topic for iroh peer channels.
    // NOTE(review): `public_key` is declared TEXT but raw key bytes are
    // bound when inserting; SQLite type affinity makes this work, readers
    // fetch it back as a blob.
    sql.execute_migration(
        "CREATE TABLE iroh_gossip_peers (topic TEXT NOT NULL, public_key TEXT NOT NULL)",
        // Must match the version guard above; this was erroneously 108,
        // a number below the already-applied migration 109, so the schema
        // version would have been recorded incorrectly.
        110,
    )
    .await?;
}

View File

@@ -8,13 +8,14 @@ use std::fmt;
use std::io::{Cursor, Write};
use std::mem;
use std::path::{Path, PathBuf};
use std::str::from_utf8;
use std::str::{from_utf8, FromStr};
use std::time::{Duration, SystemTime};
use anyhow::{bail, Context as _, Result};
use base64::Engine as _;
use chrono::{Local, NaiveDateTime, NaiveTime, TimeZone};
use futures::{StreamExt, TryStreamExt};
use iroh_gossip::proto::TopicId;
use mailparse::dateparse;
use mailparse::headers::Headers;
use mailparse::MailHeaderMap;
@@ -743,6 +744,15 @@ pub(crate) fn strip_rtlo_characters(input_str: &str) -> String {
input_str.replace(|char| RTLO_CHARACTERS.contains(&char), "")
}
/// Generates a [TopicId] from some rfc724_mid.
///
/// Takes the first 32 bytes of the Message-ID as topic material and parses
/// them via their base32 rendering; errors when the Message-ID is shorter
/// than 32 bytes (or the cut would split a multi-byte character).
pub(crate) fn get_topic_from_msg_id(rfc724_mid: &str) -> Result<TopicId> {
    let seed = rfc724_mid
        .get(0..32)
        .context("Can't get 32 bytes from rfc724_mid")?;
    TopicId::from_str(&iroh_base::base32::fmt(seed))
}
#[cfg(test)]
mod tests {
#![allow(clippy::indexing_slicing)]

View File

@@ -38,6 +38,7 @@ use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::param::Params;
use crate::tools::create_id;
use crate::tools::get_topic_from_msg_id;
use crate::tools::strip_rtlo_characters;
use crate::tools::{create_smeared_timestamp, get_abs_path};
@@ -154,7 +155,7 @@ struct StatusUpdates {
}
/// Update items as sent on the wire and as stored in the database.
#[derive(Debug, Serialize, Deserialize, Default)]
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub struct StatusUpdateItem {
/// The payload of the status update.
pub payload: Value,
@@ -181,6 +182,12 @@ pub struct StatusUpdateItem {
/// If there is no ID, message is always considered to be unique.
#[serde(skip_serializing_if = "Option::is_none")]
pub uid: Option<String>,
/// Whether the message should be sent over an ephemeral channel.
/// This means it will only be received by the other side if they are currently online
/// and part of the gossip group.
#[serde(default)]
pub ephemeral: bool,
}
/// Update items as passed to the UIs.
@@ -305,7 +312,7 @@ impl Context {
/// Takes an update-json as `{payload: PAYLOAD}`
/// writes it to the database and handles events, info-messages, document name and summary.
async fn create_status_update_record(
pub(crate) async fn create_status_update_record(
&self,
instance: &mut Message,
status_update_item: StatusUpdateItem,
@@ -396,7 +403,6 @@ impl Context {
instance_id: &MsgId,
status_update_item: &StatusUpdateItem,
) -> Result<Option<StatusUpdateSerial>> {
let _lock = self.sql.write_lock().await;
let uid = status_update_item.uid.as_deref();
let Some(rowid) = self
.sql
@@ -490,7 +496,19 @@ impl Context {
MessageState::Undefined | MessageState::OutPreparing | MessageState::OutDraft
);
if send_now {
if let Some(ref gossip) = *self.gossip.lock().await {
let topic = get_topic_from_msg_id(&instance.rfc724_mid)?;
gossip
.broadcast(topic, serde_json::to_string(&status_update)?.into())
.await?;
}
}
status_update.uid = Some(create_id());
let ephemeral = status_update.ephemeral;
println!("ephemeral: {}", ephemeral);
let status_update_serial: StatusUpdateSerial = self
.create_status_update_record(
&mut instance,
@@ -499,11 +517,10 @@ impl Context {
send_now,
ContactId::SELF,
)
.await
.context("Failed to create status update")?
.context("Duplicate status update UID was generated")?;
.await?
.context("Failed to create status update")?;
if send_now {
if send_now && !ephemeral {
self.sql.insert(
"INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) VALUES(?, ?, ?, ?)
ON CONFLICT(msg_id)
@@ -655,6 +672,10 @@ impl Context {
instance_msg_id: MsgId,
last_known_serial: StatusUpdateSerial,
) -> Result<String> {
let rfc724_mid = instance_msg_id.get_rfc724_mid(self).await?;
self.join_and_subscribe_topic(&rfc724_mid, instance_msg_id)
.await?;
let json = self
.sql
.query_map(
@@ -873,6 +894,8 @@ impl Message {
#[cfg(test)]
mod tests {
use std::time::Duration;
use serde_json::json;
use super::*;
@@ -1340,11 +1363,10 @@ mod tests {
// set_draft(None) deletes the message without the need to simulate network
chat_id.set_draft(&t, None).await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
"[]".to_string()
);
assert!(t
.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await
.is_err());
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM msgs_status_updates;", ())
@@ -1376,6 +1398,7 @@ mod tests {
document: None,
summary: None,
uid: Some("iecie2Ze".to_string()),
ephemeral: false,
},
1640178619,
true,
@@ -1400,6 +1423,7 @@ mod tests {
document: None,
summary: None,
uid: Some("iecie2Ze".to_string()),
ephemeral: false,
},
1640178619,
true,
@@ -1433,6 +1457,7 @@ mod tests {
document: None,
summary: None,
uid: None,
ephemeral: false,
},
1640178619,
true,
@@ -1452,6 +1477,7 @@ mod tests {
document: None,
summary: None,
uid: None,
ephemeral: false,
},
1640178619,
true,
@@ -1677,6 +1703,43 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_send_ephemeral_webxdc_status_update() -> Result<()> {
    let alice = TestContext::new_alice().await;
    alice.set_config_bool(Config::BccSelf, true).await?;
    let bob = TestContext::new_bob().await;

    // Alice sends a webxdc instance and a status update.
    let alice_chat = alice.create_chat(&bob).await;
    let alice_instance = send_webxdc_instance(&alice, alice_chat.id).await?;
    alice
        .send_webxdc_status_update(
            alice_instance.id,
            r#"{"payload" : {"foo":"bar"}}"#,
            "descr text",
        )
        .await?;
    alice.flush_status_updates().await?;

    // Not setting `ephemeral` should prepare an SMTP message.
    alice.pop_sent_msg().await;

    alice
        .send_webxdc_status_update(
            alice_instance.id,
            r#"{"payload" : {"foo":"bar"}, "ephemeral": true}"#,
            "descr text",
        )
        .await?;
    alice.flush_status_updates().await?;

    // Setting `ephemeral` should not prepare an SMTP message.
    assert!(&alice
        .pop_sent_msg_opt(Duration::from_secs(1))
        .await
        .is_none());
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_render_webxdc_status_update_object() -> Result<()> {
let t = TestContext::new_alice().await;