feat: allow adding second transport

This commit is contained in:
link2xt
2025-11-05 02:01:31 +00:00
committed by l
parent 57aadfbbf6
commit be3e202470
20 changed files with 582 additions and 213 deletions

View File

@@ -64,6 +64,7 @@ describe("online tests", function () {
await dc.rpc.setConfig(accountId1, "addr", account1.email); await dc.rpc.setConfig(accountId1, "addr", account1.email);
await dc.rpc.setConfig(accountId1, "mail_pw", account1.password); await dc.rpc.setConfig(accountId1, "mail_pw", account1.password);
await dc.rpc.configure(accountId1); await dc.rpc.configure(accountId1);
await waitForEvent(dc, "ImapInboxIdle", accountId1);
accountId2 = await dc.rpc.addAccount(); accountId2 = await dc.rpc.addAccount();
await dc.rpc.batchSetConfig(accountId2, { await dc.rpc.batchSetConfig(accountId2, {
@@ -71,6 +72,7 @@ describe("online tests", function () {
mail_pw: account2.password, mail_pw: account2.password,
}); });
await dc.rpc.configure(accountId2); await dc.rpc.configure(accountId2);
await waitForEvent(dc, "ImapInboxIdle", accountId2);
accountsConfigured = true; accountsConfigured = true;
}); });

View File

@@ -130,6 +130,10 @@ class Account:
"""Add a new transport using a QR code.""" """Add a new transport using a QR code."""
yield self._rpc.add_transport_from_qr.future(self.id, qr) yield self._rpc.add_transport_from_qr.future(self.id, qr)
def delete_transport(self, addr: str):
"""Delete a transport."""
self._rpc.delete_transport(self.id, addr)
@futuremethod @futuremethod
def list_transports(self): def list_transports(self):
"""Return the list of all email accounts that are used as a transport in the current profile.""" """Return the list of all email accounts that are used as a transport in the current profile."""

View File

@@ -40,12 +40,17 @@ class ACFactory:
username = "ci-" + "".join(random.choice("2345789acdefghjkmnpqrstuvwxyz") for i in range(6)) username = "ci-" + "".join(random.choice("2345789acdefghjkmnpqrstuvwxyz") for i in range(6))
return f"{username}@{domain}", f"{username}${username}" return f"{username}@{domain}", f"{username}${username}"
def get_account_qr(self):
"""Return "dcaccount:" QR code for testing chatmail relay."""
domain = os.getenv("CHATMAIL_DOMAIN")
return f"dcaccount:{domain}"
@futuremethod @futuremethod
def new_configured_account(self): def new_configured_account(self):
"""Create a new configured account.""" """Create a new configured account."""
account = self.get_unconfigured_account() account = self.get_unconfigured_account()
domain = os.getenv("CHATMAIL_DOMAIN") qr = self.get_account_qr()
yield account.add_transport_from_qr.future(f"dcaccount:{domain}") yield account.add_transport_from_qr.future(qr)
assert account.is_configured() assert account.is_configured()
return account return account
@@ -77,6 +82,7 @@ class ACFactory:
ac_clone = self.get_unconfigured_account() ac_clone = self.get_unconfigured_account()
for transport in transports: for transport in transports:
ac_clone.add_or_update_transport(transport) ac_clone.add_or_update_transport(transport)
ac_clone.bring_online()
return ac_clone return ac_clone
def get_accepted_chat(self, ac1: Account, ac2: Account) -> Chat: def get_accepted_chat(self, ac1: Account, ac2: Account) -> Chat:

View File

@@ -143,7 +143,7 @@ def test_delete_deltachat_folder(acfactory, direct_imap):
# Wait until new folder is created and UIDVALIDITY is updated. # Wait until new folder is created and UIDVALIDITY is updated.
while True: while True:
event = ac1.wait_for_event() event = ac1.wait_for_event()
if event.kind == EventType.INFO and "uid/validity change folder DeltaChat" in event.msg: if event.kind == EventType.INFO and "transport 1: UID validity for folder DeltaChat changed from " in event.msg:
break break
ac2 = acfactory.get_online_account() ac2 = acfactory.get_online_account()

View File

@@ -0,0 +1,158 @@
import pytest
from deltachat_rpc_client.rpc import JsonRpcError
def test_add_second_address(acfactory) -> None:
account = acfactory.new_configured_account()
assert len(account.list_transports()) == 1
# When the first transport is created,
# mvbox_move and only_fetch_mvbox should be disabled.
assert account.get_config("mvbox_move") == "0"
assert account.get_config("only_fetch_mvbox") == "0"
assert account.get_config("show_emails") == "2"
qr = acfactory.get_account_qr()
account.add_transport_from_qr(qr)
assert len(account.list_transports()) == 2
account.add_transport_from_qr(qr)
assert len(account.list_transports()) == 3
first_addr = account.list_transports()[0]["addr"]
second_addr = account.list_transports()[1]["addr"]
# Cannot delete the first address.
with pytest.raises(JsonRpcError):
account.delete_transport(first_addr)
account.delete_transport(second_addr)
assert len(account.list_transports()) == 2
# Enabling mvbox_move or only_fetch_mvbox
# is not allowed when multi-transport is enabled.
for option in ["mvbox_move", "only_fetch_mvbox"]:
with pytest.raises(JsonRpcError):
account.set_config(option, "1")
with pytest.raises(JsonRpcError):
account.set_config("show_emails", "0")
@pytest.mark.parametrize("key", ["mvbox_move", "only_fetch_mvbox"])
def test_no_second_transport_with_mvbox(acfactory, key) -> None:
"""Test that second transport cannot be configured if mvbox is used."""
account = acfactory.new_configured_account()
assert len(account.list_transports()) == 1
assert account.get_config("mvbox_move") == "0"
assert account.get_config("only_fetch_mvbox") == "0"
qr = acfactory.get_account_qr()
account.set_config(key, "1")
with pytest.raises(JsonRpcError):
account.add_transport_from_qr(qr)
def test_no_second_transport_without_classic_emails(acfactory) -> None:
"""Test that second transport cannot be configured if classic emails are not fetched."""
account = acfactory.new_configured_account()
assert len(account.list_transports()) == 1
assert account.get_config("show_emails") == "2"
qr = acfactory.get_account_qr()
account.set_config("show_emails", "0")
with pytest.raises(JsonRpcError):
account.add_transport_from_qr(qr)
def test_change_address(acfactory) -> None:
"""Test Alice configuring a second transport and setting it as a primary one."""
alice, bob = acfactory.get_online_accounts(2)
bob_addr = bob.get_config("configured_addr")
bob.create_chat(alice)
alice_chat_bob = alice.create_chat(bob)
alice_chat_bob.send_text("Hello!")
msg1 = bob.wait_for_incoming_msg().get_snapshot()
sender_addr1 = msg1.sender.get_snapshot().address
alice.stop_io()
old_alice_addr = alice.get_config("configured_addr")
alice_vcard = alice.self_contact.make_vcard()
assert old_alice_addr in alice_vcard
qr = acfactory.get_account_qr()
alice.add_transport_from_qr(qr)
new_alice_addr = alice.list_transports()[1]["addr"]
with pytest.raises(JsonRpcError):
# Cannot use the address that is not
# configured for any transport.
alice.set_config("configured_addr", bob_addr)
# Load old address so it is cached.
assert alice.get_config("configured_addr") == old_alice_addr
alice.set_config("configured_addr", new_alice_addr)
# Make sure that setting `configured_addr` invalidated the cache.
assert alice.get_config("configured_addr") == new_alice_addr
alice_vcard = alice.self_contact.make_vcard()
assert old_alice_addr not in alice_vcard
assert new_alice_addr in alice_vcard
with pytest.raises(JsonRpcError):
alice.delete_transport(new_alice_addr)
alice.start_io()
alice_chat_bob.send_text("Hello again!")
msg2 = bob.wait_for_incoming_msg().get_snapshot()
sender_addr2 = msg2.sender.get_snapshot().address
assert msg1.sender == msg2.sender
assert sender_addr1 != sender_addr2
assert sender_addr1 == old_alice_addr
assert sender_addr2 == new_alice_addr
@pytest.mark.parametrize("is_chatmail", ["0", "1"])
def test_mvbox_move_first_transport(acfactory, is_chatmail) -> None:
"""Test that mvbox_move is disabled by default even for non-chatmail accounts.
Disabling mvbox_move is required to be able to setup a second transport.
"""
account = acfactory.get_unconfigured_account()
account.set_config("fix_is_chatmail", "1")
account.set_config("is_chatmail", is_chatmail)
# The default value when the setting is unset is "1".
# This is not changed for compatibility with old databases
# imported from backups.
assert account.get_config("mvbox_move") == "1"
qr = acfactory.get_account_qr()
account.add_transport_from_qr(qr)
# Once the first transport is set up,
# mvbox_move is disabled.
assert account.get_config("mvbox_move") == "0"
assert account.get_config("is_chatmail") == is_chatmail
def test_reconfigure_transport(acfactory) -> None:
"""Test that reconfiguring the transport works
even if settings not supported for multi-transport
like mvbox_move are enabled."""
account = acfactory.get_online_account()
account.set_config("mvbox_move", "1")
[transport] = account.list_transports()
account.add_or_update_transport(transport)
# Reconfiguring the transport should not reset
# the settings as if when configuring the first transport.
assert account.get_config("mvbox_move") == "1"

