make the repl tool work with the tokio console

https://github.com/tokio-rs/console/tree/main

this can help with debugging async stuff.
This commit is contained in:
Simon Laux
2023-05-22 21:02:30 +02:00
parent c68a2e3820
commit 482abe154e
11 changed files with 284 additions and 123 deletions

View File

@@ -9,3 +9,6 @@
# invoking `cargo test` threads are allowed to have a large enough # invoking `cargo test` threads are allowed to have a large enough
# stack size without needing to use an optimised build. # stack size without needing to use an optimised build.
RUST_MIN_STACK = "8388608" RUST_MIN_STACK = "8388608"
[build]
rustflags = ["--cfg", "tokio_unstable"]

127
Cargo.lock generated
View File

@@ -768,6 +768,42 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "console-api"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures",
"hdrhistogram",
"humantime 2.1.0",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]] [[package]]
name = "const-oid" name = "const-oid"
version = "0.9.2" version = "0.9.2"
@@ -1246,6 +1282,7 @@ version = "1.115.0"
dependencies = [ dependencies = [
"ansi_term", "ansi_term",
"anyhow", "anyhow",
"console-subscriber",
"deltachat", "deltachat",
"dirs", "dirs",
"log", "log",
@@ -2208,6 +2245,19 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "hdrhistogram"
version = "7.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8"
dependencies = [
"base64 0.13.1",
"byteorder",
"flate2",
"nom",
"num-traits",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.4.1" version = "0.4.1"
@@ -2369,6 +2419,18 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]] [[package]]
name = "hyper-tls" name = "hyper-tls"
version = "0.5.0" version = "0.5.0"
@@ -3615,6 +3677,38 @@ dependencies = [
"unarray", "unarray",
] ]
[[package]]
name = "prost"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost",
]
[[package]] [[package]]
name = "qrcodegen" name = "qrcodegen"
version = "1.8.0" version = "1.8.0"
@@ -4858,6 +4952,7 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
"tracing",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
@@ -4993,6 +5088,34 @@ dependencies = [
"winnow", "winnow",
] ]
[[package]]
name = "tonic"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-trait",
"axum",
"base64 0.21.0",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"
@@ -5001,9 +5124,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"indexmap",
"pin-project", "pin-project",
"pin-project-lite", "pin-project-lite",
"rand 0.8.5",
"slab",
"tokio", "tokio",
"tokio-util",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing", "tracing",

View File

@@ -84,7 +84,7 @@ strum_macros = "0.24"
tagger = "4.3.4" tagger = "4.3.4"
textwrap = "0.16.0" textwrap = "0.16.0"
thiserror = "1" thiserror = "1"
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros", "tracing"] }
tokio-io-timeout = "1.2.0" tokio-io-timeout = "1.2.0"
tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar

View File

@@ -7,6 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
ansi_term = "0.12.1" ansi_term = "0.12.1"
anyhow = "1" anyhow = "1"
console-subscriber = "0.1.9"
deltachat = { path = "..", features = ["internals"]} deltachat = { path = "..", features = ["internals"]}
dirs = "5" dirs = "5"
log = "0.4.16" log = "0.4.16"

View File

