api: add clear_all_relay_storage API

This commit is contained in:
link2xt
2026-04-08 21:55:07 +02:00
committed by l
parent 8cd06bb785
commit 3c25e4b726
4 changed files with 108 additions and 4 deletions

View File

@@ -318,6 +318,15 @@ impl CommandApi {
Ok(())
}
/// Requests to clear storage on all chatmail relays.
///
/// I/O must be started for this request to take effect.
async fn clear_all_relay_storage(&self, account_id: u32) -> Result<()> {
let ctx = self.get_context(account_id).await?;
ctx.clear_all_relay_storage().await?;
Ok(())
}
/// Get top-level info for an account.
async fn get_account_info(&self, account_id: u32) -> Result<Account> {
let context_option = self.accounts.read().await.get_account(account_id);

View File

@@ -568,6 +568,15 @@ impl Context {
}
}
/// Requests deletion of all messages from chatmail relays.
///
/// Non-chatmail relays are excluded
/// to avoid accidentally deleting emails
/// from shared inboxes.
pub async fn clear_all_relay_storage(&self) -> Result<()> {
self.scheduler.clear_all_relay_storage().await
}
/// Restarts the IO scheduler if it was running before
/// when it is not running this is an no-op
pub async fn restart_io_if_running(&self) {

View File

@@ -945,6 +945,29 @@ impl Session {
Ok(())
}
/// Deletes all messages from IMAP folder.
pub(crate) async fn delete_all_messages(
&mut self,
context: &Context,
folder: &str,
) -> Result<()> {
let transport_id = self.transport_id();
if self.select_with_uidvalidity(context, folder).await? {
self.add_flag_finalized_with_set("1:*", "\\Deleted").await?;
self.selected_folder_needs_expunge = true;
context
.sql
.execute(
"DELETE FROM imap WHERE transport_id=? AND folder=?",
(transport_id, folder),
)
.await?;
}
Ok(())
}
/// Moves batch of messages identified by their UID from the currently
/// selected folder to the target folder.
async fn move_message_batch(

View File

@@ -250,6 +250,16 @@ impl SchedulerState {
}
}
pub(crate) async fn clear_all_relay_storage(&self) -> Result<()> {
let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner {
scheduler.clear_all_relay_storage();
Ok(())
} else {
bail!("IO is not started");
}
}
pub(crate) async fn interrupt_smtp(&self) {
let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner {
@@ -348,6 +358,7 @@ async fn inbox_loop(
let ImapConnectionHandlers {
mut connection,
stop_token,
clear_storage_request_receiver,
} = inbox_handlers;
let transport_id = connection.transport_id();
@@ -386,7 +397,14 @@ async fn inbox_loop(
}
};
match inbox_fetch_idle(&ctx, &mut connection, session).await {
match inbox_fetch_idle(
&ctx,
&mut connection,
session,
&clear_storage_request_receiver,
)
.await
{
Err(err) => warn!(
ctx,
"Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
@@ -407,11 +425,29 @@ async fn inbox_loop(
.await;
}
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
async fn inbox_fetch_idle(
ctx: &Context,
imap: &mut Imap,
mut session: Session,
clear_storage_request_receiver: &Receiver<()>,
) -> Result<Session> {
let transport_id = session.transport_id();
// Clear IMAP storage on request.
//
// Only doing this for chatmail relays to avoid
// accidentally deleting all emails in a shared mailbox.
let should_clear_imap_storage =
clear_storage_request_receiver.try_recv().is_ok() && session.is_chatmail();
if should_clear_imap_storage {
info!(ctx, "Transport {transport_id}: Clearing IMAP storage.");
session.delete_all_messages(ctx, &imap.folder).await?;
}
// Update quota no more than once a minute.
if ctx.quota_needs_update(session.transport_id(), 60).await
//
// Always update if we just cleared IMAP storage.
if (ctx.quota_needs_update(session.transport_id(), 60).await || should_clear_imap_storage)
&& let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
{
warn!(
@@ -737,6 +773,12 @@ impl Scheduler {
}
}
fn clear_all_relay_storage(&self) {
for b in &self.inboxes {
b.conn_state.clear_relay_storage();
}
}
fn interrupt_smtp(&self) {
self.smtp.interrupt();
}
@@ -870,6 +912,13 @@ struct SmtpConnectionHandlers {
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
state: ConnectionState,
/// Channel to request clearing the folder.
///
/// IMAP loop receiving this should clear the folder
/// on the next iteration if IMAP server is a chatmail relay
/// and otherwise ignore the request.
clear_storage_request_sender: Sender<()>,
}
impl ImapConnectionState {
@@ -881,11 +930,13 @@ impl ImapConnectionState {
) -> Result<(Self, ImapConnectionHandlers)> {
let stop_token = CancellationToken::new();
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
let (clear_storage_request_sender, clear_storage_request_receiver) = channel::bounded(1);
let handlers = ImapConnectionHandlers {
connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
.await?,
stop_token: stop_token.clone(),
clear_storage_request_receiver,
};
let state = ConnectionState {
@@ -894,7 +945,10 @@ impl ImapConnectionState {
connectivity: handlers.connection.connectivity.clone(),
};
let conn = ImapConnectionState { state };
let conn = ImapConnectionState {
state,
clear_storage_request_sender,
};
Ok((conn, handlers))
}
@@ -908,10 +962,19 @@ impl ImapConnectionState {
fn stop(&self) {
self.state.stop();
}
/// Requests clearing relay storage and interrupts the inbox.
fn clear_relay_storage(&self) {
self.clear_storage_request_sender.try_send(()).ok();
self.state.interrupt();
}
}
#[derive(Debug)]
struct ImapConnectionHandlers {
connection: Imap,
stop_token: CancellationToken,
/// Channel receiver to get requests to clear IMAP storage.
pub(crate) clear_storage_request_receiver: Receiver<()>,
}