View File

@@ -467,7 +467,7 @@ def test_bot(acfactory) -> None:
def test_wait_next_messages(acfactory) -> None: def test_wait_next_messages(acfactory) -> None:
alice = acfactory.new_configured_account() alice = acfactory.get_online_account()
# Create a bot account so it does not receive device messages in the beginning. # Create a bot account so it does not receive device messages in the beginning.
addr, password = acfactory.get_credentials() addr, password = acfactory.get_credentials()
@@ -475,6 +475,7 @@ def test_wait_next_messages(acfactory) -> None:
bot.set_config("bot", "1") bot.set_config("bot", "1")
bot.add_or_update_transport({"addr": addr, "password": password}) bot.add_or_update_transport({"addr": addr, "password": password})
assert bot.is_configured() assert bot.is_configured()
bot.bring_online()
# There are no old messages and the call returns immediately. # There are no old messages and the call returns immediately.
assert not bot.wait_next_messages() assert not bot.wait_next_messages()

View File

@@ -14,6 +14,7 @@ def datadir():
return None return None
@pytest.mark.skip("The test is flaky in CI and crashes the interpreter as of 2025-11-12")
def test_echo_quit_plugin(acfactory, lp): def test_echo_quit_plugin(acfactory, lp):
lp.sec("creating one echo_and_quit bot") lp.sec("creating one echo_and_quit bot")
botproc = acfactory.run_bot_process(echo_and_quit) botproc = acfactory.run_bot_process(echo_and_quit)

View File

