Compare commits

..

1 Commits

Author SHA1 Message Date
link2xt
3010d28901 fix: prevent reuse of the stream after an error
When a stream timeouts, `tokio_io_timeout::TimeoutStream`
returns an error once, but then allows to keep using
the stream, e.g. calling `poll_read()` again.

This can be dangerous if the error is ignored.
For example in case of IMAP stream,
if IMAP command is sent,
but then reading the response
times out and the error is ignored,
it is possible to send another IMAP command.
In this case leftover response
from a previous command may be read
and interpreted as the response
to the new IMAP command.

ErrorCapturingStream wraps the stream
to prevent its reuse after an error.
2025-07-19 13:44:00 +00:00
30 changed files with 276 additions and 212 deletions

View File

@@ -1,40 +1,5 @@
# Changelog
## [2.6.0] - 2025-07-23
### Fixes
- Fix crash when receiving a verification-gossiping message which a contact also sends to itself ([#7032](https://github.com/chatmail/core/pull/7032)).
## [2.5.0] - 2025-07-22
### Fixes
- Correctly migrate "verified by me".
- Mark all email chats as unprotected in the migration ([#7026](https://github.com/chatmail/core/pull/7026)).
- Do not ignore errors in add_flag_finalized_with_set.
### Documentation
- Deprecate protection-broken and related stuff ([#7018](https://github.com/chatmail/core/pull/7018)).
- Clarify the meaning of is_verified() vs verifier_id() ([#7027](https://github.com/chatmail/core/pull/7027)).
- STYLE.md: Prefer `try_next()` over `next()`.
## [2.4.0] - 2025-07-21
### Fixes
- Do not ignore errors when draining FETCH responses. This avoids IMAP loop getting stuck in an infinite loop retrying reading from the connection.
- Update `tokio-io-timeout` to 1.2.1. This release includes a fix to reset timeout after every error, so timeout error is returned at most once a minute if read is attempted after a timeout.
### Miscellaneous Tasks
- Update async-imap to 0.11.0.
### Refactor
- Use `try_next()` when processing FETCH responses.
## [2.3.0] - 2025-07-19
### Features / Changes
@@ -6530,6 +6495,3 @@ https://github.com/chatmail/core/pulls?q=is%3Apr+is%3Aclosed
[2.1.0]: https://github.com/chatmail/core/compare/v2.0.0..v2.1.0
[2.2.0]: https://github.com/chatmail/core/compare/v2.1.0..v2.2.0
[2.3.0]: https://github.com/chatmail/core/compare/v2.2.0..v2.3.0
[2.4.0]: https://github.com/chatmail/core/compare/v2.3.0..v2.4.0
[2.5.0]: https://github.com/chatmail/core/compare/v2.4.0..v2.5.0
[2.6.0]: https://github.com/chatmail/core/compare/v2.5.0..v2.6.0

20
Cargo.lock generated
View File

@@ -268,9 +268,9 @@ dependencies = [
[[package]]
name = "async-imap"
version = "0.11.0"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9f9a9c94a403cf46aa2b4cecbceefc6e4284441ebbeca79b80f3bab4394458"
checksum = "ca726c61b73c471f531b65e83e161776ba62c2b6ba4ec73d51fad357009ed00a"
dependencies = [
"async-channel 2.3.1",
"async-compression",
@@ -1285,7 +1285,7 @@ dependencies = [
[[package]]
name = "deltachat"
version = "2.6.0"
version = "2.3.0"
dependencies = [
"anyhow",
"async-broadcast",
@@ -1395,7 +1395,7 @@ dependencies = [
[[package]]
name = "deltachat-jsonrpc"
version = "2.6.0"
version = "2.3.0"
dependencies = [
"anyhow",
"async-channel 2.3.1",
@@ -1417,7 +1417,7 @@ dependencies = [
[[package]]
name = "deltachat-repl"
version = "2.6.0"
version = "2.3.0"
dependencies = [
"anyhow",
"deltachat",
@@ -1433,7 +1433,7 @@ dependencies = [
[[package]]
name = "deltachat-rpc-server"
version = "2.6.0"
version = "2.3.0"
dependencies = [
"anyhow",
"deltachat",
@@ -1462,7 +1462,7 @@ dependencies = [
[[package]]
name = "deltachat_ffi"
version = "2.6.0"
version = "2.3.0"
dependencies = [
"anyhow",
"deltachat",
@@ -3826,7 +3826,7 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56"
dependencies = [
"proc-macro-crate 3.2.0",
"proc-macro-crate 2.0.0",
"proc-macro2",
"quote",
"syn 2.0.104",
@@ -6073,9 +6073,9 @@ dependencies = [
[[package]]
name = "tokio-io-timeout"
version = "1.2.1"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat"
version = "2.6.0"
version = "2.3.0"
edition = "2024"
license = "MPL-2.0"
rust-version = "1.85"
@@ -44,7 +44,7 @@ ratelimit = { path = "./deltachat-ratelimit" }
anyhow = { workspace = true }
async-broadcast = "0.7.2"
async-channel = { workspace = true }
async-imap = { version = "0.11.0", default-features = false, features = ["runtime-tokio", "compress"] }
async-imap = { version = "0.10.4", default-features = false, features = ["runtime-tokio", "compress"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.10.2", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] }
@@ -101,7 +101,7 @@ strum_macros = "0.27"
tagger = "4.3.4"
textwrap = "0.16.2"
thiserror = { workspace = true }
tokio-io-timeout = "1.2.1"
tokio-io-timeout = "1.2.0"
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = { version = "0.1.17", features = ["fs"] }
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar

View File

@@ -78,27 +78,6 @@ All errors should be handled in one of these ways:
- With `.log_err().ok()`.
- Bubbled up with `?`.
When working with [async streams](https://docs.rs/futures/0.3.31/futures/stream/index.html),
prefer [`try_next`](https://docs.rs/futures/0.3.31/futures/stream/trait.TryStreamExt.html#method.try_next) over `next()`, e.g. do
```
while let Some(event) = stream.try_next().await? {
todo!();
}
```
instead of
```
while let Some(event_res) = stream.next().await {
todo!();
}
```
as it allows bubbling up the error early with `?`
with no way to accidentally skip error processing
with early `continue` or `break`.
Some streams reading from a connection
return infinite number of `Some(Err(_))`
items when connection breaks and not processing
errors may result in infinite loop.
`backtrace` feature is enabled for `anyhow` crate
and `debug = 1` option is set in the test profile.
This allows to run `RUST_BACKTRACE=1 cargo test`

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat_ffi"
version = "2.6.0"
version = "2.3.0"
description = "Deltachat FFI"
edition = "2018"
readme = "README.md"

View File

@@ -503,6 +503,13 @@ char* dc_get_blobdir (const dc_context_t* context);
* - `gossip_period` = How often to gossip Autocrypt keys in chats with multiple recipients, in
* seconds. 2 days by default.
* This is not supposed to be changed by UIs and only used for testing.
* - `verified_one_on_one_chats` = Feature flag for verified 1:1 chats; the UI should set it
* to 1 if it supports verified 1:1 chats.
* Regardless of this setting, `dc_chat_is_protected()` returns true while the key is verified,
* and when the key changes, an info message is posted into the chat.
* 0=Nothing else happens when the key changes.
* 1=After the key changed, `dc_chat_can_send()` returns false and `dc_chat_is_protection_broken()` returns true
* until `dc_accept_chat()` is called.
* - `is_chatmail` = 1 if the the server is a chatmail server, 0 otherwise.
* - `is_muted` = Whether a context is muted by the user.
* Muted contexts should not sound, vibrate or show notifications.
@@ -3811,12 +3818,21 @@ int dc_chat_can_send (const dc_chat_t* chat);
/**
* Check if a chat is protected.
*
* Only verified contacts
* End-to-end encryption is guaranteed in protected chats
* and only verified contacts
* as determined by dc_contact_is_verified()
* can be added to protected chats.
*
* Protected chats are created using dc_create_group_chat()
* by setting the 'protect' parameter to 1.
* 1:1 chats become protected or unprotected automatically
* if `verified_one_on_one_chats` setting is enabled.
*
* UI should display a green checkmark
* in the chat title,
* in the chatlist item
* and in the chat profile
* if chat protection is enabled.
*
* @memberof dc_chat_t
* @param chat The chat object.
@@ -3853,8 +3869,6 @@ int dc_chat_is_encrypted (const dc_chat_t *chat);
*
* The UI should let the user confirm that this is OK with a message like
* `Bob sent a message from another device. Tap to learn more` and then call dc_accept_chat().
*
* @deprecated 2025-07 chats protection cannot break any longer
* @memberof dc_chat_t
* @param chat The chat object.
* @return 1=chat protection broken, 0=otherwise.
@@ -5253,14 +5267,20 @@ int dc_contact_is_blocked (const dc_contact_t* contact);
/**
* Check if the contact
* can be added to protected chats.
* can be added to verified chats,
* i.e. has a verified key
* and Autocrypt key matches the verified key.
*
* See dc_contact_get_verifier_id() for a guidance how to display these information.
* If contact is verified
* UI should display green checkmark after the contact name
* in contact list items,
* in chat member list items
* and in profiles if no chat with the contact exist (otherwise, use dc_chat_is_protected()).
*
* @memberof dc_contact_t
* @param contact The contact object.
* @return 0: contact is not verified.
* 2: SELF and contact have verified their fingerprints in both directions.
* 2: SELF and contact have verified their fingerprints in both directions; in the UI typically checkmarks are shown.
*/
int dc_contact_is_verified (dc_contact_t* contact);
@@ -5291,22 +5311,16 @@ int dc_contact_is_key_contact (dc_contact_t* contact);
/**
* Return the contact ID that verified a contact.
*
* As verifier may be unknown,
* use dc_contact_is_verified() to check if a contact can be added to a protected chat.
* If the function returns non-zero result,
* display green checkmark in the profile and "Introduced by ..." line
* with the name and address of the contact
* formatted by dc_contact_get_name_n_addr.
*
* UI should display the information in the contact's profile as follows:
*
* - If dc_contact_get_verifier_id() != 0,
* display text "Introduced by ..."
* with the name and address of the contact
* formatted by dc_contact_get_name_n_addr().
* Prefix the text by a green checkmark.
*
* - If dc_contact_get_verifier_id() == 0 and dc_contact_is_verified() != 0,
* display "Introduced" prefixed by a green checkmark.
*
* - if dc_contact_get_verifier_id() == 0 and dc_contact_is_verified() == 0,
* display nothing
* If this function returns a verifier,
* this does not necessarily mean
* you can add the contact to verified chats.
* Use dc_contact_is_verified() to check
* if a contact can be added to a verified chat instead.
*
* @memberof dc_contact_t
* @param contact The contact object.
@@ -6372,6 +6386,7 @@ void dc_event_unref(dc_event_t* event);
/**
* Chat changed. The name or the image of a chat group was changed or members were added or removed.
* Or the verify state of a chat has changed.
* See dc_set_chat_name(), dc_set_chat_profile_image(), dc_add_contact_to_chat()
* and dc_remove_contact_from_chat().
*

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-jsonrpc"
version = "2.6.0"
version = "2.3.0"
description = "DeltaChat JSON-RPC API"
edition = "2021"
license = "MPL-2.0"

View File

@@ -21,16 +21,15 @@ pub struct FullChat {
/// True if the chat is protected.
///
/// Only verified contacts
/// as determined by [`ContactObject::is_verified`] / `Contact.isVerified`
/// can be added to protected chats.
///
/// Protected chats are created using [`create_group_chat`] / `createGroupChat()`
/// by setting the 'protect' parameter to true.
///
/// [`create_group_chat`]: crate::api::CommandApi::create_group_chat
/// UI should display a green checkmark
/// in the chat title,
/// in the chat profile title and
/// in the chatlist item
/// if chat protection is enabled.
/// UI should also display a green checkmark
/// in the contact profile
/// if 1:1 chat with this contact exists and is protected.
is_protected: bool,
/// True if the chat is encrypted.
/// This means that all messages in the chat are encrypted,
/// and all contacts in the chat are "key-contacts",
@@ -71,7 +70,7 @@ pub struct FullChat {
fresh_message_counter: usize,
// is_group - please check over chat.type in frontend instead
is_contact_request: bool,
is_protection_broken: bool, // deprecated 2025-07
is_protection_broken: bool,
is_device_chat: bool,
self_in_group: bool,
is_muted: bool,

View File

@@ -31,11 +31,13 @@ pub struct ContactObject {
/// e.g. if we just scanned the fingerprint from a QR code.
e2ee_avail: bool,
/// True if the contact
/// can be added to protected chats
/// because SELF and contact have verified their fingerprints in both directions.
/// True if the contact can be added to verified groups.
///
/// See [`Self::verifier_id`]/`Contact.verifierId` for a guidance how to display these information.
/// If this is true
/// UI should display green checkmark after the contact name
/// in contact list items,
/// in chat member list items
/// and in profiles if no chat with the contact exist.
is_verified: bool,
/// True if the contact profile title should have a green checkmark.
@@ -44,29 +46,12 @@ pub struct ContactObject {
/// or will have a green checkmark if created.
is_profile_verified: bool,
/// The contact ID that verified a contact.
/// The ID of the contact that verified this contact.
///
/// As verifier may be unknown,
/// use [`Self::is_verified`]/`Contact.isVerified` to check if a contact can be added to a protected chat.
///
/// UI should display the information in the contact's profile as follows:
///
/// - If `verifierId` != 0,
/// display text "Introduced by ..."
/// with the name and address of the contact
/// formatted by `name_and_addr`/`nameAndAddr`.
/// Prefix the text by a green checkmark.
///
/// - If `verifierId` == 0 and `isVerified` != 0,
/// display "Introduced" prefixed by a green checkmark.
///
/// - if `verifierId` == 0 and `isVerified` == 0,
/// display nothing
///
/// This contains the contact ID of the verifier.
/// If it is `DC_CONTACT_ID_SELF`, we verified the contact ourself.
/// If it is None/Null, we don't have verifier information or
/// the contact is not verified.
/// If this is present,
/// display a green checkmark and "Introduced by ..."
/// string followed by the verifier contact name and address
/// in the contact profile.
verifier_id: Option<u32>,
/// the contact's last seen timestamp

View File

@@ -224,6 +224,7 @@ pub enum EventType {
},
/// Chat changed. The name or the image of a chat group was changed or members were added or removed.
/// Or the verify state of a chat has changed.
/// See setChatName(), setChatProfileImage(), addContactToChat()
/// and removeContactFromChat().
///

View File

@@ -54,5 +54,5 @@
},
"type": "module",
"types": "dist/deltachat.d.ts",
"version": "2.6.0"
"version": "2.3.0"
}

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-repl"
version = "2.6.0"
version = "2.3.0"
license = "MPL-2.0"
edition = "2021"
repository = "https://github.com/chatmail/core"

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "deltachat-rpc-client"
version = "2.6.0"
version = "2.3.0"
description = "Python client for Delta Chat core JSON-RPC interface"
classifiers = [
"Development Status :: 5 - Production/Stable",

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-rpc-server"
version = "2.6.0"
version = "2.3.0"
description = "DeltaChat JSON-RPC server"
edition = "2021"
readme = "README.md"

View File

@@ -15,5 +15,5 @@
},
"type": "module",
"types": "index.d.ts",
"version": "2.6.0"
"version": "2.3.0"
}

View File

@@ -33,6 +33,7 @@ skip = [
{ name = "lru", version = "0.12.3" },
{ name = "netlink-packet-route", version = "0.17.1" },
{ name = "nom", version = "7.1.3" },
{ name = "proc-macro-crate", version = "2.0.0" },
{ name = "rand_chacha", version = "0.3.1" },
{ name = "rand_core", version = "0.6.4" },
{ name = "rand", version = "0.8.5" },
@@ -48,6 +49,7 @@ skip = [
{ name = "syn", version = "1.0.109" },
{ name = "thiserror-impl", version = "1.0.69" },
{ name = "thiserror", version = "1.0.69" },
{ name = "toml_edit", version = "0.20.7" },
{ name = "wasi", version = "0.11.0+wasi-snapshot-preview1" },
{ name = "windows" },
{ name = "windows_aarch64_gnullvm" },
@@ -65,6 +67,7 @@ skip = [
{ name = "windows_x86_64_gnu" },
{ name = "windows_x86_64_gnullvm" },
{ name = "windows_x86_64_msvc" },
{ name = "winnow", version = "0.5.40" },
{ name = "zerocopy", version = "0.7.32" },
]

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "deltachat"
version = "2.6.0"
version = "2.3.0"
description = "Python bindings for the Delta Chat Core library using CFFI against the Rust-implemented libdeltachat"
readme = "README.rst"
requires-python = ">=3.8"

View File

@@ -1 +1 @@
2025-07-23
2025-07-19

View File

@@ -118,7 +118,7 @@ pub(crate) enum CantSendReason {
/// The chat is a contact request, it needs to be accepted before sending a message.
ContactRequest,
/// Deprecated. The chat was protected, but now a new message came in
/// The chat was protected, but now a new message came in
/// which was not encrypted / signed correctly.
ProtectionBroken,
@@ -1935,7 +1935,7 @@ impl Chat {
Ok(is_encrypted)
}
/// Deprecated 2025-07. Returns true if the chat was protected, and then an incoming message broke this protection.
/// Returns true if the chat was protected, and then an incoming message broke this protection.
///
/// This function is only useful if the UI enabled the `verified_one_on_one_chats` feature flag,
/// otherwise it will return false for all chats.

View File

@@ -417,7 +417,7 @@ pub enum Config {
#[strum(props(default = "172800"))]
GossipPeriod,
/// Deprecated 2025-07. Feature flag for verified 1:1 chats; the UI should set it
/// Feature flag for verified 1:1 chats; the UI should set it
/// to 1 if it supports verified 1:1 chats.
/// Regardless of this setting, `chat.is_protected()` returns true while the key is verified,
/// and when the key changes, an info message is posted into the chat.

View File

@@ -1043,7 +1043,7 @@ impl Context {
self.get_config_int(Config::GossipPeriod).await?.to_string(),
);
res.insert(
"verified_one_on_one_chats", // deprecated 2025-07
"verified_one_on_one_chats",
self.get_config_bool(Config::VerifiedOneOnOneChats)
.await?
.to_string(),

View File

@@ -17,7 +17,7 @@ use anyhow::{Context as _, Result, bail, ensure, format_err};
use async_channel::{self, Receiver, Sender};
use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
use deltachat_contact_tools::ContactAddress;
use futures::{FutureExt as _, TryStreamExt};
use futures::{FutureExt as _, StreamExt, TryStreamExt};
use futures_lite::FutureExt;
use num_traits::FromPrimitive;
use rand::Rng;
@@ -1384,15 +1384,14 @@ impl Session {
// Try to find a requested UID in returned FETCH responses.
while fetch_response.is_none() {
let Some(next_fetch_response) = fetch_responses
.try_next()
.await
.context("Failed to process IMAP FETCH result")?
else {
let Some(next_fetch_response) = fetch_responses.next().await else {
// No more FETCH responses received from the server.
break;
};
let next_fetch_response =
next_fetch_response.context("Failed to process IMAP FETCH result")?;
if let Some(next_uid) = next_fetch_response.uid {
if next_uid == request_uid {
fetch_response = Some(next_fetch_response);
@@ -1492,16 +1491,7 @@ impl Session {
// If we don't process the whole response, IMAP client is left in a broken state where
// it will try to process the rest of response as the next response.
//
// Make sure to not ignore the errors, because
// if connection times out, it will return
// infinite stream of `Some(Err(_))` results.
while fetch_responses
.try_next()
.await
.context("Failed to drain FETCH responses")?
.is_some()
{}
while fetch_responses.next().await.is_some() {}
if count != request_uids.len() {
warn!(
@@ -1698,7 +1688,7 @@ impl Session {
.uid_store(uid_set, &query)
.await
.with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
while let Some(_response) = responses.try_next().await? {
while let Some(_response) = responses.next().await {
// Read all the responses
}
Ok(())

View File

@@ -216,8 +216,8 @@ impl Client {
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await?
.context("Failed to read greeting")?;
.await
.context("failed to read greeting")??;
Ok(client)
}
@@ -231,8 +231,8 @@ impl Client {
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await?
.context("Failed to read greeting")?;
.await
.context("failed to read greeting")??;
Ok(client)
}
@@ -253,8 +253,8 @@ impl Client {
let mut client = async_imap::Client::new(buffered_tcp_stream);
let _greeting = client
.read_response()
.await?
.context("Failed to read greeting")?;
.await
.context("failed to read greeting")??;
client
.run_command_and_check_ok("STARTTLS", None)
.await
@@ -287,8 +287,8 @@ impl Client {
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await?
.context("Failed to read greeting")?;
.await
.context("failed to read greeting")??;
Ok(client)
}
@@ -304,8 +304,8 @@ impl Client {
let mut client = Client::new(session_stream);
let _greeting = client
.read_response()
.await?
.context("Failed to read greeting")?;
.await
.context("failed to read greeting")??;
Ok(client)
}
@@ -325,8 +325,8 @@ impl Client {
let mut client = ImapClient::new(buffered_proxy_stream);
let _greeting = client
.read_response()
.await?
.context("Failed to read greeting")?;
.await
.context("failed to read greeting")??;
client
.run_command_and_check_ok("STARTTLS", None)
.await

View File

@@ -804,7 +804,7 @@ async fn export_database(
"UPDATE backup.config SET value='0' WHERE keyname='verified_one_on_one_chats';",
[],
)
.ok(); // Deprecated 2025-07. If verified_one_on_one_chats was not set, this errors, which we ignore
.ok(); // If verified_one_on_one_chats was not set, this errors, which we ignore
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;

View File

@@ -16,12 +16,14 @@ use crate::sql::Sql;
use crate::tools::time;
pub(crate) mod dns;
pub(crate) mod error_capturing_stream;
pub(crate) mod http;
pub(crate) mod proxy;
pub(crate) mod session;
pub(crate) mod tls;
use dns::lookup_host_with_cache;
pub(crate) use error_capturing_stream::ErrorCapturingStream;
pub use http::{Response as HttpResponse, read_url, read_url_blob};
use tls::wrap_tls;
@@ -105,7 +107,7 @@ pub(crate) async fn load_connection_timestamp(
/// to the network, which is important to reduce the latency of interactive protocols such as IMAP.
pub(crate) async fn connect_tcp_inner(
addr: SocketAddr,
) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
) -> Result<Pin<Box<ErrorCapturingStream<TimeoutStream<TcpStream>>>>> {
let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
.await
.context("connection timeout")?
@@ -118,7 +120,9 @@ pub(crate) async fn connect_tcp_inner(
timeout_stream.set_write_timeout(Some(TIMEOUT));
timeout_stream.set_read_timeout(Some(TIMEOUT));
Ok(Box::pin(timeout_stream))
let error_capturing_stream = ErrorCapturingStream::new(timeout_stream);
Ok(Box::pin(error_capturing_stream))
}
/// Attempts to establish TLS connection
@@ -235,7 +239,7 @@ pub(crate) async fn connect_tcp(
host: &str,
port: u16,
load_cache: bool,
) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
) -> Result<Pin<Box<ErrorCapturingStream<TimeoutStream<TcpStream>>>>> {
let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
.await?
.into_iter()

View File

@@ -0,0 +1,136 @@
use std::io::IoSlice;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf};
use pin_project::pin_project;
use crate::net::SessionStream;
/// Stream that remembers the first error
/// and keeps returning it afterwards.
///
/// It is needed to avoid accidentally using
/// the stream after read timeout.
#[derive(Debug)]
#[pin_project]
pub(crate) struct ErrorCapturingStream<T: AsyncRead + AsyncWrite + std::fmt::Debug> {
#[pin]
inner: T,
/// If true, the stream has already returned an error once.
///
/// All read and write operations return error in this case.
is_broken: bool,
}
impl<T: AsyncRead + AsyncWrite + std::fmt::Debug> ErrorCapturingStream<T> {
pub fn new(inner: T) -> Self {
Self {
inner,
is_broken: false,
}
}
/// Gets a reference to the underlying stream.
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Gets a pinned mutable reference to the underlying stream.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().inner
}
}
impl<T: AsyncRead + AsyncWrite + std::fmt::Debug> AsyncRead for ErrorCapturingStream<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
let this = self.project();
if *this.is_broken {
return Poll::Ready(Err(io::Error::other("Broken stream")));
}
let res = this.inner.poll_read(cx, buf);
if let Poll::Ready(Err(_)) = res {
*this.is_broken = true;
}
res
}
}
impl<T: AsyncRead + AsyncWrite + std::fmt::Debug> AsyncWrite for ErrorCapturingStream<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.project();
if *this.is_broken {
return Poll::Ready(Err(io::Error::other("Broken stream")));
}
let res = this.inner.poll_write(cx, buf);
if let Poll::Ready(Err(_)) = res {
*this.is_broken = true;
}
res
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.project();
if *this.is_broken {
return Poll::Ready(Err(io::Error::other("Broken stream")));
}
let res = this.inner.poll_flush(cx);
if let Poll::Ready(Err(_)) = res {
*this.is_broken = true;
}
res
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.project();
if *this.is_broken {
return Poll::Ready(Err(io::Error::other("Broken stream")));
}
let res = this.inner.poll_shutdown(cx);
if let Poll::Ready(Err(_)) = res {
*this.is_broken = true;
}
res
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let this = self.project();
if *this.is_broken {
return Poll::Ready(Err(io::Error::other("Broken stream")));
}
let res = this.inner.poll_write_vectored(cx, bufs);
if let Poll::Ready(Err(_)) = res {
*this.is_broken = true;
}
res
}
fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}
impl<T: SessionStream> SessionStream for ErrorCapturingStream<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.inner.set_read_timeout(timeout)
}
fn peer_addr(&self) -> anyhow::Result<SocketAddr> {
self.inner.peer_addr()
}
}

View File

@@ -21,9 +21,9 @@ use url::Url;
use crate::config::Config;
use crate::constants::NON_ALPHANUMERIC_WITHOUT_DOT;
use crate::context::Context;
use crate::net::connect_tcp;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_rustls;
use crate::net::{ErrorCapturingStream, connect_tcp};
use crate::sql::Sql;
/// Default SOCKS5 port according to [RFC 1928](https://tools.ietf.org/html/rfc1928).
@@ -118,7 +118,7 @@ impl Socks5Config {
target_host: &str,
target_port: u16,
load_dns_cache: bool,
) -> Result<Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>>> {
) -> Result<Socks5Stream<Pin<Box<ErrorCapturingStream<TimeoutStream<TcpStream>>>>>> {
let tcp_stream = connect_tcp(context, &self.host, self.port, load_dns_cache)
.await
.context("Failed to connect to SOCKS5 proxy")?;

View File

@@ -7,6 +7,8 @@ use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream, BufWriter};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream;
use crate::net::ErrorCapturingStream;
pub(crate) trait SessionStream:
AsyncRead + AsyncWrite + Unpin + Send + Sync + std::fmt::Debug
{
@@ -61,13 +63,13 @@ impl<T: SessionStream> SessionStream for BufWriter<T> {
self.get_ref().peer_addr()
}
}
impl SessionStream for Pin<Box<TimeoutStream<TcpStream>>> {
impl SessionStream for Pin<Box<ErrorCapturingStream<TimeoutStream<TcpStream>>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.as_mut().set_read_timeout_pinned(timeout);
self.as_mut().get_pin_mut().set_read_timeout_pinned(timeout);
}
fn peer_addr(&self) -> Result<SocketAddr> {
Ok(self.get_ref().peer_addr()?)
Ok(self.get_ref().get_ref().peer_addr()?)
}
}
impl<T: SessionStream> SessionStream for Socks5Stream<T> {

View File

@@ -3642,7 +3642,7 @@ async fn mark_recipients_as_verified(
return Ok(());
}
for to_id in to_ids.iter().filter_map(|&x| x) {
if to_id == ContactId::SELF || to_id == from_id {
if to_id == ContactId::SELF {
continue;
}

View File

@@ -1505,28 +1505,19 @@ fn migrate_key_contacts(
};
let new_id = insert_contact(verified_key).context("Step 13")?;
verified_key_contacts.insert(original_id.try_into().context("Step 14")?, new_id);
let verifier_id = if addr_cmp(&verifier, &addr) {
// Earlier versions of Delta Chat signalled a direct verification
// by putting the contact's own address into the verifier column
1 // 1=ContactId::SELF
} else {
// If the original verifier is unknown, we represent this in the database
// by putting `new_id` into the place of the verifier,
// i.e. we say that this contact verified itself.
original_contact_id_from_addr(&verifier, new_id).context("Step 15")?
};
// If the original verifier is unknown, we represent this in the database
// by putting `new_id` into the place of the verifier,
// i.e. we say that this contact verified itself.
let verifier_id =
original_contact_id_from_addr(&verifier, new_id).context("Step 15")?;
verifications.insert(new_id, verifier_id);
let Some(secondary_verified_key) = secondary_verified_key else {
continue;
};
let new_id = insert_contact(secondary_verified_key).context("Step 16")?;
let verifier_id: u32 = if addr_cmp(&secondary_verifier, &addr) {
1 // 1=ContactId::SELF
} else {
original_contact_id_from_addr(&secondary_verifier, new_id).context("Step 17")?
};
let verifier_id: u32 =
original_contact_id_from_addr(&secondary_verifier, new_id).context("Step 17")?;
// Only use secondary verification if there is no primary verification:
verifications.entry(new_id).or_insert(verifier_id);
}
@@ -1651,7 +1642,7 @@ fn migrate_key_contacts(
.collect::<Result<Vec<_>, _>>()
.context("Step 26")?;
let mut keep_address_contacts = |reason: &str| -> Result<()> {
let mut keep_address_contacts = |reason: &str| {
info!(
context,
"Chat {chat_id} will be an unencrypted chat with contacts identified by email address: {reason}."
@@ -1659,15 +1650,6 @@ fn migrate_key_contacts(
for (m, _) in &old_members {
orphaned_contacts.remove(m);
}
// Unprotect this chat if it was protected.
//
// Otherwise we get protected chat with address-contact(s).
transaction
.execute("UPDATE chats SET protected=0 WHERE id=?", (chat_id,))
.context("Step 26.0")?;
Ok(())
};
let old_and_new_members: Vec<(u32, bool, Option<u32>)> = match typ {
// 1:1 chats retain:
@@ -1687,13 +1669,19 @@ fn migrate_key_contacts(
};
let Some(new_contact) = map_to_key_contact(old_member) else {
keep_address_contacts("No peerstate, or peerstate in 'reset' state")?;
keep_address_contacts("No peerstate, or peerstate in 'reset' state");
continue;
};
if !addr_cmp_stmt
.query_row((old_member, new_contact), |row| row.get::<_, bool>(0))?
{
keep_address_contacts("key contact has different email")?;
// Unprotect this 1:1 chat if it was protected.
//
// Otherwise we get protected chat with address-contact.
transaction
.execute("UPDATE chats SET protected=0 WHERE id=?", (chat_id,))?;
keep_address_contacts("key contact has different email");
continue;
}
vec![(*old_member, true, Some(new_contact))]
@@ -1704,7 +1692,7 @@ fn migrate_key_contacts(
if grpid.is_empty() {
// Ad-hoc group that has empty Chat-Group-ID
// because it was created in response to receiving a non-chat email.
keep_address_contacts("Empty chat-Group-ID")?;
keep_address_contacts("Empty chat-Group-ID");
continue;
} else if protected == 1 {
old_members
@@ -1723,7 +1711,7 @@ fn migrate_key_contacts(
// Mailinglist
140 => {
keep_address_contacts("Mailinglist")?;
keep_address_contacts("Mailinglist");
continue;
}
@@ -1762,7 +1750,7 @@ fn migrate_key_contacts(
transaction
.execute("UPDATE chats SET grpid='' WHERE id=?", (chat_id,))
.context("Step 26.1")?;
keep_address_contacts("Group contains contact without peerstate")?;
keep_address_contacts("Group contains contact without peerstate");
continue;
}