api: add clear_all_relay_storage API

This commit is contained in:
link2xt
2026-04-08 21:55:07 +02:00
parent e3bf6bf352
commit ae8cbb3cf3
4 changed files with 109 additions and 4 deletions

View File

@@ -318,6 +318,15 @@ impl CommandApi {
Ok(()) Ok(())
} }
/// Requests to clear storage on all chatmail relays.
///
/// I/O must be started for this request to take effect.
async fn clear_all_relay_storage(&self, account_id: u32) -> Result<()> {
let ctx = self.get_context(account_id).await?;
ctx.clear_all_relay_storage().await;
Ok(())
}
/// Get top-level info for an account. /// Get top-level info for an account.
async fn get_account_info(&self, account_id: u32) -> Result<Account> { async fn get_account_info(&self, account_id: u32) -> Result<Account> {
let context_option = self.accounts.read().await.get_account(account_id); let context_option = self.accounts.read().await.get_account(account_id);

View File

@@ -560,6 +560,15 @@ impl Context {
} }
} }
/// Requests deletion of all messages from chatmail relays.
///
/// Non-chatmail relays are excluded
/// to avoid accidentally deleting emails
/// from shared inboxes.
pub async fn clear_all_relay_storage(&self) {
self.scheduler.clear_all_relay_storage().await;
}
/// Restarts the IO scheduler if it was running before /// Restarts the IO scheduler if it was running before
/// when it is not running this is an no-op /// when it is not running this is an no-op
pub async fn restart_io_if_running(&self) { pub async fn restart_io_if_running(&self) {

View File

@@ -975,6 +975,29 @@ impl Session {
Ok(()) Ok(())
} }
/// Deletes all messages from IMAP folder.
pub(crate) async fn delete_all_messages(
&mut self,
context: &Context,
folder: &str,
) -> Result<()> {
let transport_id = self.transport_id();
if self.select_with_uidvalidity(context, folder).await? {
self.add_flag_finalized_with_set("1:*", "\\Deleted").await?;
self.selected_folder_needs_expunge = true;
context
.sql
.execute(
"DELETE FROM imap WHERE transport_id=? AND folder=?",
(transport_id, folder),
)
.await?;
}
Ok(())
}
/// Moves batch of messages identified by their UID from the currently /// Moves batch of messages identified by their UID from the currently
/// selected folder to the target folder. /// selected folder to the target folder.
async fn move_message_batch( async fn move_message_batch(

View File

@@ -256,6 +256,13 @@ impl SchedulerState {
} }
} }
pub(crate) async fn clear_all_relay_storage(&self) {
let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner {
scheduler.clear_all_relay_storage();
}
}
pub(crate) async fn interrupt_smtp(&self) { pub(crate) async fn interrupt_smtp(&self) {
let inner = self.inner.read().await; let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner { if let InnerSchedulerState::Started(ref scheduler) = *inner {
@@ -353,6 +360,7 @@ async fn inbox_loop(
let ImapConnectionHandlers { let ImapConnectionHandlers {
mut connection, mut connection,
stop_token, stop_token,
clear_storage_request_receiver,
} = inbox_handlers; } = inbox_handlers;
let transport_id = connection.transport_id(); let transport_id = connection.transport_id();
@@ -391,7 +399,14 @@ async fn inbox_loop(
} }
}; };
match inbox_fetch_idle(&ctx, &mut connection, session).await { match inbox_fetch_idle(
&ctx,
&mut connection,
session,
&clear_storage_request_receiver,
)
.await
{
Err(err) => warn!( Err(err) => warn!(
ctx, ctx,
"Transport {transport_id}: Failed inbox fetch_idle: {err:#}." "Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
@@ -442,11 +457,32 @@ pub async fn convert_folder_meaning(
} }
} }
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> { async fn inbox_fetch_idle(
ctx: &Context,
imap: &mut Imap,
mut session: Session,
clear_storage_request_receiver: &Receiver<()>,
) -> Result<Session> {
let transport_id = session.transport_id(); let transport_id = session.transport_id();
// Clear IMAP storage on request.
//
// Only doing this for chatmail relays to avoid
// accidentally deleting all emails in a shared mailbox.
let should_clear_imap_storage =
clear_storage_request_receiver.try_recv().is_ok() && session.is_chatmail();
if should_clear_imap_storage
&& let Some((_folder_config, folder)) =
convert_folder_meaning(ctx, FolderMeaning::Inbox).await?
{
info!(ctx, "Transport {transport_id}: Clearing IMAP storage.");
session.delete_all_messages(ctx, &folder).await?;
}
// Update quota no more than once a minute. // Update quota no more than once a minute.
if ctx.quota_needs_update(session.transport_id(), 60).await //
// Always update if we just cleared IMAP storage.
if (ctx.quota_needs_update(session.transport_id(), 60).await || should_clear_imap_storage)
&& let Err(err) = ctx.update_recent_quota(&mut session).await && let Err(err) = ctx.update_recent_quota(&mut session).await
{ {
warn!( warn!(
@@ -608,6 +644,7 @@ async fn simple_imap_loop(
let ImapConnectionHandlers { let ImapConnectionHandlers {
mut connection, mut connection,
stop_token, stop_token,
clear_storage_request_receiver: _,
} = inbox_handlers; } = inbox_handlers;
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
@@ -865,6 +902,12 @@ impl Scheduler {
} }
} }
fn clear_all_relay_storage(&self) {
for b in &self.inboxes {
b.conn_state.clear_relay_storage();
}
}
fn interrupt_smtp(&self) { fn interrupt_smtp(&self) {
self.smtp.interrupt(); self.smtp.interrupt();
} }
@@ -998,6 +1041,13 @@ struct SmtpConnectionHandlers {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct ImapConnectionState { pub(crate) struct ImapConnectionState {
state: ConnectionState, state: ConnectionState,
/// Channel to request clearing the folder.
///
/// IMAP loop receiving this should clear the folder
/// on the next iteration if IMAP server is a chatmail relay
/// and otherwise ignore the request.
clear_storage_request_sender: Sender<()>,
} }
impl ImapConnectionState { impl ImapConnectionState {
@@ -1009,11 +1059,13 @@ impl ImapConnectionState {
) -> Result<(Self, ImapConnectionHandlers)> { ) -> Result<(Self, ImapConnectionHandlers)> {
let stop_token = CancellationToken::new(); let stop_token = CancellationToken::new();
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
let (clear_storage_request_sender, clear_storage_request_receiver) = channel::bounded(1);
let handlers = ImapConnectionHandlers { let handlers = ImapConnectionHandlers {
connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver) connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
.await?, .await?,
stop_token: stop_token.clone(), stop_token: stop_token.clone(),
clear_storage_request_receiver,
}; };
let state = ConnectionState { let state = ConnectionState {
@@ -1022,7 +1074,10 @@ impl ImapConnectionState {
connectivity: handlers.connection.connectivity.clone(), connectivity: handlers.connection.connectivity.clone(),
}; };
let conn = ImapConnectionState { state }; let conn = ImapConnectionState {
state,
clear_storage_request_sender,
};
Ok((conn, handlers)) Ok((conn, handlers))
} }
@@ -1036,10 +1091,19 @@ impl ImapConnectionState {
fn stop(&self) { fn stop(&self) {
self.state.stop(); self.state.stop();
} }
/// Requests clearing relay storage and interrupts the inbox.
fn clear_relay_storage(&self) {
self.clear_storage_request_sender.try_send(()).ok();
self.state.interrupt();
}
} }
#[derive(Debug)] #[derive(Debug)]
struct ImapConnectionHandlers { struct ImapConnectionHandlers {
connection: Imap, connection: Imap,
stop_token: CancellationToken, stop_token: CancellationToken,
/// Channel receiver to get requests to clear IMAP storage.
pub(crate) clear_storage_request_receiver: Receiver<()>,
} }