From 11e4e6adda1f73d3dd912109ee209891b3a27225 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 21 Nov 2023 17:55:06 +0100 Subject: [PATCH] Hook up reading messages from an iroh doc If the DocTicket is configured then we join the doc. If we have joined the doc then we will try and fetch the individual message from iroh. However it is still pre-fetched from imap first so that we keep respecting the imap order. --- Cargo.lock | 1 + Cargo.toml | 1 + src/config.rs | 3 ++ src/context.rs | 13 ++++++++ src/imap.rs | 90 +++++++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 107 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0e42c09d5..e30ed1164 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1471,6 +1471,7 @@ dependencies = [ "pretty_env_logger", "proptest", "qrcodegen", + "quic-rpc 0.6.1", "quick-xml", "rand 0.8.5", "ratelimit", diff --git a/Cargo.toml b/Cargo.toml index 653bc8317..a5a5efa4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ tokio-util = "0.7.9" toml = "0.7" url = "2" uuid = { version = "1", features = ["serde", "v4"] } +quic-rpc = "0.6.1" [dev-dependencies] ansi_term = "0.12.0" diff --git a/src/config.rs b/src/config.rs index ab0140379..00d344821 100644 --- a/src/config.rs +++ b/src/config.rs @@ -335,6 +335,9 @@ pub enum Config { /// until `chat_id.accept()` is called. #[strum(props(default = "0"))] VerifiedOneOnOneChats, + + /// The iroh document ticket. + DocTicket, } impl Context { diff --git a/src/context.rs b/src/context.rs index 3a02eabfe..2dd62a463 100644 --- a/src/context.rs +++ b/src/context.rs @@ -437,6 +437,19 @@ impl Context { return; } self.scheduler.start(self.clone()).await; + + if let Ok(Some(ticket)) = self.get_config(Config::DocTicket).await { + if let Err(err) = self.start_iroh(ticket).await { + error!(self, "failed to join iroh doc, {err:#}"); + } + } + } + + async fn start_iroh(&self, ticket: String) -> Result<()> { + let client = self.iroh_node.client(); + let _doc = client.docs.import(ticket.parse()?).await?; + + Ok(()) } /// Stops the IO scheduler. diff --git a/src/imap.rs b/src/imap.rs index 3fc61bb1d..1e4734018 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1379,12 +1379,100 @@ impl Imap { Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect()) } + pub(crate) async fn fetch_many_msgs( + &mut self, + context: &Context, + folder: &str, + request_uids: Vec, + uid_message_ids: &BTreeMap, + fetch_partially: bool, + fetching_existing_messages: bool, + ) -> Result<(Option, Vec)> { + let client = context.iroh_node.client(); + let n_docs = client.docs.list().await?.count().await; + if n_docs == 1 { + self.fetch_many_msgs_iroh( + context, + folder, + request_uids, + uid_message_ids, + fetch_partially, + fetching_existing_messages, + client, + ) + .await + } else { + self.fetch_many_msgs_imap( + context, + folder, + request_uids, + uid_message_ids, + fetch_partially, + fetching_existing_messages, + ) + .await + } + } + + pub(crate) async fn fetch_many_msgs_iroh( + &mut self, + context: &Context, + _folder: &str, + request_uids: Vec, + uid_message_ids: &BTreeMap, + _fetch_partially: bool, + _fetching_existing_messages: bool, + // client: iroh::client::quic::Iroh, + client: iroh::client::Iroh< + quic_rpc::transport::flume::FlumeConnection< + iroh::rpc_protocol::ProviderResponse, + iroh::rpc_protocol::ProviderRequest, + >, + >, + ) -> Result<(Option, Vec)> { + let mut received_msgs = Vec::new(); + let mut last_uid = None; + + let (id, _caps) = client.docs.list().await?.next().await.context("no doc")??; + let doc = client.docs.open(id).await?.context("where is my doc?")?; + + for request_uid in request_uids { + let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) { + rfc724_mid + } else { + error!( + context, + "No Message-ID corresponding to UID {} passed in uid_messsage_ids.", + request_uid + ); + continue; + }; + let key = format!("/inbox/{rfc724_mid}"); + warn!(context, "Reading iroh key {key}"); + let query = iroh::sync::store::Query::key_exact(&key).build(); + let entry = doc.get_one(query).await?.context("entry not found")?; + let imf_raw = doc.read_to_bytes(&entry).await?; + info!( + context, + "Passing message UID {} to receive_imf().", request_uid + ); + match receive_imf_inner(context, rfc724_mid, &imf_raw, false, None, false).await { + Ok(Some(msg)) => received_msgs.push(msg), + Ok(None) => (), + Err(err) => warn!(context, "receive_imf error: {err:#}"), + } + + last_uid = Some(request_uid); + } + Ok((last_uid, received_msgs)) + } + /// Fetches a list of messages by server UID. /// /// Returns the last UID fetched successfully and the info about each downloaded message. /// If the message is incorrect or there is a failure to write a message to the database, /// it is skipped and the error is logged. - pub(crate) async fn fetch_many_msgs( + pub(crate) async fn fetch_many_msgs_imap( &mut self, context: &Context, folder: &str,