@@ -315,11 +315,13 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
let context = Context::new(Path::new(&args[1]), 0, Events::new(), StockStrings::new()).await?; let context = Context::new(Path::new(&args[1]), 0, Events::new(), StockStrings::new()).await?;
let events = context.get_event_emitter(); let events = context.get_event_emitter();
tokio::task::spawn(async move { tokio::task::Builder::new()
while let Some(event) = events.recv().await { .name("repl:receive_event")
receive_event(event.typ); .spawn(async move {
} while let Some(event) = events.recv().await {
}); receive_event(event.typ);
}
})?;
println!("Delta Chat Core is awaiting your commands."); println!("Delta Chat Core is awaiting your commands.");
@@ -331,61 +333,63 @@ async fn start(args: Vec<String>) -> Result<(), Error> {
let mut selected_chat = ChatId::default(); let mut selected_chat = ChatId::default();
let ctx = context.clone(); let ctx = context.clone();
let input_loop = tokio::task::spawn_blocking(move || { let input_loop = tokio::task::Builder::new()
let h = DcHelper { .name("repl:input_loop")
completer: FilenameCompleter::new(), .spawn_blocking(move || {
highlighter: MatchingBracketHighlighter::new(), let h = DcHelper {
hinter: HistoryHinter {}, completer: FilenameCompleter::new(),
}; highlighter: MatchingBracketHighlighter::new(),
let mut rl = Editor::with_config(config)?; hinter: HistoryHinter {},
rl.set_helper(Some(h)); };
rl.bind_sequence(KeyEvent::alt('N'), Cmd::HistorySearchForward); let mut rl = Editor::with_config(config)?;
rl.bind_sequence(KeyEvent::alt('P'), Cmd::HistorySearchBackward); rl.set_helper(Some(h));
if rl.load_history(".dc-history.txt").is_err() { rl.bind_sequence(KeyEvent::alt('N'), Cmd::HistorySearchForward);
println!("No previous history."); rl.bind_sequence(KeyEvent::alt('P'), Cmd::HistorySearchBackward);
} if rl.load_history(".dc-history.txt").is_err() {
println!("No previous history.");
}
loop { loop {
let p = "> "; let p = "> ";
let readline = rl.readline(p); let readline = rl.readline(p);
match readline { match readline {
Ok(line) => { Ok(line) => {
// TODO: ignore "set mail_pw" // TODO: ignore "set mail_pw"
rl.add_history_entry(line.as_str())?; rl.add_history_entry(line.as_str())?;
let should_continue = Handle::current().block_on(async { let should_continue = Handle::current().block_on(async {
match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await { match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await {
Ok(ExitResult::Continue) => true, Ok(ExitResult::Continue) => true,
Ok(ExitResult::Exit) => { Ok(ExitResult::Exit) => {
println!("Exiting ..."); println!("Exiting ...");
false false
} }
Err(err) => { Err(err) => {
println!("Error: {err:#}"); println!("Error: {err:#}");
true true
}
} }
});
if !should_continue {
break;
} }
}); }
Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
if !should_continue { println!("Exiting...");
break;
}
Err(err) => {
println!("Error: {err:#}");
break; break;
} }
} }
Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
println!("Exiting...");
break;
}
Err(err) => {
println!("Error: {err:#}");
break;
}
} }
}
rl.save_history(".dc-history.txt")?; rl.save_history(".dc-history.txt")?;
println!("history saved"); println!("history saved");
Ok::<_, Error>(()) Ok::<_, Error>(())
}); })?;
context.stop_io().await; context.stop_io().await;
input_loop.await??; input_loop.await??;
@@ -481,6 +485,7 @@ async fn handle_cmd(
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
console_subscriber::init();
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let args = std::env::args().collect(); let args = std::env::args().collect();

View File

@@ -181,7 +181,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
let socks5_enabled = socks5_config.is_some(); let socks5_enabled = socks5_config.is_some();
let ctx2 = ctx.clone(); let ctx2 = ctx.clone();
let update_device_chats_handle = task::spawn(async move { ctx2.update_device_chats().await }); let update_device_chats_handle = tokio::task::Builder::new()
.name("update_device_chats")
.spawn(async move { ctx2.update_device_chats().await })?;
// Step 1: Load the parameters and check email-address and password // Step 1: Load the parameters and check email-address and password
@@ -350,44 +352,46 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
.provider .provider
.map_or(socks5_config.is_some(), |provider| provider.opt.strict_tls); .map_or(socks5_config.is_some(), |provider| provider.opt.strict_tls);
let smtp_config_task = task::spawn(async move { let smtp_config_task = tokio::task::Builder::new()
let mut smtp_configured = false; .name("smtp_config")
let mut errors = Vec::new(); .spawn(async move {
for smtp_server in smtp_servers { let mut smtp_configured = false;
smtp_param.user = smtp_server.username.clone(); let mut errors = Vec::new();
smtp_param.server = smtp_server.hostname.clone(); for smtp_server in smtp_servers {
smtp_param.port = smtp_server.port; smtp_param.user = smtp_server.username.clone();
smtp_param.security = smtp_server.socket; smtp_param.server = smtp_server.hostname.clone();
smtp_param.certificate_checks = match smtp_server.strict_tls { smtp_param.port = smtp_server.port;
Some(true) => CertificateChecks::Strict, smtp_param.security = smtp_server.socket;
Some(false) => CertificateChecks::AcceptInvalidCertificates, smtp_param.certificate_checks = match smtp_server.strict_tls {
None => CertificateChecks::Automatic, Some(true) => CertificateChecks::Strict,
}; Some(false) => CertificateChecks::AcceptInvalidCertificates,
None => CertificateChecks::Automatic,
};
match try_smtp_one_param( match try_smtp_one_param(
&context_smtp, &context_smtp,
&smtp_param, &smtp_param,
&socks5_config, &socks5_config,
&smtp_addr, &smtp_addr,
provider_strict_tls, provider_strict_tls,
&mut smtp, &mut smtp,
) )
.await .await
{ {
Ok(_) => { Ok(_) => {
smtp_configured = true; smtp_configured = true;
break; break;
}
Err(e) => errors.push(e),
} }
Err(e) => errors.push(e),
} }
}
if smtp_configured { if smtp_configured {
Ok(smtp_param) Ok(smtp_param)
} else { } else {
Err(errors) Err(errors)
} }
}); })?;
progress!(ctx, 600); progress!(ctx, 600);

View File

@@ -1555,7 +1555,10 @@ impl RecentlySeenLoop {
pub(crate) fn new(context: Context) -> Self { pub(crate) fn new(context: Context) -> Self {
let (interrupt_send, interrupt_recv) = channel::bounded(1); let (interrupt_send, interrupt_recv) = channel::bounded(1);
let handle = task::spawn(Self::run(context, interrupt_recv)); let handle = tokio::task::Builder::new()
.name("recently_seen")
.spawn(Self::run(context, interrupt_recv))
.expect("failed to spawn recently_seen task");
Self { Self {
handle, handle,
interrupt_send, interrupt_send,

View File

@@ -143,9 +143,11 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option<MsgId>) -> a
let (sender, debug_logging_recv) = channel::bounded(1000); let (sender, debug_logging_recv) = channel::bounded(1000);
let loop_handle = { let loop_handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
task::spawn(async move { tokio::task::Builder::new()
debug_logging_loop(&ctx, debug_logging_recv).await .name("debug_logging")
}) .spawn(async move {
debug_logging_loop(&ctx, debug_logging_recv).await
})?
}; };
*debug_logging = Some(DebugLogging { *debug_logging = Some(DebugLogging {
msg_id, msg_id,

View File

@@ -364,17 +364,19 @@ pub async fn symm_encrypt(passphrase: &str, plain: &[u8]) -> Result<String> {
let lit_msg = Message::new_literal_bytes("", plain); let lit_msg = Message::new_literal_bytes("", plain);
let passphrase = passphrase.to_string(); let passphrase = passphrase.to_string();
tokio::task::spawn_blocking(move || { tokio::task::Builder::new()
let mut rng = thread_rng(); .name("symm_encrypt")
let s2k = StringToKey::new_default(&mut rng); .spawn_blocking(move || {
let msg = let mut rng = thread_rng();
lit_msg.encrypt_with_password(&mut rng, s2k, Default::default(), || passphrase)?; let s2k = StringToKey::new_default(&mut rng);
let msg =
lit_msg.encrypt_with_password(&mut rng, s2k, Default::default(), || passphrase)?;
let encoded_msg = msg.to_armored_string(None)?; let encoded_msg = msg.to_armored_string(None)?;
Ok(encoded_msg) Ok(encoded_msg)
}) })?
.await? .await?
} }
/// Symmetric decryption. /// Symmetric decryption.
@@ -385,20 +387,22 @@ pub async fn symm_decrypt<T: std::io::Read + std::io::Seek>(
let (enc_msg, _) = Message::from_armor_single(ctext)?; let (enc_msg, _) = Message::from_armor_single(ctext)?;
let passphrase = passphrase.to_string(); let passphrase = passphrase.to_string();
tokio::task::spawn_blocking(move || { tokio::task::Builder::new()
let decryptor = enc_msg.decrypt_with_password(|| passphrase)?; .name("symm_decrypt")
.spawn_blocking(move || {
let decryptor = enc_msg.decrypt_with_password(|| passphrase)?;
let msgs = decryptor.collect::<pgp::errors::Result<Vec<_>>>()?; let msgs = decryptor.collect::<pgp::errors::Result<Vec<_>>>()?;
if let Some(msg) = msgs.first() { if let Some(msg) = msgs.first() {
match msg.get_content()? { match msg.get_content()? {
Some(content) => Ok(content), Some(content) => Ok(content),
None => bail!("Decrypted message is empty"), None => bail!("Decrypted message is empty"),
}
} else {
bail!("No valid messages found")
} }
} else { })?
bail!("No valid messages found") .await?
}
})
.await?
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -740,7 +740,9 @@ impl Scheduler {
let (inbox_start_send, inbox_start_recv) = oneshot::channel(); let (inbox_start_send, inbox_start_recv) = oneshot::channel();
let handle = { let handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers)) tokio::task::Builder::new()
.name("inbox_loop")
.spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))?
}; };
let inbox = SchedBox { let inbox = SchedBox {
meaning: FolderMeaning::Inbox, meaning: FolderMeaning::Inbox,
@@ -760,7 +762,9 @@ impl Scheduler {
let (conn_state, handlers) = ImapConnectionState::new(&ctx).await?; let (conn_state, handlers) = ImapConnectionState::new(&ctx).await?;
let (start_send, start_recv) = oneshot::channel(); let (start_send, start_recv) = oneshot::channel();
let ctx = ctx.clone(); let ctx = ctx.clone();
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning)); let handle = tokio::task::Builder::new()
.name("simple_imap_loop")
.spawn(simple_imap_loop(ctx, start_send, handlers, meaning))?;
oboxes.push(SchedBox { oboxes.push(SchedBox {
meaning, meaning,
conn_state, conn_state,
@@ -772,22 +776,28 @@ impl Scheduler {
let smtp_handle = { let smtp_handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers)) tokio::task::Builder::new()
.name("smtp_loop")
.spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))?
}; };
start_recvs.push(smtp_start_recv); start_recvs.push(smtp_start_recv);
let ephemeral_handle = { let ephemeral_handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
task::spawn(async move { tokio::task::Builder::new()
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await; .name("ephemeral")
}) .spawn(async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
})?
}; };
let location_handle = { let location_handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
task::spawn(async move { tokio::task::Builder::new()
location::location_loop(&ctx, location_interrupt_recv).await; .name("location")
}) .spawn(async move {
location::location_loop(&ctx, location_interrupt_recv).await;
})?
}; };
let recently_seen_loop = RecentlySeenLoop::new(ctx.clone()); let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());

View File

@@ -8,7 +8,6 @@ use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail}; use async_smtp::response::{Category, Code, Detail};
use async_smtp::{self as smtp, EmailAddress, SmtpTransport}; use async_smtp::{self as smtp, EmailAddress, SmtpTransport};
use tokio::io::BufStream; use tokio::io::BufStream;
use tokio::task;
use crate::config::Config; use crate::config::Config;
use crate::contact::{Contact, ContactId}; use crate::contact::{Contact, ContactId};
@@ -60,7 +59,10 @@ impl Smtp {
// Closing connection with a QUIT command may take some time, especially if it's a // Closing connection with a QUIT command may take some time, especially if it's a
// stale connection and an attempt to send the command times out. Send a command in a // stale connection and an attempt to send the command times out. Send a command in a
// separate task to avoid waiting for reply or timeout. // separate task to avoid waiting for reply or timeout.
task::spawn(async move { transport.quit().await }); tokio::task::Builder::new()
.name("smtp_disconnect")
.spawn(async move { transport.quit().await })
.expect("failed to spawn smtp_disconnect task");
} }
self.last_success = None; self.last_success = None;
} }