feat: upgrade to iroh@0.29.0

- iroh-net -> iroh
- iroh-gossip uses hex by default, use base32 manually to keep backwards compat
- use the new `iroh::protocol::Router` to manage the gossip integration
This commit is contained in:
dignifiedquire
2024-12-04 19:04:10 +01:00
parent 6468806d86
commit ea5c159165
6 changed files with 471 additions and 398 deletions

746
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -63,8 +63,9 @@ humansize = "2"
hyper = "1" hyper = "1"
hyper-util = "0.1.10" hyper-util = "0.1.10"
image = { version = "0.25.5", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] } image = { version = "0.25.5", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh-gossip = { version = "0.28.1", default-features = false, features = ["net"] } iroh-gossip = { version = "0.29", default-features = false, features = ["net"] }
iroh-net = { version = "0.28.1", default-features = false } iroh = { version = "0.29", default-features = false }
iroh-base = { version = "0.29", features = ["base32"] }
kamadak-exif = "0.6.1" kamadak-exif = "0.6.1"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" } lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
libc = { workspace = true } libc = { workspace = true }

View File

@@ -33,8 +33,7 @@ use std::task::Poll;
use anyhow::{bail, format_err, Context as _, Result}; use anyhow::{bail, format_err, Context as _, Result};
use futures_lite::FutureExt; use futures_lite::FutureExt;
use iroh_net::relay::RelayMode; use iroh::{Endpoint, RelayMode};
use iroh_net::Endpoint;
use tokio::fs; use tokio::fs;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -65,11 +64,11 @@ const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
/// task use the [`Context::stop_ongoing`] mechanism. /// task use the [`Context::stop_ongoing`] mechanism.
#[derive(Debug)] #[derive(Debug)]
pub struct BackupProvider { pub struct BackupProvider {
/// iroh-net endpoint. /// iroh endpoint.
_endpoint: Endpoint, _endpoint: Endpoint,
/// iroh-net address. /// iroh address.
node_addr: iroh_net::NodeAddr, node_addr: iroh::NodeAddr,
/// Authentication token that should be submitted /// Authentication token that should be submitted
/// to retrieve the backup. /// to retrieve the backup.
@@ -162,7 +161,7 @@ impl BackupProvider {
async fn handle_connection( async fn handle_connection(
context: Context, context: Context,
conn: iroh_net::endpoint::Connecting, conn: iroh::endpoint::Connecting,
auth_token: String, auth_token: String,
dbfile: Arc<TempPathGuard>, dbfile: Arc<TempPathGuard>,
) -> Result<()> { ) -> Result<()> {
@@ -291,7 +290,7 @@ impl Future for BackupProvider {
pub async fn get_backup2( pub async fn get_backup2(
context: &Context, context: &Context,
node_addr: iroh_net::NodeAddr, node_addr: iroh::NodeAddr,
auth_token: String, auth_token: String,
) -> Result<()> { ) -> Result<()> {
let relay_mode = RelayMode::Disabled; let relay_mode = RelayMode::Disabled;
@@ -337,7 +336,7 @@ pub async fn get_backup2(
/// This is a long running operation which will return only when completed. /// This is a long running operation which will return only when completed.
/// ///
/// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it. It /// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it. It
/// does avoid having [`iroh_net::NodeAddr`] in the primary API however, without /// does avoid having [`iroh::NodeAddr`] in the primary API however, without
/// having to revert to untyped bytes. /// having to revert to untyped bytes.
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
match qr { match qr {

View File

@@ -26,15 +26,14 @@
use anyhow::{anyhow, bail, Context as _, Result}; use anyhow::{anyhow, bail, Context as _, Result};
use email::Header; use email::Header;
use futures_lite::StreamExt; use futures_lite::StreamExt;
use iroh::key::{PublicKey, SecretKey};
use iroh::{Endpoint, NodeAddr, NodeId, RelayMap, RelayMode, RelayUrl};
use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN}; use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
use iroh_gossip::proto::TopicId; use iroh_gossip::proto::TopicId;
use iroh_net::key::{PublicKey, SecretKey};
use iroh_net::relay::{RelayMap, RelayUrl};
use iroh_net::{relay::RelayMode, Endpoint};
use iroh_net::{NodeAddr, NodeId};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeSet, HashMap};
use std::env; use std::env;
use std::sync::Arc;
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use url::Url; use url::Url;
@@ -54,11 +53,11 @@ const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
/// Store iroh peer channels for the context. /// Store iroh peer channels for the context.
#[derive(Debug)] #[derive(Debug)]
pub struct Iroh { pub struct Iroh {
/// [Endpoint] needed for iroh peer channels. /// iroh router needed for iroh peer channels.
pub(crate) endpoint: Endpoint, pub(crate) router: iroh::protocol::Router,
/// [Gossip] needed for iroh peer channels. /// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Gossip, pub(crate) gossip: Arc<Gossip>,
/// Sequence numbers for gossip channels. /// Sequence numbers for gossip channels.
pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>, pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
@@ -75,15 +74,12 @@ pub struct Iroh {
impl Iroh { impl Iroh {
/// Notify the endpoint that the network has changed. /// Notify the endpoint that the network has changed.
pub(crate) async fn network_change(&self) { pub(crate) async fn network_change(&self) {
self.endpoint.network_change().await self.router.endpoint().network_change().await
} }
/// Closes the QUIC endpoint. /// Closes the QUIC endpoint.
pub(crate) async fn close(self) -> Result<()> { pub(crate) async fn close(self) -> Result<()> {
self.endpoint self.router.shutdown().await.context("Closing iroh failed")
.close(0u32.into(), b"")
.await
.context("Closing iroh endpoint failed")
} }
/// Join a topic and create the subscriber loop for it. /// Join a topic and create the subscriber loop for it.
@@ -121,7 +117,7 @@ impl Iroh {
// Inform iroh of potentially new node addresses // Inform iroh of potentially new node addresses
for node_addr in &peers { for node_addr in &peers {
if !node_addr.info.is_empty() { if !node_addr.info.is_empty() {
self.endpoint.add_node_addr(node_addr.clone())?; self.router.endpoint().add_node_addr(node_addr.clone())?;
} }
} }
@@ -148,7 +144,7 @@ impl Iroh {
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> { pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() { if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers { for peer in &peers {
self.endpoint.add_node_addr(peer.clone())?; self.router.endpoint().add_node_addr(peer.clone())?;
} }
self.gossip.join_with_opts( self.gossip.join_with_opts(
@@ -198,7 +194,7 @@ impl Iroh {
/// Get the iroh [NodeAddr] without direct IP addresses. /// Get the iroh [NodeAddr] without direct IP addresses.
pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> { pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
let mut addr = self.endpoint.node_addr().await?; let mut addr = self.router.endpoint().node_addr().await?;
addr.info.direct_addresses = BTreeSet::new(); addr.info.direct_addresses = BTreeSet::new();
Ok(addr) Ok(addr)
} }
@@ -275,16 +271,19 @@ impl Context {
max_message_size: 128 * 1024, max_message_size: 128 * 1024,
..Default::default() ..Default::default()
}; };
let gossip = Gossip::from_endpoint(endpoint.clone(), gossip_config, &my_addr.info); let gossip = Arc::new(Gossip::from_endpoint(
endpoint.clone(),
gossip_config,
&my_addr.info,
));
// spawn endpoint loop that forwards incoming connections to the gossiper let router = iroh::protocol::Router::builder(endpoint)
let context = self.clone(); .accept(GOSSIP_ALPN, gossip.clone())
.spawn()
// Shuts down on deltachat shutdown .await?;
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
Ok(Iroh { Ok(Iroh {
endpoint, router,
gossip, gossip,
sequence_numbers: Mutex::new(HashMap::new()), sequence_numbers: Mutex::new(HashMap::new()),
iroh_channels: RwLock::new(HashMap::new()), iroh_channels: RwLock::new(HashMap::new()),
@@ -507,54 +506,13 @@ fn create_random_topic() -> TopicId {
pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<Header> { pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<Header> {
let topic = create_random_topic(); let topic = create_random_topic();
insert_topic_stub(ctx, msg_id, topic).await?; insert_topic_stub(ctx, msg_id, topic).await?;
let topic_string = iroh_base::base32::fmt(topic.as_bytes());
Ok(Header::new( Ok(Header::new(
HeaderDef::IrohGossipTopic.get_headername().to_string(), HeaderDef::IrohGossipTopic.get_headername().to_string(),
topic.to_string(), topic_string,
)) ))
} }
async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) {
while let Some(conn) = endpoint.accept().await {
let conn = match conn.accept() {
Ok(conn) => conn,
Err(err) => {
warn!(context, "Failed to accept iroh connection: {err:#}.");
continue;
}
};
info!(context, "IROH_REALTIME: accepting iroh connection");
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
}
});
}
}
async fn handle_connection(
context: &Context,
mut conn: iroh_net::endpoint::Connecting,
gossip: Gossip,
) -> anyhow::Result<()> {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
match alpn.as_slice() {
GOSSIP_ALPN => gossip
.handle_connection(conn)
.await
.context(format!("Gossip connection to {peer_id} failed"))?,
_ => warn!(
context,
"Ignoring connection from {peer_id}: unsupported ALPN protocol"
),
}
Ok(())
}
async fn subscribe_loop( async fn subscribe_loop(
context: &Context, context: &Context,
mut stream: iroh_gossip::net::GossipReceiver, mut stream: iroh_gossip::net::GossipReceiver,
@@ -971,6 +929,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_parallel_connect() { async fn test_parallel_connect() {
eprintln!("START-----");
let mut tcm = TestContextManager::new(); let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await; let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await; let bob = &mut tcm.bob().await;

View File

@@ -112,7 +112,7 @@ pub enum Qr {
/// Provides a backup that can be retrieved using iroh-net based backup transfer protocol. /// Provides a backup that can be retrieved using iroh-net based backup transfer protocol.
Backup2 { Backup2 {
/// Iroh node address. /// Iroh node address.
node_addr: iroh_net::NodeAddr, node_addr: iroh::NodeAddr,
/// Authentication token. /// Authentication token.
auth_token: String, auth_token: String,
@@ -644,7 +644,7 @@ fn decode_backup2(qr: &str) -> Result<Qr> {
.split_once('&') .split_once('&')
.context("Backup QR code has no separator")?; .context("Backup QR code has no separator")?;
let auth_token = auth_token.to_string(); let auth_token = auth_token.to_string();
let node_addr = serde_json::from_str::<iroh_net::NodeAddr>(node_addr) let node_addr = serde_json::from_str::<iroh::NodeAddr>(node_addr)
.context("Invalid node addr in backup QR code")?; .context("Invalid node addr in backup QR code")?;
Ok(Qr::Backup2 { Ok(Qr::Backup2 {

View File

@@ -1,7 +1,6 @@
//! Internet Message Format reception pipeline. //! Internet Message Format reception pipeline.
use std::collections::HashSet; use std::collections::HashSet;
use std::str::FromStr;
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use deltachat_contact_tools::{addr_cmp, may_be_valid_addr, sanitize_single_line, ContactAddress}; use deltachat_contact_tools::{addr_cmp, may_be_valid_addr, sanitize_single_line, ContactAddress};
@@ -1652,7 +1651,10 @@ RETURNING id
// check if any part contains a webxdc topic id // check if any part contains a webxdc topic id
if part.typ == Viewtype::Webxdc { if part.typ == Viewtype::Webxdc {
if let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic) { if let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic) {
let topic = TopicId::from_str(topic).context("wrong gossip topic header")?; // default encoding of topic ids is `hex`.
let topic_raw: [u8; 32] =
iroh_base::base32::parse_array(topic).context("wrong gossip topic header")?;
let topic = TopicId::from_bytes(topic_raw);
insert_topic_stub(context, *msg_id, topic).await?; insert_topic_stub(context, *msg_id, topic).await?;
} else { } else {
warn!(context, "webxdc doesn't have a gossip topic") warn!(context, "webxdc doesn't have a gossip topic")