Compare commits

...

16 Commits

Author SHA1 Message Date
Septias
09e0b0083f add integration test 2024-03-11 16:31:14 +01:00
adz
367ffd91b2 Fix method name in tests 2024-03-11 14:44:06 +01:00
adz
dae6d6e450 Bring back code to persist iroh secret 2024-03-11 13:35:48 +01:00
adz
6d8dcdb40d If there's no peers we can't join the gossip? 2024-03-10 22:03:53 +01:00
adz
7a942ab27c Minor clean ups 2024-03-10 21:41:08 +01:00
Sebastian Klähn
29581c5ed9 logs 2024-02-21 16:42:24 +01:00
Sebastian Klähn
8385ba92c7 fixes 2024-02-09 09:10:56 +01:00
Sebastian Klähn
bd37c36143 more logs 2024-01-29 16:58:23 +01:00
Sebastian Klähn
4fb0002283 stufff 2024-01-29 16:12:57 +01:00
Septias
eca8ed3d56 use as_bytes everywhere 2024-01-29 16:12:02 +01:00
Sebastian Klähn
f5a7a22239 add self to topic and add pubkey 2024-01-26 15:25:31 +01:00
Sebastian Klähn
820a4b9357 send smpt-message for advertisement 2024-01-25 19:20:23 +01:00
Sebastian Klähn
d13d8d48ec add gossip join api
webxdcs can now have multiple gossip channels and decide where to send the message to.
2024-01-25 18:09:25 +01:00
Sebastian Klähn
5d775231d0 cleanup 2024-01-24 13:03:07 +01:00
Sebastian Klähn
60b240429a fix: rebase cleanup 2024-01-23 22:43:59 +01:00
Septias
70181493b0 feat: add iroh gossip peer channel 2024-01-23 22:33:55 +01:00
15 changed files with 2184 additions and 119 deletions

View File

@@ -82,9 +82,9 @@ jobs:
- os: macos-latest
rust: 1.75.0
# Minimum Supported Rust Version = 1.70.0
# Minimum Supported Rust Version = 1.72.0
- os: ubuntu-latest
rust: 1.70.0
rust: 1.72.0
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3

1446
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,7 @@ name = "deltachat"
version = "1.133.1"
edition = "2021"
license = "MPL-2.0"
rust-version = "1.70"
rust-version = "1.72"
[profile.dev]
debug = 0
@@ -32,6 +32,7 @@ strip = true
[patch.crates-io]
imap-proto = { git = "https://github.com/djc/tokio-imap.git", rev = "01ff256a7e42a9f7d2732706f8b71a16ce93427e" }
"iroh-blake3" = { git = "https://github.com/n0-computer/iroh-blake3.git", branch = "master" }
[dependencies]
deltachat_derive = { path = "./deltachat_derive" }
@@ -40,14 +41,26 @@ ratelimit = { path = "./deltachat-ratelimit" }
anyhow = "1"
async-channel = "2.0.0"
async-imap = { version = "0.9.5", default-features = false, features = ["runtime-tokio"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.12", default-features = false, features = ["deflate", "fs"] }
async-imap = { version = "0.9.5", default-features = false, features = [
"runtime-tokio",
] }
async-native-tls = { version = "0.5", default-features = false, features = [
"runtime-tokio",
] }
async-smtp = { version = "0.9", default-features = false, features = [
"runtime-tokio",
] }
async_zip = { version = "0.0.12", default-features = false, features = [
"deflate",
"fs",
] }
backtrace = "0.3"
base64 = "0.21"
brotli = { version = "3.4", default-features=false, features = ["std"] }
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
brotli = { version = "3.4", default-features = false, features = ["std"] }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
escaper = "0.1"
@@ -58,8 +71,22 @@ futures-lite = "2.0.0"
hex = "0.4.0"
hickory-resolver = "0.24"
humansize = "2"
image = { version = "0.24.7", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh = { version = "0.4.2", default-features = false }
image = { version = "0.24.7", default-features = false, features = [
"gif",
"jpeg",
"ico",
"png",
"pnm",
"webp",
"bmp",
] }
iroh = { git = "https://github.com/deltachat/iroh", branch = "0.4-update-quic", default-features = false }
iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-gossip = { git = "https://github.com/n0-computer/iroh", branch = "main", features = [
"net",
] }
quinn = "0.10"
kamadak-exif = "0.5"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
libc = "0.2"
@@ -93,7 +120,12 @@ strum_macros = "0.25"
tagger = "4.3.4"
textwrap = "0.16.0"
thiserror = "1"
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] }
tokio = { version = "1", features = [
"fs",
"rt-multi-thread",
"macros",
"time",
] }
tokio-io-timeout = "1.2.0"
tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar
@@ -104,7 +136,9 @@ uuid = { version = "1", features = ["serde", "v4"] }
[dev-dependencies]
ansi_term = "0.12.0"
anyhow = { version = "1", features = ["backtrace"] } # Enable `backtrace` feature in tests.
anyhow = { version = "1", features = [
"backtrace",
] } # Enable `backtrace` feature in tests.
criterion = { version = "0.5.1", features = ["async_tokio"] }
futures-lite = "2.0.0"
log = "0.4"
@@ -112,7 +146,11 @@ pretty_env_logger = "0.5"
proptest = { version = "1", default-features = false, features = ["std"] }
tempfile = "3"
testdir = "0.9.0"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "macros"] }
tokio = { version = "1", features = [
"parking_lot",
"rt-multi-thread",
"macros",
] }
pretty_assertions = "1.3.0"
[workspace]
@@ -160,5 +198,5 @@ internals = []
vendored = [
"async-native-tls/vendored",
"rusqlite/bundled-sqlcipher-vendored-openssl",
"reqwest/native-tls-vendored"
"reqwest/native-tls-vendored",
]

