mirror of
https://github.com/chatmail/core.git
synced 2026-04-19 22:46:29 +03:00
feat: remove MvboxMove and OnlyFetchMvbox
This commit is contained in:
180
src/scheduler.rs
180
src/scheduler.rs
@@ -17,7 +17,7 @@ use crate::context::Context;
|
||||
use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
|
||||
use crate::ephemeral::{self, delete_expired_imap_messages};
|
||||
use crate::events::EventType;
|
||||
use crate::imap::{FolderMeaning, Imap, session::Session};
|
||||
use crate::imap::{Imap, session::Session};
|
||||
use crate::location;
|
||||
use crate::log::{LogExt, warn};
|
||||
use crate::smtp::{Smtp, send_smtp_messages};
|
||||
@@ -211,25 +211,19 @@ impl SchedulerState {
|
||||
/// Indicate that the network likely has come back.
|
||||
pub(crate) async fn maybe_network(&self) {
|
||||
let inner = self.inner.read().await;
|
||||
let (inboxes, oboxes) = match *inner {
|
||||
let inboxes = match *inner {
|
||||
InnerSchedulerState::Started(ref scheduler) => {
|
||||
scheduler.maybe_network();
|
||||
let inboxes = scheduler
|
||||
scheduler
|
||||
.inboxes
|
||||
.iter()
|
||||
.map(|b| b.conn_state.state.connectivity.clone())
|
||||
.collect::<Vec<_>>();
|
||||
let oboxes = scheduler
|
||||
.oboxes
|
||||
.iter()
|
||||
.map(|b| b.conn_state.state.connectivity.clone())
|
||||
.collect::<Vec<_>>();
|
||||
(inboxes, oboxes)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
_ => return,
|
||||
};
|
||||
drop(inner);
|
||||
connectivity::idle_interrupted(inboxes, oboxes);
|
||||
connectivity::idle_interrupted(inboxes);
|
||||
}
|
||||
|
||||
/// Indicate that the network likely is lost.
|
||||
@@ -318,7 +312,10 @@ impl Drop for IoPausedGuard {
|
||||
struct SchedBox {
|
||||
/// Address at the used chatmail/email relay
|
||||
addr: String,
|
||||
meaning: FolderMeaning,
|
||||
|
||||
/// Folder name
|
||||
folder: String,
|
||||
|
||||
conn_state: ImapConnectionState,
|
||||
|
||||
/// IMAP loop task handle.
|
||||
@@ -330,8 +327,6 @@ struct SchedBox {
|
||||
pub(crate) struct Scheduler {
|
||||
/// Inboxes, one per transport.
|
||||
inboxes: Vec<SchedBox>,
|
||||
/// Optional boxes -- mvbox.
|
||||
oboxes: Vec<SchedBox>,
|
||||
smtp: SmtpConnectionState,
|
||||
smtp_handle: task::JoinHandle<()>,
|
||||
ephemeral_handle: task::JoinHandle<()>,
|
||||
@@ -412,42 +407,12 @@ async fn inbox_loop(
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Convert folder meaning
|
||||
/// used internally by [fetch_idle] and [Context::background_fetch].
|
||||
///
|
||||
/// Returns folder configuration key and folder name
|
||||
/// if such folder is configured, `Ok(None)` otherwise.
|
||||
pub async fn convert_folder_meaning(
|
||||
ctx: &Context,
|
||||
folder_meaning: FolderMeaning,
|
||||
) -> Result<Option<(Config, String)>> {
|
||||
let folder_config = match folder_meaning.to_config() {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
// Such folder cannot be configured,
|
||||
// e.g. a `FolderMeaning::Spam` folder.
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let folder = ctx
|
||||
.get_config(folder_config)
|
||||
.await
|
||||
.with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
|
||||
|
||||
if let Some(watch_folder) = folder {
|
||||
Ok(Some((folder_config, watch_folder)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
|
||||
let transport_id = session.transport_id();
|
||||
|
||||
// Update quota no more than once a minute.
|
||||
if ctx.quota_needs_update(session.transport_id(), 60).await
|
||||
&& let Err(err) = ctx.update_recent_quota(&mut session).await
|
||||
&& let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
|
||||
{
|
||||
warn!(
|
||||
ctx,
|
||||
@@ -494,7 +459,7 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session)
|
||||
.await
|
||||
.context("Failed to register push token")?;
|
||||
|
||||
let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
|
||||
let session = fetch_idle(ctx, imap, session).await?;
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
@@ -503,34 +468,19 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session)
|
||||
/// This function performs all IMAP operations on a single folder, selecting it if necessary and
|
||||
/// handling all the errors. In case of an error, an error is returned and connection is dropped,
|
||||
/// otherwise connection is returned.
|
||||
async fn fetch_idle(
|
||||
ctx: &Context,
|
||||
connection: &mut Imap,
|
||||
mut session: Session,
|
||||
folder_meaning: FolderMeaning,
|
||||
) -> Result<Session> {
|
||||
async fn fetch_idle(ctx: &Context, connection: &mut Imap, mut session: Session) -> Result<Session> {
|
||||
let transport_id = session.transport_id();
|
||||
|
||||
let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
|
||||
else {
|
||||
// The folder is not configured.
|
||||
// For example, this happens if the server does not have Sent folder
|
||||
// but watching Sent folder is enabled.
|
||||
connection.connectivity.set_not_configured(ctx);
|
||||
connection.idle_interrupt_receiver.recv().await.ok();
|
||||
bail!("Cannot fetch folder {folder_meaning} because it is not configured");
|
||||
};
|
||||
let watch_folder = connection.folder.clone();
|
||||
|
||||
if folder_config == Config::ConfiguredInboxFolder {
|
||||
session
|
||||
.store_seen_flags_on_imap(ctx)
|
||||
.await
|
||||
.context("store_seen_flags_on_imap")?;
|
||||
}
|
||||
session
|
||||
.store_seen_flags_on_imap(ctx)
|
||||
.await
|
||||
.context("store_seen_flags_on_imap")?;
|
||||
|
||||
// Fetch the watched folder.
|
||||
connection
|
||||
.fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
|
||||
.fetch_move_delete(ctx, &mut session, &watch_folder)
|
||||
.await
|
||||
.context("fetch_move_delete")?;
|
||||
|
||||
@@ -564,7 +514,7 @@ async fn fetch_idle(
|
||||
ctx,
|
||||
"Transport {transport_id}: IMAP session does not support IDLE, going to fake idle."
|
||||
);
|
||||
connection.fake_idle(ctx, watch_folder).await?;
|
||||
connection.fake_idle(ctx, &watch_folder).await?;
|
||||
return Ok(session);
|
||||
}
|
||||
|
||||
@@ -579,7 +529,7 @@ async fn fetch_idle(
|
||||
ctx,
|
||||
"Transport {transport_id}: IMAP IDLE is disabled, going to fake idle."
|
||||
);
|
||||
connection.fake_idle(ctx, watch_folder).await?;
|
||||
connection.fake_idle(ctx, &watch_folder).await?;
|
||||
return Ok(session);
|
||||
}
|
||||
|
||||
@@ -595,69 +545,6 @@ async fn fetch_idle(
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
/// Simplified IMAP loop to watch non-inbox folders.
|
||||
async fn simple_imap_loop(
|
||||
ctx: Context,
|
||||
started: oneshot::Sender<()>,
|
||||
inbox_handlers: ImapConnectionHandlers,
|
||||
folder_meaning: FolderMeaning,
|
||||
) {
|
||||
use futures::future::FutureExt;
|
||||
|
||||
info!(ctx, "Starting simple loop for {folder_meaning}.");
|
||||
let ImapConnectionHandlers {
|
||||
mut connection,
|
||||
stop_token,
|
||||
} = inbox_handlers;
|
||||
|
||||
let ctx1 = ctx.clone();
|
||||
|
||||
let fut = async move {
|
||||
let ctx = ctx1;
|
||||
if let Err(()) = started.send(()) {
|
||||
warn!(
|
||||
ctx,
|
||||
"Simple imap loop for {folder_meaning}, missing started receiver."
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let mut old_session: Option<Session> = None;
|
||||
loop {
|
||||
let session = if let Some(session) = old_session.take() {
|
||||
session
|
||||
} else {
|
||||
info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
|
||||
match connection.prepare(&ctx).await {
|
||||
Err(err) => {
|
||||
warn!(
|
||||
ctx,
|
||||
"Failed to prepare {folder_meaning} connection: {err:#}."
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Ok(session) => session,
|
||||
}
|
||||
};
|
||||
|
||||
match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
|
||||
Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
|
||||
Ok(session) => {
|
||||
old_session = Some(session);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
stop_token
|
||||
.cancelled()
|
||||
.map(|_| {
|
||||
info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
|
||||
})
|
||||
.race(fut)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn smtp_loop(
|
||||
ctx: Context,
|
||||
started: oneshot::Sender<()>,
|
||||
@@ -760,7 +647,6 @@ impl Scheduler {
|
||||
let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
|
||||
|
||||
let mut inboxes = Vec::new();
|
||||
let mut oboxes = Vec::new();
|
||||
let mut start_recvs = Vec::new();
|
||||
|
||||
for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
|
||||
@@ -772,30 +658,17 @@ impl Scheduler {
|
||||
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
|
||||
};
|
||||
let addr = configured_login_param.addr.clone();
|
||||
let folder = configured_login_param
|
||||
.imap_folder
|
||||
.unwrap_or_else(|| "INBOX".to_string());
|
||||
let inbox = SchedBox {
|
||||
addr: addr.clone(),
|
||||
meaning: FolderMeaning::Inbox,
|
||||
folder,
|
||||
conn_state,
|
||||
handle,
|
||||
};
|
||||
inboxes.push(inbox);
|
||||
start_recvs.push(inbox_start_recv);
|
||||
|
||||
if ctx.should_watch_mvbox().await? {
|
||||
let (conn_state, handlers) =
|
||||
ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
|
||||
let (start_send, start_recv) = oneshot::channel();
|
||||
let ctx = ctx.clone();
|
||||
let meaning = FolderMeaning::Mvbox;
|
||||
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
|
||||
oboxes.push(SchedBox {
|
||||
addr,
|
||||
meaning,
|
||||
conn_state,
|
||||
handle,
|
||||
});
|
||||
start_recvs.push(start_recv);
|
||||
}
|
||||
}
|
||||
|
||||
let smtp_handle = {
|
||||
@@ -822,7 +695,6 @@ impl Scheduler {
|
||||
|
||||
let res = Self {
|
||||
inboxes,
|
||||
oboxes,
|
||||
smtp,
|
||||
smtp_handle,
|
||||
ephemeral_handle,
|
||||
@@ -842,7 +714,7 @@ impl Scheduler {
|
||||
}
|
||||
|
||||
fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
|
||||
self.inboxes.iter().chain(self.oboxes.iter())
|
||||
self.inboxes.iter()
|
||||
}
|
||||
|
||||
fn maybe_network(&self) {
|
||||
@@ -896,7 +768,7 @@ impl Scheduler {
|
||||
let timeout_duration = std::time::Duration::from_secs(30);
|
||||
|
||||
let tracker = TaskTracker::new();
|
||||
for b in self.inboxes.into_iter().chain(self.oboxes) {
|
||||
for b in self.inboxes {
|
||||
let context = context.clone();
|
||||
tracker.spawn(async move {
|
||||
tokio::time::timeout(timeout_duration, b.handle)
|
||||
|
||||
Reference in New Issue
Block a user