Compare commits

...

1 Commits

Author SHA1 Message Date
Simon Laux
482abe154e make the repl tool work with the tokio console
https://github.com/tokio-rs/console/tree/main

this can help with debugging async stuff.
2023-05-22 21:02:30 +02:00
11 changed files with 284 additions and 123 deletions

View File

@@ -9,3 +9,6 @@
# invoking `cargo test` threads are allowed to have a large enough
# stack size without needing to use an optimised build.
RUST_MIN_STACK = "8388608"
[build]
rustflags = ["--cfg", "tokio_unstable"]

127
Cargo.lock generated
View File

@@ -768,6 +768,42 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console-api"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures",
"hdrhistogram",
"humantime 2.1.0",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.2"
@@ -1246,6 +1282,7 @@ version = "1.115.0"
dependencies = [
"ansi_term",
"anyhow",
"console-subscriber",
"deltachat",
"dirs",
"log",
@@ -2208,6 +2245,19 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "hdrhistogram"
version = "7.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8"
dependencies = [
"base64 0.13.1",
"byteorder",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -2369,6 +2419,18 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
@@ -3615,6 +3677,38 @@ dependencies = [
"unarray",
]
[[package]]
name = "prost"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost",
]
[[package]]
name = "qrcodegen"
version = "1.8.0"
@@ -4858,6 +4952,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
@@ -4993,6 +5088,34 @@ dependencies = [
"winnow",
]
[[package]]
name = "tonic"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-trait",
"axum",
"base64 0.21.0",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -5001,9 +5124,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",

View File

@@ -84,7 +84,7 @@ strum_macros = "0.24"
tagger = "4.3.4"
textwrap = "0.16.0"
thiserror = "1"
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] }
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros", "tracing"] }
tokio-io-timeout = "1.2.0"
tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar

View File

@@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
ansi_term = "0.12.1"
anyhow = "1"
console-subscriber = "0.1.9"
deltachat = { path = "..", features = ["internals"]}
dirs = "5"
log = "0.4.16"

View File

