Merge tag 'v1.126.1'

Release 1.126.1
This commit is contained in:
link2xt
2023-10-24 14:31:39 +00:00
24 changed files with 386 additions and 665 deletions

View File

@@ -26,25 +26,16 @@ jobs:
steps:
- uses: actions/checkout@v3
# Python 3.11 is needed for tomllib used in scripts/wheel-rpc-server.py
- name: Install python 3.12
uses: actions/setup-python@v4
with:
python-version: 3.12
- name: Install ziglang and wheel
- name: Install ziglang
run: pip install wheel ziglang==0.11.0
- name: Build deltachat-rpc-server binaries
run: sh scripts/zig-rpc-server.sh
- name: Build deltachat-rpc-server Python wheels and source package
run: scripts/wheel-rpc-server.py
- name: Upload dist directory
- name: Upload dist directory with Linux binaries
uses: actions/upload-artifact@v3
with:
name: dist
name: linux
path: dist/
if-no-files-found: error
@@ -83,53 +74,87 @@ jobs:
build_macos:
name: Build deltachat-rpc-server for macOS
strategy:
fail-fast: false
matrix:
include:
- arch: x86_64
- arch: aarch64
runs-on: macos-latest
steps:
- uses: actions/checkout@v3
- name: Setup rust target
run: rustup target add x86_64-apple-darwin
run: rustup target add ${{ matrix.arch }}-apple-darwin
- name: Build
run: cargo build --release --package deltachat-rpc-server --target x86_64-apple-darwin --features vendored
run: cargo build --release --package deltachat-rpc-server --target ${{ matrix.arch }}-apple-darwin --features vendored
- name: Upload binary
uses: actions/upload-artifact@v3
with:
name: deltachat-rpc-server-x86_64-macos
path: target/x86_64-apple-darwin/release/deltachat-rpc-server
name: deltachat-rpc-server-${{ matrix.arch }}-macos
path: target/${{ matrix.arch }}-apple-darwin/release/deltachat-rpc-server
if-no-files-found: error
publish:
name: Upload binaries to the release
name: Build wheels and upload binaries to the release
needs: ["build_linux", "build_windows", "build_macos"]
permissions:
contents: write
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v3
- name: Download Linux binaries
uses: actions/download-artifact@v3
with:
name: dist
name: linux
path: dist/
- name: Download win32 binary
uses: actions/download-artifact@v3
with:
name: deltachat-rpc-server-win32.exe
path: dist/deltachat-rpc-server-win32.exe
path: deltachat-rpc-server-win32.exe.d
- name: Download win64 binary
uses: actions/download-artifact@v3
with:
name: deltachat-rpc-server-win64.exe
path: dist/deltachat-rpc-server-win32.exe
path: deltachat-rpc-server-win64.exe.d
- name: Download macOS binary
- name: Download macOS binary for x86_64
uses: actions/download-artifact@v3
with:
name: deltachat-rpc-server-x86_64-macos
path: dist/deltachat-rpc-server-x86_64-macos
path: deltachat-rpc-server-x86_64-macos.d
- name: Download macOS binary for aarch64
uses: actions/download-artifact@v3
with:
name: deltachat-rpc-server-aarch64-macos
path: deltachat-rpc-server-aarch64-macos.d
- name: Flatten dist/ directory
run: |
mv deltachat-rpc-server-win32.exe.d/deltachat-rpc-server.exe dist/deltachat-rpc-server-win32.exe
mv deltachat-rpc-server-win64.exe.d/deltachat-rpc-server.exe dist/deltachat-rpc-server-win64.exe
mv deltachat-rpc-server-x86_64-macos.d/deltachat-rpc-server dist/deltachat-rpc-server-x86_64-macos
mv deltachat-rpc-server-aarch64-macos.d/deltachat-rpc-server dist/deltachat-rpc-server-aarch64-macos
# Python 3.11 is needed for tomllib used in scripts/wheel-rpc-server.py
- name: Install python 3.12
uses: actions/setup-python@v4
with:
python-version: 3.12
- name: Install wheel
run: pip install wheel
- name: Build deltachat-rpc-server Python wheels and source package
run: scripts/wheel-rpc-server.py
- name: List downloaded artifacts
run: ls -l dist/

View File

@@ -1,5 +1,29 @@
# Changelog
## [1.126.1] - 2023-10-24
### Fixes
- Do not hardcode version in deltachat-rpc-server source package.
- Do not interrupt IMAP loop from `get_connectivity_html()`.
### Features / Changes
- imap: Buffer `STARTTLS` command.
### Build system
- Build `deltachat-rpc-server` binary for aarch64 macOS.
- Build `deltachat-rpc-server` wheels for macOS and Windows.
### Refactor
- Remove job queue.
### Miscellaneous Tasks
- cargo: Update `ahash` to make `cargo-deny` happy.
## [1.126.0] - 2023-10-22
### API-Changes

43
Cargo.lock generated
View File

