Compare commits

..

2 Commits

Author SHA1 Message Date
Floris Bruynooghe
16bb02ba96 Add changelog 2023-03-20 19:55:32 +01:00
Floris Bruynooghe
cabefee4c0 Make sure set_last_error is called for .log_err()
We have a common .log_err() trait method which is very prevalent and
useful in the FFI bindings.  However this was not hooked up with the
set_last_error/get_last_error mechanism, which the error! macro was
hooked up with.

This hooks correctly hooks up the two and makes sure to log all errors
around backup provider/retrieval.
2023-03-20 16:32:36 +01:00
16 changed files with 227 additions and 293 deletions

View File

@@ -8,14 +8,35 @@ concurrency:
on:
workflow_dispatch:
release:
types: [published]
jobs:
# Build a version statically linked against musl libc
# to avoid problems with glibc version incompatibility.
build_static_linux:
name: Build deltachat-rpc-server for Linux
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- name: Setup rust target
run: rustup target add x86_64-unknown-linux-musl
- name: Install musl-gcc
run: sudo apt install musl-tools
- name: Build
env:
RUSTFLAGS: "-C link-arg=-s"
run: cargo build --release --target x86_64-unknown-linux-musl -p deltachat-rpc-server --features vendored
- name: Upload binary
uses: actions/upload-artifact@v3
with:
name: deltachat-rpc-server-x86_64
path: target/x86_64-unknown-linux-musl/release/deltachat-rpc-server
if-no-files-found: error
build_linux:
name: Cross-compile deltachat-rpc-server for x86_64, aarch64 and armv7 Linux
name: Cross-compile deltachat-rpc-server for aarch64 and armv7 Linux
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
@@ -23,13 +44,6 @@ jobs:
- name: Build
run: sh scripts/zig-rpc-server.sh
- name: Upload x86_64 binary
uses: actions/upload-artifact@v3
with:
name: deltachat-rpc-server-x86_64
path: target/x86_64-unknown-linux-musl/release/deltachat-rpc-server
if-no-files-found: error
- name: Upload aarch64 binary
uses: actions/upload-artifact@v3
with:
@@ -44,6 +58,50 @@ jobs:
path: target/armv7-unknown-linux-musleabihf/release/deltachat-rpc-server
if-no-files-found: error
build_android:
name: Cross-compile deltachat-rpc-server for Android (armeabi-v7a, arm64-v8a, x86 and x86_64)
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: nttld/setup-ndk@v1
id: setup-ndk
with:
ndk-version: r21d
- name: Build
env:
ANDROID_NDK_ROOT: ${{ steps.setup-ndk.outputs.ndk-path }}
run: sh scripts/android-rpc-server.sh
- name: Upload binary
uses: actions/upload-artifact@v3
with:
name: deltachat-rpc-server-android-armv7
path: target/armv7-linux-androideabi/release/deltachat-rpc-server
if-no-files-found: error
- name: Upload binary
uses: actions/upload-artifact@v3
with:
name: deltachat-rpc-server-android-aarch64
path: target/aarch64-linux-android/release/deltachat-rpc-server
if-no-files-found: error
- name: Upload binary
uses: actions/upload-artifact@v3
with:
name: deltachat-rpc-server-android-i686
path: target/i686-linux-android/release/deltachat-rpc-server
if-no-files-found: error
- name: Upload binary
uses: actions/upload-artifact@v3
with:
name: deltachat-rpc-server-android-x86_64
path: target/x86_64-linux-android/release/deltachat-rpc-server
if-no-files-found: error
build_windows:
name: Build deltachat-rpc-server for Windows
strategy:
@@ -76,51 +134,3 @@ jobs:
name: deltachat-rpc-server-${{ matrix.artifact }}
path: target/${{ matrix.target}}/release/${{ matrix.path }}
if-no-files-found: error
publish:
name: Upload binaries to the release
needs: ["build_linux", "build_windows"]
permissions:
contents: write
runs-on: "ubuntu-latest"
steps:
- name: Download deltachat-rpc-server-x86_64
uses: "actions/download-artifact@v3"
with:
name: "deltachat-rpc-server-x86_64"
path: "dist/deltachat-rpc-server-x86_64"
- name: Download deltachat-rpc-server-aarch64
uses: "actions/download-artifact@v3"
with:
name: "deltachat-rpc-server-aarch64"
path: "dist/deltachat-rpc-server-aarch64"
- name: Download deltachat-rpc-server-armv7
uses: "actions/download-artifact@v3"
with:
name: "deltachat-rpc-server-armv7"
path: "dist/deltachat-rpc-server-armv7"
- name: Download deltachat-rpc-server-win32.exe
uses: "actions/download-artifact@v3"
with:
name: "deltachat-rpc-server-win32.exe"
path: "dist/deltachat-rpc-server-win32.exe"
- name: Download deltachat-rpc-server-win64.exe
uses: "actions/download-artifact@v3"
with:
name: "deltachat-rpc-server-win64.exe"
path: "dist/deltachat-rpc-server-win64.exe"
- name: List downloaded artifacts
run: ls -l dist/
- name: Upload binaries to the GitHub release
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
run: |
gh release upload ${{ github.ref_name }} \
--repo ${{ github.repository }} \
dist/deltachat-rpc-server-*