View File

@@ -15,7 +15,6 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER;
use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin};
use deltachat::context::get_info;
use deltachat::ephemeral::Timer;
use deltachat::imex;
use deltachat::location;
use deltachat::message::get_msg_read_receipts;
use deltachat::message::{
@@ -28,6 +27,7 @@ use deltachat::reaction::{get_msg_reactions, send_reaction};
use deltachat::securejoin;
use deltachat::stock_str::StockMessage;
use deltachat::webxdc::StatusUpdateSerial;
use deltachat::{imex, webxdc};
use sanitize_filename::is_sanitized;
use tokio::fs;
use tokio::sync::{watch, Mutex, RwLock};
@@ -1675,6 +1675,16 @@ impl CommandApi {
.await
}
async fn join_gossip_topic(
&self,
account_id: u32,
instance_msg_id: u32,
topic: String,
) -> Result<()> {
let ctx = self.get_context(account_id).await?;
webxdc::join_gossip_topic(&ctx, MsgId::new(instance_msg_id), &topic).await
}
async fn get_webxdc_status_updates(
&self,
account_id: u32,

115
deny.toml
View File

@@ -18,56 +18,58 @@ ignore = [
# when upgrading.
# Please keep this list alphabetically sorted.
skip = [
{ name = "async-channel", version = "1.9.0" },
{ name = "base16ct", version = "0.1.1" },
{ name = "base64", version = "<0.21" },
{ name = "bitflags", version = "1.3.2" },
{ name = "block-buffer", version = "<0.10" },
{ name = "convert_case", version = "0.4.0" },
{ name = "curve25519-dalek", version = "3.2.0" },
{ name = "darling_core", version = "<0.14" },
{ name = "darling_macro", version = "<0.14" },
{ name = "darling", version = "<0.14" },
{ name = "der", version = "0.6.1" },
{ name = "digest", version = "<0.10" },
{ name = "ed25519-dalek", version = "1.0.1" },
{ name = "ed25519", version = "1.5.3" },
{ name = "event-listener", version = "2.5.3" },
{ name = "getrandom", version = "<0.2" },
{ name = "h2", version = "0.3.22" },
{ name = "http-body", version = "0.4.5" },
{ name = "http", version = "0.2.11" },
{ name = "hyper", version = "0.14.27" },
{ name = "idna", version = "0.4.0" },
{ name = "pem-rfc7468", version = "0.6.0" },
{ name = "pkcs8", version = "0.9.0" },
{ name = "quick-error", version = "<2.0" },
{ name = "rand_chacha", version = "<0.3" },
{ name = "rand_core", version = "<0.6" },
{ name = "rand", version = "<0.8" },
{ name = "redox_syscall", version = "0.3.5" },
{ name = "regex-automata", version = "0.1.10" },
{ name = "regex-syntax", version = "0.6.29" },
{ name = "ring", version = "0.16.20" },
{ name = "sec1", version = "0.3.0" },
{ name = "sha2", version = "<0.10" },
{ name = "signature", version = "1.6.4" },
{ name = "spin", version = "<0.9.6" },
{ name = "spki", version = "0.6.0" },
{ name = "syn", version = "1.0.109" },
{ name = "time", version = "<0.3" },
{ name = "untrusted", version = "0.7.1" },
{ name = "wasi", version = "<0.11" },
{ name = "windows_aarch64_gnullvm", version = "<0.52" },
{ name = "windows_aarch64_msvc", version = "<0.52" },
{ name = "windows_i686_gnu", version = "<0.52" },
{ name = "windows_i686_msvc", version = "<0.52" },
{ name = "windows-sys", version = "<0.52" },
{ name = "windows-targets", version = "<0.52" },
{ name = "windows", version = "0.32.0" },
{ name = "windows_x86_64_gnullvm", version = "<0.52" },
{ name = "windows_x86_64_gnu", version = "<0.52" },
{ name = "windows_x86_64_msvc", version = "<0.52" },
{ name = "async-channel", version = "1.9.0" },
{ name = "base16ct", version = "0.1.1" },
{ name = "base64", version = "<0.21" },
{ name = "bitflags", version = "1.3.2" },
{ name = "block-buffer", version = "<0.10" },
{ name = "convert_case", version = "0.4.0" },
{ name = "curve25519-dalek", version = "3.2.0" },
{ name = "darling_core", version = "<0.14" },
{ name = "darling_macro", version = "<0.14" },
{ name = "darling", version = "<0.14" },
{ name = "der", version = "0.6.1" },
{ name = "digest", version = "<0.10" },
{ name = "ed25519-dalek", version = "1.0.1" },
{ name = "ed25519", version = "1.5.3" },
{ name = "event-listener", version = "2.5.3" },
{ name = "fd-lock", version = "3.0.13" },
{ name = "getrandom", version = "<0.2" },
{ name = "h2", version = "0.3.22" },
{ name = "http-body", version = "0.4.5" },
{ name = "http", version = "0.2.11" },
{ name = "hyper", version = "0.14.27" },
{ name = "idna", version = "0.4.0" },
{ name = "pem-rfc7468", version = "0.6.0" },
{ name = "pkcs8", version = "0.9.0" },
{ name = "quick-error", version = "<2.0" },
{ name = "rand_chacha", version = "<0.3" },
{ name = "rand_core", version = "<0.6" },
{ name = "rand", version = "<0.8" },
{ name = "redox_syscall", version = "0.3.5" },
{ name = "regex-automata", version = "0.1.10" },
{ name = "regex-syntax", version = "0.6.29" },
{ name = "ring", version = "0.16.20" },
{ name = "sec1", version = "0.3.0" },
{ name = "sha2", version = "<0.10" },
{ name = "signature", version = "1.6.4" },
{ name = "socket2", version = "0.4.9" },
{ name = "spin", version = "<0.9.6" },
{ name = "spki", version = "0.6.0" },
{ name = "syn", version = "1.0.109" },
{ name = "time", version = "<0.3" },
{ name = "untrusted", version = "0.7.1" },
{ name = "wasi", version = "<0.11" },
{ name = "windows_aarch64_gnullvm", version = "<0.52" },
{ name = "windows_aarch64_msvc", version = "<0.52" },
{ name = "windows_i686_gnu", version = "<0.52" },
{ name = "windows_i686_msvc", version = "<0.52" },
{ name = "windows-sys", version = "<0.52" },
{ name = "windows-targets", version = "<0.52" },
{ name = "windows", version = "0.32.0" },
{ name = "windows_x86_64_gnullvm", version = "<0.52" },
{ name = "windows_x86_64_gnu", version = "<0.52" },
{ name = "windows_x86_64_msvc", version = "<0.52" },
]
@@ -77,7 +79,7 @@ allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"BSL-1.0", # Boost Software License 1.0
"BSL-1.0", # Boost Software License 1.0
"CC0-1.0",
"ISC",
"MIT",
@@ -90,14 +92,13 @@ allow = [
[[licenses.clarify]]
name = "ring"
expression = "MIT AND ISC AND OpenSSL"
license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 },
]
license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]
[sources.allow-org]
# Organisations which we allow git sources from.
github = [
"async-email",
"deltachat",
"djc",
"async-email",
"deltachat",
"djc",
"n0-computer", # iroh
]

View File

@@ -7,7 +7,7 @@ use std::str::FromStr;
use anyhow::{ensure, Context as _, Result};
use serde::{Deserialize, Serialize};
use strum::{EnumProperty, IntoEnumIterator};
use strum_macros::{AsRefStr, Display, EnumIter, EnumProperty, EnumString};
use strum_macros::{AsRefStr, Display, EnumIter, EnumString};
use crate::blob::BlobObject;
use crate::constants::DC_VERSION_STR;
@@ -347,6 +347,9 @@ pub enum Config {
/// Row ID of the key in the `keypairs` table
/// used for signatures, encryption to self and included in `Autocrypt` header.
KeyId,
/// Iroh secret key.
IrohSecretKey,
}
impl Config {

View File

@@ -10,6 +10,8 @@ use std::time::{Duration, Instant, SystemTime};
use anyhow::{bail, ensure, Context as _, Result};
use async_channel::{self as channel, Receiver, Sender};
use iroh_gossip::net::Gossip;
use iroh_net::MagicEndpoint;
use ratelimit::Ratelimit;
use tokio::sync::{Mutex, Notify, RwLock};
@@ -244,6 +246,12 @@ pub struct InnerContext {
/// Standard RwLock instead of [`tokio::sync::RwLock`] is used
/// because the lock is used from synchronous [`Context::emit_event`].
pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,
/// [MagicEndpoint] needed for iroh peer channels.
pub(crate) endpoint: Mutex<Option<MagicEndpoint>>,
/// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Mutex<Option<Gossip>>,
}
/// The state of ongoing process.
@@ -388,6 +396,8 @@ impl Context {
last_full_folder_scan: Mutex::new(None),
last_error: std::sync::RwLock::new("".to_string()),
debug_logging: std::sync::RwLock::new(None),
endpoint: Mutex::new(None),
gossip: Mutex::new(None),
};
let ctx = Context {
@@ -417,11 +427,17 @@ impl Context {
*lock = Ratelimit::new(Duration::new(3, 0), 3.0);
}
}
if let Err(e) = self.create_gossip().await {
warn!(self, "{e}");
}
self.scheduler.start(self.clone()).await;
}
/// Stops the IO scheduler.
pub async fn stop_io(&self) {
self.endpoint.lock().await.take();
self.gossip.lock().await.take();
self.scheduler.stop(self).await;
}
@@ -433,6 +449,9 @@ impl Context {
/// Indicate that the network likely has come back.
pub async fn maybe_network(&self) {
if let Some(ref mut endpoint) = *self.endpoint.lock().await {
endpoint.network_change().await;
}
self.scheduler.maybe_network().await;
}
@@ -1319,6 +1338,7 @@ mod tests {
"socks5_user",
"socks5_password",
"key_id",
"iroh_secret_key",
];
let t = TestContext::new().await;
let info = t.get_info().await.unwrap();
@@ -1605,4 +1625,15 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_keypair_saving() -> Result<()> {
let alice = TestContext::new_alice().await;
let key = alice.get_or_generate_iroh_keypair().await?;
let loaded_key = alice.get_or_generate_iroh_keypair().await?;
assert_eq!(key.to_bytes(), loaded_key.to_bytes());
Ok(())
}
}

View File

@@ -63,6 +63,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
summary: None,
document: None,
uid: None,
gossip_topic: None,
},
)
.await
@@ -72,12 +73,10 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
}
Ok(serial) => {
if let Some(serial) = serial {
if !matches!(event, EventType::WebxdcStatusUpdate { .. }) {
context.emit_event(EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial: serial,
});
}
context.emit_event(EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial: serial,
});
} else {
// This should not happen as the update has no `uid`.
error!(context, "Debug logging update is not created.");

View File

@@ -88,6 +88,9 @@ pub enum HeaderDef {
/// See <https://datatracker.ietf.org/doc/html/rfc8601>
AuthenticationResults,
/// Public key to join gossip network.
IrohPublicGossip,
#[cfg(test)]
TestHeader,
}

View File

@@ -105,6 +105,7 @@ pub mod receive_imf;
pub mod tools;
pub mod accounts;
pub mod peer_channels;
pub mod reaction;
/// If set IMAP/incoming and SMTP/outgoing MIME messages will be printed.

View File

@@ -18,6 +18,7 @@ use crate::contact::Contact;
use crate::context::Context;
use crate::e2ee::EncryptHelper;
use crate::ephemeral::Timer as EphemeralTimer;
use crate::headerdef::HeaderDef;
use crate::html::new_html_mimepart;
use crate::location;
use crate::message::{self, Message, MsgId, Viewtype};
@@ -1274,6 +1275,7 @@ impl<'a> MimeFactory<'a> {
}
}
println!("hiiiii");
// we do not piggyback sync-files to other self-sent-messages
// to not risk files becoming too larger and being skipped by download-on-demand.
if command == SystemMessage::MultiDeviceSync && self.is_e2ee_guaranteed() {
@@ -1283,6 +1285,16 @@ impl<'a> MimeFactory<'a> {
self.sync_ids_to_delete = Some(ids.to_string());
} else if command == SystemMessage::WebxdcStatusUpdate {
let json = self.msg.param.get(Param::Arg).unwrap_or_default();
if json.find("gossip_topic").is_some() {
if let Some(ref endpoint) = *context.endpoint.lock().await {
// Add iroh NodeAddr to headers so peers can connect to us.
let node_addr = endpoint.my_addr().await.unwrap();
headers.protected.push(Header::new(
HeaderDef::IrohPublicGossip.get_headername().to_string(),
serde_json::to_string(&node_addr)?,
));
}
}
parts.push(context.build_status_update_part(json));
} else if self.msg.viewtype == Viewtype::Webxdc {
if let Some(json) = context

413
src/peer_channels.rs Normal file
View File

@@ -0,0 +1,413 @@
//! Peer channels for webxdc updates using iroh.
use anyhow::{anyhow, Context as _, Result};
use image::EncodableLayout;
use iroh_base::base32;
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
use iroh_gossip::proto::{Event as IrohEvent, TopicId};
use iroh_net::magic_endpoint::accept_conn;
use iroh_net::NodeId;
use iroh_net::{derp::DerpMode, key::SecretKey, MagicEndpoint};
use crate::config::Config;
use crate::contact::ContactId;
use crate::context::Context;
use crate::message::{Message, MsgId};
use crate::tools::time;
use crate::webxdc::StatusUpdateItem;
impl Context {
/// Create magic endpoint and gossip for the context.
pub async fn create_gossip(&self) -> Result<()> {
let secret_key: SecretKey = self.get_or_generate_iroh_keypair().await?;
println!("> our secret key: {}", base32::fmt(secret_key.to_bytes()));
if self.endpoint.lock().await.is_some() {
warn!(
self,
"Tried to create endpoint even though there is already one."
);
return Ok(());
}
// build magic endpoint
let endpoint = MagicEndpoint::builder()
.secret_key(secret_key)
.alpns(vec![GOSSIP_ALPN.to_vec()])
.derp_mode(DerpMode::Default)
.bind(0)
.await?;
// create gossip
let my_addr = endpoint.my_addr().await?;
let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &my_addr.info);
// spawn endpoint loop that forwards incoming connections to the gossiper
let context = self.clone();
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
*self.gossip.lock().await = Some(gossip);
*self.endpoint.lock().await = Some(endpoint);
Ok(())
}
/// Join a topic and create the subscriber loop for it.
pub async fn join_and_subscribe_topic(&self, topic: TopicId, msg_id: MsgId) -> Result<()> {
info!(&self, "Joining topic {}.", topic.to_string());
let Some(ref gossip) = *self.gossip.lock().await else {
warn!(
self,
"Not joining topic {topic} because there is no gossip."
);
return Ok(());
};
// restore old peers from db, if any
let peers = self.get_peers_for_topic(topic).await?;
if peers.len() == 0 {
// TODO: When there's no peers we will never be able to join the gossip?
warn!(self, "joining gossip with zero peers");
} else {
info!(self, "joining gossip with peers: {peers:?}");
info!(
self,
"{:?}",
self.endpoint
.lock()
.await
.as_ref()
.unwrap()
.my_addr()
.await?
);
}
// TODO: add timeout as the returned future might be pending forever
let connect_future = gossip.join(topic, peers).await?;
tokio::spawn(connect_future);
tokio::spawn(subscribe_loop(self.clone(), gossip.clone(), topic, msg_id));
Ok(())
}
/// Get list of [NodeId]s for one topic.
/// This is used to rejoin a gossip group when reopening the xdc.
/// Only [NodeId] is needed because the magic endpoint caches region and derp server for [NodeId]s.
pub async fn get_peers_for_topic(&self, topic: TopicId) -> Result<Vec<NodeId>> {
self.sql
.query_map(
"SELECT public_key FROM iroh_gossip_peers WHERE topic = ?",
(topic.as_bytes(),),
|row| {
let data = row.get::<_, Vec<u8>>(0)?;
Ok(data)
},
|g| {
g.map(|data| {
Ok::<NodeId, anyhow::Error>(NodeId::from_bytes(
&data?
.try_into()
.map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?,
)?)
})
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await
}
/// Cache a peers [NodeId] for one topic.
pub async fn add_peer_for_topic(
&self,
msg_id: MsgId,
topic: TopicId,
peer: NodeId,
) -> Result<()> {
self.sql
.execute(
"INSERT INTO iroh_gossip_peers (msg_id, public_key, topic) VALUES (?, ?, ?)",
(msg_id, peer.as_bytes(), topic.as_bytes()),
)
.await?;
Ok(())
}
/// Remove one cached peer from a topic.
pub async fn delete_peer_for_topic(&self, topic: TopicId, peer: NodeId) -> Result<()> {
self.sql
.execute(
"DELETE FROM iroh_gossip_peers WHERE public_key = ? topic = ?",
(peer.as_bytes(), topic.as_bytes()),
)
.await?;
Ok(())
}
/// Get the iroh gossip secret key from the database or generate a new one and persist it.
pub async fn get_or_generate_iroh_keypair(&self) -> Result<SecretKey> {
match self.get_config_parsed(Config::IrohSecretKey).await? {
Some(key) => Ok(key),
None => {
let key = SecretKey::generate();
self.set_config(Config::IrohSecretKey, Some(&key.to_string()))
.await?;
Ok(key)
}
}
}
}
async fn endpoint_loop(context: Context, endpoint: MagicEndpoint, gossip: Gossip) {
while let Some(conn) = endpoint.accept().await {
info!(context, "accepting connection with {:?}", conn);
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "iroh connection error: {err}");
}
});
}
}
async fn handle_connection(
context: &Context,
conn: quinn::Connecting,
gossip: Gossip,
) -> anyhow::Result<()> {
let (peer_id, alpn, conn) = accept_conn(conn).await?;
match alpn.as_bytes() {
GOSSIP_ALPN => gossip
.handle_connection(conn)
.await
.context(format!("Connection to {peer_id} with ALPN {alpn} failed"))?,
_ => info!(
context,
"Ignoring connection from {peer_id}: unsupported ALPN protocol"
),
}
Ok(())
}
async fn subscribe_loop(
context: Context,
gossip: Gossip,
topic: TopicId,
msg_id: MsgId,
) -> Result<()> {
let mut stream = gossip.subscribe(topic).await?;
loop {
let event = stream.recv().await?;
match event {
IrohEvent::NeighborUp(node) => {
info!(context, "NeighborUp: {:?}", node);
context.add_peer_for_topic(msg_id, topic, node).await?;
}
IrohEvent::NeighborDown(node) => {
info!(context, "NeighborDown: {:?}", node);
context.delete_peer_for_topic(topic, node).await?;
}
IrohEvent::Received(event) => {
info!(context, "Received: {:?}", event);
let payload = String::from_utf8_lossy(event.content.as_bytes());
let mut instance = Message::load_from_db(&context, msg_id).await?;
let update: StatusUpdateItem = serde_json::from_str(&payload)?;
context
.create_status_update_record(
&mut instance,
update,
time(),
false,
ContactId::SELF,
)
.await?;
}
};
}
}
#[cfg(test)]
mod tests {
use std::{os::unix::thread, str::FromStr, time::Duration};
use tokio::time::timeout;
use crate::{
chat::send_msg,
message::Viewtype,
test_utils::TestContextManager,
webxdc::{join_gossip_topic, StatusUpdateSerial},
EventType,
};
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_can_connect() {
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
alice.ctx.start_io().await;
bob.ctx.start_io().await;
// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).await;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.await
.unwrap();
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
let alice_instance = alice.get_last_msg().await;
assert_eq!(alice_instance.get_viewtype(), Viewtype::Webxdc);
let webxdc = alice.pop_sent_msg().await;
let bob_webdxc = bob.recv_msg(&webxdc).await;
bob_webdxc.chat_id.accept(bob).await.unwrap();
assert_eq!(bob_webdxc.get_viewtype(), Viewtype::Webxdc);
// Alice sends webxdc update with gossip.
// This produces an SMTP message that contains the topic and a header with alices' node id
alice
.send_webxdc_status_update_struct(
alice_instance.id,
StatusUpdateItem {
payload: "test".to_string().into(),
gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
..Default::default()
},
"",
)
.await
.unwrap();
alice.flush_status_updates().await.unwrap();
bob.recv_msg(&alice.pop_sent_msg().await).await;
let status = bob
.get_webxdc_status_updates(bob_webdxc.id, StatusUpdateSerial::new(0))
.await
.unwrap();
let status_update_items: Vec<StatusUpdateItem> = serde_json::from_str(&status).unwrap();
let topic = status_update_items[0].gossip_topic.as_ref().unwrap();
assert_eq!(topic, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
let topic_id = TopicId::from_str(&iroh_base::base32::fmt(topic)).unwrap();
let topics = bob.get_peers_for_topic(topic_id).await.unwrap();
assert_eq!(
topics,
vec![alice.endpoint.lock().await.as_ref().unwrap().node_id()]
);
let mut stream = alice
.ctx
.gossip
.lock()
.await
.as_ref()
.unwrap()
.subscribe(topic_id)
.await
.unwrap();
// Bob joins topic
join_gossip_topic(bob, bob_webdxc.id, topic).await.unwrap();
let event = timeout(Duration::from_secs(5), stream.recv())
.await
.unwrap()
.unwrap();
match event {
IrohEvent::NeighborUp(node) => {
assert_eq!(node, bob.endpoint.lock().await.as_ref().unwrap().node_id());
}
_ => panic!("Expected NeighborUp event"),
}
// Bob sends webxdc update with gossip.
bob.send_webxdc_status_update_struct(
bob_webdxc.id,
StatusUpdateItem {
payload: "bob -> alice".to_string().into(),
gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
..Default::default()
},
"",
)
.await
.unwrap();
alice.evtracker.try_recv().unwrap();
while let Ok(event) = alice.evtracker.try_recv() {
if let EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial,
} = event.typ
{
let status_update = alice
.get_status_update(msg_id, status_update_serial)
.await
.unwrap();
let status_update_item: StatusUpdateItem =
serde_json::from_str(&status_update).unwrap();
println!("{:?}", status_update_item.payload.to_string());
if status_update_item
.payload
.to_string()
.contains("bob -> alice")
{
break;
}
}
}
// Alice sends webxdc update with gossip.
alice
.send_webxdc_status_update_struct(
bob_webdxc.id,
StatusUpdateItem {
payload: "alice -> bob".to_string().into(),
gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
..Default::default()
},
"",
)
.await
.unwrap();
while let Ok(event) = bob.evtracker.try_recv() {
if let EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial,
} = event.typ
{
let status_update = alice
.get_status_update(msg_id, status_update_serial)
.await
.unwrap();
let status_update_item: StatusUpdateItem =
serde_json::from_str(&status_update).unwrap();
if status_update_item
.payload
.to_string()
.contains("alice -> bob")
{
break;
}
}
}
}
}

