diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 59e7810d7..01fed6012 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,13 +78,13 @@ jobs: rust: 1.68.0 python: false # Python bindings compilation on Windows is not supported. - # Minimum Supported Rust Version = 1.63.0 + # Minimum Supported Rust Version = 1.64.0 # # Minimum Supported Python Version = 3.7 # This is the minimum version for which manylinux Python wheels are # built. - os: ubuntu-latest - rust: 1.63.0 + rust: 1.64.0 python: 3.7 runs-on: ${{ matrix.os }} steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 8aa830594..84d48d7dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ - "full message view" not needed because of footers that go to contact status #4151 - Pick up system's light/dark mode in generated message HTML #4150 - Support non-persistent configuration with DELTACHAT_* env +- Print deltachat-repl errors with causes. #4166 +- Increase MSRV to 1.64. #4167 +- Core takes care of stopping and re-starting IO itself where needed, + e.g. during backup creation. It is no longer needed to call + dc_stop_io(). dc_start_io() can now be called at any time without + harm. 
#4138 ### Fixes - Fix segmentation fault if `dc_context_unref()` is called during diff --git a/Cargo.lock b/Cargo.lock index 3af389c66..0e4e323fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,13 +244,12 @@ dependencies = [ [[package]] name = "async-smtp" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7384febcabdd07a498c9f4fbaa7e488ff4eb60d0ade14b47b09ec44b8f645301" +checksum = "8709c0d4432be428a88a06746689a9cb543e8e27ef7f61ca4d0455003a3d8c5b" dependencies = [ "anyhow", "base64 0.13.1", - "bufstream", "futures", "hostname", "log", @@ -503,12 +502,6 @@ dependencies = [ "safemem", ] -[[package]] -name = "bufstream" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8" - [[package]] name = "bumpalo" version = "3.12.0" diff --git a/Cargo.toml b/Cargo.toml index 4d76bc686..b6ca6e84a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "deltachat" version = "1.111.0" edition = "2021" license = "MPL-2.0" -rust-version = "1.63" +rust-version = "1.64" [profile.dev] debug = 0 @@ -38,7 +38,7 @@ anyhow = "1" async-channel = "1.8.0" async-imap = { git = "https://github.com/async-email/async-imap", branch = "master", default-features = false, features = ["runtime-tokio"] } async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } -async-smtp = { version = "0.8", default-features = false, features = ["runtime-tokio"] } +async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] } async_zip = { version = "0.0.9", default-features = false, features = ["deflate"] } backtrace = "0.3" base64 = "0.21" diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index f20ea9b66..1484ab33b 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -2101,8 +2101,7 @@ dc_contact_t* dc_get_contact (dc_context_t* 
context, uint32_t co /** * Import/export things. - * During backup import/export IO must not be started, - * if needed stop IO using dc_accounts_stop_io() or dc_stop_io() first. + * * What to do is defined by the _what_ parameter which may be one of the following: * * - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1` diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index c98287b2d..2990dd363 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -1325,16 +1325,13 @@ impl CommandApi { passphrase: Option, ) -> Result<()> { let ctx = self.get_context(account_id).await?; - ctx.stop_io().await; - let result = imex::imex( + imex::imex( &ctx, imex::ImexMode::ExportBackup, destination.as_ref(), passphrase, ) - .await; - ctx.start_io().await; - result + .await } async fn import_backup( diff --git a/deltachat-repl/src/main.rs b/deltachat-repl/src/main.rs index ddbc6471d..2670b311f 100644 --- a/deltachat-repl/src/main.rs +++ b/deltachat-repl/src/main.rs @@ -361,7 +361,7 @@ async fn start(args: Vec) -> Result<(), Error> { false } Err(err) => { - println!("Error: {err}"); + println!("Error: {err:#}"); true } } @@ -376,7 +376,7 @@ async fn start(args: Vec) -> Result<(), Error> { break; } Err(err) => { - println!("Error: {err}"); + println!("Error: {err:#}"); break; } } diff --git a/fuzz/Cargo.lock b/fuzz/Cargo.lock index b4510b7d0..b295ae225 100644 --- a/fuzz/Cargo.lock +++ b/fuzz/Cargo.lock @@ -111,7 +111,7 @@ version = "0.6.0" source = "git+https://github.com/async-email/async-imap?branch=master#85ff7a3d9d71a3715354fabf2fc1a8d047b5710e" dependencies = [ "async-channel", - "async-native-tls", + "async-native-tls 0.4.0", "base64 0.13.1", "byte-pool", "chrono", @@ -139,6 +139,18 @@ dependencies = [ "url", ] +[[package]] +name = "async-native-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"9343dc5acf07e79ff82d0c37899f079db3534d99f189a1837c8e549c99405bec" +dependencies = [ + "native-tls", + "thiserror", + "tokio", + "url", +] + [[package]] name = "async-smtp" version = "0.8.0" @@ -696,12 +708,12 @@ checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" [[package]] name = "deltachat" -version = "1.107.0" +version = "1.111.0" dependencies = [ "anyhow", "async-channel", "async-imap", - "async-native-tls", + "async-native-tls 0.5.0", "async-smtp", "async_zip", "backtrace", @@ -723,17 +735,15 @@ dependencies = [ "lettre_email", "libc", "mailparse 0.14.0", - "native-tls", "num-derive", "num-traits", "num_cpus", "once_cell", + "parking_lot", "percent-encoding", "pgp", "qrcodegen", "quick-xml", - "r2d2", - "r2d2_sqlite", "rand 0.8.5", "ratelimit", "regex", @@ -1284,15 +1294,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1304,11 +1305,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.7.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" dependencies = [ - "hashbrown 0.11.2", + "hashbrown", ] [[package]] @@ -1506,7 +1507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", - "hashbrown 0.12.3", + "hashbrown", ] [[package]] @@ -1631,9 +1632,9 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "libsqlite3-sys" -version = "0.24.2" +version = "0.25.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14" +checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa" dependencies = [ "cc", "openssl-sys", @@ -2241,27 +2242,6 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb" -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - -[[package]] -name = "r2d2_sqlite" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fdc8e4da70586127893be32b7adf21326a4c6b1aba907611edf467d13ffe895" -dependencies = [ - "r2d2", - "rusqlite", -] - [[package]] name = "rand" version = "0.7.3" @@ -2451,16 +2431,15 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85127183a999f7db96d1a976a309eebbfb6ea3b0b400ddd8340190129de6eb7a" +checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a" dependencies = [ "bitflags", "fallible-iterator", "fallible-streaming-iterator", "hashlink", "libsqlite3-sys", - "memchr", "smallvec", ] @@ -2514,15 +2493,6 @@ dependencies = [ "windows-sys 0.36.1", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.1.0" @@ -3126,7 +3096,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"c5faade31a542b8b35855fff6e8def199853b2da8da256da52f52f1316ee3137" dependencies = [ - "hashbrown 0.12.3", + "hashbrown", "regex", ] diff --git a/scripts/coredeps/install-rust.sh b/scripts/coredeps/install-rust.sh index b4a08f3f1..8181cc991 100755 --- a/scripts/coredeps/install-rust.sh +++ b/scripts/coredeps/install-rust.sh @@ -7,7 +7,7 @@ set -euo pipefail # # Avoid using rustup here as it depends on reading /proc/self/exe and # has problems running under QEMU. -RUST_VERSION=1.64.0 +RUST_VERSION=1.68.0 ARCH="$(uname -m)" test -f "/lib/libc.musl-$ARCH.so.1" && LIBC=musl || LIBC=gnu diff --git a/scripts/create-provider-data-rs.py b/scripts/create-provider-data-rs.py index ea5d291ee..57f92428e 100755 --- a/scripts/create-provider-data-rs.py +++ b/scripts/create-provider-data-rs.py @@ -220,13 +220,13 @@ if __name__ == "__main__": process_dir(Path(sys.argv[1])) - out_all += "pub(crate) static PROVIDER_DATA: Lazy> = Lazy::new(|| [\n" + out_all += "pub(crate) static PROVIDER_DATA: Lazy> = Lazy::new(|| HashMap::from([\n" out_all += out_domains - out_all += "].iter().copied().collect());\n\n" + out_all += "]));\n\n" - out_all += "pub(crate) static PROVIDER_IDS: Lazy> = Lazy::new(|| [\n" + out_all += "pub(crate) static PROVIDER_IDS: Lazy> = Lazy::new(|| HashMap::from([\n" out_all += out_ids - out_all += "].iter().copied().collect());\n\n" + out_all += "]));\n\n" if len(sys.argv) < 3: now = datetime.datetime.utcnow() diff --git a/src/accounts.rs b/src/accounts.rs index e1620526d..a379b44e1 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -271,14 +271,14 @@ impl Accounts { /// Notifies all accounts that the network may have become available. pub async fn maybe_network(&self) { for account in self.accounts.values() { - account.maybe_network().await; + account.scheduler.maybe_network().await; } } /// Notifies all accounts that the network connection may have been lost. 
pub async fn maybe_network_lost(&self) { for account in self.accounts.values() { - account.maybe_network_lost().await; + account.scheduler.maybe_network_lost(account).await; } } diff --git a/src/chat.rs b/src/chat.rs index 0442dea3c..9d49ec552 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -639,7 +639,10 @@ impl ChatId { context.emit_msgs_changed_without_ids(); context.set_config(Config::LastHousekeeping, None).await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; if chat.is_self_talk() { let mut msg = Message::new(Viewtype::Text); @@ -1667,7 +1670,7 @@ impl Chat { maybe_set_logging_xdc(context, msg, self.id).await?; } - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; Ok(msg.id) } } @@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - context.emit_event(EventType::LocationChanged(Some(ContactId::SELF))); } - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(msg.id) @@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .await?; curr_timestamp += 1; if create_send_msg_job(context, new_msg_id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } created_chats.push(chat_id); @@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg_id: msg.id, }); if create_send_msg_job(context, msg.id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } } diff --git a/src/config.rs b/src/config.rs index 76dca83a2..035caf314 100644 --- a/src/config.rs +++ b/src/config.rs @@ -443,7 +443,7 @@ impl Context 
{ Config::DeleteDeviceAfter => { let ret = self.sql.set_raw_config(key.as_ref(), value).await; // Interrupt ephemeral loop to delete old messages immediately. - self.interrupt_ephemeral_task().await; + self.scheduler.interrupt_ephemeral_task().await; ret? } Config::Displayname => { diff --git a/src/configure.rs b/src/configure.rs index 48b91a3f7..eb69ad25c 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -59,7 +59,7 @@ impl Context { /// Configures this account with the currently set parameters. pub async fn configure(&self) -> Result<()> { ensure!( - self.scheduler.read().await.is_none(), + !self.scheduler.is_running().await, "cannot configure, already running" ); ensure!( @@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { ctx.set_config_bool(Config::FetchedExistingMsgs, false) .await?; - ctx.interrupt_inbox(InterruptInfo::new(false)).await; + ctx.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; progress!(ctx, 940); update_device_chats_handle.await??; diff --git a/src/contact.rs b/src/contact.rs index 1d00a80f0..07430d308 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen( > 0 && timestamp > time() - SEEN_RECENTLY_SECONDS { - context.interrupt_recently_seen(contact_id, timestamp).await; + context + .scheduler + .interrupt_recently_seen(contact_id, timestamp) + .await; } Ok(()) } diff --git a/src/context.rs b/src/context.rs index 0a0fa45e6..60740e909 100644 --- a/src/context.rs +++ b/src/context.rs @@ -25,7 +25,7 @@ use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::qr::Qr; use crate::quota::QuotaInfo; -use crate::scheduler::Scheduler; +use crate::scheduler::SchedulerState; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -206,7 +206,7 @@ pub struct InnerContext { pub(crate) translated_stockstrings: StockStrings, pub(crate) events: Events, - 
pub(crate) scheduler: RwLock>, + pub(crate) scheduler: SchedulerState, pub(crate) ratelimit: RwLock, /// Recently loaded quota information, if any. @@ -383,7 +383,7 @@ impl Context { wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: stockstrings, events, - scheduler: RwLock::new(None), + scheduler: SchedulerState::new(), ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds. quota: RwLock::new(None), quota_update_request: AtomicBool::new(false), @@ -409,42 +409,23 @@ impl Context { warn!(self, "can not start io on a context that is not configured"); return; } - - info!(self, "starting IO"); - let mut lock = self.inner.scheduler.write().await; - if lock.is_none() { - match Scheduler::start(self.clone()).await { - Err(err) => error!(self, "Failed to start IO: {:#}", err), - Ok(scheduler) => *lock = Some(scheduler), - } - } + self.scheduler.start(self.clone()).await; } /// Stops the IO scheduler. pub async fn stop_io(&self) { - // Sending an event wakes up event pollers (get_next_event) - // so the caller of stop_io() can arrange for proper termination. 
- // For this, the caller needs to instruct the event poller - // to terminate on receiving the next event and then call stop_io() - // which will emit the below event(s) - info!(self, "stopping IO"); - if let Some(debug_logging) = self.debug_logging.read().await.as_ref() { - debug_logging.loop_handle.abort(); - } - if let Some(scheduler) = self.inner.scheduler.write().await.take() { - scheduler.stop(self).await; - } + self.scheduler.stop(self).await; } /// Restarts the IO scheduler if it was running before /// when it is not running this is an no-op pub async fn restart_io_if_running(&self) { - info!(self, "restarting IO"); - let is_running = { self.inner.scheduler.read().await.is_some() }; - if is_running { - self.stop_io().await; - self.start_io().await; - } + self.scheduler.restart(self).await; + } + + /// Indicate that the network likely has come back. + pub async fn maybe_network(&self) { + self.scheduler.maybe_network().await; } /// Returns a reference to the underlying SQL instance. diff --git a/src/ephemeral.rs b/src/ephemeral.rs index f7dadf5d8..75fb1c127 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -317,7 +317,7 @@ impl MsgId { paramsv![ephemeral_timestamp, ephemeral_timestamp, self], ) .await?; - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } @@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids( ) .await?; if count > 0 { - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } diff --git a/src/imap.rs b/src/imap.rs index 91f6ae9cc..6e3437fc0 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -475,7 +475,7 @@ impl Imap { // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are // fetched while the per-chat ephemeral timers start as soon as the messages are marked // as noticed. 
- context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } let session = self @@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) paramsv![message_id], ) .await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/imex.rs b/src/imex.rs index a1c2fd627..d8753a56d 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -90,13 +90,17 @@ pub async fn imex( ) -> Result<()> { let cancel = context.alloc_ongoing().await?; - let res = imex_inner(context, what, path, passphrase) - .race(async { - cancel.recv().await.ok(); - Err(format_err!("canceled")) - }) - .await; - + let res = { + let mut guard = context.scheduler.pause(context).await; + let res = imex_inner(context, what, path, passphrase) + .race(async { + cancel.recv().await.ok(); + Err(format_err!("canceled")) + }) + .await; + guard.resume().await; + res + }; context.free_ongoing().await; if let Err(err) = res.as_ref() { @@ -417,7 +421,7 @@ async fn import_backup( "Cannot import backups to accounts in use." 
); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot import backup, IO is running" ); diff --git a/src/job.rs b/src/job.rs index 25f3814a2..029272d17 100644 --- a/src/job.rs +++ b/src/job.rs @@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 { pub(crate) async fn schedule_resync(context: &Context) -> Result<()> { context.resync_request.store(true, Ordering::Relaxed); context + .scheduler .interrupt_inbox(InterruptInfo { probe_network: false, }) @@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { job.save(context).await.context("failed to save job")?; info!(context, "interrupt: imap"); - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/location.rs b/src/location.rs index 52315585b..375c40662 100644 --- a/src/location.rs +++ b/src/location.rs @@ -267,7 +267,7 @@ pub async fn send_locations_to_chat( } context.emit_event(EventType::ChatModified(chat_id)); if 0 != seconds { - context.interrupt_location().await; + context.scheduler.interrupt_location().await; } Ok(()) } diff --git a/src/message.rs b/src/message.rs index b287e6c57..415618f08 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { } // Interrupt Inbox loop to start message deletion and run housekeeping. 
- context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } @@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> ) .await .context("failed to insert into smtp_mdns")?; - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } updated_chat_ids.insert(curr_chat_id); diff --git a/src/net/session.rs b/src/net/session.rs index d57288c98..2c4294875 100644 --- a/src/net/session.rs +++ b/src/net/session.rs @@ -2,7 +2,7 @@ use async_native_tls::TlsStream; use fast_socks5::client::Socks5Stream; use std::pin::Pin; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite, BufWriter}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream, BufWriter}; use tokio_io_timeout::TimeoutStream; pub(crate) trait SessionStream: @@ -22,6 +22,11 @@ impl SessionStream for TlsStream { self.get_mut().set_read_timeout(timeout); } } +impl SessionStream for BufStream { + fn set_read_timeout(&mut self, timeout: Option) { + self.get_mut().set_read_timeout(timeout); + } +} impl SessionStream for BufWriter { fn set_read_timeout(&mut self, timeout: Option) { self.get_mut().set_read_timeout(timeout); @@ -39,3 +44,8 @@ impl SessionStream for Socks5Stream { self.get_socket_mut().set_read_timeout(timeout) } } + +/// Session stream with a read buffer. 
+pub(crate) trait SessionBufStream: SessionStream + AsyncBufRead {} + +impl SessionBufStream for T {} diff --git a/src/provider/data.rs b/src/provider/data.rs index f7736c6e9..8ca9d91e9 100644 --- a/src/provider/data.rs +++ b/src/provider/data.rs @@ -1513,7 +1513,7 @@ static P_ZOHO: Lazy = Lazy::new(|| Provider { }); pub(crate) static PROVIDER_DATA: Lazy> = Lazy::new(|| { - [ + HashMap::from([ ("163.com", &*P_163), ("aktivix.org", &*P_AKTIVIX_ORG), ("aol.com", &*P_AOL), @@ -1875,14 +1875,11 @@ pub(crate) static PROVIDER_DATA: Lazy> ("zohomail.eu", &*P_ZOHO), ("zohomail.com", &*P_ZOHO), ("zoho.com", &*P_ZOHO), - ] - .iter() - .copied() - .collect() + ]) }); pub(crate) static PROVIDER_IDS: Lazy> = Lazy::new(|| { - [ + HashMap::from([ ("163", &*P_163), ("aktivix.org", &*P_AKTIVIX_ORG), ("aol", &*P_AOL), @@ -1945,10 +1942,7 @@ pub(crate) static PROVIDER_IDS: Lazy> = ("yggmail", &*P_YGGMAIL), ("ziggo.nl", &*P_ZIGGO_NL), ("zoho", &*P_ZOHO), - ] - .iter() - .copied() - .collect() + ]) }); pub static PROVIDER_UPDATED: Lazy = diff --git a/src/quota.rs b/src/quota.rs index b4a2be225..f192fd1d7 100644 --- a/src/quota.rs +++ b/src/quota.rs @@ -115,7 +115,9 @@ impl Context { let requested = self.quota_update_request.swap(true, Ordering::Relaxed); if !requested { // Quota update was not requested before. 
- self.interrupt_inbox(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; } Ok(()) } diff --git a/src/scheduler.rs b/src/scheduler.rs index daa542940..d5571f835 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result}; use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; +use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::task; use self::connectivity::ConnectivityStore; @@ -23,10 +24,210 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings}; pub(crate) mod connectivity; +/// State of the IO scheduler, as stored on the [`Context`]. +/// +/// The IO scheduler can be stopped or started, but core can also pause it. After pausing +/// the IO scheduler will be restarted only if it was running before paused or +/// [`Context::start_io`] was called in the meantime while it was paused. +#[derive(Debug, Default)] +pub(crate) struct SchedulerState { + inner: RwLock, +} + +impl SchedulerState { + pub(crate) fn new() -> Self { + Default::default() + } + + /// Whether the scheduler is currently running. + pub(crate) async fn is_running(&self) -> bool { + let inner = self.inner.read().await; + inner.scheduler.is_some() + } + + /// Starts the scheduler if it is not yet started. + pub(crate) async fn start(&self, context: Context) { + let mut inner = self.inner.write().await; + inner.started = true; + if inner.scheduler.is_none() && !inner.paused { + Self::do_start(inner, context).await; + } + } + + /// Starts the scheduler if it is not yet started. 
+ async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { + info!(context, "starting IO"); + let ctx = context.clone(); + match Scheduler::start(context).await { + Ok(scheduler) => inner.scheduler = Some(scheduler), + Err(err) => error!(&ctx, "Failed to start IO: {:#}", err), + } + } + + /// Stops the scheduler if it is currently running. + pub(crate) async fn stop(&self, context: &Context) { + let mut inner = self.inner.write().await; + inner.started = false; + Self::do_stop(inner, context).await; + } + + /// Stops the scheduler if it is currently running. + async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) { + // Sending an event wakes up event pollers (get_next_event) + // so the caller of stop_io() can arrange for proper termination. + // For this, the caller needs to instruct the event poller + // to terminate on receiving the next event and then call stop_io() + // which will emit the below event(s) + info!(context, "stopping IO"); + if let Some(debug_logging) = context.debug_logging.read().await.as_ref() { + debug_logging.loop_handle.abort(); + } + if let Some(scheduler) = inner.scheduler.take() { + scheduler.stop(context).await; + } + } + + /// Pauses the IO scheduler. + /// + /// If it is currently running the scheduler will be stopped. When + /// [`IoPausedGuard::resume`] is called the scheduler is started again. + /// + /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called + /// resume will do the right thing and restore the scheduler to the state requested by + /// the last call. + pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> { + let mut inner = self.inner.write().await; + inner.paused = true; + Self::do_stop(inner, context).await; + IoPausedGuard { + context, + done: false, + } + } + + /// Restarts the scheduler, only if it is running. 
+ pub(crate) async fn restart(&self, context: &Context) { + info!(context, "restarting IO"); + if self.is_running().await { + self.stop(context).await; + self.start(context.clone()).await; + } + } + + /// Indicate that the network likely has come back. + pub(crate) async fn maybe_network(&self) { + let inner = self.inner.read().await; + let (inbox, oboxes) = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network(); + let inbox = scheduler.inbox.conn_state.state.connectivity.clone(); + let oboxes = scheduler + .oboxes + .iter() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect::>(); + (inbox, oboxes) + } + None => return, + }; + drop(inner); + connectivity::idle_interrupted(inbox, oboxes).await; + } + + /// Indicate that the network likely is lost. + pub(crate) async fn maybe_network_lost(&self, context: &Context) { + let inner = self.inner.read().await; + let stores = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network_lost(); + scheduler + .boxes() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect() + } + None => return, + }; + drop(inner); + connectivity::maybe_network_lost(context, stores).await; + } + + pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_inbox(info); + } + } + + pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_smtp(info); + } + } + + pub(crate) async fn interrupt_ephemeral_task(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_ephemeral_task(); + } + } + + pub(crate) async fn interrupt_location(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_location(); + } + } + + pub(crate) async fn 
interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_recently_seen(contact_id, timestamp); + } + } +} + +#[derive(Debug, Default)] +struct InnerSchedulerState { + scheduler: Option, + started: bool, + paused: bool, +} + +#[derive(Debug)] +pub(crate) struct IoPausedGuard<'a> { + context: &'a Context, + done: bool, +} + +impl<'a> IoPausedGuard<'a> { + pub(crate) async fn resume(&mut self) { + self.done = true; + let mut inner = self.context.scheduler.inner.write().await; + inner.paused = false; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, self.context.clone()).await; + } + } +} + +impl<'a> Drop for IoPausedGuard<'a> { + fn drop(&mut self) { + if self.done { + return; + } + + // Async .resume() should be called manually due to lack of async drop. + error!(self.context, "Pause guard dropped without resuming."); + } +} + #[derive(Debug)] struct SchedBox { meaning: FolderMeaning, conn_state: ImapConnectionState, + + /// IMAP loop task handle. handle: task::JoinHandle<()>, } @@ -46,56 +247,6 @@ pub(crate) struct Scheduler { recently_seen_loop: RecentlySeenLoop, } -impl Context { - /// Indicate that the network likely has come back. - pub async fn maybe_network(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network(); - } - connectivity::idle_interrupted(lock).await; - } - - /// Indicate that the network likely is lost. 
- pub async fn maybe_network_lost(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network_lost(); - } - connectivity::maybe_network_lost(self, lock).await; - } - - pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_inbox(info); - } - } - - pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_smtp(info); - } - } - - pub(crate) async fn interrupt_ephemeral_task(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_ephemeral_task(); - } - } - - pub(crate) async fn interrupt_location(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_location(); - } - } - - pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_recently_seen(contact_id, timestamp); - } - } -} - async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { use futures::future::FutureExt; diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs index 6ede2074f..91edcad61 100644 --- a/src/scheduler/connectivity.rs +++ b/src/scheduler/connectivity.rs @@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc}; use anyhow::{anyhow, Result}; use humansize::{format_size, BINARY}; -use tokio::sync::{Mutex, RwLockReadGuard}; +use tokio::sync::Mutex; use crate::events::EventType; use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning}; @@ -12,7 +12,7 @@ use crate::quota::{ }; use crate::tools::time; use crate::{context::Context, log::LogExt}; -use crate::{scheduler::Scheduler, stock_str, tools}; +use crate::{stock_str, tools}; #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)] pub enum 
Connectivity { @@ -156,19 +156,7 @@ impl ConnectivityStore { /// Set all folder states to InterruptingIdle in case they were `Connected` before. /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()` /// returns false immediately after `dc_maybe_network()`. -pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Scheduler>>) { - let (inbox, oboxes) = match &*scheduler { - Some(Scheduler { inbox, oboxes, .. }) => ( - inbox.conn_state.state.connectivity.clone(), - oboxes - .iter() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect::<Vec<_>>(), - ), - None => return, - }; - drop(scheduler); - +pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) { let mut connectivity_lock = inbox.0.lock().await; // For the inbox, we also have to set the connectivity to InterruptingIdle if it was // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity() @@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Scheduler>>) { -pub(crate) async fn maybe_network_lost( - context: &Context, - scheduler: RwLockReadGuard<'_, Option<Scheduler>>, -) { - let stores: Vec<_> = match &*scheduler { - Some(sched) => sched - .boxes() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect(), - None => return, - }; - drop(scheduler); - +pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec<ConnectivityStore>) { for store in &stores { let mut connectivity_lock = store.0.lock().await; if !matches!( @@ -249,9 +225,9 @@ impl Context { /// /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
pub async fn get_connectivity(&self) -> Connectivity { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| b.conn_state.state.connectivity.clone()) .collect(), @@ -332,9 +308,9 @@ impl Context { // Get the states from the RwLock // ============================================================================================= - let lock = self.scheduler.read().await; - let (folders_states, smtp) = match &*lock { - Some(sched) => ( + let lock = self.scheduler.inner.read().await; + let (folders_states, smtp) = match lock.scheduler { + Some(ref sched) => ( sched .boxes() .map(|b| (b.meaning, b.conn_state.state.connectivity.clone())) @@ -503,9 +479,9 @@ impl Context { /// Returns true if all background work is done. pub async fn all_work_done(&self) -> bool { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| &b.conn_state.state) .chain(once(&sched.smtp.state)) diff --git a/src/smtp.rs b/src/smtp.rs index 1786006e9..b8590dbb0 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{bail, format_err, Context as _, Error, Result}; use async_smtp::response::{Category, Code, Detail}; use async_smtp::{self as smtp, EmailAddress, SmtpTransport}; -use tokio::io::BufWriter; +use tokio::io::BufStream; use tokio::task; use crate::config::Config; @@ -18,7 +18,7 @@ use crate::message::Message; use crate::message::{self, MsgId}; use crate::mimefactory::MimeFactory; use crate::net::connect_tcp; -use crate::net::session::SessionStream; +use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; use crate::oauth2::get_oauth2_access_token; use 
crate::provider::Socket; @@ -32,7 +32,7 @@ const SMTP_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Default)] pub(crate) struct Smtp { /// SMTP connection. - transport: Option<SmtpTransport<Box<dyn SessionStream>>>, + transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>, /// Email address we are sending from. from: Option<EmailAddress>, @@ -116,13 +116,13 @@ impl Smtp { port: u16, strict_tls: bool, socks5_config: Socks5Config, - ) -> Result<SmtpTransport<Box<dyn SessionStream>>> { + ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> { let socks5_stream = socks5_config .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls) .await?; let tls_stream = wrap_tls(strict_tls, hostname, socks5_stream).await?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -135,20 +135,20 @@ impl Smtp { port: u16, strict_tls: bool, socks5_config: Socks5Config, - ) -> Result<SmtpTransport<Box<dyn SessionStream>>> { + ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> { let socks5_stream = socks5_config .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls) .await?; // Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true); - let transport = SmtpTransport::new(client, socks5_stream).await?; + let transport = SmtpTransport::new(client, BufStream::new(socks5_stream)).await?; let tcp_stream = transport.starttls().await?; let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream) .await .context("STARTTLS upgrade failed")?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting(); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -160,12 +160,12 @@ impl Smtp { hostname: &str, port: u16, socks5_config: Socks5Config, - ) -> Result<SmtpTransport<Box<dyn SessionStream>>> { + ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> { let socks5_stream = socks5_config .connect(context, hostname, port, SMTP_TIMEOUT, false) .await?; - let buffered_stream = BufWriter::new(socks5_stream); - let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); + let buffered_stream = BufStream::new(socks5_stream); + let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -177,11 +177,11 @@ impl Smtp { hostname: &str, port: u16, strict_tls: bool, - ) -> Result<SmtpTransport<Box<dyn SessionStream>>> { + ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> { let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?; let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -193,18 +193,18 @@ impl Smtp { hostname: &str, port: u16, strict_tls: bool, -
) -> Result<SmtpTransport<Box<dyn SessionStream>>> { + ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> { let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?; // Run STARTTLS command and convert the client back into a stream. let client = smtp::SmtpClient::new().smtp_utf8(true); - let transport = SmtpTransport::new(client, tcp_stream).await?; + let transport = SmtpTransport::new(client, BufStream::new(tcp_stream)).await?; let tcp_stream = transport.starttls().await?; let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream) .await .context("STARTTLS upgrade failed")?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting(); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -215,10 +215,10 @@ impl Smtp { context: &Context, hostname: &str, port: u16, - ) -> Result<SmtpTransport<Box<dyn SessionStream>>> { + ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> { let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?; - let buffered_stream = BufWriter::new(tcp_stream); - let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tcp_stream); + let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) diff --git a/src/sync.rs b/src/sync.rs index 2976a07fc..ea10cbce9 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -109,8 +109,8 @@ impl Context { Ok(()) } - // Add deleted qr-code token to the list of items to be synced - // so that the token also gets deleted on the other devices. + /// Adds deleted qr-code token to the list of items to be synced + /// so that the token also gets deleted on the other devices.
pub(crate) async fn sync_qr_code_token_deletion( &self, invitenumber: String, diff --git a/src/webxdc.rs b/src/webxdc.rs index bfaaa8741..d5873df55 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -421,7 +421,9 @@ impl Context { DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr", paramsv![instance.id, status_update_serial, status_update_serial, descr], ).await?; - self.interrupt_smtp(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(()) }