@@ -477,7 +477,10 @@ impl Config {
/// Whether the config option needs an IO scheduler restart to take effect. /// Whether the config option needs an IO scheduler restart to take effect.
pub(crate) fn needs_io_restart(&self) -> bool { pub(crate) fn needs_io_restart(&self) -> bool {
matches!(self, Config::MvboxMove | Config::OnlyFetchMvbox) matches!(
self,
Config::MvboxMove | Config::OnlyFetchMvbox | Config::ConfiguredAddr
)
} }
} }
@@ -713,6 +716,16 @@ impl Context {
pub async fn set_config(&self, key: Config, value: Option<&str>) -> Result<()> { pub async fn set_config(&self, key: Config, value: Option<&str>) -> Result<()> {
Self::check_config(key, value)?; Self::check_config(key, value)?;
let n_transports = self.count_transports().await?;
if n_transports > 1
&& matches!(
key,
Config::MvboxMove | Config::OnlyFetchMvbox | Config::ShowEmails
)
{
bail!("Cannot reconfigure {key} when multiple transports are configured");
}
let _pause = match key.needs_io_restart() { let _pause = match key.needs_io_restart() {
true => self.scheduler.pause(self).await?, true => self.scheduler.pause(self).await?,
_ => Default::default(), _ => Default::default(),
@@ -798,10 +811,11 @@ impl Context {
.await?; .await?;
} }
Config::ConfiguredAddr => { Config::ConfiguredAddr => {
if self.is_configured().await? { let Some(addr) = value else {
bail!("Cannot change ConfiguredAddr"); bail!("Cannot unset configured_addr");
} };
if let Some(addr) = value {
if !self.is_configured().await? {
info!( info!(
self, self,
"Creating a pseudo configured account which will not be able to send or receive messages. Only meant for tests!" "Creating a pseudo configured account which will not be able to send or receive messages. Only meant for tests!"
@@ -812,6 +826,36 @@ impl Context {
.save_to_transports_table(self, &EnteredLoginParam::default()) .save_to_transports_table(self, &EnteredLoginParam::default())
.await?; .await?;
} }
self.sql
.transaction(|transaction| {
if transaction.query_row(
"SELECT COUNT(*) FROM transports WHERE addr=?",
(addr,),
|row| {
let res: i64 = row.get(0)?;
Ok(res)
},
)? == 0
{
bail!("Address does not belong to any transport.");
}
transaction.execute(
"UPDATE config SET value=? WHERE keyname='configured_addr'",
(addr,),
)?;
// Clean up SMTP and IMAP APPEND queue.
//
// The messages in the queue have a different
// From address so we cannot send them over
// the new SMTP transport.
transaction.execute("DELETE FROM smtp", ())?;
transaction.execute("DELETE FROM imap_send", ())?;
Ok(())
})
.await?;
self.sql.uncache_raw_config("configured_addr").await;
} }
_ => { _ => {
self.sql.set_raw_config(key.as_ref(), value).await?; self.sql.set_raw_config(key.as_ref(), value).await?;

View File

@@ -130,12 +130,6 @@ impl Context {
"cannot configure, database not opened." "cannot configure, database not opened."
); );
param.addr = addr_normalize(&param.addr); param.addr = addr_normalize(&param.addr);
let old_addr = self.get_config(Config::ConfiguredAddr).await?;
if self.is_configured().await? && !addr_cmp(&old_addr.unwrap_or_default(), &param.addr) {
let error_msg = "Changing your email address is not supported right now. Check back in a few months!";
progress!(self, 0, Some(error_msg.to_string()));
bail!(error_msg);
}
let cancel_channel = self.alloc_ongoing().await?; let cancel_channel = self.alloc_ongoing().await?;
let res = self let res = self
@@ -204,19 +198,72 @@ impl Context {
Ok(transports) Ok(transports)
} }
/// Returns the number of configured transports.
pub async fn count_transports(&self) -> Result<usize> {
self.sql.count("SELECT COUNT(*) FROM transports", ()).await
}
/// Removes the transport with the specified email address /// Removes the transport with the specified email address
/// (i.e. [EnteredLoginParam::addr]). /// (i.e. [EnteredLoginParam::addr]).
#[expect(clippy::unused_async)] pub async fn delete_transport(&self, addr: &str) -> Result<()> {
pub async fn delete_transport(&self, _addr: &str) -> Result<()> { self.sql
bail!( .transaction(|transaction| {
"Adding and removing additional transports is not supported yet. Check back in a few months!" let primary_addr = transaction.query_row(
) "SELECT value FROM config WHERE keyname='configured_addr'",
(),
|row| {
let addr: String = row.get(0)?;
Ok(addr)
},
)?;
if primary_addr == addr {
bail!("Cannot delete primary transport");
}
let transport_id = transaction.query_row(
"DELETE FROM transports WHERE addr=? RETURNING id",
(addr,),
|row| {
let id: u32 = row.get(0)?;
Ok(id)
},
)?;
transaction.execute("DELETE FROM imap WHERE transport_id=?", (transport_id,))?;
transaction.execute(
"DELETE FROM imap_sync WHERE transport_id=?",
(transport_id,),
)?;
Ok(())
})
.await?;
Ok(())
} }
async fn inner_configure(&self, param: &EnteredLoginParam) -> Result<()> { async fn inner_configure(&self, param: &EnteredLoginParam) -> Result<()> {
info!(self, "Configure ..."); info!(self, "Configure ...");
let old_addr = self.get_config(Config::ConfiguredAddr).await?; let old_addr = self.get_config(Config::ConfiguredAddr).await?;
if old_addr.is_some()
&& !self
.sql
.exists(
"SELECT COUNT(*) FROM transports WHERE addr=?",
(&param.addr,),
)
.await?
{
if self.get_config(Config::MvboxMove).await?.as_deref() != Some("0") {
bail!("Cannot use multi-transport with mvbox_move enabled.");
}
if self.get_config(Config::OnlyFetchMvbox).await?.as_deref() != Some("0") {
bail!("Cannot use multi-transport with only_fetch_mvbox enabled.");
}
if self.get_config(Config::ShowEmails).await?.as_deref() != Some("2") {
bail!("Cannot use multi-transport with disabled fetching of classic emails.");
}
}
let provider = configure(self, param).await?; let provider = configure(self, param).await?;
self.set_config_internal(Config::NotifyAboutWrongPw, Some("1")) self.set_config_internal(Config::NotifyAboutWrongPw, Some("1"))
.await?; .await?;
@@ -503,16 +550,9 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'
// Configure IMAP // Configure IMAP
let transport_id = 0;
let (_s, r) = async_channel::bounded(1); let (_s, r) = async_channel::bounded(1);
let mut imap = Imap::new( let mut imap = Imap::new(ctx, transport_id, configured_param.clone(), r).await?;
configured_param.imap.clone(),
configured_param.imap_password.clone(),
proxy_config,
&configured_param.addr,
strict_tls,
configured_param.oauth2,
r,
);
let configuring = true; let configuring = true;
let mut imap_session = match imap.connect(ctx, configuring).await { let mut imap_session = match imap.connect(ctx, configuring).await {
Ok(session) => session, Ok(session) => session,
@@ -529,37 +569,15 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'
progress!(ctx, 900); progress!(ctx, 900);
let is_chatmail = match ctx.get_config_bool(Config::FixIsChatmail).await? { if !ctx.is_configured().await? {
false => { ctx.sql.set_raw_config("mvbox_move", Some("0")).await?;
let is_chatmail = imap_session.is_chatmail(); ctx.sql.set_raw_config("only_fetch_mvbox", None).await?;
ctx.set_config(
Config::IsChatmail,
Some(match is_chatmail {
false => "0",
true => "1",
}),
)
.await?;
is_chatmail
}
true => ctx.get_config_bool(Config::IsChatmail).await?,
};
if is_chatmail {
ctx.set_config(Config::MvboxMove, Some("0")).await?;
ctx.set_config(Config::OnlyFetchMvbox, None).await?;
ctx.set_config(Config::ShowEmails, None).await?;
} }
let create_mvbox = !is_chatmail; let create_mvbox = false;
imap.configure_folders(ctx, &mut imap_session, create_mvbox) imap.configure_folders(ctx, &mut imap_session, create_mvbox)
.await?; .await?;
let create = true;
imap_session
.select_with_uidvalidity(ctx, "INBOX", create)
.await
.context("could not read INBOX status")?;
drop(imap); drop(imap);
progress!(ctx, 910); progress!(ctx, 910);

View File

@@ -807,9 +807,10 @@ impl Context {
/// Returns information about the context as key-value pairs. /// Returns information about the context as key-value pairs.
pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> { pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
let l = EnteredLoginParam::load(self).await?; let l = EnteredLoginParam::load(self).await?;
let l2 = ConfiguredLoginParam::load(self) let l2 = ConfiguredLoginParam::load(self).await?.map_or_else(
.await? || "Not configured".to_string(),
.map_or_else(|| "Not configured".to_string(), |param| param.to_string()); |(_transport_id, param)| param.to_string(),
);
let secondary_addrs = self.get_secondary_self_addrs().await?.join(", "); let secondary_addrs = self.get_secondary_self_addrs().await?.join(", ");
let chats = get_chat_cnt(self).await?; let chats = get_chat_cnt(self).await?;
let unblocked_msgs = message::get_unblocked_msg_cnt(self).await; let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;

View File

@@ -451,6 +451,8 @@ async fn test_delete_expired_imap_messages() -> Result<()> {
let t = TestContext::new_alice().await; let t = TestContext::new_alice().await;
const HOUR: i64 = 60 * 60; const HOUR: i64 = 60 * 60;
let now = time(); let now = time();
let transport_id = 1;
let uidvalidity = 12345;
for (id, timestamp, ephemeral_timestamp) in &[ for (id, timestamp, ephemeral_timestamp) in &[
(900, now - 2 * HOUR, 0), (900, now - 2 * HOUR, 0),
(1000, now - 23 * HOUR - MIN_DELETE_SERVER_AFTER, 0), (1000, now - 23 * HOUR - MIN_DELETE_SERVER_AFTER, 0),
@@ -470,8 +472,8 @@ async fn test_delete_expired_imap_messages() -> Result<()> {
.await?; .await?;
t.sql t.sql
.execute( .execute(
"INSERT INTO imap (rfc724_mid, folder, uid, target) VALUES (?,'INBOX',?, 'INBOX');", "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, target, uidvalidity) VALUES (?, ?,'INBOX',?, 'INBOX', ?);",
(&message_id, id), (transport_id, &message_id, id, uidvalidity),
) )
.await?; .await?;
} }

View File

@@ -71,6 +71,11 @@ const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])";
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Imap { pub(crate) struct Imap {
/// ID of the transport configuration in the `transports` table.
///
/// This ID is used to namespace records in the `imap` table.
transport_id: u32,
pub(crate) idle_interrupt_receiver: Receiver<()>, pub(crate) idle_interrupt_receiver: Receiver<()>,
/// Email address. /// Email address.
@@ -249,19 +254,21 @@ impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
impl Imap { impl Imap {
/// Creates new disconnected IMAP client using the specific login parameters. /// Creates new disconnected IMAP client using the specific login parameters.
/// pub async fn new(
/// `addr` is used to renew token if OAuth2 authentication is used. context: &Context,
pub fn new( transport_id: u32,
lp: Vec<ConfiguredServerLoginParam>, param: ConfiguredLoginParam,
password: String,
proxy_config: Option<ProxyConfig>,
addr: &str,
strict_tls: bool,
oauth2: bool,
idle_interrupt_receiver: Receiver<()>, idle_interrupt_receiver: Receiver<()>,
) -> Self { ) -> Result<Self> {
let lp = param.imap.clone();
let password = param.imap_password.clone();
let proxy_config = ProxyConfig::load(context).await?;
let addr = &param.addr;
let strict_tls = param.strict_tls(proxy_config.is_some());
let oauth2 = param.oauth2;
let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1); let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
Imap { Ok(Imap {
transport_id,
idle_interrupt_receiver, idle_interrupt_receiver,
addr: addr.to_string(), addr: addr.to_string(),
lp, lp,
@@ -277,7 +284,7 @@ impl Imap {
ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0), ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
resync_request_sender, resync_request_sender,
resync_request_receiver, resync_request_receiver,
} })
} }
/// Creates new disconnected IMAP client using configured parameters. /// Creates new disconnected IMAP client using configured parameters.
@@ -285,20 +292,10 @@ impl Imap {
context: &Context, context: &Context,
idle_interrupt_receiver: Receiver<()>, idle_interrupt_receiver: Receiver<()>,
) -> Result<Self> { ) -> Result<Self> {
let param = ConfiguredLoginParam::load(context) let (transport_id, param) = ConfiguredLoginParam::load(context)
.await? .await?
.context("Not configured")?; .context("Not configured")?;
let proxy_config = ProxyConfig::load(context).await?; let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
let strict_tls = param.strict_tls(proxy_config.is_some());
let imap = Self::new(
param.imap.clone(),
param.imap_password.clone(),
proxy_config,
&param.addr,
strict_tls,
param.oauth2,
idle_interrupt_receiver,
);
Ok(imap) Ok(imap)
} }
@@ -412,9 +409,19 @@ impl Imap {
}) })
.await .await
.context("Failed to enable IMAP compression")?; .context("Failed to enable IMAP compression")?;
Session::new(compressed_session, capabilities, resync_request_sender) Session::new(
compressed_session,
capabilities,
resync_request_sender,
self.transport_id,
)
} else { } else {
Session::new(session, capabilities, resync_request_sender) Session::new(
session,
capabilities,
resync_request_sender,
self.transport_id,
)
}; };
// Store server ID in the context to display in account info. // Store server ID in the context to display in account info.
@@ -593,8 +600,9 @@ impl Imap {
folder: &str, folder: &str,
folder_meaning: FolderMeaning, folder_meaning: FolderMeaning,
) -> Result<(usize, bool)> { ) -> Result<(usize, bool)> {
let uid_validity = get_uidvalidity(context, folder).await?; let transport_id = self.transport_id;
let old_uid_next = get_uid_next(context, folder).await?; let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
let old_uid_next = get_uid_next(context, transport_id, folder).await?;
info!( info!(
context, context,
"fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}." "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
@@ -662,12 +670,19 @@ impl Imap {
context context
.sql .sql
.execute( .execute(
"INSERT INTO imap (rfc724_mid, folder, uid, uidvalidity, target) "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
VALUES (?1, ?2, ?3, ?4, ?5) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(folder, uid, uidvalidity) ON CONFLICT(transport_id, folder, uid, uidvalidity)
DO UPDATE SET rfc724_mid=excluded.rfc724_mid, DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
target=excluded.target", target=excluded.target",
(&message_id, &folder, uid, uid_validity, target), (
self.transport_id,
&message_id,
&folder,
uid,
uid_validity,
target,
),
) )
.await?; .await?;
@@ -778,7 +793,7 @@ impl Imap {
prefetch_uid_next < mailbox_uid_next prefetch_uid_next < mailbox_uid_next
}; };
if new_uid_next > old_uid_next { if new_uid_next > old_uid_next {
set_uid_next(context, folder, new_uid_next).await?; set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
} }
info!(context, "{} mails read from \"{}\".", read_cnt, folder); info!(context, "{} mails read from \"{}\".", read_cnt, folder);
@@ -858,6 +873,7 @@ impl Session {
let folder_exists = self let folder_exists = self
.select_with_uidvalidity(context, folder, create) .select_with_uidvalidity(context, folder, create)
.await?; .await?;
let transport_id = self.transport_id();
if folder_exists { if folder_exists {
let mut list = self let mut list = self
.uid_fetch("1:*", RFC724MID_UID) .uid_fetch("1:*", RFC724MID_UID)
@@ -890,7 +906,7 @@ impl Session {
msgs.len(), msgs.len(),
); );
uid_validity = get_uidvalidity(context, folder).await?; uid_validity = get_uidvalidity(context, transport_id, folder).await?;
} else { } else {
warn!(context, "resync_folder_uids: No folder {folder}."); warn!(context, "resync_folder_uids: No folder {folder}.");
uid_validity = 0; uid_validity = 0;
@@ -905,12 +921,12 @@ impl Session {
// This may detect previously undetected moved // This may detect previously undetected moved
// messages, so we update server_folder too. // messages, so we update server_folder too.
transaction.execute( transaction.execute(
"INSERT INTO imap (rfc724_mid, folder, uid, uidvalidity, target) "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
VALUES (?1, ?2, ?3, ?4, ?5) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(folder, uid, uidvalidity) ON CONFLICT(transport_id, folder, uid, uidvalidity)
DO UPDATE SET rfc724_mid=excluded.rfc724_mid, DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
target=excluded.target", target=excluded.target",
(rfc724_mid, folder, uid, uid_validity, target), (transport_id, rfc724_mid, folder, uid, uid_validity, target),
)?; )?;
} }
Ok(()) Ok(())
@@ -1232,11 +1248,12 @@ impl Session {
return Ok(()); return Ok(());
} }
let transport_id = self.transport_id();
let mut updated_chat_ids = BTreeSet::new(); let mut updated_chat_ids = BTreeSet::new();
let uid_validity = get_uidvalidity(context, folder) let uid_validity = get_uidvalidity(context, transport_id, folder)
.await .await
.with_context(|| format!("failed to get UID validity for folder {folder}"))?; .with_context(|| format!("failed to get UID validity for folder {folder}"))?;
let mut highest_modseq = get_modseq(context, folder) let mut highest_modseq = get_modseq(context, transport_id, folder)
.await .await
.with_context(|| format!("failed to get MODSEQ for folder {folder}"))?; .with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
let mut list = self let mut list = self
@@ -1287,7 +1304,7 @@ impl Session {
self.new_mail = true; self.new_mail = true;
} }
set_modseq(context, folder, highest_modseq) set_modseq(context, transport_id, folder, highest_modseq)
.await .await
.with_context(|| format!("failed to set MODSEQ for folder {folder}"))?; .with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
if !updated_chat_ids.is_empty() { if !updated_chat_ids.is_empty() {
@@ -2417,13 +2434,18 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str)
/// uid_next is the next unique identifier value from the last time we fetched a folder /// uid_next is the next unique identifier value from the last time we fetched a folder
/// See <https://tools.ietf.org/html/rfc3501#section-2.3.1.1> /// See <https://tools.ietf.org/html/rfc3501#section-2.3.1.1>
/// This function is used to update our uid_next after fetching messages. /// This function is used to update our uid_next after fetching messages.
pub(crate) async fn set_uid_next(context: &Context, folder: &str, uid_next: u32) -> Result<()> { pub(crate) async fn set_uid_next(
context: &Context,
transport_id: u32,
folder: &str,
uid_next: u32,
) -> Result<()> {
context context
.sql .sql
.execute( .execute(
"INSERT INTO imap_sync (folder, uid_next) VALUES (?,?) "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
ON CONFLICT(folder) DO UPDATE SET uid_next=excluded.uid_next", ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
(folder, uid_next), (transport_id, folder, uid_next),
) )
.await?; .await?;
Ok(()) Ok(())
@@ -2434,57 +2456,69 @@ pub(crate) async fn set_uid_next(context: &Context, folder: &str, uid_next: u32)
/// This method returns the uid_next from the last time we fetched messages. /// This method returns the uid_next from the last time we fetched messages.
/// We can compare this to the current uid_next to find out whether there are new messages /// We can compare this to the current uid_next to find out whether there are new messages
/// and fetch from this value on to get all new messages. /// and fetch from this value on to get all new messages.
async fn get_uid_next(context: &Context, folder: &str) -> Result<u32> { async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
Ok(context Ok(context
.sql .sql
.query_get_value("SELECT uid_next FROM imap_sync WHERE folder=?;", (folder,)) .query_get_value(
"SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
(transport_id, folder),
)
.await? .await?
.unwrap_or(0)) .unwrap_or(0))
} }
pub(crate) async fn set_uidvalidity( pub(crate) async fn set_uidvalidity(
context: &Context, context: &Context,
transport_id: u32,
folder: &str, folder: &str,
uidvalidity: u32, uidvalidity: u32,
) -> Result<()> { ) -> Result<()> {
context context
.sql .sql
.execute( .execute(
"INSERT INTO imap_sync (folder, uidvalidity) VALUES (?,?) "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
ON CONFLICT(folder) DO UPDATE SET uidvalidity=excluded.uidvalidity", ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
(folder, uidvalidity), (transport_id, folder, uidvalidity),
) )
.await?; .await?;
Ok(()) Ok(())
} }
async fn get_uidvalidity(context: &Context, folder: &str) -> Result<u32> { async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
Ok(context Ok(context
.sql .sql
.query_get_value( .query_get_value(
"SELECT uidvalidity FROM imap_sync WHERE folder=?;", "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
(folder,), (transport_id, folder),
) )
.await? .await?
.unwrap_or(0)) .unwrap_or(0))
} }
pub(crate) async fn set_modseq(context: &Context, folder: &str, modseq: u64) -> Result<()> { pub(crate) async fn set_modseq(
context: &Context,
transport_id: u32,
folder: &str,
modseq: u64,
) -> Result<()> {
context context
.sql .sql
.execute( .execute(
"INSERT INTO imap_sync (folder, modseq) VALUES (?,?) "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
ON CONFLICT(folder) DO UPDATE SET modseq=excluded.modseq", ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
(folder, modseq), (transport_id, folder, modseq),
) )
.await?; .await?;
Ok(()) Ok(())
} }
async fn get_modseq(context: &Context, folder: &str) -> Result<u64> { async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
Ok(context Ok(context
.sql .sql
.query_get_value("SELECT modseq FROM imap_sync WHERE folder=?;", (folder,)) .query_get_value(
"SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
(transport_id, folder),
)
.await? .await?
.unwrap_or(0)) .unwrap_or(0))
} }

View File

@@ -11,17 +11,23 @@ fn test_get_folder_meaning_by_name() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_set_uid_next_validity() { async fn test_set_uid_next_validity() {
let t = TestContext::new_alice().await; let t = TestContext::new_alice().await;
assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 0); assert_eq!(get_uid_next(&t.ctx, 1, "Inbox").await.unwrap(), 0);
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 0); assert_eq!(get_uidvalidity(&t.ctx, 1, "Inbox").await.unwrap(), 0);
set_uidvalidity(&t.ctx, "Inbox", 7).await.unwrap(); set_uidvalidity(&t.ctx, 1, "Inbox", 7).await.unwrap();
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 7); assert_eq!(get_uidvalidity(&t.ctx, 1, "Inbox").await.unwrap(), 7);
assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 0); assert_eq!(get_uid_next(&t.ctx, 1, "Inbox").await.unwrap(), 0);
set_uid_next(&t.ctx, "Inbox", 5).await.unwrap(); // For another transport there is still no UIDVALIDITY set.
set_uidvalidity(&t.ctx, "Inbox", 6).await.unwrap(); assert_eq!(get_uidvalidity(&t.ctx, 2, "Inbox").await.unwrap(), 0);
assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 5);
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 6); set_uid_next(&t.ctx, 1, "Inbox", 5).await.unwrap();
set_uidvalidity(&t.ctx, 1, "Inbox", 6).await.unwrap();
assert_eq!(get_uid_next(&t.ctx, 1, "Inbox").await.unwrap(), 5);
assert_eq!(get_uidvalidity(&t.ctx, 1, "Inbox").await.unwrap(), 6);
assert_eq!(get_uid_next(&t.ctx, 2, "Inbox").await.unwrap(), 0);
assert_eq!(get_uidvalidity(&t.ctx, 2, "Inbox").await.unwrap(), 0);
} }
#[test] #[test]

View File

@@ -5,6 +5,7 @@ use anyhow::Context as _;
use super::session::Session as ImapSession; use super::session::Session as ImapSession;
use super::{get_uid_next, get_uidvalidity, set_modseq, set_uid_next, set_uidvalidity}; use super::{get_uid_next, get_uidvalidity, set_modseq, set_uid_next, set_uidvalidity};
use crate::context::Context; use crate::context::Context;
use crate::ensure_and_debug_assert;
use crate::log::warn; use crate::log::warn;
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
@@ -129,7 +130,7 @@ impl ImapSession {
context: &Context, context: &Context,
folder: &str, folder: &str,
create: bool, create: bool,
) -> Result<bool> { ) -> anyhow::Result<bool> {
let newly_selected = if create { let newly_selected = if create {
self.select_or_create_folder(context, folder) self.select_or_create_folder(context, folder)
.await .await
@@ -146,15 +147,24 @@ impl ImapSession {
}, },
} }
}; };
let transport_id = self.transport_id();
// Folders should not be selected when transport_id is not assigned yet
// because we cannot save UID validity then.
ensure_and_debug_assert!(
transport_id > 0,
"Cannot select folder when transport ID is unknown"
);
let mailbox = self let mailbox = self
.selected_mailbox .selected_mailbox
.as_mut() .as_mut()
.with_context(|| format!("No mailbox selected, folder: {folder:?}"))?; .with_context(|| format!("No mailbox selected, folder: {folder:?}"))?;
let old_uid_validity = get_uidvalidity(context, folder) let old_uid_validity = get_uidvalidity(context, transport_id, folder)
.await .await
.with_context(|| format!("Failed to get old UID validity for folder {folder:?}"))?; .with_context(|| format!("Failed to get old UID validity for folder {folder:?}"))?;
let old_uid_next = get_uid_next(context, folder) let old_uid_next = get_uid_next(context, transport_id, folder)
.await .await
.with_context(|| format!("Failed to get old UID NEXT for folder {folder:?}"))?; .with_context(|| format!("Failed to get old UID NEXT for folder {folder:?}"))?;
@@ -205,7 +215,7 @@ impl ImapSession {
context, context,
"The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {new_uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...", "The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {new_uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...",
); );
set_uid_next(context, folder, new_uid_next).await?; set_uid_next(context, transport_id, folder, new_uid_next).await?;
self.resync_request_sender.try_send(()).ok(); self.resync_request_sender.try_send(()).ok();
} }
@@ -224,21 +234,21 @@ impl ImapSession {
} }
// UIDVALIDITY is modified, reset highest seen MODSEQ. // UIDVALIDITY is modified, reset highest seen MODSEQ.
set_modseq(context, folder, 0).await?; set_modseq(context, transport_id, folder, 0).await?;
// ============== uid_validity has changed or is being set the first time. ============== // ============== uid_validity has changed or is being set the first time. ==============
let new_uid_next = new_uid_next.unwrap_or_default(); let new_uid_next = new_uid_next.unwrap_or_default();
set_uid_next(context, folder, new_uid_next).await?; set_uid_next(context, transport_id, folder, new_uid_next).await?;
set_uidvalidity(context, folder, new_uid_validity).await?; set_uidvalidity(context, transport_id, folder, new_uid_validity).await?;
self.new_mail = true; self.new_mail = true;
// Collect garbage entries in `imap` table. // Collect garbage entries in `imap` table.
context context
.sql .sql
.execute( .execute(
"DELETE FROM imap WHERE folder=? AND uidvalidity!=?", "DELETE FROM imap WHERE transport_id=? AND folder=? AND uidvalidity!=?",
(&folder, new_uid_validity), (transport_id, &folder, new_uid_validity),
) )
.await?; .await?;
@@ -247,12 +257,7 @@ impl ImapSession {
} }
info!( info!(
context, context,
"uid/validity change folder {}: new {}/{} previous {}/{}.", "transport {transport_id}: UID validity for folder {folder} changed from {old_uid_validity}/{old_uid_next} to {new_uid_validity}/{new_uid_next}.",
folder,
new_uid_next,
new_uid_validity,
old_uid_next,
old_uid_validity,
); );
Ok(true) Ok(true)
} }