View File

@@ -4,6 +4,7 @@ use std::collections::HashSet;
use std::convert::TryFrom;
use anyhow::{Context as _, Result};
use iroh_net::NodeAddr;
use mailparse::{parse_mail, SingleInfo};
use num_traits::FromPrimitive;
use once_cell::sync::Lazy;
@@ -433,11 +434,49 @@ pub(crate) async fn receive_imf_inner(
}
if let Some(ref status_update) = mime_parser.webxdc_status_update {
if let Err(err) = context
match context
.receive_status_update(from_id, insert_msg_id, status_update)
.await
{
warn!(context, "receive_imf cannot update status: {err:#}.");
// join advertised gossip topics
Ok((topics, instance_id)) => {
warn!(context, "Joining topics: {:#?}", topics);
if let Some(node_addr) = mime_parser.get_header(HeaderDef::IrohPublicGossip) {
match serde_json::from_str::<NodeAddr>(node_addr)
.context("Failed to parse node address")
{
Ok(node_addr) => {
context
.endpoint
.lock()
.await
.as_ref()
.context("Failed to get magic endpoint")?
.add_node_addr(node_addr.clone())
.context("Failed to add node address")?;
let node_id = node_addr.node_id;
for topic in topics {
println!("Adding peer: {:?}", node_id);
context
.add_peer_for_topic(instance_id, topic, node_id)
.await?;
println!(
"New peer topics: {:?}",
context.get_peers_for_topic(topic).await?
);
}
}
Err(err) => {
warn!(context, "couldn't parse NodeAddr: {err}");
}
}
} else {
error!(context, "No IrohPublicGossip header found");
}
}
Err(err) => warn!(context, "receive_imf cannot update status: {err:#}."),
}
}

