mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 15:02:11 +03:00
Compare commits
25 Commits
v1.157.3
...
simon/toki
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f029e442ff | ||
|
|
fe6543cd7a | ||
|
|
6845e81608 | ||
|
|
e4ccb5287b | ||
|
|
e9d32c1bc5 | ||
|
|
fbf2538fa2 | ||
|
|
3d4e96897e | ||
|
|
3ff271ba93 | ||
|
|
95aaaee43c | ||
|
|
087050c27d | ||
|
|
1404cbd3ef | ||
|
|
b965db6657 | ||
|
|
6749b90ff5 | ||
|
|
7bdac58040 | ||
|
|
06c4eb9df0 | ||
|
|
7eb34a56f6 | ||
|
|
6b0b551973 | ||
|
|
09aa9712c4 | ||
|
|
de7d6753a9 | ||
|
|
92c10e56d3 | ||
|
|
c71a6c4af9 | ||
|
|
308053dc44 | ||
|
|
f37fb9574d | ||
|
|
baa8da86ba | ||
|
|
2cd4af576a |
225
Cargo.lock
generated
225
Cargo.lock
generated
@@ -358,6 +358,28 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
|
||||
dependencies = [
|
||||
"async-stream-impl",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream-impl"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.79",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.77"
|
||||
@@ -384,6 +406,12 @@ dependencies = [
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
|
||||
|
||||
[[package]]
|
||||
name = "attohttpc"
|
||||
version = "0.24.1"
|
||||
@@ -953,6 +981,45 @@ dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-api"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86ed14aa9c9f927213c6e4f3ef75faaad3406134efe84ba2cb7983431d5f0931"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"prost",
|
||||
"prost-types",
|
||||
"tonic",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-subscriber"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2e3a111a37f3333946ebf9da370ba5c5577b18eb342ec683eb488dd21980302"
|
||||
dependencies = [
|
||||
"console-api",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"futures-task",
|
||||
"hdrhistogram",
|
||||
"humantime",
|
||||
"hyper-util",
|
||||
"prost",
|
||||
"prost-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-oid"
|
||||
version = "0.9.6"
|
||||
@@ -1084,6 +1151,15 @@ dependencies = [
|
||||
"itertools",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.8.5"
|
||||
@@ -1270,7 +1346,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown",
|
||||
"hashbrown 0.14.3",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
@@ -1421,6 +1497,7 @@ name = "deltachat-repl"
|
||||
version = "1.148.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"console-subscriber",
|
||||
"deltachat",
|
||||
"dirs",
|
||||
"log",
|
||||
@@ -1429,6 +1506,7 @@ dependencies = [
|
||||
"rusqlite",
|
||||
"rustyline",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
@@ -1437,6 +1515,7 @@ name = "deltachat-rpc-server"
|
||||
version = "1.148.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"console-subscriber",
|
||||
"deltachat",
|
||||
"deltachat-jsonrpc",
|
||||
"futures-lite 2.3.0",
|
||||
@@ -1445,6 +1524,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"yerpc",
|
||||
]
|
||||
@@ -2454,7 +2534,26 @@ dependencies = [
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"http 0.2.12",
|
||||
"indexmap",
|
||||
"indexmap 2.2.5",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
|
||||
dependencies = [
|
||||
"atomic-waker",
|
||||
"bytes",
|
||||
"fnv",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"http 1.1.0",
|
||||
"indexmap 2.2.5",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -2471,6 +2570,12 @@ dependencies = [
|
||||
"crunchy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.14.3"
|
||||
@@ -2486,7 +2591,20 @@ version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
|
||||
dependencies = [
|
||||
"hashbrown",
|
||||
"hashbrown 0.14.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hdrhistogram"
|
||||
version = "7.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
|
||||
dependencies = [
|
||||
"base64 0.21.7",
|
||||
"byteorder",
|
||||
"flate2",
|
||||
"nom",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2721,7 +2839,7 @@ dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"h2 0.3.26",
|
||||
"http 0.2.12",
|
||||
"http-body 0.4.6",
|
||||
"httparse",
|
||||
@@ -2744,6 +2862,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"h2 0.4.6",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"httparse",
|
||||
@@ -2773,6 +2892,19 @@ dependencies = [
|
||||
"webpki-roots",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-timeout"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
|
||||
dependencies = [
|
||||
"hyper 1.4.1",
|
||||
"hyper-util",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.9"
|
||||
@@ -2895,6 +3027,16 @@ dependencies = [
|
||||
"nom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.2.5"
|
||||
@@ -2902,7 +3044,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown",
|
||||
"hashbrown 0.14.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2996,7 +3138,7 @@ dependencies = [
|
||||
"futures-concurrency",
|
||||
"futures-lite 2.3.0",
|
||||
"futures-util",
|
||||
"indexmap",
|
||||
"indexmap 2.2.5",
|
||||
"iroh-base",
|
||||
"iroh-blake3",
|
||||
"iroh-metrics",
|
||||
@@ -4480,6 +4622,38 @@ dependencies = [
|
||||
"unarray",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.13.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.13.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.79",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.13.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
|
||||
dependencies = [
|
||||
"prost",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "qr2term"
|
||||
version = "0.3.3"
|
||||
@@ -6001,6 +6175,7 @@ dependencies = [
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"tracing",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
@@ -6120,7 +6295,7 @@ dependencies = [
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"hashbrown",
|
||||
"hashbrown 0.14.3",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
@@ -6152,13 +6327,43 @@ version = "0.22.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"indexmap 2.2.5",
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"h2 0.4.6",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper-timeout",
|
||||
"hyper-util",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
@@ -6167,9 +6372,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"indexmap 1.9.3",
|
||||
"pin-project",
|
||||
"pin-project-lite",
|
||||
"rand 0.8.5",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
|
||||
@@ -18,7 +18,6 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER;
|
||||
use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin};
|
||||
use deltachat::context::get_info;
|
||||
use deltachat::ephemeral::Timer;
|
||||
use deltachat::location;
|
||||
use deltachat::message::get_msg_read_receipts;
|
||||
use deltachat::message::{
|
||||
self, delete_msgs, markseen_msgs, Message, MessageState, MsgId, Viewtype,
|
||||
@@ -35,6 +34,7 @@ use deltachat::stock_str::StockMessage;
|
||||
use deltachat::webxdc::StatusUpdateSerial;
|
||||
use deltachat::EventEmitter;
|
||||
use deltachat::{imex, info};
|
||||
use deltachat::{location, spawn_named_task};
|
||||
use sanitize_filename::is_sanitized;
|
||||
use tokio::fs;
|
||||
use tokio::sync::{watch, Mutex, RwLock};
|
||||
@@ -1777,7 +1777,7 @@ impl CommandApi {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
let fut = send_webxdc_realtime_advertisement(&ctx, MsgId::new(instance_msg_id)).await?;
|
||||
if let Some(fut) = fut {
|
||||
tokio::spawn(async move {
|
||||
spawn_named_task!("send_webxdc_realtime_advertisement", async move {
|
||||
fut.await.ok();
|
||||
info!(ctx, "send_webxdc_realtime_advertisement done")
|
||||
});
|
||||
|
||||
@@ -7,6 +7,7 @@ repository = "https://github.com/deltachat/deltachat-core-rust"
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
console-subscriber = "0.4.0"
|
||||
deltachat = { workspace = true, features = ["internals"]}
|
||||
dirs = "5"
|
||||
log = { workspace = true }
|
||||
@@ -15,6 +16,7 @@ qr2term = "0.3.3"
|
||||
rusqlite = { workspace = true }
|
||||
rustyline = "14"
|
||||
tokio = { workspace = true, features = ["fs", "rt-multi-thread", "macros"] }
|
||||
tracing = "0.1.40"
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
|
||||
[features]
|
||||
|
||||
57
deltachat-repl/README.md
Normal file
57
deltachat-repl/README.md
Normal file
@@ -0,0 +1,57 @@
|
||||
# Delta Chat REPL
|
||||
|
||||
This is a [REPL](https://en.wikipedia.org/wiki/Read%E2%80%93eval%E2%80%93print_loop) frontend built on top of Delta Chat core.
|
||||
Its purpose is to help with testing during development, it is not meant for end users.
|
||||
|
||||
Dependencies:
|
||||
- If you want to use `getqr` you need `qrencode` (To install, use your system's package manager)
|
||||
|
||||
## Usage
|
||||
|
||||
```
|
||||
cargo run <path to deltachat db>
|
||||
```
|
||||
|
||||
Type in `help` to learn about what commands are available.
|
||||
|
||||
## Usage with `tokio-console`
|
||||
|
||||
Tokio is an async runtime that Delta Chat core uses.
|
||||
Core uses Tokio tasks, which are something similar to threads.
|
||||
`tokio-console` is like a task manager for these Tokio tasks.
|
||||
|
||||
Examples of tasks:
|
||||
- The event loop in the REPL tool which processes events received from core
|
||||
- The REPL loop itself which waits for and executes user commands
|
||||
- The IMAP task that manages IMAP connection in core
|
||||
|
||||
```
|
||||
RUSTFLAGS="--cfg tokio_unstable" cargo run <path to deltachat db>
|
||||
```
|
||||
|
||||
Then in a new console window start [`tokio-console`](https://github.com/tokio-rs/console).
|
||||
You can install it via `cargo install --locked tokio-console`.
|
||||
|
||||
### Example
|
||||
|
||||
An example session in the REPL tool:
|
||||
|
||||
```
|
||||
RUSTFLAGS="--cfg tokio_unstable" cargo run test-db/db
|
||||
setqr dcaccount:https://nine.testrun.org/new
|
||||
configure
|
||||
connect
|
||||
listchats
|
||||
getqr
|
||||
```
|
||||
|
||||
Use the qrcode/openpgp4fpr link to setup the contact on Delta Chat.
|
||||
Then write a message to that new contact, after that you can accept the chat in the REPL tool and send a reply:
|
||||
|
||||
```
|
||||
listchats
|
||||
accept 12
|
||||
chat 12
|
||||
send hi!
|
||||
chat
|
||||
```
|
||||
@@ -30,7 +30,7 @@ use rustyline::{
|
||||
};
|
||||
use tokio::fs;
|
||||
use tokio::runtime::Handle;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer};
|
||||
|
||||
mod cmdline;
|
||||
use self::cmdline::*;
|
||||
@@ -317,7 +317,7 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
|
||||
.await?;
|
||||
|
||||
let events = context.get_event_emitter();
|
||||
tokio::task::spawn(async move {
|
||||
spawn_named_task!("repl:receive_event", async move {
|
||||
while let Some(event) = events.recv().await {
|
||||
receive_event(event.typ);
|
||||
}
|
||||
@@ -333,7 +333,7 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
|
||||
let mut selected_chat = ChatId::default();
|
||||
|
||||
let ctx = context.clone();
|
||||
let input_loop = tokio::task::spawn_blocking(move || {
|
||||
let input_loop = spawn_named_blocking_task!("repl:input_loop", move || {
|
||||
let h = DcHelper {
|
||||
completer: FilenameCompleter::new(),
|
||||
highlighter: MatchingBracketHighlighter::new(),
|
||||
@@ -353,7 +353,6 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
|
||||
|
||||
match readline {
|
||||
Ok(line) => {
|
||||
// TODO: ignore "set mail_pw"
|
||||
rl.add_history_entry(line.as_str())?;
|
||||
let should_continue = Handle::current().block_on(async {
|
||||
match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await {
|
||||
@@ -481,11 +480,22 @@ async fn handle_cmd(
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
EnvFilter::from_default_env().add_directive("deltachat_repl=info".parse()?),
|
||||
)
|
||||
.init();
|
||||
#[allow(unexpected_cfgs)]
|
||||
tracing::subscriber::set_global_default({
|
||||
let subscribers = tracing_subscriber::Registry::default().with(
|
||||
tracing_subscriber::fmt::layer().with_filter(
|
||||
EnvFilter::from_default_env().add_directive("deltachat_repl=info".parse()?),
|
||||
),
|
||||
);
|
||||
#[cfg(tokio_unstable)]
|
||||
{
|
||||
subscribers.with(console_subscriber::spawn())
|
||||
}
|
||||
#[cfg(not(tokio_unstable))]
|
||||
{
|
||||
subscribers
|
||||
}
|
||||
})?;
|
||||
|
||||
let args = std::env::args().collect();
|
||||
start(args).await?;
|
||||
|
||||
@@ -22,6 +22,8 @@ tokio = { workspace = true, features = ["io-std"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
yerpc = { workspace = true, features = ["anyhow_expose", "openrpc"] }
|
||||
console-subscriber = "0.4.0"
|
||||
tracing = "0.1.40"
|
||||
|
||||
[features]
|
||||
default = ["vendored"]
|
||||
|
||||
@@ -35,3 +35,21 @@ languages other than Rust, for example:
|
||||
|
||||
Run `deltachat-rpc-server --version` to check the version of the server.
|
||||
Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API.
|
||||
|
||||
## Usage with `tokio-console`
|
||||
|
||||
When built with `RUSTFLAGS="--cfg tokio_unstable"`, console-subscriber is enabled.
|
||||
That means that you can use [`tokio-console`](https://github.com/tokio-rs/console) to inspect active Tokio tasks.
|
||||
You can install it via `cargo install tokio-console`.
|
||||
|
||||
```sh
|
||||
RUSTFLAGS="--cfg tokio_unstable" cargo run
|
||||
```
|
||||
|
||||
### Usage in deltachat-desktop:
|
||||
|
||||
Follow steps from `deltachat-desktop/docs/UPDATE_CORE.md`, but run the `make_local_dev_version` script with the `tokio_unstable` rustflag:
|
||||
|
||||
```sh
|
||||
RUSTFLAGS="--cfg tokio_unstable" python3 deltachat-rpc-server/npm-package/scripts/make_local_dev_version.py
|
||||
```
|
||||
|
||||
@@ -7,11 +7,11 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use deltachat::constants::DC_VERSION_STR;
|
||||
use deltachat::{constants::DC_VERSION_STR, spawn_named_task};
|
||||
use deltachat_jsonrpc::api::{Accounts, CommandApi};
|
||||
use futures_lite::stream::StreamExt;
|
||||
use tokio::io::{self, AsyncBufReadExt, BufReader};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::{prelude::*, EnvFilter};
|
||||
use yerpc::RpcServer as _;
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
@@ -67,10 +67,23 @@ async fn main_impl() -> Result<()> {
|
||||
// Logs from `log` crate and traces from `tracing` crate
|
||||
// are configurable with `RUST_LOG` environment variable
|
||||
// and go to stderr to avoid interferring with JSON-RPC using stdout.
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.with_writer(std::io::stderr)
|
||||
.init();
|
||||
#[allow(unexpected_cfgs)]
|
||||
tracing::subscriber::set_global_default({
|
||||
let subscribers = tracing_subscriber::Registry::default().with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_writer(std::io::stderr)
|
||||
.with_filter(EnvFilter::from_default_env()),
|
||||
);
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
{
|
||||
subscribers.with(console_subscriber::spawn())
|
||||
}
|
||||
#[cfg(not(tokio_unstable))]
|
||||
{
|
||||
subscribers
|
||||
}
|
||||
})?;
|
||||
|
||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||
log::info!("Starting with accounts directory `{}`.", path);
|
||||
@@ -87,7 +100,7 @@ async fn main_impl() -> Result<()> {
|
||||
|
||||
// Send task prints JSON responses to stdout.
|
||||
let cancel = main_cancel.clone();
|
||||
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
||||
let send_task: JoinHandle<anyhow::Result<()>> = spawn_named_task!("send_task", async move {
|
||||
let _cancel_guard = cancel.clone().drop_guard();
|
||||
loop {
|
||||
let message = tokio::select! {
|
||||
@@ -104,24 +117,25 @@ async fn main_impl() -> Result<()> {
|
||||
});
|
||||
|
||||
let cancel = main_cancel.clone();
|
||||
let sigterm_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
||||
#[cfg(target_family = "unix")]
|
||||
{
|
||||
let _cancel_guard = cancel.clone().drop_guard();
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => (),
|
||||
_ = sigterm.recv() => {
|
||||
log::info!("got SIGTERM");
|
||||
let sigterm_task: JoinHandle<anyhow::Result<()>> =
|
||||
spawn_named_task!("sigterm_task", async move {
|
||||
#[cfg(target_family = "unix")]
|
||||
{
|
||||
let _cancel_guard = cancel.clone().drop_guard();
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => (),
|
||||
_ = sigterm.recv() => {
|
||||
log::info!("got SIGTERM");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = cancel;
|
||||
Ok(())
|
||||
});
|
||||
let _ = cancel;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
// Receiver task reads JSON requests from stdin.
|
||||
let cancel = main_cancel.clone();
|
||||
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
||||
let recv_task: JoinHandle<anyhow::Result<()>> = spawn_named_task!("recv_task", async move {
|
||||
let _cancel_guard = cancel.clone().drop_guard();
|
||||
let stdin = io::stdin();
|
||||
let mut lines = BufReader::new(stdin).lines();
|
||||
@@ -143,7 +157,7 @@ async fn main_impl() -> Result<()> {
|
||||
};
|
||||
log::trace!("RPC recv {}", message);
|
||||
let session = session.clone();
|
||||
tokio::spawn(async move {
|
||||
spawn_named_task!("handle_incoming", async move {
|
||||
session.handle_incoming(&message).await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -29,9 +29,11 @@ skip = [
|
||||
{ name = "futures-lite", version = "1.13.0" },
|
||||
{ name = "getrandom", version = "<0.2" },
|
||||
{ name = "h2", version = "0.3.26" },
|
||||
{ name = "hashbrown", version = "0.12.3"},
|
||||
{ name = "http-body", version = "0.4.6" },
|
||||
{ name = "http", version = "0.2.12" },
|
||||
{ name = "hyper", version = "0.14.28" },
|
||||
{ name = "indexmap", version = "1.9.3" },
|
||||
{ name = "nix", version = "0.26.4" },
|
||||
{ name = "quick-error", version = "<2.0" },
|
||||
{ name = "rand_chacha", version = "<0.3" },
|
||||
|
||||
@@ -400,36 +400,39 @@ impl Config {
|
||||
|
||||
#[cfg(not(target_os = "ios"))]
|
||||
async fn create_lock_task(dir: PathBuf) -> Result<Option<JoinHandle<anyhow::Result<()>>>> {
|
||||
use crate::spawn_named_task;
|
||||
|
||||
let lockfile = dir.join(LOCKFILE_NAME);
|
||||
let mut lock = fd_lock::RwLock::new(fs::File::create(lockfile).await?);
|
||||
let (locked_tx, locked_rx) = oneshot::channel();
|
||||
let lock_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
||||
let mut timeout = Duration::from_millis(100);
|
||||
let _guard = loop {
|
||||
match lock.try_write() {
|
||||
Ok(guard) => break Ok(guard),
|
||||
Err(err) => {
|
||||
if timeout.as_millis() > 1600 {
|
||||
break Err(err);
|
||||
}
|
||||
// We need to wait for the previous lock_task to be aborted thus unlocking
|
||||
// the lockfile. We don't open configs for writing often outside of the
|
||||
// tests, so this adds delays to the tests, but otherwise ok.
|
||||
sleep(timeout).await;
|
||||
if err.kind() == std::io::ErrorKind::WouldBlock {
|
||||
timeout *= 2;
|
||||
let lock_task: JoinHandle<anyhow::Result<()>> =
|
||||
spawn_named_task!("lock_task", async move {
|
||||
let mut timeout = Duration::from_millis(100);
|
||||
let _guard = loop {
|
||||
match lock.try_write() {
|
||||
Ok(guard) => break Ok(guard),
|
||||
Err(err) => {
|
||||
if timeout.as_millis() > 1600 {
|
||||
break Err(err);
|
||||
}
|
||||
// We need to wait for the previous lock_task to be aborted thus unlocking
|
||||
// the lockfile. We don't open configs for writing often outside of the
|
||||
// tests, so this adds delays to the tests, but otherwise ok.
|
||||
sleep(timeout).await;
|
||||
if err.kind() == std::io::ErrorKind::WouldBlock {
|
||||
timeout *= 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}?;
|
||||
locked_tx
|
||||
.send(())
|
||||
.ok()
|
||||
.context("Cannot notify about lockfile locking")?;
|
||||
let (_tx, rx) = oneshot::channel();
|
||||
rx.await?;
|
||||
Ok(())
|
||||
});
|
||||
}?;
|
||||
locked_tx
|
||||
.send(())
|
||||
.ok()
|
||||
.context("Cannot notify about lockfile locking")?;
|
||||
let (_tx, rx) = oneshot::channel();
|
||||
rx.await?;
|
||||
Ok(())
|
||||
});
|
||||
locked_rx.await?;
|
||||
Ok(Some(lock_task))
|
||||
}
|
||||
|
||||
@@ -12,12 +12,10 @@ use deltachat_contact_tools::{sanitize_bidi_characters, sanitize_single_line, Co
|
||||
use deltachat_derive::{FromSql, ToSql};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum_macros::EnumIter;
|
||||
use tokio::task;
|
||||
|
||||
use crate::aheader::EncryptPreference;
|
||||
use crate::blob::BlobObject;
|
||||
use crate::chatlist::Chatlist;
|
||||
use crate::chatlist_events;
|
||||
use crate::color::str_to_color;
|
||||
use crate::config::Config;
|
||||
use crate::constants::{
|
||||
@@ -50,6 +48,7 @@ use crate::tools::{
|
||||
truncate_msg_text, IsNoneOrEmpty, SystemTime,
|
||||
};
|
||||
use crate::webxdc::StatusUpdateSerial;
|
||||
use crate::{chatlist_events, spawn_named_task};
|
||||
|
||||
/// An chat item, such as a message or a marker.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
@@ -1476,7 +1475,7 @@ impl ChatId {
|
||||
/// and otherwise notifying the user accordingly.
|
||||
pub(crate) fn spawn_securejoin_wait(self, context: &Context, timeout: u64) {
|
||||
let context = context.clone();
|
||||
task::spawn(async move {
|
||||
spawn_named_task!("securejoin_wait", async move {
|
||||
tokio::time::sleep(Duration::from_secs(timeout)).await;
|
||||
let chat = Chat::load_from_db(&context, self).await?;
|
||||
chat.check_securejoin_wait(&context, 0).await?;
|
||||
|
||||
@@ -21,7 +21,6 @@ use futures::FutureExt;
|
||||
use futures_lite::FutureExt as _;
|
||||
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
|
||||
use server_params::{expand_param_vector, ServerParams};
|
||||
use tokio::task;
|
||||
|
||||
use crate::config::{self, Config};
|
||||
use crate::context::Context;
|
||||
@@ -35,10 +34,10 @@ use crate::message::{Message, Viewtype};
|
||||
use crate::oauth2::get_oauth2_addr;
|
||||
use crate::provider::{Protocol, Socket, UsernamePattern};
|
||||
use crate::smtp::Smtp;
|
||||
use crate::stock_str;
|
||||
use crate::sync::Sync::*;
|
||||
use crate::tools::time;
|
||||
use crate::{chat, e2ee, provider};
|
||||
use crate::{spawn_named_task, stock_str};
|
||||
use deltachat_contact_tools::addr_cmp;
|
||||
|
||||
macro_rules! progress {
|
||||
@@ -372,7 +371,9 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
|
||||
progress!(ctx, 1);
|
||||
|
||||
let ctx2 = ctx.clone();
|
||||
let update_device_chats_handle = task::spawn(async move { ctx2.update_device_chats().await });
|
||||
let update_device_chats_handle = spawn_named_task!("update_device_chats", async move {
|
||||
ctx2.update_device_chats().await
|
||||
});
|
||||
|
||||
let configured_param = get_configured_param(ctx, param).await?;
|
||||
let strict_tls = configured_param.strict_tls();
|
||||
@@ -387,7 +388,7 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
|
||||
let smtp_addr = configured_param.addr.clone();
|
||||
let proxy_config = configured_param.proxy_config.clone();
|
||||
|
||||
let smtp_config_task = task::spawn(async move {
|
||||
let smtp_config_task = spawn_named_task!("smtp_config", async move {
|
||||
let mut smtp = Smtp::new();
|
||||
smtp.connect(
|
||||
&context_smtp,
|
||||
|
||||
@@ -37,7 +37,7 @@ use crate::peerstate::Peerstate;
|
||||
use crate::sql::{self, params_iter};
|
||||
use crate::sync::{self, Sync::*};
|
||||
use crate::tools::{duration_to_str, get_abs_path, smeared_time, time, SystemTime};
|
||||
use crate::{chat, chatlist_events, stock_str};
|
||||
use crate::{chat, chatlist_events, spawn_named_task, stock_str};
|
||||
|
||||
/// Time during which a contact is considered as seen recently.
|
||||
const SEEN_RECENTLY_SECONDS: i64 = 600;
|
||||
@@ -1791,7 +1791,7 @@ impl RecentlySeenLoop {
|
||||
pub(crate) fn new(context: Context) -> Self {
|
||||
let (interrupt_send, interrupt_recv) = channel::bounded(1);
|
||||
|
||||
let handle = task::spawn(Self::run(context, interrupt_recv));
|
||||
let handle = spawn_named_task!("recently_seen_loop", Self::run(context, interrupt_recv));
|
||||
Self {
|
||||
handle,
|
||||
interrupt_send,
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::context::Context;
|
||||
use crate::events::EventType;
|
||||
use crate::message::{Message, MsgId, Viewtype};
|
||||
use crate::param::Param;
|
||||
use crate::spawn_named_task;
|
||||
use crate::tools::time;
|
||||
use crate::webxdc::StatusUpdateItem;
|
||||
use async_channel::{self as channel, Receiver, Sender};
|
||||
@@ -150,7 +151,7 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option<MsgId>) -> a
|
||||
let (sender, debug_logging_recv) = channel::bounded(1000);
|
||||
let loop_handle = {
|
||||
let ctx = ctx.clone();
|
||||
task::spawn(async move {
|
||||
spawn_named_task!("xdc_debug_logging_loop", async move {
|
||||
debug_logging_loop(&ctx, debug_logging_recv).await
|
||||
})
|
||||
};
|
||||
|
||||
@@ -24,7 +24,6 @@ use rand::Rng;
|
||||
use ratelimit::Ratelimit;
|
||||
use url::Url;
|
||||
|
||||
use crate::chat::{self, ChatId, ChatIdBlocked};
|
||||
use crate::chatlist_events;
|
||||
use crate::config::Config;
|
||||
use crate::constants::{self, Blocked, Chattype, ShowEmails};
|
||||
@@ -48,6 +47,10 @@ use crate::scheduler::connectivity::ConnectivityStore;
|
||||
use crate::sql;
|
||||
use crate::stock_str;
|
||||
use crate::tools::{self, create_id, duration_to_str};
|
||||
use crate::{
|
||||
chat::{self, ChatId, ChatIdBlocked},
|
||||
spawn_named_task,
|
||||
};
|
||||
|
||||
pub(crate) mod capabilities;
|
||||
mod client;
|
||||
@@ -1556,7 +1559,9 @@ impl Session {
|
||||
} else if !context.push_subscriber.heartbeat_subscribed().await {
|
||||
let context = context.clone();
|
||||
// Subscribe for heartbeat notifications.
|
||||
tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
|
||||
spawn_named_task!("subscribe_to_heartbeat_notifications", async move {
|
||||
context.push_subscriber.subscribe(&context).await
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -46,7 +46,7 @@ use crate::message::{Message, Viewtype};
|
||||
use crate::qr::Qr;
|
||||
use crate::stock_str::backup_transfer_msg_body;
|
||||
use crate::tools::{create_id, time, TempPathGuard};
|
||||
use crate::EventType;
|
||||
use crate::{spawn_named_task, EventType};
|
||||
|
||||
use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
|
||||
|
||||
@@ -132,7 +132,7 @@ impl BackupProvider {
|
||||
let drop_token = drop_token.clone();
|
||||
let endpoint = endpoint.clone();
|
||||
let auth_token = auth_token.clone();
|
||||
tokio::spawn(async move {
|
||||
spawn_named_task!("accept_loop", async move {
|
||||
Self::accept_loop(
|
||||
context.clone(),
|
||||
endpoint,
|
||||
|
||||
@@ -12,13 +12,13 @@ use pgp::composed::Deserializable;
|
||||
pub use pgp::composed::{SignedPublicKey, SignedSecretKey};
|
||||
use pgp::ser::Serialize;
|
||||
use pgp::types::{KeyTrait, SecretKeyTrait};
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::constants::KeyGenType;
|
||||
use crate::context::Context;
|
||||
use crate::log::LogExt;
|
||||
use crate::pgp::KeyPair;
|
||||
use crate::spawn_named_blocking_task;
|
||||
use crate::tools::{self, time_elapsed};
|
||||
|
||||
/// Convenience trait for working with keys.
|
||||
@@ -251,8 +251,10 @@ async fn generate_keypair(context: &Context) -> Result<KeyPair> {
|
||||
let keytype = KeyGenType::from_i32(context.get_config_int(Config::KeyGenType).await?)
|
||||
.unwrap_or_default();
|
||||
info!(context, "Generating keypair with type {}", keytype);
|
||||
let keypair = Handle::current()
|
||||
.spawn_blocking(move || crate::pgp::create_keypair(addr, keytype))
|
||||
let keypair =
|
||||
spawn_named_blocking_task!("generate_keypair", move || crate::pgp::create_keypair(
|
||||
addr, keytype
|
||||
))
|
||||
.await??;
|
||||
|
||||
store_self_keypair(context, &keypair, KeyPairUse::Default).await?;
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::context::Context;
|
||||
use crate::net::proxy::ProxyConfig;
|
||||
use crate::net::session::SessionStream;
|
||||
use crate::net::tls::wrap_rustls;
|
||||
use crate::spawn_named_task;
|
||||
|
||||
/// HTTP(S) GET response.
|
||||
#[derive(Debug)]
|
||||
@@ -85,7 +86,7 @@ where
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
||||
tokio::task::spawn(conn);
|
||||
spawn_named_task!("http_connection", conn);
|
||||
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ use crate::context::Context;
|
||||
use crate::headerdef::HeaderDef;
|
||||
use crate::message::{Message, MsgId, Viewtype};
|
||||
use crate::mimeparser::SystemMessage;
|
||||
use crate::EventType;
|
||||
use crate::{spawn_named_task, EventType};
|
||||
|
||||
/// The length of an ed25519 `PublicKey`, in bytes.
|
||||
const PUBLIC_KEY_LENGTH: usize = 32;
|
||||
@@ -125,7 +125,7 @@ impl Iroh {
|
||||
.split();
|
||||
|
||||
let ctx = ctx.clone();
|
||||
let subscribe_loop = tokio::spawn(async move {
|
||||
let subscribe_loop = spawn_named_task!("subscribe_loop", async move {
|
||||
if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
|
||||
warn!(ctx, "subscribe_loop failed: {e}")
|
||||
}
|
||||
@@ -264,7 +264,10 @@ impl Context {
|
||||
let context = self.clone();
|
||||
|
||||
// Shuts down on deltachat shutdown
|
||||
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
|
||||
spawn_named_task!(
|
||||
"endpoint_loop",
|
||||
endpoint_loop(context, endpoint.clone(), gossip.clone())
|
||||
);
|
||||
|
||||
Ok(Iroh {
|
||||
endpoint,
|
||||
@@ -442,7 +445,7 @@ async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) {
|
||||
info!(context, "IROH_REALTIME: accepting iroh connection");
|
||||
let gossip = gossip.clone();
|
||||
let context = context.clone();
|
||||
tokio::spawn(async move {
|
||||
spawn_named_task!("handle_connection", async move {
|
||||
if let Err(err) = handle_connection(&context, conn, gossip).await {
|
||||
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
|
||||
}
|
||||
|
||||
47
src/pgp.rs
47
src/pgp.rs
@@ -16,10 +16,10 @@ use pgp::crypto::hash::HashAlgorithm;
|
||||
use pgp::crypto::sym::SymmetricKeyAlgorithm;
|
||||
use pgp::types::{CompressionAlgorithm, KeyTrait, Mpi, PublicKeyTrait, StringToKey};
|
||||
use rand::{thread_rng, CryptoRng, Rng};
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::constants::KeyGenType;
|
||||
use crate::key::{DcKey, Fingerprint};
|
||||
use crate::spawn_named_blocking_task;
|
||||
|
||||
#[allow(missing_docs)]
|
||||
#[cfg(test)]
|
||||
@@ -250,33 +250,32 @@ pub async fn pk_encrypt(
|
||||
) -> Result<String> {
|
||||
let lit_msg = Message::new_literal_bytes("", plain);
|
||||
|
||||
Handle::current()
|
||||
.spawn_blocking(move || {
|
||||
let pkeys: Vec<SignedPublicKeyOrSubkey> = public_keys_for_encryption
|
||||
.iter()
|
||||
.filter_map(select_pk_for_encryption)
|
||||
.collect();
|
||||
let pkeys_refs: Vec<&SignedPublicKeyOrSubkey> = pkeys.iter().collect();
|
||||
spawn_named_blocking_task!("pk_encrypt", move || {
|
||||
let pkeys: Vec<SignedPublicKeyOrSubkey> = public_keys_for_encryption
|
||||
.iter()
|
||||
.filter_map(select_pk_for_encryption)
|
||||
.collect();
|
||||
let pkeys_refs: Vec<&SignedPublicKeyOrSubkey> = pkeys.iter().collect();
|
||||
|
||||
let mut rng = thread_rng();
|
||||
let mut rng = thread_rng();
|
||||
|
||||
let encrypted_msg = if let Some(ref skey) = private_key_for_signing {
|
||||
let signed_msg = lit_msg.sign(skey, || "".into(), HASH_ALGORITHM)?;
|
||||
let compressed_msg = if compress {
|
||||
signed_msg.compress(CompressionAlgorithm::ZLIB)?
|
||||
} else {
|
||||
signed_msg
|
||||
};
|
||||
compressed_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
|
||||
let encrypted_msg = if let Some(ref skey) = private_key_for_signing {
|
||||
let signed_msg = lit_msg.sign(skey, || "".into(), HASH_ALGORITHM)?;
|
||||
let compressed_msg = if compress {
|
||||
signed_msg.compress(CompressionAlgorithm::ZLIB)?
|
||||
} else {
|
||||
lit_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
|
||||
signed_msg
|
||||
};
|
||||
compressed_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
|
||||
} else {
|
||||
lit_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
|
||||
};
|
||||
|
||||
let encoded_msg = encrypted_msg.to_armored_string(Default::default())?;
|
||||
let encoded_msg = encrypted_msg.to_armored_string(Default::default())?;
|
||||
|
||||
Ok(encoded_msg)
|
||||
})
|
||||
.await?
|
||||
Ok(encoded_msg)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
/// Signs `plain` text using `private_key_for_signing`.
|
||||
@@ -367,7 +366,7 @@ pub async fn symm_encrypt(passphrase: &str, plain: &[u8]) -> Result<String> {
|
||||
let lit_msg = Message::new_literal_bytes("", plain);
|
||||
let passphrase = passphrase.to_string();
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
spawn_named_blocking_task!("symm_encrypt", move || {
|
||||
let mut rng = thread_rng();
|
||||
let s2k = StringToKey::new_default(&mut rng);
|
||||
let msg =
|
||||
@@ -388,7 +387,7 @@ pub async fn symm_decrypt<T: std::io::Read + std::io::Seek>(
|
||||
let (enc_msg, _) = Message::from_armor_single(ctext)?;
|
||||
|
||||
let passphrase = passphrase.to_string();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
spawn_named_blocking_task!("symm_decrypt", move || {
|
||||
let msg = enc_msg.decrypt_with_password(|| passphrase)?;
|
||||
|
||||
match msg.get_content()? {
|
||||
|
||||
@@ -19,12 +19,12 @@ use crate::download::{download_msg, DownloadState};
|
||||
use crate::ephemeral::{self, delete_expired_imap_messages};
|
||||
use crate::events::EventType;
|
||||
use crate::imap::{session::Session, FolderMeaning, Imap};
|
||||
use crate::location;
|
||||
use crate::log::LogExt;
|
||||
use crate::message::MsgId;
|
||||
use crate::smtp::{send_smtp_messages, Smtp};
|
||||
use crate::sql;
|
||||
use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
|
||||
use crate::{location, spawn_named_task};
|
||||
|
||||
pub(crate) mod connectivity;
|
||||
|
||||
@@ -164,7 +164,7 @@ impl SchedulerState {
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
tokio::spawn(async move {
|
||||
spawn_named_task!("pause", async move {
|
||||
rx.await.ok();
|
||||
let mut inner = context.scheduler.inner.write().await;
|
||||
match *inner {
|
||||
@@ -849,7 +849,10 @@ impl Scheduler {
|
||||
let (inbox_start_send, inbox_start_recv) = oneshot::channel();
|
||||
let handle = {
|
||||
let ctx = ctx.clone();
|
||||
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
|
||||
spawn_named_task!(
|
||||
"inbox_loop",
|
||||
inbox_loop(ctx, inbox_start_send, inbox_handlers)
|
||||
)
|
||||
};
|
||||
let inbox = SchedBox {
|
||||
meaning: FolderMeaning::Inbox,
|
||||
@@ -866,7 +869,10 @@ impl Scheduler {
|
||||
let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
|
||||
let (start_send, start_recv) = oneshot::channel();
|
||||
let ctx = ctx.clone();
|
||||
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
|
||||
let handle = spawn_named_task!(
|
||||
"simple_imap_loop",
|
||||
simple_imap_loop(ctx, start_send, handlers, meaning)
|
||||
);
|
||||
oboxes.push(SchedBox {
|
||||
meaning,
|
||||
conn_state,
|
||||
@@ -878,20 +884,20 @@ impl Scheduler {
|
||||
|
||||
let smtp_handle = {
|
||||
let ctx = ctx.clone();
|
||||
task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
|
||||
spawn_named_task!("smtp_loop", smtp_loop(ctx, smtp_start_send, smtp_handlers))
|
||||
};
|
||||
start_recvs.push(smtp_start_recv);
|
||||
|
||||
let ephemeral_handle = {
|
||||
let ctx = ctx.clone();
|
||||
task::spawn(async move {
|
||||
spawn_named_task!("ephemeral_loop", async move {
|
||||
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
|
||||
})
|
||||
};
|
||||
|
||||
let location_handle = {
|
||||
let ctx = ctx.clone();
|
||||
task::spawn(async move {
|
||||
spawn_named_task!("location_loop", async move {
|
||||
location::location_loop(&ctx, location_interrupt_recv).await;
|
||||
})
|
||||
};
|
||||
|
||||
@@ -6,7 +6,6 @@ pub mod send;
|
||||
use anyhow::{bail, format_err, Context as _, Error, Result};
|
||||
use async_smtp::response::{Category, Code, Detail};
|
||||
use async_smtp::{EmailAddress, SmtpTransport};
|
||||
use tokio::task;
|
||||
|
||||
use crate::chat::{add_info_msg_with_cmd, ChatId};
|
||||
use crate::config::Config;
|
||||
@@ -21,9 +20,9 @@ use crate::mimefactory::MimeFactory;
|
||||
use crate::net::proxy::ProxyConfig;
|
||||
use crate::net::session::SessionBufStream;
|
||||
use crate::scheduler::connectivity::ConnectivityStore;
|
||||
use crate::sql;
|
||||
use crate::stock_str::unencrypted_email;
|
||||
use crate::tools::{self, time_elapsed};
|
||||
use crate::{spawn_named_task, sql};
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct Smtp {
|
||||
@@ -56,7 +55,7 @@ impl Smtp {
|
||||
// Closing connection with a QUIT command may take some time, especially if it's a
|
||||
// stale connection and an attempt to send the command times out. Send a command in a
|
||||
// separate task to avoid waiting for reply or timeout.
|
||||
task::spawn(async move { transport.quit().await });
|
||||
spawn_named_task!("disconnect SMTP", async move { transport.quit().await });
|
||||
}
|
||||
self.last_success = None;
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ use crate::peerstate::Peerstate;
|
||||
use crate::pgp::KeyPair;
|
||||
use crate::receive_imf::receive_imf;
|
||||
use crate::securejoin::{get_securejoin_qr, join_securejoin};
|
||||
use crate::spawn_named_task;
|
||||
use crate::stock_str::StockStrings;
|
||||
|
||||
#[allow(non_upper_case_globals)]
|
||||
@@ -969,7 +970,7 @@ impl InnerLogSink {
|
||||
/// Subscribes this log sink to event emitter.
|
||||
pub fn subscribe(&self, event_emitter: EventEmitter) {
|
||||
let sender = self.sender.clone();
|
||||
task::spawn(async move {
|
||||
spawn_named_task!("InnerLogSink", async move {
|
||||
while let Some(event) = event_emitter.recv().await {
|
||||
sender.try_send(LogEvent::Event(event.clone())).ok();
|
||||
}
|
||||
|
||||
126
src/tools.rs
126
src/tools.rs
@@ -708,6 +708,132 @@ pub(crate) fn inc_and_check<T: PrimInt + AddAssign + std::fmt::Debug>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawns a named asynchronous task if the `tokio_unstable` feature is enabled.
|
||||
///
|
||||
/// Spawns a new asynchronous task, returning a [tokio::task::JoinHandle] for it.
|
||||
/// The provided future will start running in the background immediately when the function is called, even if you don't await the returned JoinHandle.
|
||||
/// See [tokio::task::spawn].
|
||||
///
|
||||
/// If the rustflag `tokio_unstable` is active, the task will be given the specified `name`
|
||||
/// for easier identification in monitoring tools (like tokio-console).
|
||||
/// If `tokio_unstable` is not set, the task is spawned normally without a name.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `name`: The name of the task.
|
||||
/// - `future`: The future to be executed, which must implement `Future`, be `Send`, and `'static`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A [tokio::task::JoinHandle] that can be awaited to retrieve the output of the future.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the task fails to spawn when `tokio_unstable` is enabled.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use deltachat::spawn_named_task; // or inside of core: crate::spawn_named_task
|
||||
///
|
||||
/// let handle = spawn_named_task!("my_task", async {
|
||||
/// // Your async code here
|
||||
/// });
|
||||
///
|
||||
/// let result = handle.await.unwrap();
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! spawn_named_task {
|
||||
($name:expr, $future:expr) => {{
|
||||
#[inline(always)]
|
||||
#[allow(unused_variables, unexpected_cfgs)]
|
||||
pub fn __spawn_named_task<Fut>(
|
||||
name: &str,
|
||||
future: Fut,
|
||||
) -> ::tokio::task::JoinHandle<Fut::Output>
|
||||
where
|
||||
Fut: ::std::future::Future + Send + 'static,
|
||||
Fut::Output: Send + 'static,
|
||||
{
|
||||
#[cfg(tokio_unstable)]
|
||||
{
|
||||
::tokio::task::Builder::new()
|
||||
.name(name)
|
||||
.spawn(future)
|
||||
.expect("Failed to spawn task")
|
||||
}
|
||||
#[cfg(not(tokio_unstable))]
|
||||
{
|
||||
::tokio::task::spawn(future)
|
||||
}
|
||||
}
|
||||
__spawn_named_task($name, $future)
|
||||
}};
|
||||
}
|
||||
|
||||
/// Spawns a named blocking task if the `tokio_unstable` feature is enabled.
|
||||
///
|
||||
/// Spawns a new blocking task, returning a [tokio::task::JoinHandle] for it.
|
||||
/// The provided future will start running in the background immediately when the function is called, even if you don't await the returned JoinHandle.
|
||||
/// See [tokio::task::spawn_blocking].
|
||||
///
|
||||
/// If the rustflag `tokio_unstable` is active, the task will be given the specified `name`
|
||||
/// for easier identification in monitoring tools (like tokio-console).
|
||||
/// If `tokio_unstable` is not set, the task is spawned normally without a name.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `name`: The name of the task
|
||||
/// - `future`: The future to be executed, which must implement `Future`, be `Send`, and `'static`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A [tokio::task::JoinHandle] that can be awaited to retrieve the output of the future.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the task fails to spawn when `tokio_unstable` is enabled.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use deltachat::spawn_named_blocking_task; // or inside of core: crate::spawn_named_blocking_task
|
||||
///
|
||||
/// let handle = spawn_named_blocking_task!("my_task", async {
|
||||
/// // Your async code here
|
||||
/// });
|
||||
///
|
||||
/// let result = handle.await.unwrap();
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! spawn_named_blocking_task {
|
||||
($name:expr, $future:expr) => {{
|
||||
#[inline(always)]
|
||||
#[allow(unused_variables, unexpected_cfgs)]
|
||||
pub fn __spawn_named_blocking_task<Fut, ReturnType>(
|
||||
name: &str,
|
||||
future: Fut,
|
||||
) -> ::tokio::task::JoinHandle<ReturnType>
|
||||
where
|
||||
Fut: FnOnce() -> ReturnType + Send + 'static,
|
||||
ReturnType: Send + 'static,
|
||||
{
|
||||
#[cfg(tokio_unstable)]
|
||||
{
|
||||
::tokio::task::Builder::new()
|
||||
.name(name)
|
||||
.spawn_blocking(future)
|
||||
.expect("Failed to spawn task")
|
||||
}
|
||||
#[cfg(not(tokio_unstable))]
|
||||
{
|
||||
::tokio::task::spawn_blocking(future)
|
||||
}
|
||||
}
|
||||
__spawn_named_blocking_task($name, $future)
|
||||
}};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#![allow(clippy::indexing_slicing)]
|
||||
|
||||
Reference in New Issue
Block a user