mirror of
https://github.com/chatmail/core.git
synced 2026-05-19 14:56:33 +03:00
Remove UpdateRecentQuota job
This commit is contained in:
@@ -4,6 +4,7 @@ use std::collections::{BTreeMap, HashMap};
|
|||||||
use std::ffi::OsString;
|
use std::ffi::OsString;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant, SystemTime};
|
use std::time::{Duration, Instant, SystemTime};
|
||||||
|
|
||||||
@@ -206,6 +207,9 @@ pub struct InnerContext {
|
|||||||
/// Set to `None` if quota was never tried to load.
|
/// Set to `None` if quota was never tried to load.
|
||||||
pub(crate) quota: RwLock<Option<QuotaInfo>>,
|
pub(crate) quota: RwLock<Option<QuotaInfo>>,
|
||||||
|
|
||||||
|
/// Set to true if quota update is requested.
|
||||||
|
pub(crate) quota_update_request: AtomicBool,
|
||||||
|
|
||||||
/// Server ID response if ID capability is supported
|
/// Server ID response if ID capability is supported
|
||||||
/// and the server returned non-NIL on the inbox connection.
|
/// and the server returned non-NIL on the inbox connection.
|
||||||
/// <https://datatracker.ietf.org/doc/html/rfc2971>
|
/// <https://datatracker.ietf.org/doc/html/rfc2971>
|
||||||
@@ -365,6 +369,7 @@ impl Context {
|
|||||||
scheduler: RwLock::new(None),
|
scheduler: RwLock::new(None),
|
||||||
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds.
|
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds.
|
||||||
quota: RwLock::new(None),
|
quota: RwLock::new(None),
|
||||||
|
quota_update_request: AtomicBool::new(false),
|
||||||
server_id: RwLock::new(None),
|
server_id: RwLock::new(None),
|
||||||
creation_time: std::time::SystemTime::now(),
|
creation_time: std::time::SystemTime::now(),
|
||||||
last_full_folder_scan: Mutex::new(None),
|
last_full_folder_scan: Mutex::new(None),
|
||||||
|
|||||||
48
src/job.rs
48
src/job.rs
@@ -58,9 +58,6 @@ macro_rules! job_try {
|
|||||||
)]
|
)]
|
||||||
#[repr(u32)]
|
#[repr(u32)]
|
||||||
pub enum Action {
|
pub enum Action {
|
||||||
// this is user initiated so it should have a fairly high priority
|
|
||||||
UpdateRecentQuota = 140,
|
|
||||||
|
|
||||||
// This job will download partially downloaded messages completely
|
// This job will download partially downloaded messages completely
|
||||||
// and is added when download_full() is called.
|
// and is added when download_full() is called.
|
||||||
// Most messages are downloaded automatically on fetch
|
// Most messages are downloaded automatically on fetch
|
||||||
@@ -202,17 +199,6 @@ pub async fn kill_action(context: &Context, action: Action) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn action_exists(context: &Context, action: Action) -> Result<bool> {
|
|
||||||
let exists = context
|
|
||||||
.sql
|
|
||||||
.exists(
|
|
||||||
"SELECT COUNT(*) FROM jobs WHERE action=?;",
|
|
||||||
paramsv![action],
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
Ok(exists)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) enum Connection<'a> {
|
pub(crate) enum Connection<'a> {
|
||||||
Inbox(&'a mut Imap),
|
Inbox(&'a mut Imap),
|
||||||
}
|
}
|
||||||
@@ -240,7 +226,7 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
|
|||||||
if tries < JOB_RETRIES {
|
if tries < JOB_RETRIES {
|
||||||
info!(context, "increase job {} tries to {}", job, tries);
|
info!(context, "increase job {} tries to {}", job, tries);
|
||||||
job.tries = tries;
|
job.tries = tries;
|
||||||
let time_offset = get_backoff_time_offset(tries, job.action);
|
let time_offset = get_backoff_time_offset(tries);
|
||||||
job.desired_timestamp = time() + time_offset;
|
job.desired_timestamp = time() + time_offset;
|
||||||
info!(
|
info!(
|
||||||
context,
|
context,
|
||||||
@@ -289,10 +275,6 @@ async fn perform_job_action(
|
|||||||
|
|
||||||
let try_res = match job.action {
|
let try_res = match job.action {
|
||||||
Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await,
|
Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await,
|
||||||
Action::UpdateRecentQuota => match context.update_recent_quota(connection.inbox()).await {
|
|
||||||
Ok(status) => status,
|
|
||||||
Err(err) => Status::Finished(Err(err)),
|
|
||||||
},
|
|
||||||
Action::DownloadMsg => job.download_msg(context, connection.inbox()).await,
|
Action::DownloadMsg => job.download_msg(context, connection.inbox()).await,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -301,24 +283,16 @@ async fn perform_job_action(
|
|||||||
try_res
|
try_res
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_backoff_time_offset(tries: u32, action: Action) -> i64 {
|
fn get_backoff_time_offset(tries: u32) -> i64 {
|
||||||
match action {
|
// Exponential backoff
|
||||||
// Just try every 10s to update the quota
|
let n = 2_i32.pow(tries - 1) * 60;
|
||||||
// If all retries are exhausted, a new job will be created when the quota information is needed
|
let mut rng = thread_rng();
|
||||||
Action::UpdateRecentQuota => 10,
|
let r: i32 = rng.gen();
|
||||||
|
let mut seconds = r % (n + 1);
|
||||||
_ => {
|
if seconds < 1 {
|
||||||
// Exponential backoff
|
seconds = 1;
|
||||||
let n = 2_i32.pow(tries - 1) * 60;
|
|
||||||
let mut rng = thread_rng();
|
|
||||||
let r: i32 = rng.gen();
|
|
||||||
let mut seconds = r % (n + 1);
|
|
||||||
if seconds < 1 {
|
|
||||||
seconds = 1;
|
|
||||||
}
|
|
||||||
i64::from(seconds)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
i64::from(seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
|
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
|
||||||
@@ -339,7 +313,7 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
|
|||||||
|
|
||||||
if delay_seconds == 0 {
|
if delay_seconds == 0 {
|
||||||
match action {
|
match action {
|
||||||
Action::ResyncFolders | Action::UpdateRecentQuota | Action::DownloadMsg => {
|
Action::ResyncFolders | Action::DownloadMsg => {
|
||||||
info!(context, "interrupt: imap");
|
info!(context, "interrupt: imap");
|
||||||
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||||
}
|
}
|
||||||
|
|||||||
25
src/quota.rs
25
src/quota.rs
@@ -1,6 +1,7 @@
|
|||||||
//! # Support for IMAP QUOTA extension.
|
//! # Support for IMAP QUOTA extension.
|
||||||
|
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
use anyhow::{anyhow, Context as _, Result};
|
use anyhow::{anyhow, Context as _, Result};
|
||||||
use async_imap::types::{Quota, QuotaResource};
|
use async_imap::types::{Quota, QuotaResource};
|
||||||
@@ -11,11 +12,10 @@ use crate::context::Context;
|
|||||||
use crate::imap::scan_folders::get_watched_folders;
|
use crate::imap::scan_folders::get_watched_folders;
|
||||||
use crate::imap::session::Session as ImapSession;
|
use crate::imap::session::Session as ImapSession;
|
||||||
use crate::imap::Imap;
|
use crate::imap::Imap;
|
||||||
use crate::job::{Action, Status};
|
|
||||||
use crate::message::{Message, Viewtype};
|
use crate::message::{Message, Viewtype};
|
||||||
use crate::param::Params;
|
use crate::scheduler::InterruptInfo;
|
||||||
use crate::tools::time;
|
use crate::tools::time;
|
||||||
use crate::{job, stock_str, EventType};
|
use crate::{stock_str, EventType};
|
||||||
|
|
||||||
/// warn about a nearly full mailbox after this usage percentage is reached.
|
/// warn about a nearly full mailbox after this usage percentage is reached.
|
||||||
/// quota icon is "yellow".
|
/// quota icon is "yellow".
|
||||||
@@ -112,12 +112,10 @@ pub fn needs_quota_warning(curr_percentage: u64, warned_at_percentage: u64) -> b
|
|||||||
impl Context {
|
impl Context {
|
||||||
// Adds a job to update `quota.recent`
|
// Adds a job to update `quota.recent`
|
||||||
pub(crate) async fn schedule_quota_update(&self) -> Result<()> {
|
pub(crate) async fn schedule_quota_update(&self) -> Result<()> {
|
||||||
if !job::action_exists(self, Action::UpdateRecentQuota).await? {
|
let requested = self.quota_update_request.swap(true, Ordering::Relaxed);
|
||||||
job::add(
|
if !requested {
|
||||||
self,
|
// Quota update was not requested before.
|
||||||
job::Job::new(Action::UpdateRecentQuota, 0, Params::new(), 0),
|
self.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -132,10 +130,10 @@ impl Context {
|
|||||||
/// and new space is allocated as needed.
|
/// and new space is allocated as needed.
|
||||||
///
|
///
|
||||||
/// Called in response to `Action::UpdateRecentQuota`.
|
/// Called in response to `Action::UpdateRecentQuota`.
|
||||||
pub(crate) async fn update_recent_quota(&self, imap: &mut Imap) -> Result<Status> {
|
pub(crate) async fn update_recent_quota(&self, imap: &mut Imap) -> Result<()> {
|
||||||
if let Err(err) = imap.prepare(self).await {
|
if let Err(err) = imap.prepare(self).await {
|
||||||
warn!(self, "could not connect: {:#}", err);
|
warn!(self, "could not connect: {:#}", err);
|
||||||
return Ok(Status::RetryNow);
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let session = imap.session.as_mut().context("no session")?;
|
let session = imap.session.as_mut().context("no session")?;
|
||||||
@@ -166,13 +164,16 @@ impl Context {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear the request to update quota.
|
||||||
|
self.quota_update_request.store(false, Ordering::Relaxed);
|
||||||
|
|
||||||
*self.quota.write().await = Some(QuotaInfo {
|
*self.quota.write().await = Some(QuotaInfo {
|
||||||
recent: quota,
|
recent: quota,
|
||||||
modified: time(),
|
modified: time(),
|
||||||
});
|
});
|
||||||
|
|
||||||
self.emit_event(EventType::ConnectivityChanged);
|
self.emit_event(EventType::ConnectivityChanged);
|
||||||
Ok(Status::Finished(Ok(())))
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use std::iter::{self, once};
|
use std::iter::{self, once};
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
use anyhow::{bail, Context as _, Result};
|
use anyhow::{bail, Context as _, Result};
|
||||||
use async_channel::{self as channel, Receiver, Sender};
|
use async_channel::{self as channel, Receiver, Sender};
|
||||||
@@ -128,6 +129,13 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
|
|||||||
info = Default::default();
|
info = Default::default();
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
let requested = ctx.quota_update_request.swap(false, Ordering::Relaxed);
|
||||||
|
if requested {
|
||||||
|
if let Err(err) = ctx.update_recent_quota(&mut connection).await {
|
||||||
|
warn!(ctx, "Failed to update quota: {:#}.", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
maybe_add_time_based_warnings(&ctx).await;
|
maybe_add_time_based_warnings(&ctx).await;
|
||||||
|
|
||||||
match ctx.get_config_i64(Config::LastHousekeeping).await {
|
match ctx.get_config_i64(Config::LastHousekeeping).await {
|
||||||
|
|||||||
@@ -983,7 +983,7 @@ mod tests {
|
|||||||
assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);
|
assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);
|
||||||
|
|
||||||
t.sql.close().await;
|
t.sql.close().await;
|
||||||
housekeeping(&t).await.unwrap_err(); // housekeeping should fail as the db is closed
|
housekeeping(&t).await.unwrap(); // housekeeping should emit warnings but not fail
|
||||||
t.sql.open(&t, "".to_string()).await.unwrap();
|
t.sql.open(&t, "".to_string()).await.unwrap();
|
||||||
|
|
||||||
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
|
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user