name spawned tasks in core

This commit is contained in:
Simon Laux
2024-10-03 09:46:05 +02:00
parent f37fb9574d
commit 308053dc44
13 changed files with 78 additions and 57 deletions

View File

@@ -18,7 +18,6 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER;
use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin};
use deltachat::context::get_info;
use deltachat::ephemeral::Timer;
use deltachat::location;
use deltachat::message::get_msg_read_receipts;
use deltachat::message::{
self, delete_msgs, markseen_msgs, Message, MessageState, MsgId, Viewtype,
@@ -35,6 +34,7 @@ use deltachat::stock_str::StockMessage;
use deltachat::webxdc::StatusUpdateSerial;
use deltachat::EventEmitter;
use deltachat::{imex, info};
use deltachat::{location, spawn_named_task};
use sanitize_filename::is_sanitized;
use tokio::fs;
use tokio::sync::{watch, Mutex, RwLock};
@@ -1777,7 +1777,7 @@ impl CommandApi {
let ctx = self.get_context(account_id).await?;
let fut = send_webxdc_realtime_advertisement(&ctx, MsgId::new(instance_msg_id)).await?;
if let Some(fut) = fut {
tokio::spawn(async move {
spawn_named_task!("send_webxdc_realtime_advertisement", async move {
fut.await.ok();
info!(ctx, "send_webxdc_realtime_advertisement done")
});

View File

@@ -400,36 +400,39 @@ impl Config {
#[cfg(not(target_os = "ios"))]
async fn create_lock_task(dir: PathBuf) -> Result<Option<JoinHandle<anyhow::Result<()>>>> {
use crate::spawn_named_task;
let lockfile = dir.join(LOCKFILE_NAME);
let mut lock = fd_lock::RwLock::new(fs::File::create(lockfile).await?);
let (locked_tx, locked_rx) = oneshot::channel();
let lock_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let mut timeout = Duration::from_millis(100);
let _guard = loop {
match lock.try_write() {
Ok(guard) => break Ok(guard),
Err(err) => {
if timeout.as_millis() > 1600 {
break Err(err);
}
// We need to wait for the previous lock_task to be aborted thus unlocking
// the lockfile. We don't open configs for writing often outside of the
// tests, so this adds delays to the tests, but otherwise ok.
sleep(timeout).await;
if err.kind() == std::io::ErrorKind::WouldBlock {
timeout *= 2;
let lock_task: JoinHandle<anyhow::Result<()>> =
spawn_named_task!("lock_task", async move {
let mut timeout = Duration::from_millis(100);
let _guard = loop {
match lock.try_write() {
Ok(guard) => break Ok(guard),
Err(err) => {
if timeout.as_millis() > 1600 {
break Err(err);
}
// We need to wait for the previous lock_task to be aborted thus unlocking
// the lockfile. We don't open configs for writing often outside of the
// tests, so this adds delays to the tests, but otherwise ok.
sleep(timeout).await;
if err.kind() == std::io::ErrorKind::WouldBlock {
timeout *= 2;
}
}
}
}
}?;
locked_tx
.send(())
.ok()
.context("Cannot notify about lockfile locking")?;
let (_tx, rx) = oneshot::channel();
rx.await?;
Ok(())
});
}?;
locked_tx
.send(())
.ok()
.context("Cannot notify about lockfile locking")?;
let (_tx, rx) = oneshot::channel();
rx.await?;
Ok(())
});
locked_rx.await?;
Ok(Some(lock_task))
}

View File

@@ -12,12 +12,11 @@ use deltachat_contact_tools::{sanitize_bidi_characters, sanitize_single_line, Co
use deltachat_derive::{FromSql, ToSql};
use serde::{Deserialize, Serialize};
use strum_macros::EnumIter;
use tokio::task;
use crate::aheader::EncryptPreference;
use crate::blob::BlobObject;
use crate::chatlist::Chatlist;
use crate::chatlist_events;
use crate::color::str_to_color;
use crate::config::Config;
use crate::constants::{
@@ -50,6 +49,7 @@ use crate::tools::{
truncate_msg_text, IsNoneOrEmpty, SystemTime,
};
use crate::webxdc::StatusUpdateSerial;
use crate::{chatlist_events, spawn_named_task};
/// An chat item, such as a message or a marker.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -1476,7 +1476,7 @@ impl ChatId {
/// and otherwise notifying the user accordingly.
pub(crate) fn spawn_securejoin_wait(self, context: &Context, timeout: u64) {
let context = context.clone();
task::spawn(async move {
spawn_named_task!("securejoin_wait", async move {
tokio::time::sleep(Duration::from_secs(timeout)).await;
let chat = Chat::load_from_db(&context, self).await?;
chat.check_securejoin_wait(&context, 0).await?;

View File

@@ -21,7 +21,6 @@ use futures::FutureExt;
use futures_lite::FutureExt as _;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use server_params::{expand_param_vector, ServerParams};
use tokio::task;
use crate::config::{self, Config};
use crate::context::Context;
@@ -35,10 +34,10 @@ use crate::message::{Message, Viewtype};
use crate::oauth2::get_oauth2_addr;
use crate::provider::{Protocol, Socket, UsernamePattern};
use crate::smtp::Smtp;
use crate::stock_str;
use crate::sync::Sync::*;
use crate::tools::time;
use crate::{chat, e2ee, provider};
use crate::{spawn_named_task, stock_str};
use deltachat_contact_tools::addr_cmp;
macro_rules! progress {
@@ -372,7 +371,9 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
progress!(ctx, 1);
let ctx2 = ctx.clone();
let update_device_chats_handle = task::spawn(async move { ctx2.update_device_chats().await });
let update_device_chats_handle = spawn_named_task!("update_device_chats", async move {
ctx2.update_device_chats().await
});
let configured_param = get_configured_param(ctx, param).await?;
let strict_tls = configured_param.strict_tls();
@@ -387,7 +388,7 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
let smtp_addr = configured_param.addr.clone();
let proxy_config = configured_param.proxy_config.clone();
let smtp_config_task = task::spawn(async move {
let smtp_config_task = spawn_named_task!("smtp_config", async move {
let mut smtp = Smtp::new();
smtp.connect(
&context_smtp,

View File

@@ -37,7 +37,7 @@ use crate::peerstate::Peerstate;
use crate::sql::{self, params_iter};
use crate::sync::{self, Sync::*};
use crate::tools::{duration_to_str, get_abs_path, smeared_time, time, SystemTime};
use crate::{chat, chatlist_events, stock_str};
use crate::{chat, chatlist_events, spawn_named_task, stock_str};
/// Time during which a contact is considered as seen recently.
const SEEN_RECENTLY_SECONDS: i64 = 600;
@@ -1791,7 +1791,7 @@ impl RecentlySeenLoop {
pub(crate) fn new(context: Context) -> Self {
let (interrupt_send, interrupt_recv) = channel::bounded(1);
let handle = task::spawn(Self::run(context, interrupt_recv));
let handle = spawn_named_task!("recently_seen_loop", Self::run(context, interrupt_recv));
Self {
handle,
interrupt_send,

View File

@@ -5,6 +5,7 @@ use crate::context::Context;
use crate::events::EventType;
use crate::message::{Message, MsgId, Viewtype};
use crate::param::Param;
use crate::spawn_named_task;
use crate::tools::time;
use crate::webxdc::StatusUpdateItem;
use async_channel::{self as channel, Receiver, Sender};
@@ -150,7 +151,7 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option<MsgId>) -> a
let (sender, debug_logging_recv) = channel::bounded(1000);
let loop_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("debug_logging_loop", async move {
debug_logging_loop(&ctx, debug_logging_recv).await
})
};

View File

@@ -24,7 +24,6 @@ use rand::Rng;
use ratelimit::Ratelimit;
use url::Url;
use crate::chat::{self, ChatId, ChatIdBlocked};
use crate::chatlist_events;
use crate::config::Config;
use crate::constants::{self, Blocked, Chattype, ShowEmails};
@@ -48,6 +47,10 @@ use crate::scheduler::connectivity::ConnectivityStore;
use crate::sql;
use crate::stock_str;
use crate::tools::{self, create_id, duration_to_str};
use crate::{
chat::{self, ChatId, ChatIdBlocked},
spawn_named_task,
};
pub(crate) mod capabilities;
mod client;
@@ -1556,7 +1559,9 @@ impl Session {
} else if !context.push_subscriber.heartbeat_subscribed().await {
let context = context.clone();
// Subscribe for heartbeat notifications.
tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
spawn_named_task!("subscribe_to_heartbeat_notifications", async move {
context.push_subscriber.subscribe(&context).await
});
}
Ok(())

View File

@@ -46,7 +46,7 @@ use crate::message::{Message, Viewtype};
use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body;
use crate::tools::{create_id, time, TempPathGuard};
use crate::EventType;
use crate::{spawn_named_task, EventType};
use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
@@ -132,7 +132,7 @@ impl BackupProvider {
let drop_token = drop_token.clone();
let endpoint = endpoint.clone();
let auth_token = auth_token.clone();
tokio::spawn(async move {
spawn_named_task!("accept_loop", async move {
Self::accept_loop(
context.clone(),
endpoint,

View File

@@ -11,6 +11,7 @@ use crate::context::Context;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_rustls;
use crate::spawn_named_task;
/// HTTP(S) GET response.
#[derive(Debug)]
@@ -85,7 +86,7 @@ where
let io = TokioIo::new(stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
tokio::task::spawn(conn);
spawn_named_task!("http_connection", conn);
Ok(sender)
}

View File

@@ -45,7 +45,7 @@ use crate::context::Context;
use crate::headerdef::HeaderDef;
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::EventType;
use crate::{spawn_named_task, EventType};
/// The length of an ed25519 `PublicKey`, in bytes.
const PUBLIC_KEY_LENGTH: usize = 32;
@@ -125,7 +125,7 @@ impl Iroh {
.split();
let ctx = ctx.clone();
let subscribe_loop = tokio::spawn(async move {
let subscribe_loop = spawn_named_task!("subscribe_loop", async move {
if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
warn!(ctx, "subscribe_loop failed: {e}")
}
@@ -264,7 +264,10 @@ impl Context {
let context = self.clone();
// Shuts down on deltachat shutdown
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
spawn_named_task!(
"endpoint_loop",
endpoint_loop(context, endpoint.clone(), gossip.clone())
);
Ok(Iroh {
endpoint,
@@ -442,7 +445,7 @@ async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) {
info!(context, "IROH_REALTIME: accepting iroh connection");
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
spawn_named_task!("handle_connection", async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
}

View File

@@ -19,12 +19,12 @@ use crate::download::{download_msg, DownloadState};
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{session::Session, FolderMeaning, Imap};
use crate::location;
use crate::log::LogExt;
use crate::message::MsgId;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
use crate::{location, spawn_named_task};
pub(crate) mod connectivity;
@@ -164,7 +164,7 @@ impl SchedulerState {
}
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
spawn_named_task!("pause", async move {
rx.await.ok();
let mut inner = context.scheduler.inner.write().await;
match *inner {
@@ -849,7 +849,10 @@ impl Scheduler {
let (inbox_start_send, inbox_start_recv) = oneshot::channel();
let handle = {
let ctx = ctx.clone();
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
spawn_named_task!(
"inbox_loop",
inbox_loop(ctx, inbox_start_send, inbox_handlers)
)
};
let inbox = SchedBox {
meaning: FolderMeaning::Inbox,
@@ -866,7 +869,10 @@ impl Scheduler {
let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
let (start_send, start_recv) = oneshot::channel();
let ctx = ctx.clone();
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
let handle = spawn_named_task!(
"simple_imap_loop",
simple_imap_loop(ctx, start_send, handlers, meaning)
);
oboxes.push(SchedBox {
meaning,
conn_state,
@@ -878,20 +884,20 @@ impl Scheduler {
let smtp_handle = {
let ctx = ctx.clone();
task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
spawn_named_task!("smtp_loop", smtp_loop(ctx, smtp_start_send, smtp_handlers))
};
start_recvs.push(smtp_start_recv);
let ephemeral_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("ephemeral_loop", async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
})
};
let location_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("location_loop", async move {
location::location_loop(&ctx, location_interrupt_recv).await;
})
};

View File

@@ -6,7 +6,7 @@ pub mod send;
use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail};
use async_smtp::{EmailAddress, SmtpTransport};
use tokio::task;
use crate::chat::{add_info_msg_with_cmd, ChatId};
use crate::config::Config;
@@ -21,9 +21,9 @@ use crate::mimefactory::MimeFactory;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionBufStream;
use crate::scheduler::connectivity::ConnectivityStore;
use crate::sql;
use crate::stock_str::unencrypted_email;
use crate::tools::{self, time_elapsed};
use crate::{spawn_named_task, sql};
#[derive(Default)]
pub(crate) struct Smtp {
@@ -56,7 +56,7 @@ impl Smtp {
// Closing connection with a QUIT command may take some time, especially if it's a
// stale connection and an attempt to send the command times out. Send a command in a
// separate task to avoid waiting for reply or timeout.
task::spawn(async move { transport.quit().await });
spawn_named_task!("disconnect SMTP", async move { transport.quit().await });
}
self.last_success = None;
}

View File

@@ -41,6 +41,7 @@ use crate::peerstate::Peerstate;
use crate::pgp::KeyPair;
use crate::receive_imf::receive_imf;
use crate::securejoin::{get_securejoin_qr, join_securejoin};
use crate::spawn_named_task;
use crate::stock_str::StockStrings;
#[allow(non_upper_case_globals)]
@@ -969,7 +970,7 @@ impl InnerLogSink {
/// Subscribes this log sink to event emitter.
pub fn subscribe(&self, event_emitter: EventEmitter) {
let sender = self.sender.clone();
task::spawn(async move {
spawn_named_task!("InnerLogSink", async move {
while let Some(event) = event_emitter.recv().await {
sender.try_send(LogEvent::Event(event.clone())).ok();
}