View File

@@ -900,6 +900,14 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
.await?;
}
if dbversion < 110 {
sql.execute_migration(
"CREATE TABLE iroh_gossip_peers (msg_id TEXT not NULL, topic TEXT NOT NULL, public_key TEXT NOT NULL)",
110,
)
.await?;
}
let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?

View File

@@ -16,10 +16,11 @@
//! - `descr` - text to send along with the updates
use std::path::Path;
use std::str::FromStr;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use deltachat_derive::FromSql;
use iroh_gossip::proto::TopicId;
use lettre_email::mime;
use lettre_email::PartBuilder;
use serde::{Deserialize, Serialize};
@@ -154,7 +155,7 @@ struct StatusUpdates {
}
/// Update items as sent on the wire and as stored in the database.
#[derive(Debug, Serialize, Deserialize, Default)]
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub struct StatusUpdateItem {
/// The playload of the status update.
pub payload: Value,
@@ -181,6 +182,12 @@ pub struct StatusUpdateItem {
/// If there is no ID, message is always considered to be unique.
#[serde(skip_serializing_if = "Option::is_none")]
pub uid: Option<String>,
/// If this update should only be gossiped and which topic to use.
/// Gossiped Updates will only be received by the other side if they
/// are currently online and part of the gossip topic.
#[serde(default)]
pub gossip_topic: Option<String>,
}
/// Update items as passed to the UIs.
@@ -305,7 +312,7 @@ impl Context {
/// Takes an update-json as `{payload: PAYLOAD}`
/// writes it to the database and handles events, info-messages, document name and summary.
async fn create_status_update_record(
pub(crate) async fn create_status_update_record(
&self,
instance: &mut Message,
status_update_item: StatusUpdateItem,
@@ -396,7 +403,6 @@ impl Context {
instance_id: &MsgId,
status_update_item: &StatusUpdateItem,
) -> Result<Option<StatusUpdateSerial>> {
let _lock = self.sql.write_lock().await;
let uid = status_update_item.uid.as_deref();
let Some(rowid) = self
.sql
@@ -490,6 +496,47 @@ impl Context {
MessageState::Undefined | MessageState::OutPreparing | MessageState::OutDraft
);
let mut ephemeral = status_update.gossip_topic.is_some();
if send_now {
if let Some(ref topic) = status_update.gossip_topic {
let topic = TopicId::from_str(&iroh_base::base32::fmt(
topic.get(0..32).context("Can't get 32 bytes from topic")?,
))?;
let topic_exists = self
.sql
.query_row_optional(
"SELECT 1 FROM iroh_gossip_peers WHERE topic=?",
(topic.as_bytes(),),
|_| Ok(()),
)
.await
.context("Failed to check if gossip topic exists")?
.is_some();
if !topic_exists {
info!(
self,
"Gossip topic {topic} does not exist, sending over smtp",
);
self.join_and_subscribe_topic(topic, instance_msg_id)
.await
.context("Failed to join and subscribe to gossip topic")?;
ephemeral = false;
} else {
if let Some(ref gossip) = *self.gossip.lock().await {
println!(
"sending to topic {topic} with peers: {:?}",
self.get_peers_for_topic(topic).await?
);
gossip
.broadcast(topic, serde_json::to_string(&status_update)?.into())
.await?;
}
}
}
}
status_update.uid = Some(create_id());
let status_update_serial: StatusUpdateSerial = self
.create_status_update_record(
@@ -499,11 +546,10 @@ impl Context {
send_now,
ContactId::SELF,
)
.await
.context("Failed to create status update")?
.context("Duplicate status update UID was generated")?;
.await?
.context("Failed to create status update")?;
if send_now {
if send_now && !ephemeral {
self.sql.insert(
"INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) VALUES(?, ?, ?, ?)
ON CONFLICT(msg_id)
@@ -595,12 +641,14 @@ impl Context {
///
/// `json` is an array containing one or more update items as created by send_webxdc_status_update(),
/// the array is parsed using serde, the single payloads are used as is.
///
/// Returns: List of topics that have been advertised in the updates and the [MsgId] of the instance.
pub(crate) async fn receive_status_update(
&self,
from_id: ContactId,
msg_id: MsgId,
json: &str,
) -> Result<()> {
) -> Result<(Vec<TopicId>, MsgId)> {
let msg = Message::load_from_db(self, msg_id).await?;
let (timestamp, mut instance, can_info_msg) = if msg.viewtype == Viewtype::Webxdc {
(msg.timestamp_sort, msg, false)
@@ -629,7 +677,15 @@ impl Context {
}
let updates: StatusUpdates = serde_json::from_str(json)?;
let mut topics = Vec::new();
for update_item in updates.updates {
if let Some(ref topic) = update_item.gossip_topic {
let topic = TopicId::from_str(&iroh_base::base32::fmt(
topic.get(0..32).context("Can't get 32 bytes from topic")?,
))?;
topics.push(topic);
}
self.create_status_update_record(
&mut instance,
update_item,
@@ -640,7 +696,7 @@ impl Context {
.await?;
}
Ok(())
Ok((topics, instance.id))
}
/// Returns status updates as an JSON-array, ready to be consumed by a webxdc.
@@ -871,9 +927,18 @@ impl Message {
}
}
/// Join a gossip topic and subscribe to it.
pub async fn join_gossip_topic(ctx: &Context, msg_id: MsgId, topic: &str) -> Result<()> {
let topic = TopicId::from_str(&iroh_base::base32::fmt(
topic.get(0..32).context("Can't get 32 bytes from topic")?,
))?;
ctx.join_and_subscribe_topic(topic, msg_id).await
}
#[cfg(test)]
mod tests {
use serde_json::json;
use std::time::Duration;
use super::*;
use crate::chat::{
@@ -1340,11 +1405,10 @@ mod tests {
// set_draft(None) deletes the message without the need to simulate network
chat_id.set_draft(&t, None).await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
"[]".to_string()
);
assert!(t
.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await
.is_err());
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM msgs_status_updates;", ())
@@ -1376,6 +1440,7 @@ mod tests {
document: None,
summary: None,
uid: Some("iecie2Ze".to_string()),
gossip_topic: None,
},
1640178619,
true,
@@ -1400,6 +1465,7 @@ mod tests {
document: None,
summary: None,
uid: Some("iecie2Ze".to_string()),
gossip_topic: None,
},
1640178619,
true,
@@ -1433,6 +1499,7 @@ mod tests {
document: None,
summary: None,
uid: None,
gossip_topic: None,
},
1640178619,
true,
@@ -1452,6 +1519,7 @@ mod tests {
document: None,
summary: None,
uid: None,
gossip_topic: None,
},
1640178619,
true,
@@ -1677,6 +1745,43 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_send_ephemeral_webxdc_status_update() -> Result<()> {
let alice = TestContext::new_alice().await;
alice.set_config_bool(Config::BccSelf, true).await?;
let bob = TestContext::new_bob().await;
// Alice sends an webxdc instance and a status update
let alice_chat = alice.create_chat(&bob).await;
let alice_instance = send_webxdc_instance(&alice, alice_chat.id).await?;
alice
.send_webxdc_status_update(
alice_instance.id,
r#"{"payload" : {"foo":"bar"}}"#,
"descr text",
)
.await?;
alice.flush_status_updates().await?;
// Not setting ephemeral should prepare a message
alice.pop_sent_msg().await;
alice
.send_webxdc_status_update(
alice_instance.id,
r#"{"payload" : {"foo":"bar"}, "ephemeral": true}"#,
"descr text",
)
.await?;
alice.flush_status_updates().await?;
// Setting ephemeral should noot prepare a message
assert!(&alice
.pop_sent_msg_opt(Duration::from_secs(1))
.await
.is_none());
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_render_webxdc_status_update_object() -> Result<()> {
let t = TestContext::new_alice().await;