mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
Hook up reading messages from an iroh doc
If the DocTicket is configured then we join the doc. If we have joined the doc then we will try and fetch the individual message from iroh. However it is still pre-fetched from imap first so that we keep respecting the imap order.
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1471,6 +1471,7 @@ dependencies = [
|
||||
"pretty_env_logger",
|
||||
"proptest",
|
||||
"qrcodegen",
|
||||
"quic-rpc 0.6.1",
|
||||
"quick-xml",
|
||||
"rand 0.8.5",
|
||||
"ratelimit",
|
||||
|
||||
@@ -97,6 +97,7 @@ tokio-util = "0.7.9"
|
||||
toml = "0.7"
|
||||
url = "2"
|
||||
uuid = { version = "1", features = ["serde", "v4"] }
|
||||
quic-rpc = "0.6.1"
|
||||
|
||||
[dev-dependencies]
|
||||
ansi_term = "0.12.0"
|
||||
|
||||
@@ -335,6 +335,9 @@ pub enum Config {
|
||||
/// until `chat_id.accept()` is called.
|
||||
#[strum(props(default = "0"))]
|
||||
VerifiedOneOnOneChats,
|
||||
|
||||
/// The iroh document ticket.
|
||||
DocTicket,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
|
||||
@@ -437,6 +437,19 @@ impl Context {
|
||||
return;
|
||||
}
|
||||
self.scheduler.start(self.clone()).await;
|
||||
|
||||
if let Ok(Some(ticket)) = self.get_config(Config::DocTicket).await {
|
||||
if let Err(err) = self.start_iroh(ticket).await {
|
||||
error!(self, "failed to join iroh doc, {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_iroh(&self, ticket: String) -> Result<()> {
|
||||
let client = self.iroh_node.client();
|
||||
let _doc = client.docs.import(ticket.parse()?).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stops the IO scheduler.
|
||||
|
||||
90
src/imap.rs
90
src/imap.rs
@@ -1379,12 +1379,100 @@ impl Imap {
|
||||
Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch_many_msgs(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
request_uids: Vec<u32>,
|
||||
uid_message_ids: &BTreeMap<u32, String>,
|
||||
fetch_partially: bool,
|
||||
fetching_existing_messages: bool,
|
||||
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
|
||||
let client = context.iroh_node.client();
|
||||
let n_docs = client.docs.list().await?.count().await;
|
||||
if n_docs == 1 {
|
||||
self.fetch_many_msgs_iroh(
|
||||
context,
|
||||
folder,
|
||||
request_uids,
|
||||
uid_message_ids,
|
||||
fetch_partially,
|
||||
fetching_existing_messages,
|
||||
client,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
self.fetch_many_msgs_imap(
|
||||
context,
|
||||
folder,
|
||||
request_uids,
|
||||
uid_message_ids,
|
||||
fetch_partially,
|
||||
fetching_existing_messages,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch_many_msgs_iroh(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
_folder: &str,
|
||||
request_uids: Vec<u32>,
|
||||
uid_message_ids: &BTreeMap<u32, String>,
|
||||
_fetch_partially: bool,
|
||||
_fetching_existing_messages: bool,
|
||||
// client: iroh::client::quic::Iroh,
|
||||
client: iroh::client::Iroh<
|
||||
quic_rpc::transport::flume::FlumeConnection<
|
||||
iroh::rpc_protocol::ProviderResponse,
|
||||
iroh::rpc_protocol::ProviderRequest,
|
||||
>,
|
||||
>,
|
||||
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
|
||||
let mut received_msgs = Vec::new();
|
||||
let mut last_uid = None;
|
||||
|
||||
let (id, _caps) = client.docs.list().await?.next().await.context("no doc")??;
|
||||
let doc = client.docs.open(id).await?.context("where is my doc?")?;
|
||||
|
||||
for request_uid in request_uids {
|
||||
let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) {
|
||||
rfc724_mid
|
||||
} else {
|
||||
error!(
|
||||
context,
|
||||
"No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
|
||||
request_uid
|
||||
);
|
||||
continue;
|
||||
};
|
||||
let key = format!("/inbox/{rfc724_mid}");
|
||||
warn!(context, "Reading iroh key {key}");
|
||||
let query = iroh::sync::store::Query::key_exact(&key).build();
|
||||
let entry = doc.get_one(query).await?.context("entry not found")?;
|
||||
let imf_raw = doc.read_to_bytes(&entry).await?;
|
||||
info!(
|
||||
context,
|
||||
"Passing message UID {} to receive_imf().", request_uid
|
||||
);
|
||||
match receive_imf_inner(context, rfc724_mid, &imf_raw, false, None, false).await {
|
||||
Ok(Some(msg)) => received_msgs.push(msg),
|
||||
Ok(None) => (),
|
||||
Err(err) => warn!(context, "receive_imf error: {err:#}"),
|
||||
}
|
||||
|
||||
last_uid = Some(request_uid);
|
||||
}
|
||||
Ok((last_uid, received_msgs))
|
||||
}
|
||||
|
||||
/// Fetches a list of messages by server UID.
|
||||
///
|
||||
/// Returns the last UID fetched successfully and the info about each downloaded message.
|
||||
/// If the message is incorrect or there is a failure to write a message to the database,
|
||||
/// it is skipped and the error is logged.
|
||||
pub(crate) async fn fetch_many_msgs(
|
||||
pub(crate) async fn fetch_many_msgs_imap(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
|
||||
Reference in New Issue
Block a user