diff --git a/Cargo.lock b/Cargo.lock index 0e42c09d5..e30ed1164 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1471,6 +1471,7 @@ dependencies = [ "pretty_env_logger", "proptest", "qrcodegen", + "quic-rpc 0.6.1", "quick-xml", "rand 0.8.5", "ratelimit", diff --git a/Cargo.toml b/Cargo.toml index 653bc8317..a5a5efa4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ tokio-util = "0.7.9" toml = "0.7" url = "2" uuid = { version = "1", features = ["serde", "v4"] } +quic-rpc = "0.6.1" [dev-dependencies] ansi_term = "0.12.0" diff --git a/src/config.rs b/src/config.rs index ab0140379..00d344821 100644 --- a/src/config.rs +++ b/src/config.rs @@ -335,6 +335,9 @@ pub enum Config { /// until `chat_id.accept()` is called. #[strum(props(default = "0"))] VerifiedOneOnOneChats, + + /// The iroh document ticket. + DocTicket, } impl Context { diff --git a/src/context.rs b/src/context.rs index 3a02eabfe..2dd62a463 100644 --- a/src/context.rs +++ b/src/context.rs @@ -437,6 +437,19 @@ impl Context { return; } self.scheduler.start(self.clone()).await; + + if let Ok(Some(ticket)) = self.get_config(Config::DocTicket).await { + if let Err(err) = self.start_iroh(ticket).await { + error!(self, "failed to join iroh doc, {err:#}"); + } + } + } + + async fn start_iroh(&self, ticket: String) -> Result<()> { + let client = self.iroh_node.client(); + let _doc = client.docs.import(ticket.parse()?).await?; + + Ok(()) } /// Stops the IO scheduler. diff --git a/src/imap.rs b/src/imap.rs index 3fc61bb1d..1e4734018 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1379,12 +1379,100 @@ impl Imap { Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect()) } + pub(crate) async fn fetch_many_msgs( + &mut self, + context: &Context, + folder: &str, + request_uids: Vec, + uid_message_ids: &BTreeMap, + fetch_partially: bool, + fetching_existing_messages: bool, + ) -> Result<(Option, Vec)> { + let client = context.iroh_node.client(); + let n_docs = client.docs.list().await?.count().await; + if n_docs == 1 { + self.fetch_many_msgs_iroh( + context, + folder, + request_uids, + uid_message_ids, + fetch_partially, + fetching_existing_messages, + client, + ) + .await + } else { + self.fetch_many_msgs_imap( + context, + folder, + request_uids, + uid_message_ids, + fetch_partially, + fetching_existing_messages, + ) + .await + } + } + + pub(crate) async fn fetch_many_msgs_iroh( + &mut self, + context: &Context, + _folder: &str, + request_uids: Vec, + uid_message_ids: &BTreeMap, + _fetch_partially: bool, + _fetching_existing_messages: bool, + // client: iroh::client::quic::Iroh, + client: iroh::client::Iroh< + quic_rpc::transport::flume::FlumeConnection< + iroh::rpc_protocol::ProviderResponse, + iroh::rpc_protocol::ProviderRequest, + >, + >, + ) -> Result<(Option, Vec)> { + let mut received_msgs = Vec::new(); + let mut last_uid = None; + + let (id, _caps) = client.docs.list().await?.next().await.context("no doc")??; + let doc = client.docs.open(id).await?.context("where is my doc?")?; + + for request_uid in request_uids { + let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) { + rfc724_mid + } else { + error!( + context, + "No Message-ID corresponding to UID {} passed in uid_messsage_ids.", + request_uid + ); + continue; + }; + let key = format!("/inbox/{rfc724_mid}"); + warn!(context, "Reading iroh key {key}"); + let query = iroh::sync::store::Query::key_exact(&key).build(); + let entry = doc.get_one(query).await?.context("entry not found")?; + let imf_raw = doc.read_to_bytes(&entry).await?; + info!( + context, + "Passing message UID {} to receive_imf().", request_uid + ); + match receive_imf_inner(context, rfc724_mid, &imf_raw, false, None, false).await { + Ok(Some(msg)) => received_msgs.push(msg), + Ok(None) => (), + Err(err) => warn!(context, "receive_imf error: {err:#}"), + } + + last_uid = Some(request_uid); + } + Ok((last_uid, received_msgs)) + } + /// Fetches a list of messages by server UID. /// /// Returns the last UID fetched successfully and the info about each downloaded message. /// If the message is incorrect or there is a failure to write a message to the database, /// it is skipped and the error is logged. - pub(crate) async fn fetch_many_msgs( + pub(crate) async fn fetch_many_msgs_imap( &mut self, context: &Context, folder: &str,