feat: synchronize transports via sync messages

This commit is contained in:
link2xt
2025-11-21 12:09:55 +00:00
committed by l
parent 3a7f82c66e
commit f7ae2abe52
8 changed files with 350 additions and 59 deletions

View File

@@ -1,5 +1,6 @@
import pytest import pytest
from deltachat_rpc_client import EventType
from deltachat_rpc_client.rpc import JsonRpcError from deltachat_rpc_client.rpc import JsonRpcError
@@ -156,3 +157,47 @@ def test_reconfigure_transport(acfactory) -> None:
# Reconfiguring the transport should not reset # Reconfiguring the transport should not reset
# the settings as if when configuring the first transport. # the settings as if when configuring the first transport.
assert account.get_config("mvbox_move") == "1" assert account.get_config("mvbox_move") == "1"
def test_transport_synchronization(acfactory, log) -> None:
"""Test synchronization of transports between devices."""
ac1, ac2 = acfactory.get_online_accounts(2)
ac1_clone = ac1.clone()
ac1_clone.bring_online()
qr = acfactory.get_account_qr()
ac1.add_transport_from_qr(qr)
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
assert len(ac1.list_transports()) == 2
assert len(ac1_clone.list_transports()) == 2
ac1_clone.add_transport_from_qr(qr)
ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
assert len(ac1.list_transports()) == 3
assert len(ac1_clone.list_transports()) == 3
log.section("ac1 clone removes second transport")
[transport1, transport2, transport3] = ac1_clone.list_transports()
addr3 = transport3["addr"]
ac1_clone.delete_transport(transport2["addr"])
ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
[transport1, transport3] = ac1.list_transports()
log.section("ac1 changes the primary transport")
ac1.set_config("configured_addr", transport3["addr"])
log.section("ac1 removes the first transport")
ac1.delete_transport(transport1["addr"])
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
[transport3] = ac1_clone.list_transports()
assert transport3["addr"] == addr3
assert ac1_clone.get_config("configured_addr") == addr3
ac2_chat = ac2.create_chat(ac1)
ac2_chat.send_text("Hello!")
assert ac1.wait_for_incoming_msg().get_snapshot().text == "Hello!"
assert ac1_clone.wait_for_incoming_msg().get_snapshot().text == "Hello!"

View File

@@ -819,10 +819,18 @@ impl Context {
self, self,
"Creating a pseudo configured account which will not be able to send or receive messages. Only meant for tests!" "Creating a pseudo configured account which will not be able to send or receive messages. Only meant for tests!"
); );
ConfiguredLoginParam::from_json(&format!( self.sql
r#"{{"addr":"{addr}","imap":[],"imap_user":"","imap_password":"","smtp":[],"smtp_user":"","smtp_password":"","certificate_checks":"Automatic","oauth2":false}}"# .execute(
))? "INSERT INTO transports (addr, entered_param, configured_param) VALUES (?, ?, ?)",
.save_to_transports_table(self, &EnteredLoginParam::default()) (
addr,
serde_json::to_string(&EnteredLoginParam::default())?,
format!(r#"{{"addr":"{addr}","imap":[],"imap_user":"","imap_password":"","smtp":[],"smtp_user":"","smtp_password":"","certificate_checks":"Automatic","oauth2":false}}"#)
),
)
.await?;
self.sql
.set_raw_config(Config::ConfiguredAddr.as_ref(), Some(addr))
.await?; .await?;
} }
self.sql self.sql

View File

