diff --git a/deltachat-jsonrpc/src/api.rs b/deltachat-jsonrpc/src/api.rs index 9499b9206..4f93fa734 100644 --- a/deltachat-jsonrpc/src/api.rs +++ b/deltachat-jsonrpc/src/api.rs @@ -18,7 +18,6 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER; use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin}; use deltachat::context::get_info; use deltachat::ephemeral::Timer; -use deltachat::location; use deltachat::message::get_msg_read_receipts; use deltachat::message::{ self, delete_msgs, markseen_msgs, Message, MessageState, MsgId, Viewtype, @@ -35,6 +34,7 @@ use deltachat::stock_str::StockMessage; use deltachat::webxdc::StatusUpdateSerial; use deltachat::EventEmitter; use deltachat::{imex, info}; +use deltachat::{location, spawn_named_task}; use sanitize_filename::is_sanitized; use tokio::fs; use tokio::sync::{watch, Mutex, RwLock}; @@ -1777,7 +1777,7 @@ impl CommandApi { let ctx = self.get_context(account_id).await?; let fut = send_webxdc_realtime_advertisement(&ctx, MsgId::new(instance_msg_id)).await?; if let Some(fut) = fut { - tokio::spawn(async move { + spawn_named_task!("send_webxdc_realtime_advertisement", async move { fut.await.ok(); info!(ctx, "send_webxdc_realtime_advertisement done") }); diff --git a/src/accounts.rs b/src/accounts.rs index 663513ddd..3b42e80f6 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -400,36 +400,39 @@ impl Config { #[cfg(not(target_os = "ios"))] async fn create_lock_task(dir: PathBuf) -> Result>>> { + use crate::spawn_named_task; + let lockfile = dir.join(LOCKFILE_NAME); let mut lock = fd_lock::RwLock::new(fs::File::create(lockfile).await?); let (locked_tx, locked_rx) = oneshot::channel(); - let lock_task: JoinHandle> = tokio::spawn(async move { - let mut timeout = Duration::from_millis(100); - let _guard = loop { - match lock.try_write() { - Ok(guard) => break Ok(guard), - Err(err) => { - if timeout.as_millis() > 1600 { - break Err(err); - } - // We need to wait for the previous lock_task to be aborted thus unlocking - // the lockfile. We don't open configs for writing often outside of the - // tests, so this adds delays to the tests, but otherwise ok. - sleep(timeout).await; - if err.kind() == std::io::ErrorKind::WouldBlock { - timeout *= 2; + let lock_task: JoinHandle> = + spawn_named_task!("lock_task", async move { + let mut timeout = Duration::from_millis(100); + let _guard = loop { + match lock.try_write() { + Ok(guard) => break Ok(guard), + Err(err) => { + if timeout.as_millis() > 1600 { + break Err(err); + } + // We need to wait for the previous lock_task to be aborted thus unlocking + // the lockfile. We don't open configs for writing often outside of the + // tests, so this adds delays to the tests, but otherwise ok. + sleep(timeout).await; + if err.kind() == std::io::ErrorKind::WouldBlock { + timeout *= 2; + } } } - } - }?; - locked_tx - .send(()) - .ok() - .context("Cannot notify about lockfile locking")?; - let (_tx, rx) = oneshot::channel(); - rx.await?; - Ok(()) - }); + }?; + locked_tx + .send(()) + .ok() + .context("Cannot notify about lockfile locking")?; + let (_tx, rx) = oneshot::channel(); + rx.await?; + Ok(()) + }); locked_rx.await?; Ok(Some(lock_task)) } diff --git a/src/chat.rs b/src/chat.rs index b168dafeb..6e97443c9 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -12,12 +12,11 @@ use deltachat_contact_tools::{sanitize_bidi_characters, sanitize_single_line, Co use deltachat_derive::{FromSql, ToSql}; use serde::{Deserialize, Serialize}; use strum_macros::EnumIter; -use tokio::task; + use crate::aheader::EncryptPreference; use crate::blob::BlobObject; use crate::chatlist::Chatlist; -use crate::chatlist_events; use crate::color::str_to_color; use crate::config::Config; use crate::constants::{ @@ -50,6 +49,7 @@ use crate::tools::{ truncate_msg_text, IsNoneOrEmpty, SystemTime, }; use crate::webxdc::StatusUpdateSerial; +use crate::{chatlist_events, spawn_named_task}; /// An chat item, such as a message or a marker. #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -1476,7 +1476,7 @@ impl ChatId { /// and otherwise notifying the user accordingly. pub(crate) fn spawn_securejoin_wait(self, context: &Context, timeout: u64) { let context = context.clone(); - task::spawn(async move { + spawn_named_task!("securejoin_wait", async move { tokio::time::sleep(Duration::from_secs(timeout)).await; let chat = Chat::load_from_db(&context, self).await?; chat.check_securejoin_wait(&context, 0).await?; diff --git a/src/configure.rs b/src/configure.rs index a46338d9c..72058a692 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -21,7 +21,6 @@ use futures::FutureExt; use futures_lite::FutureExt as _; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use server_params::{expand_param_vector, ServerParams}; -use tokio::task; use crate::config::{self, Config}; use crate::context::Context; @@ -35,10 +34,10 @@ use crate::message::{Message, Viewtype}; use crate::oauth2::get_oauth2_addr; use crate::provider::{Protocol, Socket, UsernamePattern}; use crate::smtp::Smtp; -use crate::stock_str; use crate::sync::Sync::*; use crate::tools::time; use crate::{chat, e2ee, provider}; +use crate::{spawn_named_task, stock_str}; use deltachat_contact_tools::addr_cmp; macro_rules! progress { @@ -372,7 +371,9 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result Result Self { let (interrupt_send, interrupt_recv) = channel::bounded(1); - let handle = task::spawn(Self::run(context, interrupt_recv)); + let handle = spawn_named_task!("recently_seen_loop", Self::run(context, interrupt_recv)); Self { handle, interrupt_send, diff --git a/src/debug_logging.rs b/src/debug_logging.rs index 766a39675..a895c8a26 100644 --- a/src/debug_logging.rs +++ b/src/debug_logging.rs @@ -5,6 +5,7 @@ use crate::context::Context; use crate::events::EventType; use crate::message::{Message, MsgId, Viewtype}; use crate::param::Param; +use crate::spawn_named_task; use crate::tools::time; use crate::webxdc::StatusUpdateItem; use async_channel::{self as channel, Receiver, Sender}; @@ -150,7 +151,7 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option) -> a let (sender, debug_logging_recv) = channel::bounded(1000); let loop_handle = { let ctx = ctx.clone(); - task::spawn(async move { + spawn_named_task!("debug_logging_loop", async move { debug_logging_loop(&ctx, debug_logging_recv).await }) }; diff --git a/src/imap.rs b/src/imap.rs index 0a74b8b6b..dac8be591 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -24,7 +24,6 @@ use rand::Rng; use ratelimit::Ratelimit; use url::Url; -use crate::chat::{self, ChatId, ChatIdBlocked}; use crate::chatlist_events; use crate::config::Config; use crate::constants::{self, Blocked, Chattype, ShowEmails}; @@ -48,6 +47,10 @@ use crate::scheduler::connectivity::ConnectivityStore; use crate::sql; use crate::stock_str; use crate::tools::{self, create_id, duration_to_str}; +use crate::{ + chat::{self, ChatId, ChatIdBlocked}, + spawn_named_task, +}; pub(crate) mod capabilities; mod client; @@ -1556,7 +1559,9 @@ impl Session { } else if !context.push_subscriber.heartbeat_subscribed().await { let context = context.clone(); // Subscribe for heartbeat notifications. - tokio::spawn(async move { context.push_subscriber.subscribe(&context).await }); + spawn_named_task!("subscribe_to_heartbeat_notifications", async move { + context.push_subscriber.subscribe(&context).await + }); } Ok(()) diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index d280dd8f8..ad81421ea 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -46,7 +46,7 @@ use crate::message::{Message, Viewtype}; use crate::qr::Qr; use crate::stock_str::backup_transfer_msg_body; use crate::tools::{create_id, time, TempPathGuard}; -use crate::EventType; +use crate::{spawn_named_task, EventType}; use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME}; @@ -132,7 +132,7 @@ impl BackupProvider { let drop_token = drop_token.clone(); let endpoint = endpoint.clone(); let auth_token = auth_token.clone(); - tokio::spawn(async move { + spawn_named_task!("accept_loop", async move { Self::accept_loop( context.clone(), endpoint, diff --git a/src/net/http.rs b/src/net/http.rs index 94e85f68c..2c83a8e32 100644 --- a/src/net/http.rs +++ b/src/net/http.rs @@ -11,6 +11,7 @@ use crate::context::Context; use crate::net::proxy::ProxyConfig; use crate::net::session::SessionStream; use crate::net::tls::wrap_rustls; +use crate::spawn_named_task; /// HTTP(S) GET response. #[derive(Debug)] @@ -85,7 +86,7 @@ where let io = TokioIo::new(stream); let (sender, conn) = hyper::client::conn::http1::handshake(io).await?; - tokio::task::spawn(conn); + spawn_named_task!("http_connection", conn); Ok(sender) } diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 614e1bf46..043130ef0 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -45,7 +45,7 @@ use crate::context::Context; use crate::headerdef::HeaderDef; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; -use crate::EventType; +use crate::{spawn_named_task, EventType}; /// The length of an ed25519 `PublicKey`, in bytes. const PUBLIC_KEY_LENGTH: usize = 32; @@ -125,7 +125,7 @@ impl Iroh { .split(); let ctx = ctx.clone(); - let subscribe_loop = tokio::spawn(async move { + let subscribe_loop = spawn_named_task!("subscribe_loop", async move { if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await { warn!(ctx, "subscribe_loop failed: {e}") } @@ -264,7 +264,10 @@ impl Context { let context = self.clone(); // Shuts down on deltachat shutdown - tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone())); + spawn_named_task!( + "endpoint_loop", + endpoint_loop(context, endpoint.clone(), gossip.clone()) + ); Ok(Iroh { endpoint, @@ -442,7 +445,7 @@ async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) { info!(context, "IROH_REALTIME: accepting iroh connection"); let gossip = gossip.clone(); let context = context.clone(); - tokio::spawn(async move { + spawn_named_task!("handle_connection", async move { if let Err(err) = handle_connection(&context, conn, gossip).await { warn!(context, "IROH_REALTIME: iroh connection error: {err}"); } diff --git a/src/scheduler.rs b/src/scheduler.rs index a901c7c97..36e55f351 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -19,12 +19,12 @@ use crate::download::{download_msg, DownloadState}; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::events::EventType; use crate::imap::{session::Session, FolderMeaning, Imap}; -use crate::location; use crate::log::LogExt; use crate::message::MsgId; use crate::smtp::{send_smtp_messages, Smtp}; use crate::sql; use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed}; +use crate::{location, spawn_named_task}; pub(crate) mod connectivity; @@ -164,7 +164,7 @@ impl SchedulerState { } let (tx, rx) = oneshot::channel(); - tokio::spawn(async move { + spawn_named_task!("pause", async move { rx.await.ok(); let mut inner = context.scheduler.inner.write().await; match *inner { @@ -849,7 +849,10 @@ impl Scheduler { let (inbox_start_send, inbox_start_recv) = oneshot::channel(); let handle = { let ctx = ctx.clone(); - task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers)) + spawn_named_task!( + "inbox_loop", + inbox_loop(ctx, inbox_start_send, inbox_handlers) + ) }; let inbox = SchedBox { meaning: FolderMeaning::Inbox, @@ -866,7 +869,10 @@ impl Scheduler { let (conn_state, handlers) = ImapConnectionState::new(ctx).await?; let (start_send, start_recv) = oneshot::channel(); let ctx = ctx.clone(); - let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning)); + let handle = spawn_named_task!( + "simple_imap_loop", + simple_imap_loop(ctx, start_send, handlers, meaning) + ); oboxes.push(SchedBox { meaning, conn_state, @@ -878,20 +884,20 @@ impl Scheduler { let smtp_handle = { let ctx = ctx.clone(); - task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers)) + spawn_named_task!("smtp_loop", smtp_loop(ctx, smtp_start_send, smtp_handlers)) }; start_recvs.push(smtp_start_recv); let ephemeral_handle = { let ctx = ctx.clone(); - task::spawn(async move { + spawn_named_task!("ephemeral_loop", async move { ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await; }) }; let location_handle = { let ctx = ctx.clone(); - task::spawn(async move { + spawn_named_task!("location_loop", async move { location::location_loop(&ctx, location_interrupt_recv).await; }) }; diff --git a/src/smtp.rs b/src/smtp.rs index 9d73501cc..112043566 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -6,7 +6,7 @@ pub mod send; use anyhow::{bail, format_err, Context as _, Error, Result}; use async_smtp::response::{Category, Code, Detail}; use async_smtp::{EmailAddress, SmtpTransport}; -use tokio::task; + use crate::chat::{add_info_msg_with_cmd, ChatId}; use crate::config::Config; @@ -21,9 +21,9 @@ use crate::mimefactory::MimeFactory; use crate::net::proxy::ProxyConfig; use crate::net::session::SessionBufStream; use crate::scheduler::connectivity::ConnectivityStore; -use crate::sql; use crate::stock_str::unencrypted_email; use crate::tools::{self, time_elapsed}; +use crate::{spawn_named_task, sql}; #[derive(Default)] pub(crate) struct Smtp { @@ -56,7 +56,7 @@ impl Smtp { // Closing connection with a QUIT command may take some time, especially if it's a // stale connection and an attempt to send the command times out. Send a command in a // separate task to avoid waiting for reply or timeout. - task::spawn(async move { transport.quit().await }); + spawn_named_task!("disconnect SMTP", async move { transport.quit().await }); } self.last_success = None; } diff --git a/src/test_utils.rs b/src/test_utils.rs index 507529889..c9fec9150 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -41,6 +41,7 @@ use crate::peerstate::Peerstate; use crate::pgp::KeyPair; use crate::receive_imf::receive_imf; use crate::securejoin::{get_securejoin_qr, join_securejoin}; +use crate::spawn_named_task; use crate::stock_str::StockStrings; #[allow(non_upper_case_globals)] @@ -969,7 +970,7 @@ impl InnerLogSink { /// Subscribes this log sink to event emitter. pub fn subscribe(&self, event_emitter: EventEmitter) { let sender = self.sender.clone(); - task::spawn(async move { + spawn_named_task!("InnerLogSink", async move { while let Some(event) = event_emitter.recv().await { sender.try_send(LogEvent::Event(event.clone())).ok(); }