mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 06:52:10 +03:00
Compare commits
4 Commits
v2.11.0
...
iequidoo/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e6b202c85 | ||
|
|
b696a242fc | ||
|
|
7e4822c8ca | ||
|
|
a955cb5400 |
22
src/chat.rs
22
src/chat.rs
@@ -1885,10 +1885,11 @@ impl Chat {
|
||||
pub async fn is_encrypted(&self, context: &Context) -> Result<bool> {
|
||||
let is_encrypted = self.is_protected()
|
||||
|| match self.typ {
|
||||
Chattype::Single => {
|
||||
match context
|
||||
Chattype::Group if !self.grpid.is_empty() => true,
|
||||
Chattype::Group | Chattype::Single => {
|
||||
let contacts = context
|
||||
.sql
|
||||
.query_row_optional(
|
||||
.query_map(
|
||||
"SELECT cc.contact_id, c.fingerprint<>''
|
||||
FROM chats_contacts cc LEFT JOIN contacts c
|
||||
ON c.id=cc.contact_id
|
||||
@@ -1900,16 +1901,13 @@ impl Chat {
|
||||
let is_key: bool = row.get(1)?;
|
||||
Ok((id, is_key))
|
||||
},
|
||||
|ids| ids.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some((id, is_key)) => is_key || id == ContactId::DEVICE,
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
Chattype::Group => {
|
||||
// Do not encrypt ad-hoc groups.
|
||||
!self.grpid.is_empty()
|
||||
.await?;
|
||||
(self.typ != Chattype::Group || contacts.iter().any(|&(_, is_key)| is_key))
|
||||
&& contacts.iter().all(|&(id, is_key)| {
|
||||
is_key || matches!(id, ContactId::SELF | ContactId::DEVICE)
|
||||
})
|
||||
}
|
||||
Chattype::Mailinglist => false,
|
||||
Chattype::OutBroadcast | Chattype::InBroadcast => true,
|
||||
|
||||
@@ -1977,8 +1977,9 @@ pub(crate) async fn mark_contact_id_as_verified(
|
||||
}
|
||||
}
|
||||
transaction.execute(
|
||||
"UPDATE contacts SET verifier=? WHERE id=?",
|
||||
(verifier_id, contact_id),
|
||||
"UPDATE contacts SET verifier=?1
|
||||
WHERE id=?2 AND (verifier=0 OR ?1=?3)",
|
||||
(verifier_id, contact_id, ContactId::SELF),
|
||||
)?;
|
||||
Ok(())
|
||||
})
|
||||
|
||||
@@ -227,9 +227,6 @@ pub(crate) async fn update_connect_timestamp(
|
||||
}
|
||||
|
||||
/// Preloaded DNS results that can be used in case of DNS server failures.
|
||||
///
|
||||
/// See <https://support.delta.chat/t/no-dns-resolution-result/2778> and
|
||||
/// <https://github.com/deltachat/deltachat-core-rust/issues/4920> for reasons.
|
||||
static DNS_PRELOAD: LazyLock<HashMap<&'static str, Vec<IpAddr>>> = LazyLock::new(|| {
|
||||
HashMap::from([
|
||||
(
|
||||
|
||||
@@ -114,7 +114,8 @@ enum ChatAssignment {
|
||||
|
||||
/// Group chat without a Group ID.
|
||||
///
|
||||
/// This is not encrypted.
|
||||
/// This is either encrypted or unencrypted. Encryption never downgrades, but it can upgrade if
|
||||
/// the last address-contact is removed from the group.
|
||||
AdHocGroup,
|
||||
|
||||
/// Assign the message to existing chat
|
||||
@@ -372,25 +373,52 @@ async fn get_to_and_past_contact_ids(
|
||||
}
|
||||
}
|
||||
ChatAssignment::AdHocGroup => {
|
||||
to_ids = add_or_lookup_contacts_by_address_list(
|
||||
context,
|
||||
&mime_parser.recipients,
|
||||
if !mime_parser.incoming {
|
||||
Origin::OutgoingTo
|
||||
} else if incoming_origin.is_known() {
|
||||
Origin::IncomingTo
|
||||
} else {
|
||||
Origin::IncomingUnknownTo
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let key_to_ids = match mime_parser.was_encrypted() {
|
||||
false => Vec::new(),
|
||||
true => {
|
||||
let ids = lookup_key_contacts_by_address_list(
|
||||
context,
|
||||
&mime_parser.recipients,
|
||||
to_member_fingerprints,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
match ids.contains(&None) {
|
||||
false => ids,
|
||||
true => Vec::new(),
|
||||
}
|
||||
}
|
||||
};
|
||||
if !key_to_ids.is_empty() {
|
||||
to_ids = key_to_ids;
|
||||
past_ids = lookup_key_contacts_by_address_list(
|
||||
context,
|
||||
&mime_parser.past_members,
|
||||
past_member_fingerprints,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
to_ids = add_or_lookup_contacts_by_address_list(
|
||||
context,
|
||||
&mime_parser.recipients,
|
||||
if !mime_parser.incoming {
|
||||
Origin::OutgoingTo
|
||||
} else if incoming_origin.is_known() {
|
||||
Origin::IncomingTo
|
||||
} else {
|
||||
Origin::IncomingUnknownTo
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
past_ids = add_or_lookup_contacts_by_address_list(
|
||||
context,
|
||||
&mime_parser.past_members,
|
||||
Origin::Hidden,
|
||||
)
|
||||
.await?;
|
||||
past_ids = add_or_lookup_contacts_by_address_list(
|
||||
context,
|
||||
&mime_parser.past_members,
|
||||
Origin::Hidden,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
ChatAssignment::OneOneChat => {
|
||||
let pgp_to_ids = add_or_lookup_key_contacts(
|
||||
@@ -2535,7 +2563,7 @@ async fn lookup_or_create_adhoc_group(
|
||||
.query_row(
|
||||
"SELECT c.id, c.blocked
|
||||
FROM chats c INNER JOIN msgs m ON c.id=m.chat_id
|
||||
WHERE m.hidden=0 AND c.grpid='' AND c.name=?
|
||||
WHERE m.hidden=0 AND (? OR c.grpid='') AND c.name=?
|
||||
AND (SELECT COUNT(*) FROM chats_contacts
|
||||
WHERE chat_id=c.id
|
||||
AND add_timestamp >= remove_timestamp)=?
|
||||
@@ -2544,7 +2572,7 @@ async fn lookup_or_create_adhoc_group(
|
||||
AND contact_id NOT IN (SELECT id FROM temp.contacts)
|
||||
AND add_timestamp >= remove_timestamp)=0
|
||||
ORDER BY m.timestamp DESC",
|
||||
(&grpname, contact_ids.len()),
|
||||
(mime_parser.was_encrypted(), &grpname, contact_ids.len()),
|
||||
|row| {
|
||||
let id: ChatId = row.get(0)?;
|
||||
let blocked: Blocked = row.get(1)?;
|
||||
|
||||
82
src/sql.rs
82
src/sql.rs
@@ -3,7 +3,7 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{Context as _, Result, bail};
|
||||
use anyhow::{Context as _, Result, bail, ensure};
|
||||
use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::config::Config;
|
||||
use crate::constants::DC_CHAT_ID_TRASH;
|
||||
use crate::context::Context;
|
||||
use crate::debug_logging::set_debug_logging_xdc;
|
||||
use crate::ensure_and_debug_assert;
|
||||
use crate::ephemeral::start_ephemeral_timers;
|
||||
use crate::imex::BLOBS_BACKUP_NAME;
|
||||
use crate::location::delete_orphaned_poi_locations;
|
||||
@@ -24,7 +23,7 @@ use crate::net::http::http_cache_cleanup;
|
||||
use crate::net::prune_connection_history;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::stock_str;
|
||||
use crate::tools::{SystemTime, delete_file, time};
|
||||
use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
|
||||
|
||||
/// Extension to [`rusqlite::ToSql`] trait
|
||||
/// which also includes [`Send`] and [`Sync`].
|
||||
@@ -180,7 +179,7 @@ impl Sql {
|
||||
|
||||
/// Creates a new connection pool.
|
||||
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
|
||||
let mut connections = Vec::new();
|
||||
let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
|
||||
for _ in 0..Self::N_DB_CONNECTIONS {
|
||||
let connection = new_connection(dbfile, &passphrase)?;
|
||||
connections.push(connection);
|
||||
@@ -642,28 +641,74 @@ impl Sql {
|
||||
}
|
||||
|
||||
/// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
|
||||
pub(crate) async fn wal_checkpoint(&self) -> Result<()> {
|
||||
let lock = self.pool.read().await;
|
||||
let pool = lock.as_ref().context("No SQL connection pool")?;
|
||||
let mut conns = Vec::new();
|
||||
pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
|
||||
let t_start = Time::now();
|
||||
let lock = context.sql.pool.read().await;
|
||||
let Some(pool) = lock.as_ref() else {
|
||||
// No db connections, nothing to checkpoint.
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Do as much work as possible without blocking anybody.
|
||||
let query_only = true;
|
||||
let conn = pool.get(query_only).await?;
|
||||
tokio::task::block_in_place(|| {
|
||||
// Execute some transaction causing the WAL file to be opened so that the
|
||||
// `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
|
||||
// see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
|
||||
conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
|
||||
conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
|
||||
})?;
|
||||
|
||||
// Kick out writers.
|
||||
const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
|
||||
let _write_lock = pool.write_lock().await;
|
||||
let t_writers_blocked = Time::now();
|
||||
// Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
|
||||
// that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
|
||||
// documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
|
||||
// https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
|
||||
// readers.
|
||||
let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
|
||||
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
|
||||
read_conns.push(pool.get(query_only).await?);
|
||||
}
|
||||
read_conns.clear();
|
||||
// Checkpoint the remaining WAL pages without blocking readers.
|
||||
let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
|
||||
conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
|
||||
let pages_total: i64 = row.get(1)?;
|
||||
let pages_checkpointed: i64 = row.get(2)?;
|
||||
Ok((pages_total, pages_checkpointed))
|
||||
})
|
||||
})?;
|
||||
if pages_checkpointed < pages_total {
|
||||
warn!(
|
||||
context,
|
||||
"Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
|
||||
);
|
||||
}
|
||||
// Kick out readers to avoid blocking/SQLITE_BUSY.
|
||||
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
|
||||
conns.push(pool.get(query_only).await?);
|
||||
read_conns.push(pool.get(query_only).await?);
|
||||
}
|
||||
let conn = pool.get(query_only).await?;
|
||||
tokio::task::block_in_place(move || {
|
||||
// Execute some transaction causing the WAL file to be opened so that the
|
||||
// `wal_checkpoint()` can proceed, otherwise it fails when called the first time, see
|
||||
// https://sqlite.org/forum/forumpost/7512d76a05268fc8.
|
||||
conn.query_row("PRAGMA table_list", [], |_row| Ok(()))?;
|
||||
let t_readers_blocked = Time::now();
|
||||
tokio::task::block_in_place(|| {
|
||||
let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
|
||||
let blocked: i64 = row.get(0)?;
|
||||
Ok(blocked)
|
||||
})?;
|
||||
ensure_and_debug_assert!(blocked == 0,);
|
||||
ensure!(blocked == 0);
|
||||
Ok(())
|
||||
})
|
||||
})?;
|
||||
info!(
|
||||
context,
|
||||
"wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
|
||||
time_elapsed(&t_start),
|
||||
time_elapsed(&t_writers_blocked),
|
||||
time_elapsed(&t_readers_blocked),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -792,8 +837,9 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
|
||||
// bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does
|
||||
// not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see
|
||||
// https://www.sqlite.org/wal.html.
|
||||
if let Err(err) = context.sql.wal_checkpoint().await {
|
||||
if let Err(err) = Sql::wal_checkpoint(context).await {
|
||||
warn!(context, "wal_checkpoint() failed: {err:#}.");
|
||||
debug_assert!(false);
|
||||
}
|
||||
|
||||
context
|
||||
|
||||
@@ -67,7 +67,7 @@ struct InnerPool {
|
||||
///
|
||||
/// This mutex is locked when write connection
|
||||
/// is outside the pool.
|
||||
write_mutex: Arc<Mutex<()>>,
|
||||
pub(crate) write_mutex: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl InnerPool {
|
||||
@@ -96,13 +96,13 @@ impl InnerPool {
|
||||
.pop()
|
||||
.context("Got a permit when there are no connections in the pool")?
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "1")?;
|
||||
let conn = PooledConnection {
|
||||
pool: Arc::downgrade(&self),
|
||||
conn: Some(conn),
|
||||
_permit: permit,
|
||||
_write_mutex_guard: None,
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "1")?;
|
||||
Ok(conn)
|
||||
} else {
|
||||
// We get write guard first to avoid taking a permit
|
||||
@@ -119,13 +119,13 @@ impl InnerPool {
|
||||
"Got a permit and write lock when there are no connections in the pool",
|
||||
)?
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "0")?;
|
||||
let conn = PooledConnection {
|
||||
pool: Arc::downgrade(&self),
|
||||
conn: Some(conn),
|
||||
_permit: permit,
|
||||
_write_mutex_guard: Some(write_mutex_guard),
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "0")?;
|
||||
Ok(conn)
|
||||
}
|
||||
}
|
||||
@@ -195,4 +195,12 @@ impl Pool {
|
||||
pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
|
||||
Arc::clone(&self.inner).get(query_only).await
|
||||
}
|
||||
|
||||
/// Returns a mutex guard guaranteeing that there are no concurrent write connections.
|
||||
///
|
||||
/// NB: Make sure you're not holding all connections when calling this, otherwise it deadlocks
|
||||
/// if there is a concurrent writer waiting for available connection.
|
||||
pub(crate) async fn write_lock(&self) -> OwnedMutexGuard<()> {
|
||||
Arc::clone(&self.inner.write_mutex).lock_owned().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -803,6 +803,73 @@ async fn test_verified_chat_editor_reordering() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests that already verified contact
|
||||
/// does not get a new "verifier"
|
||||
/// via gossip.
|
||||
///
|
||||
/// Directly verifying is still possible.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_no_reverification() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
let alice = &tcm.alice().await;
|
||||
let bob = &tcm.bob().await;
|
||||
let charlie = &tcm.charlie().await;
|
||||
let fiona = &tcm.fiona().await;
|
||||
|
||||
tcm.execute_securejoin(alice, bob).await;
|
||||
tcm.execute_securejoin(alice, charlie).await;
|
||||
tcm.execute_securejoin(alice, fiona).await;
|
||||
|
||||
tcm.section("Alice creates a protected group with Bob, Charlie and Fiona");
|
||||
let alice_chat_id = alice
|
||||
.create_group_with_members(ProtectionStatus::Protected, "Group", &[bob, charlie, fiona])
|
||||
.await;
|
||||
let alice_sent = alice.send_text(alice_chat_id, "Hi!").await;
|
||||
let bob_rcvd_msg = bob.recv_msg(&alice_sent).await;
|
||||
let bob_alice_id = bob_rcvd_msg.from_id;
|
||||
|
||||
// Charlie is verified by Alice for Bob.
|
||||
let bob_charlie_contact = bob.add_or_lookup_contact(charlie).await;
|
||||
assert_eq!(
|
||||
bob_charlie_contact
|
||||
.get_verifier_id(bob)
|
||||
.await?
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
bob_alice_id
|
||||
);
|
||||
|
||||
let fiona_rcvd_msg = fiona.recv_msg(&alice_sent).await;
|
||||
let fiona_chat_id = fiona_rcvd_msg.chat_id;
|
||||
let fiona_sent = fiona.send_text(fiona_chat_id, "Post by Fiona").await;
|
||||
bob.recv_msg(&fiona_sent).await;
|
||||
|
||||
// Charlie should still be verified by Alice, not by Fiona.
|
||||
let bob_charlie_contact = bob.add_or_lookup_contact(charlie).await;
|
||||
assert_eq!(
|
||||
bob_charlie_contact
|
||||
.get_verifier_id(bob)
|
||||
.await?
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
bob_alice_id
|
||||
);
|
||||
|
||||
// Bob can still verify Charlie directly.
|
||||
tcm.execute_securejoin(bob, charlie).await;
|
||||
let bob_charlie_contact = bob.add_or_lookup_contact(charlie).await;
|
||||
assert_eq!(
|
||||
bob_charlie_contact
|
||||
.get_verifier_id(bob)
|
||||
.await?
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
ContactId::SELF
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ============== Helper Functions ==============
|
||||
|
||||
async fn assert_verified(this: &TestContext, other: &TestContext, protected: ProtectionStatus) {
|
||||
|
||||
Reference in New Issue
Block a user