@@ -40,7 +40,7 @@ use crate::sync::Sync::*;
use crate::tools::time; use crate::tools::time;
use crate::transport::{ use crate::transport::{
ConfiguredCertificateChecks, ConfiguredLoginParam, ConfiguredServerLoginParam, ConfiguredCertificateChecks, ConfiguredLoginParam, ConfiguredServerLoginParam,
ConnectionCandidate, ConnectionCandidate, send_sync_transports,
}; };
use crate::{EventType, stock_str}; use crate::{EventType, stock_str};
use crate::{chat, provider}; use crate::{chat, provider};
@@ -205,6 +205,7 @@ impl Context {
/// Removes the transport with the specified email address /// Removes the transport with the specified email address
/// (i.e. [EnteredLoginParam::addr]). /// (i.e. [EnteredLoginParam::addr]).
pub async fn delete_transport(&self, addr: &str) -> Result<()> { pub async fn delete_transport(&self, addr: &str) -> Result<()> {
let now = time();
self.sql self.sql
.transaction(|transaction| { .transaction(|transaction| {
let primary_addr = transaction.query_row( let primary_addr = transaction.query_row(
@@ -219,12 +220,13 @@ impl Context {
if primary_addr == addr { if primary_addr == addr {
bail!("Cannot delete primary transport"); bail!("Cannot delete primary transport");
} }
let transport_id = transaction.query_row( let (transport_id, add_timestamp) = transaction.query_row(
"DELETE FROM transports WHERE addr=? RETURNING id", "DELETE FROM transports WHERE addr=? RETURNING id, add_timestamp",
(addr,), (addr,),
|row| { |row| {
let id: u32 = row.get(0)?; let id: u32 = row.get(0)?;
Ok(id) let add_timestamp: i64 = row.get(1)?;
Ok((id, add_timestamp))
}, },
)?; )?;
transaction.execute("DELETE FROM imap WHERE transport_id=?", (transport_id,))?; transaction.execute("DELETE FROM imap WHERE transport_id=?", (transport_id,))?;
@@ -233,9 +235,23 @@ impl Context {
(transport_id,), (transport_id,),
)?; )?;
// Removal timestamp should not be lower than addition timestamp
// to be accepted by other devices when synced.
let remove_timestamp = std::cmp::max(now, add_timestamp);
transaction.execute(
"INSERT INTO removed_transports (addr, remove_timestamp)
VALUES (?, ?)
ON CONFLICT (addr)
DO UPDATE SET remove_timestamp = excluded.remove_timestamp",
(addr, remove_timestamp),
)?;
Ok(()) Ok(())
}) })
.await?; .await?;
send_sync_transports(self).await?;
Ok(()) Ok(())
} }
@@ -552,7 +568,8 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'
progress!(ctx, 900); progress!(ctx, 900);
if !ctx.is_configured().await? { let is_configured = ctx.is_configured().await?;
if !is_configured {
ctx.sql.set_raw_config("mvbox_move", Some("0")).await?; ctx.sql.set_raw_config("mvbox_move", Some("0")).await?;
ctx.sql.set_raw_config("only_fetch_mvbox", None).await?; ctx.sql.set_raw_config("only_fetch_mvbox", None).await?;
} }
@@ -563,8 +580,10 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'
let provider = configured_param.provider; let provider = configured_param.provider;
configured_param configured_param
.save_to_transports_table(ctx, param) .clone()
.save_to_transports_table(ctx, param, time())
.await?; .await?;
send_sync_transports(ctx).await?;
ctx.set_config_internal(Config::ConfiguredTimestamp, Some(&time().to_string())) ctx.set_config_internal(Config::ConfiguredTimestamp, Some(&time().to_string()))
.await?; .await?;

View File

@@ -827,6 +827,41 @@ pub(crate) async fn receive_imf_inner(
if let Some(ref sync_items) = mime_parser.sync_items { if let Some(ref sync_items) = mime_parser.sync_items {
if from_id == ContactId::SELF { if from_id == ContactId::SELF {
if mime_parser.was_encrypted() { if mime_parser.was_encrypted() {
// Receiving encrypted message from self updates primary transport.
let from_addr = &mime_parser.from.addr;
let transport_changed = context
.sql
.transaction(|transaction| {
let transport_exists = transaction.query_row(
"SELECT COUNT(*) FROM transports WHERE addr=?",
(from_addr,),
|row| {
let count: i64 = row.get(0)?;
Ok(count > 0)
},
)?;
let transport_changed = if transport_exists {
transaction.execute(
"UPDATE config SET value=? WHERE keyname='configured_addr'",
(from_addr,),
)? > 0
} else {
warn!(
context,
"Received sync message from unknown address {from_addr:?}."
);
false
};
Ok(transport_changed)
})
.await?;
if transport_changed {
info!(context, "Primary transport changed to {from_addr:?}.");
context.sql.uncache_raw_config("configured_addr").await;
}
context context
.execute_sync_items(sync_items, mime_parser.timestamp_sent) .execute_sync_items(sync_items, mime_parser.timestamp_sent)
.await; .await;

View File

@@ -1439,6 +1439,21 @@ CREATE INDEX imap_sync_index ON imap_sync(transport_id, folder);
.await?; .await?;
} }
inc_and_check(&mut migration_version, 142)?;
if dbversion < migration_version {
sql.execute_migration(
"ALTER TABLE transports
ADD COLUMN add_timestamp INTEGER NOT NULL DEFAULT 0;
CREATE TABLE removed_transports (
addr TEXT NOT NULL,
remove_timestamp INTEGER NOT NULL,
UNIQUE(addr)
) STRICT;",
migration_version,
)
.await?;
}
let new_version = sql let new_version = sql
.get_raw_config_int(VERSION_CFG) .get_raw_config_int(VERSION_CFG)
.await? .await?

View File

@@ -9,14 +9,15 @@ use crate::config::Config;
use crate::constants::Blocked; use crate::constants::Blocked;
use crate::contact::ContactId; use crate::contact::ContactId;
use crate::context::Context; use crate::context::Context;
use crate::log::LogExt; use crate::log::{LogExt as _, warn};
use crate::log::warn; use crate::login_param::EnteredLoginParam;
use crate::message::{Message, MsgId, Viewtype}; use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage; use crate::mimeparser::SystemMessage;
use crate::param::Param; use crate::param::Param;
use crate::sync::SyncData::{AddQrToken, AlterChat, DeleteQrToken}; use crate::sync::SyncData::{AddQrToken, AlterChat, DeleteQrToken};
use crate::token::Namespace; use crate::token::Namespace;
use crate::tools::time; use crate::tools::time;
use crate::transport::{ConfiguredLoginParamJson, sync_transports};
use crate::{message, stock_str, token}; use crate::{message, stock_str, token};
use std::collections::HashSet; use std::collections::HashSet;
@@ -52,6 +53,29 @@ pub(crate) struct QrTokenData {
pub(crate) grpid: Option<String>, pub(crate) grpid: Option<String>,
} }
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TransportData {
/// Configured login parameters.
pub(crate) configured: ConfiguredLoginParamJson,
/// Login parameters entered by the user.
///
/// They can be used to reconfigure the transport.
pub(crate) entered: EnteredLoginParam,
/// Timestamp of when the transport was last time (re)configured.
pub(crate) timestamp: i64,
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RemovedTransportData {
/// Address of the removed transport.
pub(crate) addr: String,
/// Timestamp of when the transport was removed.
pub(crate) timestamp: i64,
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub(crate) enum SyncData { pub(crate) enum SyncData {
AddQrToken(QrTokenData), AddQrToken(QrTokenData),
@@ -71,6 +95,28 @@ pub(crate) enum SyncData {
DeleteMessages { DeleteMessages {
msgs: Vec<String>, // RFC724 id (i.e. "Message-Id" header) msgs: Vec<String>, // RFC724 id (i.e. "Message-Id" header)
}, },
/// Update transport configuration.
///
/// This message contains a list of all added transports
/// together with their addition timestamp,
/// and all removed transports together with
/// the removal timestamp.
///
/// In case of a tie, addition and removal timestamps
/// being the same, removal wins.
/// It is more likely that transport is added
/// and then removed within a second,
/// but unlikely the other way round
/// as adding new transport takes time
/// to run configuration.
Transports {
/// Active transports.
transports: Vec<TransportData>,
/// Removed transports with the timestamp of removal.
removed_transports: Vec<RemovedTransportData>,
},
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@@ -274,6 +320,10 @@ impl Context {
SyncData::Config { key, val } => self.sync_config(key, val).await, SyncData::Config { key, val } => self.sync_config(key, val).await,
SyncData::SaveMessage { src, dest } => self.save_message(src, dest).await, SyncData::SaveMessage { src, dest } => self.save_message(src, dest).await,
SyncData::DeleteMessages { msgs } => self.sync_message_deletion(msgs).await, SyncData::DeleteMessages { msgs } => self.sync_message_deletion(msgs).await,
SyncData::Transports {
transports,
removed_transports,
} => sync_transports(self, transports, removed_transports).await,
}, },
SyncDataOrUnknown::Unknown(data) => { SyncDataOrUnknown::Unknown(data) => {
warn!(self, "Ignored unknown sync item: {data}."); warn!(self, "Ignored unknown sync item: {data}.");

View File

@@ -600,7 +600,7 @@ impl TestContext {
self.ctx self.ctx
.set_config(Config::ConfiguredAddr, Some(addr)) .set_config(Config::ConfiguredAddr, Some(addr))
.await .await
.unwrap(); .expect("Failed to configure address");
if let Some(name) = addr.split('@').next() { if let Some(name) = addr.split('@').next() {
self.set_name(name); self.set_name(name);

View File

@@ -18,10 +18,12 @@ use crate::config::Config;
use crate::configure::server_params::{ServerParams, expand_param_vector}; use crate::configure::server_params::{ServerParams, expand_param_vector};
use crate::constants::{DC_LP_AUTH_FLAGS, DC_LP_AUTH_OAUTH2}; use crate::constants::{DC_LP_AUTH_FLAGS, DC_LP_AUTH_OAUTH2};
use crate::context::Context; use crate::context::Context;
use crate::events::EventType;
use crate::login_param::EnteredLoginParam; use crate::login_param::EnteredLoginParam;
use crate::net::load_connection_timestamp; use crate::net::load_connection_timestamp;
use crate::provider::{Protocol, Provider, Socket, UsernamePattern, get_provider_by_id}; use crate::provider::{Protocol, Provider, Socket, UsernamePattern, get_provider_by_id};
use crate::sql::Sql; use crate::sql::Sql;
use crate::sync::{RemovedTransportData, SyncData, TransportData};
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) enum ConnectionSecurity { pub(crate) enum ConnectionSecurity {
@@ -190,10 +192,10 @@ pub(crate) struct ConfiguredLoginParam {
pub oauth2: bool, pub oauth2: bool,
} }
/// The representation of ConfiguredLoginParam in the database, /// JSON representation of ConfiguredLoginParam
/// saved as Json. /// for the database and sync messages.
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct ConfiguredLoginParamJson { pub(crate) struct ConfiguredLoginParamJson {
pub addr: String, pub addr: String,
pub imap: Vec<ConfiguredServerLoginParam>, pub imap: Vec<ConfiguredServerLoginParam>,
pub imap_user: String, pub imap_user: String,
@@ -557,35 +559,9 @@ impl ConfiguredLoginParam {
self, self,
context: &Context, context: &Context,
entered_param: &EnteredLoginParam, entered_param: &EnteredLoginParam,
timestamp: i64,
) -> Result<()> { ) -> Result<()> {
let addr = addr_normalize(&self.addr); save_transport(context, entered_param, &self.into(), timestamp).await?;
let provider_id = self.provider.map(|provider| provider.id);
let configured_addr = context.get_config(Config::ConfiguredAddr).await?;
context
.sql
.execute(
"INSERT INTO transports (addr, entered_param, configured_param)
VALUES (?, ?, ?)
ON CONFLICT (addr)
DO UPDATE SET entered_param=excluded.entered_param, configured_param=excluded.configured_param",
(
self.addr.clone(),
serde_json::to_string(entered_param)?,
self.into_json()?,
),
)
.await?;
if configured_addr.is_none() {
// If there is no transport yet, set the new transport as the primary one
context
.sql
.set_raw_config(Config::ConfiguredProvider.as_ref(), provider_id)
.await?;
context
.sql
.set_raw_config(Config::ConfiguredAddr.as_ref(), Some(&addr))
.await?;
}
Ok(()) Ok(())
} }
@@ -609,18 +585,7 @@ impl ConfiguredLoginParam {
} }
pub(crate) fn into_json(self) -> Result<String> { pub(crate) fn into_json(self) -> Result<String> {
let json = ConfiguredLoginParamJson { let json: ConfiguredLoginParamJson = self.into();
addr: self.addr,
imap: self.imap,
imap_user: self.imap_user,
imap_password: self.imap_password,
smtp: self.smtp,
smtp_user: self.smtp_user,
smtp_password: self.smtp_password,
provider_id: self.provider.map(|p| p.id.to_string()),
certificate_checks: self.certificate_checks,
oauth2: self.oauth2,
};
Ok(serde_json::to_string(&json)?) Ok(serde_json::to_string(&json)?)
} }
@@ -638,12 +603,166 @@ impl ConfiguredLoginParam {
} }
} }
impl From<ConfiguredLoginParam> for ConfiguredLoginParamJson {
fn from(configured_login_param: ConfiguredLoginParam) -> Self {
Self {
addr: configured_login_param.addr,
imap: configured_login_param.imap,
imap_user: configured_login_param.imap_user,
imap_password: configured_login_param.imap_password,
smtp: configured_login_param.smtp,
smtp_user: configured_login_param.smtp_user,
smtp_password: configured_login_param.smtp_password,
provider_id: configured_login_param.provider.map(|p| p.id.to_string()),
certificate_checks: configured_login_param.certificate_checks,
oauth2: configured_login_param.oauth2,
}
}
}
/// Saves transport to the database.
pub(crate) async fn save_transport(
context: &Context,
entered_param: &EnteredLoginParam,
configured: &ConfiguredLoginParamJson,
add_timestamp: i64,
) -> Result<()> {
let addr = addr_normalize(&configured.addr);
let configured_addr = context.get_config(Config::ConfiguredAddr).await?;
context
.sql
.execute(
"INSERT INTO transports (addr, entered_param, configured_param, add_timestamp)
VALUES (?, ?, ?, ?)
ON CONFLICT (addr)
DO UPDATE SET entered_param=excluded.entered_param,
configured_param=excluded.configured_param,
add_timestamp=excluded.add_timestamp",
(
&addr,
serde_json::to_string(entered_param)?,
serde_json::to_string(configured)?,
add_timestamp,
),
)
.await?;
if configured_addr.is_none() {
// If there is no transport yet, set the new transport as the primary one
context
.sql
.set_raw_config(Config::ConfiguredAddr.as_ref(), Some(&addr))
.await?;
}
Ok(())
}
/// Sends a sync message to synchronize transports across devices.
pub(crate) async fn send_sync_transports(context: &Context) -> Result<()> {
info!(context, "Sending transport synchronization message.");
// Synchronize all transport configurations.
//
// Transport with ID 1 is never synchronized
// because it can only be created during initial configuration.
// This also guarantees that credentials for the first
// transport are never sent in sync messages,
// so this is not worse than when not using multi-transport.
// If transport ID 1 is reconfigured,
// likely because the password has changed,
// user has to reconfigure it manually on all devices.
let transports = context
.sql
.query_map_vec(
"SELECT entered_param, configured_param, add_timestamp
FROM transports WHERE id>1",
(),
|row| {
let entered_json: String = row.get(0)?;
let entered: EnteredLoginParam = serde_json::from_str(&entered_json)?;
let configured_json: String = row.get(1)?;
let configured: ConfiguredLoginParamJson = serde_json::from_str(&configured_json)?;
let timestamp: i64 = row.get(2)?;
Ok(TransportData {
configured,
entered,
timestamp,
})
},
)
.await?;
let removed_transports = context
.sql
.query_map_vec(
"SELECT addr, remove_timestamp FROM removed_transports",
(),
|row| {
let addr: String = row.get(0)?;
let timestamp: i64 = row.get(1)?;
Ok(RemovedTransportData { addr, timestamp })
},
)
.await?;
context
.add_sync_item(SyncData::Transports {
transports,
removed_transports,
})
.await?;
context.scheduler.interrupt_inbox().await;
Ok(())
}
/// Process received data for transport synchronization.
pub(crate) async fn sync_transports(
context: &Context,
transports: &[TransportData],
removed_transports: &[RemovedTransportData],
) -> Result<()> {
for TransportData {
configured,
entered,
timestamp,
} in transports
{
save_transport(context, entered, configured, *timestamp).await?;
}
context
.sql
.transaction(|transaction| {
for RemovedTransportData { addr, timestamp } in removed_transports {
transaction.execute(
"DELETE FROM transports
WHERE addr=? AND add_timestamp<=?",
(addr, timestamp),
)?;
transaction.execute(
"INSERT INTO removed_transports (addr, remove_timestamp)
VALUES (?, ?)
ON CONFLICT (addr) DO
UPDATE SET remove_timestamp = excluded.remove_timestamp
WHERE excluded.remove_timestamp > remove_timestamp",
(addr, timestamp),
)?;
}
Ok(())
})
.await?;
context.emit_event(EventType::TransportsModified);
Ok(())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::log::LogExt as _; use crate::log::LogExt as _;
use crate::provider::get_provider_by_id; use crate::provider::get_provider_by_id;
use crate::test_utils::TestContext; use crate::test_utils::TestContext;
use crate::tools::time;
#[test] #[test]
fn test_configured_certificate_checks_display() { fn test_configured_certificate_checks_display() {
@@ -688,7 +807,7 @@ mod tests {
param param
.clone() .clone()
.save_to_transports_table(&t, &EnteredLoginParam::default()) .save_to_transports_table(&t, &EnteredLoginParam::default(), time())
.await?; .await?;
let expected_param = r#"{"addr":"alice@example.org","imap":[{"connection":{"host":"imap.example.com","port":123,"security":"Starttls"},"user":"alice"}],"imap_user":"","imap_password":"foo","smtp":[{"connection":{"host":"smtp.example.com","port":456,"security":"Tls"},"user":"alice@example.org"}],"smtp_user":"","smtp_password":"bar","provider_id":null,"certificate_checks":"Strict","oauth2":false}"#; let expected_param = r#"{"addr":"alice@example.org","imap":[{"connection":{"host":"imap.example.com","port":123,"security":"Starttls"},"user":"alice"}],"imap_user":"","imap_password":"foo","smtp":[{"connection":{"host":"smtp.example.com","port":456,"security":"Tls"},"user":"alice@example.org"}],"smtp_user":"","smtp_password":"bar","provider_id":null,"certificate_checks":"Strict","oauth2":false}"#;
assert_eq!( assert_eq!(
@@ -906,7 +1025,7 @@ mod tests {
certificate_checks: ConfiguredCertificateChecks::Automatic, certificate_checks: ConfiguredCertificateChecks::Automatic,
oauth2: false, oauth2: false,
} }
.save_to_transports_table(&t, &EnteredLoginParam::default()) .save_to_transports_table(&t, &EnteredLoginParam::default(), time())
.await?; .await?;
let (_transport_id, loaded) = ConfiguredLoginParam::load(&t).await?.unwrap(); let (_transport_id, loaded) = ConfiguredLoginParam::load(&t).await?.unwrap();