View File

@@ -30,6 +30,8 @@ const PREFETCH_FLAGS: &str = "(UID INTERNALDATE RFC822.SIZE BODY.PEEK[HEADER.FIE
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Session { pub(crate) struct Session {
transport_id: u32,
pub(super) inner: ImapSession<Box<dyn SessionStream>>, pub(super) inner: ImapSession<Box<dyn SessionStream>>,
pub capabilities: Capabilities, pub capabilities: Capabilities,
@@ -71,8 +73,10 @@ impl Session {
inner: ImapSession<Box<dyn SessionStream>>, inner: ImapSession<Box<dyn SessionStream>>,
capabilities: Capabilities, capabilities: Capabilities,
resync_request_sender: async_channel::Sender<()>, resync_request_sender: async_channel::Sender<()>,
transport_id: u32,
) -> Self { ) -> Self {
Self { Self {
transport_id,
inner, inner,
capabilities, capabilities,
selected_folder: None, selected_folder: None,
@@ -84,6 +88,11 @@ impl Session {
} }
} }
/// Returns ID of the transport for which this session was created.
pub(crate) fn transport_id(&self) -> u32 {
self.transport_id
}
pub fn can_idle(&self) -> bool { pub fn can_idle(&self) -> bool {
self.capabilities.can_idle self.capabilities.can_idle
} }

View File

@@ -1,5 +1,4 @@
use std::cmp; use std::cmp;
use std::iter::{self, once};
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use anyhow::{Context as _, Error, Result, bail}; use anyhow::{Context as _, Error, Result, bail};
@@ -26,6 +25,7 @@ use crate::smtp::{Smtp, send_smtp_messages};
use crate::sql; use crate::sql;
use crate::stats::maybe_send_stats; use crate::stats::maybe_send_stats;
use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed}; use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
use crate::transport::ConfiguredLoginParam;
use crate::{constants, stats}; use crate::{constants, stats};
pub(crate) mod connectivity; pub(crate) mod connectivity;
@@ -212,21 +212,25 @@ impl SchedulerState {
/// Indicate that the network likely has come back. /// Indicate that the network likely has come back.
pub(crate) async fn maybe_network(&self) { pub(crate) async fn maybe_network(&self) {
let inner = self.inner.read().await; let inner = self.inner.read().await;
let (inbox, oboxes) = match *inner { let (inboxes, oboxes) = match *inner {
InnerSchedulerState::Started(ref scheduler) => { InnerSchedulerState::Started(ref scheduler) => {
scheduler.maybe_network(); scheduler.maybe_network();
let inbox = scheduler.inbox.conn_state.state.connectivity.clone(); let inboxes = scheduler
.inboxes
.iter()
.map(|b| b.conn_state.state.connectivity.clone())
.collect::<Vec<_>>();
let oboxes = scheduler let oboxes = scheduler
.oboxes .oboxes
.iter() .iter()
.map(|b| b.conn_state.state.connectivity.clone()) .map(|b| b.conn_state.state.connectivity.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
(inbox, oboxes) (inboxes, oboxes)
} }
_ => return, _ => return,
}; };
drop(inner); drop(inner);
connectivity::idle_interrupted(inbox, oboxes); connectivity::idle_interrupted(inboxes, oboxes);
} }
/// Indicate that the network likely is lost. /// Indicate that the network likely is lost.
@@ -331,7 +335,8 @@ struct SchedBox {
/// Job and connection scheduler. /// Job and connection scheduler.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Scheduler { pub(crate) struct Scheduler {
inbox: SchedBox, /// Inboxes, one per transport.
inboxes: Vec<SchedBox>,
/// Optional boxes -- mvbox. /// Optional boxes -- mvbox.
oboxes: Vec<SchedBox>, oboxes: Vec<SchedBox>,
smtp: SmtpConnectionState, smtp: SmtpConnectionState,
@@ -857,10 +862,13 @@ impl Scheduler {
let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1); let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1); let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
let mut inboxes = Vec::new();
let mut oboxes = Vec::new(); let mut oboxes = Vec::new();
let mut start_recvs = Vec::new(); let mut start_recvs = Vec::new();
let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?; for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
let (conn_state, inbox_handlers) =
ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
let (inbox_start_send, inbox_start_recv) = oneshot::channel(); let (inbox_start_send, inbox_start_recv) = oneshot::channel();
let handle = { let handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
@@ -871,10 +879,12 @@ impl Scheduler {
conn_state, conn_state,
handle, handle,
}; };
inboxes.push(inbox);
start_recvs.push(inbox_start_recv); start_recvs.push(inbox_start_recv);
if ctx.should_watch_mvbox().await? { if ctx.should_watch_mvbox().await? {
let (conn_state, handlers) = ImapConnectionState::new(ctx).await?; let (conn_state, handlers) =
ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
let (start_send, start_recv) = oneshot::channel(); let (start_send, start_recv) = oneshot::channel();
let ctx = ctx.clone(); let ctx = ctx.clone();
let meaning = FolderMeaning::Mvbox; let meaning = FolderMeaning::Mvbox;
@@ -886,6 +896,7 @@ impl Scheduler {
}); });
start_recvs.push(start_recv); start_recvs.push(start_recv);
} }
}
let smtp_handle = { let smtp_handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
@@ -910,7 +921,7 @@ impl Scheduler {
let recently_seen_loop = RecentlySeenLoop::new(ctx.clone()); let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
let res = Self { let res = Self {
inbox, inboxes,
oboxes, oboxes,
smtp, smtp,
smtp_handle, smtp_handle,
@@ -930,8 +941,8 @@ impl Scheduler {
Ok(res) Ok(res)
} }
fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> { fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
once(&self.inbox).chain(self.oboxes.iter()) self.inboxes.iter().chain(self.oboxes.iter())
} }
fn maybe_network(&self) { fn maybe_network(&self) {
@@ -949,7 +960,9 @@ impl Scheduler {
} }
fn interrupt_inbox(&self) { fn interrupt_inbox(&self) {
self.inbox.conn_state.interrupt(); for b in &self.inboxes {
b.conn_state.interrupt();
}
} }
fn interrupt_oboxes(&self) { fn interrupt_oboxes(&self) {
@@ -989,7 +1002,7 @@ impl Scheduler {
let timeout_duration = std::time::Duration::from_secs(30); let timeout_duration = std::time::Duration::from_secs(30);
let tracker = TaskTracker::new(); let tracker = TaskTracker::new();
for b in once(self.inbox).chain(self.oboxes) { for b in self.inboxes.into_iter().chain(self.oboxes.into_iter()) {
let context = context.clone(); let context = context.clone();
tracker.spawn(async move { tracker.spawn(async move {
tokio::time::timeout(timeout_duration, b.handle) tokio::time::timeout(timeout_duration, b.handle)
@@ -1095,12 +1108,17 @@ pub(crate) struct ImapConnectionState {
impl ImapConnectionState { impl ImapConnectionState {
/// Construct a new connection. /// Construct a new connection.
async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> { async fn new(
context: &Context,
transport_id: u32,
login_param: ConfiguredLoginParam,
) -> Result<(Self, ImapConnectionHandlers)> {
let stop_token = CancellationToken::new(); let stop_token = CancellationToken::new();
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
let handlers = ImapConnectionHandlers { let handlers = ImapConnectionHandlers {
connection: Imap::new_configured(context, idle_interrupt_receiver).await?, connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
.await?,
stop_token: stop_token.clone(), stop_token: stop_token.clone(),
}; };

View File

@@ -201,7 +201,8 @@ impl ConnectivityStore {
/// Set all folder states to InterruptingIdle in case they were `Idle` before. /// Set all folder states to InterruptingIdle in case they were `Idle` before.
/// Called during `dc_maybe_network()` to make sure that `all_work_done()` /// Called during `dc_maybe_network()` to make sure that `all_work_done()`
/// returns false immediately after `dc_maybe_network()`. /// returns false immediately after `dc_maybe_network()`.
pub(crate) fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) { pub(crate) fn idle_interrupted(inboxes: Vec<ConnectivityStore>, oboxes: Vec<ConnectivityStore>) {
for inbox in inboxes {
let mut connectivity_lock = inbox.0.lock(); let mut connectivity_lock = inbox.0.lock();
// For the inbox, we also have to set the connectivity to InterruptingIdle if it was // For the inbox, we also have to set the connectivity to InterruptingIdle if it was
// NotConfigured before: If all folders are NotConfigured, dc_get_connectivity() // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity()
@@ -213,7 +214,7 @@ pub(crate) fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<Connectivit
{ {
*connectivity_lock = DetailedConnectivity::InterruptingIdle; *connectivity_lock = DetailedConnectivity::InterruptingIdle;
} }
drop(connectivity_lock); }
for state in oboxes { for state in oboxes {
let mut connectivity_lock = state.0.lock(); let mut connectivity_lock = state.0.lock();

View File

@@ -89,7 +89,7 @@ impl Smtp {
} }
self.connectivity.set_connecting(context); self.connectivity.set_connecting(context);
let lp = ConfiguredLoginParam::load(context) let (_transport_id, lp) = ConfiguredLoginParam::load(context)
.await? .await?
.context("Not configured")?; .context("Not configured")?;
let proxy_config = ProxyConfig::load(context).await?; let proxy_config = ProxyConfig::load(context).await?;

View File

@@ -1402,6 +1402,49 @@ CREATE INDEX gossip_timestamp_index ON gossip_timestamp (chat_id, fingerprint);
.await?; .await?;
} }
inc_and_check(&mut migration_version, 140)?;
if dbversion < migration_version {
sql.execute_migration(
"
CREATE TABLE new_imap (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transport_id INTEGER NOT NULL, -- ID of the transport in the `transports` table.
rfc724_mid TEXT NOT NULL, -- Message-ID header
folder TEXT NOT NULL, -- IMAP folder
target TEXT NOT NULL, -- Destination folder. Empty string means that the message shall be deleted.
uid INTEGER NOT NULL, -- UID
uidvalidity INTEGER NOT NULL,
UNIQUE (transport_id, folder, uid, uidvalidity)
) STRICT;
INSERT OR IGNORE INTO new_imap SELECT
id, 1, rfc724_mid, folder, target, uid, uidvalidity
FROM imap;
DROP TABLE imap;
ALTER TABLE new_imap RENAME TO imap;
CREATE INDEX imap_folder ON imap(transport_id, folder);
CREATE INDEX imap_rfc724_mid ON imap(transport_id, rfc724_mid);
CREATE TABLE new_imap_sync (
transport_id INTEGER NOT NULL, -- ID of the transport in the `transports` table.
folder TEXT NOT NULL,
uidvalidity INTEGER NOT NULL DEFAULT 0,
uid_next INTEGER NOT NULL DEFAULT 0,
modseq INTEGER NOT NULL DEFAULT 0,
UNIQUE (transport_id, folder)
) STRICT;
INSERT OR IGNORE INTO new_imap_sync SELECT
1, folder, uidvalidity, uid_next, modseq
FROM imap_sync;
DROP TABLE imap_sync;
ALTER TABLE new_imap_sync RENAME TO imap_sync;
CREATE INDEX imap_sync_index ON imap_sync(transport_id, folder);
",
migration_version,
)
.await?;
}
let new_version = sql let new_version = sql
.get_raw_config_int(VERSION_CFG) .get_raw_config_int(VERSION_CFG)
.await? .await?

View File

@@ -10,8 +10,8 @@
use std::fmt; use std::fmt;
use anyhow::{Context as _, Result, bail, ensure, format_err}; use anyhow::{Context as _, Result, bail, format_err};
use deltachat_contact_tools::{EmailAddress, addr_cmp, addr_normalize}; use deltachat_contact_tools::{EmailAddress, addr_normalize};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::config::Config; use crate::config::Config;
@@ -240,24 +240,46 @@ impl fmt::Display for ConfiguredLoginParam {
impl ConfiguredLoginParam { impl ConfiguredLoginParam {
/// Load configured account settings from the database. /// Load configured account settings from the database.
/// ///
/// Returns transport ID and configured parameters
/// of the current primary transport.
/// Returns `None` if account is not configured. /// Returns `None` if account is not configured.
pub(crate) async fn load(context: &Context) -> Result<Option<Self>> { pub(crate) async fn load(context: &Context) -> Result<Option<(u32, Self)>> {
let Some(self_addr) = context.get_config(Config::ConfiguredAddr).await? else { let Some(self_addr) = context.get_config(Config::ConfiguredAddr).await? else {
return Ok(None); return Ok(None);
}; };
let json: Option<String> = context let Some((id, json)) = context
.sql .sql
.query_get_value( .query_row_optional(
"SELECT configured_param FROM transports WHERE addr=?", "SELECT id, configured_param FROM transports WHERE addr=?",
(&self_addr,), (&self_addr,),
|row| {
let id: u32 = row.get(0)?;
let json: String = row.get(1)?;
Ok((id, json))
},
) )
.await?; .await?
if let Some(json) = json { else {
Ok(Some(Self::from_json(&json)?))
} else {
bail!("Self address {self_addr} doesn't have a corresponding transport"); bail!("Self address {self_addr} doesn't have a corresponding transport");
};
Ok(Some((id, Self::from_json(&json)?)))
} }
/// Loads configured login parameters for all transports.
///
/// Returns a vector of all transport IDs
/// paired with the configured parameters for the transports.
pub(crate) async fn load_all(context: &Context) -> Result<Vec<(u32, Self)>> {
context
.sql
.query_map_vec("SELECT id, configured_param FROM transports", (), |row| {
let id: u32 = row.get(0)?;
let json: String = row.get(1)?;
let param = Self::from_json(&json)?;
Ok((id, param))
})
.await
} }
/// Loads legacy configured param. Only used for tests and the migration. /// Loads legacy configured param. Only used for tests and the migration.
@@ -536,12 +558,6 @@ impl ConfiguredLoginParam {
let addr = addr_normalize(&self.addr); let addr = addr_normalize(&self.addr);
let provider_id = self.provider.map(|provider| provider.id); let provider_id = self.provider.map(|provider| provider.id);
let configured_addr = context.get_config(Config::ConfiguredAddr).await?; let configured_addr = context.get_config(Config::ConfiguredAddr).await?;
if let Some(configured_addr) = &configured_addr {
ensure!(
addr_cmp(configured_addr, &addr),
"Adding a second transport is not supported right now."
);
}
context context
.sql .sql
.execute( .execute(
@@ -680,7 +696,7 @@ mod tests {
expected_param expected_param
); );
assert_eq!(t.is_configured().await?, true); assert_eq!(t.is_configured().await?, true);
let loaded = ConfiguredLoginParam::load(&t).await?.unwrap(); let (_transport_id, loaded) = ConfiguredLoginParam::load(&t).await?.unwrap();
assert_eq!(param, loaded); assert_eq!(param, loaded);
// Legacy ConfiguredImapCertificateChecks config is ignored // Legacy ConfiguredImapCertificateChecks config is ignored
@@ -789,7 +805,7 @@ mod tests {
assert_eq!(loaded, param); assert_eq!(loaded, param);
migrate_configured_login_param(&t).await; migrate_configured_login_param(&t).await;
let loaded = ConfiguredLoginParam::load(&t).await?.unwrap(); let (_transport_id, loaded) = ConfiguredLoginParam::load(&t).await?.unwrap();
assert_eq!(loaded, param); assert_eq!(loaded, param);
Ok(()) Ok(())
@@ -833,7 +849,7 @@ mod tests {
migrate_configured_login_param(&t).await; migrate_configured_login_param(&t).await;
let loaded = ConfiguredLoginParam::load(&t).await?.unwrap(); let (_transport_id, loaded) = ConfiguredLoginParam::load(&t).await?.unwrap();
assert_eq!(loaded.provider, Some(*provider)); assert_eq!(loaded.provider, Some(*provider));
assert_eq!(loaded.imap.is_empty(), false); assert_eq!(loaded.imap.is_empty(), false);
assert_eq!(loaded.smtp.is_empty(), false); assert_eq!(loaded.smtp.is_empty(), false);
@@ -890,7 +906,7 @@ mod tests {
.save_to_transports_table(&t, &EnteredLoginParam::default()) .save_to_transports_table(&t, &EnteredLoginParam::default())
.await?; .await?;
let loaded = ConfiguredLoginParam::load(&t).await?.unwrap(); let (_transport_id, loaded) = ConfiguredLoginParam::load(&t).await?.unwrap();
assert_eq!(loaded.provider, Some(*provider)); assert_eq!(loaded.provider, Some(*provider));
assert_eq!(loaded.imap.is_empty(), false); assert_eq!(loaded.imap.is_empty(), false);
assert_eq!(loaded.smtp.is_empty(), false); assert_eq!(loaded.smtp.is_empty(), false);