mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
4 Commits
d6dacdcd27
...
iroh11
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
11e4e6adda | ||
|
|
11ae91a67a | ||
|
|
525913d3a3 | ||
|
|
5f70e20760 |
2243
Cargo.lock
generated
2243
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -56,7 +56,8 @@ hex = "0.4.0"
|
||||
hickory-resolver = "0.24"
|
||||
humansize = "2"
|
||||
image = { version = "0.24.7", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
|
||||
iroh = { version = "0.4.1", default-features = false }
|
||||
iroh04 = { package = "iroh", version = "0.4.1", default-features = false }
|
||||
iroh = "0.11"
|
||||
kamadak-exif = "0.5"
|
||||
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
|
||||
libc = "0.2"
|
||||
@@ -96,6 +97,7 @@ tokio-util = "0.7.9"
|
||||
toml = "0.7"
|
||||
url = "2"
|
||||
uuid = { version = "1", features = ["serde", "v4"] }
|
||||
quic-rpc = "0.6.1"
|
||||
|
||||
[dev-dependencies]
|
||||
ansi_term = "0.12.0"
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::str::FromStr;
|
||||
|
||||
use anyhow::{ensure, Context as _, Result};
|
||||
use strum::{EnumProperty, IntoEnumIterator};
|
||||
use strum_macros::{AsRefStr, Display, EnumIter, EnumProperty, EnumString};
|
||||
use strum_macros::{AsRefStr, Display, EnumIter, EnumString};
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
use crate::constants::DC_VERSION_STR;
|
||||
@@ -335,6 +335,9 @@ pub enum Config {
|
||||
/// until `chat_id.accept()` is called.
|
||||
#[strum(props(default = "0"))]
|
||||
VerifiedOneOnOneChats,
|
||||
|
||||
/// The iroh document ticket.
|
||||
DocTicket,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::collections::{BTreeMap, HashMap};
|
||||
use std::ffi::OsString;
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
@@ -244,6 +245,8 @@ pub struct InnerContext {
|
||||
/// Standard RwLock instead of [`tokio::sync::RwLock`] is used
|
||||
/// because the lock is used from synchronous [`Context::emit_event`].
|
||||
pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,
|
||||
|
||||
pub(crate) iroh_node: iroh::node::Node<iroh::bytes::store::flat::Store>,
|
||||
}
|
||||
|
||||
/// The state of ongoing process.
|
||||
@@ -312,7 +315,10 @@ impl Context {
|
||||
if !blobdir.exists() {
|
||||
tokio::fs::create_dir_all(&blobdir).await?;
|
||||
}
|
||||
let context = Context::with_blobdir(dbfile.into(), blobdir, id, events, stockstrings)?;
|
||||
let irohdir = dbfile.with_file_name("iroh");
|
||||
let context =
|
||||
Context::with_blobdir(dbfile.into(), blobdir, irohdir, id, events, stockstrings)
|
||||
.await?;
|
||||
Ok(context)
|
||||
}
|
||||
|
||||
@@ -349,9 +355,10 @@ impl Context {
|
||||
self.sql.check_passphrase(passphrase).await
|
||||
}
|
||||
|
||||
pub(crate) fn with_blobdir(
|
||||
pub(crate) async fn with_blobdir(
|
||||
dbfile: PathBuf,
|
||||
blobdir: PathBuf,
|
||||
irohdir: PathBuf,
|
||||
id: u32,
|
||||
events: Events,
|
||||
stockstrings: StockStrings,
|
||||
@@ -362,6 +369,31 @@ impl Context {
|
||||
blobdir.display()
|
||||
);
|
||||
|
||||
tokio::fs::create_dir_all(&irohdir).await?;
|
||||
let keyfile = irohdir.join("secret-key");
|
||||
let key = if tokio::fs::try_exists(&keyfile).await? {
|
||||
let s = tokio::fs::read_to_string(&keyfile).await?;
|
||||
iroh::net::key::SecretKey::from_str(&s)?
|
||||
} else {
|
||||
let key = iroh::net::key::SecretKey::generate();
|
||||
tokio::fs::write(keyfile, key.to_string()).await?;
|
||||
key
|
||||
};
|
||||
let rt = iroh::bytes::util::runtime::Handle::from_current(1)?;
|
||||
let baostore = iroh::bytes::store::flat::Store::load(
|
||||
irohdir.join("complete"),
|
||||
irohdir.join("partial"),
|
||||
irohdir.join("meta"),
|
||||
&rt,
|
||||
)
|
||||
.await?;
|
||||
let docstore = iroh::sync::store::fs::Store::new(irohdir.join("docs"))?;
|
||||
let iroh_node = iroh::node::Node::builder(baostore, docstore)
|
||||
.secret_key(key)
|
||||
.runtime(&rt)
|
||||
.spawn()
|
||||
.await?;
|
||||
|
||||
let new_msgs_notify = Notify::new();
|
||||
// Notify once immediately to allow processing old messages
|
||||
// without starting I/O.
|
||||
@@ -388,6 +420,7 @@ impl Context {
|
||||
last_full_folder_scan: Mutex::new(None),
|
||||
last_error: std::sync::RwLock::new("".to_string()),
|
||||
debug_logging: std::sync::RwLock::new(None),
|
||||
iroh_node,
|
||||
};
|
||||
|
||||
let ctx = Context {
|
||||
@@ -404,6 +437,19 @@ impl Context {
|
||||
return;
|
||||
}
|
||||
self.scheduler.start(self.clone()).await;
|
||||
|
||||
if let Ok(Some(ticket)) = self.get_config(Config::DocTicket).await {
|
||||
if let Err(err) = self.start_iroh(ticket).await {
|
||||
error!(self, "failed to join iroh doc, {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_iroh(&self, ticket: String) -> Result<()> {
|
||||
let client = self.iroh_node.client();
|
||||
let _doc = client.docs.import(ticket.parse()?).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stops the IO scheduler.
|
||||
@@ -1242,7 +1288,16 @@ mod tests {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dbfile = tmp.path().join("db.sqlite");
|
||||
let blobdir = PathBuf::new();
|
||||
let res = Context::with_blobdir(dbfile, blobdir, 1, Events::new(), StockStrings::new());
|
||||
let irohdir = tmp.path().join("iroh");
|
||||
let res = Context::with_blobdir(
|
||||
dbfile,
|
||||
blobdir,
|
||||
irohdir,
|
||||
1,
|
||||
Events::new(),
|
||||
StockStrings::new(),
|
||||
)
|
||||
.await;
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
@@ -1251,7 +1306,16 @@ mod tests {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dbfile = tmp.path().join("db.sqlite");
|
||||
let blobdir = tmp.path().join("blobs");
|
||||
let res = Context::with_blobdir(dbfile, blobdir, 1, Events::new(), StockStrings::new());
|
||||
let irohdir = tmp.path().join("iroh");
|
||||
let res = Context::with_blobdir(
|
||||
dbfile,
|
||||
blobdir,
|
||||
irohdir,
|
||||
1,
|
||||
Events::new(),
|
||||
StockStrings::new(),
|
||||
)
|
||||
.await;
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
|
||||
90
src/imap.rs
90
src/imap.rs
@@ -1379,12 +1379,100 @@ impl Imap {
|
||||
Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch_many_msgs(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
request_uids: Vec<u32>,
|
||||
uid_message_ids: &BTreeMap<u32, String>,
|
||||
fetch_partially: bool,
|
||||
fetching_existing_messages: bool,
|
||||
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
|
||||
let client = context.iroh_node.client();
|
||||
let n_docs = client.docs.list().await?.count().await;
|
||||
if n_docs == 1 {
|
||||
self.fetch_many_msgs_iroh(
|
||||
context,
|
||||
folder,
|
||||
request_uids,
|
||||
uid_message_ids,
|
||||
fetch_partially,
|
||||
fetching_existing_messages,
|
||||
client,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
self.fetch_many_msgs_imap(
|
||||
context,
|
||||
folder,
|
||||
request_uids,
|
||||
uid_message_ids,
|
||||
fetch_partially,
|
||||
fetching_existing_messages,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch_many_msgs_iroh(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
_folder: &str,
|
||||
request_uids: Vec<u32>,
|
||||
uid_message_ids: &BTreeMap<u32, String>,
|
||||
_fetch_partially: bool,
|
||||
_fetching_existing_messages: bool,
|
||||
// client: iroh::client::quic::Iroh,
|
||||
client: iroh::client::Iroh<
|
||||
quic_rpc::transport::flume::FlumeConnection<
|
||||
iroh::rpc_protocol::ProviderResponse,
|
||||
iroh::rpc_protocol::ProviderRequest,
|
||||
>,
|
||||
>,
|
||||
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
|
||||
let mut received_msgs = Vec::new();
|
||||
let mut last_uid = None;
|
||||
|
||||
let (id, _caps) = client.docs.list().await?.next().await.context("no doc")??;
|
||||
let doc = client.docs.open(id).await?.context("where is my doc?")?;
|
||||
|
||||
for request_uid in request_uids {
|
||||
let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) {
|
||||
rfc724_mid
|
||||
} else {
|
||||
error!(
|
||||
context,
|
||||
"No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
|
||||
request_uid
|
||||
);
|
||||
continue;
|
||||
};
|
||||
let key = format!("/inbox/{rfc724_mid}");
|
||||
warn!(context, "Reading iroh key {key}");
|
||||
let query = iroh::sync::store::Query::key_exact(&key).build();
|
||||
let entry = doc.get_one(query).await?.context("entry not found")?;
|
||||
let imf_raw = doc.read_to_bytes(&entry).await?;
|
||||
info!(
|
||||
context,
|
||||
"Passing message UID {} to receive_imf().", request_uid
|
||||
);
|
||||
match receive_imf_inner(context, rfc724_mid, &imf_raw, false, None, false).await {
|
||||
Ok(Some(msg)) => received_msgs.push(msg),
|
||||
Ok(None) => (),
|
||||
Err(err) => warn!(context, "receive_imf error: {err:#}"),
|
||||
}
|
||||
|
||||
last_uid = Some(request_uid);
|
||||
}
|
||||
Ok((last_uid, received_msgs))
|
||||
}
|
||||
|
||||
/// Fetches a list of messages by server UID.
|
||||
///
|
||||
/// Returns the last UID fetched successfully and the info about each downloaded message.
|
||||
/// If the message is incorrect or there is a failure to write a message to the database,
|
||||
/// it is skipped and the error is logged.
|
||||
pub(crate) async fn fetch_many_msgs(
|
||||
pub(crate) async fn fetch_many_msgs_imap(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
|
||||
@@ -32,12 +32,12 @@ use std::task::Poll;
|
||||
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
use async_channel::Receiver;
|
||||
use futures_lite::StreamExt;
|
||||
use iroh::blobs::Collection;
|
||||
use iroh::get::DataStream;
|
||||
use iroh::progress::ProgressEmitter;
|
||||
use iroh::protocol::AuthToken;
|
||||
use iroh::provider::{DataSource, Event, Provider, Ticket};
|
||||
use iroh::Hash;
|
||||
use iroh04::blobs::Collection;
|
||||
use iroh04::get::DataStream;
|
||||
use iroh04::progress::ProgressEmitter;
|
||||
use iroh04::protocol::AuthToken;
|
||||
use iroh04::provider::{DataSource, Event, Provider, Ticket};
|
||||
use iroh04::Hash;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
@@ -176,7 +176,7 @@ impl BackupProvider {
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
let (db, hash) = iroh::provider::create_collection(files).await?;
|
||||
let (db, hash) = iroh04::provider::create_collection(files).await?;
|
||||
context.emit_event(SendProgress::CollectionCreated.into());
|
||||
let provider = Provider::builder(db)
|
||||
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
|
||||
@@ -382,7 +382,7 @@ impl From<SendProgress> for EventType {
|
||||
/// This is a long running operation which will only when completed.
|
||||
///
|
||||
/// Using [`Qr`] as argument is a bit odd as it only accepts one specific variant of it. It
|
||||
/// does avoid having [`iroh::provider::Ticket`] in the primary API however, without
|
||||
/// does avoid having [`iroh04::provider::Ticket`] in the primary API however, without
|
||||
/// having to revert to untyped bytes.
|
||||
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
ensure!(
|
||||
@@ -456,7 +456,7 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()
|
||||
|
||||
// Perform the transfer.
|
||||
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
|
||||
let stats = iroh::get::run_ticket(
|
||||
let stats = iroh04::get::run_ticket(
|
||||
ticket,
|
||||
keylog,
|
||||
MAX_CONCURRENT_DIALS,
|
||||
|
||||
@@ -114,7 +114,7 @@ pub enum Qr {
|
||||
/// information to connect to and authenticate a backup provider.
|
||||
///
|
||||
/// The format is somewhat opaque, but `sendme` can deserialise this.
|
||||
ticket: iroh::provider::Ticket,
|
||||
ticket: iroh04::provider::Ticket,
|
||||
},
|
||||
|
||||
/// Ask the user if they want to use the given service for video chats.
|
||||
@@ -497,12 +497,12 @@ fn decode_webrtc_instance(_context: &Context, qr: &str) -> Result<Qr> {
|
||||
/// Decodes a [`DCBACKUP_SCHEME`] QR code.
|
||||
///
|
||||
/// The format of this scheme is `DCBACKUP:<encoded ticket>`. The encoding is the
|
||||
/// [`iroh::provider::Ticket`]'s `Display` impl.
|
||||
/// [`iroh04::provider::Ticket`]'s `Display` impl.
|
||||
fn decode_backup(qr: &str) -> Result<Qr> {
|
||||
let payload = qr
|
||||
.strip_prefix(DCBACKUP_SCHEME)
|
||||
.ok_or_else(|| anyhow!("invalid DCBACKUP scheme"))?;
|
||||
let ticket: iroh::provider::Ticket = payload.parse().context("invalid DCBACKUP payload")?;
|
||||
let ticket: iroh04::provider::Ticket = payload.parse().context("invalid DCBACKUP payload")?;
|
||||
Ok(Qr::Backup { ticket })
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user