mirror of
https://github.com/chatmail/core.git
synced 2026-04-28 19:06:35 +03:00
refactor(imap): remove Session from Imap structure
Connection establishment now happens only in one place in each IMAP loop. Now all connection establishment happens in one place and is limited by the ratelimit. Backoff was removed from fake_idle as it does not establish connections anymore. If connection fails, fake_idle will return an error. We then drop the connection and get back to the beginning of IMAP loop. Backoff may be still nice to have to delay retries in case of constant connection failures so we don't immediately hit ratelimit if the network is unusable and returns immediate error on each connection attempt (e.g. ICMP network unreachable error), but adding backoff for connection failures is out of scope for this change.
This commit is contained in:
346
src/scheduler.rs
346
src/scheduler.rs
@@ -19,7 +19,7 @@ use crate::context::Context;
|
||||
use crate::download::{download_msg, DownloadState};
|
||||
use crate::ephemeral::{self, delete_expired_imap_messages};
|
||||
use crate::events::EventType;
|
||||
use crate::imap::{FolderMeaning, Imap};
|
||||
use crate::imap::{session::Session, FolderMeaning, Imap};
|
||||
use crate::location;
|
||||
use crate::log::LogExt;
|
||||
use crate::message::MsgId;
|
||||
@@ -330,7 +330,7 @@ pub(crate) struct Scheduler {
|
||||
recently_seen_loop: RecentlySeenLoop,
|
||||
}
|
||||
|
||||
async fn download_msgs(context: &Context, imap: &mut Imap) -> Result<()> {
|
||||
async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
|
||||
let msg_ids = context
|
||||
.sql
|
||||
.query_map(
|
||||
@@ -349,7 +349,7 @@ async fn download_msgs(context: &Context, imap: &mut Imap) -> Result<()> {
|
||||
.await?;
|
||||
|
||||
for msg_id in msg_ids {
|
||||
if let Err(err) = download_msg(context, msg_id, imap).await {
|
||||
if let Err(err) = download_msg(context, msg_id, session).await {
|
||||
warn!(context, "Failed to download message {msg_id}: {:#}.", err);
|
||||
|
||||
// Update download state to failure
|
||||
@@ -392,94 +392,26 @@ async fn inbox_loop(
|
||||
return;
|
||||
};
|
||||
|
||||
let mut old_session: Option<Session> = None;
|
||||
loop {
|
||||
if let Err(err) = connection.prepare(&ctx).await {
|
||||
warn!(ctx, "Failed to prepare connection: {:#}.", err);
|
||||
}
|
||||
|
||||
{
|
||||
// Update quota no more than once a minute.
|
||||
let quota_needs_update = {
|
||||
let quota = ctx.quota.read().await;
|
||||
quota
|
||||
.as_ref()
|
||||
.filter(|quota| time_elapsed("a.modified) > Duration::from_secs(60))
|
||||
.is_none()
|
||||
};
|
||||
|
||||
if quota_needs_update {
|
||||
if let Some(session) = connection.session.as_mut() {
|
||||
if let Err(err) = ctx.update_recent_quota(session).await {
|
||||
warn!(ctx, "Failed to update quota: {:#}.", err);
|
||||
}
|
||||
let session = if let Some(session) = old_session.take() {
|
||||
session
|
||||
} else {
|
||||
match connection.prepare(&ctx).await {
|
||||
Err(err) => {
|
||||
warn!(ctx, "Failed to prepare connection: {:#}.", err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
|
||||
if resync_requested {
|
||||
if let Some(session) = connection.session.as_mut() {
|
||||
if let Err(err) = session.resync_folders(&ctx).await {
|
||||
warn!(ctx, "Failed to resync folders: {:#}.", err);
|
||||
ctx.resync_request.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
maybe_add_time_based_warnings(&ctx).await;
|
||||
|
||||
match ctx.get_config_i64(Config::LastHousekeeping).await {
|
||||
Ok(last_housekeeping_time) => {
|
||||
let next_housekeeping_time =
|
||||
last_housekeeping_time.saturating_add(60 * 60 * 24);
|
||||
if next_housekeeping_time <= time() {
|
||||
sql::housekeeping(&ctx).await.log_err(&ctx).ok();
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(ctx, "Failed to get last housekeeping time: {}", err);
|
||||
Ok(session) => session,
|
||||
}
|
||||
};
|
||||
|
||||
match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
|
||||
Ok(fetched_existing_msgs) => {
|
||||
if !fetched_existing_msgs {
|
||||
// Consider it done even if we fail.
|
||||
//
|
||||
// This operation is not critical enough to retry,
|
||||
// especially if the error is persistent.
|
||||
if let Err(err) = ctx
|
||||
.set_config_internal(
|
||||
Config::FetchedExistingMsgs,
|
||||
config::from_bool(true),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
|
||||
}
|
||||
|
||||
if let Err(err) = connection.fetch_existing_msgs(&ctx).await {
|
||||
warn!(ctx, "Failed to fetch existing messages: {:#}", err);
|
||||
connection.trigger_reconnect(&ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
|
||||
match inbox_fetch_idle(&ctx, &mut connection, session).await {
|
||||
Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
|
||||
Ok(session) => {
|
||||
old_session = Some(session);
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = download_msgs(&ctx, &mut connection).await {
|
||||
warn!(ctx, "Failed to download messages: {:#}", err);
|
||||
}
|
||||
|
||||
if let Some(session) = connection.session.as_mut() {
|
||||
if let Err(err) = session.fetch_metadata(&ctx).await {
|
||||
warn!(ctx, "Failed to fetch metadata: {err:#}.");
|
||||
}
|
||||
}
|
||||
|
||||
fetch_idle(&ctx, &mut connection, FolderMeaning::Inbox).await;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -525,82 +457,123 @@ pub async fn convert_folder_meaning(
|
||||
Ok((folder_config, watch_folder))
|
||||
}
|
||||
|
||||
/// Implement a single iteration of IMAP loop.
|
||||
///
|
||||
/// This function performs all IMAP operations on a single folder, selecting it if necessary and
|
||||
/// handling all the errors. In case of an error, it is logged, but not propagated upwards. If
|
||||
/// critical operation fails such as fetching new messages fails, connection is reset via
|
||||
/// `trigger_reconnect`, so a fresh one can be opened.
|
||||
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: FolderMeaning) {
|
||||
let create_mvbox = true;
|
||||
if let Err(err) = connection
|
||||
.ensure_configured_folders(ctx, create_mvbox)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
ctx,
|
||||
"Cannot watch {folder_meaning}, ensure_configured_folders() failed: {:#}", err,
|
||||
);
|
||||
connection.idle_interrupt_receiver.recv().await.ok();
|
||||
return;
|
||||
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
|
||||
// Update quota no more than once a minute.
|
||||
let quota_needs_update = {
|
||||
let quota = ctx.quota.read().await;
|
||||
quota
|
||||
.as_ref()
|
||||
.filter(|quota| time_elapsed("a.modified) > Duration::from_secs(60))
|
||||
.is_none()
|
||||
};
|
||||
if quota_needs_update {
|
||||
if let Err(err) = ctx.update_recent_quota(&mut session).await {
|
||||
warn!(ctx, "Failed to update quota: {:#}.", err);
|
||||
}
|
||||
}
|
||||
let (folder_config, watch_folder) = match convert_folder_meaning(ctx, folder_meaning).await {
|
||||
Ok(meaning) => meaning,
|
||||
Err(error) => {
|
||||
// Warning instead of error because the folder may not be configured.
|
||||
// For example, this happens if the server does not have Sent folder
|
||||
// but watching Sent folder is enabled.
|
||||
warn!(ctx, "Error converting IMAP Folder name: {:?}", error);
|
||||
connection.connectivity.set_not_configured(ctx).await;
|
||||
connection.idle_interrupt_receiver.recv().await.ok();
|
||||
return;
|
||||
|
||||
let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
|
||||
if resync_requested {
|
||||
if let Err(err) = session.resync_folders(ctx).await {
|
||||
warn!(ctx, "Failed to resync folders: {:#}.", err);
|
||||
ctx.resync_request.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
maybe_add_time_based_warnings(ctx).await;
|
||||
|
||||
match ctx.get_config_i64(Config::LastHousekeeping).await {
|
||||
Ok(last_housekeeping_time) => {
|
||||
let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
|
||||
if next_housekeeping_time <= time() {
|
||||
sql::housekeeping(ctx).await.log_err(ctx).ok();
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(ctx, "Failed to get last housekeeping time: {}", err);
|
||||
}
|
||||
};
|
||||
|
||||
// connect and fake idle if unable to connect
|
||||
if let Err(err) = connection
|
||||
.prepare(ctx)
|
||||
.await
|
||||
.context("prepare IMAP connection")
|
||||
{
|
||||
warn!(ctx, "{:#}", err);
|
||||
connection.trigger_reconnect(ctx);
|
||||
return;
|
||||
}
|
||||
match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
|
||||
Ok(fetched_existing_msgs) => {
|
||||
if !fetched_existing_msgs {
|
||||
// Consider it done even if we fail.
|
||||
//
|
||||
// This operation is not critical enough to retry,
|
||||
// especially if the error is persistent.
|
||||
if let Err(err) = ctx
|
||||
.set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
|
||||
.await
|
||||
{
|
||||
warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
|
||||
}
|
||||
|
||||
if folder_config == Config::ConfiguredInboxFolder {
|
||||
if let Some(session) = connection.session.as_mut() {
|
||||
session
|
||||
.store_seen_flags_on_imap(ctx)
|
||||
.await
|
||||
.context("store_seen_flags_on_imap")
|
||||
.log_err(ctx)
|
||||
.ok();
|
||||
} else {
|
||||
warn!(ctx, "No session even though we just prepared it");
|
||||
if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
|
||||
warn!(ctx, "Failed to fetch existing messages: {:#}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch the watched folder.
|
||||
if let Err(err) = connection
|
||||
.fetch_move_delete(ctx, &watch_folder, folder_meaning)
|
||||
download_msgs(ctx, &mut session)
|
||||
.await
|
||||
.context("fetch_move_delete")
|
||||
{
|
||||
connection.trigger_reconnect(ctx);
|
||||
warn!(ctx, "{:#}", err);
|
||||
return;
|
||||
.context("Failed to download messages")?;
|
||||
session
|
||||
.fetch_metadata(ctx)
|
||||
.await
|
||||
.context("Failed to fetch metadata")?;
|
||||
|
||||
let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
/// Implement a single iteration of IMAP loop.
|
||||
///
|
||||
/// This function performs all IMAP operations on a single folder, selecting it if necessary and
|
||||
/// handling all the errors. In case of an error, an error is returned and connection is dropped,
|
||||
/// otherwise connection is returned.
|
||||
async fn fetch_idle(
|
||||
ctx: &Context,
|
||||
connection: &mut Imap,
|
||||
mut session: Session,
|
||||
folder_meaning: FolderMeaning,
|
||||
) -> Result<Session> {
|
||||
let (folder_config, watch_folder) = match convert_folder_meaning(ctx, folder_meaning).await {
|
||||
Ok(meaning) => meaning,
|
||||
Err(err) => {
|
||||
// Warning instead of error because the folder may not be configured.
|
||||
// For example, this happens if the server does not have Sent folder
|
||||
// but watching Sent folder is enabled.
|
||||
warn!(ctx, "Error converting IMAP Folder name: {err:#}.");
|
||||
connection.connectivity.set_not_configured(ctx).await;
|
||||
connection.idle_interrupt_receiver.recv().await.ok();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if folder_config == Config::ConfiguredInboxFolder {
|
||||
session
|
||||
.store_seen_flags_on_imap(ctx)
|
||||
.await
|
||||
.context("store_seen_flags_on_imap")?;
|
||||
}
|
||||
|
||||
// Fetch the watched folder.
|
||||
connection
|
||||
.fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
|
||||
.await
|
||||
.context("fetch_move_delete")?;
|
||||
|
||||
// Mark expired messages for deletion. Marked messages will be deleted from the server
|
||||
// on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
|
||||
// called right before `fetch_move_delete` because it is not well optimized and would
|
||||
// otherwise slow down message fetching.
|
||||
delete_expired_imap_messages(ctx)
|
||||
.await
|
||||
.context("delete_expired_imap_messages")
|
||||
.log_err(ctx)
|
||||
.ok();
|
||||
.context("delete_expired_imap_messages")?;
|
||||
|
||||
// Scan additional folders only after finishing fetching the watched folder.
|
||||
//
|
||||
@@ -608,7 +581,11 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder
|
||||
// be able to scan all folders before time is up if there are many of them.
|
||||
if folder_config == Config::ConfiguredInboxFolder {
|
||||
// Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
|
||||
match connection.scan_folders(ctx).await.context("scan_folders") {
|
||||
match connection
|
||||
.scan_folders(ctx, &mut session)
|
||||
.await
|
||||
.context("scan_folders")
|
||||
{
|
||||
Err(err) => {
|
||||
// Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
|
||||
// but maybe just one folder can't be selected or something
|
||||
@@ -621,42 +598,26 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder
|
||||
// In most cases this will select the watched folder and return because there are
|
||||
// no new messages. We want to select the watched folder anyway before going IDLE
|
||||
// there, so this does not take additional protocol round-trip.
|
||||
if let Err(err) = connection
|
||||
.fetch_move_delete(ctx, &watch_folder, folder_meaning)
|
||||
connection
|
||||
.fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
|
||||
.await
|
||||
.context("fetch_move_delete after scan_folders")
|
||||
{
|
||||
connection.trigger_reconnect(ctx);
|
||||
warn!(ctx, "{:#}", err);
|
||||
return;
|
||||
}
|
||||
.context("fetch_move_delete after scan_folders")?;
|
||||
}
|
||||
Ok(false) => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Synchronize Seen flags.
|
||||
if let Some(session) = connection.session.as_mut() {
|
||||
session
|
||||
.sync_seen_flags(ctx, &watch_folder)
|
||||
.await
|
||||
.context("sync_seen_flags")
|
||||
.log_err(ctx)
|
||||
.ok();
|
||||
} else {
|
||||
warn!(ctx, "No IMAP session, skipping flag synchronization.");
|
||||
}
|
||||
session
|
||||
.sync_seen_flags(ctx, &watch_folder)
|
||||
.await
|
||||
.context("sync_seen_flags")
|
||||
.log_err(ctx)
|
||||
.ok();
|
||||
|
||||
connection.connectivity.set_idle(ctx).await;
|
||||
|
||||
ctx.emit_event(EventType::ImapInboxIdle);
|
||||
let Some(session) = connection.session.take() else {
|
||||
warn!(ctx, "No IMAP session, going to fake idle.");
|
||||
connection
|
||||
.fake_idle(ctx, watch_folder, folder_meaning)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
if !session.can_idle() {
|
||||
info!(
|
||||
@@ -664,9 +625,9 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder
|
||||
"IMAP session does not support IDLE, going to fake idle."
|
||||
);
|
||||
connection
|
||||
.fake_idle(ctx, watch_folder, folder_meaning)
|
||||
.await;
|
||||
return;
|
||||
.fake_idle(ctx, &mut session, watch_folder, folder_meaning)
|
||||
.await?;
|
||||
return Ok(session);
|
||||
}
|
||||
|
||||
if ctx
|
||||
@@ -678,29 +639,22 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder
|
||||
{
|
||||
info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
|
||||
connection
|
||||
.fake_idle(ctx, watch_folder, folder_meaning)
|
||||
.await;
|
||||
return;
|
||||
.fake_idle(ctx, &mut session, watch_folder, folder_meaning)
|
||||
.await?;
|
||||
return Ok(session);
|
||||
}
|
||||
|
||||
info!(ctx, "IMAP session supports IDLE, using it.");
|
||||
match session
|
||||
let session = session
|
||||
.idle(
|
||||
ctx,
|
||||
connection.idle_interrupt_receiver.clone(),
|
||||
&watch_folder,
|
||||
)
|
||||
.await
|
||||
.context("idle")
|
||||
{
|
||||
Ok(session) => {
|
||||
connection.session = Some(session);
|
||||
}
|
||||
Err(err) => {
|
||||
connection.trigger_reconnect(ctx);
|
||||
warn!(ctx, "{:#}", err);
|
||||
}
|
||||
}
|
||||
.context("idle")?;
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
async fn simple_imap_loop(
|
||||
@@ -726,8 +680,26 @@ async fn simple_imap_loop(
|
||||
return;
|
||||
}
|
||||
|
||||
let mut old_session: Option<Session> = None;
|
||||
loop {
|
||||
fetch_idle(&ctx, &mut connection, folder_meaning).await;
|
||||
let session = if let Some(session) = old_session.take() {
|
||||
session
|
||||
} else {
|
||||
match connection.prepare(&ctx).await {
|
||||
Err(err) => {
|
||||
warn!(ctx, "Failed to prepare connection: {:#}.", err);
|
||||
continue;
|
||||
}
|
||||
Ok(session) => session,
|
||||
}
|
||||
};
|
||||
|
||||
match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
|
||||
Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
|
||||
Ok(session) => {
|
||||
old_session = Some(session);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user