View File

@@ -13,6 +13,8 @@
dc_stop_io(). dc_start_io() can now be called at any time without
harm. #4138
- More accurate maybe_add_bcc_self device message text #4175
- LogExt::log_err now calls Context::set_last_error so that the FFI
can use this and have consistent dc_get_last_error() reporting. #4187
### Fixes
- Fix segmentation fault if `dc_context_unref()` is called during
@@ -22,7 +24,6 @@
- Delete expired messages using multiple SQL requests. #4158
- Do not emit "Failed to run incremental vacuum" warnings on success. #4160
- Ability to send backup over network and QR code to setup second device #4007
- Disable buffering during STARTTLS setup. #4190
## [1.111.0] - 2023-03-05

View File

@@ -2644,6 +2644,10 @@ void dc_str_unref (char* str);
/**
* Creates an object for sending a backup to another device.
*
* Before calling this function IO must be stopped using dc_accounts_stop_io()
* or dc_stop_io() so that no changes to the blobs or database are happening.
* IO should only be restarted once dc_backup_provider_wait() has returned.
*
* The backup is sent to through a peer-to-peer channel which is bootstrapped
* by a QR-code. The backup contains the entire state of the account
* including credentials. This can be used to setup a new device.
@@ -2704,7 +2708,9 @@ char* dc_backup_provider_get_qr_svg (const dc_backup_provider_t* backup_provider
/**
* Waits for the sending to finish.
*
* This is a blocking call and should only be called once.
* This is a blocking call and should only be called once. Once this function
* returns IO can be started again using dc_accounts_start_io() or
* dc_start_io().
*
* @memberof dc_backup_provider_t
* @param backup_provider The backup provider object as created by

View File

@@ -4166,8 +4166,6 @@ pub unsafe extern "C" fn dc_backup_provider_new(
})
.map(|ffi_provider| Box::into_raw(Box::new(ffi_provider)))
.log_err(ctx, "BackupProvider failed")
.context("BackupProvider failed")
.set_last_error(ctx)
.unwrap_or(ptr::null_mut())
}
@@ -4183,8 +4181,6 @@ pub unsafe extern "C" fn dc_backup_provider_get_qr(
let ctx = &*ffi_provider.context;
deltachat::qr::format_backup(&ffi_provider.provider.qr())
.log_err(ctx, "BackupProvider get_qr failed")
.context("BackupProvider get_qr failed")
.set_last_error(ctx)
.unwrap_or_default()
.strdup()
}
@@ -4202,8 +4198,6 @@ pub unsafe extern "C" fn dc_backup_provider_get_qr_svg(
let provider = &ffi_provider.provider;
block_on(generate_backup_qr(ctx, &provider.qr()))
.log_err(ctx, "BackupProvider get_qr_svg failed")
.context("BackupProvider get_qr_svg failed")
.set_last_error(ctx)
.unwrap_or_default()
.strdup()
}
@@ -4219,8 +4213,6 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid
let provider = &mut ffi_provider.provider;
block_on(provider)
.log_err(ctx, "Failed to await BackupProvider")
.context("Failed to await BackupProvider")
.set_last_error(ctx)
.ok();
}
@@ -4273,56 +4265,6 @@ impl<T: Default, E: std::fmt::Display> ResultExt<T, E> for Result<T, E> {
}
}
trait ResultLastError<T, E>
where
E: std::fmt::Display,
{
/// Sets this `Err` value using [`Context::set_last_error`].
///
/// Normally each FFI-API *should* call this if it handles an error from the Rust API:
/// errors which need to be reported to users in response to an API call need to be
/// propagated up the Rust API and at the FFI boundary need to be stored into the "last
/// error" so the FFI users can retrieve an appropriate error message on failure. Often
/// you will want to combine this with a call to [`LogExt::log_err`].
///
/// Since historically calls to the `deltachat::log::error!()` macro were (and sometimes
/// still are) shown as error toasts to the user, this macro also calls
/// [`Context::set_last_error`]. It is preferable however to rely on normal error
/// propagation in Rust code however and only use this `ResultExt::set_last_error` call
/// in the FFI layer.
///
/// # Example
///
/// Fully handling an error in the FFI code looks like this currently:
///
/// ```no_compile
/// some_dc_rust_api_call_returning_result()
/// .log_err(&context, "My API call failed")
/// .context("My API call failed")
/// .set_last_error(&context)
/// .unwrap_or_default()
/// ```
///
/// As shows it is a shame the `.log_err()` call currently needs a message instead of
/// relying on an implicit call to the [`anyhow::Context`] call if needed. This stems
/// from a time before we fully embraced anyhow. Some day we'll also fix that.
///
/// [`Context::set_last_error`]: context::Context::set_last_error
fn set_last_error(self, context: &context::Context) -> Result<T, E>;
}
impl<T, E> ResultLastError<T, E> for Result<T, E>
where
E: std::fmt::Display,
{
fn set_last_error(self, context: &context::Context) -> Result<T, E> {
if let Err(ref err) = self {
context.set_last_error(&format!("{err:#}"));
}
self
}
}
trait ResultNullableExt<T> {
fn into_raw(self) -> *mut T;
}

View File

@@ -4,7 +4,6 @@ use std::{collections::HashMap, str::FromStr};
use anyhow::{anyhow, bail, ensure, Context, Result};
pub use deltachat::accounts::Accounts;
use deltachat::qr::Qr;
use deltachat::{
chat::{
self, add_contact_to_chat, forward_msgs, get_chat_media, get_chat_msgs, get_chat_msgs_ex,
@@ -30,8 +29,7 @@ use deltachat::{
webxdc::StatusUpdateSerial,
};
use sanitize_filename::is_sanitized;
use tokio::fs;
use tokio::sync::{watch, Mutex, RwLock};
use tokio::{fs, sync::RwLock};
use walkdir::WalkDir;
use yerpc::rpc;
@@ -59,45 +57,21 @@ use self::types::{
use crate::api::types::chat_list::{get_chat_list_item_by_id, ChatListItemFetchResult};
use crate::api::types::qr::QrObject;
#[derive(Debug)]
struct AccountState {
/// The Qr code for current [`CommandApi::provide_backup`] call.
///
/// If there currently is a call to [`CommandApi::provide_backup`] this will be
/// `Pending` or `Ready`, otherwise `NoProvider`.
backup_provider_qr: watch::Sender<ProviderQr>,
}
impl Default for AccountState {
fn default() -> Self {
let (tx, _rx) = watch::channel(ProviderQr::NoProvider);
Self {
backup_provider_qr: tx,
}
}
}
#[derive(Clone, Debug)]
pub struct CommandApi {
pub(crate) accounts: Arc<RwLock<Accounts>>,
states: Arc<Mutex<BTreeMap<u32, AccountState>>>,
}
impl CommandApi {
pub fn new(accounts: Accounts) -> Self {
CommandApi {
accounts: Arc::new(RwLock::new(accounts)),
states: Arc::new(Mutex::new(BTreeMap::new())),
}
}
#[allow(dead_code)]
pub fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
CommandApi {
accounts,
states: Arc::new(Mutex::new(BTreeMap::new())),
}
CommandApi { accounts }
}
async fn get_context(&self, id: u32) -> Result<deltachat::context::Context> {
@@ -109,38 +83,6 @@ impl CommandApi {
.ok_or_else(|| anyhow!("account with id {} not found", id))?;
Ok(sc)
}
async fn with_state<F, T>(&self, id: u32, with_state: F) -> T
where
F: FnOnce(&AccountState) -> T,
{
let mut states = self.states.lock().await;
let state = states.entry(id).or_insert_with(Default::default);
with_state(state)
}
async fn inner_get_backup_qr(&self, account_id: u32) -> Result<Qr> {
let mut receiver = self
.with_state(account_id, |state| state.backup_provider_qr.subscribe())
.await;
let val: ProviderQr = receiver.borrow_and_update().clone();
match val {
ProviderQr::NoProvider => bail!("No backup being provided"),
ProviderQr::Pending => loop {
if receiver.changed().await.is_err() {
bail!("No backup being provided (account state dropped)");
}
let val: ProviderQr = receiver.borrow().clone();
match val {
ProviderQr::NoProvider => bail!("No backup being provided"),
ProviderQr::Pending => continue,
ProviderQr::Ready(qr) => break Ok(qr),
};
},
ProviderQr::Ready(qr) => Ok(qr),
}
}
}
#[rpc(all_positional, ts_outdir = "typescript/generated")]
@@ -173,13 +115,7 @@ impl CommandApi {
}
async fn remove_account(&self, account_id: u32) -> Result<()> {
self.accounts
.write()
.await
.remove_account(account_id)
.await?;
self.states.lock().await.remove(&account_id);
Ok(())
self.accounts.write().await.remove_account(account_id).await
}
async fn get_all_account_ids(&self) -> Vec<u32> {
@@ -1422,35 +1358,31 @@ impl CommandApi {
///
/// This **stops IO** while it is running.
///
/// Returns once a remote device has retrieved the backup, or is cancelled.
/// Returns once a remote device has retrieved the backup.
async fn provide_backup(&self, account_id: u32) -> Result<()> {
let ctx = self.get_context(account_id).await?;
self.with_state(account_id, |state| {
state.backup_provider_qr.send_replace(ProviderQr::Pending);
})
.await;
let provider = imex::BackupProvider::prepare(&ctx).await?;
self.with_state(account_id, |state| {
state
.backup_provider_qr
.send_replace(ProviderQr::Ready(provider.qr()));
})
.await;
provider.await
ctx.stop_io().await;
let provider = match imex::BackupProvider::prepare(&ctx).await {
Ok(provider) => provider,
Err(err) => {
ctx.start_io().await;
return Err(err);
}
};
let res = provider.await;
ctx.start_io().await;
res
}
/// Returns the text of the QR code for the running [`CommandApi::provide_backup`].
///
/// This QR code text can be used in [`CommandApi::get_backup`] on a second device to
/// retrieve the backup and setup this second device.
///
/// This call will fail if there is currently no concurrent call to
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
/// ready.
async fn get_backup_qr(&self, account_id: u32) -> Result<String> {
let qr = self.inner_get_backup_qr(account_id).await?;
let ctx = self.get_context(account_id).await?;
let qr = ctx
.backup_export_qr()
.ok_or(anyhow!("no backup being exported"))?;
qr::format_backup(&qr)
}
@@ -1459,14 +1391,12 @@ impl CommandApi {
/// This QR code can be used in [`CommandApi::get_backup`] on a second device to
/// retrieve the backup and setup this second device.
///
/// This call will fail if there is currently no concurrent call to
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
/// ready.
///
/// Returns the QR code rendered as an SVG image.
async fn get_backup_qr_svg(&self, account_id: u32) -> Result<String> {
let ctx = self.get_context(account_id).await?;
let qr = self.inner_get_backup_qr(account_id).await?;
let qr = ctx
.backup_export_qr()
.ok_or(anyhow!("no backup being exported"))?;
generate_backup_qr(&ctx, &qr).await
}
@@ -1970,15 +1900,3 @@ async fn get_config(
.await
}
}
/// Whether a QR code for a BackupProvider is currently available.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
enum ProviderQr {
/// There is no provider, asking for a QR is an error.
NoProvider,
/// There is a provider, the QR code is pending.
Pending,
/// There is a provider and QR code.
Ready(Qr),
}

View File

@@ -6,7 +6,7 @@ envlist =
[testenv]
commands =
pytest --exitfirst {posargs}
pytest {posargs}
setenv =
# Avoid stack overflow when Rust core is built without optimizations.
RUST_MIN_STACK=8388608
@@ -25,5 +25,5 @@ deps =
ruff
black
commands =
black --quiet --check --diff src/ examples/ tests/
black --check --diff src/ examples/ tests/
ruff src/ examples/ tests/

View File

@@ -8,7 +8,7 @@ envlist =
[testenv]
commands =
pytest -n6 --exitfirst --extra-info -v -rsXx --ignored --strict-tls {posargs: tests examples}
pytest -n6 --extra-info -v -rsXx --ignored --strict-tls {posargs: tests examples}
pip wheel . -w {toxworkdir}/wheelhouse --no-deps
setenv =
# Avoid stack overflow when Rust core is built without optimizations.
@@ -55,7 +55,7 @@ deps =
pygments
restructuredtext_lint
commands =
black --quiet --check --diff setup.py install_python_bindings.py src/deltachat examples/ tests/
black --check --diff setup.py install_python_bindings.py src/deltachat examples/ tests/
ruff src/deltachat tests/ examples/
rst-lint --encoding 'utf-8' README.rst

43
scripts/android-rpc-server.sh Executable file
View File

@@ -0,0 +1,43 @@
#!/bin/sh
# Build deltachat-rpc-server for Android.
set -e
test -n "$ANDROID_NDK_ROOT" || exit 1
RUSTUP_TOOLCHAIN="1.64.0"
rustup install "$RUSTUP_TOOLCHAIN"
rustup target add armv7-linux-androideabi aarch64-linux-android i686-linux-android x86_64-linux-android --toolchain "$RUSTUP_TOOLCHAIN"
KERNEL="$(uname -s | tr '[:upper:]' '[:lower:]')"
ARCH="$(uname -m)"
NDK_HOST_TAG="$KERNEL-$ARCH"
TOOLCHAIN="$ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/$NDK_HOST_TAG"
PACKAGE="deltachat-rpc-server"
export CARGO_PROFILE_RELEASE_LTO=on
CARGO_TARGET_ARMV7_LINUX_ANDROIDEABI_LINKER="$TOOLCHAIN/bin/armv7a-linux-androideabi16-clang" \
TARGET_CC="$TOOLCHAIN/bin/armv7a-linux-androideabi16-clang" \
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target armv7-linux-androideabi -p $PACKAGE
CARGO_TARGET_AARCH64_LINUX_ANDROID_LINKER="$TOOLCHAIN/bin/aarch64-linux-android21-clang" \
TARGET_CC="$TOOLCHAIN/bin/aarch64-linux-android21-clang" \
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target aarch64-linux-android -p $PACKAGE
CARGO_TARGET_I686_LINUX_ANDROID_LINKER="$TOOLCHAIN/bin/i686-linux-android16-clang" \
TARGET_CC="$TOOLCHAIN/bin/i686-linux-android16-clang" \
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target i686-linux-android -p $PACKAGE
CARGO_TARGET_X86_64_LINUX_ANDROID_LINKER="$TOOLCHAIN/bin/x86_64-linux-android21-clang" \
TARGET_CC="$TOOLCHAIN/bin/x86_64-linux-android21-clang" \
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target x86_64-linux-android -p $PACKAGE

View File

@@ -7,17 +7,17 @@ set -e
unset RUSTFLAGS
ZIG_VERSION=0.11.0-dev.2213+515e1c93e
ZIG_VERSION=0.11.0-dev.1935+1d96a17af
# Download Zig
rm -fr "$ZIG_VERSION" "zig-linux-x86_64-$ZIG_VERSION.tar.xz"
rm -fr "$ZIG_VERSION" "ZIG_VERSION.tar.xz"
wget "https://ziglang.org/builds/zig-linux-x86_64-$ZIG_VERSION.tar.xz"
tar xf "zig-linux-x86_64-$ZIG_VERSION.tar.xz"
export PATH="$PWD/zig-linux-x86_64-$ZIG_VERSION:$PATH"
cargo install cargo-zigbuild
for TARGET in x86_64-unknown-linux-musl aarch64-unknown-linux-musl armv7-unknown-linux-musleabihf; do
for TARGET in aarch64-unknown-linux-musl armv7-unknown-linux-musleabihf; do
rustup target add "$TARGET"
cargo zigbuild --release --target "$TARGET" -p deltachat-rpc-server --features vendored
done

View File

@@ -23,6 +23,7 @@ use crate::events::{Event, EventEmitter, EventType, Events};
use crate::key::{DcKey, SignedPublicKey};
use crate::login_param::LoginParam;
use crate::message::{self, MessageState, MsgId};
use crate::qr::Qr;
use crate::quota::QuotaInfo;
use crate::scheduler::SchedulerState;
use crate::sql::Sql;
@@ -240,6 +241,14 @@ pub struct InnerContext {
/// If debug logging is enabled, this contains all necessary information
pub(crate) debug_logging: RwLock<Option<DebugLogging>>,
/// QR code for currently running [`BackupProvider`].
///
/// This is only available if a backup export is currently running, it will also be
/// holding the ongoing process while running.
///
/// [`BackupProvider`]: crate::imex::BackupProvider
pub(crate) export_provider: std::sync::Mutex<Option<Qr>>,
}
#[derive(Debug)]
@@ -384,6 +393,7 @@ impl Context {
last_full_folder_scan: Mutex::new(None),
last_error: std::sync::RwLock::new("".to_string()),
debug_logging: RwLock::new(None),
export_provider: std::sync::Mutex::new(None),
};
let ctx = Context {
@@ -558,6 +568,17 @@ impl Context {
}
}
/// Returns the QR-code of the currently running [`BackupProvider`].
///
/// [`BackupProvider`]: crate::imex::BackupProvider
pub fn backup_export_qr(&self) -> Option<Qr> {
self.export_provider
.lock()
.expect("poisoned lock")
.as_ref()
.cloned()
}
/*******************************************************************************
* UI chat/message related API
******************************************************************************/

