Compare commits

...

25 Commits

Author SHA1 Message Date
Simon Laux
f029e442ff Update deltachat-repl/README.md 2024-10-23 14:07:56 +02:00
Simon Laux
fe6543cd7a Apply suggestions from code review
Co-authored-by: iequidoo <117991069+iequidoo@users.noreply.github.com>
2024-10-23 14:07:56 +02:00
Simon Laux
6845e81608 fix lint, revert incorrect change 2024-10-23 14:07:56 +02:00
Simon Laux
e4ccb5287b fix syntax error typo 2024-10-23 14:07:56 +02:00
Simon Laux
e9d32c1bc5 update cargo deny 2024-10-23 14:07:56 +02:00
Simon Laux
fbf2538fa2 Apply suggestions from code review
Co-authored-by: link2xt <link2xt@testrun.org>
Co-authored-by: iequidoo <117991069+iequidoo@users.noreply.github.com>
2024-10-23 14:07:56 +02:00
Simon Laux
3d4e96897e Update deltachat-repl/README.md 2024-10-23 14:07:56 +02:00
Simon Laux
3ff271ba93 name remaining blocking tasks 2024-10-23 14:07:56 +02:00
Simon Laux
95aaaee43c move #[allow(unexpected_cfgs)] 2024-10-23 14:07:56 +02:00
Simon Laux
087050c27d Apply suggestions from code review: Typo & wording fixes
Co-authored-by: iequidoo <117991069+iequidoo@users.noreply.github.com>
2024-10-23 14:07:56 +02:00
Simon Laux
1404cbd3ef cargo fmt 2024-10-23 14:07:56 +02:00
Simon Laux
b965db6657 ignore unexpected_cfgs warning 2024-10-23 14:07:55 +02:00
Simon Laux
6749b90ff5 suppress warning for stdio rpc server 2024-10-23 14:07:55 +02:00
Simon Laux
7bdac58040 cargo fmt 2024-10-23 14:07:55 +02:00
Simon Laux
06c4eb9df0 try again 2024-10-23 14:07:55 +02:00
Simon Laux
7eb34a56f6 attempt at suppressing unknown cfg warnings in deltachat_ffi 2024-10-23 14:07:55 +02:00
Simon Laux
6b0b551973 cargo fmt 2024-10-23 14:07:55 +02:00
Simon Laux
09aa9712c4 add section about using tokio console with desktop 2024-10-23 14:07:55 +02:00
Simon Laux
de7d6753a9 add console-subscriber to rpc-stdio server 2024-10-23 14:07:55 +02:00
Simon Laux
92c10e56d3 import EnvFilter in repl 2024-10-23 14:07:55 +02:00
Simon Laux
c71a6c4af9 commit Cargo.lock 2024-10-23 14:07:55 +02:00
Simon Laux
308053dc44 name spawned tasks in core 2024-10-23 14:05:54 +02:00
Simon Laux
f37fb9574d label blocking tasks (some spawn the task over Handle::current().spawn_blocking(), I marked those with a todo comment until I understand why it doesn't use the normal method) 2024-10-23 14:05:54 +02:00
Simon Laux
baa8da86ba remove unsed warning, update docs 2024-10-23 14:05:54 +02:00
Simon Laux
2cd4af576a add tokio console support to repl tool and create a readme for the repl tool 2024-10-23 14:05:54 +02:00
24 changed files with 582 additions and 122 deletions

225
Cargo.lock generated
View File

@@ -358,6 +358,28 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
]
[[package]]
name = "async-trait"
version = "0.1.77"
@@ -384,6 +406,12 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "attohttpc"
version = "0.24.1"
@@ -953,6 +981,45 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console-api"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86ed14aa9c9f927213c6e4f3ef75faaad3406134efe84ba2cb7983431d5f0931"
dependencies = [
"futures-core",
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e3a111a37f3333946ebf9da370ba5c5577b18eb342ec683eb488dd21980302"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-task",
"hdrhistogram",
"humantime",
"hyper-util",
"prost",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@@ -1084,6 +1151,15 @@ dependencies = [
"itertools",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
@@ -1270,7 +1346,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown",
"hashbrown 0.14.3",
"lock_api",
"once_cell",
"parking_lot_core",
@@ -1421,6 +1497,7 @@ name = "deltachat-repl"
version = "1.148.1"
dependencies = [
"anyhow",
"console-subscriber",
"deltachat",
"dirs",
"log",
@@ -1429,6 +1506,7 @@ dependencies = [
"rusqlite",
"rustyline",
"tokio",
"tracing",
"tracing-subscriber",
]
@@ -1437,6 +1515,7 @@ name = "deltachat-rpc-server"
version = "1.148.1"
dependencies = [
"anyhow",
"console-subscriber",
"deltachat",
"deltachat-jsonrpc",
"futures-lite 2.3.0",
@@ -1445,6 +1524,7 @@ dependencies = [
"serde_json",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"yerpc",
]
@@ -2454,7 +2534,26 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap",
"indexmap 2.2.5",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "h2"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"http 1.1.0",
"indexmap 2.2.5",
"slab",
"tokio",
"tokio-util",
@@ -2471,6 +2570,12 @@ dependencies = [
"crunchy",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.3"
@@ -2486,7 +2591,20 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown",
"hashbrown 0.14.3",
]
[[package]]
name = "hdrhistogram"
version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [
"base64 0.21.7",
"byteorder",
"flate2",
"nom",
"num-traits",
]
[[package]]
@@ -2721,7 +2839,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
@@ -2744,6 +2862,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.6",
"http 1.1.0",
"http-body 1.0.0",
"httparse",
@@ -2773,6 +2892,19 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "hyper-timeout"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.4.1",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.9"
@@ -2895,6 +3027,16 @@ dependencies = [
"nom",
]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]]
name = "indexmap"
version = "2.2.5"
@@ -2902,7 +3044,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.14.3",
]
[[package]]
@@ -2996,7 +3138,7 @@ dependencies = [
"futures-concurrency",
"futures-lite 2.3.0",
"futures-util",
"indexmap",
"indexmap 2.2.5",
"iroh-base",
"iroh-blake3",
"iroh-metrics",
@@ -4480,6 +4622,38 @@ dependencies = [
"unarray",
]
[[package]]
name = "prost"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.79",
]
[[package]]
name = "prost-types"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
dependencies = [
"prost",
]
[[package]]
name = "qr2term"
version = "0.3.3"
@@ -6001,6 +6175,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
@@ -6120,7 +6295,7 @@ dependencies = [
"futures-io",
"futures-sink",
"futures-util",
"hashbrown",
"hashbrown 0.14.3",
"pin-project-lite",
"tokio",
]
@@ -6152,13 +6327,43 @@ version = "0.22.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d"
dependencies = [
"indexmap",
"indexmap 2.2.5",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
]
[[package]]
name = "tonic"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.22.1",
"bytes",
"h2 0.4.6",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"socket2",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -6167,9 +6372,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap 1.9.3",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",

View File

@@ -18,7 +18,6 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER;
use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin};
use deltachat::context::get_info;
use deltachat::ephemeral::Timer;
use deltachat::location;
use deltachat::message::get_msg_read_receipts;
use deltachat::message::{
self, delete_msgs, markseen_msgs, Message, MessageState, MsgId, Viewtype,
@@ -35,6 +34,7 @@ use deltachat::stock_str::StockMessage;
use deltachat::webxdc::StatusUpdateSerial;
use deltachat::EventEmitter;
use deltachat::{imex, info};
use deltachat::{location, spawn_named_task};
use sanitize_filename::is_sanitized;
use tokio::fs;
use tokio::sync::{watch, Mutex, RwLock};
@@ -1777,7 +1777,7 @@ impl CommandApi {
let ctx = self.get_context(account_id).await?;
let fut = send_webxdc_realtime_advertisement(&ctx, MsgId::new(instance_msg_id)).await?;
if let Some(fut) = fut {
tokio::spawn(async move {
spawn_named_task!("send_webxdc_realtime_advertisement", async move {
fut.await.ok();
info!(ctx, "send_webxdc_realtime_advertisement done")
});

View File

@@ -7,6 +7,7 @@ repository = "https://github.com/deltachat/deltachat-core-rust"
[dependencies]
anyhow = { workspace = true }
console-subscriber = "0.4.0"
deltachat = { workspace = true, features = ["internals"]}
dirs = "5"
log = { workspace = true }
@@ -15,6 +16,7 @@ qr2term = "0.3.3"
rusqlite = { workspace = true }
rustyline = "14"
tokio = { workspace = true, features = ["fs", "rt-multi-thread", "macros"] }
tracing = "0.1.40"
tracing-subscriber = { workspace = true, features = ["env-filter"] }
[features]

57
deltachat-repl/README.md Normal file
View File

@@ -0,0 +1,57 @@
# Delta Chat REPL
This is a [REPL](https://en.wikipedia.org/wiki/Read%E2%80%93eval%E2%80%93print_loop) frontend built on top of Delta Chat core.
Its purpose is to help with testing during development, it is not meant for end users.
Dependencies:
- If you want to use `getqr` you need `qrencode` (To install, use your system's package manager)
## Usage
```
cargo run <path to deltachat db>
```
Type in `help` to learn about what commands are available.
## Usage with `tokio-console`
Tokio is an async runtime that Delta Chat core uses.
Core uses Tokio tasks, which are something similar to threads.
`tokio-console` is like a task manager for these Tokio tasks.
Examples of tasks:
- The event loop in the REPL tool which processes events received from core
- The REPL loop itself which waits for and executes user commands
- The IMAP task that manages IMAP connection in core
```
RUSTFLAGS="--cfg tokio_unstable" cargo run <path to deltachat db>
```
Then in a new console window start [`tokio-console`](https://github.com/tokio-rs/console).
You can install it via `cargo install --locked tokio-console`.
### Example
An example session in the REPL tool:
```
RUSTFLAGS="--cfg tokio_unstable" cargo run test-db/db
setqr dcaccount:https://nine.testrun.org/new
configure
connect
listchats
getqr
```
Use the qrcode/openpgp4fpr link to setup the contact on Delta Chat.
Then write a message to that new contact, after that you can accept the chat in the REPL tool and send a reply:
```
listchats
accept 12
chat 12
send hi!
chat
```

View File

@@ -30,7 +30,7 @@ use rustyline::{
};
use tokio::fs;
use tokio::runtime::Handle;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer};
mod cmdline;
use self::cmdline::*;
@@ -317,7 +317,7 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
.await?;
let events = context.get_event_emitter();
tokio::task::spawn(async move {
spawn_named_task!("repl:receive_event", async move {
while let Some(event) = events.recv().await {
receive_event(event.typ);
}
@@ -333,7 +333,7 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
let mut selected_chat = ChatId::default();
let ctx = context.clone();
let input_loop = tokio::task::spawn_blocking(move || {
let input_loop = spawn_named_blocking_task!("repl:input_loop", move || {
let h = DcHelper {
completer: FilenameCompleter::new(),
highlighter: MatchingBracketHighlighter::new(),
@@ -353,7 +353,6 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
match readline {
Ok(line) => {
// TODO: ignore "set mail_pw"
rl.add_history_entry(line.as_str())?;
let should_continue = Handle::current().block_on(async {
match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await {
@@ -481,11 +480,22 @@ async fn handle_cmd(
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::from_default_env().add_directive("deltachat_repl=info".parse()?),
)
.init();
#[allow(unexpected_cfgs)]
tracing::subscriber::set_global_default({
let subscribers = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer().with_filter(
EnvFilter::from_default_env().add_directive("deltachat_repl=info".parse()?),
),
);
#[cfg(tokio_unstable)]
{
subscribers.with(console_subscriber::spawn())
}
#[cfg(not(tokio_unstable))]
{
subscribers
}
})?;
let args = std::env::args().collect();
start(args).await?;

View File

@@ -22,6 +22,8 @@ tokio = { workspace = true, features = ["io-std"] }
tokio-util = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
yerpc = { workspace = true, features = ["anyhow_expose", "openrpc"] }
console-subscriber = "0.4.0"
tracing = "0.1.40"
[features]
default = ["vendored"]

View File

@@ -35,3 +35,21 @@ languages other than Rust, for example:
Run `deltachat-rpc-server --version` to check the version of the server.
Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API.
## Usage with `tokio-console`
When built with `RUSTFLAGS="--cfg tokio_unstable"`, console-subscriber is enabled.
That means that you can use [`tokio-console`](https://github.com/tokio-rs/console) to inspect active Tokio tasks.
You can install it via `cargo install tokio-console`.
```sh
RUSTFLAGS="--cfg tokio_unstable" cargo run
```
### Usage in deltachat-desktop:
Follow steps from `deltachat-desktop/docs/UPDATE_CORE.md`, but run the `make_local_dev_version` script with the `tokio_unstable` rustflag:
```sh
RUSTFLAGS="--cfg tokio_unstable" python3 deltachat-rpc-server/npm-package/scripts/make_local_dev_version.py
```

View File

@@ -7,11 +7,11 @@ use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, Context as _, Result};
use deltachat::constants::DC_VERSION_STR;
use deltachat::{constants::DC_VERSION_STR, spawn_named_task};
use deltachat_jsonrpc::api::{Accounts, CommandApi};
use futures_lite::stream::StreamExt;
use tokio::io::{self, AsyncBufReadExt, BufReader};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::{prelude::*, EnvFilter};
use yerpc::RpcServer as _;
#[cfg(target_family = "unix")]
@@ -67,10 +67,23 @@ async fn main_impl() -> Result<()> {
// Logs from `log` crate and traces from `tracing` crate
// are configurable with `RUST_LOG` environment variable
// and go to stderr to avoid interferring with JSON-RPC using stdout.
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_writer(std::io::stderr)
.init();
#[allow(unexpected_cfgs)]
tracing::subscriber::set_global_default({
let subscribers = tracing_subscriber::Registry::default().with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(EnvFilter::from_default_env()),
);
#[cfg(tokio_unstable)]
{
subscribers.with(console_subscriber::spawn())
}
#[cfg(not(tokio_unstable))]
{
subscribers
}
})?;
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
log::info!("Starting with accounts directory `{}`.", path);
@@ -87,7 +100,7 @@ async fn main_impl() -> Result<()> {
// Send task prints JSON responses to stdout.
let cancel = main_cancel.clone();
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let send_task: JoinHandle<anyhow::Result<()>> = spawn_named_task!("send_task", async move {
let _cancel_guard = cancel.clone().drop_guard();
loop {
let message = tokio::select! {
@@ -104,24 +117,25 @@ async fn main_impl() -> Result<()> {
});
let cancel = main_cancel.clone();
let sigterm_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
#[cfg(target_family = "unix")]
{
let _cancel_guard = cancel.clone().drop_guard();
tokio::select! {
_ = cancel.cancelled() => (),
_ = sigterm.recv() => {
log::info!("got SIGTERM");
let sigterm_task: JoinHandle<anyhow::Result<()>> =
spawn_named_task!("sigterm_task", async move {
#[cfg(target_family = "unix")]
{
let _cancel_guard = cancel.clone().drop_guard();
tokio::select! {
_ = cancel.cancelled() => (),
_ = sigterm.recv() => {
log::info!("got SIGTERM");
}
}
}
}
let _ = cancel;
Ok(())
});
let _ = cancel;
Ok(())
});
// Receiver task reads JSON requests from stdin.
let cancel = main_cancel.clone();
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let recv_task: JoinHandle<anyhow::Result<()>> = spawn_named_task!("recv_task", async move {
let _cancel_guard = cancel.clone().drop_guard();
let stdin = io::stdin();
let mut lines = BufReader::new(stdin).lines();
@@ -143,7 +157,7 @@ async fn main_impl() -> Result<()> {
};
log::trace!("RPC recv {}", message);
let session = session.clone();
tokio::spawn(async move {
spawn_named_task!("handle_incoming", async move {
session.handle_incoming(&message).await;
});
}

View File

@@ -29,9 +29,11 @@ skip = [
{ name = "futures-lite", version = "1.13.0" },
{ name = "getrandom", version = "<0.2" },
{ name = "h2", version = "0.3.26" },
{ name = "hashbrown", version = "0.12.3"},
{ name = "http-body", version = "0.4.6" },
{ name = "http", version = "0.2.12" },
{ name = "hyper", version = "0.14.28" },
{ name = "indexmap", version = "1.9.3" },
{ name = "nix", version = "0.26.4" },
{ name = "quick-error", version = "<2.0" },
{ name = "rand_chacha", version = "<0.3" },

View File

@@ -400,36 +400,39 @@ impl Config {
#[cfg(not(target_os = "ios"))]
async fn create_lock_task(dir: PathBuf) -> Result<Option<JoinHandle<anyhow::Result<()>>>> {
use crate::spawn_named_task;
let lockfile = dir.join(LOCKFILE_NAME);
let mut lock = fd_lock::RwLock::new(fs::File::create(lockfile).await?);
let (locked_tx, locked_rx) = oneshot::channel();
let lock_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let mut timeout = Duration::from_millis(100);
let _guard = loop {
match lock.try_write() {
Ok(guard) => break Ok(guard),
Err(err) => {
if timeout.as_millis() > 1600 {
break Err(err);
}
// We need to wait for the previous lock_task to be aborted thus unlocking
// the lockfile. We don't open configs for writing often outside of the
// tests, so this adds delays to the tests, but otherwise ok.
sleep(timeout).await;
if err.kind() == std::io::ErrorKind::WouldBlock {
timeout *= 2;
let lock_task: JoinHandle<anyhow::Result<()>> =
spawn_named_task!("lock_task", async move {
let mut timeout = Duration::from_millis(100);
let _guard = loop {
match lock.try_write() {
Ok(guard) => break Ok(guard),
Err(err) => {
if timeout.as_millis() > 1600 {
break Err(err);
}
// We need to wait for the previous lock_task to be aborted thus unlocking
// the lockfile. We don't open configs for writing often outside of the
// tests, so this adds delays to the tests, but otherwise ok.
sleep(timeout).await;
if err.kind() == std::io::ErrorKind::WouldBlock {
timeout *= 2;
}
}
}
}
}?;
locked_tx
.send(())
.ok()
.context("Cannot notify about lockfile locking")?;
let (_tx, rx) = oneshot::channel();
rx.await?;
Ok(())
});
}?;
locked_tx
.send(())
.ok()
.context("Cannot notify about lockfile locking")?;
let (_tx, rx) = oneshot::channel();
rx.await?;
Ok(())
});
locked_rx.await?;
Ok(Some(lock_task))
}

View File

@@ -12,12 +12,10 @@ use deltachat_contact_tools::{sanitize_bidi_characters, sanitize_single_line, Co
use deltachat_derive::{FromSql, ToSql};
use serde::{Deserialize, Serialize};
use strum_macros::EnumIter;
use tokio::task;
use crate::aheader::EncryptPreference;
use crate::blob::BlobObject;
use crate::chatlist::Chatlist;
use crate::chatlist_events;
use crate::color::str_to_color;
use crate::config::Config;
use crate::constants::{
@@ -50,6 +48,7 @@ use crate::tools::{
truncate_msg_text, IsNoneOrEmpty, SystemTime,
};
use crate::webxdc::StatusUpdateSerial;
use crate::{chatlist_events, spawn_named_task};
/// An chat item, such as a message or a marker.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -1476,7 +1475,7 @@ impl ChatId {
/// and otherwise notifying the user accordingly.
pub(crate) fn spawn_securejoin_wait(self, context: &Context, timeout: u64) {
let context = context.clone();
task::spawn(async move {
spawn_named_task!("securejoin_wait", async move {
tokio::time::sleep(Duration::from_secs(timeout)).await;
let chat = Chat::load_from_db(&context, self).await?;
chat.check_securejoin_wait(&context, 0).await?;

View File

@@ -21,7 +21,6 @@ use futures::FutureExt;
use futures_lite::FutureExt as _;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use server_params::{expand_param_vector, ServerParams};
use tokio::task;
use crate::config::{self, Config};
use crate::context::Context;
@@ -35,10 +34,10 @@ use crate::message::{Message, Viewtype};
use crate::oauth2::get_oauth2_addr;
use crate::provider::{Protocol, Socket, UsernamePattern};
use crate::smtp::Smtp;
use crate::stock_str;
use crate::sync::Sync::*;
use crate::tools::time;
use crate::{chat, e2ee, provider};
use crate::{spawn_named_task, stock_str};
use deltachat_contact_tools::addr_cmp;
macro_rules! progress {
@@ -372,7 +371,9 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
progress!(ctx, 1);
let ctx2 = ctx.clone();
let update_device_chats_handle = task::spawn(async move { ctx2.update_device_chats().await });
let update_device_chats_handle = spawn_named_task!("update_device_chats", async move {
ctx2.update_device_chats().await
});
let configured_param = get_configured_param(ctx, param).await?;
let strict_tls = configured_param.strict_tls();
@@ -387,7 +388,7 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
let smtp_addr = configured_param.addr.clone();
let proxy_config = configured_param.proxy_config.clone();
let smtp_config_task = task::spawn(async move {
let smtp_config_task = spawn_named_task!("smtp_config", async move {
let mut smtp = Smtp::new();
smtp.connect(
&context_smtp,

View File

@@ -37,7 +37,7 @@ use crate::peerstate::Peerstate;
use crate::sql::{self, params_iter};
use crate::sync::{self, Sync::*};
use crate::tools::{duration_to_str, get_abs_path, smeared_time, time, SystemTime};
use crate::{chat, chatlist_events, stock_str};
use crate::{chat, chatlist_events, spawn_named_task, stock_str};
/// Time during which a contact is considered as seen recently.
const SEEN_RECENTLY_SECONDS: i64 = 600;
@@ -1791,7 +1791,7 @@ impl RecentlySeenLoop {
pub(crate) fn new(context: Context) -> Self {
let (interrupt_send, interrupt_recv) = channel::bounded(1);
let handle = task::spawn(Self::run(context, interrupt_recv));
let handle = spawn_named_task!("recently_seen_loop", Self::run(context, interrupt_recv));
Self {
handle,
interrupt_send,

View File

@@ -5,6 +5,7 @@ use crate::context::Context;
use crate::events::EventType;
use crate::message::{Message, MsgId, Viewtype};
use crate::param::Param;
use crate::spawn_named_task;
use crate::tools::time;
use crate::webxdc::StatusUpdateItem;
use async_channel::{self as channel, Receiver, Sender};
@@ -150,7 +151,7 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option<MsgId>) -> a
let (sender, debug_logging_recv) = channel::bounded(1000);
let loop_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("xdc_debug_logging_loop", async move {
debug_logging_loop(&ctx, debug_logging_recv).await
})
};

View File

@@ -24,7 +24,6 @@ use rand::Rng;
use ratelimit::Ratelimit;
use url::Url;
use crate::chat::{self, ChatId, ChatIdBlocked};
use crate::chatlist_events;
use crate::config::Config;
use crate::constants::{self, Blocked, Chattype, ShowEmails};
@@ -48,6 +47,10 @@ use crate::scheduler::connectivity::ConnectivityStore;
use crate::sql;
use crate::stock_str;
use crate::tools::{self, create_id, duration_to_str};
use crate::{
chat::{self, ChatId, ChatIdBlocked},
spawn_named_task,
};
pub(crate) mod capabilities;
mod client;
@@ -1556,7 +1559,9 @@ impl Session {
} else if !context.push_subscriber.heartbeat_subscribed().await {
let context = context.clone();
// Subscribe for heartbeat notifications.
tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
spawn_named_task!("subscribe_to_heartbeat_notifications", async move {
context.push_subscriber.subscribe(&context).await
});
}
Ok(())

View File

@@ -46,7 +46,7 @@ use crate::message::{Message, Viewtype};
use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body;
use crate::tools::{create_id, time, TempPathGuard};
use crate::EventType;
use crate::{spawn_named_task, EventType};
use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
@@ -132,7 +132,7 @@ impl BackupProvider {
let drop_token = drop_token.clone();
let endpoint = endpoint.clone();
let auth_token = auth_token.clone();
tokio::spawn(async move {
spawn_named_task!("accept_loop", async move {
Self::accept_loop(
context.clone(),
endpoint,

View File

@@ -12,13 +12,13 @@ use pgp::composed::Deserializable;
pub use pgp::composed::{SignedPublicKey, SignedSecretKey};
use pgp::ser::Serialize;
use pgp::types::{KeyTrait, SecretKeyTrait};
use tokio::runtime::Handle;
use crate::config::Config;
use crate::constants::KeyGenType;
use crate::context::Context;
use crate::log::LogExt;
use crate::pgp::KeyPair;
use crate::spawn_named_blocking_task;
use crate::tools::{self, time_elapsed};
/// Convenience trait for working with keys.
@@ -251,8 +251,10 @@ async fn generate_keypair(context: &Context) -> Result<KeyPair> {
let keytype = KeyGenType::from_i32(context.get_config_int(Config::KeyGenType).await?)
.unwrap_or_default();
info!(context, "Generating keypair with type {}", keytype);
let keypair = Handle::current()
.spawn_blocking(move || crate::pgp::create_keypair(addr, keytype))
let keypair =
spawn_named_blocking_task!("generate_keypair", move || crate::pgp::create_keypair(
addr, keytype
))
.await??;
store_self_keypair(context, &keypair, KeyPairUse::Default).await?;

View File

@@ -11,6 +11,7 @@ use crate::context::Context;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_rustls;
use crate::spawn_named_task;
/// HTTP(S) GET response.
#[derive(Debug)]
@@ -85,7 +86,7 @@ where
let io = TokioIo::new(stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
tokio::task::spawn(conn);
spawn_named_task!("http_connection", conn);
Ok(sender)
}

View File

@@ -45,7 +45,7 @@ use crate::context::Context;
use crate::headerdef::HeaderDef;
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::EventType;
use crate::{spawn_named_task, EventType};
/// The length of an ed25519 `PublicKey`, in bytes.
const PUBLIC_KEY_LENGTH: usize = 32;
@@ -125,7 +125,7 @@ impl Iroh {
.split();
let ctx = ctx.clone();
let subscribe_loop = tokio::spawn(async move {
let subscribe_loop = spawn_named_task!("subscribe_loop", async move {
if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
warn!(ctx, "subscribe_loop failed: {e}")
}
@@ -264,7 +264,10 @@ impl Context {
let context = self.clone();
// Shuts down on deltachat shutdown
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
spawn_named_task!(
"endpoint_loop",
endpoint_loop(context, endpoint.clone(), gossip.clone())
);
Ok(Iroh {
endpoint,
@@ -442,7 +445,7 @@ async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) {
info!(context, "IROH_REALTIME: accepting iroh connection");
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
spawn_named_task!("handle_connection", async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
}

View File

@@ -16,10 +16,10 @@ use pgp::crypto::hash::HashAlgorithm;
use pgp::crypto::sym::SymmetricKeyAlgorithm;
use pgp::types::{CompressionAlgorithm, KeyTrait, Mpi, PublicKeyTrait, StringToKey};
use rand::{thread_rng, CryptoRng, Rng};
use tokio::runtime::Handle;
use crate::constants::KeyGenType;
use crate::key::{DcKey, Fingerprint};
use crate::spawn_named_blocking_task;
#[allow(missing_docs)]
#[cfg(test)]
@@ -250,33 +250,32 @@ pub async fn pk_encrypt(
) -> Result<String> {
let lit_msg = Message::new_literal_bytes("", plain);
Handle::current()
.spawn_blocking(move || {
let pkeys: Vec<SignedPublicKeyOrSubkey> = public_keys_for_encryption
.iter()
.filter_map(select_pk_for_encryption)
.collect();
let pkeys_refs: Vec<&SignedPublicKeyOrSubkey> = pkeys.iter().collect();
spawn_named_blocking_task!("pk_encrypt", move || {
let pkeys: Vec<SignedPublicKeyOrSubkey> = public_keys_for_encryption
.iter()
.filter_map(select_pk_for_encryption)
.collect();
let pkeys_refs: Vec<&SignedPublicKeyOrSubkey> = pkeys.iter().collect();
let mut rng = thread_rng();
let mut rng = thread_rng();
let encrypted_msg = if let Some(ref skey) = private_key_for_signing {
let signed_msg = lit_msg.sign(skey, || "".into(), HASH_ALGORITHM)?;
let compressed_msg = if compress {
signed_msg.compress(CompressionAlgorithm::ZLIB)?
} else {
signed_msg
};
compressed_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
let encrypted_msg = if let Some(ref skey) = private_key_for_signing {
let signed_msg = lit_msg.sign(skey, || "".into(), HASH_ALGORITHM)?;
let compressed_msg = if compress {
signed_msg.compress(CompressionAlgorithm::ZLIB)?
} else {
lit_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
signed_msg
};
compressed_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
} else {
lit_msg.encrypt_to_keys(&mut rng, SYMMETRIC_KEY_ALGORITHM, &pkeys_refs)?
};
let encoded_msg = encrypted_msg.to_armored_string(Default::default())?;
let encoded_msg = encrypted_msg.to_armored_string(Default::default())?;
Ok(encoded_msg)
})
.await?
Ok(encoded_msg)
})
.await?
}
/// Signs `plain` text using `private_key_for_signing`.
@@ -367,7 +366,7 @@ pub async fn symm_encrypt(passphrase: &str, plain: &[u8]) -> Result<String> {
let lit_msg = Message::new_literal_bytes("", plain);
let passphrase = passphrase.to_string();
tokio::task::spawn_blocking(move || {
spawn_named_blocking_task!("symm_encrypt", move || {
let mut rng = thread_rng();
let s2k = StringToKey::new_default(&mut rng);
let msg =
@@ -388,7 +387,7 @@ pub async fn symm_decrypt<T: std::io::Read + std::io::Seek>(
let (enc_msg, _) = Message::from_armor_single(ctext)?;
let passphrase = passphrase.to_string();
tokio::task::spawn_blocking(move || {
spawn_named_blocking_task!("symm_decrypt", move || {
let msg = enc_msg.decrypt_with_password(|| passphrase)?;
match msg.get_content()? {

View File

@@ -19,12 +19,12 @@ use crate::download::{download_msg, DownloadState};
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{session::Session, FolderMeaning, Imap};
use crate::location;
use crate::log::LogExt;
use crate::message::MsgId;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
use crate::{location, spawn_named_task};
pub(crate) mod connectivity;
@@ -164,7 +164,7 @@ impl SchedulerState {
}
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
spawn_named_task!("pause", async move {
rx.await.ok();
let mut inner = context.scheduler.inner.write().await;
match *inner {
@@ -849,7 +849,10 @@ impl Scheduler {
let (inbox_start_send, inbox_start_recv) = oneshot::channel();
let handle = {
let ctx = ctx.clone();
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
spawn_named_task!(
"inbox_loop",
inbox_loop(ctx, inbox_start_send, inbox_handlers)
)
};
let inbox = SchedBox {
meaning: FolderMeaning::Inbox,
@@ -866,7 +869,10 @@ impl Scheduler {
let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
let (start_send, start_recv) = oneshot::channel();
let ctx = ctx.clone();
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
let handle = spawn_named_task!(
"simple_imap_loop",
simple_imap_loop(ctx, start_send, handlers, meaning)
);
oboxes.push(SchedBox {
meaning,
conn_state,
@@ -878,20 +884,20 @@ impl Scheduler {
let smtp_handle = {
let ctx = ctx.clone();
task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
spawn_named_task!("smtp_loop", smtp_loop(ctx, smtp_start_send, smtp_handlers))
};
start_recvs.push(smtp_start_recv);
let ephemeral_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("ephemeral_loop", async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
})
};
let location_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("location_loop", async move {
location::location_loop(&ctx, location_interrupt_recv).await;
})
};

View File

@@ -6,7 +6,6 @@ pub mod send;
use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail};
use async_smtp::{EmailAddress, SmtpTransport};
use tokio::task;
use crate::chat::{add_info_msg_with_cmd, ChatId};
use crate::config::Config;
@@ -21,9 +20,9 @@ use crate::mimefactory::MimeFactory;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionBufStream;
use crate::scheduler::connectivity::ConnectivityStore;
use crate::sql;
use crate::stock_str::unencrypted_email;
use crate::tools::{self, time_elapsed};
use crate::{spawn_named_task, sql};
#[derive(Default)]
pub(crate) struct Smtp {
@@ -56,7 +55,7 @@ impl Smtp {
// Closing connection with a QUIT command may take some time, especially if it's a
// stale connection and an attempt to send the command times out. Send a command in a
// separate task to avoid waiting for reply or timeout.
task::spawn(async move { transport.quit().await });
spawn_named_task!("disconnect SMTP", async move { transport.quit().await });
}
self.last_success = None;
}

View File

@@ -41,6 +41,7 @@ use crate::peerstate::Peerstate;
use crate::pgp::KeyPair;
use crate::receive_imf::receive_imf;
use crate::securejoin::{get_securejoin_qr, join_securejoin};
use crate::spawn_named_task;
use crate::stock_str::StockStrings;
#[allow(non_upper_case_globals)]
@@ -969,7 +970,7 @@ impl InnerLogSink {
/// Subscribes this log sink to event emitter.
pub fn subscribe(&self, event_emitter: EventEmitter) {
let sender = self.sender.clone();
task::spawn(async move {
spawn_named_task!("InnerLogSink", async move {
while let Some(event) = event_emitter.recv().await {
sender.try_send(LogEvent::Event(event.clone())).ok();
}

View File

@@ -708,6 +708,132 @@ pub(crate) fn inc_and_check<T: PrimInt + AddAssign + std::fmt::Debug>(
Ok(())
}
/// Spawns a named asynchronous task if the `tokio_unstable` feature is enabled.
///
/// Spawns a new asynchronous task, returning a [tokio::task::JoinHandle] for it.
/// The provided future will start running in the background immediately when the function is called, even if you don't await the returned JoinHandle.
/// See [tokio::task::spawn].
///
/// If the rustflag `tokio_unstable` is active, the task will be given the specified `name`
/// for easier identification in monitoring tools (like tokio-console).
/// If `tokio_unstable` is not set, the task is spawned normally without a name.
///
/// # Parameters
///
/// - `name`: The name of the task.
/// - `future`: The future to be executed, which must implement `Future`, be `Send`, and `'static`.
///
/// # Returns
///
/// A [tokio::task::JoinHandle] that can be awaited to retrieve the output of the future.
///
/// # Panics
///
/// Panics if the task fails to spawn when `tokio_unstable` is enabled.
///
/// # Example
///
/// ```
/// use deltachat::spawn_named_task; // or inside of core: crate::spawn_named_task
///
/// let handle = spawn_named_task!("my_task", async {
/// // Your async code here
/// });
///
/// let result = handle.await.unwrap();
/// ```
#[macro_export]
macro_rules! spawn_named_task {
($name:expr, $future:expr) => {{
#[inline(always)]
#[allow(unused_variables, unexpected_cfgs)]
pub fn __spawn_named_task<Fut>(
name: &str,
future: Fut,
) -> ::tokio::task::JoinHandle<Fut::Output>
where
Fut: ::std::future::Future + Send + 'static,
Fut::Output: Send + 'static,
{
#[cfg(tokio_unstable)]
{
::tokio::task::Builder::new()
.name(name)
.spawn(future)
.expect("Failed to spawn task")
}
#[cfg(not(tokio_unstable))]
{
::tokio::task::spawn(future)
}
}
__spawn_named_task($name, $future)
}};
}
/// Spawns a named blocking task if the `tokio_unstable` feature is enabled.
///
/// Spawns a new blocking task, returning a [tokio::task::JoinHandle] for it.
/// The provided future will start running in the background immediately when the function is called, even if you don't await the returned JoinHandle.
/// See [tokio::task::spawn_blocking].
///
/// If the rustflag `tokio_unstable` is active, the task will be given the specified `name`
/// for easier identification in monitoring tools (like tokio-console).
/// If `tokio_unstable` is not set, the task is spawned normally without a name.
///
/// # Parameters
///
/// - `name`: The name of the task
/// - `future`: The future to be executed, which must implement `Future`, be `Send`, and `'static`.
///
/// # Returns
///
/// A [tokio::task::JoinHandle] that can be awaited to retrieve the output of the future.
///
/// # Panics
///
/// Panics if the task fails to spawn when `tokio_unstable` is enabled.
///
/// # Example
///
/// ```
/// use deltachat::spawn_named_blocking_task; // or inside of core: crate::spawn_named_blocking_task
///
/// let handle = spawn_named_blocking_task!("my_task", async {
/// // Your async code here
/// });
///
/// let result = handle.await.unwrap();
/// ```
#[macro_export]
macro_rules! spawn_named_blocking_task {
($name:expr, $future:expr) => {{
#[inline(always)]
#[allow(unused_variables, unexpected_cfgs)]
pub fn __spawn_named_blocking_task<Fut, ReturnType>(
name: &str,
future: Fut,
) -> ::tokio::task::JoinHandle<ReturnType>
where
Fut: FnOnce() -> ReturnType + Send + 'static,
ReturnType: Send + 'static,
{
#[cfg(tokio_unstable)]
{
::tokio::task::Builder::new()
.name(name)
.spawn_blocking(future)
.expect("Failed to spawn task")
}
#[cfg(not(tokio_unstable))]
{
::tokio::task::spawn_blocking(future)
}
}
__spawn_named_blocking_task($name, $future)
}};
}
#[cfg(test)]
mod tests {
#![allow(clippy::indexing_slicing)]