mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
28 Commits
v2.0.0
...
debug-iroh
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
866fa57234 | ||
|
|
0def0e070d | ||
|
|
1b184af875 | ||
|
|
5a26a84fb0 | ||
|
|
f7a1ab627c | ||
|
|
4bed4b32f5 | ||
|
|
013eaba47f | ||
|
|
7aad78e894 | ||
|
|
41f39117af | ||
|
|
b9425577b4 | ||
|
|
90d30c4a35 | ||
|
|
97695d7e19 | ||
|
|
6bcb347426 | ||
|
|
24aa657984 | ||
|
|
f0bfa5869f | ||
|
|
df17d9b1da | ||
|
|
66fec82daf | ||
|
|
b501ab1532 | ||
|
|
e6087db69c | ||
|
|
9e8ee7b1c7 | ||
|
|
397e71a66a | ||
|
|
4bcc3d22aa | ||
|
|
ba3bc01e1b | ||
|
|
a1649a8258 | ||
|
|
96d43b6084 | ||
|
|
b95a593211 | ||
|
|
7b046692ae | ||
|
|
9fb003563b |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1426,6 +1426,8 @@ dependencies = [
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
@@ -1495,6 +1497,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing-subscriber",
|
||||
"yerpc",
|
||||
]
|
||||
|
||||
|
||||
@@ -61,8 +61,8 @@ hickory-resolver = "0.24"
|
||||
humansize = "2"
|
||||
image = { version = "0.25.1", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
|
||||
iroh_old = { version = "0.4.2", default-features = false, package = "iroh"}
|
||||
iroh-net = "0.16.2"
|
||||
iroh-gossip = { version = "0.16.2", features = ["net"] }
|
||||
iroh-net = { git = "https://github.com/link2xt/iroh", branch="link2xt/keep-connection" }
|
||||
iroh-gossip = { git = "https://github.com/link2xt/iroh", branch="link2xt/keep-connection", features = ["net"] }
|
||||
quinn = "0.10.0"
|
||||
kamadak-exif = "0.5.3"
|
||||
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
|
||||
@@ -104,6 +104,7 @@ tokio-util = "0.7.9"
|
||||
toml = "0.8"
|
||||
url = "2"
|
||||
uuid = { version = "1", features = ["serde", "v4"] }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
# Pin OpenSSL to 3.1 releases.
|
||||
# OpenSSL 3.2 has a regression tracked at <https://github.com/openssl/openssl/issues/23376>
|
||||
@@ -112,6 +113,8 @@ uuid = { version = "1", features = ["serde", "v4"] }
|
||||
# According to <https://www.openssl.org/policies/releasestrat.html>
|
||||
# 3.1 branch will be supported until 2025-03-14.
|
||||
openssl-src = "~300.1"
|
||||
tracing = "0.1.40"
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
ansi_term = "0.12.0"
|
||||
@@ -181,4 +184,4 @@ vendored = [
|
||||
"async-native-tls/vendored",
|
||||
"rusqlite/bundled-sqlcipher-vendored-openssl",
|
||||
"reqwest/native-tls-vendored"
|
||||
]
|
||||
]
|
||||
|
||||
@@ -19,6 +19,7 @@ use deltachat::location;
|
||||
use deltachat::log::LogExt;
|
||||
use deltachat::message::{self, Message, MessageState, MsgId, Viewtype};
|
||||
use deltachat::mimeparser::SystemMessage;
|
||||
use deltachat::peer_channels::{send_webxdc_realtime_advertisement, send_webxdc_realtime_data};
|
||||
use deltachat::peerstate::*;
|
||||
use deltachat::qr::*;
|
||||
use deltachat::reaction::send_reaction;
|
||||
@@ -642,6 +643,30 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
|
||||
println!("{cnt} chats");
|
||||
println!("{time_needed:?} to create this list");
|
||||
}
|
||||
"start-realtime" => {
|
||||
if arg1.is_empty() {
|
||||
bail!("missing msgid");
|
||||
}
|
||||
let msg_id = MsgId::new(arg1.parse()?);
|
||||
let res = send_webxdc_realtime_advertisement(&context, msg_id).await?;
|
||||
|
||||
if let Some(res) = res {
|
||||
println!("waiting for peer channel join");
|
||||
res.await?;
|
||||
}
|
||||
println!("joined peer channel");
|
||||
}
|
||||
"send-realtime" => {
|
||||
if arg1.is_empty() {
|
||||
bail!("missing msgid");
|
||||
}
|
||||
if arg2.is_empty() {
|
||||
bail!("no message");
|
||||
}
|
||||
let msg_id = MsgId::new(arg1.parse()?);
|
||||
send_webxdc_realtime_data(&context, msg_id, arg2.as_bytes().to_vec()).await?;
|
||||
println!("sent realtime message");
|
||||
}
|
||||
"chat" => {
|
||||
if sel_chat.is_none() && arg1.is_empty() {
|
||||
bail!("Argument [chat-id] is missing.");
|
||||
|
||||
@@ -62,6 +62,7 @@ class EventType(str, Enum):
|
||||
CHATLIST_CHANGED = "ChatlistChanged"
|
||||
CHATLIST_ITEM_CHANGED = "ChatlistItemChanged"
|
||||
CONFIG_SYNCED = "ConfigSynced"
|
||||
WEBXDC_REALTIME_DATA = "WebxdcRealtimeData"
|
||||
|
||||
|
||||
class ChatId(IntEnum):
|
||||
|
||||
@@ -177,7 +177,7 @@ class Rpc:
|
||||
account_id = event["contextId"]
|
||||
queue = self.get_queue(account_id)
|
||||
event = event["event"]
|
||||
logging.debug("account_id=%d got an event %s", account_id, event)
|
||||
print("account_id=%d got an event %s" % (account_id, event), file=sys.stderr)
|
||||
queue.put(event)
|
||||
except Exception:
|
||||
# Log an exception if the event loop dies.
|
||||
|
||||
123
deltachat-rpc-client/tests/test_webxdc_iroh.py
Normal file
123
deltachat-rpc-client/tests/test_webxdc_iroh.py
Normal file
@@ -0,0 +1,123 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Testing webxdc iroh connectivity
|
||||
|
||||
If you want to debug iroh at rust-trace/log level set
|
||||
|
||||
RUST_LOG=iroh_net=trace,iroh_gossip=trace
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
import random
|
||||
import itertools
|
||||
import sys
|
||||
|
||||
from deltachat_rpc_client import DeltaChat, EventType, SpecialContactId
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def path_to_webxdc():
|
||||
return "../test-data/webxdc/chess.xdc"
|
||||
|
||||
|
||||
def test_realtime_sequentially(acfactory, path_to_webxdc):
|
||||
"""Test two peers trying to establish connection sequentially."""
|
||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||
ac1.create_chat(ac2)
|
||||
ac2.create_chat(ac1)
|
||||
acfactory.send_message(from_account=ac1, to_account=ac2, text="ping0")
|
||||
snapshot = ac2.get_message_by_id(ac2.wait_for_incoming_msg_event().msg_id).get_snapshot()
|
||||
assert snapshot.text == "ping0"
|
||||
|
||||
def log(msg):
|
||||
print()
|
||||
print("*" * 80 + "\n" + msg + "\n", file=sys.stderr)
|
||||
print()
|
||||
|
||||
# share a webxdc app between ac1 and ac2
|
||||
ac1_webxdc_msg = acfactory.send_message(from_account=ac1, to_account=ac2, text="play", file=path_to_webxdc)
|
||||
ac2_webxdc_msg = ac2.get_message_by_id(ac2.wait_for_incoming_msg_event().msg_id)
|
||||
snapshot = ac2_webxdc_msg.get_snapshot()
|
||||
assert snapshot.text == "play"
|
||||
|
||||
# send iroh announcements sequentially
|
||||
log("sending ac1 -> ac2 realtime advertisement and additional message")
|
||||
ac1._rpc.send_webxdc_realtime_advertisement(ac1.id, ac1_webxdc_msg.id)
|
||||
acfactory.send_message(from_account=ac1, to_account=ac2, text="ping1")
|
||||
|
||||
log("waiting for incoming message on ac2")
|
||||
snapshot = ac2.get_message_by_id(ac2.wait_for_incoming_msg_event().msg_id).get_snapshot()
|
||||
assert snapshot.text == "ping1"
|
||||
|
||||
log("sending ac2 -> ac1 realtime advertisement and additional message")
|
||||
ac2._rpc.send_webxdc_realtime_advertisement(ac2.id, ac2_webxdc_msg.id)
|
||||
acfactory.send_message(from_account=ac2, to_account=ac1, text="ping2")
|
||||
|
||||
log("waiting for incoming message on ac1")
|
||||
snapshot = ac1.get_message_by_id(ac1.wait_for_incoming_msg_event().msg_id).get_snapshot()
|
||||
assert snapshot.text == "ping2"
|
||||
|
||||
log("sending realtime data ac1 -> ac2")
|
||||
ac1._rpc.send_webxdc_realtime_data(ac1.id, ac1_webxdc_msg.id, [13, 15, 17])
|
||||
|
||||
log("ac2: waiting for realtime data")
|
||||
while 1:
|
||||
event = ac2.wait_for_event()
|
||||
if event.kind == EventType.WEBXDC_REALTIME_DATA:
|
||||
assert event.data == [13, 15, 17]
|
||||
break
|
||||
|
||||
|
||||
def test_realtime_simultaneously(acfactory, path_to_webxdc):
|
||||
"""Test two peers trying to establish connection simultaneously."""
|
||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||
ac1.create_chat(ac2)
|
||||
ac2.create_chat(ac1)
|
||||
acfactory.send_message(from_account=ac1, to_account=ac2, text="ping0")
|
||||
snapshot = ac2.get_message_by_id(ac2.wait_for_incoming_msg_event().msg_id).get_snapshot()
|
||||
assert snapshot.text == "ping0"
|
||||
|
||||
def log(msg):
|
||||
print()
|
||||
print("*" * 80 + "\n" + msg + "\n", file=sys.stderr)
|
||||
print()
|
||||
|
||||
# share a webxdc app between ac1 and ac2
|
||||
ac1_webxdc_msg = acfactory.send_message(from_account=ac1, to_account=ac2, text="play", file=path_to_webxdc)
|
||||
ac2_webxdc_msg = ac2.get_message_by_id(ac2.wait_for_incoming_msg_event().msg_id)
|
||||
snapshot = ac2_webxdc_msg.get_snapshot()
|
||||
assert snapshot.text == "play"
|
||||
|
||||
# send iroh announcements simultaneously
|
||||
log("sending ac1 -> ac2 realtime advertisement and additional message")
|
||||
ac1._rpc.send_webxdc_realtime_advertisement(ac1.id, ac1_webxdc_msg.id)
|
||||
acfactory.send_message(from_account=ac1, to_account=ac2, text="ping1")
|
||||
|
||||
log("sending ac2 -> ac1 realtime advertisement and additional message")
|
||||
ac2._rpc.send_webxdc_realtime_advertisement(ac2.id, ac2_webxdc_msg.id)
|
||||
acfactory.send_message(from_account=ac2, to_account=ac1, text="ping2")
|
||||
|
||||
# Ensure that advertisements have been received.
|
||||
|
||||
log("waiting for incoming message on ac2")
|
||||
snapshot = ac2.get_message_by_id(ac2.wait_for_incoming_msg_event().msg_id).get_snapshot()
|
||||
assert snapshot.text == "ping1"
|
||||
|
||||
log("waiting for incoming message on ac1")
|
||||
snapshot = ac1.get_message_by_id(ac1.wait_for_incoming_msg_event().msg_id).get_snapshot()
|
||||
assert snapshot.text == "ping2"
|
||||
|
||||
log("sending realtime data ac1 -> ac2")
|
||||
ac1._rpc.send_webxdc_realtime_data(ac1.id, ac1_webxdc_msg.id, [13, 15, 17])
|
||||
|
||||
log("ac2: waiting for realtime data")
|
||||
while 1:
|
||||
event = ac2.wait_for_event()
|
||||
if event.kind == EventType.WEBXDC_REALTIME_DATA:
|
||||
assert event.data == [13, 15, 17]
|
||||
break
|
||||
@@ -22,6 +22,7 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
tokio = { version = "1.37.0", features = ["io-std"] }
|
||||
tokio-util = "0.7.9"
|
||||
yerpc = { version = "0.5.2", features = ["anyhow_expose", "openrpc"] }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
[features]
|
||||
default = ["vendored"]
|
||||
|
||||
@@ -11,6 +11,7 @@ use deltachat::constants::DC_VERSION_STR;
|
||||
use deltachat_jsonrpc::api::{Accounts, CommandApi};
|
||||
use futures_lite::stream::StreamExt;
|
||||
use tokio::io::{self, AsyncBufReadExt, BufReader};
|
||||
use tracing_subscriber::{prelude::*, EnvFilter};
|
||||
use yerpc::RpcServer as _;
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
@@ -62,6 +63,12 @@ async fn main_impl() -> Result<()> {
|
||||
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
|
||||
.with(EnvFilter::builder().from_env_lossy())
|
||||
.try_init()
|
||||
.ok();
|
||||
|
||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||
log::info!("Starting with accounts directory `{}`.", path);
|
||||
let writable = true;
|
||||
|
||||
@@ -339,7 +339,6 @@ impl Context {
|
||||
) -> Result<Context> {
|
||||
let context =
|
||||
Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
|
||||
|
||||
// Open the database if is not encrypted.
|
||||
if context.check_passphrase("".to_string()).await? {
|
||||
context.sql.open(&context, "".to_string()).await?;
|
||||
@@ -1376,6 +1375,43 @@ pub fn get_version_str() -> &'static str {
|
||||
&DC_VERSION_STR
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct CollectVisitor(HashMap<String, String>);
|
||||
|
||||
impl tracing::field::Visit for CollectVisitor {
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
self.0.insert(field.to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
self.0.insert(field.to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
self.0.insert(field.to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
self.0.insert(field.to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
self.0.insert(field.to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_error(
|
||||
&mut self,
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
self.0.insert(field.to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.0.insert(field.to_string(), format!("{:?}", value));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use anyhow::Context as _;
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use email::Header;
|
||||
use futures_lite::StreamExt;
|
||||
use iroh_gossip::net::{Gossip, JoinTopicFut, GOSSIP_ALPN};
|
||||
use iroh_gossip::proto::{Event as IrohEvent, TopicId};
|
||||
use iroh_net::relay::{RelayMap, RelayUrl};
|
||||
@@ -101,11 +102,6 @@ impl Iroh {
|
||||
self.endpoint.add_node_addr(peer.clone())?;
|
||||
}
|
||||
|
||||
let connect_future = self
|
||||
.gossip
|
||||
.join(topic, peers.into_iter().map(|addr| addr.node_id).collect())
|
||||
.await?;
|
||||
|
||||
let ctx = ctx.clone();
|
||||
let gossip = self.gossip.clone();
|
||||
let subscribe_loop = tokio::spawn(async move {
|
||||
@@ -114,6 +110,11 @@ impl Iroh {
|
||||
}
|
||||
});
|
||||
|
||||
let connect_future = self
|
||||
.gossip
|
||||
.join(topic, peers.into_iter().map(|addr| addr.node_id).collect())
|
||||
.await?;
|
||||
|
||||
self.iroh_channels
|
||||
.write()
|
||||
.await
|
||||
@@ -122,6 +123,21 @@ impl Iroh {
|
||||
Ok(Some(connect_future))
|
||||
}
|
||||
|
||||
/// Add gossip peers to realtime channel if it is already active.
|
||||
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
|
||||
if let Some(state) = self.iroh_channels.read().await.get(&topic) {
|
||||
if state.subscribe_loop.is_some() {
|
||||
for peer in &peers {
|
||||
self.endpoint.add_node_addr(peer.clone())?;
|
||||
}
|
||||
self.gossip
|
||||
.join(topic, peers.into_iter().map(|peer| peer.node_id).collect())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send realtime data to the gossip swarm.
|
||||
pub async fn send_webxdc_realtime_data(
|
||||
&self,
|
||||
@@ -226,7 +242,15 @@ impl Context {
|
||||
|
||||
// Shuts down on deltachat shutdown
|
||||
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
|
||||
|
||||
let endp = endpoint.clone();
|
||||
let gsp = gossip.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut stream = endp.local_endpoints();
|
||||
while let Some(endpoints) = stream.next().await {
|
||||
gsp.update_endpoints(&endpoints)?;
|
||||
}
|
||||
anyhow::Ok(())
|
||||
});
|
||||
Ok(Iroh {
|
||||
endpoint,
|
||||
gossip,
|
||||
@@ -432,13 +456,10 @@ async fn subscribe_loop(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
chat::send_msg,
|
||||
message::{Message, Viewtype},
|
||||
peer_channels::{
|
||||
get_iroh_gossip_peers, get_iroh_topic_for_msg, leave_webxdc_realtime,
|
||||
send_webxdc_realtime_advertisement,
|
||||
},
|
||||
test_utils::TestContextManager,
|
||||
EventType,
|
||||
};
|
||||
@@ -708,4 +729,73 @@ mod tests {
|
||||
false
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_parallel_connect() {
|
||||
let mut tcm = TestContextManager::new();
|
||||
let alice = &mut tcm.alice().await;
|
||||
let bob = &mut tcm.bob().await;
|
||||
|
||||
// Alice sends webxdc to bob
|
||||
let alice_chat = alice.create_chat(bob).await;
|
||||
let mut instance = Message::new(Viewtype::File);
|
||||
instance
|
||||
.set_file_from_bytes(
|
||||
alice,
|
||||
"minimal.xdc",
|
||||
include_bytes!("../test-data/webxdc/minimal.xdc"),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
|
||||
let alice_webxdc = alice.get_last_msg().await;
|
||||
|
||||
let webxdc = alice.pop_sent_msg().await;
|
||||
let bob_webxdc = bob.recv_msg(&webxdc).await;
|
||||
assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
|
||||
|
||||
bob_webxdc.chat_id.accept(bob).await.unwrap();
|
||||
|
||||
eprintln!("Sending advertisements");
|
||||
// Alice advertises herself.
|
||||
let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let alice_advertisement = alice.pop_sent_msg().await;
|
||||
|
||||
send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let bob_advertisement = bob.pop_sent_msg().await;
|
||||
|
||||
eprintln!("Receiving advertisements");
|
||||
bob.recv_msg_trash(&alice_advertisement).await;
|
||||
alice.recv_msg_trash(&bob_advertisement).await;
|
||||
|
||||
eprintln!("Alice waits for connection");
|
||||
alice_advertisement_future.await;
|
||||
|
||||
// Alice sends ephemeral message
|
||||
eprintln!("Sending ephemeral message");
|
||||
send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
eprintln!("Waiting for ephemeral message");
|
||||
loop {
|
||||
let event = bob.evtracker.recv().await.unwrap();
|
||||
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
|
||||
if data == b"alice -> bob" {
|
||||
break;
|
||||
} else {
|
||||
panic!(
|
||||
"Unexpected status update: {}",
|
||||
String::from_utf8_lossy(&data)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1443,7 +1443,8 @@ async fn add_parts(
|
||||
let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
|
||||
let topic = get_iroh_topic_for_msg(context, instance_id).await?;
|
||||
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
|
||||
|
||||
let iroh = context.get_or_try_init_peer_channel().await?;
|
||||
iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
|
||||
chat_id = DC_CHAT_ID_TRASH;
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
Reference in New Issue
Block a user