Compare commits

...

12 Commits

Author SHA1 Message Date
link2xt
44953d6bcc Release 1.109.0 2023-02-19 21:40:33 +00:00
link2xt
10066b2bc7 sql: use semaphore to limit access to the connection pool
This ensures that if multiple connections are returned
to the pool at the same time, waiters get them in the order
they were placed in the queue.
2023-02-19 17:06:25 +00:00
Simon Laux
609fc67f0d fix websocket server & add ci test for building it (#4047)
the axum update broke the websocket server, because yerpc still uses a the old 5.9 version.
So I needed to downgrade the dependency until https://github.com/deltachat/yerpc/pull/35 is merged.

I want to retry the kaiOS client experiment, for this I need a working websocket server binary.
2023-02-19 14:57:00 +01:00
Hocuri
641d102aba Add dc_msg_set_subject() C FFI (#4057) 2023-02-19 13:42:39 +00:00
link2xt
f65e1c1587 Sort dependencies in Cargo.toml 2023-02-19 13:18:58 +00:00
link2xt
626ec5e793 Document the meaning of empty Message.subject 2023-02-19 13:05:39 +00:00
link2xt
85517abf58 Derive Debug for Pool 2023-02-19 02:26:26 +00:00
link2xt
75f65b06e8 Run VACUUM on the same connection as sqlcipher_export
Otherwise export_and_import_backup test fails due to SQLITE_SCHEMA error.
2023-02-19 02:26:26 +00:00
link2xt
f2b05ccc29 Reimplement connection pool on top of crossbeam 2023-02-19 02:26:26 +00:00
link2xt
e88f21c010 Remove try_many_times r2d2 bug workaround 2023-02-19 02:26:26 +00:00
link2xt
ed8e2c4818 Replace r2d2 with an own connection pool
New connection pool does not use threads
and does not remove idle connections
or create new connections at runtime.
2023-02-19 02:26:26 +00:00
link2xt
48fee4fc92 python: replace "while 1" with "while True"
This makes pyright recognize that the function never returns implicitly.
2023-02-18 11:31:07 +00:00
24 changed files with 292 additions and 342 deletions

View File

@@ -35,6 +35,10 @@ jobs:
npm run test
env:
DCC_NEW_TMP_EMAIL: ${{ secrets.DCC_NEW_TMP_EMAIL }}
- name: make sure websocket server version still builds
run: |
cd deltachat-jsonrpc
cargo build --bin deltachat-jsonrpc-server --features webserver
- name: Run linter
run: |
cd deltachat-jsonrpc/typescript

View File

@@ -1,22 +1,24 @@
# Changelog
## Unreleased
## 1.109.0
### Changes
- deltachat-rpc-client: use `dataclass` for `Account`, `Chat`, `Contact` and `Message` #4042
- python: mark bindings as supporting typing according to PEP 561 #4045
- retry filesystem operations during account migration #4043
- remove `r2d2_sqlite` dependency #4050
### Fixes
- deltachat-rpc-server: do not block stdin while processing the request. #4041
deltachat-rpc-server now reads the next request as soon as previous request handler is spawned.
- enable `auto_vacuum` on all SQL connections #2955
- Enable `auto_vacuum` on all SQL connections. #2955
- Replace `r2d2` connection pool with an own implementation. #4050 #4053 #4043 #4061
This change improves reliability
by closing all database connections immediately when the context is closed.
### API-Changes
- Remove `MimeMessage::from_bytes()` public interface. #4033
- BREAKING Types: jsonrpc: `get_messages` now returns a map with `MessageLoadResult` instead of failing completely if one of the requested messages could not be loaded. #4038
- Add `dc_msg_set_subject()`. C-FFI #4057
- Mark python bindings as supporting typing according to PEP 561 #4045
## 1.108.0

149
Cargo.lock generated
View File

@@ -239,7 +239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43"
dependencies = [
"async-trait",
"axum-core 0.2.9",
"axum-core",
"base64 0.13.1",
"bitflags",
"bytes",
@@ -248,7 +248,7 @@ dependencies = [
"http-body",
"hyper",
"itoa",
"matchit 0.5.0",
"matchit",
"memchr",
"mime",
"percent-encoding",
@@ -259,43 +259,7 @@ dependencies = [
"sha-1",
"sync_wrapper",
"tokio",
"tokio-tungstenite 0.17.2",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5694b64066a2459918d8074c2ce0d5a88f409431994c2356617c8ae0c4721fc"
dependencies = [
"async-trait",
"axum-core 0.3.2",
"base64 0.20.0",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa",
"matchit 0.7.0",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite 0.18.0",
"tokio-tungstenite",
"tower",
"tower-http",
"tower-layer",
@@ -318,23 +282,6 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "backtrace"
version = "0.3.67"
@@ -368,12 +315,6 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.0"
@@ -893,7 +834,7 @@ checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb"
[[package]]
name = "deltachat"
version = "1.108.0"
version = "1.109.0"
dependencies = [
"ansi_term",
"anyhow",
@@ -907,6 +848,7 @@ dependencies = [
"bitflags",
"chrono",
"criterion",
"crossbeam-queue",
"deltachat_derive",
"email",
"encoded-words",
@@ -934,7 +876,6 @@ dependencies = [
"proptest",
"qrcodegen",
"quick-xml",
"r2d2",
"rand 0.8.5",
"ratelimit",
"regex",
@@ -965,11 +906,11 @@ dependencies = [
[[package]]
name = "deltachat-jsonrpc"
version = "1.108.0"
version = "1.109.0"
dependencies = [
"anyhow",
"async-channel",
"axum 0.6.4",
"axum",
"deltachat",
"env_logger 0.10.0",
"futures",
@@ -987,7 +928,7 @@ dependencies = [
[[package]]
name = "deltachat-repl"
version = "1.108.0"
version = "1.109.0"
dependencies = [
"ansi_term",
"anyhow",
@@ -1002,7 +943,7 @@ dependencies = [
[[package]]
name = "deltachat-rpc-server"
version = "1.108.0"
version = "1.109.0"
dependencies = [
"anyhow",
"deltachat-jsonrpc",
@@ -1025,7 +966,7 @@ dependencies = [
[[package]]
name = "deltachat_ffi"
version = "1.108.0"
version = "1.109.0"
dependencies = [
"anyhow",
"deltachat",
@@ -2158,12 +2099,6 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]]
name = "matchit"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]]
name = "md-5"
version = "0.10.5"
@@ -2795,17 +2730,6 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb"
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "radix_trie"
version = "0.2.1"
@@ -3171,15 +3095,6 @@ dependencies = [
"windows-sys 0.36.1",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -3240,15 +3155,6 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "184c643044780f7ceb59104cef98a5a6f12cb2288a7bc701ab93a362b49fd47d"
dependencies = [
"serde",
]
[[package]]
name = "serde_spanned"
version = "0.6.1"
@@ -3658,19 +3564,7 @@ dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.17.3",
]
[[package]]
name = "tokio-tungstenite"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.18.0",
"tungstenite",
]
[[package]]
@@ -3880,25 +3774,6 @@ dependencies = [
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "twofish"
version = "0.7.1"
@@ -4359,7 +4234,7 @@ dependencies = [
"async-channel",
"async-mutex",
"async-trait",
"axum 0.5.17",
"axum",
"futures",
"futures-util",
"log",

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat"
version = "1.108.0"
version = "1.109.0"
edition = "2021"
license = "MPL-2.0"
rust-version = "1.63"
@@ -29,21 +29,24 @@ format-flowed = { path = "./format-flowed" }
ratelimit = { path = "./deltachat-ratelimit" }
anyhow = "1"
async-channel = "1.8.0"
async-imap = { git = "https://github.com/async-email/async-imap", branch = "master", default-features = false, features = ["runtime-tokio"] }
async-native-tls = { version = "0.4", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.8", default-features = false, features = ["runtime-tokio"] }
trust-dns-resolver = "0.22"
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] }
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar
async_zip = { version = "0.0.9", default-features = false, features = ["deflate"] }
backtrace = "0.3"
base64 = "0.21"
bitflags = "1.3"
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
crossbeam-queue = "0.3"
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
escaper = "0.1"
fast-socks5 = "0.8"
futures = "0.3"
futures-lite = "1.12.0"
hex = "0.4.0"
humansize = "2"
image = { version = "0.24.5", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
kamadak-exif = "0.5"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
@@ -57,10 +60,11 @@ once_cell = "1.17.0"
percent-encoding = "2.2"
pgp = { version = "0.9", default-features = false }
pretty_env_logger = { version = "0.4", optional = true }
qrcodegen = "1.7.0"
quick-xml = "0.27"
r2d2 = "0.8"
rand = "0.8"
regex = "1.7"
reqwest = { version = "0.11.14", features = ["json"] }
rusqlite = { version = "0.28", features = ["sqlcipher"] }
rust-hsluv = "0.1"
sanitize-filename = "0.4"
@@ -71,21 +75,17 @@ sha2 = "0.10"
smallvec = "1"
strum = "0.24"
strum_macros = "0.24"
thiserror = "1"
toml = "0.7"
url = "2"
uuid = { version = "1", features = ["serde", "v4"] }
fast-socks5 = "0.8"
humansize = "2"
qrcodegen = "1.7.0"
tagger = "4.3.4"
textwrap = "0.16.0"
async-channel = "1.8.0"
futures-lite = "1.12.0"
tokio-stream = { version = "0.1.11", features = ["fs"] }
thiserror = "1"
tokio-io-timeout = "1.2.0"
reqwest = { version = "0.11.14", features = ["json"] }
async_zip = { version = "0.0.9", default-features = false, features = ["deflate"] }
tokio-stream = { version = "0.1.11", features = ["fs"] }
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] }
toml = "0.7"
trust-dns-resolver = "0.22"
url = "2"
uuid = { version = "1", features = ["serde", "v4"] }
[dev-dependencies]
ansi_term = "0.12.0"

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat_ffi"
version = "1.108.0"
version = "1.109.0"
description = "Deltachat FFI"
edition = "2018"
readme = "README.md"

View File

@@ -4327,6 +4327,18 @@ void dc_msg_set_text (dc_msg_t* msg, const char* text);
void dc_msg_set_html (dc_msg_t* msg, const char* html);
/**
* Sets the email's subject. If it's empty, a default subject
* will be used (e.g. `Message from Alice` or `Re: <last subject>`).
* This does not alter any information in the database.
*
* @memberof dc_msg_t
* @param msg The message object.
* @param subject The new subject.
*/
void dc_msg_set_subject (dc_msg_t* msg, const char* subject);
/**
* Set different sender name for a message.
* This overrides the name set by the dc_set_config()-option `displayname`.

View File

@@ -3598,6 +3598,16 @@ pub unsafe extern "C" fn dc_msg_set_html(msg: *mut dc_msg_t, html: *const libc::
ffi_msg.message.set_html(to_opt_string_lossy(html))
}
#[no_mangle]
pub unsafe extern "C" fn dc_msg_set_subject(msg: *mut dc_msg_t, subject: *const libc::c_char) {
if msg.is_null() {
eprintln!("ignoring careless call to dc_msg_get_subject()");
return;
}
let ffi_msg = &mut *msg;
ffi_msg.message.set_subject(to_string_lossy(subject));
}
#[no_mangle]
pub unsafe extern "C" fn dc_msg_set_override_sender_name(
msg: *mut dc_msg_t,

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-jsonrpc"
version = "1.108.0"
version = "1.109.0"
description = "DeltaChat JSON-RPC API"
edition = "2021"
default-run = "deltachat-jsonrpc-server"
@@ -28,7 +28,7 @@ sanitize-filename = "0.4"
walkdir = "2.3.2"
# optional dependencies
axum = { version = "0.6.4", optional = true, features = ["ws"] }
axum = { version = "0.5.9", optional = true, features = ["ws"] }
env_logger = { version = "0.10.0", optional = true }
[dev-dependencies]

View File

@@ -48,5 +48,5 @@
},
"type": "module",
"types": "dist/deltachat.d.ts",
"version": "1.108.0"
"version": "1.109.0"
}

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-repl"
version = "1.108.0"
version = "1.109.0"
edition = "2021"
[dependencies]

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-rpc-server"
version = "1.108.0"
version = "1.109.0"
description = "DeltaChat JSON-RPC server"
edition = "2021"
readme = "README.md"

View File

@@ -60,5 +60,5 @@
"test:mocha": "mocha -r esm node/test/test.js --growl --reporter=spec --bail --exit"
},
"types": "node/dist/index.d.ts",
"version": "1.108.0"
"version": "1.109.0"
}

View File

@@ -207,14 +207,14 @@ class IdleManager:
return res
def wait_for_new_message(self, timeout=None) -> bytes:
while 1:
while True:
for item in self.check(timeout=timeout):
if b"EXISTS" in item or b"RECENT" in item:
return item
def wait_for_seen(self, timeout=None) -> int:
"""Return first message with SEEN flag from a running idle-stream."""
while 1:
while True:
for item in self.check(timeout=timeout):
if FETCH in item:
self.log(str(item))

View File

@@ -108,7 +108,7 @@ class FFIEventTracker:
return ev
def iter_events(self, timeout=None, check_error=True):
while 1:
while True:
yield self.get(timeout=timeout, check_error=check_error)
def get_matching(self, event_name_regex, check_error=True, timeout=None):
@@ -119,14 +119,14 @@ class FFIEventTracker:
def get_info_contains(self, regex: str) -> FFIEvent:
rex = re.compile(regex)
while 1:
while True:
ev = self.get_matching("DC_EVENT_INFO")
if rex.search(ev.data2):
return ev
def get_info_regex_groups(self, regex, check_error=True):
rex = re.compile(regex)
while 1:
while True:
ev = self.get_matching("DC_EVENT_INFO", check_error=check_error)
m = rex.match(ev.data2)
if m is not None:
@@ -137,7 +137,7 @@ class FFIEventTracker:
This only works reliably if the connectivity doesn't change
again too quickly, otherwise we might miss it.
"""
while 1:
while True:
if self.account.get_connectivity() == connectivity:
return
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
@@ -146,7 +146,7 @@ class FFIEventTracker:
"""Wait until the connectivity changes to `expected_next`.
Fails the test if it changes to something else.
"""
while 1:
while True:
current = self.account.get_connectivity()
if current == expected_next:
return
@@ -156,7 +156,7 @@ class FFIEventTracker:
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
def wait_for_all_work_done(self):
while 1:
while True:
if self.account.all_work_done():
return
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
@@ -164,7 +164,7 @@ class FFIEventTracker:
def ensure_event_not_queued(self, event_name_regex):
__tracebackhide__ = True
rex = re.compile(f"(?:{event_name_regex}).*")
while 1:
while True:
try:
ev = self._event_queue.get(False)
except Empty:
@@ -173,7 +173,7 @@ class FFIEventTracker:
assert not rex.match(ev.name), f"event found {ev}"
def wait_securejoin_inviter_progress(self, target):
while 1:
while True:
event = self.get_matching("DC_EVENT_SECUREJOIN_INVITER_PROGRESS")
if event.data2 >= target:
print(f"** SECUREJOINT-INVITER PROGRESS {target}", self.account)

View File

@@ -309,7 +309,7 @@ class ACSetup:
def wait_one_configured(self, account):
"""wait until this account has successfully configured."""
if self._account2state[account] == self.CONFIGURING:
while 1:
while True:
acc = self._pop_config_success()
if acc == account:
break
@@ -638,7 +638,7 @@ class BotProcess:
def _run_stdout_thread(self) -> None:
try:
while 1:
while True:
line = self.popen.stdout.readline()
if not line:
break
@@ -659,7 +659,7 @@ class BotProcess:
for next_pattern in patterns:
print("+++FNMATCH:", next_pattern)
ignored = []
while 1:
while True:
line = self.stdout_queue.get()
if line is None:
if ignored:

View File

@@ -85,7 +85,7 @@ class ConfigureTracker:
self._imap_finished.wait()
def wait_progress(self, data1=None):
while 1:
while True:
evdata = self._progress.get()
if data1 is None or evdata == data1:
break

View File

@@ -1,7 +1,6 @@
//! # Account manager module.
use std::collections::BTreeMap;
use std::future::Future;
use std::path::{Path, PathBuf};
use anyhow::{ensure, Context as _, Result};
@@ -151,7 +150,7 @@ impl Accounts {
if let Some(cfg) = self.config.get_account(id) {
let account_path = self.dir.join(cfg.dir);
try_many_times(|| fs::remove_dir_all(&account_path))
fs::remove_dir_all(&account_path)
.await
.context("failed to remove account data")?;
}
@@ -187,10 +186,10 @@ impl Accounts {
fs::create_dir_all(self.dir.join(&account_config.dir))
.await
.context("failed to create dir")?;
try_many_times(|| fs::rename(&dbfile, &new_dbfile))
fs::rename(&dbfile, &new_dbfile)
.await
.context("failed to rename dbfile")?;
try_many_times(|| fs::rename(&blobdir, &new_blobdir))
fs::rename(&blobdir, &new_blobdir)
.await
.context("failed to rename blobdir")?;
if walfile.exists() {
@@ -215,7 +214,7 @@ impl Accounts {
}
Err(err) => {
let account_path = std::path::PathBuf::from(&account_config.dir);
try_many_times(|| fs::remove_dir_all(&account_path))
fs::remove_dir_all(&account_path)
.await
.context("failed to remove account data")?;
self.config.remove_account(account_config.id).await?;
@@ -472,33 +471,6 @@ impl Config {
}
}
/// Spend up to 1 minute trying to do the operation.
///
/// Files may remain locked up to 30 seconds due to r2d2 bug:
/// <https://github.com/sfackler/r2d2/issues/99>
async fn try_many_times<F, Fut, T>(f: F) -> std::result::Result<(), T>
where
F: Fn() -> Fut,
Fut: Future<Output = std::result::Result<(), T>>,
{
let mut counter = 0;
loop {
counter += 1;
if let Err(err) = f().await {
if counter > 60 {
return Err(err);
}
// Wait 1 second and try again.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
} else {
break;
}
}
Ok(())
}
/// Configuration of a single account.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct AccountConfig {

View File

@@ -3397,7 +3397,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
msg.param.remove(Param::WebxdcSummaryTimestamp);
msg.in_reply_to = None;
// do not leak data as group names; a default subject is generated by mimfactory
// do not leak data as group names; a default subject is generated by mimefactory
msg.subject = "".to_string();
let new_msg_id: MsgId;

View File

@@ -5,7 +5,7 @@ use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use ::pgp::types::KeyTrait;
use anyhow::{bail, ensure, format_err, Context as _, Result};
use anyhow::{bail, ensure, format_err, Context as _, Error, Result};
use futures::StreamExt;
use futures_lite::FutureExt;
use rand::{thread_rng, Rng};
@@ -508,6 +508,9 @@ fn get_next_backup_path(folder: &Path, backup_time: i64) -> Result<(PathBuf, Pat
bail!("could not create backup file, disk full?");
}
/// Exports the database to a separate file with the given passphrase.
///
/// Set passphrase to empty string to export the database unencrypted.
async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Result<()> {
// get a fine backup file name (the name includes the date so that multiple backup instances are possible)
let now = time();
@@ -521,13 +524,6 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
.await?;
sql::housekeeping(context).await.ok_or_log(context);
context
.sql
.execute("VACUUM;", paramsv![])
.await
.map_err(|e| warn!(context, "Vacuum failed, exporting anyway {}", e))
.ok();
ensure!(
context.scheduler.read().await.is_none(),
"cannot export backup, IO is running"
@@ -540,11 +536,29 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
dest_path.display(),
);
context
.sql
.export(&temp_db_path, passphrase)
.await
.with_context(|| format!("failed to backup plaintext database to {temp_db_path:?}"))?;
let path_str = temp_db_path
.to_str()
.with_context(|| format!("path {temp_db_path:?} is not valid unicode"))?;
let conn = context.sql.get_conn().await?;
tokio::task::block_in_place(move || {
if let Err(err) = conn.execute("VACUUM", params![]) {
info!(context, "Vacuum failed, exporting anyway: {:#}.", err);
}
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![path_str, passphrase],
)
.context("failed to attach backup database")?;
let res = conn
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
.context("failed to export to attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;
Ok::<_, Error>(())
})?;
let res = export_backup_inner(context, &temp_db_path, &temp_path).await;

View File

@@ -265,6 +265,8 @@ pub struct Message {
pub(crate) text: Option<String>,
/// Message subject.
///
/// If empty, a default subject will be generated when sending.
pub(crate) subject: String,
/// `Message-ID` header value.
@@ -795,6 +797,12 @@ impl Message {
self.text = text;
}
/// Sets the email's subject. If it's empty, a default subject
/// will be used (e.g. `Message from Alice` or `Re: <last subject>`).
pub fn set_subject(&mut self, subject: String) {
self.subject = subject;
}
/// Sets the file associated with a message.
///
/// This function does not use the file or check if it exists,

View File

@@ -1612,6 +1612,22 @@ mod tests {
assert_eq!(maybe_encode_words("äöü"), "=?utf-8?b?w6TDtsO8?=");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_manually_set_subject() -> Result<()> {
let t = TestContext::new_alice().await;
let chat = t.create_chat_with_contact("bob", "bob@example.org").await;
let mut msg = Message::new(Viewtype::Text);
msg.set_subject("Subjeeeeect".to_string());
let sent_msg = t.send_msg(chat.id, &mut msg).await;
let payload = sent_msg.payload();
assert_eq!(payload.match_indices("Subject: Subjeeeeect").count(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_subject_from_mua() {
// 1.: Receive a mail from an MUA

View File

@@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{bail, Context as _, Result};
use rusqlite::{self, config::DbConfig, Connection};
use rusqlite::{self, config::DbConfig, Connection, OpenFlags};
use tokio::sync::RwLock;
use crate::blob::BlobObject;
@@ -47,10 +46,10 @@ pub(crate) fn params_iter(iter: &[impl crate::ToSql]) -> impl Iterator<Item = &d
iter.iter().map(|item| item as &dyn crate::ToSql)
}
mod connection_manager;
mod migrations;
mod pool;
use connection_manager::ConnectionManager;
use pool::{Pool, PooledConnection};
/// A wrapper around the underlying Sqlite3 object.
#[derive(Debug)]
@@ -59,7 +58,7 @@ pub struct Sql {
pub(crate) dbfile: PathBuf,
/// SQL connection pool.
pool: RwLock<Option<r2d2::Pool<ConnectionManager>>>,
pool: RwLock<Option<Pool>>,
/// None if the database is not open, true if it is open with passphrase and false if it is
/// open without a passphrase.
@@ -125,31 +124,6 @@ impl Sql {
// drop closes the connection
}
/// Exports the database to a separate file with the given passphrase.
///
/// Set passphrase to empty string to export the database unencrypted.
pub(crate) async fn export(&self, path: &Path, passphrase: String) -> Result<()> {
let path_str = path
.to_str()
.with_context(|| format!("path {path:?} is not valid unicode"))?;
let conn = self.get_conn().await?;
tokio::task::block_in_place(move || {
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![path_str, passphrase],
)
.context("failed to attach backup database")?;
let res = conn
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
.context("failed to export to attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;
Ok(())
})
}
/// Imports the database from a separate file with the given passphrase.
pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
let path_str = path
@@ -195,17 +169,15 @@ impl Sql {
})
}
fn new_pool(dbfile: &Path, passphrase: String) -> Result<r2d2::Pool<ConnectionManager>> {
// this actually creates min_idle database handles just now.
// therefore, with_init() must not try to modify the database as otherwise
// we easily get busy-errors (eg. table-creation, journal_mode etc. should be done on only one handle)
let mgr = ConnectionManager::new(dbfile.to_path_buf(), passphrase);
let pool = r2d2::Pool::builder()
.min_idle(Some(2))
.max_size(10)
.connection_timeout(Duration::from_secs(60))
.build(mgr)
.context("Can't build SQL connection pool")?;
/// Creates a new connection pool.
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
let mut connections = Vec::new();
for _ in 0..3 {
let connection = new_connection(dbfile, &passphrase)?;
connections.push(connection);
}
let pool = Pool::new(connections);
Ok(pool)
}
@@ -363,10 +335,10 @@ impl Sql {
}
/// Allocates a connection from the connection pool and returns it.
pub(crate) async fn get_conn(&self) -> Result<r2d2::PooledConnection<ConnectionManager>> {
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?;
let conn = pool.get()?;
let conn = pool.get().await?;
Ok(conn)
}
@@ -610,6 +582,42 @@ impl Sql {
}
}
/// Creates a new SQLite connection.
///
/// `path` is the database path.
///
/// `passphrase` is the SQLCipher database passphrase.
/// Empty string if database is not encrypted.
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
let conn = Connection::open_with_flags(path, flags)?;
conn.execute_batch(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = 60000; -- 60 seconds
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
)?;
conn.pragma_update(None, "key", passphrase)?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(conn)
}
/// Cleanup the account to restore some storage and optimize the database.
pub async fn housekeeping(context: &Context) -> Result<()> {
if let Err(err) = remove_unused_files(context).await {

View File

@@ -1,73 +0,0 @@
use std::path::PathBuf;
use std::time::Duration;
use r2d2::ManageConnection;
use rusqlite::{Connection, Error, OpenFlags};
#[derive(Debug)]
pub struct ConnectionManager {
/// Database file path.
path: PathBuf,
/// SQLite open flags.
flags: rusqlite::OpenFlags,
/// SQLCipher database passphrase.
/// Empty string if database is not encrypted.
passphrase: String,
}
impl ConnectionManager {
/// Creates new connection manager.
pub fn new(path: PathBuf, passphrase: String) -> Self {
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
Self {
path,
flags,
passphrase,
}
}
}
impl ManageConnection for ConnectionManager {
type Connection = Connection;
type Error = Error;
fn connect(&self) -> Result<Connection, Error> {
let conn = Connection::open_with_flags(&self.path, self.flags)?;
conn.execute_batch(&format!(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = {};
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
Duration::from_secs(60).as_millis()
))?;
conn.pragma_update(None, "key", &self.passphrase)?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(conn)
}
fn is_valid(&self, _conn: &mut Connection) -> Result<(), Error> {
Ok(())
}
fn has_broken(&self, _conn: &mut Connection) -> bool {
false
}
}

102
src/sql/pool.rs Normal file
View File

@@ -0,0 +1,102 @@
//! Connection pool.
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use anyhow::{Context, Result};
use crossbeam_queue::ArrayQueue;
use rusqlite::Connection;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// Inner connection pool.
#[derive(Debug)]
struct InnerPool {
/// Available connections.
connections: ArrayQueue<Connection>,
/// Counts the number of available connections.
semaphore: Arc<Semaphore>,
}
impl InnerPool {
/// Puts a connection into the pool.
///
/// The connection could be new or returned back.
fn put(&self, connection: Connection) {
self.connections.force_push(connection);
}
}
/// Pooled connection.
pub struct PooledConnection {
/// Weak reference to the pool used to return the connection back.
pool: Weak<InnerPool>,
/// Only `None` right after moving the connection back to the pool.
conn: Option<Connection>,
/// Semaphore permit, dropped after returning the connection to the pool.
_permit: OwnedSemaphorePermit,
}
impl Drop for PooledConnection {
fn drop(&mut self) {
// Put the connection back unless the pool is already dropped.
if let Some(pool) = self.pool.upgrade() {
if let Some(conn) = self.conn.take() {
pool.put(conn);
}
}
}
}
impl Deref for PooledConnection {
type Target = Connection;
fn deref(&self) -> &Connection {
self.conn.as_ref().unwrap()
}
}
impl DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Connection {
self.conn.as_mut().unwrap()
}
}
/// Connection pool.
#[derive(Clone, Debug)]
pub struct Pool {
/// Reference to the actual connection pool.
inner: Arc<InnerPool>,
}
impl Pool {
/// Creates a new connection pool.
pub fn new(connections: Vec<Connection>) -> Self {
let inner = Arc::new(InnerPool {
connections: ArrayQueue::new(connections.len()),
semaphore: Arc::new(Semaphore::new(connections.len())),
});
for connection in connections {
inner.connections.force_push(connection);
}
Pool { inner }
}
/// Retrieves a connection from the pool.
pub async fn get(&self) -> Result<PooledConnection> {
let permit = self.inner.semaphore.clone().acquire_owned().await?;
let conn = self
.inner
.connections
.pop()
.context("got a permit when there are no connections in the pool")?;
let conn = PooledConnection {
pool: Arc::downgrade(&self.inner),
conn: Some(conn),
_permit: permit,
};
Ok(conn)
}
}