@@ -315,11 +315,13 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
let context = Context::new(Path::new(&args[1]), 0, Events::new(), StockStrings::new()).await?;
let events = context.get_event_emitter();
tokio::task::spawn(async move {
while let Some(event) = events.recv().await {
receive_event(event.typ);
}
});
tokio::task::Builder::new()
.name("repl:receive_event")
.spawn(async move {
while let Some(event) = events.recv().await {
receive_event(event.typ);
}
})?;
println!("Delta Chat Core is awaiting your commands.");
@@ -331,61 +333,63 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
let mut selected_chat = ChatId::default();
let ctx = context.clone();
let input_loop = tokio::task::spawn_blocking(move || {
let h = DcHelper {
completer: FilenameCompleter::new(),
highlighter: MatchingBracketHighlighter::new(),
hinter: HistoryHinter {},
};
let mut rl = Editor::with_config(config)?;
rl.set_helper(Some(h));
rl.bind_sequence(KeyEvent::alt('N'), Cmd::HistorySearchForward);
rl.bind_sequence(KeyEvent::alt('P'), Cmd::HistorySearchBackward);
if rl.load_history(".dc-history.txt").is_err() {
println!("No previous history.");
}
let input_loop = tokio::task::Builder::new()
.name("repl:input_loop")
.spawn_blocking(move || {
let h = DcHelper {
completer: FilenameCompleter::new(),
highlighter: MatchingBracketHighlighter::new(),
hinter: HistoryHinter {},
};
let mut rl = Editor::with_config(config)?;
rl.set_helper(Some(h));
rl.bind_sequence(KeyEvent::alt('N'), Cmd::HistorySearchForward);
rl.bind_sequence(KeyEvent::alt('P'), Cmd::HistorySearchBackward);
if rl.load_history(".dc-history.txt").is_err() {
println!("No previous history.");
}
loop {
let p = "> ";
let readline = rl.readline(p);
loop {
let p = "> ";
let readline = rl.readline(p);
match readline {
Ok(line) => {
// TODO: ignore "set mail_pw"
rl.add_history_entry(line.as_str())?;
let should_continue = Handle::current().block_on(async {
match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await {
Ok(ExitResult::Continue) => true,
Ok(ExitResult::Exit) => {
println!("Exiting ...");
false
}
Err(err) => {
println!("Error: {err:#}");
true
match readline {
Ok(line) => {
// TODO: ignore "set mail_pw"
rl.add_history_entry(line.as_str())?;
let should_continue = Handle::current().block_on(async {
match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await {
Ok(ExitResult::Continue) => true,
Ok(ExitResult::Exit) => {
println!("Exiting ...");
false
}
Err(err) => {
println!("Error: {err:#}");
true
}
}
});
if !should_continue {
break;
}
});
if !should_continue {
}
Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
println!("Exiting...");
break;
}
Err(err) => {
println!("Error: {err:#}");
break;
}
}
Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
println!("Exiting...");
break;
}
Err(err) => {
println!("Error: {err:#}");
break;
}
}
}
rl.save_history(".dc-history.txt")?;
println!("history saved");
Ok::<_, Error>(())
});
rl.save_history(".dc-history.txt")?;
println!("history saved");
Ok::<_, Error>(())
})?;
context.stop_io().await;
input_loop.await??;
@@ -481,6 +485,7 @@ async fn handle_cmd(
#[tokio::main]
async fn main() -> Result<(), Error> {
console_subscriber::init();
let _ = pretty_env_logger::try_init();
let args = std::env::args().collect();

View File

@@ -181,7 +181,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
let socks5_enabled = socks5_config.is_some();
let ctx2 = ctx.clone();
let update_device_chats_handle = task::spawn(async move { ctx2.update_device_chats().await });
let update_device_chats_handle = tokio::task::Builder::new()
.name("update_device_chats")
.spawn(async move { ctx2.update_device_chats().await })?;
// Step 1: Load the parameters and check email-address and password
@@ -350,44 +352,46 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
.provider
.map_or(socks5_config.is_some(), |provider| provider.opt.strict_tls);
let smtp_config_task = task::spawn(async move {
let mut smtp_configured = false;
let mut errors = Vec::new();
for smtp_server in smtp_servers {
smtp_param.user = smtp_server.username.clone();
smtp_param.server = smtp_server.hostname.clone();
smtp_param.port = smtp_server.port;
smtp_param.security = smtp_server.socket;
smtp_param.certificate_checks = match smtp_server.strict_tls {
Some(true) => CertificateChecks::Strict,
Some(false) => CertificateChecks::AcceptInvalidCertificates,
None => CertificateChecks::Automatic,
};
let smtp_config_task = tokio::task::Builder::new()
.name("smtp_config")
.spawn(async move {
let mut smtp_configured = false;
let mut errors = Vec::new();
for smtp_server in smtp_servers {
smtp_param.user = smtp_server.username.clone();
smtp_param.server = smtp_server.hostname.clone();
smtp_param.port = smtp_server.port;
smtp_param.security = smtp_server.socket;
smtp_param.certificate_checks = match smtp_server.strict_tls {
Some(true) => CertificateChecks::Strict,
Some(false) => CertificateChecks::AcceptInvalidCertificates,
None => CertificateChecks::Automatic,
};
match try_smtp_one_param(
&context_smtp,
&smtp_param,
&socks5_config,
&smtp_addr,
provider_strict_tls,
&mut smtp,
)
.await
{
Ok(_) => {
smtp_configured = true;
break;
match try_smtp_one_param(
&context_smtp,
&smtp_param,
&socks5_config,
&smtp_addr,
provider_strict_tls,
&mut smtp,
)
.await
{
Ok(_) => {
smtp_configured = true;
break;
}
Err(e) => errors.push(e),
}
Err(e) => errors.push(e),
}
}
if smtp_configured {
Ok(smtp_param)
} else {
Err(errors)
}
});
if smtp_configured {
Ok(smtp_param)
} else {
Err(errors)
}
})?;
progress!(ctx, 600);

View File

@@ -1555,7 +1555,10 @@ impl RecentlySeenLoop {
pub(crate) fn new(context: Context) -> Self {
let (interrupt_send, interrupt_recv) = channel::bounded(1);
let handle = task::spawn(Self::run(context, interrupt_recv));
let handle = tokio::task::Builder::new()
.name("recently_seen")
.spawn(Self::run(context, interrupt_recv))
.expect("failed to spawn recently_seen task");
Self {
handle,
interrupt_send,

View File

@@ -143,9 +143,11 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option<MsgId>) -> a
let (sender, debug_logging_recv) = channel::bounded(1000);
let loop_handle = {
let ctx = ctx.clone();
task::spawn(async move {
debug_logging_loop(&ctx, debug_logging_recv).await
})
tokio::task::Builder::new()
.name("debug_logging")
.spawn(async move {
debug_logging_loop(&ctx, debug_logging_recv).await
})?
};
*debug_logging = Some(DebugLogging {
msg_id,

View File

@@ -364,17 +364,19 @@ pub async fn symm_encrypt(passphrase: &str, plain: &[u8]) -> Result<String> {
let lit_msg = Message::new_literal_bytes("", plain);
let passphrase = passphrase.to_string();
tokio::task::spawn_blocking(move || {
let mut rng = thread_rng();
let s2k = StringToKey::new_default(&mut rng);
let msg =
lit_msg.encrypt_with_password(&mut rng, s2k, Default::default(), || passphrase)?;
tokio::task::Builder::new()
.name("symm_encrypt")
.spawn_blocking(move || {
let mut rng = thread_rng();
let s2k = StringToKey::new_default(&mut rng);
let msg =
lit_msg.encrypt_with_password(&mut rng, s2k, Default::default(), || passphrase)?;
let encoded_msg = msg.to_armored_string(None)?;
let encoded_msg = msg.to_armored_string(None)?;
Ok(encoded_msg)
})
.await?
Ok(encoded_msg)
})?
.await?
}
/// Symmetric decryption.
@@ -385,20 +387,22 @@ pub async fn symm_decrypt<T: std::io::Read + std::io::Seek>(
let (enc_msg, _) = Message::from_armor_single(ctext)?;
let passphrase = passphrase.to_string();
tokio::task::spawn_blocking(move || {
let decryptor = enc_msg.decrypt_with_password(|| passphrase)?;
tokio::task::Builder::new()
.name("symm_decrypt")
.spawn_blocking(move || {
let decryptor = enc_msg.decrypt_with_password(|| passphrase)?;
let msgs = decryptor.collect::<pgp::errors::Result<Vec<_>>>()?;
if let Some(msg) = msgs.first() {
match msg.get_content()? {
Some(content) => Ok(content),
None => bail!("Decrypted message is empty"),
let msgs = decryptor.collect::<pgp::errors::Result<Vec<_>>>()?;
if let Some(msg) = msgs.first() {
match msg.get_content()? {
Some(content) => Ok(content),
None => bail!("Decrypted message is empty"),
}
} else {
bail!("No valid messages found")
}
} else {
bail!("No valid messages found")
}
})
.await?
})?
.await?
}
#[cfg(test)]

View File

@@ -740,7 +740,9 @@ impl Scheduler {
let (inbox_start_send, inbox_start_recv) = oneshot::channel();
let handle = {
let ctx = ctx.clone();
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
tokio::task::Builder::new()
.name("inbox_loop")
.spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))?
};
let inbox = SchedBox {
meaning: FolderMeaning::Inbox,
@@ -760,7 +762,9 @@ impl Scheduler {
let (conn_state, handlers) = ImapConnectionState::new(&ctx).await?;
let (start_send, start_recv) = oneshot::channel();
let ctx = ctx.clone();
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
let handle = tokio::task::Builder::new()
.name("simple_imap_loop")
.spawn(simple_imap_loop(ctx, start_send, handlers, meaning))?;
oboxes.push(SchedBox {
meaning,
conn_state,
@@ -772,22 +776,28 @@ impl Scheduler {
let smtp_handle = {
let ctx = ctx.clone();
task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
tokio::task::Builder::new()
.name("smtp_loop")
.spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))?
};
start_recvs.push(smtp_start_recv);
let ephemeral_handle = {
let ctx = ctx.clone();
task::spawn(async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
})
tokio::task::Builder::new()
.name("ephemeral")
.spawn(async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
})?
};
let location_handle = {
let ctx = ctx.clone();
task::spawn(async move {
location::location_loop(&ctx, location_interrupt_recv).await;
})
tokio::task::Builder::new()
.name("location")
.spawn(async move {
location::location_loop(&ctx, location_interrupt_recv).await;
})?
};
let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());

View File

@@ -8,7 +8,6 @@ use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail};
use async_smtp::{self as smtp, EmailAddress, SmtpTransport};
use tokio::io::BufStream;
use tokio::task;
use crate::config::Config;
use crate::contact::{Contact, ContactId};
@@ -60,7 +59,10 @@ impl Smtp {
// Closing connection with a QUIT command may take some time, especially if it's a
// stale connection and an attempt to send the command times out. Send a command in a
// separate task to avoid waiting for reply or timeout.
task::spawn(async move { transport.quit().await });
tokio::task::Builder::new()
.name("smtp_disconnect")
.spawn(async move { transport.quit().await })
.expect("failed to spawn smtp_disconnect task");
}
self.last_success = None;
}