@@ -43,13 +43,14 @@ dependencies = [
[[package]]
name = "ahash"
version = "0.8.3"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
@@ -1085,7 +1086,7 @@ dependencies = [
[[package]]
name = "deltachat"
version = "1.126.0"
version = "1.126.1"
dependencies = [
"ansi_term",
"anyhow",
@@ -1162,7 +1163,7 @@ dependencies = [
[[package]]
name = "deltachat-jsonrpc"
version = "1.126.0"
version = "1.126.1"
dependencies = [
"anyhow",
"async-channel",
@@ -1186,7 +1187,7 @@ dependencies = [
[[package]]
name = "deltachat-repl"
version = "1.126.0"
version = "1.126.1"
dependencies = [
"ansi_term",
"anyhow",
@@ -1201,7 +1202,7 @@ dependencies = [
[[package]]
name = "deltachat-rpc-server"
version = "1.126.0"
version = "1.126.1"
dependencies = [
"anyhow",
"deltachat",
@@ -1226,7 +1227,7 @@ dependencies = [
[[package]]
name = "deltachat_ffi"
version = "1.126.0"
version = "1.126.1"
dependencies = [
"anyhow",
"deltachat",
@@ -2169,9 +2170,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.0"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156"
dependencies = [
"ahash",
"allocator-api2",
@@ -2183,7 +2184,7 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "312f66718a2d7789ffef4f4b7b213138ed9f1eb3aa1d0d82fc99f88fb3ffd26f"
dependencies = [
"hashbrown 0.14.0",
"hashbrown 0.14.2",
]
[[package]]
@@ -2432,7 +2433,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
dependencies = [
"equivalent",
"hashbrown 0.14.0",
"hashbrown 0.14.2",
]
[[package]]
@@ -5733,6 +5734,26 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "zerocopy"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a7af71d8643341260a65f89fa60c0eeaa907f34544d8f6d9b0df72f069b5e74"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9731702e2f0617ad526794ae28fbc6f6ca8849b5ba729666c2a5bc4b6ddee2cd"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
]
[[package]]
name = "zeroize"
version = "1.6.0"

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat"
version = "1.126.0"
version = "1.126.1"
edition = "2021"
license = "MPL-2.0"
rust-version = "1.67"

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat_ffi"
version = "1.126.0"
version = "1.126.1"
description = "Deltachat FFI"
edition = "2018"
readme = "README.md"

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-jsonrpc"
version = "1.126.0"
version = "1.126.1"
description = "DeltaChat JSON-RPC API"
edition = "2021"
default-run = "deltachat-jsonrpc-server"

View File

@@ -55,5 +55,5 @@
},
"type": "module",
"types": "dist/deltachat.d.ts",
"version": "1.126.0"
"version": "1.126.1"
}

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-repl"
version = "1.126.0"
version = "1.126.1"
license = "MPL-2.0"
edition = "2021"

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-rpc-server"
version = "1.126.0"
version = "1.126.1"
description = "DeltaChat JSON-RPC server"
edition = "2021"
readme = "README.md"

View File

@@ -60,5 +60,5 @@
"test:mocha": "mocha -r esm node/test/test.js --growl --reporter=spec --bail --exit"
},
"types": "node/dist/index.d.ts",
"version": "1.126.0"
"version": "1.126.1"
}

View File

@@ -1 +1 @@
2023-10-22
2023-10-24

View File

@@ -16,7 +16,36 @@ Summary: Delta Chat JSON-RPC server
"""
SETUP_PY = """
def build_source_package(version):
filename = f"dist/deltachat-rpc-server-{version}.tar.gz"
with tarfile.open(filename, "w:gz") as pkg:
def pack(name, contents):
contents = contents.encode()
tar_info = tarfile.TarInfo(f"deltachat-rpc-server-{version}/{name}")
tar_info.mode = 0o644
tar_info.size = len(contents)
pkg.addfile(tar_info, BytesIO(contents))
pack("PKG-INFO", metadata_contents(version))
pack(
"pyproject.toml",
f"""[build-system]
requires = ["setuptools==68.2.2", "pip"]
build-backend = "setuptools.build_meta"
[project]
name = "deltachat-rpc-server"
version = "{version}"
[project.scripts]
deltachat-rpc-server = "deltachat_rpc_server:main"
""",
)
pack(
"setup.py",
f"""
import sys
from setuptools import setup, find_packages
from distutils.cmd import Command
@@ -45,7 +74,7 @@ class BuildCommand(build):
"--platform",
"musllinux_1_1_" + platform.machine(),
"--only-binary=:all:",
"deltachat-rpc-server",
"deltachat-rpc-server=={version}",
],
cwd=tmpdir,
)
@@ -61,61 +90,45 @@ class BuildCommand(build):
setup(
cmdclass={"build": BuildCommand},
package_data={"deltachat_rpc_server": ["deltachat-rpc-server"]},
cmdclass={{"build": BuildCommand}},
package_data={{"deltachat_rpc_server": ["deltachat-rpc-server"]}},
)
"""
def build_source_package(version):
filename = f"dist/deltachat-rpc-server-{version}.tar.gz"
with tarfile.open(filename, "w:gz") as pkg:
def pack(name, contents):
contents = contents.encode()
tar_info = tarfile.TarInfo(f"deltachat-rpc-server-{version}/{name}")
tar_info.mode = 0o644
tar_info.size = len(contents)
pkg.addfile(tar_info, BytesIO(contents))
pack("PKG-INFO", metadata_contents(version))
pack(
"pyproject.toml",
"""[build-system]
requires = ["setuptools==68.2.2", "pip"]
build-backend = "setuptools.build_meta"
[project]
name = "deltachat-rpc-server"
version = "1.125.0"
[project.scripts]
deltachat-rpc-server = "deltachat_rpc_server:main"
""",
)
pack("setup.py", SETUP_PY)
pack("src/deltachat_rpc_server/__init__.py", "")
def build_wheel(version, binary, tag):
def build_wheel(version, binary, tag, windows=False):
filename = f"dist/deltachat_rpc_server-{version}-{tag}.whl"
with WheelFile(filename, "w") as wheel:
wheel.write("LICENSE", "deltachat_rpc_server/LICENSE")
wheel.write("deltachat-rpc-server/README.md", "deltachat_rpc_server/README.md")
wheel.writestr(
"deltachat_rpc_server/__init__.py",
"""import os, sys
if windows:
wheel.writestr(
"deltachat_rpc_server/__init__.py",
"""import os, sys, subprocess
def main():
argv = [os.path.join(os.path.dirname(__file__), "deltachat-rpc-server.exe"), *sys.argv[1:]]
sys.exit(subprocess.call(argv))
""",
)
else:
wheel.writestr(
"deltachat_rpc_server/__init__.py",
"""import os, sys
def main():
argv = [os.path.join(os.path.dirname(__file__), "deltachat-rpc-server"), *sys.argv[1:]]
os.execv(argv[0], argv)
""",
)
)
Path(binary).chmod(0o755)
wheel.write(
binary,
"deltachat_rpc_server/deltachat-rpc-server",
"deltachat_rpc_server/deltachat-rpc-server.exe"
if windows
else "deltachat_rpc_server/deltachat-rpc-server",
)
wheel.writestr(
f"deltachat_rpc_server-{version}.dist-info/METADATA",
@@ -158,5 +171,27 @@ def main():
"py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686",
)
# macOS versions for platform compatibility tags are taken from https://doc.rust-lang.org/rustc/platform-support.html
build_wheel(
version,
"dist/deltachat-rpc-server-x86_64-macos",
"py3-none-macosx_10_7_x86_64",
)
build_wheel(
version,
"dist/deltachat-rpc-server-aarch64-macos",
"py3-none-macosx_11_0_arm64",
)
build_wheel(
version, "dist/deltachat-rpc-server-win32.exe", "py3-none-win32", windows=True
)
build_wheel(
version,
"dist/deltachat-rpc-server-win64.exe",
"py3-none-win_amd64",
windows=True,
)
main()

View File

@@ -26,7 +26,6 @@ use crate::config::Config;
use crate::contact::addr_cmp;
use crate::context::Context;
use crate::imap::Imap;
use crate::job;
use crate::log::LogExt;
use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam};
use crate::message::{Message, Viewtype};
@@ -466,7 +465,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
if configured_addr != param.addr {
// Switched account, all server UIDs we know are invalid
info!(ctx, "Scheduling resync because the address has changed.");
job::schedule_resync(ctx).await?;
ctx.schedule_resync().await?;
}
}

View File

@@ -4,7 +4,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
@@ -23,7 +23,7 @@ use crate::key::{load_self_public_key, DcKey as _};
use crate::login_param::LoginParam;
use crate::message::{self, MessageState, MsgId};
use crate::quota::QuotaInfo;
use crate::scheduler::SchedulerState;
use crate::scheduler::{InterruptInfo, SchedulerState};
use crate::sql::Sql;
use crate::stock_str::StockStrings;
use crate::timesmearing::SmearedTimestamp;
@@ -211,9 +211,6 @@ pub struct InnerContext {
/// Set to `None` if quota was never tried to load.
pub(crate) quota: RwLock<Option<QuotaInfo>>,
/// Set to true if quota update is requested.
pub(crate) quota_update_request: AtomicBool,
/// IMAP UID resync request.
pub(crate) resync_request: AtomicBool,
@@ -384,7 +381,6 @@ impl Context {
scheduler: SchedulerState::new(),
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow at least 1 message every 10 seconds + a burst of 6.
quota: RwLock::new(None),
quota_update_request: AtomicBool::new(false),
resync_request: AtomicBool::new(false),
new_msgs_notify,
server_id: RwLock::new(None),
@@ -426,6 +422,16 @@ impl Context {
self.scheduler.maybe_network().await;
}
pub(crate) async fn schedule_resync(&self) -> Result<()> {
self.resync_request.store(true, Ordering::Relaxed);
self.scheduler
.interrupt_inbox(InterruptInfo {
probe_network: false,
})
.await;
Ok(())
}
/// Returns a reference to the underlying SQL instance.
///
/// Warning: this is only here for testing, not part of the public API.

View File

@@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize};
use crate::config::Config;
use crate::context::Context;
use crate::imap::{Imap, ImapActionResult};
use crate::job::{self, Action, Job, Status};
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::{MimeMessage, Part};
use crate::scheduler::InterruptInfo;
use crate::tools::time;
use crate::{job_try, stock_str, EventType};
use crate::{stock_str, EventType};
/// Download limits should not be used below `MIN_DOWNLOAD_LIMIT`.
///
@@ -90,7 +90,14 @@ impl MsgId {
DownloadState::Available | DownloadState::Failure => {
self.update_download_state(context, DownloadState::InProgress)
.await?;
job::add(context, Job::new(Action::DownloadMsg, self.to_u32())).await?;
context
.sql
.execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
.await?;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
}
}
Ok(())
@@ -124,59 +131,49 @@ impl Message {
}
}
impl Job {
/// Actually download a message.
/// Called in response to `Action::DownloadMsg`.
pub(crate) async fn download_msg(&self, context: &Context, imap: &mut Imap) -> Status {
if let Err(err) = imap.prepare(context).await {
warn!(context, "download: could not connect: {:#}", err);
return Status::RetryNow;
}
/// Actually download a message partially downloaded before.
///
/// Most messages are downloaded automatically on fetch instead.
pub(crate) async fn download_msg(context: &Context, msg_id: MsgId, imap: &mut Imap) -> Result<()> {
imap.prepare(context).await?;
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
let row = job_try!(
context
.sql
.query_row_optional(
"SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target=folder",
(&msg.rfc724_mid,),
|row| {
let server_uid: u32 = row.get(0)?;
let server_folder: String = row.get(1)?;
Ok((server_uid, server_folder))
}
)
.await
);
let msg = Message::load_from_db(context, msg_id).await?;
let row = context
.sql
.query_row_optional(
"SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target!=''",
(&msg.rfc724_mid,),
|row| {
let server_uid: u32 = row.get(0)?;
let server_folder: String = row.get(1)?;
Ok((server_uid, server_folder))
},
)
.await?;
if let Some((server_uid, server_folder)) = row {
match imap
.fetch_single_msg(context, &server_folder, server_uid, msg.rfc724_mid.clone())
.await
{
ImapActionResult::RetryLater | ImapActionResult::Failed => {
job_try!(
msg.id
.update_download_state(context, DownloadState::Failure)
.await
);
Status::Finished(Err(anyhow!("Call download_full() again to try over.")))
}
ImapActionResult::Success => {
// update_download_state() not needed as receive_imf() already
// set the state and emitted the event.
Status::Finished(Ok(()))
}
}
} else {
// No IMAP record found, we don't know the UID and folder.
job_try!(
if let Some((server_uid, server_folder)) = row {
match imap
.fetch_single_msg(context, &server_folder, server_uid, msg.rfc724_mid.clone())
.await
{
ImapActionResult::RetryLater | ImapActionResult::Failed => {
msg.id
.update_download_state(context, DownloadState::Failure)
.await
);
Status::Finished(Err(anyhow!("Call download_full() again to try over.")))
.await?;
Err(anyhow!("Call download_full() again to try over."))
}
ImapActionResult::Success => {
// update_download_state() not needed as receive_imf() already
// set the state and emitted the event.
Ok(())
}
}
} else {
// No IMAP record found, we don't know the UID and folder.
msg.id
.update_download_state(context, DownloadState::Failure)
.await?;
Err(anyhow!("Call download_full() again to try over."))
}
}

View File

@@ -26,7 +26,6 @@ use crate::contact::{normalize_name, Contact, ContactAddress, ContactId, Modifie
use crate::context::Context;
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::job;
use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam};
use crate::message::{self, Message, MessageState, MessengerMessage, MsgId, Viewtype};
use crate::mimeparser;
@@ -614,7 +613,7 @@ impl Imap {
"The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...",
);
set_uid_next(context, folder, uid_next).await?;
job::schedule_resync(context).await?;
context.schedule_resync().await?;
}
uid_next != old_uid_next // If uid_next changed, there are new emails
} else {
@@ -678,7 +677,7 @@ impl Imap {
.await?;
if old_uid_validity != 0 || old_uid_next != 0 {
job::schedule_resync(context).await?;
context.schedule_resync().await?;
}
info!(
context,

View File

@@ -15,6 +15,7 @@ use crate::net::connect_tcp;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_tls;
use crate::socks::Socks5Config;
use fast_socks5::client::Socks5Stream;
/// IMAP write and read timeout.
pub(crate) const IMAP_TIMEOUT: Duration = Duration::from_secs(30);
@@ -64,6 +65,12 @@ async fn determine_capabilities(
}
impl Client {
fn new(stream: Box<dyn SessionStream>) -> Self {
Self {
inner: ImapClient::new(stream),
}
}
pub(crate) async fn login(self, username: &str, password: &str) -> Result<Session> {
let Client { inner, .. } = self;
let mut session = inner
@@ -98,27 +105,24 @@ impl Client {
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")??;
Ok(Client { inner: client })
Ok(client)
}
pub async fn connect_insecure(context: &Context, hostname: &str, port: u16) -> Result<Self> {
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT, false).await?;
let buffered_stream = BufWriter::new(tcp_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")??;
Ok(Client { inner: client })
Ok(client)
}
pub async fn connect_starttls(
@@ -130,7 +134,8 @@ impl Client {
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT, strict_tls).await?;
// Run STARTTLS command and convert the client back into a stream.
let mut client = ImapClient::new(tcp_stream);
let buffered_tcp_stream = BufWriter::new(tcp_stream);
let mut client = ImapClient::new(buffered_tcp_stream);
let _greeting = client
.read_response()
.await
@@ -139,7 +144,8 @@ impl Client {
.run_command_and_check_ok("STARTTLS", None)
.await
.context("STARTTLS command failed")?;
let tcp_stream = client.into_inner();
let buffered_tcp_stream = client.into_inner();
let tcp_stream = buffered_tcp_stream.into_inner();
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
.await
@@ -147,9 +153,8 @@ impl Client {
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = ImapClient::new(session_stream);
Ok(Client { inner: client })
let client = Client::new(session_stream);
Ok(client)
}
pub async fn connect_secure_socks5(
@@ -165,13 +170,12 @@ impl Client {
let tls_stream = wrap_tls(strict_tls, domain, socks5_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")??;
Ok(Client { inner: client })
Ok(client)
}
pub async fn connect_insecure_socks5(
@@ -185,13 +189,12 @@ impl Client {
.await?;
let buffered_stream = BufWriter::new(socks5_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")??;
Ok(Client { inner: client })
Ok(client)
}
pub async fn connect_starttls_socks5(
@@ -206,7 +209,8 @@ impl Client {
.await?;
// Run STARTTLS command and convert the client back into a stream.
let mut client = ImapClient::new(socks5_stream);
let buffered_socks5_stream = BufWriter::new(socks5_stream);
let mut client = ImapClient::new(buffered_socks5_stream);
let _greeting = client
.read_response()
.await
@@ -215,15 +219,15 @@ impl Client {
.run_command_and_check_ok("STARTTLS", None)
.await
.context("STARTTLS command failed")?;
let socks5_stream = client.into_inner();
let buffered_socks5_stream = client.into_inner();
let socks5_stream: Socks5Stream<_> = buffered_socks5_stream.into_inner();
let tls_stream = wrap_tls(strict_tls, hostname, socks5_stream)
.await
.context("STARTTLS upgrade failed")?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = ImapClient::new(session_stream);
Ok(Client { inner: client })
let client = Client::new(session_stream);
Ok(client)
}
}

View File

@@ -1,390 +0,0 @@
//! # Job module.
//!
//! This module implements a job queue maintained in the SQLite database
//! and job types.
#![allow(missing_docs)]
use std::fmt;
use std::sync::atomic::Ordering;
use anyhow::{Context as _, Result};
use deltachat_derive::{FromSql, ToSql};
use rand::{thread_rng, Rng};
use crate::context::Context;
use crate::imap::Imap;
use crate::scheduler::InterruptInfo;
use crate::tools::time;
// results in ~3 weeks for the last backoff timespan
const JOB_RETRIES: u32 = 17;
/// Job try result.
#[derive(Debug, Display)]
pub enum Status {
Finished(Result<()>),
RetryNow,
}
#[macro_export]
macro_rules! job_try {
($expr:expr) => {
match $expr {
std::result::Result::Ok(val) => val,
std::result::Result::Err(err) => {
return $crate::job::Status::Finished(Err(err.into()));
}
}
};
($expr:expr,) => {
$crate::job_try!($expr)
};
}
#[derive(
Debug,
Display,
Copy,
Clone,
PartialEq,
Eq,
PartialOrd,
FromPrimitive,
ToPrimitive,
FromSql,
ToSql,
)]
#[repr(u32)]
pub enum Action {
// This job will download partially downloaded messages completely
// and is added when download_full() is called.
// Most messages are downloaded automatically on fetch
// and do not go through this job.
DownloadMsg = 250,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Job {
pub job_id: u32,
pub action: Action,
pub foreign_id: u32,
pub desired_timestamp: i64,
pub added_timestamp: i64,
pub tries: u32,
}
impl fmt::Display for Job {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "#{}, action {}", self.job_id, self.action)
}
}
impl Job {
pub fn new(action: Action, foreign_id: u32) -> Self {
let timestamp = time();
Self {
job_id: 0,
action,
foreign_id,
desired_timestamp: timestamp,
added_timestamp: timestamp,
tries: 0,
}
}
/// Deletes the job from the database.
async fn delete(self, context: &Context) -> Result<()> {
if self.job_id != 0 {
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", (self.job_id as i32,))
.await?;
}
Ok(())
}
/// Saves the job to the database, creating a new entry if necessary.
///
/// The Job is consumed by this method.
pub(crate) async fn save(self, context: &Context) -> Result<()> {
info!(context, "saving job {:?}", self);
if self.job_id != 0 {
context
.sql
.execute(
"UPDATE jobs SET desired_timestamp=?, tries=? WHERE id=?;",
(
self.desired_timestamp,
i64::from(self.tries),
self.job_id as i32,
),
)
.await?;
} else {
context.sql.execute(
"INSERT INTO jobs (added_timestamp, action, foreign_id, desired_timestamp) VALUES (?,?,?,?);",
(
self.added_timestamp,
self.action,
self.foreign_id,
self.desired_timestamp
)
).await?;
}
Ok(())
}
}
pub(crate) enum Connection<'a> {
Inbox(&'a mut Imap),
}
impl<'a> Connection<'a> {
fn inbox(&mut self) -> &mut Imap {
match self {
Connection::Inbox(imap) => imap,
}
}
}
pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_>, mut job: Job) {
info!(context, "Job {} started...", &job);
let try_res = match perform_job_action(context, &job, &mut connection, 0).await {
Status::RetryNow => perform_job_action(context, &job, &mut connection, 1).await,
x => x,
};
match try_res {
Status::RetryNow => {
let tries = job.tries + 1;
if tries < JOB_RETRIES {
info!(context, "Increase job {job} tries to {tries}.");
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = time() + time_offset;
info!(
context,
"job #{} not succeeded on try #{}, retry in {} seconds.",
job.job_id,
tries,
time_offset
);
job.save(context).await.unwrap_or_else(|err| {
error!(context, "Failed to save job: {err:#}.");
});
} else {
info!(
context,
"Remove job {job} as it exhausted {JOB_RETRIES} retries."
);
job.delete(context).await.unwrap_or_else(|err| {
error!(context, "Failed to delete job: {err:#}.");
});
}
}
Status::Finished(res) => {
if let Err(err) = res {
warn!(context, "Remove job {job} as it failed with error {err:#}.");
} else {
info!(context, "Remove job {job} as it succeeded.");
}
job.delete(context).await.unwrap_or_else(|err| {
error!(context, "failed to delete job: {:#}", err);
});
}
}
}
async fn perform_job_action(
context: &Context,
job: &Job,
connection: &mut Connection<'_>,
tries: u32,
) -> Status {
info!(context, "Begin immediate try {tries} of job {job}.");
let try_res = match job.action {
Action::DownloadMsg => job.download_msg(context, connection.inbox()).await,
};
info!(context, "Finished immediate try {tries} of job {job}.");
try_res
}
fn get_backoff_time_offset(tries: u32) -> i64 {
// Exponential backoff
let n = 2_i32.pow(tries - 1) * 60;
let mut rng = thread_rng();
let r: i32 = rng.gen();
let mut seconds = r % (n + 1);
if seconds < 1 {
seconds = 1;
}
i64::from(seconds)
}
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
context.resync_request.store(true, Ordering::Relaxed);
context
.scheduler
.interrupt_inbox(InterruptInfo {
probe_network: false,
})
.await;
Ok(())
}
/// Adds a job to the database, scheduling it.
pub async fn add(context: &Context, job: Job) -> Result<()> {
job.save(context).await.context("failed to save job")?;
info!(context, "Interrupt: IMAP.");
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
Ok(())
}
/// Load jobs from the database.
///
/// The `probe_network` parameter decides how to query
/// jobs, this is tricky and probably wrong currently. Look at the
/// SQL queries for details.
pub(crate) async fn load_next(context: &Context, info: &InterruptInfo) -> Result<Option<Job>> {
info!(context, "Loading job.");
let query;
let params;
let t = time();
if !info.probe_network {
// processing for first-try and after backoff-timeouts:
// process jobs in the order they were added.
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE desired_timestamp<=?
ORDER BY action DESC, added_timestamp
LIMIT 1;
"#;
params = vec![t];
} else {
// processing after call to dc_maybe_network():
// process _all_ pending jobs that failed before
// in the order of their backoff-times.
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE tries>0
ORDER BY desired_timestamp, action DESC
LIMIT 1;
"#;
params = vec![];
};
loop {
let job_res = context
.sql
.query_row_optional(query, rusqlite::params_from_iter(params.clone()), |row| {
let job = Job {
job_id: row.get("id")?,
action: row.get("action")?,
foreign_id: row.get("foreign_id")?,
desired_timestamp: row.get("desired_timestamp")?,
added_timestamp: row.get("added_timestamp")?,
tries: row.get("tries")?,
};
Ok(job)
})
.await;
match job_res {
Ok(job) => return Ok(job),
Err(err) => {
// Remove invalid job from the DB
info!(context, "Cleaning up job, because of {err:#}.");
// TODO: improve by only doing a single query
let id = context
.sql
.query_row(query, rusqlite::params_from_iter(params.clone()), |row| {
row.get::<_, i32>(0)
})
.await
.context("failed to retrieve invalid job ID from the database")?;
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", (id,))
.await
.with_context(|| format!("failed to delete invalid job {id}"))?;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::TestContext;
async fn insert_job(context: &Context, foreign_id: i64, valid: bool) {
let now = time();
context
.sql
.execute(
"INSERT INTO jobs
(added_timestamp, action, foreign_id, desired_timestamp)
VALUES (?, ?, ?, ?);",
(
now,
if valid {
Action::DownloadMsg as i32
} else {
-1
},
foreign_id,
now,
),
)
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_load_next_job_two() -> Result<()> {
// We want to ensure that loading jobs skips over jobs which
// fails to load from the database instead of failing to load
// all jobs.
let t = TestContext::new().await;
insert_job(&t, 1, false).await; // This can not be loaded into Job struct.
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
assert!(jobs.is_none());
insert_job(&t, 1, true).await;
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
assert!(jobs.is_some());
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_load_next_job_one() -> Result<()> {
let t = TestContext::new().await;
insert_job(&t, 1, true).await;
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
assert!(jobs.is_some());
Ok(())
}
}

View File

@@ -65,10 +65,6 @@ mod e2ee;
pub mod ephemeral;
mod imap;
pub mod imex;
pub mod release;
mod scheduler;
#[macro_use]
mod job;
pub mod key;
pub mod location;
mod login_param;
@@ -83,6 +79,8 @@ pub mod provider;
pub mod qr;
pub mod qr_code_generator;
pub mod quota;
pub mod release;
mod scheduler;
pub mod securejoin;
mod simplify;
mod smtp;

View File

@@ -1,7 +1,6 @@
//! # Support for IMAP QUOTA extension.
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use anyhow::{anyhow, Context as _, Result};
use async_imap::types::{Quota, QuotaResource};
@@ -13,7 +12,6 @@ use crate::imap::scan_folders::get_watched_folders;
use crate::imap::session::Session as ImapSession;
use crate::imap::Imap;
use crate::message::{Message, Viewtype};
use crate::scheduler::InterruptInfo;
use crate::tools::time;
use crate::{stock_str, EventType};
@@ -34,17 +32,12 @@ pub const QUOTA_ERROR_THRESHOLD_PERCENTAGE: u64 = 95;
/// providers report bad values and we would then spam the user.
pub const QUOTA_ALLCLEAR_PERCENTAGE: u64 = 75;
/// if recent quota is older,
/// it is re-fetched on dc_get_connectivity_html()
pub const QUOTA_MAX_AGE_SECONDS: i64 = 60;
/// Server quota information with an update timestamp.
#[derive(Debug)]
pub struct QuotaInfo {
/// Recently loaded quota information.
/// set to `Err()` if the provider does not support quota or on other errors,
/// set to `Ok()` for valid quota information.
/// Updated by `Action::UpdateRecentQuota`
pub(crate) recent: Result<BTreeMap<String, Vec<QuotaResource>>>,
/// Timestamp when structure was modified.
@@ -110,18 +103,6 @@ pub fn needs_quota_warning(curr_percentage: u64, warned_at_percentage: u64) -> b
}
impl Context {
// Adds a job to update `quota.recent`
pub(crate) async fn schedule_quota_update(&self) -> Result<()> {
let requested = self.quota_update_request.swap(true, Ordering::Relaxed);
if !requested {
// Quota update was not requested before.
self.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
}
Ok(())
}
/// Updates `quota.recent`, sets `quota.modified` to the current time
/// and emits an event to let the UIs update connectivity view.
///
@@ -130,8 +111,6 @@ impl Context {
/// As the message is added only once, the user is not spammed
/// in case for some providers the quota is always at ~100%
/// and new space is allocated as needed.
///
/// Called in response to `Action::UpdateRecentQuota`.
pub(crate) async fn update_recent_quota(&self, imap: &mut Imap) -> Result<()> {
if let Err(err) = imap.prepare(self).await {
warn!(self, "could not connect: {:#}", err);
@@ -166,9 +145,6 @@ impl Context {
}
}
// Clear the request to update quota.
self.quota_update_request.store(false, Ordering::Relaxed);
*self.quota.write().await = Some(QuotaInfo {
recent: quota,
modified: time(),

View File

@@ -13,16 +13,16 @@ use self::connectivity::ConnectivityStore;
use crate::config::Config;
use crate::contact::{ContactId, RecentlySeenLoop};
use crate::context::Context;
use crate::download::download_msg;
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{FolderMeaning, Imap};
use crate::job;
use crate::location;
use crate::log::LogExt;
use crate::message::MsgId;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
use crate::tools::time;
use crate::tools::{duration_to_str, maybe_add_time_based_warnings};
use crate::tools::{duration_to_str, maybe_add_time_based_warnings, time};
pub(crate) mod connectivity;
@@ -323,6 +323,37 @@ pub(crate) struct Scheduler {
recently_seen_loop: RecentlySeenLoop,
}
async fn download_msgs(context: &Context, imap: &mut Imap) -> Result<()> {
let msg_ids = context
.sql
.query_map(
"SELECT msg_id FROM download",
(),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
|rowids| {
rowids
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
for msg_id in msg_ids {
if let Err(err) = download_msg(context, msg_id, imap).await {
warn!(context, "Failed to download message {msg_id}: {:#}.", err);
}
context
.sql
.execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
.await?;
}
Ok(())
}
async fn inbox_loop(
ctx: Context,
started: oneshot::Sender<()>,
@@ -344,79 +375,76 @@ async fn inbox_loop(
return;
};
let mut info = InterruptInfo::default();
loop {
let job = match job::load_next(&ctx, &info).await {
Err(err) => {
error!(ctx, "Failed loading job from the database: {:#}.", err);
None
}
Ok(job) => job,
};
{
// Update quota no more than once a minute.
let quota_needs_update = {
let quota = ctx.quota.read().await;
quota
.as_ref()
.filter(|quota| quota.modified + 60 > time())
.is_none()
};
match job {
Some(job) => {
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
info = Default::default();
}
None => {
let quota_requested = ctx.quota_update_request.swap(false, Ordering::Relaxed);
if quota_requested {
if let Err(err) = ctx.update_recent_quota(&mut connection).await {
warn!(ctx, "Failed to update quota: {:#}.", err);
}
if quota_needs_update {
if let Err(err) = ctx.update_recent_quota(&mut connection).await {
warn!(ctx, "Failed to update quota: {:#}.", err);
}
let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
if resync_requested {
if let Err(err) = connection.resync_folders(&ctx).await {
warn!(ctx, "Failed to resync folders: {:#}.", err);
ctx.resync_request.store(true, Ordering::Relaxed);
}
}
maybe_add_time_based_warnings(&ctx).await;
match ctx.get_config_i64(Config::LastHousekeeping).await {
Ok(last_housekeeping_time) => {
let next_housekeeping_time =
last_housekeeping_time.saturating_add(60 * 60 * 24);
if next_housekeeping_time <= time() {
sql::housekeeping(&ctx).await.log_err(&ctx).ok();
}
}
Err(err) => {
warn!(ctx, "Failed to get last housekeeping time: {}", err);
}
};
match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
Ok(fetched_existing_msgs) => {
if !fetched_existing_msgs {
// Consider it done even if we fail.
//
// This operation is not critical enough to retry,
// especially if the error is persistent.
if let Err(err) =
ctx.set_config_bool(Config::FetchedExistingMsgs, true).await
{
warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
}
if let Err(err) = connection.fetch_existing_msgs(&ctx).await {
warn!(ctx, "Failed to fetch existing messages: {:#}", err);
connection.trigger_reconnect(&ctx);
}
}
}
Err(err) => {
warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
}
}
info = fetch_idle(&ctx, &mut connection, FolderMeaning::Inbox).await;
}
}
let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
if resync_requested {
if let Err(err) = connection.resync_folders(&ctx).await {
warn!(ctx, "Failed to resync folders: {:#}.", err);
ctx.resync_request.store(true, Ordering::Relaxed);
}
}
maybe_add_time_based_warnings(&ctx).await;
match ctx.get_config_i64(Config::LastHousekeeping).await {
Ok(last_housekeeping_time) => {
let next_housekeeping_time =
last_housekeeping_time.saturating_add(60 * 60 * 24);
if next_housekeeping_time <= time() {
sql::housekeeping(&ctx).await.log_err(&ctx).ok();
}
}
Err(err) => {
warn!(ctx, "Failed to get last housekeeping time: {}", err);
}
};
match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
Ok(fetched_existing_msgs) => {
if !fetched_existing_msgs {
// Consider it done even if we fail.
//
// This operation is not critical enough to retry,
// especially if the error is persistent.
if let Err(err) =
ctx.set_config_bool(Config::FetchedExistingMsgs, true).await
{
warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
}
if let Err(err) = connection.fetch_existing_msgs(&ctx).await {
warn!(ctx, "Failed to fetch existing messages: {:#}", err);
connection.trigger_reconnect(&ctx);
}
}
}
Err(err) => {
warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
}
}
if let Err(err) = download_msgs(&ctx, &mut connection).await {
warn!(ctx, "Failed to download messages: {:#}", err);
}
fetch_idle(&ctx, &mut connection, FolderMeaning::Inbox).await;
}
};

View File

@@ -8,10 +8,7 @@ use tokio::sync::Mutex;
use crate::events::EventType;
use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning};
use crate::quota::{
QUOTA_ERROR_THRESHOLD_PERCENTAGE, QUOTA_MAX_AGE_SECONDS, QUOTA_WARN_THRESHOLD_PERCENTAGE,
};
use crate::tools::time;
use crate::quota::{QUOTA_ERROR_THRESHOLD_PERCENTAGE, QUOTA_WARN_THRESHOLD_PERCENTAGE};
use crate::{context::Context, log::LogExt};
use crate::{stock_str, tools};
@@ -472,14 +469,9 @@ impl Context {
ret += format!("<li>{e}</li>").as_str();
}
}
if quota.modified + QUOTA_MAX_AGE_SECONDS < time() {
self.schedule_quota_update().await?;
}
} else {
let not_connected = stock_str::not_connected(self).await;
ret += &format!("<li>{not_connected}</li>");
self.schedule_quota_update().await?;
}
ret += "</ul>";

View File

@@ -742,8 +742,6 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
warn!(context, "Failed to deduplicate peerstates: {:#}.", err)
}
context.schedule_quota_update().await?;
// Try to clear the freelist to free some space on the disk. This
// only works if auto_vacuum is enabled.
match context

View File

@@ -730,6 +730,15 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
)
.await?;
}
if dbversion < 102 {
sql.execute_migration(
"CREATE TABLE download (
msg_id INTEGER NOT NULL -- id of the message stub in msgs table
)",
102,
)
.await?;
}
// Add is_bot column to contacts table with default false.
if dbversion < 102 {