From a32a3b8ccadb873d77e658dd31b071f80db7ab43 Mon Sep 17 00:00:00 2001 From: link2xt Date: Fri, 17 Mar 2023 09:13:41 +0000 Subject: [PATCH 01/19] Construct HashMaps in provider database from array This saves 450 lines according to `cargo llvm-lines --release`. --- scripts/create-provider-data-rs.py | 8 ++++---- src/provider/data.rs | 14 ++++---------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/scripts/create-provider-data-rs.py b/scripts/create-provider-data-rs.py index ea5d291ee..57f92428e 100755 --- a/scripts/create-provider-data-rs.py +++ b/scripts/create-provider-data-rs.py @@ -220,13 +220,13 @@ if __name__ == "__main__": process_dir(Path(sys.argv[1])) - out_all += "pub(crate) static PROVIDER_DATA: Lazy> = Lazy::new(|| [\n" + out_all += "pub(crate) static PROVIDER_DATA: Lazy> = Lazy::new(|| HashMap::from([\n" out_all += out_domains - out_all += "].iter().copied().collect());\n\n" + out_all += "]));\n\n" - out_all += "pub(crate) static PROVIDER_IDS: Lazy> = Lazy::new(|| [\n" + out_all += "pub(crate) static PROVIDER_IDS: Lazy> = Lazy::new(|| HashMap::from([\n" out_all += out_ids - out_all += "].iter().copied().collect());\n\n" + out_all += "]));\n\n" if len(sys.argv) < 3: now = datetime.datetime.utcnow() diff --git a/src/provider/data.rs b/src/provider/data.rs index f7736c6e9..8ca9d91e9 100644 --- a/src/provider/data.rs +++ b/src/provider/data.rs @@ -1513,7 +1513,7 @@ static P_ZOHO: Lazy = Lazy::new(|| Provider { }); pub(crate) static PROVIDER_DATA: Lazy> = Lazy::new(|| { - [ + HashMap::from([ ("163.com", &*P_163), ("aktivix.org", &*P_AKTIVIX_ORG), ("aol.com", &*P_AOL), @@ -1875,14 +1875,11 @@ pub(crate) static PROVIDER_DATA: Lazy> ("zohomail.eu", &*P_ZOHO), ("zohomail.com", &*P_ZOHO), ("zoho.com", &*P_ZOHO), - ] - .iter() - .copied() - .collect() + ]) }); pub(crate) static PROVIDER_IDS: Lazy> = Lazy::new(|| { - [ + HashMap::from([ ("163", &*P_163), ("aktivix.org", &*P_AKTIVIX_ORG), ("aol", &*P_AOL), @@ -1945,10 +1942,7 @@ pub(crate) static PROVIDER_IDS: Lazy> = ("yggmail", &*P_YGGMAIL), ("ziggo.nl", &*P_ZIGGO_NL), ("zoho", &*P_ZOHO), - ] - .iter() - .copied() - .collect() + ]) }); pub static PROVIDER_UPDATED: Lazy = From b83b9db7122723b2db37a80fed0199128e7b9260 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 18 Mar 2023 14:36:56 +0000 Subject: [PATCH 02/19] repl: print errors with causes --- CHANGELOG.md | 1 + deltachat-repl/src/main.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bdd67bc3..ace9fd044 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - "full message view" not needed because of footers that go to contact status #4151 - Pick up system's light/dark mode in generated message HTML #4150 - Support non-persistent configuration with DELTACHAT_* env +- Print deltachat-repl errors with causes. #4166 ### Fixes - Fix segmentation fault if `dc_context_unref()` is called during diff --git a/deltachat-repl/src/main.rs b/deltachat-repl/src/main.rs index 33074d6ab..80b1bd8be 100644 --- a/deltachat-repl/src/main.rs +++ b/deltachat-repl/src/main.rs @@ -359,7 +359,7 @@ async fn start(args: Vec) -> Result<(), Error> { false } Err(err) => { - println!("Error: {err}"); + println!("Error: {err:#}"); true } } @@ -374,7 +374,7 @@ async fn start(args: Vec) -> Result<(), Error> { break; } Err(err) => { - println!("Error: {err}"); + println!("Error: {err:#}"); break; } } From dd57854ee362143aca9f48caaf54f1bca6fdd086 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 18 Mar 2023 19:01:44 +0000 Subject: [PATCH 03/19] Increase Minimum Supported Rust Version to 1.64 It is required for clap_lex v0.3.2 and async_zip 0.0.11. --- .github/workflows/ci.yml | 4 ++-- CHANGELOG.md | 1 + Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 59e7810d7..01fed6012 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,13 +78,13 @@ jobs: rust: 1.68.0 python: false # Python bindings compilation on Windows is not supported. - # Minimum Supported Rust Version = 1.63.0 + # Minimum Supported Rust Version = 1.64.0 # # Minimum Supported Python Version = 3.7 # This is the minimum version for which manylinux Python wheels are # built. - os: ubuntu-latest - rust: 1.63.0 + rust: 1.64.0 python: 3.7 runs-on: ${{ matrix.os }} steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index ace9fd044..5538e6767 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Pick up system's light/dark mode in generated message HTML #4150 - Support non-persistent configuration with DELTACHAT_* env - Print deltachat-repl errors with causes. #4166 +- Increase MSRV to 1.64. #4167 ### Fixes - Fix segmentation fault if `dc_context_unref()` is called during diff --git a/Cargo.toml b/Cargo.toml index 506fb0b5e..b141b410f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "deltachat" version = "1.111.0" edition = "2021" license = "MPL-2.0" -rust-version = "1.63" +rust-version = "1.64" [profile.dev] debug = 0 From 0a65081db04de064f950da0d3f06a328344556cc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 18 Mar 2023 19:31:46 +0000 Subject: [PATCH 04/19] Bump libsqlite3-sys from 0.24.2 to 0.25.2 in /fuzz Bumps [libsqlite3-sys](https://github.com/rusqlite/rusqlite) from 0.24.2 to 0.25.2. - [Release notes](https://github.com/rusqlite/rusqlite/releases) - [Changelog](https://github.com/rusqlite/rusqlite/blob/master/Changelog.md) - [Commits](https://github.com/rusqlite/rusqlite/compare/libsqlite3-sys-0.24.2...v0.25.2) --- updated-dependencies: - dependency-name: libsqlite3-sys dependency-type: indirect ... Signed-off-by: dependabot[bot] --- fuzz/Cargo.lock | 80 ++++++++++++++++--------------------------------- 1 file changed, 25 insertions(+), 55 deletions(-) diff --git a/fuzz/Cargo.lock b/fuzz/Cargo.lock index b4510b7d0..b295ae225 100644 --- a/fuzz/Cargo.lock +++ b/fuzz/Cargo.lock @@ -111,7 +111,7 @@ version = "0.6.0" source = "git+https://github.com/async-email/async-imap?branch=master#85ff7a3d9d71a3715354fabf2fc1a8d047b5710e" dependencies = [ "async-channel", - "async-native-tls", + "async-native-tls 0.4.0", "base64 0.13.1", "byte-pool", "chrono", @@ -139,6 +139,18 @@ dependencies = [ "url", ] +[[package]] +name = "async-native-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9343dc5acf07e79ff82d0c37899f079db3534d99f189a1837c8e549c99405bec" +dependencies = [ + "native-tls", + "thiserror", + "tokio", + "url", +] + [[package]] name = "async-smtp" version = "0.8.0" @@ -696,12 +708,12 @@ checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" [[package]] name = "deltachat" -version = "1.107.0" +version = "1.111.0" dependencies = [ "anyhow", "async-channel", "async-imap", - "async-native-tls", + "async-native-tls 0.5.0", "async-smtp", "async_zip", "backtrace", @@ -723,17 +735,15 @@ dependencies = [ "lettre_email", "libc", "mailparse 0.14.0", - "native-tls", "num-derive", "num-traits", "num_cpus", "once_cell", + "parking_lot", "percent-encoding", "pgp", "qrcodegen", "quick-xml", - "r2d2", - "r2d2_sqlite", "rand 0.8.5", "ratelimit", "regex", @@ -1284,15 +1294,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1304,11 +1305,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.7.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" dependencies = [ - "hashbrown 0.11.2", + "hashbrown", ] [[package]] @@ -1506,7 +1507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", - "hashbrown 0.12.3", + "hashbrown", ] [[package]] @@ -1631,9 +1632,9 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "libsqlite3-sys" -version = "0.24.2" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14" +checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa" dependencies = [ "cc", "openssl-sys", @@ -2241,27 +2242,6 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb" -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - -[[package]] -name = "r2d2_sqlite" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fdc8e4da70586127893be32b7adf21326a4c6b1aba907611edf467d13ffe895" -dependencies = [ - "r2d2", - "rusqlite", -] - [[package]] name = "rand" version = "0.7.3" @@ -2451,16 +2431,15 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85127183a999f7db96d1a976a309eebbfb6ea3b0b400ddd8340190129de6eb7a" +checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a" dependencies = [ "bitflags", "fallible-iterator", "fallible-streaming-iterator", "hashlink", "libsqlite3-sys", - "memchr", "smallvec", ] @@ -2514,15 +2493,6 @@ dependencies = [ "windows-sys 0.36.1", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.1.0" @@ -3126,7 +3096,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5faade31a542b8b35855fff6e8def199853b2da8da256da52f52f1316ee3137" dependencies = [ - "hashbrown 0.12.3", + "hashbrown", "regex", ] From 3eadc86217e3d6913947db44d916e177cf4b8118 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 18 Mar 2023 21:08:40 +0000 Subject: [PATCH 05/19] Update Rust in coredeps docker image to 1.68.0 --- scripts/coredeps/install-rust.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/coredeps/install-rust.sh b/scripts/coredeps/install-rust.sh index b4a08f3f1..8181cc991 100755 --- a/scripts/coredeps/install-rust.sh +++ b/scripts/coredeps/install-rust.sh @@ -7,7 +7,7 @@ set -euo pipefail # # Avoid using rustup here as it depends on reading /proc/self/exe and # has problems running under QEMU. -RUST_VERSION=1.64.0 +RUST_VERSION=1.68.0 ARCH="$(uname -m)" test -f "/lib/libc.musl-$ARCH.so.1" && LIBC=musl || LIBC=gnu From a566fd6301844bf03c652a4cc6204c66a00b2f19 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 12 Mar 2023 13:46:11 +0000 Subject: [PATCH 06/19] Upgrade async-smtp to 0.9.0 async-smtp does not implement read buffering anymore and expects library user to implement it. To implement read buffer, we wrap streams into BufStream instead of BufWriter. --- Cargo.lock | 11 ++--------- Cargo.toml | 2 +- src/net/session.rs | 12 +++++++++++- src/smtp.rs | 46 +++++++++++++++++++++++----------------------- 4 files changed, 37 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a13a40acb..2919cc46c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,13 +177,12 @@ dependencies = [ [[package]] name = "async-smtp" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7384febcabdd07a498c9f4fbaa7e488ff4eb60d0ade14b47b09ec44b8f645301" +checksum = "8709c0d4432be428a88a06746689a9cb543e8e27ef7f61ca4d0455003a3d8c5b" dependencies = [ "anyhow", "base64 0.13.1", - "bufstream", "futures", "hostname", "log", @@ -401,12 +400,6 @@ dependencies = [ "safemem", ] -[[package]] -name = "bufstream" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8" - [[package]] name = "bumpalo" version = "3.12.0" diff --git a/Cargo.toml b/Cargo.toml index b141b410f..3a91592d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ anyhow = "1" async-channel = "1.8.0" async-imap = { git = "https://github.com/async-email/async-imap", branch = "master", default-features = false, features = ["runtime-tokio"] } async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } -async-smtp = { version = "0.8", default-features = false, features = ["runtime-tokio"] } +async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] } async_zip = { version = "0.0.9", default-features = false, features = ["deflate"] } backtrace = "0.3" base64 = "0.21" diff --git a/src/net/session.rs b/src/net/session.rs index d57288c98..2c4294875 100644 --- a/src/net/session.rs +++ b/src/net/session.rs @@ -2,7 +2,7 @@ use async_native_tls::TlsStream; use fast_socks5::client::Socks5Stream; use std::pin::Pin; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite, BufWriter}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream, BufWriter}; use tokio_io_timeout::TimeoutStream; pub(crate) trait SessionStream: @@ -22,6 +22,11 @@ impl SessionStream for TlsStream { self.get_mut().set_read_timeout(timeout); } } +impl SessionStream for BufStream { + fn set_read_timeout(&mut self, timeout: Option) { + self.get_mut().set_read_timeout(timeout); + } +} impl SessionStream for BufWriter { fn set_read_timeout(&mut self, timeout: Option) { self.get_mut().set_read_timeout(timeout); @@ -39,3 +44,8 @@ impl SessionStream for Socks5Stream { self.get_socket_mut().set_read_timeout(timeout) } } + +/// Session stream with a read buffer. +pub(crate) trait SessionBufStream: SessionStream + AsyncBufRead {} + +impl SessionBufStream for T {} diff --git a/src/smtp.rs b/src/smtp.rs index 1786006e9..b8590dbb0 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{bail, format_err, Context as _, Error, Result}; use async_smtp::response::{Category, Code, Detail}; use async_smtp::{self as smtp, EmailAddress, SmtpTransport}; -use tokio::io::BufWriter; +use tokio::io::BufStream; use tokio::task; use crate::config::Config; @@ -18,7 +18,7 @@ use crate::message::Message; use crate::message::{self, MsgId}; use crate::mimefactory::MimeFactory; use crate::net::connect_tcp; -use crate::net::session::SessionStream; +use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; use crate::oauth2::get_oauth2_access_token; use crate::provider::Socket; @@ -32,7 +32,7 @@ const SMTP_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Default)] pub(crate) struct Smtp { /// SMTP connection. - transport: Option>>, + transport: Option>>, /// Email address we are sending from. from: Option, @@ -116,13 +116,13 @@ impl Smtp { port: u16, strict_tls: bool, socks5_config: Socks5Config, - ) -> Result>> { + ) -> Result>> { let socks5_stream = socks5_config .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls) .await?; let tls_stream = wrap_tls(strict_tls, hostname, socks5_stream).await?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -135,20 +135,20 @@ impl Smtp { port: u16, strict_tls: bool, socks5_config: Socks5Config, - ) -> Result>> { + ) -> Result>> { let socks5_stream = socks5_config .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls) .await?; // Run STARTTLS command and convert the client back into a stream. let client = smtp::SmtpClient::new().smtp_utf8(true); - let transport = SmtpTransport::new(client, socks5_stream).await?; + let transport = SmtpTransport::new(client, BufStream::new(socks5_stream)).await?; let tcp_stream = transport.starttls().await?; let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream) .await .context("STARTTLS upgrade failed")?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting(); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -160,12 +160,12 @@ impl Smtp { hostname: &str, port: u16, socks5_config: Socks5Config, - ) -> Result>> { + ) -> Result>> { let socks5_stream = socks5_config .connect(context, hostname, port, SMTP_TIMEOUT, false) .await?; - let buffered_stream = BufWriter::new(socks5_stream); - let session_stream: Box = Box::new(buffered_stream); + let buffered_stream = BufStream::new(socks5_stream); + let session_stream: Box = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -177,11 +177,11 @@ impl Smtp { hostname: &str, port: u16, strict_tls: bool, - ) -> Result>> { + ) -> Result>> { let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?; let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -193,18 +193,18 @@ impl Smtp { hostname: &str, port: u16, strict_tls: bool, - ) -> Result>> { + ) -> Result>> { let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?; // Run STARTTLS command and convert the client back into a stream. let client = smtp::SmtpClient::new().smtp_utf8(true); - let transport = SmtpTransport::new(client, tcp_stream).await?; + let transport = SmtpTransport::new(client, BufStream::new(tcp_stream)).await?; let tcp_stream = transport.starttls().await?; let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream) .await .context("STARTTLS upgrade failed")?; - let buffered_stream = BufWriter::new(tls_stream); - let session_stream: Box = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tls_stream); + let session_stream: Box = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting(); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) @@ -215,10 +215,10 @@ impl Smtp { context: &Context, hostname: &str, port: u16, - ) -> Result>> { + ) -> Result>> { let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?; - let buffered_stream = BufWriter::new(tcp_stream); - let session_stream: Box = Box::new(buffered_stream); + let buffered_stream = BufStream::new(tcp_stream); + let session_stream: Box = Box::new(buffered_stream); let client = smtp::SmtpClient::new().smtp_utf8(true); let transport = SmtpTransport::new(client, session_stream).await?; Ok(transport) From 5eb7206b2d338158420b359e19bf29755079daca Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 00:10:45 +0000 Subject: [PATCH 07/19] Format documentation comment for `sync_qr_code_token_deletion` --- src/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 2976a07fc..ea10cbce9 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -109,8 +109,8 @@ impl Context { Ok(()) } - // Add deleted qr-code token to the list of items to be synced - // so that the token also gets deleted on the other devices. + /// Adds deleted qr-code token to the list of items to be synced + /// so that the token also gets deleted on the other devices. pub(crate) async fn sync_qr_code_token_deletion( &self, invitenumber: String, From 1d42e4743fac7f1e38ca3a2c6f601809c489840f Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 14:20:42 +0100 Subject: [PATCH 08/19] Allow pausing IO scheduler from inside core To handle backups the UIs have to make sure they do stop the IO scheduler and also don't accidentally restart it while working on it. Since they have to call start_io from a bunch of locations this can be a bit difficult to manage. This introduces a mechanism for the core to pause IO for some time, which is used by the imex function. It interacts well with other calls to dc_start_io() and dc_stop_io() making sure that when resumed the scheduler will be running or not as the latest calls to them. This was a little more invasive then hoped due to the scheduler. The additional abstraction of the scheduler on the context seems a nice improvement though. --- src/accounts.rs | 4 +- src/chat.rs | 22 ++- src/config.rs | 2 +- src/configure.rs | 6 +- src/contact.rs | 5 +- src/context.rs | 51 +++---- src/ephemeral.rs | 4 +- src/imap.rs | 7 +- src/imex.rs | 22 +-- src/job.rs | 6 +- src/location.rs | 2 +- src/message.rs | 10 +- src/quota.rs | 4 +- src/scheduler.rs | 253 +++++++++++++++++++++++++++------- src/scheduler/connectivity.rs | 50 ++----- src/webxdc.rs | 4 +- 16 files changed, 305 insertions(+), 147 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index e1620526d..a379b44e1 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -271,14 +271,14 @@ impl Accounts { /// Notifies all accounts that the network may have become available. pub async fn maybe_network(&self) { for account in self.accounts.values() { - account.maybe_network().await; + account.scheduler.maybe_network().await; } } /// Notifies all accounts that the network connection may have been lost. pub async fn maybe_network_lost(&self) { for account in self.accounts.values() { - account.maybe_network_lost().await; + account.scheduler.maybe_network_lost(account).await; } } diff --git a/src/chat.rs b/src/chat.rs index 0442dea3c..9d49ec552 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -639,7 +639,10 @@ impl ChatId { context.emit_msgs_changed_without_ids(); context.set_config(Config::LastHousekeeping, None).await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; if chat.is_self_talk() { let mut msg = Message::new(Viewtype::Text); @@ -1667,7 +1670,7 @@ impl Chat { maybe_set_logging_xdc(context, msg, self.id).await?; } - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; Ok(msg.id) } } @@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - context.emit_event(EventType::LocationChanged(Some(ContactId::SELF))); } - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(msg.id) @@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .await?; curr_timestamp += 1; if create_send_msg_job(context, new_msg_id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } created_chats.push(chat_id); @@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg_id: msg.id, }); if create_send_msg_job(context, msg.id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } } diff --git a/src/config.rs b/src/config.rs index 76dca83a2..035caf314 100644 --- a/src/config.rs +++ b/src/config.rs @@ -443,7 +443,7 @@ impl Context { Config::DeleteDeviceAfter => { let ret = self.sql.set_raw_config(key.as_ref(), value).await; // Interrupt ephemeral loop to delete old messages immediately. - self.interrupt_ephemeral_task().await; + self.scheduler.interrupt_ephemeral_task().await; ret? } Config::Displayname => { diff --git a/src/configure.rs b/src/configure.rs index 48b91a3f7..eb69ad25c 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -59,7 +59,7 @@ impl Context { /// Configures this account with the currently set parameters. pub async fn configure(&self) -> Result<()> { ensure!( - self.scheduler.read().await.is_none(), + !self.scheduler.is_running().await, "cannot configure, already running" ); ensure!( @@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { ctx.set_config_bool(Config::FetchedExistingMsgs, false) .await?; - ctx.interrupt_inbox(InterruptInfo::new(false)).await; + ctx.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; progress!(ctx, 940); update_device_chats_handle.await??; diff --git a/src/contact.rs b/src/contact.rs index 1d00a80f0..07430d308 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen( > 0 && timestamp > time() - SEEN_RECENTLY_SECONDS { - context.interrupt_recently_seen(contact_id, timestamp).await; + context + .scheduler + .interrupt_recently_seen(contact_id, timestamp) + .await; } Ok(()) } diff --git a/src/context.rs b/src/context.rs index 13479a7ea..9c2eb33e2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; -use crate::scheduler::Scheduler; +use crate::scheduler::{IoPausedGuard, SchedulerState}; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -201,7 +201,7 @@ pub struct InnerContext { pub(crate) translated_stockstrings: StockStrings, pub(crate) events: Events, - pub(crate) scheduler: RwLock>, + pub(crate) scheduler: SchedulerState, pub(crate) ratelimit: RwLock, /// Recently loaded quota information, if any. @@ -370,7 +370,7 @@ impl Context { wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: stockstrings, events, - scheduler: RwLock::new(None), + scheduler: SchedulerState::new(), ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds. quota: RwLock::new(None), quota_update_request: AtomicBool::new(false), @@ -395,42 +395,33 @@ impl Context { warn!(self, "can not start io on a context that is not configured"); return; } - - info!(self, "starting IO"); - let mut lock = self.inner.scheduler.write().await; - if lock.is_none() { - match Scheduler::start(self.clone()).await { - Err(err) => error!(self, "Failed to start IO: {:#}", err), - Ok(scheduler) => *lock = Some(scheduler), - } - } + self.scheduler.start(self.clone()).await; } /// Stops the IO scheduler. pub async fn stop_io(&self) { - // Sending an event wakes up event pollers (get_next_event) - // so the caller of stop_io() can arrange for proper termination. - // For this, the caller needs to instruct the event poller - // to terminate on receiving the next event and then call stop_io() - // which will emit the below event(s) - info!(self, "stopping IO"); - if let Some(debug_logging) = self.debug_logging.read().await.as_ref() { - debug_logging.loop_handle.abort(); - } - if let Some(scheduler) = self.inner.scheduler.write().await.take() { - scheduler.stop(self).await; - } + self.scheduler.stop(self).await; } /// Restarts the IO scheduler if it was running before /// when it is not running this is an no-op pub async fn restart_io_if_running(&self) { - info!(self, "restarting IO"); - let is_running = { self.inner.scheduler.read().await.is_some() }; - if is_running { - self.stop_io().await; - self.start_io().await; - } + self.scheduler.restart(self).await; + } + + /// Pauses the IO scheduler. + /// + /// This temporarily pauses the IO scheduler and will make sure calls to + /// [`Context::start_io`] are no-ops while being paused. + /// + /// It is recommended to call [`IoPausedGuard::resume`] rather than simply dropping it. + pub(crate) async fn pause_io(&self) -> IoPausedGuard<'_> { + self.scheduler.pause(self).await + } + + /// Indicate that the network likely has come back. + pub async fn maybe_network(&self) { + self.scheduler.maybe_network().await; } /// Returns a reference to the underlying SQL instance. diff --git a/src/ephemeral.rs b/src/ephemeral.rs index f7dadf5d8..75fb1c127 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -317,7 +317,7 @@ impl MsgId { paramsv![ephemeral_timestamp, ephemeral_timestamp, self], ) .await?; - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } @@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids( ) .await?; if count > 0 { - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } diff --git a/src/imap.rs b/src/imap.rs index 91f6ae9cc..6e3437fc0 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -475,7 +475,7 @@ impl Imap { // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are // fetched while the per-chat ephemeral timers start as soon as the messages are marked // as noticed. - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } let session = self @@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) paramsv![message_id], ) .await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/imex.rs b/src/imex.rs index 7ca3bb15c..af02ba1d8 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -86,13 +86,17 @@ pub async fn imex( ) -> Result<()> { let cancel = context.alloc_ongoing().await?; - let res = imex_inner(context, what, path, passphrase) - .race(async { - cancel.recv().await.ok(); - Err(format_err!("canceled")) - }) - .await; - + let res = { + let mut guard = context.pause_io().await; + let res = imex_inner(context, what, path, passphrase) + .race(async { + cancel.recv().await.ok(); + Err(format_err!("canceled")) + }) + .await; + guard.resume().await; + res + }; context.free_ongoing().await; if let Err(err) = res.as_ref() { @@ -413,7 +417,7 @@ async fn import_backup( "Cannot import backups to accounts in use." ); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot import backup, IO is running" ); @@ -523,7 +527,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res sql::housekeeping(context).await.ok_or_log(context); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot export backup, IO is running" ); diff --git a/src/job.rs b/src/job.rs index 25f3814a2..029272d17 100644 --- a/src/job.rs +++ b/src/job.rs @@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 { pub(crate) async fn schedule_resync(context: &Context) -> Result<()> { context.resync_request.store(true, Ordering::Relaxed); context + .scheduler .interrupt_inbox(InterruptInfo { probe_network: false, }) @@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { job.save(context).await.context("failed to save job")?; info!(context, "interrupt: imap"); - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/location.rs b/src/location.rs index 52315585b..375c40662 100644 --- a/src/location.rs +++ b/src/location.rs @@ -267,7 +267,7 @@ pub async fn send_locations_to_chat( } context.emit_event(EventType::ChatModified(chat_id)); if 0 != seconds { - context.interrupt_location().await; + context.scheduler.interrupt_location().await; } Ok(()) } diff --git a/src/message.rs b/src/message.rs index b287e6c57..415618f08 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { } // Interrupt Inbox loop to start message deletion and run housekeeping. - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } @@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> ) .await .context("failed to insert into smtp_mdns")?; - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } updated_chat_ids.insert(curr_chat_id); diff --git a/src/quota.rs b/src/quota.rs index b4a2be225..f192fd1d7 100644 --- a/src/quota.rs +++ b/src/quota.rs @@ -115,7 +115,9 @@ impl Context { let requested = self.quota_update_request.swap(true, Ordering::Relaxed); if !requested { // Quota update was not requested before. - self.interrupt_inbox(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; } Ok(()) } diff --git a/src/scheduler.rs b/src/scheduler.rs index daa542940..3970200b6 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result}; use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; +use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::task; use self::connectivity::ConnectivityStore; @@ -23,6 +24,208 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings}; pub(crate) mod connectivity; +/// State of the IO scheduler, as stored on the [`Context`]. +/// +/// The IO scheduler can be stopped or started, but core can also pause it. After pausing +/// the IO scheduler will be restarted only if it was running before paused or +/// [`Context::start_io`] was called in the meantime while it was paused. +#[derive(Debug, Default)] +pub(crate) struct SchedulerState { + inner: RwLock, +} + +impl SchedulerState { + pub(crate) fn new() -> Self { + Default::default() + } + + /// Whether the scheduler is currently running. + pub(crate) async fn is_running(&self) -> bool { + let inner = self.inner.read().await; + inner.scheduler.is_some() + } + + /// Starts the scheduler if it is not yet started. + pub(crate) async fn start(&self, context: Context) { + let mut inner = self.inner.write().await; + inner.started = true; + if inner.scheduler.is_none() { + Self::do_start(inner, context).await; + } + } + + /// Starts the scheduler if it is not yet started. + async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { + info!(context, "starting IO"); + let ctx = context.clone(); + match Scheduler::start(context).await { + Ok(scheduler) => inner.scheduler = Some(scheduler), + Err(err) => error!(&ctx, "Failed to start IO: {:#}", err), + } + } + + /// Stops the scheduler if it is currently running. + pub(crate) async fn stop(&self, context: &Context) { + let mut inner = self.inner.write().await; + inner.started = false; + Self::do_stop(inner, context).await; + } + + /// Stops the scheduler if it is currently running. + async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) { + // Sending an event wakes up event pollers (get_next_event) + // so the caller of stop_io() can arrange for proper termination. + // For this, the caller needs to instruct the event poller + // to terminate on receiving the next event and then call stop_io() + // which will emit the below event(s) + info!(context, "stopping IO"); + if let Some(debug_logging) = context.debug_logging.read().await.as_ref() { + debug_logging.loop_handle.abort(); + } + if let Some(scheduler) = inner.scheduler.take() { + scheduler.stop(context).await; + } + } + + /// Pauses the scheduler. + /// + /// If it is currently running the scheduler will be stopped. When + /// [`IoPausedGuard::resume`] is called the scheduler is started again. If in the + /// meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called resume will + /// do the right thing. + pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> { + let mut inner = self.inner.write().await; + inner.paused = true; + Self::do_stop(inner, context).await; + IoPausedGuard { + context, + done: false, + } + } + + /// Restarts the scheduler, only if it is running. + pub(crate) async fn restart(&self, context: &Context) { + info!(context, "restarting IO"); + if self.is_running().await { + self.stop(context).await; + self.start(context.clone()).await; + } + } + + /// Indicate that the network likely has come back. + pub(crate) async fn maybe_network(&self) { + let inner = self.inner.read().await; + let (inbox, oboxes) = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network(); + let inbox = scheduler.inbox.conn_state.state.connectivity.clone(); + let oboxes = scheduler + .oboxes + .iter() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect::>(); + (inbox, oboxes) + } + None => return, + }; + drop(inner); + // TODO: maybe this called code should move into scheduler.maybe_network() instead? + connectivity::idle_interrupted(inbox, oboxes).await; + } + + /// Indicate that the network likely is lost. + pub(crate) async fn maybe_network_lost(&self, context: &Context) { + let inner = self.inner.read().await; + let stores = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network_lost(); + scheduler + .boxes() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect() + } + None => return, + }; + drop(inner); + // TODO; maybe this called code should move into scheduler.maybe_network_lost() + // instead? + connectivity::maybe_network_lost(context, stores).await; + } + + pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_inbox(info); + } + } + + pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_smtp(info); + } + } + + pub(crate) async fn interrupt_ephemeral_task(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_ephemeral_task(); + } + } + + pub(crate) async fn interrupt_location(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_location(); + } + } + + pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_recently_seen(contact_id, timestamp); + } + } +} + +#[derive(Debug, Default)] +struct InnerSchedulerState { + scheduler: Option, + started: bool, + paused: bool, +} + +#[derive(Debug)] +pub(crate) struct IoPausedGuard<'a> { + context: &'a Context, + done: bool, +} + +impl<'a> IoPausedGuard<'a> { + pub(crate) async fn resume(&mut self) { + self.done = true; + let inner = self.context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, self.context.clone()).await; + } + } +} + +impl<'a> Drop for IoPausedGuard<'a> { + fn drop(&mut self) { + if self.done { + return; + } + let context = self.context.clone(); + tokio::spawn(async move { + let inner = context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, context.clone()).await; + } + }); + } +} + #[derive(Debug)] struct SchedBox { meaning: FolderMeaning, @@ -46,56 +249,6 @@ pub(crate) struct Scheduler { recently_seen_loop: RecentlySeenLoop, } -impl Context { - /// Indicate that the network likely has come back. - pub async fn maybe_network(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network(); - } - connectivity::idle_interrupted(lock).await; - } - - /// Indicate that the network likely is lost. - pub async fn maybe_network_lost(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network_lost(); - } - connectivity::maybe_network_lost(self, lock).await; - } - - pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_inbox(info); - } - } - - pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_smtp(info); - } - } - - pub(crate) async fn interrupt_ephemeral_task(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_ephemeral_task(); - } - } - - pub(crate) async fn interrupt_location(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_location(); - } - } - - pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_recently_seen(contact_id, timestamp); - } - } -} - async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { use futures::future::FutureExt; diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs index 6ede2074f..91edcad61 100644 --- a/src/scheduler/connectivity.rs +++ b/src/scheduler/connectivity.rs @@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc}; use anyhow::{anyhow, Result}; use humansize::{format_size, BINARY}; -use tokio::sync::{Mutex, RwLockReadGuard}; +use tokio::sync::Mutex; use crate::events::EventType; use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning}; @@ -12,7 +12,7 @@ use crate::quota::{ }; use crate::tools::time; use crate::{context::Context, log::LogExt}; -use crate::{scheduler::Scheduler, stock_str, tools}; +use crate::{stock_str, tools}; #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)] pub enum Connectivity { @@ -156,19 +156,7 @@ impl ConnectivityStore { /// Set all folder states to InterruptingIdle in case they were `Connected` before. /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()` /// returns false immediately after `dc_maybe_network()`. -pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>) { - let (inbox, oboxes) = match &*scheduler { - Some(Scheduler { inbox, oboxes, .. }) => ( - inbox.conn_state.state.connectivity.clone(), - oboxes - .iter() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect::>(), - ), - None => return, - }; - drop(scheduler); - +pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec) { let mut connectivity_lock = inbox.0.lock().await; // For the inbox, we also have to set the connectivity to InterruptingIdle if it was // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity() @@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>, -) { - let stores: Vec<_> = match &*scheduler { - Some(sched) => sched - .boxes() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect(), - None => return, - }; - drop(scheduler); - +pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec) { for store in &stores { let mut connectivity_lock = store.0.lock().await; if !matches!( @@ -249,9 +225,9 @@ impl Context { /// /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted. pub async fn get_connectivity(&self) -> Connectivity { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| b.conn_state.state.connectivity.clone()) .collect(), @@ -332,9 +308,9 @@ impl Context { // Get the states from the RwLock // ============================================================================================= - let lock = self.scheduler.read().await; - let (folders_states, smtp) = match &*lock { - Some(sched) => ( + let lock = self.scheduler.inner.read().await; + let (folders_states, smtp) = match lock.scheduler { + Some(ref sched) => ( sched .boxes() .map(|b| (b.meaning, b.conn_state.state.connectivity.clone())) @@ -503,9 +479,9 @@ impl Context { /// Returns true if all background work is done. pub async fn all_work_done(&self) -> bool { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| &b.conn_state.state) .chain(once(&sched.smtp.state)) diff --git a/src/webxdc.rs b/src/webxdc.rs index bfaaa8741..d5873df55 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -421,7 +421,9 @@ impl Context { DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr", paramsv![instance.id, status_update_serial, status_update_serial, descr], ).await?; - self.interrupt_smtp(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(()) } From 097113f01e8b041dd829e7c2b3488e6c389583d2 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 14:39:40 +0100 Subject: [PATCH 09/19] fixup paused flag use --- src/scheduler.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index 3970200b6..5c2db37ea 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -49,7 +49,7 @@ impl SchedulerState { pub(crate) async fn start(&self, context: Context) { let mut inner = self.inner.write().await; inner.started = true; - if inner.scheduler.is_none() { + if inner.scheduler.is_none() && !inner.paused { Self::do_start(inner, context).await; } } @@ -204,7 +204,8 @@ pub(crate) struct IoPausedGuard<'a> { impl<'a> IoPausedGuard<'a> { pub(crate) async fn resume(&mut self) { self.done = true; - let inner = self.context.scheduler.inner.write().await; + let mut inner = self.context.scheduler.inner.write().await; + inner.paused = false; if inner.started && inner.scheduler.is_none() { SchedulerState::do_start(inner, self.context.clone()).await; } @@ -218,7 +219,8 @@ impl<'a> Drop for IoPausedGuard<'a> { } let context = self.context.clone(); tokio::spawn(async move { - let inner = context.scheduler.inner.write().await; + let mut inner = context.scheduler.inner.write().await; + inner.paused = false; if inner.started && inner.scheduler.is_none() { SchedulerState::do_start(inner, context.clone()).await; } From 32a7e5ed824ccf724c674b99b3674e8c057031a2 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 16:17:14 +0100 Subject: [PATCH 10/19] Remove requirement for stopping io for imex --- deltachat-ffi/deltachat.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 9208579c1..086f2cd98 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -2100,8 +2100,7 @@ dc_contact_t* dc_get_contact (dc_context_t* context, uint32_t co /** * Import/export things. - * During backup import/export IO must not be started, - * if needed stop IO using dc_accounts_stop_io() or dc_stop_io() first. + * * What to do is defined by the _what_ parameter which may be one of the following: * * - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1` From 52fa58a3ce8c7695bc033881ebfc9ad6b73365e1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 Mar 2023 16:24:24 +0100 Subject: [PATCH 11/19] No need for jsonrpc to do this manually --- deltachat-jsonrpc/src/api/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index 0f466d1d4..b47ab6a5c 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -1325,7 +1325,6 @@ impl CommandApi { passphrase: Option, ) -> Result<()> { let ctx = self.get_context(account_id).await?; - ctx.stop_io().await; let result = imex::imex( &ctx, imex::ImexMode::ExportBackup, @@ -1333,7 +1332,6 @@ impl CommandApi { passphrase, ) .await; - ctx.start_io().await; result } From 2c3b2b8c2dc97ed5ed1a8e69b3ffdf497af4a335 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 9 Mar 2023 15:44:17 +0100 Subject: [PATCH 12/19] move pause to only exist on Scheduler --- src/context.rs | 12 +----------- src/imex.rs | 2 +- src/scheduler.rs | 10 ++++++---- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/context.rs b/src/context.rs index 9c2eb33e2..682d39f9c 100644 --- a/src/context.rs +++ b/src/context.rs @@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; -use crate::scheduler::{IoPausedGuard, SchedulerState}; +use crate::scheduler::SchedulerState; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -409,16 +409,6 @@ impl Context { self.scheduler.restart(self).await; } - /// Pauses the IO scheduler. - /// - /// This temporarily pauses the IO scheduler and will make sure calls to - /// [`Context::start_io`] are no-ops while being paused. - /// - /// It is recommended to call [`IoPausedGuard::resume`] rather than simply dropping it. - pub(crate) async fn pause_io(&self) -> IoPausedGuard<'_> { - self.scheduler.pause(self).await - } - /// Indicate that the network likely has come back. pub async fn maybe_network(&self) { self.scheduler.maybe_network().await; diff --git a/src/imex.rs b/src/imex.rs index af02ba1d8..eeffbe231 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -87,7 +87,7 @@ pub async fn imex( let cancel = context.alloc_ongoing().await?; let res = { - let mut guard = context.pause_io().await; + let mut guard = context.scheduler.pause(context).await; let res = imex_inner(context, what, path, passphrase) .race(async { cancel.recv().await.ok(); diff --git a/src/scheduler.rs b/src/scheduler.rs index 5c2db37ea..9c9600d3a 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -87,12 +87,14 @@ impl SchedulerState { } } - /// Pauses the scheduler. + /// Pauses the IO scheduler. /// /// If it is currently running the scheduler will be stopped. When - /// [`IoPausedGuard::resume`] is called the scheduler is started again. If in the - /// meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called resume will - /// do the right thing. + /// [`IoPausedGuard::resume`] is called the scheduler is started again. + /// + /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called + /// resume will do the right thing and restore the scheduler to the state requested by + /// the last call. pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> { let mut inner = self.inner.write().await; inner.paused = true; From 0079cd47660bbccefcca1f361e53d8581380fc36 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 9 Mar 2023 15:58:58 +0100 Subject: [PATCH 13/19] Add changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5538e6767..0eb34661a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ - Support non-persistent configuration with DELTACHAT_* env - Print deltachat-repl errors with causes. #4166 - Increase MSRV to 1.64. #4167 +- Core takes of stopping and re-starting IO itself where needed, + e.g. during backup creation. It is no longer needed to call + dc_stop_io(). dc_start_io() can now be called at any time without + harm. #4138 ### Fixes - Fix segmentation fault if `dc_context_unref()` is called during From 4bf38c0e2966d02ef5c087b2a76e5aed29d3c765 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sat, 11 Mar 2023 12:32:33 +0100 Subject: [PATCH 14/19] clippy --- deltachat-jsonrpc/src/api/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index b47ab6a5c..cc0ad3b01 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -1325,14 +1325,13 @@ impl CommandApi { passphrase: Option, ) -> Result<()> { let ctx = self.get_context(account_id).await?; - let result = imex::imex( + imex::imex( &ctx, imex::ImexMode::ExportBackup, destination.as_ref(), passphrase, ) - .await; - result + .await } async fn import_backup( From a2e7d914a0fb31c26891a669463a4834c2b2e168 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 09:37:09 +0000 Subject: [PATCH 15/19] Changelog fixup --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0eb34661a..1fb0f71eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ - Support non-persistent configuration with DELTACHAT_* env - Print deltachat-repl errors with causes. #4166 - Increase MSRV to 1.64. #4167 -- Core takes of stopping and re-starting IO itself where needed, +- Core takes care of stopping and re-starting IO itself where needed, e.g. during backup creation. It is no longer needed to call dc_stop_io(). dc_start_io() can now be called at any time without harm. #4138 From 81418d8ee55630984cb2ae6ceec5e504e1c5fc33 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:13:59 +0000 Subject: [PATCH 16/19] Log error on pause guard drop without resuming instead of working around I checked that tests still pass even if error! is replaced with panic! --- src/scheduler.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index 9c9600d3a..e1638fd09 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -219,14 +219,9 @@ impl<'a> Drop for IoPausedGuard<'a> { if self.done { return; } - let context = self.context.clone(); - tokio::spawn(async move { - let mut inner = context.scheduler.inner.write().await; - inner.paused = false; - if inner.started && inner.scheduler.is_none() { - SchedulerState::do_start(inner, context.clone()).await; - } - }); + + // Async .resume() should be called manually due to lack of async drop. + error!(self.context, "Pause guard dropped without resuming."); } } From 3177f9967deb0c0ff0ee913a84a2d8ca64fc9285 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:16:27 +0000 Subject: [PATCH 17/19] Add a comment aronud IMAP loop task handle --- src/scheduler.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/scheduler.rs b/src/scheduler.rs index e1638fd09..41f005434 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -229,6 +229,8 @@ impl<'a> Drop for IoPausedGuard<'a> { struct SchedBox { meaning: FolderMeaning, conn_state: ImapConnectionState, + + /// IMAP loop task handle. handle: task::JoinHandle<()>, } From 17de3d323642e0ae2d126bca88b6ff4e146abb16 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:17:18 +0000 Subject: [PATCH 18/19] Remove TODOs --- src/scheduler.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index 41f005434..b57cda371 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -131,7 +131,6 @@ impl SchedulerState { None => return, }; drop(inner); - // TODO: maybe this called code should move into scheduler.maybe_network() instead? connectivity::idle_interrupted(inbox, oboxes).await; } @@ -149,8 +148,6 @@ impl SchedulerState { None => return, }; drop(inner); - // TODO; maybe this called code should move into scheduler.maybe_network_lost() - // instead? connectivity::maybe_network_lost(context, stores).await; } From e39429c2e35a794b3f78cdc0ee69df310203f79e Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Mar 2023 10:18:49 +0000 Subject: [PATCH 19/19] rustfmt --- src/scheduler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index b57cda371..d5571f835 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -216,7 +216,7 @@ impl<'a> Drop for IoPausedGuard<'a> { if self.done { return; } - + // Async .resume() should be called manually due to lack of async drop. error!(self.context, "Pause guard dropped without resuming."); }