mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 13:32:11 +03:00
Compare commits
152 Commits
link2xt/bu
...
link2xt/py
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
959ca06691 | ||
|
|
e985588c6c | ||
|
|
7ec3a1a9a2 | ||
|
|
19fa86b276 | ||
|
|
c4657991c8 | ||
|
|
484aebdb16 | ||
|
|
9c15cd5c8f | ||
|
|
8302d22622 | ||
|
|
034cde9289 | ||
|
|
02455d8485 | ||
|
|
35f50a8965 | ||
|
|
e04efdbd94 | ||
|
|
57445eedb1 | ||
|
|
a501f10756 | ||
|
|
5d80d4788c | ||
|
|
0c02886005 | ||
|
|
24856f3050 | ||
|
|
8e6434068e | ||
|
|
800d2b14a5 | ||
|
|
3a861d2f84 | ||
|
|
4ba00f7440 | ||
|
|
40fc61da4f | ||
|
|
eb0f896d57 | ||
|
|
71bb89fac1 | ||
|
|
b89199db54 | ||
|
|
e39429c2e3 | ||
|
|
17de3d3236 | ||
|
|
3177f9967d | ||
|
|
81418d8ee5 | ||
|
|
a2e7d914a0 | ||
|
|
4bf38c0e29 | ||
|
|
0079cd4766 | ||
|
|
2c3b2b8c2d | ||
|
|
52fa58a3ce | ||
|
|
32a7e5ed82 | ||
|
|
097113f01e | ||
|
|
1d42e4743f | ||
|
|
5ecdea47db | ||
|
|
5b92b6355e | ||
|
|
5eb7206b2d | ||
|
|
0a65081db0 | ||
|
|
a59e72e7d8 | ||
|
|
fd358617f5 | ||
|
|
b26a351786 | ||
|
|
575b43d9a0 | ||
|
|
0a5542a698 | ||
|
|
518bd19e96 | ||
|
|
961e3ad7e2 | ||
|
|
7a49e9401f | ||
|
|
3701936129 | ||
|
|
c02686b56e | ||
|
|
9a7ff9d2b1 | ||
|
|
56f6d6849e | ||
|
|
cbc18ee5a4 | ||
|
|
14521cfc2d | ||
|
|
5e4807b7ac | ||
|
|
28d9bec0b4 | ||
|
|
05e50ea787 | ||
|
|
02afacf989 | ||
|
|
c7de4f66e7 | ||
|
|
c9b8c5079b | ||
|
|
eec5ae96e8 | ||
|
|
4b94eadf5e | ||
|
|
52a1886937 | ||
|
|
9767f51c3d | ||
|
|
6674b888cc | ||
|
|
a5e6bd3e8e | ||
|
|
b6c24932a7 | ||
|
|
d73d56c399 | ||
|
|
731e90f0d5 | ||
|
|
e0a6c2ef54 | ||
|
|
c5408e0561 | ||
|
|
c1a2df91ac | ||
|
|
da85c2412e | ||
|
|
d108f9b3e3 | ||
|
|
e3014a349c | ||
|
|
9d88ef069e | ||
|
|
155dff2813 | ||
|
|
38d4ea8514 | ||
|
|
6f24874eb8 | ||
|
|
2d20812652 | ||
|
|
5762fbb9a7 | ||
|
|
0e06da22df | ||
|
|
5833a9b347 | ||
|
|
0ef8d57881 | ||
|
|
fc64c33368 | ||
|
|
1b39be8a42 | ||
|
|
a1e19e2c41 | ||
|
|
b920db12c7 | ||
|
|
73b90eee3e | ||
|
|
4637a28bf6 | ||
|
|
d0638c1542 | ||
|
|
788d3125a3 | ||
|
|
3c4ffc3550 | ||
|
|
ada858f439 | ||
|
|
f2570945c6 | ||
|
|
8072f78058 | ||
|
|
8ae0ee5a67 | ||
|
|
a75d2b1c80 | ||
|
|
c48c2af7a1 | ||
|
|
490a14c5ef | ||
|
|
dcce6ef50b | ||
|
|
7cf0820d2b | ||
|
|
0bae3caaff | ||
|
|
bca0b256c9 | ||
|
|
a53d30c459 | ||
|
|
7a9f497aa7 | ||
|
|
f9f9bc3efb | ||
|
|
904990bf91 | ||
|
|
b2266ffca1 | ||
|
|
bb9a3d4b8e | ||
|
|
e565e19b42 | ||
|
|
41319c85c7 | ||
|
|
daf56804a5 | ||
|
|
6f7a43804d | ||
|
|
0ca76d36ef | ||
|
|
ec5789997a | ||
|
|
7a0d61bbb0 | ||
|
|
1c2461974d | ||
|
|
2a754744fe | ||
|
|
b413593c43 | ||
|
|
c73edd7e21 | ||
|
|
a34a69d8e4 | ||
|
|
020a9d33f6 | ||
|
|
19f6f89312 | ||
|
|
d56e05a11a | ||
|
|
c379a4e5a7 | ||
|
|
44c1efe4e4 | ||
|
|
ff0d675082 | ||
|
|
e1087b4145 | ||
|
|
323535584b | ||
|
|
852adbe514 | ||
|
|
4c78553d90 | ||
|
|
a31ae5297a | ||
|
|
e7792a0c65 | ||
|
|
3c32de1859 | ||
|
|
6a3fe3db92 | ||
|
|
ac048c154d | ||
|
|
3f51a8ffc2 | ||
|
|
2129b2b7a0 | ||
|
|
3734fc25a7 | ||
|
|
05ddc13054 | ||
|
|
716504b833 | ||
|
|
187861c3b2 | ||
|
|
0b075ac762 | ||
|
|
a6c889ed5e | ||
|
|
ca1533b0e4 | ||
|
|
3267596a30 | ||
|
|
5f29b93970 | ||
|
|
2a6a21c33a | ||
|
|
059af398eb | ||
|
|
6044e5961b |
126
.github/workflows/deltachat-rpc-server.yml
vendored
126
.github/workflows/deltachat-rpc-server.yml
vendored
@@ -8,35 +8,14 @@ concurrency:
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
jobs:
|
||||
# Build a version statically linked against musl libc
|
||||
# to avoid problems with glibc version incompatibility.
|
||||
build_static_linux:
|
||||
name: Build deltachat-rpc-server for Linux
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Setup rust target
|
||||
run: rustup target add x86_64-unknown-linux-musl
|
||||
|
||||
- name: Install musl-gcc
|
||||
run: sudo apt install musl-tools
|
||||
|
||||
- name: Build
|
||||
env:
|
||||
RUSTFLAGS: "-C link-arg=-s"
|
||||
run: cargo build --release --target x86_64-unknown-linux-musl -p deltachat-rpc-server --features vendored
|
||||
- name: Upload binary
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: deltachat-rpc-server-x86_64
|
||||
path: target/x86_64-unknown-linux-musl/release/deltachat-rpc-server
|
||||
if-no-files-found: error
|
||||
|
||||
build_linux:
|
||||
name: Cross-compile deltachat-rpc-server for aarch64 and armv7 Linux
|
||||
name: Cross-compile deltachat-rpc-server for x86_64, aarch64 and armv7 Linux
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
@@ -44,6 +23,13 @@ jobs:
|
||||
- name: Build
|
||||
run: sh scripts/zig-rpc-server.sh
|
||||
|
||||
- name: Upload x86_64 binary
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: deltachat-rpc-server-x86_64
|
||||
path: target/x86_64-unknown-linux-musl/release/deltachat-rpc-server
|
||||
if-no-files-found: error
|
||||
|
||||
- name: Upload aarch64 binary
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
@@ -58,50 +44,6 @@ jobs:
|
||||
path: target/armv7-unknown-linux-musleabihf/release/deltachat-rpc-server
|
||||
if-no-files-found: error
|
||||
|
||||
build_android:
|
||||
name: Cross-compile deltachat-rpc-server for Android (armeabi-v7a, arm64-v8a, x86 and x86_64)
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- uses: nttld/setup-ndk@v1
|
||||
id: setup-ndk
|
||||
with:
|
||||
ndk-version: r21d
|
||||
|
||||
- name: Build
|
||||
env:
|
||||
ANDROID_NDK_ROOT: ${{ steps.setup-ndk.outputs.ndk-path }}
|
||||
run: sh scripts/android-rpc-server.sh
|
||||
|
||||
- name: Upload binary
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: deltachat-rpc-server-android-armv7
|
||||
path: target/armv7-linux-androideabi/release/deltachat-rpc-server
|
||||
if-no-files-found: error
|
||||
|
||||
- name: Upload binary
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: deltachat-rpc-server-android-aarch64
|
||||
path: target/aarch64-linux-android/release/deltachat-rpc-server
|
||||
if-no-files-found: error
|
||||
|
||||
- name: Upload binary
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: deltachat-rpc-server-android-i686
|
||||
path: target/i686-linux-android/release/deltachat-rpc-server
|
||||
if-no-files-found: error
|
||||
|
||||
- name: Upload binary
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: deltachat-rpc-server-android-x86_64
|
||||
path: target/x86_64-linux-android/release/deltachat-rpc-server
|
||||
if-no-files-found: error
|
||||
|
||||
build_windows:
|
||||
name: Build deltachat-rpc-server for Windows
|
||||
strategy:
|
||||
@@ -134,3 +76,51 @@ jobs:
|
||||
name: deltachat-rpc-server-${{ matrix.artifact }}
|
||||
path: target/${{ matrix.target}}/release/${{ matrix.path }}
|
||||
if-no-files-found: error
|
||||
|
||||
publish:
|
||||
name: Upload binaries to the release
|
||||
needs: ["build_linux", "build_windows"]
|
||||
permissions:
|
||||
contents: write
|
||||
runs-on: "ubuntu-latest"
|
||||
steps:
|
||||
- name: Download deltachat-rpc-server-x86_64
|
||||
uses: "actions/download-artifact@v3"
|
||||
with:
|
||||
name: "deltachat-rpc-server-x86_64"
|
||||
path: "dist/deltachat-rpc-server-x86_64"
|
||||
|
||||
- name: Download deltachat-rpc-server-aarch64
|
||||
uses: "actions/download-artifact@v3"
|
||||
with:
|
||||
name: "deltachat-rpc-server-aarch64"
|
||||
path: "dist/deltachat-rpc-server-aarch64"
|
||||
|
||||
- name: Download deltachat-rpc-server-armv7
|
||||
uses: "actions/download-artifact@v3"
|
||||
with:
|
||||
name: "deltachat-rpc-server-armv7"
|
||||
path: "dist/deltachat-rpc-server-armv7"
|
||||
|
||||
- name: Download deltachat-rpc-server-win32.exe
|
||||
uses: "actions/download-artifact@v3"
|
||||
with:
|
||||
name: "deltachat-rpc-server-win32.exe"
|
||||
path: "dist/deltachat-rpc-server-win32.exe"
|
||||
|
||||
- name: Download deltachat-rpc-server-win64.exe
|
||||
uses: "actions/download-artifact@v3"
|
||||
with:
|
||||
name: "deltachat-rpc-server-win64.exe"
|
||||
path: "dist/deltachat-rpc-server-win64.exe"
|
||||
|
||||
- name: List downloaded artifacts
|
||||
run: ls -l dist/
|
||||
|
||||
- name: Upload binaries to the GitHub release
|
||||
env:
|
||||
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
|
||||
run: |
|
||||
gh release upload ${{ github.ref_name }} \
|
||||
--repo ${{ github.repository }} \
|
||||
dist/deltachat-rpc-server-*
|
||||
|
||||
15
CHANGELOG.md
15
CHANGELOG.md
@@ -1,6 +1,6 @@
|
||||
# Changelog
|
||||
|
||||
## Unreleased
|
||||
## [Unreleased]
|
||||
|
||||
### Changes
|
||||
- "full message view" not needed because of footers that go to contact status #4151
|
||||
@@ -8,6 +8,11 @@
|
||||
- Support non-persistent configuration with DELTACHAT_* env
|
||||
- Print deltachat-repl errors with causes. #4166
|
||||
- Increase MSRV to 1.64. #4167
|
||||
- Core takes care of stopping and re-starting IO itself where needed,
|
||||
e.g. during backup creation. It is no longer needed to call
|
||||
dc_stop_io(). dc_start_io() can now be called at any time without
|
||||
harm. #4138
|
||||
- More accurate maybe_add_bcc_self device message text #4175
|
||||
|
||||
### Fixes
|
||||
- Fix segmentation fault if `dc_context_unref()` is called during
|
||||
@@ -16,9 +21,10 @@
|
||||
during handling the JSON-RPC request. #4153
|
||||
- Delete expired messages using multiple SQL requests. #4158
|
||||
- Do not emit "Failed to run incremental vacuum" warnings on success. #4160
|
||||
- Ability to send backup over network and QR code to setup second device #4007
|
||||
- Disable buffering during STARTTLS setup. #4190
|
||||
|
||||
|
||||
## 1.111.0
|
||||
## [1.111.0] - 2023-03-05
|
||||
|
||||
### Changes
|
||||
- Make smeared timestamp generation non-async. #4075
|
||||
@@ -2307,3 +2313,6 @@
|
||||
For a full list of changes, please see our closed Pull Requests:
|
||||
|
||||
https://github.com/deltachat/deltachat-core-rust/pulls?q=is%3Apr+is%3Aclosed
|
||||
|
||||
[unreleased]: https://github.com/deltachat/deltachat-core-rust/compare/v1.111.0...HEAD
|
||||
[1.111.0]: https://github.com/deltachat/deltachat-core-rust/compare/v1.110.0...v1.111.0
|
||||
|
||||
1553
Cargo.lock
generated
1553
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -24,6 +24,11 @@ lto = true
|
||||
panic = 'abort'
|
||||
opt-level = "z"
|
||||
|
||||
[patch.crates-io]
|
||||
default-net = { git = "https://github.com/dignifiedquire/default-net.git", branch="feat-android" }
|
||||
quinn-udp = { git = "https://github.com/quinn-rs/quinn", branch="main" }
|
||||
quinn-proto = { git = "https://github.com/quinn-rs/quinn", branch="main" }
|
||||
|
||||
[dependencies]
|
||||
deltachat_derive = { path = "./deltachat_derive" }
|
||||
format-flowed = { path = "./format-flowed" }
|
||||
@@ -48,6 +53,8 @@ futures-lite = "1.12.0"
|
||||
hex = "0.4.0"
|
||||
humansize = "2"
|
||||
image = { version = "0.24.5", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
|
||||
# iroh = { version = "0.3.0", default-features = false }
|
||||
iroh = { git = 'https://github.com/n0-computer/iroh', branch = "flub/ticket-multiple-addrs" }
|
||||
kamadak-exif = "0.5"
|
||||
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
|
||||
libc = "0.2"
|
||||
@@ -95,6 +102,7 @@ log = "0.4"
|
||||
pretty_env_logger = "0.4"
|
||||
proptest = { version = "1", default-features = false, features = ["std"] }
|
||||
tempfile = "3"
|
||||
testdir = "0.7.2"
|
||||
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "macros"] }
|
||||
|
||||
[workspace]
|
||||
|
||||
@@ -17,7 +17,7 @@ crate-type = ["cdylib", "staticlib"]
|
||||
deltachat = { path = "../", default-features = false }
|
||||
deltachat-jsonrpc = { path = "../deltachat-jsonrpc", optional = true }
|
||||
libc = "0.2"
|
||||
human-panic = "1"
|
||||
human-panic = { version = "1", default-features = false }
|
||||
num-traits = "0.2"
|
||||
serde_json = "1.0"
|
||||
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||
|
||||
@@ -24,6 +24,7 @@ typedef struct _dc_provider dc_provider_t;
|
||||
typedef struct _dc_event dc_event_t;
|
||||
typedef struct _dc_event_emitter dc_event_emitter_t;
|
||||
typedef struct _dc_jsonrpc_instance dc_jsonrpc_instance_t;
|
||||
typedef struct _dc_backup_provider dc_backup_provider_t;
|
||||
|
||||
// Alias for backwards compatibility, use dc_event_emitter_t instead.
|
||||
typedef struct _dc_event_emitter dc_accounts_event_emitter_t;
|
||||
@@ -2100,8 +2101,7 @@ dc_contact_t* dc_get_contact (dc_context_t* context, uint32_t co
|
||||
|
||||
/**
|
||||
* Import/export things.
|
||||
* During backup import/export IO must not be started,
|
||||
* if needed stop IO using dc_accounts_stop_io() or dc_stop_io() first.
|
||||
*
|
||||
* What to do is defined by the _what_ parameter which may be one of the following:
|
||||
*
|
||||
* - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1`
|
||||
@@ -2295,6 +2295,7 @@ void dc_stop_ongoing_process (dc_context_t* context);
|
||||
#define DC_QR_FPR_MISMATCH 220 // id=contact
|
||||
#define DC_QR_FPR_WITHOUT_ADDR 230 // test1=formatted fingerprint
|
||||
#define DC_QR_ACCOUNT 250 // text1=domain
|
||||
#define DC_QR_BACKUP 251
|
||||
#define DC_QR_WEBRTC_INSTANCE 260 // text1=domain, text2=instance pattern
|
||||
#define DC_QR_ADDR 320 // id=contact
|
||||
#define DC_QR_TEXT 330 // text1=text
|
||||
@@ -2320,7 +2321,7 @@ void dc_stop_ongoing_process (dc_context_t* context);
|
||||
* ask whether to verify the contact;
|
||||
* if so, start the protocol with dc_join_securejoin().
|
||||
*
|
||||
* - DC_QR_ASK_VERIFYGROUP withdc_lot_t::text1=Group name:
|
||||
* - DC_QR_ASK_VERIFYGROUP with dc_lot_t::text1=Group name:
|
||||
* ask whether to join the group;
|
||||
* if so, start the protocol with dc_join_securejoin().
|
||||
*
|
||||
@@ -2340,6 +2341,10 @@ void dc_stop_ongoing_process (dc_context_t* context);
|
||||
* ask the user if they want to create an account on the given domain,
|
||||
* if so, call dc_set_config_from_qr() and then dc_configure().
|
||||
*
|
||||
* - DC_QR_BACKUP:
|
||||
* ask the user if they want to set up a new device.
|
||||
* If so, pass the qr-code to dc_receive_backup().
|
||||
*
|
||||
* - DC_QR_WEBRTC_INSTANCE with dc_lot_t::text1=domain:
|
||||
* ask the user if they want to use the given service for video chats;
|
||||
* if so, call dc_set_config_from_qr().
|
||||
@@ -2630,6 +2635,117 @@ char* dc_get_last_error (dc_context_t* context);
|
||||
void dc_str_unref (char* str);
|
||||
|
||||
|
||||
/**
|
||||
* @class dc_backup_provider_t
|
||||
*
|
||||
* Set up another device.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Creates an object for sending a backup to another device.
|
||||
*
|
||||
* The backup is sent to through a peer-to-peer channel which is bootstrapped
|
||||
* by a QR-code. The backup contains the entire state of the account
|
||||
* including credentials. This can be used to setup a new device.
|
||||
*
|
||||
* This is a blocking call as some preparations are made like e.g. exporting
|
||||
* the database. Once this function returns, the backup is being offered to
|
||||
* remote devices. To wait until one device received the backup, use
|
||||
* dc_backup_provider_wait(). Alternatively abort the operation using
|
||||
* dc_stop_ongoing_process().
|
||||
*
|
||||
* During execution of the job #DC_EVENT_IMEX_PROGRESS is sent out to indicate
|
||||
* state and progress.
|
||||
*
|
||||
* @memberof dc_backup_provider_t
|
||||
* @param context The context.
|
||||
* @return Opaque object for sending the backup.
|
||||
* On errors, NULL is returned and dc_get_last_error() returns an error that
|
||||
* should be shown to the user.
|
||||
*/
|
||||
dc_backup_provider_t* dc_backup_provider_new (dc_context_t* context);
|
||||
|
||||
|
||||
/**
|
||||
* Returns the QR code text that will offer the backup to other devices.
|
||||
*
|
||||
* The QR code contains a ticket which will validate the backup and provide
|
||||
* authentication for both the provider and the recipient.
|
||||
*
|
||||
* The scanning device should call the scanned text to dc_check_qr(). If
|
||||
* dc_check_qr() returns DC_QR_BACKUP, the backup transfer can be started using
|
||||
* dc_get_backup().
|
||||
*
|
||||
* @memberof dc_backup_provider_t
|
||||
* @param backup_provider The backup provider object as created by
|
||||
* dc_backup_provider_new().
|
||||
* @return The text that should be put in the QR code.
|
||||
* On errors an empty string is returned, NULL is never returned.
|
||||
* the returned string must be released using dc_str_unref() after usage.
|
||||
*/
|
||||
char* dc_backup_provider_get_qr (const dc_backup_provider_t* backup_provider);
|
||||
|
||||
|
||||
/**
|
||||
* Returns the QR code SVG image that will offer the backup to other devices.
|
||||
*
|
||||
* This works like dc_backup_provider_qr() but returns the text of a rendered
|
||||
* SVG image containing the QR code.
|
||||
*
|
||||
* @memberof dc_backup_provider_t
|
||||
* @param backup_provider The backup provider object as created by
|
||||
* dc_backup_provider_new().
|
||||
* @return The QR code rendered as SVG.
|
||||
* On errors an empty string is returned, NULL is never returned.
|
||||
* the returned string must be released using dc_str_unref() after usage.
|
||||
*/
|
||||
char* dc_backup_provider_get_qr_svg (const dc_backup_provider_t* backup_provider);
|
||||
|
||||
/**
|
||||
* Waits for the sending to finish.
|
||||
*
|
||||
* This is a blocking call and should only be called once.
|
||||
*
|
||||
* @memberof dc_backup_provider_t
|
||||
* @param backup_provider The backup provider object as created by
|
||||
* dc_backup_provider_new(). If NULL is given nothing is done.
|
||||
*/
|
||||
void dc_backup_provider_wait (dc_backup_provider_t* backup_provider);
|
||||
|
||||
/**
|
||||
* Frees a dc_backup_provider_t object.
|
||||
*
|
||||
* @memberof dc_backup_provider_t
|
||||
* @param backup_provider The backup provider object as created by
|
||||
* dc_backup_provider_new().
|
||||
*/
|
||||
void dc_backup_provider_unref (dc_backup_provider_t* backup_provider);
|
||||
|
||||
/**
|
||||
* Gets a backup offered by a dc_backup_provider_t object on another device.
|
||||
*
|
||||
* This function is called on a device that scanned the QR code offered by
|
||||
* dc_backup_sender_qr() or dc_backup_sender_qr_svg(). Typically this is a
|
||||
* different device than that which provides the backup.
|
||||
*
|
||||
* This call will block while the backup is being transferred and only
|
||||
* complete on success or failure. Use dc_stop_ongoing_process() to abort it
|
||||
* early.
|
||||
*
|
||||
* During execution of the job #DC_EVENT_IMEX_PROGRESS is sent out to indicate
|
||||
* state and progress. The process is finished when the event emits either 0
|
||||
* or 1000, 0 means it failed and 1000 means it succeeded. These events are
|
||||
* for showing progress and informational only, success and failure is also
|
||||
* shown in the return code of this function.
|
||||
*
|
||||
* @memberof dc_context_t
|
||||
* @param context The context.
|
||||
* @param qr The qr code text, dc_check_qr() must have returned DC_QR_BACKUP
|
||||
* on this text.
|
||||
* @return 0=failure, 1=success.
|
||||
*/
|
||||
int dc_receive_backup (dc_context_t* context, const char* qr);
|
||||
|
||||
/**
|
||||
* @class dc_accounts_t
|
||||
*
|
||||
|
||||
@@ -28,9 +28,10 @@ use deltachat::constants::DC_MSG_ID_LAST_SPECIAL;
|
||||
use deltachat::contact::{Contact, ContactId, Origin};
|
||||
use deltachat::context::Context;
|
||||
use deltachat::ephemeral::Timer as EphemeralTimer;
|
||||
use deltachat::imex::BackupProvider;
|
||||
use deltachat::key::DcKey;
|
||||
use deltachat::message::MsgId;
|
||||
use deltachat::qr_code_generator::get_securejoin_qr_svg;
|
||||
use deltachat::qr_code_generator::{generate_backup_qr, get_securejoin_qr_svg};
|
||||
use deltachat::reaction::{get_msg_reactions, send_reaction, Reactions};
|
||||
use deltachat::stock_str::StockMessage;
|
||||
use deltachat::stock_str::StockStrings;
|
||||
@@ -4142,6 +4143,116 @@ pub unsafe extern "C" fn dc_str_unref(s: *mut libc::c_char) {
|
||||
libc::free(s as *mut _)
|
||||
}
|
||||
|
||||
pub struct BackupProviderWrapper {
|
||||
context: *const dc_context_t,
|
||||
provider: BackupProvider,
|
||||
}
|
||||
|
||||
pub type dc_backup_provider_t = BackupProviderWrapper;
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_backup_provider_new(
|
||||
context: *mut dc_context_t,
|
||||
) -> *mut dc_backup_provider_t {
|
||||
if context.is_null() {
|
||||
eprintln!("ignoring careless call to dc_backup_provider_new()");
|
||||
return ptr::null_mut();
|
||||
}
|
||||
let ctx = &*context;
|
||||
block_on(BackupProvider::prepare(ctx))
|
||||
.map(|provider| BackupProviderWrapper {
|
||||
context: ctx,
|
||||
provider,
|
||||
})
|
||||
.map(|ffi_provider| Box::into_raw(Box::new(ffi_provider)))
|
||||
.log_err(ctx, "BackupProvider failed")
|
||||
.context("BackupProvider failed")
|
||||
.set_last_error(ctx)
|
||||
.unwrap_or(ptr::null_mut())
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_backup_provider_get_qr(
|
||||
provider: *const dc_backup_provider_t,
|
||||
) -> *mut libc::c_char {
|
||||
if provider.is_null() {
|
||||
eprintln!("ignoring careless call to dc_backup_provider_qr");
|
||||
return "".strdup();
|
||||
}
|
||||
let ffi_provider = &*provider;
|
||||
let ctx = &*ffi_provider.context;
|
||||
deltachat::qr::format_backup(&ffi_provider.provider.qr())
|
||||
.log_err(ctx, "BackupProvider get_qr failed")
|
||||
.context("BackupProvider get_qr failed")
|
||||
.set_last_error(ctx)
|
||||
.unwrap_or_default()
|
||||
.strdup()
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_backup_provider_get_qr_svg(
|
||||
provider: *const dc_backup_provider_t,
|
||||
) -> *mut libc::c_char {
|
||||
if provider.is_null() {
|
||||
eprintln!("ignoring careless call to dc_backup_provider_qr_svg()");
|
||||
return "".strdup();
|
||||
}
|
||||
let ffi_provider = &*provider;
|
||||
let ctx = &*ffi_provider.context;
|
||||
let provider = &ffi_provider.provider;
|
||||
block_on(generate_backup_qr(ctx, &provider.qr()))
|
||||
.log_err(ctx, "BackupProvider get_qr_svg failed")
|
||||
.context("BackupProvider get_qr_svg failed")
|
||||
.set_last_error(ctx)
|
||||
.unwrap_or_default()
|
||||
.strdup()
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provider_t) {
|
||||
if provider.is_null() {
|
||||
eprintln!("ignoring careless call to dc_backup_provider_wait()");
|
||||
return;
|
||||
}
|
||||
let ffi_provider = &mut *provider;
|
||||
let ctx = &*ffi_provider.context;
|
||||
let provider = &mut ffi_provider.provider;
|
||||
block_on(provider)
|
||||
.log_err(ctx, "Failed to await BackupProvider")
|
||||
.context("Failed to await BackupProvider")
|
||||
.set_last_error(ctx)
|
||||
.ok();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_backup_provider_unref(provider: *mut dc_backup_provider_t) {
|
||||
drop(Box::from_raw(provider));
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_receive_backup(
|
||||
context: *mut dc_context_t,
|
||||
qr: *const libc::c_char,
|
||||
) -> libc::c_int {
|
||||
if context.is_null() {
|
||||
eprintln!("ignoring careless call to dc_receive_backup()");
|
||||
return 0;
|
||||
}
|
||||
let ctx = &*context;
|
||||
let qr_text = to_string_lossy(qr);
|
||||
let qr = match block_on(qr::check_qr(ctx, &qr_text)).log_err(ctx, "Invalid QR code") {
|
||||
Ok(qr) => qr,
|
||||
Err(_) => return 0,
|
||||
};
|
||||
spawn(async move {
|
||||
imex::get_backup(ctx, qr)
|
||||
.await
|
||||
.log_err(ctx, "Get backup failed")
|
||||
.ok();
|
||||
});
|
||||
1
|
||||
}
|
||||
|
||||
trait ResultExt<T, E> {
|
||||
/// Like `log_err()`, but:
|
||||
/// - returns the default value instead of an Err value.
|
||||
@@ -4162,6 +4273,56 @@ impl<T: Default, E: std::fmt::Display> ResultExt<T, E> for Result<T, E> {
|
||||
}
|
||||
}
|
||||
|
||||
trait ResultLastError<T, E>
|
||||
where
|
||||
E: std::fmt::Display,
|
||||
{
|
||||
/// Sets this `Err` value using [`Context::set_last_error`].
|
||||
///
|
||||
/// Normally each FFI-API *should* call this if it handles an error from the Rust API:
|
||||
/// errors which need to be reported to users in response to an API call need to be
|
||||
/// propagated up the Rust API and at the FFI boundary need to be stored into the "last
|
||||
/// error" so the FFI users can retrieve an appropriate error message on failure. Often
|
||||
/// you will want to combine this with a call to [`LogExt::log_err`].
|
||||
///
|
||||
/// Since historically calls to the `deltachat::log::error!()` macro were (and sometimes
|
||||
/// still are) shown as error toasts to the user, this macro also calls
|
||||
/// [`Context::set_last_error`]. It is preferable however to rely on normal error
|
||||
/// propagation in Rust code however and only use this `ResultExt::set_last_error` call
|
||||
/// in the FFI layer.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// Fully handling an error in the FFI code looks like this currently:
|
||||
///
|
||||
/// ```no_compile
|
||||
/// some_dc_rust_api_call_returning_result()
|
||||
/// .log_err(&context, "My API call failed")
|
||||
/// .context("My API call failed")
|
||||
/// .set_last_error(&context)
|
||||
/// .unwrap_or_default()
|
||||
/// ```
|
||||
///
|
||||
/// As shows it is a shame the `.log_err()` call currently needs a message instead of
|
||||
/// relying on an implicit call to the [`anyhow::Context`] call if needed. This stems
|
||||
/// from a time before we fully embraced anyhow. Some day we'll also fix that.
|
||||
///
|
||||
/// [`Context::set_last_error`]: context::Context::set_last_error
|
||||
fn set_last_error(self, context: &context::Context) -> Result<T, E>;
|
||||
}
|
||||
|
||||
impl<T, E> ResultLastError<T, E> for Result<T, E>
|
||||
where
|
||||
E: std::fmt::Display,
|
||||
{
|
||||
fn set_last_error(self, context: &context::Context) -> Result<T, E> {
|
||||
if let Err(ref err) = self {
|
||||
context.set_last_error(&format!("{err:#}"));
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
trait ResultNullableExt<T> {
|
||||
fn into_raw(self) -> *mut T;
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ use crate::summary::{Summary, SummaryPrefix};
|
||||
/// eg. by chatlist.get_summary() or dc_msg_get_summary().
|
||||
///
|
||||
/// *Lot* is used in the meaning *heap* here.
|
||||
// The QR code grew too large. So be it.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Debug)]
|
||||
pub enum Lot {
|
||||
Summary(Summary),
|
||||
@@ -47,6 +49,7 @@ impl Lot {
|
||||
Qr::FprMismatch { .. } => None,
|
||||
Qr::FprWithoutAddr { fingerprint, .. } => Some(fingerprint),
|
||||
Qr::Account { domain } => Some(domain),
|
||||
Qr::Backup { .. } => None,
|
||||
Qr::WebrtcInstance { domain, .. } => Some(domain),
|
||||
Qr::Addr { draft, .. } => draft.as_deref(),
|
||||
Qr::Url { url } => Some(url),
|
||||
@@ -98,6 +101,7 @@ impl Lot {
|
||||
Qr::FprMismatch { .. } => LotState::QrFprMismatch,
|
||||
Qr::FprWithoutAddr { .. } => LotState::QrFprWithoutAddr,
|
||||
Qr::Account { .. } => LotState::QrAccount,
|
||||
Qr::Backup { .. } => LotState::QrBackup,
|
||||
Qr::WebrtcInstance { .. } => LotState::QrWebrtcInstance,
|
||||
Qr::Addr { .. } => LotState::QrAddr,
|
||||
Qr::Url { .. } => LotState::QrUrl,
|
||||
@@ -122,6 +126,7 @@ impl Lot {
|
||||
Qr::FprMismatch { contact_id } => contact_id.unwrap_or_default().to_u32(),
|
||||
Qr::FprWithoutAddr { .. } => Default::default(),
|
||||
Qr::Account { .. } => Default::default(),
|
||||
Qr::Backup { .. } => Default::default(),
|
||||
Qr::WebrtcInstance { .. } => Default::default(),
|
||||
Qr::Addr { contact_id, .. } => contact_id.to_u32(),
|
||||
Qr::Url { .. } => Default::default(),
|
||||
@@ -170,6 +175,8 @@ pub enum LotState {
|
||||
/// text1=domain
|
||||
QrAccount = 250,
|
||||
|
||||
QrBackup = 251,
|
||||
|
||||
/// text1=domain, text2=instance pattern
|
||||
QrWebrtcInstance = 260,
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ walkdir = "2.3.2"
|
||||
base64 = "0.21"
|
||||
|
||||
# optional dependencies
|
||||
axum = { version = "0.6.6", optional = true, features = ["ws"] }
|
||||
axum = { version = "0.6.11", optional = true, features = ["ws"] }
|
||||
env_logger = { version = "0.10.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::{collections::HashMap, str::FromStr};
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
pub use deltachat::accounts::Accounts;
|
||||
use deltachat::qr::Qr;
|
||||
use deltachat::{
|
||||
chat::{
|
||||
self, add_contact_to_chat, forward_msgs, get_chat_media, get_chat_msgs, get_chat_msgs_ex,
|
||||
@@ -22,14 +23,15 @@ use deltachat::{
|
||||
},
|
||||
provider::get_provider_info,
|
||||
qr,
|
||||
qr_code_generator::get_securejoin_qr_svg,
|
||||
qr_code_generator::{generate_backup_qr, get_securejoin_qr_svg},
|
||||
reaction::send_reaction,
|
||||
securejoin,
|
||||
stock_str::StockMessage,
|
||||
webxdc::StatusUpdateSerial,
|
||||
};
|
||||
use sanitize_filename::is_sanitized;
|
||||
use tokio::{fs, sync::RwLock};
|
||||
use tokio::fs;
|
||||
use tokio::sync::{watch, Mutex, RwLock};
|
||||
use walkdir::WalkDir;
|
||||
use yerpc::rpc;
|
||||
|
||||
@@ -57,21 +59,45 @@ use self::types::{
|
||||
use crate::api::types::chat_list::{get_chat_list_item_by_id, ChatListItemFetchResult};
|
||||
use crate::api::types::qr::QrObject;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AccountState {
|
||||
/// The Qr code for current [`CommandApi::provide_backup`] call.
|
||||
///
|
||||
/// If there currently is a call to [`CommandApi::provide_backup`] this will be
|
||||
/// `Pending` or `Ready`, otherwise `NoProvider`.
|
||||
backup_provider_qr: watch::Sender<ProviderQr>,
|
||||
}
|
||||
|
||||
impl Default for AccountState {
|
||||
fn default() -> Self {
|
||||
let (tx, _rx) = watch::channel(ProviderQr::NoProvider);
|
||||
Self {
|
||||
backup_provider_qr: tx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CommandApi {
|
||||
pub(crate) accounts: Arc<RwLock<Accounts>>,
|
||||
|
||||
states: Arc<Mutex<BTreeMap<u32, AccountState>>>,
|
||||
}
|
||||
|
||||
impl CommandApi {
|
||||
pub fn new(accounts: Accounts) -> Self {
|
||||
CommandApi {
|
||||
accounts: Arc::new(RwLock::new(accounts)),
|
||||
states: Arc::new(Mutex::new(BTreeMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
|
||||
CommandApi { accounts }
|
||||
CommandApi {
|
||||
accounts,
|
||||
states: Arc::new(Mutex::new(BTreeMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_context(&self, id: u32) -> Result<deltachat::context::Context> {
|
||||
@@ -83,6 +109,38 @@ impl CommandApi {
|
||||
.ok_or_else(|| anyhow!("account with id {} not found", id))?;
|
||||
Ok(sc)
|
||||
}
|
||||
|
||||
async fn with_state<F, T>(&self, id: u32, with_state: F) -> T
|
||||
where
|
||||
F: FnOnce(&AccountState) -> T,
|
||||
{
|
||||
let mut states = self.states.lock().await;
|
||||
let state = states.entry(id).or_insert_with(Default::default);
|
||||
with_state(state)
|
||||
}
|
||||
|
||||
async fn inner_get_backup_qr(&self, account_id: u32) -> Result<Qr> {
|
||||
let mut receiver = self
|
||||
.with_state(account_id, |state| state.backup_provider_qr.subscribe())
|
||||
.await;
|
||||
|
||||
let val: ProviderQr = receiver.borrow_and_update().clone();
|
||||
match val {
|
||||
ProviderQr::NoProvider => bail!("No backup being provided"),
|
||||
ProviderQr::Pending => loop {
|
||||
if receiver.changed().await.is_err() {
|
||||
bail!("No backup being provided (account state dropped)");
|
||||
}
|
||||
let val: ProviderQr = receiver.borrow().clone();
|
||||
match val {
|
||||
ProviderQr::NoProvider => bail!("No backup being provided"),
|
||||
ProviderQr::Pending => continue,
|
||||
ProviderQr::Ready(qr) => break Ok(qr),
|
||||
};
|
||||
},
|
||||
ProviderQr::Ready(qr) => Ok(qr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[rpc(all_positional, ts_outdir = "typescript/generated")]
|
||||
@@ -115,7 +173,13 @@ impl CommandApi {
|
||||
}
|
||||
|
||||
async fn remove_account(&self, account_id: u32) -> Result<()> {
|
||||
self.accounts.write().await.remove_account(account_id).await
|
||||
self.accounts
|
||||
.write()
|
||||
.await
|
||||
.remove_account(account_id)
|
||||
.await?;
|
||||
self.states.lock().await.remove(&account_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_all_account_ids(&self) -> Vec<u32> {
|
||||
@@ -1325,16 +1389,13 @@ impl CommandApi {
|
||||
passphrase: Option<String>,
|
||||
) -> Result<()> {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
ctx.stop_io().await;
|
||||
let result = imex::imex(
|
||||
imex::imex(
|
||||
&ctx,
|
||||
imex::ImexMode::ExportBackup,
|
||||
destination.as_ref(),
|
||||
passphrase,
|
||||
)
|
||||
.await;
|
||||
ctx.start_io().await;
|
||||
result
|
||||
.await
|
||||
}
|
||||
|
||||
async fn import_backup(
|
||||
@@ -1353,6 +1414,75 @@ impl CommandApi {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Offers a backup for remote devices to retrieve.
|
||||
///
|
||||
/// Can be cancelled by stopping the ongoing process. Success or failure can be tracked
|
||||
/// via the `ImexProgress` event which should either reach `1000` for success or `0` for
|
||||
/// failure.
|
||||
///
|
||||
/// This **stops IO** while it is running.
|
||||
///
|
||||
/// Returns once a remote device has retrieved the backup, or is cancelled.
|
||||
async fn provide_backup(&self, account_id: u32) -> Result<()> {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
self.with_state(account_id, |state| {
|
||||
state.backup_provider_qr.send_replace(ProviderQr::Pending);
|
||||
})
|
||||
.await;
|
||||
|
||||
let provider = imex::BackupProvider::prepare(&ctx).await?;
|
||||
self.with_state(account_id, |state| {
|
||||
state
|
||||
.backup_provider_qr
|
||||
.send_replace(ProviderQr::Ready(provider.qr()));
|
||||
})
|
||||
.await;
|
||||
|
||||
provider.await
|
||||
}
|
||||
|
||||
/// Returns the text of the QR code for the running [`CommandApi::provide_backup`].
|
||||
///
|
||||
/// This QR code text can be used in [`CommandApi::get_backup`] on a second device to
|
||||
/// retrieve the backup and setup this second device.
|
||||
///
|
||||
/// This call will fail if there is currently no concurrent call to
|
||||
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
|
||||
/// ready.
|
||||
async fn get_backup_qr(&self, account_id: u32) -> Result<String> {
|
||||
let qr = self.inner_get_backup_qr(account_id).await?;
|
||||
qr::format_backup(&qr)
|
||||
}
|
||||
|
||||
/// Returns the rendered QR code for the running [`CommandApi::provide_backup`].
|
||||
///
|
||||
/// This QR code can be used in [`CommandApi::get_backup`] on a second device to
|
||||
/// retrieve the backup and setup this second device.
|
||||
///
|
||||
/// This call will fail if there is currently no concurrent call to
|
||||
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
|
||||
/// ready.
|
||||
///
|
||||
/// Returns the QR code rendered as an SVG image.
|
||||
async fn get_backup_qr_svg(&self, account_id: u32) -> Result<String> {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
let qr = self.inner_get_backup_qr(account_id).await?;
|
||||
generate_backup_qr(&ctx, &qr).await
|
||||
}
|
||||
|
||||
/// Gets a backup from a remote provider.
|
||||
///
|
||||
/// This retrieves the backup from a remote device over the network and imports it into
|
||||
/// the current device.
|
||||
///
|
||||
/// Can be cancelled by stopping the ongoing process.
|
||||
async fn get_backup(&self, account_id: u32, qr_text: String) -> Result<()> {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
let qr = qr::check_qr(&ctx, &qr_text).await?;
|
||||
imex::get_backup(&ctx, qr).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---------------------------------------------
|
||||
// connectivity
|
||||
// ---------------------------------------------
|
||||
@@ -1840,3 +1970,15 @@ async fn get_config(
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether a QR code for a BackupProvider is currently available.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Clone, Debug)]
|
||||
enum ProviderQr {
|
||||
/// There is no provider, asking for a QR is an error.
|
||||
NoProvider,
|
||||
/// There is a provider, the QR code is pending.
|
||||
Pending,
|
||||
/// There is a provider and QR code.
|
||||
Ready(Qr),
|
||||
}
|
||||
|
||||
@@ -32,6 +32,9 @@ pub enum QrObject {
|
||||
Account {
|
||||
domain: String,
|
||||
},
|
||||
Backup {
|
||||
ticket: String,
|
||||
},
|
||||
WebrtcInstance {
|
||||
domain: String,
|
||||
instance_pattern: String,
|
||||
@@ -126,6 +129,9 @@ impl From<Qr> for QrObject {
|
||||
}
|
||||
Qr::FprWithoutAddr { fingerprint } => QrObject::FprWithoutAddr { fingerprint },
|
||||
Qr::Account { domain } => QrObject::Account { domain },
|
||||
Qr::Backup { ticket } => QrObject::Backup {
|
||||
ticket: ticket.to_string(),
|
||||
},
|
||||
Qr::WebrtcInstance {
|
||||
domain,
|
||||
instance_pattern,
|
||||
|
||||
@@ -336,6 +336,8 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
|
||||
has-backup\n\
|
||||
export-backup\n\
|
||||
import-backup <backup-file>\n\
|
||||
send-backup\n\
|
||||
receive-backup <qr>\n\
|
||||
export-keys\n\
|
||||
import-keys\n\
|
||||
export-setup\n\
|
||||
@@ -486,6 +488,17 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
"send-backup" => {
|
||||
let provider = BackupProvider::prepare(&context).await?;
|
||||
let qr = provider.qr();
|
||||
println!("QR code: {}", format_backup(&qr)?);
|
||||
provider.await?;
|
||||
}
|
||||
"receive-backup" => {
|
||||
ensure!(!arg1.is_empty(), "Argument <qr> is missing.");
|
||||
let qr = check_qr(&context, arg1).await?;
|
||||
deltachat::imex::get_backup(&context, qr).await?;
|
||||
}
|
||||
"export-keys" => {
|
||||
let dir = dirs::home_dir().unwrap_or_default();
|
||||
imex(&context, ImexMode::ExportSelfKeys, dir.as_ref(), None).await?;
|
||||
|
||||
@@ -152,13 +152,15 @@ impl Completer for DcHelper {
|
||||
}
|
||||
}
|
||||
|
||||
const IMEX_COMMANDS: [&str; 12] = [
|
||||
const IMEX_COMMANDS: [&str; 14] = [
|
||||
"initiate-key-transfer",
|
||||
"get-setupcodebegin",
|
||||
"continue-key-transfer",
|
||||
"has-backup",
|
||||
"export-backup",
|
||||
"import-backup",
|
||||
"send-backup",
|
||||
"receive-backup",
|
||||
"export-keys",
|
||||
"import-keys",
|
||||
"export-setup",
|
||||
|
||||
@@ -6,7 +6,7 @@ envlist =
|
||||
|
||||
[testenv]
|
||||
commands =
|
||||
pytest {posargs}
|
||||
pytest --exitfirst {posargs}
|
||||
setenv =
|
||||
# Avoid stack overflow when Rust core is built without optimizations.
|
||||
RUST_MIN_STACK=8388608
|
||||
@@ -25,5 +25,5 @@ deps =
|
||||
ruff
|
||||
black
|
||||
commands =
|
||||
black --check --diff src/ examples/ tests/
|
||||
black --quiet --check --diff src/ examples/ tests/
|
||||
ruff src/ examples/ tests/
|
||||
|
||||
73
deny.toml
73
deny.toml
@@ -12,27 +12,41 @@ ignore = [
|
||||
# becoming empty. Adding versions forces us to revisit this at least
|
||||
# when upgrading.
|
||||
skip = [
|
||||
{ name = "windows-sys", version = "<0.45" },
|
||||
{ name = "wasi", version = "<0.11" },
|
||||
{ name = "version_check", version = "<0.9" },
|
||||
{ name = "uuid", version = "<1.3" },
|
||||
{ name = "sha2", version = "<0.10" },
|
||||
{ name = "rand_core", version = "<0.6" },
|
||||
{ name = "rand_chacha", version = "<0.3" },
|
||||
{ name = "rand", version = "<0.8" },
|
||||
{ name = "nom", version = "<7.1" },
|
||||
{ name = "idna", version = "<0.3" },
|
||||
{ name = "humantime", version = "<2.1" },
|
||||
{ name = "hermit-abi", version = "<0.3" },
|
||||
{ name = "getrandom", version = "<0.2" },
|
||||
{ name = "quick-error", version = "<2.0" },
|
||||
{ name = "env_logger", version = "<0.10" },
|
||||
{ name = "digest", version = "<0.10" },
|
||||
{ name = "darling_macro", version = "<0.14" },
|
||||
{ name = "darling_core", version = "<0.14" },
|
||||
{ name = "darling", version = "<0.14" },
|
||||
{ name = "block-buffer", version = "<0.10" },
|
||||
{ name = "base64", version = "<0.21" },
|
||||
{ name = "block-buffer", version = "<0.10" },
|
||||
{ name = "darling", version = "<0.14" },
|
||||
{ name = "darling_core", version = "<0.14" },
|
||||
{ name = "darling_macro", version = "<0.14" },
|
||||
{ name = "digest", version = "<0.10" },
|
||||
{ name = "env_logger", version = "<0.10" },
|
||||
{ name = "getrandom", version = "<0.2" },
|
||||
{ name = "hermit-abi", version = "<0.3" },
|
||||
{ name = "humantime", version = "<2.1" },
|
||||
{ name = "idna", version = "<0.3" },
|
||||
{ name = "nom", version = "<7.1" },
|
||||
{ name = "quick-error", version = "<2.0" },
|
||||
{ name = "rand", version = "<0.8" },
|
||||
{ name = "rand_chacha", version = "<0.3" },
|
||||
{ name = "rand_core", version = "<0.6" },
|
||||
{ name = "sha2", version = "<0.10" },
|
||||
{ name = "time", version = "<0.3" },
|
||||
{ name = "version_check", version = "<0.9" },
|
||||
{ name = "wasi", version = "<0.11" },
|
||||
{ name = "windows-sys", version = "<0.45" },
|
||||
{ name = "windows_x86_64_msvc", version = "<0.42" },
|
||||
{ name = "windows_x86_64_gnu", version = "<0.42" },
|
||||
{ name = "windows_i686_msvc", version = "<0.42" },
|
||||
{ name = "windows_i686_gnu", version = "<0.42" },
|
||||
{ name = "windows_aarch64_msvc", version = "<0.42" },
|
||||
{ name = "unicode-xid", version = "<0.2.4" },
|
||||
{ name = "syn", version = "<1.0" },
|
||||
{ name = "quote", version = "<1.0" },
|
||||
{ name = "proc-macro2", version = "<1.0" },
|
||||
{ name = "portable-atomic", version = "<1.0" },
|
||||
{ name = "spin", version = "<0.9.6" },
|
||||
{ name = "convert_case", version = "0.4.0" },
|
||||
{ name = "clap_lex", version = "0.2.4" },
|
||||
{ name = "clap", version = "3.2.23" },
|
||||
]
|
||||
|
||||
|
||||
@@ -42,11 +56,21 @@ allow = [
|
||||
"Apache-2.0",
|
||||
"BSD-2-Clause",
|
||||
"BSD-3-Clause",
|
||||
"CC0-1.0",
|
||||
"MIT",
|
||||
"BSL-1.0", # Boost Software License 1.0
|
||||
"Unicode-DFS-2016",
|
||||
"CC0-1.0",
|
||||
"ISC",
|
||||
"MIT",
|
||||
"MPL-2.0",
|
||||
"OpenSSL",
|
||||
"Unicode-DFS-2016",
|
||||
"Zlib",
|
||||
]
|
||||
|
||||
[[licenses.clarify]]
|
||||
name = "ring"
|
||||
expression = "MIT AND ISC AND OpenSSL"
|
||||
license-files = [
|
||||
{ path = "LICENSE", hash = 0xbd0eed23 },
|
||||
]
|
||||
|
||||
[sources.allow-org]
|
||||
@@ -54,4 +78,7 @@ allow = [
|
||||
github = [
|
||||
"async-email",
|
||||
"deltachat",
|
||||
"n0-computer",
|
||||
"quinn-rs",
|
||||
"dignifiedquire",
|
||||
]
|
||||
|
||||
80
fuzz/Cargo.lock
generated
80
fuzz/Cargo.lock
generated
@@ -111,7 +111,7 @@ version = "0.6.0"
|
||||
source = "git+https://github.com/async-email/async-imap?branch=master#85ff7a3d9d71a3715354fabf2fc1a8d047b5710e"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-native-tls",
|
||||
"async-native-tls 0.4.0",
|
||||
"base64 0.13.1",
|
||||
"byte-pool",
|
||||
"chrono",
|
||||
@@ -139,6 +139,18 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-native-tls"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9343dc5acf07e79ff82d0c37899f079db3534d99f189a1837c8e549c99405bec"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-smtp"
|
||||
version = "0.8.0"
|
||||
@@ -696,12 +708,12 @@ checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb"
|
||||
|
||||
[[package]]
|
||||
name = "deltachat"
|
||||
version = "1.107.0"
|
||||
version = "1.111.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"async-imap",
|
||||
"async-native-tls",
|
||||
"async-native-tls 0.5.0",
|
||||
"async-smtp",
|
||||
"async_zip",
|
||||
"backtrace",
|
||||
@@ -723,17 +735,15 @@ dependencies = [
|
||||
"lettre_email",
|
||||
"libc",
|
||||
"mailparse 0.14.0",
|
||||
"native-tls",
|
||||
"num-derive",
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"pgp",
|
||||
"qrcodegen",
|
||||
"quick-xml",
|
||||
"r2d2",
|
||||
"r2d2_sqlite",
|
||||
"rand 0.8.5",
|
||||
"ratelimit",
|
||||
"regex",
|
||||
@@ -1284,15 +1294,6 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
@@ -1304,11 +1305,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hashlink"
|
||||
version = "0.7.0"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
|
||||
checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa"
|
||||
dependencies = [
|
||||
"hashbrown 0.11.2",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1506,7 +1507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1631,9 +1632,9 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
|
||||
|
||||
[[package]]
|
||||
name = "libsqlite3-sys"
|
||||
version = "0.24.2"
|
||||
version = "0.25.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14"
|
||||
checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"openssl-sys",
|
||||
@@ -2241,27 +2242,6 @@ version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb"
|
||||
|
||||
[[package]]
|
||||
name = "r2d2"
|
||||
version = "0.8.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
|
||||
dependencies = [
|
||||
"log",
|
||||
"parking_lot",
|
||||
"scheduled-thread-pool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "r2d2_sqlite"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fdc8e4da70586127893be32b7adf21326a4c6b1aba907611edf467d13ffe895"
|
||||
dependencies = [
|
||||
"r2d2",
|
||||
"rusqlite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
@@ -2451,16 +2431,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rusqlite"
|
||||
version = "0.27.0"
|
||||
version = "0.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85127183a999f7db96d1a976a309eebbfb6ea3b0b400ddd8340190129de6eb7a"
|
||||
checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"fallible-iterator",
|
||||
"fallible-streaming-iterator",
|
||||
"hashlink",
|
||||
"libsqlite3-sys",
|
||||
"memchr",
|
||||
"smallvec",
|
||||
]
|
||||
|
||||
@@ -2514,15 +2493,6 @@ dependencies = [
|
||||
"windows-sys 0.36.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scheduled-thread-pool"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
|
||||
dependencies = [
|
||||
"parking_lot",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@@ -3126,7 +3096,7 @@ version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c5faade31a542b8b35855fff6e8def199853b2da8da256da52f52f1316ee3137"
|
||||
dependencies = [
|
||||
"hashbrown 0.12.3",
|
||||
"hashbrown",
|
||||
"regex",
|
||||
]
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ envlist =
|
||||
|
||||
[testenv]
|
||||
commands =
|
||||
pytest -n6 --extra-info -v -rsXx --ignored --strict-tls {posargs: tests examples}
|
||||
pytest -n6 --exitfirst --extra-info -v -rsXx --ignored --strict-tls {posargs: tests examples}
|
||||
pip wheel . -w {toxworkdir}/wheelhouse --no-deps
|
||||
setenv =
|
||||
# Avoid stack overflow when Rust core is built without optimizations.
|
||||
@@ -55,7 +55,7 @@ deps =
|
||||
pygments
|
||||
restructuredtext_lint
|
||||
commands =
|
||||
black --check --diff setup.py install_python_bindings.py src/deltachat examples/ tests/
|
||||
black --quiet --check --diff setup.py install_python_bindings.py src/deltachat examples/ tests/
|
||||
ruff src/deltachat tests/ examples/
|
||||
rst-lint --encoding 'utf-8' README.rst
|
||||
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
#!/bin/sh
|
||||
# Build deltachat-rpc-server for Android.
|
||||
|
||||
set -e
|
||||
|
||||
test -n "$ANDROID_NDK_ROOT" || exit 1
|
||||
|
||||
RUSTUP_TOOLCHAIN="1.64.0"
|
||||
rustup install "$RUSTUP_TOOLCHAIN"
|
||||
rustup target add armv7-linux-androideabi aarch64-linux-android i686-linux-android x86_64-linux-android --toolchain "$RUSTUP_TOOLCHAIN"
|
||||
|
||||
KERNEL="$(uname -s | tr '[:upper:]' '[:lower:]')"
|
||||
ARCH="$(uname -m)"
|
||||
NDK_HOST_TAG="$KERNEL-$ARCH"
|
||||
TOOLCHAIN="$ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/$NDK_HOST_TAG"
|
||||
|
||||
PACKAGE="deltachat-rpc-server"
|
||||
|
||||
export CARGO_PROFILE_RELEASE_LTO=on
|
||||
|
||||
CARGO_TARGET_ARMV7_LINUX_ANDROIDEABI_LINKER="$TOOLCHAIN/bin/armv7a-linux-androideabi16-clang" \
|
||||
TARGET_CC="$TOOLCHAIN/bin/armv7a-linux-androideabi16-clang" \
|
||||
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
|
||||
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
|
||||
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target armv7-linux-androideabi -p $PACKAGE
|
||||
|
||||
CARGO_TARGET_AARCH64_LINUX_ANDROID_LINKER="$TOOLCHAIN/bin/aarch64-linux-android21-clang" \
|
||||
TARGET_CC="$TOOLCHAIN/bin/aarch64-linux-android21-clang" \
|
||||
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
|
||||
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
|
||||
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target aarch64-linux-android -p $PACKAGE
|
||||
|
||||
CARGO_TARGET_I686_LINUX_ANDROID_LINKER="$TOOLCHAIN/bin/i686-linux-android16-clang" \
|
||||
TARGET_CC="$TOOLCHAIN/bin/i686-linux-android16-clang" \
|
||||
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
|
||||
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
|
||||
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target i686-linux-android -p $PACKAGE
|
||||
|
||||
CARGO_TARGET_X86_64_LINUX_ANDROID_LINKER="$TOOLCHAIN/bin/x86_64-linux-android21-clang" \
|
||||
TARGET_CC="$TOOLCHAIN/bin/x86_64-linux-android21-clang" \
|
||||
TARGET_AR="$TOOLCHAIN/bin/llvm-ar" \
|
||||
TARGET_RANLIB="$TOOLCHAIN/bin/llvm-ranlib" \
|
||||
cargo "+$RUSTUP_TOOLCHAIN" rustc --release --target x86_64-linux-android -p $PACKAGE
|
||||
@@ -7,17 +7,17 @@ set -e
|
||||
|
||||
unset RUSTFLAGS
|
||||
|
||||
ZIG_VERSION=0.11.0-dev.1935+1d96a17af
|
||||
ZIG_VERSION=0.11.0-dev.2213+515e1c93e
|
||||
|
||||
# Download Zig
|
||||
rm -fr "$ZIG_VERSION" "ZIG_VERSION.tar.xz"
|
||||
rm -fr "$ZIG_VERSION" "zig-linux-x86_64-$ZIG_VERSION.tar.xz"
|
||||
wget "https://ziglang.org/builds/zig-linux-x86_64-$ZIG_VERSION.tar.xz"
|
||||
tar xf "zig-linux-x86_64-$ZIG_VERSION.tar.xz"
|
||||
export PATH="$PWD/zig-linux-x86_64-$ZIG_VERSION:$PATH"
|
||||
|
||||
cargo install cargo-zigbuild
|
||||
|
||||
for TARGET in aarch64-unknown-linux-musl armv7-unknown-linux-musleabihf; do
|
||||
for TARGET in x86_64-unknown-linux-musl aarch64-unknown-linux-musl armv7-unknown-linux-musleabihf; do
|
||||
rustup target add "$TARGET"
|
||||
cargo zigbuild --release --target "$TARGET" -p deltachat-rpc-server --features vendored
|
||||
done
|
||||
|
||||
@@ -271,14 +271,14 @@ impl Accounts {
|
||||
/// Notifies all accounts that the network may have become available.
|
||||
pub async fn maybe_network(&self) {
|
||||
for account in self.accounts.values() {
|
||||
account.maybe_network().await;
|
||||
account.scheduler.maybe_network().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies all accounts that the network connection may have been lost.
|
||||
pub async fn maybe_network_lost(&self) {
|
||||
for account in self.accounts.values() {
|
||||
account.maybe_network_lost().await;
|
||||
account.scheduler.maybe_network_lost(account).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
88
src/blob.rs
88
src/blob.rs
@@ -4,13 +4,16 @@ use core::cmp::max;
|
||||
use std::ffi::OsStr;
|
||||
use std::fmt;
|
||||
use std::io::Cursor;
|
||||
use std::iter::FusedIterator;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{format_err, Context as _, Result};
|
||||
use futures::StreamExt;
|
||||
use image::{DynamicImage, ImageFormat};
|
||||
use num_traits::FromPrimitive;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::{fs, io};
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::constants::{
|
||||
@@ -160,9 +163,9 @@ impl<'a> BlobObject<'a> {
|
||||
pub fn from_path(context: &'a Context, path: &Path) -> Result<BlobObject<'a>> {
|
||||
let rel_path = path
|
||||
.strip_prefix(context.get_blobdir())
|
||||
.context("wrong blobdir")?;
|
||||
.with_context(|| format!("wrong blobdir: {}", path.display()))?;
|
||||
if !BlobObject::is_acceptible_blob_name(rel_path) {
|
||||
return Err(format_err!("wrong name"));
|
||||
return Err(format_err!("bad blob name: {}", rel_path.display()));
|
||||
}
|
||||
let name = rel_path.to_str().context("wrong name")?;
|
||||
BlobObject::from_name(context, name.to_string())
|
||||
@@ -468,6 +471,87 @@ impl<'a> fmt::Display for BlobObject<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// All files in the blobdir.
|
||||
///
|
||||
/// This exists so we can have a [`BlobDirIter`] which needs something to own the data of
|
||||
/// it's `&Path`. Use [`BlobDirContents::iter`] to create the iterator.
|
||||
///
|
||||
/// Additionally pre-allocating this means we get a length for progress report.
|
||||
pub(crate) struct BlobDirContents<'a> {
|
||||
inner: Vec<PathBuf>,
|
||||
context: &'a Context,
|
||||
}
|
||||
|
||||
impl<'a> BlobDirContents<'a> {
|
||||
pub(crate) async fn new(context: &'a Context) -> Result<BlobDirContents<'a>> {
|
||||
let readdir = fs::read_dir(context.get_blobdir()).await?;
|
||||
let inner = ReadDirStream::new(readdir)
|
||||
.filter_map(|entry| async move {
|
||||
match entry {
|
||||
Ok(entry) => Some(entry),
|
||||
Err(err) => {
|
||||
error!(context, "Failed to read blob file: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.filter_map(|entry| async move {
|
||||
match entry.file_type().await.ok()?.is_file() {
|
||||
true => Some(entry.path()),
|
||||
false => {
|
||||
warn!(
|
||||
context,
|
||||
"Export: Found blob dir entry {} that is not a file, ignoring",
|
||||
entry.path().display()
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
Ok(Self { inner, context })
|
||||
}
|
||||
|
||||
pub(crate) fn iter(&self) -> BlobDirIter<'_> {
|
||||
BlobDirIter::new(self.context, self.inner.iter())
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> usize {
|
||||
self.inner.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// A iterator over all the [`BlobObject`]s in the blobdir.
|
||||
pub(crate) struct BlobDirIter<'a> {
|
||||
iter: std::slice::Iter<'a, PathBuf>,
|
||||
context: &'a Context,
|
||||
}
|
||||
|
||||
impl<'a> BlobDirIter<'a> {
|
||||
fn new(context: &'a Context, iter: std::slice::Iter<'a, PathBuf>) -> BlobDirIter<'a> {
|
||||
Self { iter, context }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for BlobDirIter<'a> {
|
||||
type Item = BlobObject<'a>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
for path in self.iter.by_ref() {
|
||||
// In theory this can error but we'd have corrupted filenames in the blobdir, so
|
||||
// silently skipping them is fine.
|
||||
match BlobObject::from_path(self.context, path) {
|
||||
Ok(blob) => return Some(blob),
|
||||
Err(err) => warn!(self.context, "{err}"),
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl FusedIterator for BlobDirIter<'_> {}
|
||||
|
||||
fn encode_img(img: &DynamicImage, encoded: &mut Vec<u8>) -> anyhow::Result<()> {
|
||||
encoded.clear();
|
||||
let mut buf = Cursor::new(encoded);
|
||||
|
||||
22
src/chat.rs
22
src/chat.rs
@@ -639,7 +639,10 @@ impl ChatId {
|
||||
context.emit_msgs_changed_without_ids();
|
||||
|
||||
context.set_config(Config::LastHousekeeping, None).await?;
|
||||
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_inbox(InterruptInfo::new(false))
|
||||
.await;
|
||||
|
||||
if chat.is_self_talk() {
|
||||
let mut msg = Message::new(Viewtype::Text);
|
||||
@@ -1667,7 +1670,7 @@ impl Chat {
|
||||
|
||||
maybe_set_logging_xdc(context, msg, self.id).await?;
|
||||
}
|
||||
context.interrupt_ephemeral_task().await;
|
||||
context.scheduler.interrupt_ephemeral_task().await;
|
||||
Ok(msg.id)
|
||||
}
|
||||
}
|
||||
@@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
|
||||
context.emit_event(EventType::LocationChanged(Some(ContactId::SELF)));
|
||||
}
|
||||
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(msg.id)
|
||||
@@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
|
||||
.await?;
|
||||
curr_timestamp += 1;
|
||||
if create_send_msg_job(context, new_msg_id).await?.is_some() {
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
created_chats.push(chat_id);
|
||||
@@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||
msg_id: msg.id,
|
||||
});
|
||||
if create_send_msg_job(context, msg.id).await?.is_some() {
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -443,7 +443,7 @@ impl Context {
|
||||
Config::DeleteDeviceAfter => {
|
||||
let ret = self.sql.set_raw_config(key.as_ref(), value).await;
|
||||
// Interrupt ephemeral loop to delete old messages immediately.
|
||||
self.interrupt_ephemeral_task().await;
|
||||
self.scheduler.interrupt_ephemeral_task().await;
|
||||
ret?
|
||||
}
|
||||
Config::Displayname => {
|
||||
|
||||
@@ -59,7 +59,7 @@ impl Context {
|
||||
/// Configures this account with the currently set parameters.
|
||||
pub async fn configure(&self) -> Result<()> {
|
||||
ensure!(
|
||||
self.scheduler.read().await.is_none(),
|
||||
!self.scheduler.is_running().await,
|
||||
"cannot configure, already running"
|
||||
);
|
||||
ensure!(
|
||||
@@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
|
||||
|
||||
ctx.set_config_bool(Config::FetchedExistingMsgs, false)
|
||||
.await?;
|
||||
ctx.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
ctx.scheduler
|
||||
.interrupt_inbox(InterruptInfo::new(false))
|
||||
.await;
|
||||
|
||||
progress!(ctx, 940);
|
||||
update_device_chats_handle.await??;
|
||||
|
||||
@@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen(
|
||||
> 0
|
||||
&& timestamp > time() - SEEN_RECENTLY_SECONDS
|
||||
{
|
||||
context.interrupt_recently_seen(contact_id, timestamp).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_recently_seen(contact_id, timestamp)
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey};
|
||||
use crate::login_param::LoginParam;
|
||||
use crate::message::{self, MessageState, MsgId};
|
||||
use crate::quota::QuotaInfo;
|
||||
use crate::scheduler::Scheduler;
|
||||
use crate::scheduler::SchedulerState;
|
||||
use crate::sql::Sql;
|
||||
use crate::stock_str::StockStrings;
|
||||
use crate::timesmearing::SmearedTimestamp;
|
||||
@@ -191,6 +191,10 @@ pub struct InnerContext {
|
||||
pub(crate) blobdir: PathBuf,
|
||||
pub(crate) sql: Sql,
|
||||
pub(crate) smeared_timestamp: SmearedTimestamp,
|
||||
/// The global "ongoing" process state.
|
||||
///
|
||||
/// This is a global mutex-like state for operations which should be modal in the
|
||||
/// clients.
|
||||
running_state: RwLock<RunningState>,
|
||||
/// Mutex to avoid generating the key for the user more than once.
|
||||
pub(crate) generating_key_mutex: Mutex<()>,
|
||||
@@ -201,7 +205,7 @@ pub struct InnerContext {
|
||||
pub(crate) translated_stockstrings: StockStrings,
|
||||
pub(crate) events: Events,
|
||||
|
||||
pub(crate) scheduler: RwLock<Option<Scheduler>>,
|
||||
pub(crate) scheduler: SchedulerState,
|
||||
pub(crate) ratelimit: RwLock<Ratelimit>,
|
||||
|
||||
/// Recently loaded quota information, if any.
|
||||
@@ -370,7 +374,7 @@ impl Context {
|
||||
wrong_pw_warning_mutex: Mutex::new(()),
|
||||
translated_stockstrings: stockstrings,
|
||||
events,
|
||||
scheduler: RwLock::new(None),
|
||||
scheduler: SchedulerState::new(),
|
||||
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds.
|
||||
quota: RwLock::new(None),
|
||||
quota_update_request: AtomicBool::new(false),
|
||||
@@ -395,42 +399,23 @@ impl Context {
|
||||
warn!(self, "can not start io on a context that is not configured");
|
||||
return;
|
||||
}
|
||||
|
||||
info!(self, "starting IO");
|
||||
let mut lock = self.inner.scheduler.write().await;
|
||||
if lock.is_none() {
|
||||
match Scheduler::start(self.clone()).await {
|
||||
Err(err) => error!(self, "Failed to start IO: {:#}", err),
|
||||
Ok(scheduler) => *lock = Some(scheduler),
|
||||
}
|
||||
}
|
||||
self.scheduler.start(self.clone()).await;
|
||||
}
|
||||
|
||||
/// Stops the IO scheduler.
|
||||
pub async fn stop_io(&self) {
|
||||
// Sending an event wakes up event pollers (get_next_event)
|
||||
// so the caller of stop_io() can arrange for proper termination.
|
||||
// For this, the caller needs to instruct the event poller
|
||||
// to terminate on receiving the next event and then call stop_io()
|
||||
// which will emit the below event(s)
|
||||
info!(self, "stopping IO");
|
||||
if let Some(debug_logging) = self.debug_logging.read().await.as_ref() {
|
||||
debug_logging.loop_handle.abort();
|
||||
}
|
||||
if let Some(scheduler) = self.inner.scheduler.write().await.take() {
|
||||
scheduler.stop(self).await;
|
||||
}
|
||||
self.scheduler.stop(self).await;
|
||||
}
|
||||
|
||||
/// Restarts the IO scheduler if it was running before
|
||||
/// when it is not running this is an no-op
|
||||
pub async fn restart_io_if_running(&self) {
|
||||
info!(self, "restarting IO");
|
||||
let is_running = { self.inner.scheduler.read().await.is_some() };
|
||||
if is_running {
|
||||
self.stop_io().await;
|
||||
self.start_io().await;
|
||||
}
|
||||
self.scheduler.restart(self).await;
|
||||
}
|
||||
|
||||
/// Indicate that the network likely has come back.
|
||||
pub async fn maybe_network(&self) {
|
||||
self.scheduler.maybe_network().await;
|
||||
}
|
||||
|
||||
/// Returns a reference to the underlying SQL instance.
|
||||
@@ -521,6 +506,13 @@ impl Context {
|
||||
|
||||
// Ongoing process allocation/free/check
|
||||
|
||||
/// Tries to acquire the global UI "ongoing" mutex.
|
||||
///
|
||||
/// This is for modal operations during which no other user actions are allowed. Only
|
||||
/// one such operation is allowed at any given time.
|
||||
///
|
||||
/// The return value is a cancel token, which will release the ongoing mutex when
|
||||
/// dropped.
|
||||
pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
|
||||
let mut s = self.running_state.write().await;
|
||||
ensure!(
|
||||
|
||||
@@ -317,7 +317,7 @@ impl MsgId {
|
||||
paramsv![ephemeral_timestamp, ephemeral_timestamp, self],
|
||||
)
|
||||
.await?;
|
||||
context.interrupt_ephemeral_task().await;
|
||||
context.scheduler.interrupt_ephemeral_task().await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids(
|
||||
)
|
||||
.await?;
|
||||
if count > 0 {
|
||||
context.interrupt_ephemeral_task().await;
|
||||
context.scheduler.interrupt_ephemeral_task().await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -475,7 +475,7 @@ impl Imap {
|
||||
// Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are
|
||||
// fetched while the per-chat ephemeral timers start as soon as the messages are marked
|
||||
// as noticed.
|
||||
context.interrupt_ephemeral_task().await;
|
||||
context.scheduler.interrupt_ephemeral_task().await;
|
||||
}
|
||||
|
||||
let session = self
|
||||
@@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str)
|
||||
paramsv![message_id],
|
||||
)
|
||||
.await?;
|
||||
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_inbox(InterruptInfo::new(false))
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::{
|
||||
use anyhow::{Context as _, Result};
|
||||
use async_imap::Client as ImapClient;
|
||||
use async_imap::Session as ImapSession;
|
||||
use tokio::io::BufStream;
|
||||
use tokio::io::BufWriter;
|
||||
|
||||
use super::capabilities::Capabilities;
|
||||
use super::session::Session;
|
||||
@@ -96,7 +96,7 @@ impl Client {
|
||||
) -> Result<Self> {
|
||||
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT, strict_tls).await?;
|
||||
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?;
|
||||
let buffered_stream = BufStream::new(tls_stream);
|
||||
let buffered_stream = BufWriter::new(tls_stream);
|
||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||
let mut client = ImapClient::new(session_stream);
|
||||
|
||||
@@ -110,7 +110,7 @@ impl Client {
|
||||
|
||||
pub async fn connect_insecure(context: &Context, hostname: &str, port: u16) -> Result<Self> {
|
||||
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT, false).await?;
|
||||
let buffered_stream = BufStream::new(tcp_stream);
|
||||
let buffered_stream = BufWriter::new(tcp_stream);
|
||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||
let mut client = ImapClient::new(session_stream);
|
||||
let _greeting = client
|
||||
@@ -145,7 +145,7 @@ impl Client {
|
||||
.await
|
||||
.context("STARTTLS upgrade failed")?;
|
||||
|
||||
let buffered_stream = BufStream::new(tls_stream);
|
||||
let buffered_stream = BufWriter::new(tls_stream);
|
||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||
let client = ImapClient::new(session_stream);
|
||||
|
||||
@@ -163,7 +163,7 @@ impl Client {
|
||||
.connect(context, domain, port, IMAP_TIMEOUT, strict_tls)
|
||||
.await?;
|
||||
let tls_stream = wrap_tls(strict_tls, domain, socks5_stream).await?;
|
||||
let buffered_stream = BufStream::new(tls_stream);
|
||||
let buffered_stream = BufWriter::new(tls_stream);
|
||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||
let mut client = ImapClient::new(session_stream);
|
||||
let _greeting = client
|
||||
@@ -183,7 +183,7 @@ impl Client {
|
||||
let socks5_stream = socks5_config
|
||||
.connect(context, domain, port, IMAP_TIMEOUT, false)
|
||||
.await?;
|
||||
let buffered_stream = BufStream::new(socks5_stream);
|
||||
let buffered_stream = BufWriter::new(socks5_stream);
|
||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||
let mut client = ImapClient::new(session_stream);
|
||||
let _greeting = client
|
||||
@@ -220,7 +220,7 @@ impl Client {
|
||||
let tls_stream = wrap_tls(strict_tls, hostname, socks5_stream)
|
||||
.await
|
||||
.context("STARTTLS upgrade failed")?;
|
||||
let buffered_stream = BufStream::new(tls_stream);
|
||||
let buffered_stream = BufWriter::new(tls_stream);
|
||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||
let client = ImapClient::new(session_stream);
|
||||
|
||||
|
||||
139
src/imex.rs
139
src/imex.rs
@@ -5,18 +5,19 @@ use std::ffi::OsStr;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use ::pgp::types::KeyTrait;
|
||||
use anyhow::{bail, ensure, format_err, Context as _, Error, Result};
|
||||
use anyhow::{bail, ensure, format_err, Context as _, Result};
|
||||
use futures::StreamExt;
|
||||
use futures_lite::FutureExt;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio_tar::Archive;
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
use crate::blob::{BlobDirContents, BlobObject};
|
||||
use crate::chat::{self, delete_and_reset_all_device_msgs, ChatId};
|
||||
use crate::config::Config;
|
||||
use crate::contact::ContactId;
|
||||
use crate::context::Context;
|
||||
use crate::e2ee;
|
||||
use crate::events::EventType;
|
||||
use crate::key::{self, DcKey, DcSecretKey, SignedPublicKey, SignedSecretKey};
|
||||
use crate::log::LogExt;
|
||||
@@ -30,7 +31,10 @@ use crate::tools::{
|
||||
create_folder, delete_file, get_filesuffix_lc, open_file_std, read_file, time, write_file,
|
||||
EmailAddress,
|
||||
};
|
||||
use crate::{e2ee, tools};
|
||||
|
||||
mod transfer;
|
||||
|
||||
pub use transfer::{get_backup, BackupProvider};
|
||||
|
||||
// Name of the database file in the backup.
|
||||
const DBFILE_BACKUP_NAME: &str = "dc_database_backup.sqlite";
|
||||
@@ -86,13 +90,17 @@ pub async fn imex(
|
||||
) -> Result<()> {
|
||||
let cancel = context.alloc_ongoing().await?;
|
||||
|
||||
let res = imex_inner(context, what, path, passphrase)
|
||||
.race(async {
|
||||
cancel.recv().await.ok();
|
||||
Err(format_err!("canceled"))
|
||||
})
|
||||
.await;
|
||||
|
||||
let res = {
|
||||
let mut guard = context.scheduler.pause(context.clone()).await;
|
||||
let res = imex_inner(context, what, path, passphrase)
|
||||
.race(async {
|
||||
cancel.recv().await.ok();
|
||||
Err(format_err!("canceled"))
|
||||
})
|
||||
.await;
|
||||
guard.resume().await;
|
||||
res
|
||||
};
|
||||
context.free_ongoing().await;
|
||||
|
||||
if let Err(err) = res.as_ref() {
|
||||
@@ -250,7 +258,7 @@ async fn maybe_add_bcc_self_device_msg(context: &Context) -> Result<()> {
|
||||
msg.text = Some(
|
||||
"It seems you are using multiple devices with Delta Chat. Great!\n\n\
|
||||
If you also want to synchronize outgoing messages across all devices, \
|
||||
go to the settings and enable \"Send copy to self\"."
|
||||
go to \"Settings → Advanced\" and enable \"Send Copy to Self\"."
|
||||
.to_string(),
|
||||
);
|
||||
chat::add_device_msg(context, Some("bcc-self-hint"), Some(&mut msg)).await?;
|
||||
@@ -413,7 +421,7 @@ async fn import_backup(
|
||||
"Cannot import backups to accounts in use."
|
||||
);
|
||||
ensure!(
|
||||
context.scheduler.read().await.is_none(),
|
||||
!context.scheduler.is_running().await,
|
||||
"cannot import backup, IO is running"
|
||||
);
|
||||
|
||||
@@ -516,16 +524,9 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
|
||||
let _d1 = DeleteOnDrop(temp_db_path.clone());
|
||||
let _d2 = DeleteOnDrop(temp_path.clone());
|
||||
|
||||
context
|
||||
.sql
|
||||
.set_raw_config_int("backup_time", now as i32)
|
||||
.await?;
|
||||
sql::housekeeping(context).await.ok_or_log(context);
|
||||
|
||||
ensure!(
|
||||
context.scheduler.read().await.is_none(),
|
||||
"cannot export backup, IO is running"
|
||||
);
|
||||
export_database(context, &temp_db_path, passphrase)
|
||||
.await
|
||||
.context("could not export database")?;
|
||||
|
||||
info!(
|
||||
context,
|
||||
@@ -534,32 +535,6 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
|
||||
dest_path.display(),
|
||||
);
|
||||
|
||||
let path_str = temp_db_path
|
||||
.to_str()
|
||||
.with_context(|| format!("path {temp_db_path:?} is not valid unicode"))?;
|
||||
|
||||
context
|
||||
.sql
|
||||
.call_write(|conn| {
|
||||
if let Err(err) = conn.execute("VACUUM", params![]) {
|
||||
info!(context, "Vacuum failed, exporting anyway: {:#}.", err);
|
||||
}
|
||||
conn.execute(
|
||||
"ATTACH DATABASE ? AS backup KEY ?",
|
||||
paramsv![path_str, passphrase],
|
||||
)
|
||||
.context("failed to attach backup database")?;
|
||||
let res = conn
|
||||
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
|
||||
.context("failed to export to attached backup database");
|
||||
conn.execute("DETACH DATABASE backup", [])
|
||||
.context("failed to detach backup database")?;
|
||||
res?;
|
||||
|
||||
Ok::<_, Error>(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
let res = export_backup_inner(context, &temp_db_path, &temp_path).await;
|
||||
|
||||
match &res {
|
||||
@@ -597,29 +572,15 @@ async fn export_backup_inner(
|
||||
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
|
||||
.await?;
|
||||
|
||||
let read_dir = tools::read_dir(context.get_blobdir()).await?;
|
||||
let count = read_dir.len();
|
||||
let mut written_files = 0;
|
||||
|
||||
let blobdir = BlobDirContents::new(context).await?;
|
||||
let mut last_progress = 0;
|
||||
for entry in read_dir {
|
||||
let name = entry.file_name();
|
||||
if !entry.file_type().await?.is_file() {
|
||||
warn!(
|
||||
context,
|
||||
"Export: Found dir entry {} that is not a file, ignoring",
|
||||
name.to_string_lossy()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let mut file = File::open(entry.path()).await?;
|
||||
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(name);
|
||||
builder.append_file(path_in_archive, &mut file).await?;
|
||||
|
||||
written_files += 1;
|
||||
let progress = 1000 * written_files / count;
|
||||
for (i, blob) in blobdir.iter().enumerate() {
|
||||
let mut file = File::open(blob.to_abs_path()).await?;
|
||||
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(blob.as_name());
|
||||
builder.append_file(path_in_archive, &mut file).await?;
|
||||
let progress = 1000 * i / blobdir.len();
|
||||
if progress != last_progress && progress > 10 && progress < 1000 {
|
||||
// We already emitted ImexProgress(10) above
|
||||
context.emit_event(EventType::ImexProgress(progress));
|
||||
last_progress = progress;
|
||||
}
|
||||
@@ -781,6 +742,48 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Exports the database to *dest*, encrypted using *passphrase*.
|
||||
///
|
||||
/// The directory of *dest* must already exist, if *dest* itself exists it will be
|
||||
/// overwritten.
|
||||
///
|
||||
/// This also verifies that IO is not running during the export.
|
||||
async fn export_database(context: &Context, dest: &Path, passphrase: String) -> Result<()> {
|
||||
ensure!(
|
||||
!context.scheduler.is_running().await,
|
||||
"cannot export backup, IO is running"
|
||||
);
|
||||
let now = time().try_into().context("32-bit UNIX time overflow")?;
|
||||
|
||||
// TODO: Maybe introduce camino crate for UTF-8 paths where we need them.
|
||||
let dest = dest
|
||||
.to_str()
|
||||
.with_context(|| format!("path {} is not valid unicode", dest.display()))?;
|
||||
|
||||
context.sql.set_raw_config_int("backup_time", now).await?;
|
||||
sql::housekeeping(context).await.ok_or_log(context);
|
||||
context
|
||||
.sql
|
||||
.call_write(|conn| {
|
||||
conn.execute("VACUUM;", params![])
|
||||
.map_err(|err| warn!(context, "Vacuum failed, exporting anyway {err}"))
|
||||
.ok();
|
||||
conn.execute(
|
||||
"ATTACH DATABASE ? AS backup KEY ?",
|
||||
paramsv![dest, passphrase],
|
||||
)
|
||||
.context("failed to attach backup database")?;
|
||||
let res = conn
|
||||
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
|
||||
.context("failed to export to attached backup database");
|
||||
conn.execute("DETACH DATABASE backup", [])
|
||||
.context("failed to detach backup database")?;
|
||||
res?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
694
src/imex/transfer.rs
Normal file
694
src/imex/transfer.rs
Normal file
@@ -0,0 +1,694 @@
|
||||
//! Transfer a backup to an other device.
|
||||
//!
|
||||
//! This module provides support for using n0's iroh tool to initiate transfer of a backup
|
||||
//! to another device using a QR code.
|
||||
//!
|
||||
//! Using the iroh terminology there are two parties to this:
|
||||
//!
|
||||
//! - The *Provider*, which starts a server and listens for connections.
|
||||
//! - The *Getter*, which connects to the server and retrieves the data.
|
||||
//!
|
||||
//! Iroh is designed around the idea of verifying hashes, the downloads are verified as
|
||||
//! they are retrieved. The entire transfer is initiated by requesting the data of a single
|
||||
//! root hash.
|
||||
//!
|
||||
//! Both the provider and the getter are authenticated:
|
||||
//!
|
||||
//! - The provider is known by its *peer ID*.
|
||||
//! - The provider needs an *authentication token* from the getter before it accepts a
|
||||
//! connection.
|
||||
//!
|
||||
//! Both these are transferred in the QR code offered to the getter. This ensures that the
|
||||
//! getter can not connect to an impersonated provider and the provider does not offer the
|
||||
//! download to an impersonated getter.
|
||||
|
||||
use std::future::Future;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::task::Poll;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
use async_channel::Receiver;
|
||||
use futures_lite::StreamExt;
|
||||
use iroh::get::{DataStream, Options};
|
||||
use iroh::progress::ProgressEmitter;
|
||||
use iroh::protocol::AuthToken;
|
||||
use iroh::provider::{DataSource, Event, Provider, Ticket};
|
||||
use iroh::Hash;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
|
||||
use crate::blob::BlobDirContents;
|
||||
use crate::chat::delete_and_reset_all_device_msgs;
|
||||
use crate::context::Context;
|
||||
use crate::qr::Qr;
|
||||
use crate::{e2ee, EventType};
|
||||
|
||||
use super::{export_database, DBFILE_BACKUP_NAME};
|
||||
|
||||
/// Provide or send a backup of this device.
|
||||
///
|
||||
/// This creates a backup of the current device and starts a service which offers another
|
||||
/// device to download this backup.
|
||||
///
|
||||
/// This does not make a full backup on disk, only the SQLite database is created on disk,
|
||||
/// the blobs in the blob directory are not copied.
|
||||
///
|
||||
/// This starts a task which acquires the global "ongoing" mutex. If you need to stop the
|
||||
/// task use the [`Context::stop_ongoing`] mechanism.
|
||||
///
|
||||
/// The task implements [`Future`] and awaiting it will complete once a transfer has been
|
||||
/// either completed or aborted.
|
||||
#[derive(Debug)]
|
||||
pub struct BackupProvider {
|
||||
/// The supervisor task, run by [`BackupProvider::watch_provider`].
|
||||
handle: JoinHandle<Result<()>>,
|
||||
/// The ticket to retrieve the backup collection.
|
||||
ticket: Ticket,
|
||||
}
|
||||
|
||||
impl BackupProvider {
|
||||
/// Prepares for sending a backup to a second device.
|
||||
///
|
||||
/// Before calling this function all I/O must be stopped so that no changes to the blobs
|
||||
/// or database are happening, this is done by calling the [`Accounts::stop_io`] or
|
||||
/// [`Context::stop_io`] APIs first.
|
||||
///
|
||||
/// This will acquire the global "ongoing process" mutex, which can be used to cancel
|
||||
/// the process.
|
||||
///
|
||||
/// [`Accounts::stop_io`]: crate::accounts::Accounts::stop_io
|
||||
pub async fn prepare(context: &Context) -> Result<Self> {
|
||||
e2ee::ensure_secret_key_exists(context)
|
||||
.await
|
||||
.context("Private key not available, aborting backup export")?;
|
||||
|
||||
// Acquire global "ongoing" mutex.
|
||||
let cancel_token = context.alloc_ongoing().await?;
|
||||
let mut paused_guard = context.scheduler.pause(context.clone()).await;
|
||||
let context_dir = context
|
||||
.get_blobdir()
|
||||
.parent()
|
||||
.ok_or(anyhow!("Context dir not found"))?;
|
||||
let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
|
||||
if fs::metadata(&dbfile).await.is_ok() {
|
||||
fs::remove_file(&dbfile).await?;
|
||||
warn!(context, "Previous database export deleted");
|
||||
}
|
||||
let dbfile = TempPathGuard::new(dbfile);
|
||||
let res = tokio::select! {
|
||||
biased;
|
||||
res = Self::prepare_inner(context, &dbfile) => {
|
||||
match res {
|
||||
Ok(slf) => Ok(slf),
|
||||
Err(err) => {
|
||||
error!(context, "Failed to set up second device setup: {:#}", err);
|
||||
Err(err)
|
||||
},
|
||||
}
|
||||
},
|
||||
_ = cancel_token.recv() => Err(format_err!("cancelled")),
|
||||
};
|
||||
let (provider, ticket) = match res {
|
||||
Ok((provider, ticket)) => (provider, ticket),
|
||||
Err(err) => {
|
||||
context.free_ongoing().await;
|
||||
paused_guard.resume().await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let handle = {
|
||||
let context = context.clone();
|
||||
tokio::spawn(async move {
|
||||
let res = Self::watch_provider(&context, provider, cancel_token).await;
|
||||
context.free_ongoing().await;
|
||||
paused_guard.resume().await;
|
||||
drop(dbfile);
|
||||
res
|
||||
})
|
||||
};
|
||||
Ok(Self { handle, ticket })
|
||||
}
|
||||
|
||||
/// Creates the provider task.
|
||||
///
|
||||
/// Having this as a function makes it easier to cancel it when needed.
|
||||
async fn prepare_inner(context: &Context, dbfile: &Path) -> Result<(Provider, Ticket)> {
|
||||
// Generate the token up front: we also use it to encrypt the database.
|
||||
let token = AuthToken::generate();
|
||||
context.emit_event(SendProgress::Started.into());
|
||||
export_database(context, dbfile, token.to_string())
|
||||
.await
|
||||
.context("Database export failed")?;
|
||||
context.emit_event(SendProgress::DatabaseExported.into());
|
||||
|
||||
// Now we can be sure IO is not running.
|
||||
let mut files = vec![DataSource::with_name(
|
||||
dbfile.to_owned(),
|
||||
format!("db/{DBFILE_BACKUP_NAME}"),
|
||||
)];
|
||||
let blobdir = BlobDirContents::new(context).await?;
|
||||
for blob in blobdir.iter() {
|
||||
let path = blob.to_abs_path();
|
||||
let name = format!("blob/{}", blob.as_file_name());
|
||||
files.push(DataSource::with_name(path, name));
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
let (db, hash) = iroh::provider::create_collection(files).await?;
|
||||
context.emit_event(SendProgress::CollectionCreated.into());
|
||||
let provider = Provider::builder(db)
|
||||
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
|
||||
.auth_token(token)
|
||||
.spawn()?;
|
||||
context.emit_event(SendProgress::ProviderListening.into());
|
||||
info!(context, "Waiting for remote to connect");
|
||||
let ticket = provider.ticket(hash);
|
||||
Ok((provider, ticket))
|
||||
}
|
||||
|
||||
/// Supervises the iroh [`Provider`], terminating it when needed.
|
||||
///
|
||||
/// This will watch the provider and terminate it when:
|
||||
///
|
||||
/// - A transfer is completed, successful or unsuccessful.
|
||||
/// - An event could not be observed to protect against not knowing of a completed event.
|
||||
/// - The ongoing process is cancelled.
|
||||
///
|
||||
/// The *cancel_token* is the handle for the ongoing process mutex, when this completes
|
||||
/// we must cancel this operation.
|
||||
async fn watch_provider(
|
||||
context: &Context,
|
||||
mut provider: Provider,
|
||||
cancel_token: Receiver<()>,
|
||||
) -> Result<()> {
|
||||
// _dbfile exists so we can clean up the file once it is no longer needed
|
||||
let mut events = provider.subscribe();
|
||||
let mut total_size = 0;
|
||||
let mut current_size = 0;
|
||||
let res = loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
res = &mut provider => {
|
||||
break res.context("BackupProvider failed");
|
||||
},
|
||||
maybe_event = events.recv() => {
|
||||
match maybe_event {
|
||||
Ok(event) => {
|
||||
match event {
|
||||
Event::ClientConnected { ..} => {
|
||||
context.emit_event(SendProgress::ClientConnected.into());
|
||||
}
|
||||
Event::RequestReceived { .. } => {
|
||||
}
|
||||
Event::TransferCollectionStarted { total_blobs_size, .. } => {
|
||||
total_size = total_blobs_size;
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
}.into());
|
||||
}
|
||||
Event::TransferBlobCompleted { size, .. } => {
|
||||
current_size += size;
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
}.into());
|
||||
}
|
||||
Event::TransferCollectionCompleted { .. } => {
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size: total_size,
|
||||
total_size
|
||||
}.into());
|
||||
provider.shutdown();
|
||||
}
|
||||
Event::TransferAborted { .. } => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider transfer aborted"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
// We should never see this, provider.join() should complete
|
||||
// first.
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||
// We really shouldn't be lagging, if we did we may have missed
|
||||
// a completion event.
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("Missed events from BackupProvider"));
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = cancel_token.recv() => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupSender cancelled"));
|
||||
},
|
||||
}
|
||||
};
|
||||
match &res {
|
||||
Ok(_) => context.emit_event(SendProgress::Completed.into()),
|
||||
Err(err) => {
|
||||
error!(context, "Backup transfer failure: {err:#}");
|
||||
context.emit_event(SendProgress::Failed.into())
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
/// Returns a QR code that allows fetching this backup.
|
||||
///
|
||||
/// This QR code can be passed to [`get_backup`] on a (different) device.
|
||||
pub fn qr(&self) -> Qr {
|
||||
Qr::Backup {
|
||||
ticket: self.ticket.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for BackupProvider {
|
||||
type Output = Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
Pin::new(&mut self.handle).poll(cx)?
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard which will remove the path when dropped.
|
||||
///
|
||||
/// It implements [`Deref`] it it can be used as a `&Path`.
|
||||
#[derive(Debug)]
|
||||
struct TempPathGuard {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl TempPathGuard {
|
||||
fn new(path: PathBuf) -> Self {
|
||||
Self { path }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TempPathGuard {
|
||||
fn drop(&mut self) {
|
||||
let path = self.path.clone();
|
||||
tokio::spawn(async move {
|
||||
fs::remove_file(&path).await.ok();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for TempPathGuard {
|
||||
type Target = Path;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.path
|
||||
}
|
||||
}
|
||||
|
||||
/// Create [`EventType::ImexProgress`] events using readable names.
|
||||
///
|
||||
/// Plus you get warnings if you don't use all variants.
|
||||
#[derive(Debug)]
|
||||
enum SendProgress {
|
||||
Failed,
|
||||
Started,
|
||||
DatabaseExported,
|
||||
CollectionCreated,
|
||||
ProviderListening,
|
||||
ClientConnected,
|
||||
TransferInProgress { current_size: u64, total_size: u64 },
|
||||
Completed,
|
||||
}
|
||||
|
||||
impl From<SendProgress> for EventType {
|
||||
fn from(source: SendProgress) -> Self {
|
||||
use SendProgress::*;
|
||||
let num: u16 = match source {
|
||||
Failed => 0,
|
||||
Started => 100,
|
||||
DatabaseExported => 300,
|
||||
CollectionCreated => 350,
|
||||
ProviderListening => 400,
|
||||
ClientConnected => 450,
|
||||
TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
} => {
|
||||
// the range is 450..=950
|
||||
450 + ((current_size as f64 / total_size as f64) * 500.).floor() as u16
|
||||
}
|
||||
Completed => 1000,
|
||||
};
|
||||
Self::ImexProgress(num.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Contacts a backup provider and receives the backup from it.
|
||||
///
|
||||
/// This uses a QR code to contact another instance of deltachat which is providing a backup
|
||||
/// using the [`BackupProvider`]. Once connected it will authenticate using the secrets in
|
||||
/// the QR code and retrieve the backup.
|
||||
///
|
||||
/// This is a long running operation which will only when completed.
|
||||
///
|
||||
/// Using [`Qr`] as argument is a bit odd as it only accepts one specific variant of it. It
|
||||
/// does avoid having [`iroh::provider::Ticket`] in the primary API however, without
|
||||
/// having to revert to untyped bytes.
|
||||
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
ensure!(
|
||||
matches!(qr, Qr::Backup { .. }),
|
||||
"QR code for backup must be of type DCBACKUP"
|
||||
);
|
||||
ensure!(
|
||||
!context.is_configured().await?,
|
||||
"Cannot import backups to accounts in use."
|
||||
);
|
||||
let mut guard = context.scheduler.pause(context.clone()).await;
|
||||
|
||||
// Acquire global "ongoing" mutex.
|
||||
let cancel_token = context.alloc_ongoing().await?;
|
||||
let res = tokio::select! {
|
||||
biased;
|
||||
res = get_backup_inner(context, qr) => {
|
||||
context.free_ongoing().await;
|
||||
res
|
||||
}
|
||||
_ = cancel_token.recv() => Err(format_err!("cancelled")),
|
||||
};
|
||||
guard.resume().await;
|
||||
res
|
||||
}
|
||||
|
||||
async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
|
||||
let ticket = match qr {
|
||||
Qr::Backup { ticket } => ticket,
|
||||
_ => bail!("QR code for backup must be of type DCBACKUP"),
|
||||
};
|
||||
if ticket.addrs.is_empty() {
|
||||
bail!("ticket is missing addresses to dial");
|
||||
}
|
||||
for addr in &ticket.addrs {
|
||||
let opts = Options {
|
||||
addr: *addr,
|
||||
peer_id: Some(ticket.peer),
|
||||
keylog: false,
|
||||
};
|
||||
info!(context, "attempting to contact {}", addr);
|
||||
match transfer_from_provider(context, &ticket, opts).await {
|
||||
Ok(_) => {
|
||||
delete_and_reset_all_device_msgs(context).await?;
|
||||
context.emit_event(ReceiveProgress::Completed.into());
|
||||
return Ok(());
|
||||
}
|
||||
Err(TransferError::ConnectionError(err)) => {
|
||||
warn!(context, "Connection error: {err:#}.");
|
||||
continue;
|
||||
}
|
||||
Err(TransferError::Other(err)) => {
|
||||
// Clean up any blobs we already wrote.
|
||||
let readdir = fs::read_dir(context.get_blobdir()).await?;
|
||||
let mut readdir = ReadDirStream::new(readdir);
|
||||
while let Some(dirent) = readdir.next().await {
|
||||
if let Ok(dirent) = dirent {
|
||||
fs::remove_file(dirent.path()).await.ok();
|
||||
}
|
||||
}
|
||||
context.emit_event(ReceiveProgress::Failed.into());
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(anyhow!("failed to contact provider"))
|
||||
}
|
||||
|
||||
/// Error during a single transfer attempt.
|
||||
///
|
||||
/// Mostly exists to distinguish between `ConnectionError` and any other errors.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum TransferError {
|
||||
#[error("connection error")]
|
||||
ConnectionError(#[source] anyhow::Error),
|
||||
#[error("other")]
|
||||
Other(#[source] anyhow::Error),
|
||||
}
|
||||
|
||||
async fn transfer_from_provider(
|
||||
context: &Context,
|
||||
ticket: &Ticket,
|
||||
opts: Options,
|
||||
) -> Result<(), TransferError> {
|
||||
let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress());
|
||||
spawn_progress_proxy(context.clone(), progress.subscribe());
|
||||
let mut connected = false;
|
||||
let on_connected = || {
|
||||
context.emit_event(ReceiveProgress::Connected.into());
|
||||
connected = true;
|
||||
async { Ok(()) }
|
||||
};
|
||||
let jobs = Mutex::new(JoinSet::default());
|
||||
let on_blob =
|
||||
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
|
||||
let res = iroh::get::run(
|
||||
ticket.hash,
|
||||
ticket.token,
|
||||
opts,
|
||||
on_connected,
|
||||
|collection| {
|
||||
context.emit_event(ReceiveProgress::CollectionReceived.into());
|
||||
progress.set_total(collection.total_blobs_size());
|
||||
async { Ok(()) }
|
||||
},
|
||||
on_blob,
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut jobs = jobs.lock().await;
|
||||
while let Some(job) = jobs.join_next().await {
|
||||
job.context("job failed").map_err(TransferError::Other)?;
|
||||
}
|
||||
|
||||
drop(progress);
|
||||
match res {
|
||||
Ok(stats) => {
|
||||
info!(
|
||||
context,
|
||||
"Backup transfer finished, transfer rate is {} Mbps.",
|
||||
stats.mbits()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => match connected {
|
||||
true => Err(TransferError::Other(err)),
|
||||
false => Err(TransferError::ConnectionError(err)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Get callback when a blob is received from the provider.
|
||||
///
|
||||
/// This writes the blobs to the blobdir. If the blob is the database it will import it to
|
||||
/// the database of the current [`Context`].
|
||||
async fn on_blob(
|
||||
context: &Context,
|
||||
progress: &ProgressEmitter,
|
||||
jobs: &Mutex<JoinSet<()>>,
|
||||
ticket: &Ticket,
|
||||
_hash: Hash,
|
||||
mut reader: DataStream,
|
||||
name: String,
|
||||
) -> Result<DataStream> {
|
||||
ensure!(!name.is_empty(), "Received a nameless blob");
|
||||
let path = if name.starts_with("db/") {
|
||||
let context_dir = context
|
||||
.get_blobdir()
|
||||
.parent()
|
||||
.ok_or(anyhow!("Context dir not found"))?;
|
||||
let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
|
||||
if fs::metadata(&dbfile).await.is_ok() {
|
||||
fs::remove_file(&dbfile).await?;
|
||||
warn!(context, "Previous database export deleted");
|
||||
}
|
||||
dbfile
|
||||
} else {
|
||||
ensure!(name.starts_with("blob/"), "malformatted blob name");
|
||||
let blobname = name.rsplit('/').next().context("malformatted blob name")?;
|
||||
context.get_blobdir().join(blobname)
|
||||
};
|
||||
|
||||
let mut wrapped_reader = progress.wrap_async_read(&mut reader);
|
||||
let file = File::create(&path).await?;
|
||||
let mut file = BufWriter::with_capacity(128 * 1024, file);
|
||||
io::copy(&mut wrapped_reader, &mut file).await?;
|
||||
file.flush().await?;
|
||||
|
||||
if name.starts_with("db/") {
|
||||
let context = context.clone();
|
||||
let token = ticket.token.to_string();
|
||||
jobs.lock().await.spawn(async move {
|
||||
if let Err(err) = context.sql.import(&path, token).await {
|
||||
error!(context, "cannot import database: {:#?}", err);
|
||||
}
|
||||
if let Err(err) = fs::remove_file(&path).await {
|
||||
error!(
|
||||
context,
|
||||
"failed to delete database import file '{}': {:#?}",
|
||||
path.display(),
|
||||
err,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
/// Spawns a task proxying progress events.
|
||||
///
|
||||
/// This spawns a tokio task which receives events from the [`ProgressEmitter`] and sends
|
||||
/// them to the context. The task finishes when the emitter is dropped.
|
||||
///
|
||||
/// This could be done directly in the emitter by making it less generic.
|
||||
fn spawn_progress_proxy(context: Context, mut rx: broadcast::Receiver<u16>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(step) => context.emit_event(ReceiveProgress::BlobProgress(step).into()),
|
||||
Err(RecvError::Closed) => break,
|
||||
Err(RecvError::Lagged(_)) => continue,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Create [`EventType::ImexProgress`] events using readable names.
|
||||
///
|
||||
/// Plus you get warnings if you don't use all variants.
|
||||
#[derive(Debug)]
|
||||
enum ReceiveProgress {
|
||||
Connected,
|
||||
CollectionReceived,
|
||||
/// A value between 0 and 85 interpreted as a percentage.
|
||||
///
|
||||
/// Other values are already used by the other variants of this enum.
|
||||
BlobProgress(u16),
|
||||
Completed,
|
||||
Failed,
|
||||
}
|
||||
|
||||
impl ReceiveProgress {
|
||||
/// The maximum value for [`ReceiveProgress::BlobProgress`].
|
||||
///
|
||||
/// This only exists to keep this magic value local in this type.
|
||||
fn max_blob_progress() -> u16 {
|
||||
85
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ReceiveProgress> for EventType {
|
||||
fn from(source: ReceiveProgress) -> Self {
|
||||
let val = match source {
|
||||
ReceiveProgress::Connected => 50,
|
||||
ReceiveProgress::CollectionReceived => 100,
|
||||
ReceiveProgress::BlobProgress(val) => 100 + 10 * val,
|
||||
ReceiveProgress::Completed => 1000,
|
||||
ReceiveProgress::Failed => 0,
|
||||
};
|
||||
EventType::ImexProgress(val.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::chat::{get_chat_msgs, send_msg, ChatItem};
|
||||
use crate::message::{Message, Viewtype};
|
||||
use crate::test_utils::TestContextManager;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_send_receive() {
|
||||
let mut tcm = TestContextManager::new();
|
||||
|
||||
// Create first device.
|
||||
let ctx0 = tcm.alice().await;
|
||||
|
||||
// Write a message in the self chat
|
||||
let self_chat = ctx0.get_self_chat().await;
|
||||
let mut msg = Message::new(Viewtype::Text);
|
||||
msg.set_text(Some("hi there".to_string()));
|
||||
send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
|
||||
|
||||
// Send an attachment in the self chat
|
||||
let file = ctx0.get_blobdir().join("hello.txt");
|
||||
fs::write(&file, "i am attachment").await.unwrap();
|
||||
let mut msg = Message::new(Viewtype::File);
|
||||
msg.set_file(file.to_str().unwrap(), Some("text/plain"));
|
||||
send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
|
||||
|
||||
// Prepare to transfer backup.
|
||||
let provider = BackupProvider::prepare(&ctx0).await.unwrap();
|
||||
|
||||
// Set up second device.
|
||||
let ctx1 = tcm.unconfigured().await;
|
||||
get_backup(&ctx1, provider.qr()).await.unwrap();
|
||||
|
||||
// Make sure the provider finishes without an error.
|
||||
tokio::time::timeout(Duration::from_secs(30), provider)
|
||||
.await
|
||||
.expect("timed out")
|
||||
.expect("error in provider");
|
||||
|
||||
// Check that we have the self message.
|
||||
let self_chat = ctx1.get_self_chat().await;
|
||||
let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
|
||||
assert_eq!(msgs.len(), 2);
|
||||
let msgid = match msgs.get(0).unwrap() {
|
||||
ChatItem::Message { msg_id } => msg_id,
|
||||
_ => panic!("wrong chat item"),
|
||||
};
|
||||
let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
|
||||
let text = msg.get_text().unwrap();
|
||||
assert_eq!(text, "hi there");
|
||||
let msgid = match msgs.get(1).unwrap() {
|
||||
ChatItem::Message { msg_id } => msg_id,
|
||||
_ => panic!("wrong chat item"),
|
||||
};
|
||||
let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
|
||||
let path = msg.get_file(&ctx1).unwrap();
|
||||
let text = fs::read_to_string(&path).await.unwrap();
|
||||
assert_eq!(text, "i am attachment");
|
||||
|
||||
// Check that both received the ImexProgress events.
|
||||
ctx0.evtracker
|
||||
.get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
|
||||
.await;
|
||||
ctx1.evtracker
|
||||
.get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
|
||||
.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_progress() {
|
||||
let cases = [
|
||||
((0, 100), 450),
|
||||
((10, 100), 500),
|
||||
((50, 100), 700),
|
||||
((100, 100), 950),
|
||||
];
|
||||
|
||||
for ((current_size, total_size), progress) in cases {
|
||||
let out = EventType::from(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
});
|
||||
assert_eq!(out, EventType::ImexProgress(progress));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 {
|
||||
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
|
||||
context.resync_request.store(true, Ordering::Relaxed);
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_inbox(InterruptInfo {
|
||||
probe_network: false,
|
||||
})
|
||||
@@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
|
||||
job.save(context).await.context("failed to save job")?;
|
||||
|
||||
info!(context, "interrupt: imap");
|
||||
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_inbox(InterruptInfo::new(false))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -267,7 +267,7 @@ pub async fn send_locations_to_chat(
|
||||
}
|
||||
context.emit_event(EventType::ChatModified(chat_id));
|
||||
if 0 != seconds {
|
||||
context.interrupt_location().await;
|
||||
context.scheduler.interrupt_location().await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||
}
|
||||
|
||||
// Interrupt Inbox loop to start message deletion and run housekeeping.
|
||||
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_inbox(InterruptInfo::new(false))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
|
||||
)
|
||||
.await
|
||||
.context("failed to insert into smtp_mdns")?;
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
updated_chat_ids.insert(curr_chat_id);
|
||||
|
||||
@@ -2,7 +2,7 @@ use async_native_tls::TlsStream;
|
||||
use fast_socks5::client::Socks5Stream;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream};
|
||||
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream, BufWriter};
|
||||
use tokio_io_timeout::TimeoutStream;
|
||||
|
||||
pub(crate) trait SessionStream:
|
||||
@@ -27,6 +27,11 @@ impl<T: SessionStream> SessionStream for BufStream<T> {
|
||||
self.get_mut().set_read_timeout(timeout);
|
||||
}
|
||||
}
|
||||
impl<T: SessionStream> SessionStream for BufWriter<T> {
|
||||
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
|
||||
self.get_mut().set_read_timeout(timeout);
|
||||
}
|
||||
}
|
||||
impl<T: AsyncRead + AsyncWrite + Send + Sync + std::fmt::Debug> SessionStream
|
||||
for Pin<Box<TimeoutStream<T>>>
|
||||
{
|
||||
|
||||
42
src/qr.rs
42
src/qr.rs
@@ -34,6 +34,7 @@ const VCARD_SCHEME: &str = "BEGIN:VCARD";
|
||||
const SMTP_SCHEME: &str = "SMTP:";
|
||||
const HTTP_SCHEME: &str = "http://";
|
||||
const HTTPS_SCHEME: &str = "https://";
|
||||
pub(crate) const DCBACKUP_SCHEME: &str = "DCBACKUP:";
|
||||
|
||||
/// Scanned QR code.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -102,6 +103,20 @@ pub enum Qr {
|
||||
domain: String,
|
||||
},
|
||||
|
||||
/// Provides a backup that can be retrieve.
|
||||
///
|
||||
/// This contains all the data needed to connect to a device and download a backup from
|
||||
/// it to configure the receiving device with the same account.
|
||||
Backup {
|
||||
/// Printable version of the provider information.
|
||||
///
|
||||
/// This is the printable version of a `sendme` ticket, which contains all the
|
||||
/// information to connect to and authenticate a backup provider.
|
||||
///
|
||||
/// The format is somewhat opaque, but `sendme` can deserialise this.
|
||||
ticket: iroh::provider::Ticket,
|
||||
},
|
||||
|
||||
/// Ask the user if they want to use the given service for video chats.
|
||||
WebrtcInstance {
|
||||
/// Server domain name.
|
||||
@@ -244,6 +259,8 @@ pub async fn check_qr(context: &Context, qr: &str) -> Result<Qr> {
|
||||
dclogin_scheme::decode_login(qr)?
|
||||
} else if starts_with_ignore_case(qr, DCWEBRTC_SCHEME) {
|
||||
decode_webrtc_instance(context, qr)?
|
||||
} else if starts_with_ignore_case(qr, DCBACKUP_SCHEME) {
|
||||
decode_backup(qr)?
|
||||
} else if qr.starts_with(MAILTO_SCHEME) {
|
||||
decode_mailto(context, qr).await?
|
||||
} else if qr.starts_with(SMTP_SCHEME) {
|
||||
@@ -264,6 +281,19 @@ pub async fn check_qr(context: &Context, qr: &str) -> Result<Qr> {
|
||||
Ok(qrcode)
|
||||
}
|
||||
|
||||
/// Formats the text of the [`Qr::Backup`] variant.
|
||||
///
|
||||
/// This is the inverse of [`check_qr`] for that variant only.
|
||||
///
|
||||
/// TODO: Refactor this so all variants have a correct [`Display`] and transform `check_qr`
|
||||
/// into `FromStr`.
|
||||
pub fn format_backup(qr: &Qr) -> Result<String> {
|
||||
match qr {
|
||||
Qr::Backup { ref ticket } => Ok(format!("{DCBACKUP_SCHEME}{ticket}")),
|
||||
_ => Err(anyhow!("Not a backup QR code")),
|
||||
}
|
||||
}
|
||||
|
||||
/// scheme: `OPENPGP4FPR:FINGERPRINT#a=ADDR&n=NAME&i=INVITENUMBER&s=AUTH`
|
||||
/// or: `OPENPGP4FPR:FINGERPRINT#a=ADDR&g=GROUPNAME&x=GROUPID&i=INVITENUMBER&s=AUTH`
|
||||
/// or: `OPENPGP4FPR:FINGERPRINT#a=ADDR`
|
||||
@@ -471,6 +501,18 @@ fn decode_webrtc_instance(_context: &Context, qr: &str) -> Result<Qr> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes a [`DCBACKUP_SCHEME`] QR code.
|
||||
///
|
||||
/// The format of this scheme is `DCBACKUP:<encoded ticket>`. The encoding is the
|
||||
/// [`iroh::provider::Ticket`]'s `Display` impl.
|
||||
fn decode_backup(qr: &str) -> Result<Qr> {
|
||||
let payload = qr
|
||||
.strip_prefix(DCBACKUP_SCHEME)
|
||||
.ok_or(anyhow!("invalid DCBACKUP scheme"))?;
|
||||
let ticket: iroh::provider::Ticket = payload.parse().context("invalid DCBACKUP payload")?;
|
||||
Ok(Qr::Backup { ticket })
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct CreateAccountSuccessResponse {
|
||||
/// Email address.
|
||||
|
||||
@@ -11,7 +11,9 @@ use crate::{
|
||||
config::Config,
|
||||
contact::{Contact, ContactId},
|
||||
context::Context,
|
||||
securejoin, stock_str,
|
||||
qr::{self, Qr},
|
||||
securejoin,
|
||||
stock_str::{self, backup_transfer_qr},
|
||||
};
|
||||
|
||||
/// Returns SVG of the QR code to join the group or verify contact.
|
||||
@@ -47,6 +49,34 @@ async fn generate_join_group_qr_code(context: &Context, chat_id: ChatId) -> Resu
|
||||
}
|
||||
|
||||
async fn generate_verification_qr(context: &Context) -> Result<String> {
|
||||
let (avatar, displayname, addr, color) = self_info(context).await?;
|
||||
|
||||
inner_generate_secure_join_qr_code(
|
||||
&stock_str::setup_contact_qr_description(context, &displayname, &addr).await,
|
||||
&securejoin::get_securejoin_qr(context, None).await?,
|
||||
&color,
|
||||
avatar,
|
||||
displayname.chars().next().unwrap_or('#'),
|
||||
)
|
||||
}
|
||||
|
||||
/// Renders a [`Qr::Backup`] QR code as an SVG image.
|
||||
pub async fn generate_backup_qr(context: &Context, qr: &Qr) -> Result<String> {
|
||||
let content = qr::format_backup(qr)?;
|
||||
let (avatar, displayname, _addr, color) = self_info(context).await?;
|
||||
let description = backup_transfer_qr(context).await?;
|
||||
|
||||
inner_generate_secure_join_qr_code(
|
||||
&description,
|
||||
&content,
|
||||
&color,
|
||||
avatar,
|
||||
displayname.chars().next().unwrap_or('#'),
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns `(avatar, displayname, addr, color) of the configured account.
|
||||
async fn self_info(context: &Context) -> Result<(Option<Vec<u8>>, String, String, String)> {
|
||||
let contact = Contact::get_by_id(context, ContactId::SELF).await?;
|
||||
|
||||
let avatar = match contact.get_profile_image(context).await? {
|
||||
@@ -59,16 +89,11 @@ async fn generate_verification_qr(context: &Context) -> Result<String> {
|
||||
|
||||
let displayname = match context.get_config(Config::Displayname).await? {
|
||||
Some(name) => name,
|
||||
None => contact.get_addr().to_owned(),
|
||||
None => contact.get_addr().to_string(),
|
||||
};
|
||||
|
||||
inner_generate_secure_join_qr_code(
|
||||
&stock_str::setup_contact_qr_description(context, &displayname, contact.get_addr()).await,
|
||||
&securejoin::get_securejoin_qr(context, None).await?,
|
||||
&color_int_to_hex_string(contact.get_color()),
|
||||
avatar,
|
||||
displayname.chars().next().unwrap_or('#'),
|
||||
)
|
||||
let addr = contact.get_addr().to_string();
|
||||
let color = color_int_to_hex_string(contact.get_color());
|
||||
Ok((avatar, displayname, addr, color))
|
||||
}
|
||||
|
||||
fn inner_generate_secure_join_qr_code(
|
||||
@@ -272,6 +297,12 @@ fn inner_generate_secure_join_qr_code(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use testdir::testdir;
|
||||
|
||||
use crate::imex::BackupProvider;
|
||||
use crate::qr::format_backup;
|
||||
use crate::test_utils::TestContextManager;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
@@ -286,4 +317,20 @@ mod tests {
|
||||
.unwrap();
|
||||
assert!(svg.contains("descr123 " < > &"))
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_generate_backup_qr() {
|
||||
let dir = testdir!();
|
||||
let mut tcm = TestContextManager::new();
|
||||
let ctx = tcm.alice().await;
|
||||
let provider = BackupProvider::prepare(&ctx).await.unwrap();
|
||||
let qr = provider.qr();
|
||||
|
||||
println!("{}", format_backup(&qr).unwrap());
|
||||
let rendered = generate_backup_qr(&ctx, &qr).await.unwrap();
|
||||
tokio::fs::write(dir.join("qr.svg"), &rendered)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(rendered.get(..4), Some("<svg"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +115,9 @@ impl Context {
|
||||
let requested = self.quota_update_request.swap(true, Ordering::Relaxed);
|
||||
if !requested {
|
||||
// Quota update was not requested before.
|
||||
self.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
self.scheduler
|
||||
.interrupt_inbox(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
251
src/scheduler.rs
251
src/scheduler.rs
@@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result};
|
||||
use async_channel::{self as channel, Receiver, Sender};
|
||||
use futures::future::try_join_all;
|
||||
use futures_lite::FutureExt;
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
use tokio::task;
|
||||
|
||||
use self::connectivity::ConnectivityStore;
|
||||
@@ -23,10 +24,210 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings};
|
||||
|
||||
pub(crate) mod connectivity;
|
||||
|
||||
/// State of the IO scheduler, as stored on the [`Context`].
|
||||
///
|
||||
/// The IO scheduler can be stopped or started, but core can also pause it. After pausing
|
||||
/// the IO scheduler will be restarted only if it was running before paused or
|
||||
/// [`Context::start_io`] was called in the meantime while it was paused.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct SchedulerState {
|
||||
inner: RwLock<InnerSchedulerState>,
|
||||
}
|
||||
|
||||
impl SchedulerState {
|
||||
pub(crate) fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
/// Whether the scheduler is currently running.
|
||||
pub(crate) async fn is_running(&self) -> bool {
|
||||
let inner = self.inner.read().await;
|
||||
inner.scheduler.is_some()
|
||||
}
|
||||
|
||||
/// Starts the scheduler if it is not yet started.
|
||||
pub(crate) async fn start(&self, context: Context) {
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.started = true;
|
||||
if inner.scheduler.is_none() && !inner.paused {
|
||||
Self::do_start(inner, context).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts the scheduler if it is not yet started.
|
||||
async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
|
||||
info!(context, "starting IO");
|
||||
let ctx = context.clone();
|
||||
match Scheduler::start(context).await {
|
||||
Ok(scheduler) => inner.scheduler = Some(scheduler),
|
||||
Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stops the scheduler if it is currently running.
|
||||
pub(crate) async fn stop(&self, context: &Context) {
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.started = false;
|
||||
Self::do_stop(inner, context).await;
|
||||
}
|
||||
|
||||
/// Stops the scheduler if it is currently running.
|
||||
async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) {
|
||||
// Sending an event wakes up event pollers (get_next_event)
|
||||
// so the caller of stop_io() can arrange for proper termination.
|
||||
// For this, the caller needs to instruct the event poller
|
||||
// to terminate on receiving the next event and then call stop_io()
|
||||
// which will emit the below event(s)
|
||||
info!(context, "stopping IO");
|
||||
if let Some(debug_logging) = context.debug_logging.read().await.as_ref() {
|
||||
debug_logging.loop_handle.abort();
|
||||
}
|
||||
if let Some(scheduler) = inner.scheduler.take() {
|
||||
scheduler.stop(context).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Pauses the IO scheduler.
|
||||
///
|
||||
/// If it is currently running the scheduler will be stopped. When
|
||||
/// [`IoPausedGuard::resume`] is called the scheduler is started again.
|
||||
///
|
||||
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
|
||||
/// resume will do the right thing and restore the scheduler to the state requested by
|
||||
/// the last call.
|
||||
pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.paused = true;
|
||||
Self::do_stop(inner, &context).await;
|
||||
IoPausedGuard {
|
||||
context,
|
||||
done: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Restarts the scheduler, only if it is running.
|
||||
pub(crate) async fn restart(&self, context: &Context) {
|
||||
info!(context, "restarting IO");
|
||||
if self.is_running().await {
|
||||
self.stop(context).await;
|
||||
self.start(context.clone()).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicate that the network likely has come back.
|
||||
pub(crate) async fn maybe_network(&self) {
|
||||
let inner = self.inner.read().await;
|
||||
let (inbox, oboxes) = match inner.scheduler {
|
||||
Some(ref scheduler) => {
|
||||
scheduler.maybe_network();
|
||||
let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
|
||||
let oboxes = scheduler
|
||||
.oboxes
|
||||
.iter()
|
||||
.map(|b| b.conn_state.state.connectivity.clone())
|
||||
.collect::<Vec<_>>();
|
||||
(inbox, oboxes)
|
||||
}
|
||||
None => return,
|
||||
};
|
||||
drop(inner);
|
||||
connectivity::idle_interrupted(inbox, oboxes).await;
|
||||
}
|
||||
|
||||
/// Indicate that the network likely is lost.
|
||||
pub(crate) async fn maybe_network_lost(&self, context: &Context) {
|
||||
let inner = self.inner.read().await;
|
||||
let stores = match inner.scheduler {
|
||||
Some(ref scheduler) => {
|
||||
scheduler.maybe_network_lost();
|
||||
scheduler
|
||||
.boxes()
|
||||
.map(|b| b.conn_state.state.connectivity.clone())
|
||||
.collect()
|
||||
}
|
||||
None => return,
|
||||
};
|
||||
drop(inner);
|
||||
connectivity::maybe_network_lost(context, stores).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
|
||||
let inner = self.inner.read().await;
|
||||
if let Some(ref scheduler) = inner.scheduler {
|
||||
scheduler.interrupt_inbox(info);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
|
||||
let inner = self.inner.read().await;
|
||||
if let Some(ref scheduler) = inner.scheduler {
|
||||
scheduler.interrupt_smtp(info);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_ephemeral_task(&self) {
|
||||
let inner = self.inner.read().await;
|
||||
if let Some(ref scheduler) = inner.scheduler {
|
||||
scheduler.interrupt_ephemeral_task();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_location(&self) {
|
||||
let inner = self.inner.read().await;
|
||||
if let Some(ref scheduler) = inner.scheduler {
|
||||
scheduler.interrupt_location();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
|
||||
let inner = self.inner.read().await;
|
||||
if let Some(ref scheduler) = inner.scheduler {
|
||||
scheduler.interrupt_recently_seen(contact_id, timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct InnerSchedulerState {
|
||||
scheduler: Option<Scheduler>,
|
||||
started: bool,
|
||||
paused: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct IoPausedGuard {
|
||||
context: Context,
|
||||
done: bool,
|
||||
}
|
||||
|
||||
impl IoPausedGuard {
|
||||
pub(crate) async fn resume(&mut self) {
|
||||
self.done = true;
|
||||
let mut inner = self.context.scheduler.inner.write().await;
|
||||
inner.paused = false;
|
||||
if inner.started && inner.scheduler.is_none() {
|
||||
SchedulerState::do_start(inner, self.context.clone()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for IoPausedGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.done {
|
||||
return;
|
||||
}
|
||||
|
||||
// Async .resume() should be called manually due to lack of async drop.
|
||||
error!(self.context, "Pause guard dropped without resuming.");
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SchedBox {
|
||||
meaning: FolderMeaning,
|
||||
conn_state: ImapConnectionState,
|
||||
|
||||
/// IMAP loop task handle.
|
||||
handle: task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
@@ -46,56 +247,6 @@ pub(crate) struct Scheduler {
|
||||
recently_seen_loop: RecentlySeenLoop,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
/// Indicate that the network likely has come back.
|
||||
pub async fn maybe_network(&self) {
|
||||
let lock = self.scheduler.read().await;
|
||||
if let Some(scheduler) = &*lock {
|
||||
scheduler.maybe_network();
|
||||
}
|
||||
connectivity::idle_interrupted(lock).await;
|
||||
}
|
||||
|
||||
/// Indicate that the network likely is lost.
|
||||
pub async fn maybe_network_lost(&self) {
|
||||
let lock = self.scheduler.read().await;
|
||||
if let Some(scheduler) = &*lock {
|
||||
scheduler.maybe_network_lost();
|
||||
}
|
||||
connectivity::maybe_network_lost(self, lock).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
|
||||
if let Some(scheduler) = &*self.scheduler.read().await {
|
||||
scheduler.interrupt_inbox(info);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
|
||||
if let Some(scheduler) = &*self.scheduler.read().await {
|
||||
scheduler.interrupt_smtp(info);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_ephemeral_task(&self) {
|
||||
if let Some(scheduler) = &*self.scheduler.read().await {
|
||||
scheduler.interrupt_ephemeral_task();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_location(&self) {
|
||||
if let Some(scheduler) = &*self.scheduler.read().await {
|
||||
scheduler.interrupt_location();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
|
||||
if let Some(scheduler) = &*self.scheduler.read().await {
|
||||
scheduler.interrupt_recently_seen(contact_id, timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
|
||||
use futures::future::FutureExt;
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use humansize::{format_size, BINARY};
|
||||
use tokio::sync::{Mutex, RwLockReadGuard};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::events::EventType;
|
||||
use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning};
|
||||
@@ -12,7 +12,7 @@ use crate::quota::{
|
||||
};
|
||||
use crate::tools::time;
|
||||
use crate::{context::Context, log::LogExt};
|
||||
use crate::{scheduler::Scheduler, stock_str, tools};
|
||||
use crate::{stock_str, tools};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)]
|
||||
pub enum Connectivity {
|
||||
@@ -156,19 +156,7 @@ impl ConnectivityStore {
|
||||
/// Set all folder states to InterruptingIdle in case they were `Connected` before.
|
||||
/// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()`
|
||||
/// returns false immediately after `dc_maybe_network()`.
|
||||
pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Scheduler>>) {
|
||||
let (inbox, oboxes) = match &*scheduler {
|
||||
Some(Scheduler { inbox, oboxes, .. }) => (
|
||||
inbox.conn_state.state.connectivity.clone(),
|
||||
oboxes
|
||||
.iter()
|
||||
.map(|b| b.conn_state.state.connectivity.clone())
|
||||
.collect::<Vec<_>>(),
|
||||
),
|
||||
None => return,
|
||||
};
|
||||
drop(scheduler);
|
||||
|
||||
pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) {
|
||||
let mut connectivity_lock = inbox.0.lock().await;
|
||||
// For the inbox, we also have to set the connectivity to InterruptingIdle if it was
|
||||
// NotConfigured before: If all folders are NotConfigured, dc_get_connectivity()
|
||||
@@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Sched
|
||||
/// Set the connectivity to "Not connected" after a call to dc_maybe_network_lost().
|
||||
/// If we did not do this, the connectivity would stay "Connected" for quite a long time
|
||||
/// after `maybe_network_lost()` was called.
|
||||
pub(crate) async fn maybe_network_lost(
|
||||
context: &Context,
|
||||
scheduler: RwLockReadGuard<'_, Option<Scheduler>>,
|
||||
) {
|
||||
let stores: Vec<_> = match &*scheduler {
|
||||
Some(sched) => sched
|
||||
.boxes()
|
||||
.map(|b| b.conn_state.state.connectivity.clone())
|
||||
.collect(),
|
||||
None => return,
|
||||
};
|
||||
drop(scheduler);
|
||||
|
||||
pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec<ConnectivityStore>) {
|
||||
for store in &stores {
|
||||
let mut connectivity_lock = store.0.lock().await;
|
||||
if !matches!(
|
||||
@@ -249,9 +225,9 @@ impl Context {
|
||||
///
|
||||
/// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
|
||||
pub async fn get_connectivity(&self) -> Connectivity {
|
||||
let lock = self.scheduler.read().await;
|
||||
let stores: Vec<_> = match &*lock {
|
||||
Some(sched) => sched
|
||||
let lock = self.scheduler.inner.read().await;
|
||||
let stores: Vec<_> = match lock.scheduler {
|
||||
Some(ref sched) => sched
|
||||
.boxes()
|
||||
.map(|b| b.conn_state.state.connectivity.clone())
|
||||
.collect(),
|
||||
@@ -332,9 +308,9 @@ impl Context {
|
||||
// Get the states from the RwLock
|
||||
// =============================================================================================
|
||||
|
||||
let lock = self.scheduler.read().await;
|
||||
let (folders_states, smtp) = match &*lock {
|
||||
Some(sched) => (
|
||||
let lock = self.scheduler.inner.read().await;
|
||||
let (folders_states, smtp) = match lock.scheduler {
|
||||
Some(ref sched) => (
|
||||
sched
|
||||
.boxes()
|
||||
.map(|b| (b.meaning, b.conn_state.state.connectivity.clone()))
|
||||
@@ -503,9 +479,9 @@ impl Context {
|
||||
|
||||
/// Returns true if all background work is done.
|
||||
pub async fn all_work_done(&self) -> bool {
|
||||
let lock = self.scheduler.read().await;
|
||||
let stores: Vec<_> = match &*lock {
|
||||
Some(sched) => sched
|
||||
let lock = self.scheduler.inner.read().await;
|
||||
let stores: Vec<_> = match lock.scheduler {
|
||||
Some(ref sched) => sched
|
||||
.boxes()
|
||||
.map(|b| &b.conn_state.state)
|
||||
.chain(once(&sched.smtp.state))
|
||||
|
||||
@@ -143,7 +143,7 @@ impl Smtp {
|
||||
// Run STARTTLS command and convert the client back into a stream.
|
||||
let client = smtp::SmtpClient::new().smtp_utf8(true);
|
||||
let transport = SmtpTransport::new(client, BufStream::new(socks5_stream)).await?;
|
||||
let tcp_stream = transport.starttls().await?;
|
||||
let tcp_stream = transport.starttls().await?.into_inner();
|
||||
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
|
||||
.await
|
||||
.context("STARTTLS upgrade failed")?;
|
||||
@@ -199,7 +199,7 @@ impl Smtp {
|
||||
// Run STARTTLS command and convert the client back into a stream.
|
||||
let client = smtp::SmtpClient::new().smtp_utf8(true);
|
||||
let transport = SmtpTransport::new(client, BufStream::new(tcp_stream)).await?;
|
||||
let tcp_stream = transport.starttls().await?;
|
||||
let tcp_stream = transport.starttls().await?.into_inner();
|
||||
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
|
||||
.await
|
||||
.context("STARTTLS upgrade failed")?;
|
||||
|
||||
62
src/sql.rs
62
src/sql.rs
@@ -306,37 +306,49 @@ impl Sql {
|
||||
}
|
||||
}
|
||||
|
||||
/// Locks the write transactions mutex.
|
||||
/// We do not make all transactions
|
||||
/// [IMMEDIATE](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions)
|
||||
/// for more parallelism -- at least read transactions can be made DEFERRED to run in parallel
|
||||
/// w/o any drawbacks. But if we make write transactions DEFERRED also w/o any external locking,
|
||||
/// then they are upgraded from read to write ones on the first write statement. This has some
|
||||
/// drawbacks:
|
||||
/// - If there are other write transactions, we block the thread and the db connection until
|
||||
/// upgraded. Also if some reader comes then, it has to get next, less used connection with a
|
||||
/// worse per-connection page cache.
|
||||
/// - If a transaction is blocked for more than busy_timeout, it fails with SQLITE_BUSY.
|
||||
/// - Configuring busy_timeout is not the best way to manage transaction timeouts, we would
|
||||
/// prefer it to be integrated with Rust/tokio asyncs. Moreover, SQLite implements waiting
|
||||
/// using sleeps.
|
||||
/// - If upon a successful upgrade to a write transaction the db has been modified by another
|
||||
/// one, the transaction has to be rolled back and retried. It is an extra work in terms of
|
||||
/// Locks the write transactions mutex in order to make sure that there never are
|
||||
/// multiple write transactions at once.
|
||||
///
|
||||
/// Doing the locking ourselves instead of relying on SQLite has these reasons:
|
||||
///
|
||||
/// - SQLite's locking mechanism is non-async, blocking a thread
|
||||
/// - SQLite's locking mechanism just sleeps in a loop, which is really inefficient
|
||||
///
|
||||
/// ---
|
||||
///
|
||||
/// More considerations on alternatives to the current approach:
|
||||
///
|
||||
/// We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions.
|
||||
///
|
||||
/// In order to never get concurrency issues, we could make all transactions IMMEDIATE,
|
||||
/// but this would mean that there can never be two simultaneous transactions.
|
||||
///
|
||||
/// Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks.
|
||||
///
|
||||
/// DEFERRED write transactions without doing the locking ourselves would have these drawbacks:
|
||||
///
|
||||
/// 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop.
|
||||
/// 2. If there are other write transactions, we block the db connection until
|
||||
/// upgraded. If some reader comes then, it has to get the next, less used connection with a
|
||||
/// worse per-connection page cache (SQLite allows one write and any number of reads in parallel).
|
||||
/// 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY.
|
||||
/// 4. If upon a successful upgrade to a write transaction the db has been modified,
|
||||
/// the transaction has to be rolled back and retried, which means extra work in terms of
|
||||
/// CPU/battery.
|
||||
/// - Maybe minor, but we lose some fairness in servicing write transactions, i.e. we service
|
||||
/// them in the order of the first write statement, not in the order they come.
|
||||
/// The only pro of making write transactions DEFERRED w/o the external locking is some
|
||||
/// parallelism between them. Also we have an option to make write transactions IMMEDIATE, also
|
||||
/// w/o the external locking. But then the most of cons above are still valid. Instead, if we
|
||||
/// perform all write transactions under an async mutex, the only cons is losing some
|
||||
/// parallelism for write transactions.
|
||||
///
|
||||
/// The only pro of making write transactions DEFERRED w/o the external locking would be some
|
||||
/// parallelism between them.
|
||||
///
|
||||
/// Another option would be to make write transactions IMMEDIATE, also
|
||||
/// w/o the external locking. But then cons 1. - 3. above would still be valid.
|
||||
pub async fn write_lock(&self) -> MutexGuard<'_, ()> {
|
||||
self.write_mtx.lock().await
|
||||
}
|
||||
|
||||
/// Allocates a connection and calls `function` with the connection. If `function` does write
|
||||
/// queries, either a lock must be taken first using `write_lock()` or `call_write()` used
|
||||
/// instead.
|
||||
/// queries,
|
||||
/// - either first take a lock using `write_lock()`
|
||||
/// - or use `call_write()` instead.
|
||||
///
|
||||
/// Returns the result of the function.
|
||||
async fn call<'a, F, R>(&'a self, function: F) -> Result<R>
|
||||
|
||||
@@ -404,6 +404,9 @@ pub enum StockMessage {
|
||||
|
||||
#[strum(props(fallback = "Chat protection disabled by %1$s."))]
|
||||
ProtectionDisabledBy = 161,
|
||||
|
||||
#[strum(props(fallback = "Scan to set up second device for %1$s"))]
|
||||
BackupTransferQr = 162,
|
||||
}
|
||||
|
||||
impl StockMessage {
|
||||
@@ -741,14 +744,14 @@ pub(crate) async fn setup_contact_qr_description(
|
||||
display_name: &str,
|
||||
addr: &str,
|
||||
) -> String {
|
||||
let name = &if display_name == addr {
|
||||
let name = if display_name == addr {
|
||||
addr.to_owned()
|
||||
} else {
|
||||
format!("{display_name} ({addr})")
|
||||
};
|
||||
translated(context, StockMessage::SetupContactQRDescription)
|
||||
.await
|
||||
.replace1(name)
|
||||
.replace1(&name)
|
||||
}
|
||||
|
||||
/// Stock string: `Scan to join %1$s`.
|
||||
@@ -1240,6 +1243,24 @@ pub(crate) async fn aeap_explanation_and_link(
|
||||
.replace2(new_addr)
|
||||
}
|
||||
|
||||
/// Text to put in the [`Qr::Backup`] rendered SVG image.
|
||||
///
|
||||
/// The default is "Scan to set up second device for <account name (account addr)>". The
|
||||
/// account name and address are looked up from the context.
|
||||
///
|
||||
/// [`Qr::Backup`]: crate::qr::Qr::Backup
|
||||
pub(crate) async fn backup_transfer_qr(context: &Context) -> Result<String> {
|
||||
let contact = Contact::get_by_id(context, ContactId::SELF).await?;
|
||||
let addr = contact.get_addr();
|
||||
let full_name = match context.get_config(Config::Displayname).await? {
|
||||
Some(name) if name != addr => format!("{name} ({addr})"),
|
||||
_ => addr.to_string(),
|
||||
};
|
||||
Ok(translated(context, StockMessage::BackupTransferQr)
|
||||
.await
|
||||
.replace1(&full_name))
|
||||
}
|
||||
|
||||
impl Context {
|
||||
/// Set the stock string for the [StockMessage].
|
||||
///
|
||||
|
||||
@@ -109,8 +109,8 @@ impl Context {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Add deleted qr-code token to the list of items to be synced
|
||||
// so that the token also gets deleted on the other devices.
|
||||
/// Adds deleted qr-code token to the list of items to be synced
|
||||
/// so that the token also gets deleted on the other devices.
|
||||
pub(crate) async fn sync_qr_code_token_deletion(
|
||||
&self,
|
||||
invitenumber: String,
|
||||
|
||||
@@ -40,6 +40,11 @@ pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avata
|
||||
static CONTEXT_NAMES: Lazy<std::sync::RwLock<BTreeMap<u32, String>>> =
|
||||
Lazy::new(|| std::sync::RwLock::new(BTreeMap::new()));
|
||||
|
||||
/// Manage multiple [`TestContext`]s in one place.
|
||||
///
|
||||
/// The main advantage is that the log records of the contexts will appear in the order they
|
||||
/// occurred rather than grouped by context like would happen when you use separate
|
||||
/// [`TestContext`]s without managing your own [`LogSink`].
|
||||
pub struct TestContextManager {
|
||||
log_tx: Sender<LogEvent>,
|
||||
_log_sink: LogSink,
|
||||
|
||||
@@ -421,7 +421,9 @@ impl Context {
|
||||
DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr",
|
||||
paramsv![instance.id, status_update_serial, status_update_serial, descr],
|
||||
).await?;
|
||||
self.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
self.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user