mirror of
https://github.com/chatmail/core.git
synced 2026-05-19 14:56:33 +03:00
feat: ephemeral peer channels (#5346)
Co-authored-by: link2xt <link2xt@testrun.org> Co-authored-by: iequidoo <117991069+iequidoo@users.noreply.github.com>
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -34,7 +34,6 @@ deltachat-ffi/xml
|
|||||||
coverage/
|
coverage/
|
||||||
.DS_Store
|
.DS_Store
|
||||||
.vscode
|
.vscode
|
||||||
.vscode/launch.json
|
|
||||||
python/accounts.txt
|
python/accounts.txt
|
||||||
python/all-testaccounts.txt
|
python/all-testaccounts.txt
|
||||||
tmp/
|
tmp/
|
||||||
|
|||||||
1905
Cargo.lock
generated
1905
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -60,7 +60,10 @@ hex = "0.4.0"
|
|||||||
hickory-resolver = "0.24"
|
hickory-resolver = "0.24"
|
||||||
humansize = "2"
|
humansize = "2"
|
||||||
image = { version = "0.25.1", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
|
image = { version = "0.25.1", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
|
||||||
iroh = { version = "0.4.2", default-features = false }
|
iroh_old = { version = "0.4.2", default-features = false, package = "iroh"}
|
||||||
|
iroh-net = "0.16.2"
|
||||||
|
iroh-gossip = { version = "0.16.2", features = ["net"] }
|
||||||
|
quinn = "0.10.0"
|
||||||
kamadak-exif = "0.5.3"
|
kamadak-exif = "0.5.3"
|
||||||
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
|
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
|
||||||
libc = "0.2"
|
libc = "0.2"
|
||||||
@@ -79,7 +82,7 @@ quick-xml = "0.31"
|
|||||||
quoted_printable = "0.5"
|
quoted_printable = "0.5"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
regex = { workspace = true }
|
regex = { workspace = true }
|
||||||
reqwest = { version = "0.12.2", features = ["json"] }
|
reqwest = { version = "0.11.27", features = ["json"] }
|
||||||
rusqlite = { workspace = true, features = ["sqlcipher"] }
|
rusqlite = { workspace = true, features = ["sqlcipher"] }
|
||||||
rust-hsluv = "0.1"
|
rust-hsluv = "0.1"
|
||||||
sanitize-filename = "0.5"
|
sanitize-filename = "0.5"
|
||||||
@@ -178,4 +181,4 @@ vendored = [
|
|||||||
"async-native-tls/vendored",
|
"async-native-tls/vendored",
|
||||||
"rusqlite/bundled-sqlcipher-vendored-openssl",
|
"rusqlite/bundled-sqlcipher-vendored-openssl",
|
||||||
"reqwest/native-tls-vendored"
|
"reqwest/native-tls-vendored"
|
||||||
]
|
]
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||||
use deltachat::contact::Contact;
|
use deltachat::contact::Contact;
|
||||||
use deltachat::context::Context;
|
use deltachat::context::Context;
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use criterion::{criterion_group, criterion_main, Criterion};
|
use criterion::{criterion_group, criterion_main, Criterion};
|
||||||
|
|
||||||
use deltachat::context::Context;
|
use deltachat::context::Context;
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
#![warn(unused, clippy::all)]
|
#![warn(unused, clippy::all)]
|
||||||
#![allow(
|
#![allow(
|
||||||
non_camel_case_types,
|
non_camel_case_types,
|
||||||
@@ -561,6 +562,7 @@ pub unsafe extern "C" fn dc_event_get_id(event: *mut dc_event_t) -> libc::c_int
|
|||||||
EventType::ConfigSynced { .. } => 2111,
|
EventType::ConfigSynced { .. } => 2111,
|
||||||
EventType::WebxdcStatusUpdate { .. } => 2120,
|
EventType::WebxdcStatusUpdate { .. } => 2120,
|
||||||
EventType::WebxdcInstanceDeleted { .. } => 2121,
|
EventType::WebxdcInstanceDeleted { .. } => 2121,
|
||||||
|
EventType::WebxdcRealtimeData { .. } => 2150,
|
||||||
EventType::AccountsBackgroundFetchDone => 2200,
|
EventType::AccountsBackgroundFetchDone => 2200,
|
||||||
EventType::ChatlistChanged => 2300,
|
EventType::ChatlistChanged => 2300,
|
||||||
EventType::ChatlistItemChanged { .. } => 2301,
|
EventType::ChatlistItemChanged { .. } => 2301,
|
||||||
@@ -616,8 +618,9 @@ pub unsafe extern "C" fn dc_event_get_data1_int(event: *mut dc_event_t) -> libc:
|
|||||||
| EventType::SecurejoinJoinerProgress { contact_id, .. } => {
|
| EventType::SecurejoinJoinerProgress { contact_id, .. } => {
|
||||||
contact_id.to_u32() as libc::c_int
|
contact_id.to_u32() as libc::c_int
|
||||||
}
|
}
|
||||||
EventType::WebxdcStatusUpdate { msg_id, .. } => msg_id.to_u32() as libc::c_int,
|
EventType::WebxdcRealtimeData { msg_id, .. }
|
||||||
EventType::WebxdcInstanceDeleted { msg_id, .. } => msg_id.to_u32() as libc::c_int,
|
| EventType::WebxdcStatusUpdate { msg_id, .. }
|
||||||
|
| EventType::WebxdcInstanceDeleted { msg_id, .. } => msg_id.to_u32() as libc::c_int,
|
||||||
EventType::ChatlistItemChanged { chat_id } => {
|
EventType::ChatlistItemChanged { chat_id } => {
|
||||||
chat_id.unwrap_or_default().to_u32() as libc::c_int
|
chat_id.unwrap_or_default().to_u32() as libc::c_int
|
||||||
}
|
}
|
||||||
@@ -655,6 +658,7 @@ pub unsafe extern "C" fn dc_event_get_data2_int(event: *mut dc_event_t) -> libc:
|
|||||||
| EventType::ConnectivityChanged
|
| EventType::ConnectivityChanged
|
||||||
| EventType::WebxdcInstanceDeleted { .. }
|
| EventType::WebxdcInstanceDeleted { .. }
|
||||||
| EventType::IncomingMsgBunch { .. }
|
| EventType::IncomingMsgBunch { .. }
|
||||||
|
| EventType::WebxdcRealtimeData { .. }
|
||||||
| EventType::SelfavatarChanged
|
| EventType::SelfavatarChanged
|
||||||
| EventType::AccountsBackgroundFetchDone
|
| EventType::AccountsBackgroundFetchDone
|
||||||
| EventType::ChatlistChanged
|
| EventType::ChatlistChanged
|
||||||
@@ -721,6 +725,7 @@ pub unsafe extern "C" fn dc_event_get_data2_str(event: *mut dc_event_t) -> *mut
|
|||||||
| EventType::SelfavatarChanged
|
| EventType::SelfavatarChanged
|
||||||
| EventType::WebxdcStatusUpdate { .. }
|
| EventType::WebxdcStatusUpdate { .. }
|
||||||
| EventType::WebxdcInstanceDeleted { .. }
|
| EventType::WebxdcInstanceDeleted { .. }
|
||||||
|
| EventType::WebxdcRealtimeData { .. }
|
||||||
| EventType::AccountsBackgroundFetchDone
|
| EventType::AccountsBackgroundFetchDone
|
||||||
| EventType::ChatEphemeralTimerModified { .. }
|
| EventType::ChatEphemeralTimerModified { .. }
|
||||||
| EventType::IncomingMsgBunch { .. }
|
| EventType::IncomingMsgBunch { .. }
|
||||||
|
|||||||
@@ -18,12 +18,14 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER;
|
|||||||
use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin};
|
use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin};
|
||||||
use deltachat::context::get_info;
|
use deltachat::context::get_info;
|
||||||
use deltachat::ephemeral::Timer;
|
use deltachat::ephemeral::Timer;
|
||||||
use deltachat::imex;
|
|
||||||
use deltachat::location;
|
use deltachat::location;
|
||||||
use deltachat::message::get_msg_read_receipts;
|
use deltachat::message::get_msg_read_receipts;
|
||||||
use deltachat::message::{
|
use deltachat::message::{
|
||||||
self, delete_msgs, markseen_msgs, Message, MessageState, MsgId, Viewtype,
|
self, delete_msgs, markseen_msgs, Message, MessageState, MsgId, Viewtype,
|
||||||
};
|
};
|
||||||
|
use deltachat::peer_channels::{
|
||||||
|
leave_webxdc_realtime, send_webxdc_realtime_advertisement, send_webxdc_realtime_data,
|
||||||
|
};
|
||||||
use deltachat::provider::get_provider_info;
|
use deltachat::provider::get_provider_info;
|
||||||
use deltachat::qr::{self, Qr};
|
use deltachat::qr::{self, Qr};
|
||||||
use deltachat::qr_code_generator::{generate_backup_qr, get_securejoin_qr_svg};
|
use deltachat::qr_code_generator::{generate_backup_qr, get_securejoin_qr_svg};
|
||||||
@@ -32,6 +34,7 @@ use deltachat::securejoin;
|
|||||||
use deltachat::stock_str::StockMessage;
|
use deltachat::stock_str::StockMessage;
|
||||||
use deltachat::webxdc::StatusUpdateSerial;
|
use deltachat::webxdc::StatusUpdateSerial;
|
||||||
use deltachat::EventEmitter;
|
use deltachat::EventEmitter;
|
||||||
|
use deltachat::{imex, info};
|
||||||
use sanitize_filename::is_sanitized;
|
use sanitize_filename::is_sanitized;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::sync::{watch, Mutex, RwLock};
|
use tokio::sync::{watch, Mutex, RwLock};
|
||||||
@@ -1763,6 +1766,37 @@ impl CommandApi {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_webxdc_realtime_data(
|
||||||
|
&self,
|
||||||
|
account_id: u32,
|
||||||
|
instance_msg_id: u32,
|
||||||
|
data: Vec<u8>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let ctx = self.get_context(account_id).await?;
|
||||||
|
send_webxdc_realtime_data(&ctx, MsgId::new(instance_msg_id), data).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_webxdc_realtime_advertisement(
|
||||||
|
&self,
|
||||||
|
account_id: u32,
|
||||||
|
instance_msg_id: u32,
|
||||||
|
) -> Result<()> {
|
||||||
|
let ctx = self.get_context(account_id).await?;
|
||||||
|
let fut = send_webxdc_realtime_advertisement(&ctx, MsgId::new(instance_msg_id)).await?;
|
||||||
|
if let Some(fut) = fut {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
fut.await.ok();
|
||||||
|
info!(ctx, "send_webxdc_realtime_advertisement done")
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn leave_webxdc_realtime(&self, account_id: u32, instance_message_id: u32) -> Result<()> {
|
||||||
|
let ctx = self.get_context(account_id).await?;
|
||||||
|
leave_webxdc_realtime(&ctx, MsgId::new(instance_message_id)).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_webxdc_status_updates(
|
async fn get_webxdc_status_updates(
|
||||||
&self,
|
&self,
|
||||||
account_id: u32,
|
account_id: u32,
|
||||||
|
|||||||
@@ -240,6 +240,10 @@ pub enum EventType {
|
|||||||
status_update_serial: u32,
|
status_update_serial: u32,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Data received over an ephemeral peer channel.
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
WebxdcRealtimeData { msg_id: u32, data: Vec<u8> },
|
||||||
|
|
||||||
/// Inform that a message containing a webxdc instance has been deleted
|
/// Inform that a message containing a webxdc instance has been deleted
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
WebxdcInstanceDeleted { msg_id: u32 },
|
WebxdcInstanceDeleted { msg_id: u32 },
|
||||||
@@ -362,6 +366,10 @@ impl From<CoreEventType> for EventType {
|
|||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
status_update_serial: status_update_serial.to_u32(),
|
status_update_serial: status_update_serial.to_u32(),
|
||||||
},
|
},
|
||||||
|
CoreEventType::WebxdcRealtimeData { msg_id, data } => WebxdcRealtimeData {
|
||||||
|
msg_id: msg_id.to_u32(),
|
||||||
|
data,
|
||||||
|
},
|
||||||
CoreEventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted {
|
CoreEventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted {
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -379,6 +379,9 @@ pub enum SystemMessageType {
|
|||||||
|
|
||||||
/// Webxdc info added with `info` set in `send_webxdc_status_update()`.
|
/// Webxdc info added with `info` set in `send_webxdc_status_update()`.
|
||||||
WebxdcInfoMessage,
|
WebxdcInfoMessage,
|
||||||
|
|
||||||
|
/// This message contains a users iroh node address.
|
||||||
|
IrohNodeAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<deltachat::mimeparser::SystemMessage> for SystemMessageType {
|
impl From<deltachat::mimeparser::SystemMessage> for SystemMessageType {
|
||||||
@@ -401,6 +404,7 @@ impl From<deltachat::mimeparser::SystemMessage> for SystemMessageType {
|
|||||||
SystemMessage::WebxdcStatusUpdate => SystemMessageType::WebxdcStatusUpdate,
|
SystemMessage::WebxdcStatusUpdate => SystemMessageType::WebxdcStatusUpdate,
|
||||||
SystemMessage::WebxdcInfoMessage => SystemMessageType::WebxdcInfoMessage,
|
SystemMessage::WebxdcInfoMessage => SystemMessageType::WebxdcInfoMessage,
|
||||||
SystemMessage::InvalidUnencryptedMail => SystemMessageType::InvalidUnencryptedMail,
|
SystemMessage::InvalidUnencryptedMail => SystemMessageType::InvalidUnencryptedMail,
|
||||||
|
SystemMessage::IrohNodeAddr => SystemMessageType::IrohNodeAddr,
|
||||||
SystemMessage::SecurejoinWait => SystemMessageType::SecurejoinWait,
|
SystemMessage::SecurejoinWait => SystemMessageType::SecurejoinWait,
|
||||||
SystemMessage::SecurejoinWaitTimeout => SystemMessageType::SecurejoinWaitTimeout,
|
SystemMessage::SecurejoinWaitTimeout => SystemMessageType::SecurejoinWaitTimeout,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
pub mod api;
|
pub mod api;
|
||||||
pub use yerpc;
|
pub use yerpc;
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
//! This is a CLI program and a little testing frame. This file must not be
|
//! This is a CLI program and a little testing frame. This file must not be
|
||||||
//! included when using Delta Chat Core as a library.
|
//! included when using Delta Chat Core as a library.
|
||||||
//!
|
//!
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "256"]
|
||||||
//! Delta Chat core RPC server.
|
//! Delta Chat core RPC server.
|
||||||
//!
|
//!
|
||||||
//! It speaks JSON Lines over stdio.
|
//! It speaks JSON Lines over stdio.
|
||||||
|
|||||||
29
deny.toml
29
deny.toml
@@ -23,6 +23,9 @@ ignore = [
|
|||||||
# when upgrading.
|
# when upgrading.
|
||||||
# Please keep this list alphabetically sorted.
|
# Please keep this list alphabetically sorted.
|
||||||
skip = [
|
skip = [
|
||||||
|
{ name = "asn1-rs-derive", version = "0.4.0" },
|
||||||
|
{ name = "asn1-rs-impl", version = "0.1.0" },
|
||||||
|
{ name = "asn1-rs", version = "0.5.2" },
|
||||||
{ name = "async-channel", version = "1.9.0" },
|
{ name = "async-channel", version = "1.9.0" },
|
||||||
{ name = "base16ct", version = "0.1.1" },
|
{ name = "base16ct", version = "0.1.1" },
|
||||||
{ name = "base64", version = "<0.21" },
|
{ name = "base64", version = "<0.21" },
|
||||||
@@ -34,20 +37,38 @@ skip = [
|
|||||||
{ name = "darling_core", version = "<0.14" },
|
{ name = "darling_core", version = "<0.14" },
|
||||||
{ name = "darling_macro", version = "<0.14" },
|
{ name = "darling_macro", version = "<0.14" },
|
||||||
{ name = "darling", version = "<0.14" },
|
{ name = "darling", version = "<0.14" },
|
||||||
|
{ name = "der_derive", version = "0.6.1" },
|
||||||
|
{ name = "derive_more", version = "0.99.17" },
|
||||||
|
{ name = "der-parser", version = "8.2.0" },
|
||||||
{ name = "der", version = "0.6.1" },
|
{ name = "der", version = "0.6.1" },
|
||||||
{ name = "digest", version = "<0.10" },
|
{ name = "digest", version = "<0.10" },
|
||||||
|
{ name = "dlopen2", version = "0.4.1" },
|
||||||
{ name = "ed25519-dalek", version = "1.0.1" },
|
{ name = "ed25519-dalek", version = "1.0.1" },
|
||||||
{ name = "ed25519", version = "1.5.3" },
|
{ name = "ed25519", version = "1.5.3" },
|
||||||
{ name = "env_logger", version = "0.10.2" },
|
{ name = "env_logger", version = "0.10.2" },
|
||||||
{ name = "event-listener", version = "2.5.3" },
|
{ name = "event-listener", version = "2.5.3" },
|
||||||
|
{ name = "event-listener", version = "4.0.3" },
|
||||||
|
{ name = "fastrand", version = "1.9.0" },
|
||||||
|
{ name = "futures-lite", version = "1.13.0" },
|
||||||
{ name = "getrandom", version = "<0.2" },
|
{ name = "getrandom", version = "<0.2" },
|
||||||
|
{ name = "http-body", version = "0.4.6" },
|
||||||
|
{ name = "http", version = "0.2.12" },
|
||||||
|
{ name = "hyper", version = "0.14.28" },
|
||||||
{ name = "idna", version = "0.4.0" },
|
{ name = "idna", version = "0.4.0" },
|
||||||
|
{ name = "netlink-packet-core", version = "0.5.0" },
|
||||||
|
{ name = "netlink-packet-route", version = "0.15.0" },
|
||||||
|
{ name = "nix", version = "0.26.4" },
|
||||||
|
{ name = "oid-registry", version = "0.6.1" },
|
||||||
{ name = "pem-rfc7468", version = "0.6.0" },
|
{ name = "pem-rfc7468", version = "0.6.0" },
|
||||||
|
{ name = "pem", version = "1.1.1" },
|
||||||
{ name = "pkcs8", version = "0.9.0" },
|
{ name = "pkcs8", version = "0.9.0" },
|
||||||
|
{ name = "proc-macro-error-attr", version = "0.4.12" },
|
||||||
|
{ name = "proc-macro-error", version = "0.4.12" },
|
||||||
{ name = "quick-error", version = "<2.0" },
|
{ name = "quick-error", version = "<2.0" },
|
||||||
{ name = "rand_chacha", version = "<0.3" },
|
{ name = "rand_chacha", version = "<0.3" },
|
||||||
{ name = "rand_core", version = "<0.6" },
|
{ name = "rand_core", version = "<0.6" },
|
||||||
{ name = "rand", version = "<0.8" },
|
{ name = "rand", version = "<0.8" },
|
||||||
|
{ name = "rcgen", version = "<0.12.1" },
|
||||||
{ name = "redox_syscall", version = "0.3.5" },
|
{ name = "redox_syscall", version = "0.3.5" },
|
||||||
{ name = "regex-automata", version = "0.1.10" },
|
{ name = "regex-automata", version = "0.1.10" },
|
||||||
{ name = "regex-syntax", version = "0.6.29" },
|
{ name = "regex-syntax", version = "0.6.29" },
|
||||||
@@ -57,23 +78,31 @@ skip = [
|
|||||||
{ name = "signature", version = "1.6.4" },
|
{ name = "signature", version = "1.6.4" },
|
||||||
{ name = "spin", version = "<0.9.6" },
|
{ name = "spin", version = "<0.9.6" },
|
||||||
{ name = "spki", version = "0.6.0" },
|
{ name = "spki", version = "0.6.0" },
|
||||||
|
{ name = "ssh-encoding", version = "0.1.0" },
|
||||||
|
{ name = "ssh-key", version = "0.5.1" },
|
||||||
{ name = "sync_wrapper", version = "0.1.2" },
|
{ name = "sync_wrapper", version = "0.1.2" },
|
||||||
|
{ name = "synstructure", version = "0.12.6" },
|
||||||
{ name = "syn", version = "1.0.109" },
|
{ name = "syn", version = "1.0.109" },
|
||||||
|
{ name = "system-configuration-sys", version = "0.5.0" },
|
||||||
|
{ name = "system-configuration", version = "0.5.1" },
|
||||||
{ name = "time", version = "<0.3" },
|
{ name = "time", version = "<0.3" },
|
||||||
{ name = "toml_edit", version = "0.21.1" },
|
{ name = "toml_edit", version = "0.21.1" },
|
||||||
{ name = "untrusted", version = "0.7.1" },
|
{ name = "untrusted", version = "0.7.1" },
|
||||||
{ name = "wasi", version = "<0.11" },
|
{ name = "wasi", version = "<0.11" },
|
||||||
{ name = "windows_aarch64_gnullvm", version = "<0.52" },
|
{ name = "windows_aarch64_gnullvm", version = "<0.52" },
|
||||||
{ name = "windows_aarch64_msvc", version = "<0.52" },
|
{ name = "windows_aarch64_msvc", version = "<0.52" },
|
||||||
|
{ name = "windows-core", version = "<0.54.0" },
|
||||||
{ name = "windows_i686_gnu", version = "<0.52" },
|
{ name = "windows_i686_gnu", version = "<0.52" },
|
||||||
{ name = "windows_i686_msvc", version = "<0.52" },
|
{ name = "windows_i686_msvc", version = "<0.52" },
|
||||||
{ name = "windows-sys", version = "<0.52" },
|
{ name = "windows-sys", version = "<0.52" },
|
||||||
{ name = "windows-targets", version = "<0.52" },
|
{ name = "windows-targets", version = "<0.52" },
|
||||||
{ name = "windows", version = "0.32.0" },
|
{ name = "windows", version = "0.32.0" },
|
||||||
|
{ name = "windows", version = "<0.54.0" },
|
||||||
{ name = "windows_x86_64_gnullvm", version = "<0.52" },
|
{ name = "windows_x86_64_gnullvm", version = "<0.52" },
|
||||||
{ name = "windows_x86_64_gnu", version = "<0.52" },
|
{ name = "windows_x86_64_gnu", version = "<0.52" },
|
||||||
{ name = "windows_x86_64_msvc", version = "<0.52" },
|
{ name = "windows_x86_64_msvc", version = "<0.52" },
|
||||||
{ name = "winnow", version = "0.5.40" },
|
{ name = "winnow", version = "0.5.40" },
|
||||||
|
{ name = "x509-parser", version = "<0.16.0" },
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use base64::Engine as _;
|
|||||||
use deltachat_contact_tools::addr_cmp;
|
use deltachat_contact_tools::addr_cmp;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use strum::{EnumProperty, IntoEnumIterator};
|
use strum::{EnumProperty, IntoEnumIterator};
|
||||||
use strum_macros::{AsRefStr, Display, EnumIter, EnumProperty, EnumString};
|
use strum_macros::{AsRefStr, Display, EnumIter, EnumString};
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
|
|
||||||
use crate::blob::BlobObject;
|
use crate::blob::BlobObject;
|
||||||
@@ -362,6 +362,9 @@ pub enum Config {
|
|||||||
|
|
||||||
/// MsgId of webxdc map integration.
|
/// MsgId of webxdc map integration.
|
||||||
WebxdcIntegration,
|
WebxdcIntegration,
|
||||||
|
|
||||||
|
/// Iroh secret key.
|
||||||
|
IrohSecretKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ use anyhow::{bail, ensure, Context as _, Result};
|
|||||||
use async_channel::{self as channel, Receiver, Sender};
|
use async_channel::{self as channel, Receiver, Sender};
|
||||||
use pgp::SignedPublicKey;
|
use pgp::SignedPublicKey;
|
||||||
use ratelimit::Ratelimit;
|
use ratelimit::Ratelimit;
|
||||||
use tokio::sync::{Mutex, Notify, RwLock};
|
use tokio::sync::{Mutex, Notify, OnceCell, RwLock};
|
||||||
|
|
||||||
use crate::aheader::EncryptPreference;
|
use crate::aheader::EncryptPreference;
|
||||||
use crate::chat::{get_chat_cnt, ChatId, ProtectionStatus};
|
use crate::chat::{get_chat_cnt, ChatId, ProtectionStatus};
|
||||||
@@ -30,6 +30,7 @@ use crate::key::{load_self_public_key, load_self_secret_key, DcKey as _};
|
|||||||
use crate::login_param::LoginParam;
|
use crate::login_param::LoginParam;
|
||||||
use crate::message::{self, Message, MessageState, MsgId, Viewtype};
|
use crate::message::{self, Message, MessageState, MsgId, Viewtype};
|
||||||
use crate::param::{Param, Params};
|
use crate::param::{Param, Params};
|
||||||
|
use crate::peer_channels::Iroh;
|
||||||
use crate::peerstate::Peerstate;
|
use crate::peerstate::Peerstate;
|
||||||
use crate::push::PushSubscriber;
|
use crate::push::PushSubscriber;
|
||||||
use crate::quota::QuotaInfo;
|
use crate::quota::QuotaInfo;
|
||||||
@@ -288,6 +289,9 @@ pub struct InnerContext {
|
|||||||
|
|
||||||
/// True if account has subscribed to push notifications via IMAP.
|
/// True if account has subscribed to push notifications via IMAP.
|
||||||
pub(crate) push_subscribed: AtomicBool,
|
pub(crate) push_subscribed: AtomicBool,
|
||||||
|
|
||||||
|
/// Iroh for realtime peer channels.
|
||||||
|
pub(crate) iroh: OnceCell<Iroh>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The state of ongoing process.
|
/// The state of ongoing process.
|
||||||
@@ -445,6 +449,7 @@ impl Context {
|
|||||||
debug_logging: std::sync::RwLock::new(None),
|
debug_logging: std::sync::RwLock::new(None),
|
||||||
push_subscriber,
|
push_subscriber,
|
||||||
push_subscribed: AtomicBool::new(false),
|
push_subscribed: AtomicBool::new(false),
|
||||||
|
iroh: OnceCell::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let ctx = Context {
|
let ctx = Context {
|
||||||
@@ -482,6 +487,9 @@ impl Context {
|
|||||||
|
|
||||||
/// Indicate that the network likely has come back.
|
/// Indicate that the network likely has come back.
|
||||||
pub async fn maybe_network(&self) {
|
pub async fn maybe_network(&self) {
|
||||||
|
if let Some(iroh) = self.iroh.get() {
|
||||||
|
iroh.network_change().await;
|
||||||
|
}
|
||||||
self.scheduler.maybe_network().await;
|
self.scheduler.maybe_network().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1648,6 +1656,7 @@ mod tests {
|
|||||||
"socks5_password",
|
"socks5_password",
|
||||||
"key_id",
|
"key_id",
|
||||||
"webxdc_integration",
|
"webxdc_integration",
|
||||||
|
"iroh_secret_key",
|
||||||
];
|
];
|
||||||
let t = TestContext::new().await;
|
let t = TestContext::new().await;
|
||||||
let info = t.get_info().await.unwrap();
|
let info = t.get_info().await.unwrap();
|
||||||
|
|||||||
@@ -279,6 +279,15 @@ pub enum EventType {
|
|||||||
status_update_serial: StatusUpdateSerial,
|
status_update_serial: StatusUpdateSerial,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Data received over an ephemeral peer channel.
|
||||||
|
WebxdcRealtimeData {
|
||||||
|
/// Message ID.
|
||||||
|
msg_id: MsgId,
|
||||||
|
|
||||||
|
/// Realtime data.
|
||||||
|
data: Vec<u8>,
|
||||||
|
},
|
||||||
|
|
||||||
/// Inform that a message containing a webxdc instance has been deleted.
|
/// Inform that a message containing a webxdc instance has been deleted.
|
||||||
WebxdcInstanceDeleted {
|
WebxdcInstanceDeleted {
|
||||||
/// ID of the deleted message.
|
/// ID of the deleted message.
|
||||||
|
|||||||
@@ -93,6 +93,12 @@ pub enum HeaderDef {
|
|||||||
/// See <https://datatracker.ietf.org/doc/html/rfc8601>
|
/// See <https://datatracker.ietf.org/doc/html/rfc8601>
|
||||||
AuthenticationResults,
|
AuthenticationResults,
|
||||||
|
|
||||||
|
/// Node address from iroh where direct addresses have been removed.
|
||||||
|
IrohNodeAddr,
|
||||||
|
|
||||||
|
/// Advertised gossip topic for one webxdc.
|
||||||
|
IrohGossipTopic,
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
TestHeader,
|
TestHeader,
|
||||||
}
|
}
|
||||||
|
|||||||
26
src/imap.rs
26
src/imap.rs
@@ -22,6 +22,7 @@ use futures_lite::FutureExt;
|
|||||||
use num_traits::FromPrimitive;
|
use num_traits::FromPrimitive;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use ratelimit::Ratelimit;
|
use ratelimit::Ratelimit;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
use crate::chat::{self, ChatId, ChatIdBlocked};
|
use crate::chat::{self, ChatId, ChatIdBlocked};
|
||||||
use crate::chatlist_events;
|
use crate::chatlist_events;
|
||||||
@@ -111,6 +112,8 @@ pub(crate) struct ServerMetadata {
|
|||||||
/// IMAP METADATA `/shared/admin` as defined in
|
/// IMAP METADATA `/shared/admin` as defined in
|
||||||
/// <https://www.rfc-editor.org/rfc/rfc5464#section-6.2.2>.
|
/// <https://www.rfc-editor.org/rfc/rfc5464#section-6.2.2>.
|
||||||
pub admin: Option<String>,
|
pub admin: Option<String>,
|
||||||
|
|
||||||
|
pub iroh_relay: Option<Url>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl async_imap::Authenticator for OAuth2 {
|
impl async_imap::Authenticator for OAuth2 {
|
||||||
@@ -1449,11 +1452,16 @@ impl Session {
|
|||||||
|
|
||||||
let mut comment = None;
|
let mut comment = None;
|
||||||
let mut admin = None;
|
let mut admin = None;
|
||||||
|
let mut iroh_relay = None;
|
||||||
|
|
||||||
let mailbox = "";
|
let mailbox = "";
|
||||||
let options = "";
|
let options = "";
|
||||||
let metadata = self
|
let metadata = self
|
||||||
.get_metadata(mailbox, options, "(/shared/comment /shared/admin)")
|
.get_metadata(
|
||||||
|
mailbox,
|
||||||
|
options,
|
||||||
|
"(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay)",
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
for m in metadata {
|
for m in metadata {
|
||||||
match m.entry.as_ref() {
|
match m.entry.as_ref() {
|
||||||
@@ -1463,10 +1471,24 @@ impl Session {
|
|||||||
"/shared/admin" => {
|
"/shared/admin" => {
|
||||||
admin = m.value;
|
admin = m.value;
|
||||||
}
|
}
|
||||||
|
"/shared/vendor/deltachat/irohrelay" => {
|
||||||
|
if let Some(url) = m.value.as_deref().and_then(|s| Url::parse(s).ok()) {
|
||||||
|
iroh_relay = Some(url);
|
||||||
|
} else {
|
||||||
|
warn!(
|
||||||
|
context,
|
||||||
|
"Got invalid URL from iroh relay metadata: {:?}.", m.value
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*lock = Some(ServerMetadata { comment, admin });
|
*lock = Some(ServerMetadata {
|
||||||
|
comment,
|
||||||
|
admin,
|
||||||
|
iroh_relay,
|
||||||
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ use iroh::progress::ProgressEmitter;
|
|||||||
use iroh::protocol::AuthToken;
|
use iroh::protocol::AuthToken;
|
||||||
use iroh::provider::{DataSource, Event, Provider, Ticket};
|
use iroh::provider::{DataSource, Event, Provider, Ticket};
|
||||||
use iroh::Hash;
|
use iroh::Hash;
|
||||||
|
use iroh_old as iroh;
|
||||||
use tokio::fs::{self, File};
|
use tokio::fs::{self, File};
|
||||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||||
use tokio::sync::broadcast::error::RecvError;
|
use tokio::sync::broadcast::error::RecvError;
|
||||||
|
|||||||
@@ -106,6 +106,7 @@ pub mod receive_imf;
|
|||||||
pub mod tools;
|
pub mod tools;
|
||||||
|
|
||||||
pub mod accounts;
|
pub mod accounts;
|
||||||
|
pub mod peer_channels;
|
||||||
pub mod reaction;
|
pub mod reaction;
|
||||||
|
|
||||||
/// If set IMAP/incoming and SMTP/outgoing MIME messages will be printed.
|
/// If set IMAP/incoming and SMTP/outgoing MIME messages will be printed.
|
||||||
|
|||||||
@@ -17,11 +17,12 @@ use crate::contact::Contact;
|
|||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
use crate::e2ee::EncryptHelper;
|
use crate::e2ee::EncryptHelper;
|
||||||
use crate::ephemeral::Timer as EphemeralTimer;
|
use crate::ephemeral::Timer as EphemeralTimer;
|
||||||
|
use crate::headerdef::HeaderDef;
|
||||||
use crate::html::new_html_mimepart;
|
use crate::html::new_html_mimepart;
|
||||||
use crate::location;
|
|
||||||
use crate::message::{self, Message, MsgId, Viewtype};
|
use crate::message::{self, Message, MsgId, Viewtype};
|
||||||
use crate::mimeparser::SystemMessage;
|
use crate::mimeparser::SystemMessage;
|
||||||
use crate::param::Param;
|
use crate::param::Param;
|
||||||
|
use crate::peer_channels::create_iroh_header;
|
||||||
use crate::peerstate::Peerstate;
|
use crate::peerstate::Peerstate;
|
||||||
use crate::simplify::escape_message_footer_marks;
|
use crate::simplify::escape_message_footer_marks;
|
||||||
use crate::stock_str;
|
use crate::stock_str;
|
||||||
@@ -29,6 +30,7 @@ use crate::tools::IsNoneOrEmpty;
|
|||||||
use crate::tools::{
|
use crate::tools::{
|
||||||
create_outgoing_rfc724_mid, create_smeared_timestamp, remove_subject_prefix, time,
|
create_outgoing_rfc724_mid, create_smeared_timestamp, remove_subject_prefix, time,
|
||||||
};
|
};
|
||||||
|
use crate::{location, peer_channels};
|
||||||
|
|
||||||
// attachments of 25 mb brutto should work on the majority of providers
|
// attachments of 25 mb brutto should work on the majority of providers
|
||||||
// (brutto examples: web.de=50, 1&1=40, t-online.de=32, gmail=25, posteo=50, yahoo=25, all-inkl=100).
|
// (brutto examples: web.de=50, 1&1=40, t-online.de=32, gmail=25, posteo=50, yahoo=25, all-inkl=100).
|
||||||
@@ -1148,6 +1150,18 @@ impl<'a> MimeFactory<'a> {
|
|||||||
"protection-disabled".to_string(),
|
"protection-disabled".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
SystemMessage::IrohNodeAddr => {
|
||||||
|
headers.protected.push(Header::new(
|
||||||
|
HeaderDef::IrohNodeAddr.get_headername().to_string(),
|
||||||
|
serde_json::to_string(
|
||||||
|
&context
|
||||||
|
.get_or_try_init_peer_channel()
|
||||||
|
.await?
|
||||||
|
.get_node_addr()
|
||||||
|
.await?,
|
||||||
|
)?,
|
||||||
|
));
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1314,6 +1328,10 @@ impl<'a> MimeFactory<'a> {
|
|||||||
let json = self.msg.param.get(Param::Arg).unwrap_or_default();
|
let json = self.msg.param.get(Param::Arg).unwrap_or_default();
|
||||||
parts.push(context.build_status_update_part(json));
|
parts.push(context.build_status_update_part(json));
|
||||||
} else if self.msg.viewtype == Viewtype::Webxdc {
|
} else if self.msg.viewtype == Viewtype::Webxdc {
|
||||||
|
let topic = peer_channels::create_random_topic();
|
||||||
|
headers
|
||||||
|
.protected
|
||||||
|
.push(create_iroh_header(context, topic, self.msg.id).await?);
|
||||||
if let Some(json) = context
|
if let Some(json) = context
|
||||||
.render_webxdc_status_update_object(self.msg.id, None)
|
.render_webxdc_status_update_object(self.msg.id, None)
|
||||||
.await?
|
.await?
|
||||||
|
|||||||
@@ -199,6 +199,9 @@ pub enum SystemMessage {
|
|||||||
|
|
||||||
/// Webxdc info added with `info` set in `send_webxdc_status_update()`.
|
/// Webxdc info added with `info` set in `send_webxdc_status_update()`.
|
||||||
WebxdcInfoMessage = 32,
|
WebxdcInfoMessage = 32,
|
||||||
|
|
||||||
|
/// This message contains a users iroh node address.
|
||||||
|
IrohNodeAddr = 40,
|
||||||
}
|
}
|
||||||
|
|
||||||
const MIME_AC_SETUP_FILE: &str = "application/autocrypt-setup";
|
const MIME_AC_SETUP_FILE: &str = "application/autocrypt-setup";
|
||||||
|
|||||||
711
src/peer_channels.rs
Normal file
711
src/peer_channels.rs
Normal file
@@ -0,0 +1,711 @@
|
|||||||
|
//! Peer channels for realtime communication in webxdcs.
|
||||||
|
//!
|
||||||
|
//! We use Iroh as an ephemeral peer channels provider to create direct communication
|
||||||
|
//! channels between webxdcs. See [here](https://webxdc.org/docs/spec/joinRealtimeChannel.html) for the webxdc specs.
|
||||||
|
//!
|
||||||
|
//! Ephemeral channels should be established lazily, to avoid bootstrapping p2p connectivity
|
||||||
|
//! when it's not required. Only when a webxdc subscribes to realtime data or when a reatlime message is sent,
|
||||||
|
//! the p2p machinery should be started.
|
||||||
|
//!
|
||||||
|
//! Adding peer channels to webxdc needs upfront negotation of a topic and sharing of public keys so that
|
||||||
|
//! nodes can connect to each other. The explicit approach is as follows:
|
||||||
|
//!
|
||||||
|
//! 1. We introduce a new [GossipTopic](crate::headerdef::HeaderDef::IrohGossipTopic) message header with a random 32-byte TopicId,
|
||||||
|
//! securely generated on the initial webxdc sender's device. This message header is encrypted
|
||||||
|
//! and sent in the same message as the webxdc application.
|
||||||
|
//! 2. Whenever `joinRealtimeChannel().setListener()` or `joinRealtimeChannel().send()` is called by the webxdc application,
|
||||||
|
//! we start a routine to establish p2p connectivity and join the gossip swarm with Iroh.
|
||||||
|
//! 3. The first step of this routine is to introduce yourself with a regular message containing the `IrohPublicKey`.
|
||||||
|
//! This message contains the users relay-server and public key.
|
||||||
|
//! Direct IP address is not included as this information can be persisted by email providers.
|
||||||
|
//! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet).
|
||||||
|
//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database
|
||||||
|
//! (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
|
||||||
|
//! and `joinRealtimeChannel().send()` just like the other peers.
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Context as _, Result};
|
||||||
|
use email::Header;
|
||||||
|
use iroh_gossip::net::{Gossip, JoinTopicFut, GOSSIP_ALPN};
|
||||||
|
use iroh_gossip::proto::{Event as IrohEvent, TopicId};
|
||||||
|
use iroh_net::relay::{RelayMap, RelayUrl};
|
||||||
|
use iroh_net::{key::SecretKey, relay::RelayMode, MagicEndpoint};
|
||||||
|
use iroh_net::{NodeAddr, NodeId};
|
||||||
|
use std::collections::{BTreeSet, HashMap};
|
||||||
|
use std::env;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
use crate::chat::send_msg;
|
||||||
|
use crate::context::Context;
|
||||||
|
use crate::headerdef::HeaderDef;
|
||||||
|
use crate::message::{Message, MsgId, Viewtype};
|
||||||
|
use crate::mimeparser::SystemMessage;
|
||||||
|
use crate::EventType;
|
||||||
|
|
||||||
|
/// The length of an ed25519 `PublicKey`, in bytes.
|
||||||
|
const PUBLIC_KEY_LENGTH: usize = 32;
|
||||||
|
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
|
||||||
|
|
||||||
|
/// Store iroh peer channels for the context.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Iroh {
|
||||||
|
/// [MagicEndpoint] needed for iroh peer channels.
|
||||||
|
pub(crate) endpoint: MagicEndpoint,
|
||||||
|
|
||||||
|
/// [Gossip] needed for iroh peer channels.
|
||||||
|
pub(crate) gossip: Gossip,
|
||||||
|
|
||||||
|
/// Topics for which an advertisement has already been sent.
|
||||||
|
pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
|
||||||
|
|
||||||
|
/// Currently used Iroh secret key
|
||||||
|
pub(crate) secret_key: SecretKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iroh {
|
||||||
|
/// Notify the endpoint that the network has changed.
|
||||||
|
pub(crate) async fn network_change(&self) {
|
||||||
|
self.endpoint.network_change().await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Join a topic and create the subscriber loop for it.
|
||||||
|
///
|
||||||
|
/// If there is no gossip, create it.
|
||||||
|
///
|
||||||
|
/// The returned future resolves when the swarm becomes operational.
|
||||||
|
async fn join_and_subscribe_gossip(
|
||||||
|
&self,
|
||||||
|
ctx: &Context,
|
||||||
|
msg_id: MsgId,
|
||||||
|
) -> Result<Option<JoinTopicFut>> {
|
||||||
|
let topic = get_iroh_topic_for_msg(ctx, msg_id).await?;
|
||||||
|
let seq = if let Some(channel_state) = self.iroh_channels.read().await.get(&topic) {
|
||||||
|
if channel_state.subscribe_loop.is_some() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
channel_state.seq_number
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
|
||||||
|
let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
|
||||||
|
info!(
|
||||||
|
ctx,
|
||||||
|
"IROH_REALTIME: Joining gossip with peers: {:?}",
|
||||||
|
peers.iter().map(|p| p.node_id).collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Connect to all peers
|
||||||
|
for peer in &peers {
|
||||||
|
self.endpoint.add_node_addr(peer.clone())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let connect_future = self
|
||||||
|
.gossip
|
||||||
|
.join(topic, peers.into_iter().map(|addr| addr.node_id).collect())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let ctx = ctx.clone();
|
||||||
|
let gossip = self.gossip.clone();
|
||||||
|
let subscribe_loop = tokio::spawn(async move {
|
||||||
|
if let Err(e) = subscribe_loop(&ctx, gossip, topic, msg_id).await {
|
||||||
|
warn!(ctx, "subscribe_loop failed: {e}")
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
self.iroh_channels
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(topic, ChannelState::new(seq, subscribe_loop));
|
||||||
|
|
||||||
|
Ok(Some(connect_future))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send realtime data to the gossip swarm.
|
||||||
|
pub async fn send_webxdc_realtime_data(
|
||||||
|
&self,
|
||||||
|
ctx: &Context,
|
||||||
|
msg_id: MsgId,
|
||||||
|
mut data: Vec<u8>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let topic = get_iroh_topic_for_msg(ctx, msg_id).await?;
|
||||||
|
self.join_and_subscribe_gossip(ctx, msg_id).await?;
|
||||||
|
|
||||||
|
let seq_num = self.get_and_incr(&topic).await;
|
||||||
|
data.extend(seq_num.to_le_bytes());
|
||||||
|
data.extend(self.secret_key.public().as_bytes());
|
||||||
|
|
||||||
|
self.gossip.broadcast(topic, data.into()).await?;
|
||||||
|
|
||||||
|
if env::var("REALTIME_DEBUG").is_ok() {
|
||||||
|
info!(ctx, "Sent realtime data");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_and_incr(&self, topic: &TopicId) -> i32 {
|
||||||
|
let mut seq = 0;
|
||||||
|
if let Some(state) = self.iroh_channels.write().await.get_mut(topic) {
|
||||||
|
seq = state.seq_number;
|
||||||
|
state.seq_number = state.seq_number.wrapping_add(1)
|
||||||
|
}
|
||||||
|
seq
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the iroh [NodeAddr] without direct IP addresses.
|
||||||
|
pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
|
||||||
|
let mut addr = self.endpoint.my_addr().await?;
|
||||||
|
addr.info.direct_addresses = BTreeSet::new();
|
||||||
|
Ok(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Leave the realtime channel for a given topic.
|
||||||
|
pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
|
||||||
|
if let Some(channel) = &mut self.iroh_channels.write().await.get_mut(&topic) {
|
||||||
|
if let Some(subscribe_loop) = channel.subscribe_loop.take() {
|
||||||
|
subscribe_loop.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.gossip.quit(topic).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Single gossip channel state.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct ChannelState {
|
||||||
|
/// Sequence number for the gossip channel.
|
||||||
|
seq_number: i32,
|
||||||
|
/// The subscribe loop handle.
|
||||||
|
subscribe_loop: Option<JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChannelState {
|
||||||
|
fn new(seq_number: i32, subscribe_loop: JoinHandle<()>) -> Self {
|
||||||
|
Self {
|
||||||
|
seq_number,
|
||||||
|
subscribe_loop: Some(subscribe_loop),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Context {
|
||||||
|
/// Create magic endpoint and gossip.
|
||||||
|
async fn init_peer_channels(&self) -> Result<Iroh> {
|
||||||
|
let secret_key: SecretKey = SecretKey::generate();
|
||||||
|
|
||||||
|
let relay_mode = if let Some(relay_url) = self
|
||||||
|
.metadata
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|conf| conf.iroh_relay.clone())
|
||||||
|
{
|
||||||
|
RelayMode::Custom(RelayMap::from_url(RelayUrl::from(relay_url)))
|
||||||
|
} else {
|
||||||
|
// FIXME: this should be RelayMode::Disabled instead.
|
||||||
|
// Currently using default relays because otherwise Rust tests fail.
|
||||||
|
RelayMode::Default
|
||||||
|
};
|
||||||
|
|
||||||
|
let endpoint = MagicEndpoint::builder()
|
||||||
|
.secret_key(secret_key.clone())
|
||||||
|
.alpns(vec![GOSSIP_ALPN.to_vec()])
|
||||||
|
.relay_mode(relay_mode)
|
||||||
|
.bind(0)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// create gossip
|
||||||
|
let my_addr = endpoint.my_addr().await?;
|
||||||
|
let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &my_addr.info);
|
||||||
|
|
||||||
|
// spawn endpoint loop that forwards incoming connections to the gossiper
|
||||||
|
let context = self.clone();
|
||||||
|
|
||||||
|
// Shuts down on deltachat shutdown
|
||||||
|
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
|
||||||
|
|
||||||
|
Ok(Iroh {
|
||||||
|
endpoint,
|
||||||
|
gossip,
|
||||||
|
iroh_channels: RwLock::new(HashMap::new()),
|
||||||
|
secret_key,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get or initialize the iroh peer channel.
|
||||||
|
pub async fn get_or_try_init_peer_channel(&self) -> Result<&Iroh> {
|
||||||
|
let ctx = self.clone();
|
||||||
|
self.iroh
|
||||||
|
.get_or_try_init(|| async { ctx.init_peer_channels().await })
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cache a peers [NodeId] for one topic.
|
||||||
|
pub(crate) async fn iroh_add_peer_for_topic(
|
||||||
|
ctx: &Context,
|
||||||
|
msg_id: MsgId,
|
||||||
|
topic: TopicId,
|
||||||
|
peer: NodeId,
|
||||||
|
relay_server: Option<&str>,
|
||||||
|
) -> Result<()> {
|
||||||
|
ctx.sql
|
||||||
|
.execute(
|
||||||
|
"INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
|
||||||
|
(msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert topicId into the database so that we can use it to retrieve the topic.
|
||||||
|
pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
|
||||||
|
ctx.sql
|
||||||
|
.execute(
|
||||||
|
"INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
|
||||||
|
(msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a list of [NodeAddr]s for one webxdc.
|
||||||
|
async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
|
||||||
|
ctx.sql
|
||||||
|
.query_map(
|
||||||
|
"SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
|
||||||
|
(msg_id, PUBLIC_KEY_STUB),
|
||||||
|
|row| {
|
||||||
|
let key: Vec<u8> = row.get(0)?;
|
||||||
|
let server: Option<String> = row.get(1)?;
|
||||||
|
Ok((key, server))
|
||||||
|
},
|
||||||
|
|g| {
|
||||||
|
g.map(|data| {
|
||||||
|
let (key, server) = data?;
|
||||||
|
let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
|
||||||
|
let id = NodeId::from_bytes(&key.try_into()
|
||||||
|
.map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
|
||||||
|
Ok::<_, anyhow::Error>(NodeAddr::from_parts(
|
||||||
|
id, server, vec![]
|
||||||
|
))
|
||||||
|
})
|
||||||
|
.collect::<std::result::Result<Vec<_>, _>>()
|
||||||
|
.map_err(Into::into)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the topic for a given [MsgId].
|
||||||
|
pub(crate) async fn get_iroh_topic_for_msg(ctx: &Context, msg_id: MsgId) -> Result<TopicId> {
|
||||||
|
let bytes: Vec<u8> = ctx
|
||||||
|
.sql
|
||||||
|
.query_get_value(
|
||||||
|
"SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
|
||||||
|
(msg_id,),
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.context("couldn't restore topic from db")?;
|
||||||
|
Ok(TopicId::from_bytes(bytes.try_into().unwrap()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a gossip advertisement to the chat that [MsgId] belongs to.
|
||||||
|
/// This method should be called from the frontend when `joinRealtimeChannel` is called.
|
||||||
|
pub async fn send_webxdc_realtime_advertisement(
|
||||||
|
ctx: &Context,
|
||||||
|
msg_id: MsgId,
|
||||||
|
) -> Result<Option<JoinTopicFut>> {
|
||||||
|
let iroh = ctx.get_or_try_init_peer_channel().await?;
|
||||||
|
let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
|
||||||
|
|
||||||
|
let webxdc = Message::load_from_db(ctx, msg_id).await?;
|
||||||
|
let mut msg = Message::new(Viewtype::Text);
|
||||||
|
msg.hidden = true;
|
||||||
|
msg.param.set_cmd(SystemMessage::IrohNodeAddr);
|
||||||
|
msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
|
||||||
|
send_msg(ctx, webxdc.chat_id, &mut msg).await?;
|
||||||
|
info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
|
||||||
|
Ok(conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send realtime data to the gossip swarm.
|
||||||
|
pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
|
||||||
|
let iroh = ctx.get_or_try_init_peer_channel().await?;
|
||||||
|
iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Leave the gossip of the webxdc with given [MsgId].
|
||||||
|
pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
|
||||||
|
let iroh = ctx.get_or_try_init_peer_channel().await?;
|
||||||
|
iroh.leave_realtime(get_iroh_topic_for_msg(ctx, msg_id).await?)
|
||||||
|
.await?;
|
||||||
|
info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn create_random_topic() -> TopicId {
|
||||||
|
TopicId::from_bytes(rand::random())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn create_iroh_header(
|
||||||
|
ctx: &Context,
|
||||||
|
topic: TopicId,
|
||||||
|
msg_id: MsgId,
|
||||||
|
) -> Result<Header> {
|
||||||
|
insert_topic_stub(ctx, msg_id, topic).await?;
|
||||||
|
Ok(Header::new(
|
||||||
|
HeaderDef::IrohGossipTopic.get_headername().to_string(),
|
||||||
|
topic.to_string(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn endpoint_loop(context: Context, endpoint: MagicEndpoint, gossip: Gossip) {
|
||||||
|
while let Some(conn) = endpoint.accept().await {
|
||||||
|
info!(context, "IROH_REALTIME: accepting iroh connection");
|
||||||
|
let gossip = gossip.clone();
|
||||||
|
let context = context.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(err) = handle_connection(&context, conn, gossip).await {
|
||||||
|
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_connection(
|
||||||
|
context: &Context,
|
||||||
|
mut conn: iroh_net::magic_endpoint::Connecting,
|
||||||
|
gossip: Gossip,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let alpn = conn.alpn().await?;
|
||||||
|
let conn = conn.await?;
|
||||||
|
let peer_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?;
|
||||||
|
|
||||||
|
match alpn.as_bytes() {
|
||||||
|
GOSSIP_ALPN => gossip
|
||||||
|
.handle_connection(conn)
|
||||||
|
.await
|
||||||
|
.context(format!("Connection to {peer_id} with ALPN {alpn} failed"))?,
|
||||||
|
_ => warn!(
|
||||||
|
context,
|
||||||
|
"Ignoring connection from {peer_id}: unsupported ALPN protocol"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn subscribe_loop(
|
||||||
|
context: &Context,
|
||||||
|
gossip: Gossip,
|
||||||
|
topic: TopicId,
|
||||||
|
msg_id: MsgId,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut stream = gossip.subscribe(topic).await?;
|
||||||
|
loop {
|
||||||
|
let event = stream.recv().await?;
|
||||||
|
match event {
|
||||||
|
IrohEvent::NeighborUp(node) => {
|
||||||
|
info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
|
||||||
|
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
|
||||||
|
}
|
||||||
|
IrohEvent::Received(event) => {
|
||||||
|
info!(context, "IROH_REALTIME: Received realtime data");
|
||||||
|
context.emit_event(EventType::WebxdcRealtimeData {
|
||||||
|
msg_id,
|
||||||
|
data: event
|
||||||
|
.content
|
||||||
|
.get(0..event.content.len() - 4 - PUBLIC_KEY_LENGTH)
|
||||||
|
.context("too few bytes in iroh message")?
|
||||||
|
.into(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::{
|
||||||
|
chat::send_msg,
|
||||||
|
message::{Message, Viewtype},
|
||||||
|
peer_channels::{
|
||||||
|
get_iroh_gossip_peers, get_iroh_topic_for_msg, leave_webxdc_realtime,
|
||||||
|
send_webxdc_realtime_advertisement,
|
||||||
|
},
|
||||||
|
test_utils::TestContextManager,
|
||||||
|
EventType,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_can_communicate() {
|
||||||
|
let mut tcm = TestContextManager::new();
|
||||||
|
let alice = &mut tcm.alice().await;
|
||||||
|
let bob = &mut tcm.bob().await;
|
||||||
|
|
||||||
|
// Alice sends webxdc to bob
|
||||||
|
let alice_chat = alice.create_chat(bob).await;
|
||||||
|
let mut instance = Message::new(Viewtype::File);
|
||||||
|
instance
|
||||||
|
.set_file_from_bytes(
|
||||||
|
alice,
|
||||||
|
"minimal.xdc",
|
||||||
|
include_bytes!("../test-data/webxdc/minimal.xdc"),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
|
||||||
|
let alice_webxdc = alice.get_last_msg().await;
|
||||||
|
assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
|
||||||
|
|
||||||
|
let webxdc = alice.pop_sent_msg().await;
|
||||||
|
let bob_webdxc = bob.recv_msg(&webxdc).await;
|
||||||
|
assert_eq!(bob_webdxc.get_viewtype(), Viewtype::Webxdc);
|
||||||
|
|
||||||
|
bob_webdxc.chat_id.accept(bob).await.unwrap();
|
||||||
|
|
||||||
|
// Alice advertises herself.
|
||||||
|
send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
|
||||||
|
let bob_iroh = bob.get_or_try_init_peer_channel().await.unwrap();
|
||||||
|
|
||||||
|
// Bob adds alice to gossip peers.
|
||||||
|
let members = get_iroh_gossip_peers(bob, bob_webdxc.id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.map(|addr| addr.node_id)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let alice_iroh = alice.get_or_try_init_peer_channel().await.unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
members,
|
||||||
|
vec![alice_iroh.get_node_addr().await.unwrap().node_id]
|
||||||
|
);
|
||||||
|
|
||||||
|
bob_iroh
|
||||||
|
.join_and_subscribe_gossip(bob, bob_webdxc.id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Alice sends ephemeral message
|
||||||
|
alice_iroh
|
||||||
|
.send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let event = bob.evtracker.recv().await.unwrap();
|
||||||
|
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
|
||||||
|
if data == "alice -> bob".as_bytes() {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
panic!(
|
||||||
|
"Unexpected status update: {}",
|
||||||
|
String::from_utf8_lossy(&data)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Bob sends ephemeral message
|
||||||
|
bob_iroh
|
||||||
|
.send_webxdc_realtime_data(bob, bob_webdxc.id, "bob -> alice".as_bytes().to_vec())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let event = alice.evtracker.recv().await.unwrap();
|
||||||
|
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
|
||||||
|
if data == "bob -> alice".as_bytes() {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
panic!(
|
||||||
|
"Unexpected status update: {}",
|
||||||
|
String::from_utf8_lossy(&data)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Alice adds bob to gossip peers.
|
||||||
|
let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.map(|addr| addr.node_id)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
members,
|
||||||
|
vec![bob_iroh.get_node_addr().await.unwrap().node_id]
|
||||||
|
);
|
||||||
|
|
||||||
|
bob_iroh
|
||||||
|
.send_webxdc_realtime_data(bob, bob_webdxc.id, "bob -> alice 2".as_bytes().to_vec())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let event = alice.evtracker.recv().await.unwrap();
|
||||||
|
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
|
||||||
|
if data == "bob -> alice 2".as_bytes() {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
panic!(
|
||||||
|
"Unexpected status update: {}",
|
||||||
|
String::from_utf8_lossy(&data)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_can_reconnect() {
|
||||||
|
let mut tcm = TestContextManager::new();
|
||||||
|
let alice = &mut tcm.alice().await;
|
||||||
|
let bob = &mut tcm.bob().await;
|
||||||
|
|
||||||
|
// Alice sends webxdc to bob
|
||||||
|
let alice_chat = alice.create_chat(bob).await;
|
||||||
|
let mut instance = Message::new(Viewtype::File);
|
||||||
|
instance
|
||||||
|
.set_file_from_bytes(
|
||||||
|
alice,
|
||||||
|
"minimal.xdc",
|
||||||
|
include_bytes!("../test-data/webxdc/minimal.xdc"),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
|
||||||
|
let alice_webxdc = alice.get_last_msg().await;
|
||||||
|
assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
|
||||||
|
|
||||||
|
let webxdc = alice.pop_sent_msg().await;
|
||||||
|
let bob_webdxc = bob.recv_msg(&webxdc).await;
|
||||||
|
assert_eq!(bob_webdxc.get_viewtype(), Viewtype::Webxdc);
|
||||||
|
|
||||||
|
bob_webdxc.chat_id.accept(bob).await.unwrap();
|
||||||
|
|
||||||
|
// Alice advertises herself.
|
||||||
|
send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
|
||||||
|
let bob_iroh = bob.get_or_try_init_peer_channel().await.unwrap();
|
||||||
|
|
||||||
|
// Bob adds alice to gossip peers.
|
||||||
|
let members = get_iroh_gossip_peers(bob, bob_webdxc.id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.map(|addr| addr.node_id)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let alice_iroh = alice.get_or_try_init_peer_channel().await.unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
members,
|
||||||
|
vec![alice_iroh.get_node_addr().await.unwrap().node_id]
|
||||||
|
);
|
||||||
|
|
||||||
|
bob_iroh
|
||||||
|
.join_and_subscribe_gossip(bob, bob_webdxc.id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Alice sends ephemeral message
|
||||||
|
alice_iroh
|
||||||
|
.send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let event = bob.evtracker.recv().await.unwrap();
|
||||||
|
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
|
||||||
|
if data == "alice -> bob".as_bytes() {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
panic!(
|
||||||
|
"Unexpected status update: {}",
|
||||||
|
String::from_utf8_lossy(&data)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: check that seq number is persisted
|
||||||
|
leave_webxdc_realtime(bob, bob_webdxc.id).await.unwrap();
|
||||||
|
|
||||||
|
bob_iroh
|
||||||
|
.join_and_subscribe_gossip(bob, bob_webdxc.id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
bob_iroh
|
||||||
|
.send_webxdc_realtime_data(bob, bob_webdxc.id, "bob -> alice".as_bytes().to_vec())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let event = alice.evtracker.recv().await.unwrap();
|
||||||
|
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
|
||||||
|
if data == "bob -> alice".as_bytes() {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
panic!(
|
||||||
|
"Unexpected status update: {}",
|
||||||
|
String::from_utf8_lossy(&data)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// channel is only used to remeber if an advertisement has been sent
|
||||||
|
// bob for example does not change the channels because he never sends an
|
||||||
|
// advertisement
|
||||||
|
assert_eq!(
|
||||||
|
alice.iroh.get().unwrap().iroh_channels.read().await.len(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
|
||||||
|
let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(if let Some(state) = alice
|
||||||
|
.iroh
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.iroh_channels
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.get(&topic)
|
||||||
|
{
|
||||||
|
state.subscribe_loop.is_none()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -23,6 +23,7 @@ use crate::peerstate::Peerstate;
|
|||||||
use crate::socks::Socks5Config;
|
use crate::socks::Socks5Config;
|
||||||
use crate::token;
|
use crate::token;
|
||||||
use crate::tools::validate_id;
|
use crate::tools::validate_id;
|
||||||
|
use iroh_old as iroh;
|
||||||
|
|
||||||
const OPENPGP4FPR_SCHEME: &str = "OPENPGP4FPR:"; // yes: uppercase
|
const OPENPGP4FPR_SCHEME: &str = "OPENPGP4FPR:"; // yes: uppercase
|
||||||
const IDELTACHAT_SCHEME: &str = "https://i.delta.chat/#";
|
const IDELTACHAT_SCHEME: &str = "https://i.delta.chat/#";
|
||||||
|
|||||||
@@ -1,11 +1,13 @@
|
|||||||
//! Internet Message Format reception pipeline.
|
//! Internet Message Format reception pipeline.
|
||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
use anyhow::{Context as _, Result};
|
use anyhow::{Context as _, Result};
|
||||||
use deltachat_contact_tools::{
|
use deltachat_contact_tools::{
|
||||||
addr_cmp, may_be_valid_addr, normalize_name, strip_rtlo_characters, ContactAddress,
|
addr_cmp, may_be_valid_addr, normalize_name, strip_rtlo_characters, ContactAddress,
|
||||||
};
|
};
|
||||||
|
use iroh_gossip::proto::TopicId;
|
||||||
use mailparse::{parse_mail, SingleInfo};
|
use mailparse::{parse_mail, SingleInfo};
|
||||||
use num_traits::FromPrimitive;
|
use num_traits::FromPrimitive;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
@@ -30,6 +32,7 @@ use crate::message::{
|
|||||||
};
|
};
|
||||||
use crate::mimeparser::{parse_message_ids, AvatarAction, MimeMessage, SystemMessage};
|
use crate::mimeparser::{parse_message_ids, AvatarAction, MimeMessage, SystemMessage};
|
||||||
use crate::param::{Param, Params};
|
use crate::param::{Param, Params};
|
||||||
|
use crate::peer_channels::{get_iroh_topic_for_msg, insert_topic_stub, iroh_add_peer_for_topic};
|
||||||
use crate::peerstate::Peerstate;
|
use crate::peerstate::Peerstate;
|
||||||
use crate::reaction::{set_msg_reaction, Reaction};
|
use crate::reaction::{set_msg_reaction, Reaction};
|
||||||
use crate::securejoin::{self, handle_securejoin_handshake, observe_securejoin_on_other_device};
|
use crate::securejoin::{self, handle_securejoin_handshake, observe_securejoin_on_other_device};
|
||||||
@@ -40,6 +43,7 @@ use crate::sync::Sync::*;
|
|||||||
use crate::tools::{self, buf_compress, extract_grpid_from_rfc724_mid};
|
use crate::tools::{self, buf_compress, extract_grpid_from_rfc724_mid};
|
||||||
use crate::{chatlist_events, location};
|
use crate::{chatlist_events, location};
|
||||||
use crate::{contact, imap};
|
use crate::{contact, imap};
|
||||||
|
use iroh_net::NodeAddr;
|
||||||
|
|
||||||
/// This is the struct that is returned after receiving one email (aka MIME message).
|
/// This is the struct that is returned after receiving one email (aka MIME message).
|
||||||
///
|
///
|
||||||
@@ -1220,7 +1224,7 @@ async fn add_parts(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let orig_chat_id = chat_id;
|
let orig_chat_id = chat_id;
|
||||||
let chat_id = if is_mdn || is_reaction {
|
let mut chat_id = if is_mdn || is_reaction {
|
||||||
DC_CHAT_ID_TRASH
|
DC_CHAT_ID_TRASH
|
||||||
} else {
|
} else {
|
||||||
chat_id.unwrap_or_else(|| {
|
chat_id.unwrap_or_else(|| {
|
||||||
@@ -1430,6 +1434,24 @@ async fn add_parts(
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(node_addr) = mime_parser.get_header(HeaderDef::IrohNodeAddr) {
|
||||||
|
match serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address") {
|
||||||
|
Ok(node_addr) => {
|
||||||
|
info!(context, "Adding iroh peer with address {node_addr:?}.");
|
||||||
|
let instance_id = parent.context("Failed to get parent message")?.id;
|
||||||
|
let node_id = node_addr.node_id;
|
||||||
|
let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
|
||||||
|
let topic = get_iroh_topic_for_msg(context, instance_id).await?;
|
||||||
|
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
|
||||||
|
|
||||||
|
chat_id = DC_CHAT_ID_TRASH;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
warn!(context, "Couldn't parse NodeAddr: {err:#}.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for part in &mime_parser.parts {
|
for part in &mime_parser.parts {
|
||||||
if part.is_reaction {
|
if part.is_reaction {
|
||||||
let reaction_str = simplify::remove_footers(part.msg.as_str());
|
let reaction_str = simplify::remove_footers(part.msg.as_str());
|
||||||
@@ -1597,6 +1619,16 @@ RETURNING id
|
|||||||
|
|
||||||
// check all parts whether they contain a new logging webxdc
|
// check all parts whether they contain a new logging webxdc
|
||||||
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
|
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
|
||||||
|
// check if any part contains a webxdc topic id
|
||||||
|
if part.typ == Viewtype::Webxdc {
|
||||||
|
if let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic) {
|
||||||
|
let topic = TopicId::from_str(topic).context("wrong gossip topic header")?;
|
||||||
|
insert_topic_stub(context, *msg_id, topic).await?;
|
||||||
|
} else {
|
||||||
|
warn!(context, "webxdc doesn't have a gossip topic")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
maybe_set_logging_xdc_inner(
|
maybe_set_logging_xdc_inner(
|
||||||
context,
|
context,
|
||||||
part.typ,
|
part.typ,
|
||||||
|
|||||||
@@ -912,6 +912,30 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if dbversion < 111 {
|
||||||
|
sql.execute_migration(
|
||||||
|
"CREATE TABLE iroh_gossip_peers (msg_id TEXT not NULL, topic TEXT NOT NULL, public_key TEXT NOT NULL)",
|
||||||
|
111,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if dbversion < 112 {
|
||||||
|
sql.execute_migration(
|
||||||
|
"DROP TABLE iroh_gossip_peers; CREATE TABLE iroh_gossip_peers (msg_id INTEGER not NULL, topic BLOB NOT NULL, public_key BLOB NOT NULL, relay_server TEXT, UNIQUE (public_key, topic)) STRICT",
|
||||||
|
112,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if dbversion < 113 {
|
||||||
|
sql.execute_migration(
|
||||||
|
"DROP TABLE iroh_gossip_peers; CREATE TABLE iroh_gossip_peers (msg_id INTEGER not NULL, topic BLOB NOT NULL, public_key BLOB NOT NULL, relay_server TEXT, UNIQUE (topic, public_key), PRIMARY KEY(topic, public_key)) STRICT",
|
||||||
|
113,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let new_version = sql
|
let new_version = sql
|
||||||
.get_raw_config_int(VERSION_CFG)
|
.get_raw_config_int(VERSION_CFG)
|
||||||
.await?
|
.await?
|
||||||
|
|||||||
Reference in New Issue
Block a user