View File

@@ -91,7 +91,7 @@ pub async fn imex(
let cancel = context.alloc_ongoing().await?;
let res = {
let mut guard = context.scheduler.pause(context.clone()).await;
let mut guard = context.scheduler.pause(context).await;
let res = imex_inner(context, what, path, passphrase)
.race(async {
cancel.recv().await.ok();

View File

@@ -91,7 +91,6 @@ impl BackupProvider {
// Acquire global "ongoing" mutex.
let cancel_token = context.alloc_ongoing().await?;
let mut paused_guard = context.scheduler.pause(context.clone()).await;
let context_dir = context
.get_blobdir()
.parent()
@@ -119,21 +118,19 @@ impl BackupProvider {
Ok((provider, ticket)) => (provider, ticket),
Err(err) => {
context.free_ongoing().await;
paused_guard.resume().await;
return Err(err);
}
};
let handle = {
let context = context.clone();
tokio::spawn(async move {
let res = Self::watch_provider(&context, provider, cancel_token).await;
context.free_ongoing().await;
paused_guard.resume().await;
drop(dbfile);
res
})
};
Ok(Self { handle, ticket })
let handle = tokio::spawn(Self::watch_provider(
context.clone(),
provider,
cancel_token,
dbfile,
));
let slf = Self { handle, ticket };
let qr = slf.qr();
*context.export_provider.lock().expect("poisoned lock") = Some(qr);
Ok(slf)
}
/// Creates the provider task.
@@ -184,9 +181,10 @@ impl BackupProvider {
/// The *cancel_token* is the handle for the ongoing process mutex, when this completes
/// we must cancel this operation.
async fn watch_provider(
context: &Context,
context: Context,
mut provider: Provider,
cancel_token: Receiver<()>,
_dbfile: TempPathGuard,
) -> Result<()> {
// _dbfile exists so we can clean up the file once it is no longer needed
let mut events = provider.subscribe();
@@ -252,6 +250,11 @@ impl BackupProvider {
},
}
};
context
.export_provider
.lock()
.expect("poisoned lock")
.take();
match &res {
Ok(_) => context.emit_event(SendProgress::Completed.into()),
Err(err) => {
@@ -259,6 +262,7 @@ impl BackupProvider {
context.emit_event(SendProgress::Failed.into())
}
}
context.free_ongoing().await;
res
}
@@ -369,7 +373,7 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
!context.is_configured().await?,
"Cannot import backups to accounts in use."
);
let mut guard = context.scheduler.pause(context.clone()).await;
let mut guard = context.scheduler.pause(context).await;
// Acquire global "ongoing" mutex.
let cancel_token = context.alloc_ongoing().await?;
@@ -460,7 +464,7 @@ async fn transfer_from_provider(
opts,
on_connected,
|collection| {
context.emit_event(ReceiveProgress::CollectionReceived.into());
context.emit_event(ReceiveProgress::CollectionRecieved.into());
progress.set_total(collection.total_blobs_size());
async { Ok(()) }
},
@@ -571,7 +575,7 @@ fn spawn_progress_proxy(context: Context, mut rx: broadcast::Receiver<u16>) {
#[derive(Debug)]
enum ReceiveProgress {
Connected,
CollectionReceived,
CollectionRecieved,
/// A value between 0 and 85 interpreted as a percentage.
///
/// Other values are already used by the other variants of this enum.
@@ -593,7 +597,7 @@ impl From<ReceiveProgress> for EventType {
fn from(source: ReceiveProgress) -> Self {
let val = match source {
ReceiveProgress::Connected => 50,
ReceiveProgress::CollectionReceived => 100,
ReceiveProgress::CollectionRecieved => 100,
ReceiveProgress::BlobProgress(val) => 100 + 10 * val,
ReceiveProgress::Completed => 1000,
ReceiveProgress::Failed => 0,

View File

@@ -147,6 +147,7 @@ impl<T, E: std::fmt::Display> LogExt<T, E> for Result<T, E> {
);
// We can't use the warn!() macro here as the file!() and line!() macros
// don't work with #[track_caller]
context.set_last_error(&full);
context.emit_event(crate::EventType::Warning(full));
};
self

View File

@@ -95,10 +95,10 @@ impl SchedulerState {
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
/// resume will do the right thing and restore the scheduler to the state requested by
/// the last call.
pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> {
let mut inner = self.inner.write().await;
inner.paused = true;
Self::do_stop(inner, &context).await;
Self::do_stop(inner, context).await;
IoPausedGuard {
context,
done: false,
@@ -195,12 +195,12 @@ struct InnerSchedulerState {
}
#[derive(Debug)]
pub(crate) struct IoPausedGuard {
context: Context,
pub(crate) struct IoPausedGuard<'a> {
context: &'a Context,
done: bool,
}
impl IoPausedGuard {
impl<'a> IoPausedGuard<'a> {
pub(crate) async fn resume(&mut self) {
self.done = true;
let mut inner = self.context.scheduler.inner.write().await;
@@ -211,7 +211,7 @@ impl IoPausedGuard {
}
}
impl Drop for IoPausedGuard {
impl<'a> Drop for IoPausedGuard<'a> {
fn drop(&mut self) {
if self.done {
return;

View File

@@ -143,7 +143,7 @@ impl Smtp {
// Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, BufStream::new(socks5_stream)).await?;
let tcp_stream = transport.starttls().await?.into_inner();
let tcp_stream = transport.starttls().await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
.await
.context("STARTTLS upgrade failed")?;
@@ -199,7 +199,7 @@ impl Smtp {
// Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, BufStream::new(tcp_stream)).await?;
let tcp_stream = transport.starttls().await?.into_inner();
let tcp_stream = transport.starttls().await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
.await
.context("STARTTLS upgrade failed")?;

View File

@@ -306,49 +306,37 @@ impl Sql {
}
}
/// Locks the write transactions mutex in order to make sure that there never are
/// multiple write transactions at once.
///
/// Doing the locking ourselves instead of relying on SQLite has these reasons:
///
/// - SQLite's locking mechanism is non-async, blocking a thread
/// - SQLite's locking mechanism just sleeps in a loop, which is really inefficient
///
/// ---
///
/// More considerations on alternatives to the current approach:
///
/// We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions.
///
/// In order to never get concurrency issues, we could make all transactions IMMEDIATE,
/// but this would mean that there can never be two simultaneous transactions.
///
/// Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks.
///
/// DEFERRED write transactions without doing the locking ourselves would have these drawbacks:
///
/// 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop.
/// 2. If there are other write transactions, we block the db connection until
/// upgraded. If some reader comes then, it has to get the next, less used connection with a
/// worse per-connection page cache (SQLite allows one write and any number of reads in parallel).
/// 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY.
/// 4. If upon a successful upgrade to a write transaction the db has been modified,
/// the transaction has to be rolled back and retried, which means extra work in terms of
/// Locks the write transactions mutex.
/// We do not make all transactions
/// [IMMEDIATE](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions)
/// for more parallelism -- at least read transactions can be made DEFERRED to run in parallel
/// w/o any drawbacks. But if we make write transactions DEFERRED also w/o any external locking,
/// then they are upgraded from read to write ones on the first write statement. This has some
/// drawbacks:
/// - If there are other write transactions, we block the thread and the db connection until
/// upgraded. Also if some reader comes then, it has to get next, less used connection with a
/// worse per-connection page cache.
/// - If a transaction is blocked for more than busy_timeout, it fails with SQLITE_BUSY.
/// - Configuring busy_timeout is not the best way to manage transaction timeouts, we would
/// prefer it to be integrated with Rust/tokio asyncs. Moreover, SQLite implements waiting
/// using sleeps.
/// - If upon a successful upgrade to a write transaction the db has been modified by another
/// one, the transaction has to be rolled back and retried. It is an extra work in terms of
/// CPU/battery.
///
/// The only pro of making write transactions DEFERRED w/o the external locking would be some
/// parallelism between them.
///
/// Another option would be to make write transactions IMMEDIATE, also
/// w/o the external locking. But then cons 1. - 3. above would still be valid.
/// - Maybe minor, but we lose some fairness in servicing write transactions, i.e. we service
/// them in the order of the first write statement, not in the order they come.
/// The only pro of making write transactions DEFERRED w/o the external locking is some
/// parallelism between them. Also we have an option to make write transactions IMMEDIATE, also
/// w/o the external locking. But then the most of cons above are still valid. Instead, if we
/// perform all write transactions under an async mutex, the only cons is losing some
/// parallelism for write transactions.
pub async fn write_lock(&self) -> MutexGuard<'_, ()> {
self.write_mtx.lock().await
}
/// Allocates a connection and calls `function` with the connection. If `function` does write
/// queries,
/// - either first take a lock using `write_lock()`
/// - or use `call_write()` instead.
/// queries, either a lock must be taken first using `write_lock()` or `call_write()` used
/// instead.
///
/// Returns the result of the function.
async fn call<'a, F, R>(&'a self, function: F) -> Result<R>