Compare commits

...

4 Commits

Author SHA1 Message Date
Floris Bruynooghe
11e4e6adda Hook up reading messages from an iroh doc
If the DocTicket is configured then we join the doc.  If we have
joined the doc then we will try and fetch the individual message from
iroh.  However it is still pre-fetched from imap first so that we keep
respecting the imap order.
2023-11-21 17:55:06 +01:00
Floris Bruynooghe
11ae91a67a Create an iroh node 2023-11-21 15:39:47 +01:00
Floris Bruynooghe
525913d3a3 Add iroh11 dep 2023-11-21 14:44:54 +01:00
Floris Bruynooghe
5f70e20760 Rename old iroh to iroh04
This should prepare for adding and iroh11 node.
2023-11-21 14:37:39 +01:00
7 changed files with 2316 additions and 122 deletions

2243
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -56,7 +56,8 @@ hex = "0.4.0"
hickory-resolver = "0.24"
humansize = "2"
image = { version = "0.24.7", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh = { version = "0.4.1", default-features = false }
iroh04 = { package = "iroh", version = "0.4.1", default-features = false }
iroh = "0.11"
kamadak-exif = "0.5"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
libc = "0.2"
@@ -96,6 +97,7 @@ tokio-util = "0.7.9"
toml = "0.7"
url = "2"
uuid = { version = "1", features = ["serde", "v4"] }
quic-rpc = "0.6.1"
[dev-dependencies]
ansi_term = "0.12.0"

View File

@@ -6,7 +6,7 @@ use std::str::FromStr;
use anyhow::{ensure, Context as _, Result};
use strum::{EnumProperty, IntoEnumIterator};
use strum_macros::{AsRefStr, Display, EnumIter, EnumProperty, EnumString};
use strum_macros::{AsRefStr, Display, EnumIter, EnumString};
use crate::blob::BlobObject;
use crate::constants::DC_VERSION_STR;
@@ -335,6 +335,9 @@ pub enum Config {
/// until `chat_id.accept()` is called.
#[strum(props(default = "0"))]
VerifiedOneOnOneChats,
/// The iroh document ticket.
DocTicket,
}
impl Context {

View File

@@ -4,6 +4,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
@@ -244,6 +245,8 @@ pub struct InnerContext {
/// Standard RwLock instead of [`tokio::sync::RwLock`] is used
/// because the lock is used from synchronous [`Context::emit_event`].
pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,
pub(crate) iroh_node: iroh::node::Node<iroh::bytes::store::flat::Store>,
}
/// The state of ongoing process.
@@ -312,7 +315,10 @@ impl Context {
if !blobdir.exists() {
tokio::fs::create_dir_all(&blobdir).await?;
}
let context = Context::with_blobdir(dbfile.into(), blobdir, id, events, stockstrings)?;
let irohdir = dbfile.with_file_name("iroh");
let context =
Context::with_blobdir(dbfile.into(), blobdir, irohdir, id, events, stockstrings)
.await?;
Ok(context)
}
@@ -349,9 +355,10 @@ impl Context {
self.sql.check_passphrase(passphrase).await
}
pub(crate) fn with_blobdir(
pub(crate) async fn with_blobdir(
dbfile: PathBuf,
blobdir: PathBuf,
irohdir: PathBuf,
id: u32,
events: Events,
stockstrings: StockStrings,
@@ -362,6 +369,31 @@ impl Context {
blobdir.display()
);
tokio::fs::create_dir_all(&irohdir).await?;
let keyfile = irohdir.join("secret-key");
let key = if tokio::fs::try_exists(&keyfile).await? {
let s = tokio::fs::read_to_string(&keyfile).await?;
iroh::net::key::SecretKey::from_str(&s)?
} else {
let key = iroh::net::key::SecretKey::generate();
tokio::fs::write(keyfile, key.to_string()).await?;
key
};
let rt = iroh::bytes::util::runtime::Handle::from_current(1)?;
let baostore = iroh::bytes::store::flat::Store::load(
irohdir.join("complete"),
irohdir.join("partial"),
irohdir.join("meta"),
&rt,
)
.await?;
let docstore = iroh::sync::store::fs::Store::new(irohdir.join("docs"))?;
let iroh_node = iroh::node::Node::builder(baostore, docstore)
.secret_key(key)
.runtime(&rt)
.spawn()
.await?;
let new_msgs_notify = Notify::new();
// Notify once immediately to allow processing old messages
// without starting I/O.
@@ -388,6 +420,7 @@ impl Context {
last_full_folder_scan: Mutex::new(None),
last_error: std::sync::RwLock::new("".to_string()),
debug_logging: std::sync::RwLock::new(None),
iroh_node,
};
let ctx = Context {
@@ -404,6 +437,19 @@ impl Context {
return;
}
self.scheduler.start(self.clone()).await;
if let Ok(Some(ticket)) = self.get_config(Config::DocTicket).await {
if let Err(err) = self.start_iroh(ticket).await {
error!(self, "failed to join iroh doc, {err:#}");
}
}
}
async fn start_iroh(&self, ticket: String) -> Result<()> {
let client = self.iroh_node.client();
let _doc = client.docs.import(ticket.parse()?).await?;
Ok(())
}
/// Stops the IO scheduler.
@@ -1242,7 +1288,16 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
let blobdir = PathBuf::new();
let res = Context::with_blobdir(dbfile, blobdir, 1, Events::new(), StockStrings::new());
let irohdir = tmp.path().join("iroh");
let res = Context::with_blobdir(
dbfile,
blobdir,
irohdir,
1,
Events::new(),
StockStrings::new(),
)
.await;
assert!(res.is_err());
}
@@ -1251,7 +1306,16 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
let blobdir = tmp.path().join("blobs");
let res = Context::with_blobdir(dbfile, blobdir, 1, Events::new(), StockStrings::new());
let irohdir = tmp.path().join("iroh");
let res = Context::with_blobdir(
dbfile,
blobdir,
irohdir,
1,
Events::new(),
StockStrings::new(),
)
.await;
assert!(res.is_err());
}

View File

@@ -1379,12 +1379,100 @@ impl Imap {
Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
}
pub(crate) async fn fetch_many_msgs(
&mut self,
context: &Context,
folder: &str,
request_uids: Vec<u32>,
uid_message_ids: &BTreeMap<u32, String>,
fetch_partially: bool,
fetching_existing_messages: bool,
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
let client = context.iroh_node.client();
let n_docs = client.docs.list().await?.count().await;
if n_docs == 1 {
self.fetch_many_msgs_iroh(
context,
folder,
request_uids,
uid_message_ids,
fetch_partially,
fetching_existing_messages,
client,
)
.await
} else {
self.fetch_many_msgs_imap(
context,
folder,
request_uids,
uid_message_ids,
fetch_partially,
fetching_existing_messages,
)
.await
}
}
pub(crate) async fn fetch_many_msgs_iroh(
&mut self,
context: &Context,
_folder: &str,
request_uids: Vec<u32>,
uid_message_ids: &BTreeMap<u32, String>,
_fetch_partially: bool,
_fetching_existing_messages: bool,
// client: iroh::client::quic::Iroh,
client: iroh::client::Iroh<
quic_rpc::transport::flume::FlumeConnection<
iroh::rpc_protocol::ProviderResponse,
iroh::rpc_protocol::ProviderRequest,
>,
>,
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
let mut received_msgs = Vec::new();
let mut last_uid = None;
let (id, _caps) = client.docs.list().await?.next().await.context("no doc")??;
let doc = client.docs.open(id).await?.context("where is my doc?")?;
for request_uid in request_uids {
let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) {
rfc724_mid
} else {
error!(
context,
"No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
request_uid
);
continue;
};
let key = format!("/inbox/{rfc724_mid}");
warn!(context, "Reading iroh key {key}");
let query = iroh::sync::store::Query::key_exact(&key).build();
let entry = doc.get_one(query).await?.context("entry not found")?;
let imf_raw = doc.read_to_bytes(&entry).await?;
info!(
context,
"Passing message UID {} to receive_imf().", request_uid
);
match receive_imf_inner(context, rfc724_mid, &imf_raw, false, None, false).await {
Ok(Some(msg)) => received_msgs.push(msg),
Ok(None) => (),
Err(err) => warn!(context, "receive_imf error: {err:#}"),
}
last_uid = Some(request_uid);
}
Ok((last_uid, received_msgs))
}
/// Fetches a list of messages by server UID.
///
/// Returns the last UID fetched successfully and the info about each downloaded message.
/// If the message is incorrect or there is a failure to write a message to the database,
/// it is skipped and the error is logged.
pub(crate) async fn fetch_many_msgs(
pub(crate) async fn fetch_many_msgs_imap(
&mut self,
context: &Context,
folder: &str,

View File

@@ -32,12 +32,12 @@ use std::task::Poll;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use async_channel::Receiver;
use futures_lite::StreamExt;
use iroh::blobs::Collection;
use iroh::get::DataStream;
use iroh::progress::ProgressEmitter;
use iroh::protocol::AuthToken;
use iroh::provider::{DataSource, Event, Provider, Ticket};
use iroh::Hash;
use iroh04::blobs::Collection;
use iroh04::get::DataStream;
use iroh04::progress::ProgressEmitter;
use iroh04::protocol::AuthToken;
use iroh04::provider::{DataSource, Event, Provider, Ticket};
use iroh04::Hash;
use tokio::fs::{self, File};
use tokio::io::{self, AsyncWriteExt, BufWriter};
use tokio::sync::broadcast::error::RecvError;
@@ -176,7 +176,7 @@ impl BackupProvider {
}
// Start listening.
let (db, hash) = iroh::provider::create_collection(files).await?;
let (db, hash) = iroh04::provider::create_collection(files).await?;
context.emit_event(SendProgress::CollectionCreated.into());
let provider = Provider::builder(db)
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
@@ -382,7 +382,7 @@ impl From<SendProgress> for EventType {
/// This is a long running operation which will only when completed.
///
/// Using [`Qr`] as argument is a bit odd as it only accepts one specific variant of it. It
/// does avoid having [`iroh::provider::Ticket`] in the primary API however, without
/// does avoid having [`iroh04::provider::Ticket`] in the primary API however, without
/// having to revert to untyped bytes.
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
ensure!(
@@ -456,7 +456,7 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()
// Perform the transfer.
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
let stats = iroh::get::run_ticket(
let stats = iroh04::get::run_ticket(
ticket,
keylog,
MAX_CONCURRENT_DIALS,

View File

@@ -114,7 +114,7 @@ pub enum Qr {
/// information to connect to and authenticate a backup provider.
///
/// The format is somewhat opaque, but `sendme` can deserialise this.
ticket: iroh::provider::Ticket,
ticket: iroh04::provider::Ticket,
},
/// Ask the user if they want to use the given service for video chats.
@@ -497,12 +497,12 @@ fn decode_webrtc_instance(_context: &Context, qr: &str) -> Result<Qr> {
/// Decodes a [`DCBACKUP_SCHEME`] QR code.
///
/// The format of this scheme is `DCBACKUP:<encoded ticket>`. The encoding is the
/// [`iroh::provider::Ticket`]'s `Display` impl.
/// [`iroh04::provider::Ticket`]'s `Display` impl.
fn decode_backup(qr: &str) -> Result<Qr> {
let payload = qr
.strip_prefix(DCBACKUP_SCHEME)
.ok_or_else(|| anyhow!("invalid DCBACKUP scheme"))?;
let ticket: iroh::provider::Ticket = payload.parse().context("invalid DCBACKUP payload")?;
let ticket: iroh04::provider::Ticket = payload.parse().context("invalid DCBACKUP payload")?;
Ok(Qr::Backup { ticket })
}