mirror of
https://github.com/chatmail/core.git
synced 2026-05-02 04:46:29 +03:00
Merge remote-tracking branch 'origin/master' into feat/async-jobs
This commit is contained in:
180
src/job.rs
180
src/job.rs
@@ -78,10 +78,13 @@ pub enum Action {
|
||||
// Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999
|
||||
Housekeeping = 105, // low priority ...
|
||||
EmptyServer = 107,
|
||||
DeleteMsgOnImap = 110,
|
||||
MarkseenMdnOnImap = 120,
|
||||
OldDeleteMsgOnImap = 110,
|
||||
MarkseenMsgOnImap = 130,
|
||||
|
||||
// Moving message is prioritized lower than deletion so we don't
|
||||
// bother moving message if it is already scheduled for deletion.
|
||||
MoveMsg = 200,
|
||||
DeleteMsgOnImap = 210,
|
||||
|
||||
// Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999
|
||||
MaybeSendLocations = 5005, // low priority ...
|
||||
@@ -104,9 +107,9 @@ impl From<Action> for Thread {
|
||||
Unknown => Thread::Unknown,
|
||||
|
||||
Housekeeping => Thread::Imap,
|
||||
OldDeleteMsgOnImap => Thread::Imap,
|
||||
DeleteMsgOnImap => Thread::Imap,
|
||||
EmptyServer => Thread::Imap,
|
||||
MarkseenMdnOnImap => Thread::Imap,
|
||||
MarkseenMsgOnImap => Thread::Imap,
|
||||
MoveMsg => Thread::Imap,
|
||||
|
||||
@@ -417,22 +420,15 @@ impl Job {
|
||||
|
||||
if let Some(dest_folder) = dest_folder {
|
||||
let server_folder = msg.server_folder.as_ref().unwrap();
|
||||
let mut dest_uid = 0;
|
||||
|
||||
match imap
|
||||
.mv(
|
||||
context,
|
||||
server_folder,
|
||||
msg.server_uid,
|
||||
&dest_folder,
|
||||
&mut dest_uid,
|
||||
)
|
||||
.mv(context, server_folder, msg.server_uid, &dest_folder)
|
||||
.await
|
||||
{
|
||||
ImapActionResult::RetryLater => Status::RetryLater,
|
||||
ImapActionResult::Success => {
|
||||
message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid)
|
||||
.await;
|
||||
// XXX Rust-Imap provides no target uid on mv, so just set it to 0
|
||||
message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, 0).await;
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
ImapActionResult::Failed => {
|
||||
@@ -445,11 +441,26 @@ impl Job {
|
||||
}
|
||||
}
|
||||
|
||||
/// Deletes a message on the server.
|
||||
///
|
||||
/// foreign_id is a MsgId pointing to a message in the trash chat
|
||||
/// or a hidden message.
|
||||
///
|
||||
/// This job removes the database record. If there are no more
|
||||
/// records pointing to the same message on the server, the job
|
||||
/// also removes the message on the server.
|
||||
async fn delete_msg_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status {
|
||||
let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
|
||||
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
|
||||
|
||||
if !msg.rfc724_mid.is_empty() {
|
||||
if message::rfc724_mid_cnt(context, &msg.rfc724_mid).await > 1 {
|
||||
let cnt = message::rfc724_mid_cnt(context, &msg.rfc724_mid).await;
|
||||
info!(
|
||||
context,
|
||||
"Running delete job for message {} which has {} entries in the database",
|
||||
&msg.rfc724_mid,
|
||||
cnt
|
||||
);
|
||||
if cnt > 1 {
|
||||
info!(
|
||||
context,
|
||||
"The message is deleted from the server when all parts are deleted.",
|
||||
@@ -459,15 +470,48 @@ impl Job {
|
||||
we delete the message from the server */
|
||||
let mid = msg.rfc724_mid;
|
||||
let server_folder = msg.server_folder.as_ref().unwrap();
|
||||
let res = imap
|
||||
.delete_msg(context, &mid, server_folder, &mut msg.server_uid)
|
||||
.await;
|
||||
if res == ImapActionResult::RetryLater {
|
||||
// XXX RetryLater is converted to RetryNow here
|
||||
return Status::RetryNow;
|
||||
let res = if msg.server_uid == 0 {
|
||||
// Message is already deleted on IMAP server.
|
||||
ImapActionResult::AlreadyDone
|
||||
} else {
|
||||
imap.delete_msg(context, &mid, server_folder, msg.server_uid)
|
||||
.await
|
||||
};
|
||||
match res {
|
||||
ImapActionResult::AlreadyDone | ImapActionResult::Success => {}
|
||||
ImapActionResult::RetryLater | ImapActionResult::Failed => {
|
||||
// If job has failed, for example due to some
|
||||
// IMAP bug, we postpone it instead of failing
|
||||
// immediately. This will prevent adding it
|
||||
// immediately again if user has enabled
|
||||
// automatic message deletion. Without this,
|
||||
// we might waste a lot of traffic constantly
|
||||
// retrying message deletion.
|
||||
return Status::RetryLater;
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::delete_from_db(context, msg.id).await;
|
||||
if msg.chat_id.is_trash() || msg.hidden {
|
||||
// Messages are stored in trash chat only to keep
|
||||
// their server UID and Message-ID. Once message is
|
||||
// deleted from the server, database record can be
|
||||
// removed as well.
|
||||
//
|
||||
// Hidden messages are similar to trashed, but are
|
||||
// related to some chat. We also delete their
|
||||
// database records.
|
||||
job_try!(msg.id.delete_from_db(context).await)
|
||||
} else {
|
||||
// Remove server UID from the database record.
|
||||
//
|
||||
// We have either just removed the message from the
|
||||
// server, in which case UID is not valid anymore, or
|
||||
// we have more refernces to the same server UID, so
|
||||
// we remove UID to reduce the number of messages
|
||||
// pointing to the corresponding UID. Once the counter
|
||||
// reaches zero, we will remove the message.
|
||||
job_try!(msg.id.unlink(context).await);
|
||||
}
|
||||
Status::Finished(Ok(()))
|
||||
} else {
|
||||
/* eg. device messages have no Message-ID */
|
||||
@@ -515,46 +559,6 @@ impl Job {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn markseen_mdn_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status {
|
||||
let folder = self
|
||||
.param
|
||||
.get(Param::ServerFolder)
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
|
||||
|
||||
if imap.set_seen(context, &folder, uid).await == ImapActionResult::RetryLater {
|
||||
return Status::RetryLater;
|
||||
}
|
||||
|
||||
if self.param.get_bool(Param::AlsoMove).unwrap_or_default() {
|
||||
if let Err(err) = imap.ensure_configured_folders(context, true).await {
|
||||
warn!(context, "configuring folders failed: {:?}", err);
|
||||
return Status::RetryLater;
|
||||
}
|
||||
let dest_folder = context
|
||||
.sql
|
||||
.get_raw_config(context, "configured_mvbox_folder")
|
||||
.await;
|
||||
if let Some(dest_folder) = dest_folder {
|
||||
let mut dest_uid = 0;
|
||||
if ImapActionResult::RetryLater
|
||||
== imap
|
||||
.mv(context, &folder, uid, &dest_folder, &mut dest_uid)
|
||||
.await
|
||||
{
|
||||
Status::RetryLater
|
||||
} else {
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
} else {
|
||||
Status::Finished(Err(format_err!("MVBOX is not configured")))
|
||||
}
|
||||
} else {
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete all pending jobs with the given action.
|
||||
@@ -628,7 +632,11 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let lowercase_from = from.to_lowercase();
|
||||
|
||||
// Send BCC to self if it is enabled and we are not going to
|
||||
// delete it immediately.
|
||||
if context.get_config_bool(Config::BccSelf).await
|
||||
&& context.get_config_delete_server_after().await != Some(0)
|
||||
&& !recipients
|
||||
.iter()
|
||||
.any(|x| x.to_lowercase() == lowercase_from)
|
||||
@@ -716,6 +724,45 @@ pub enum Connection<'a> {
|
||||
Smtp(&'a mut Smtp),
|
||||
}
|
||||
|
||||
async fn add_imap_deletion_jobs(context: &Context) -> sql::Result<()> {
|
||||
if let Some(delete_server_after) = context.get_config_delete_server_after().await {
|
||||
let threshold_timestamp = time() - delete_server_after;
|
||||
|
||||
// Select all expired messages which don't have a
|
||||
// corresponding message deletion job yet.
|
||||
let msg_ids = context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT id FROM msgs \
|
||||
WHERE timestamp < ? \
|
||||
AND server_uid != 0 \
|
||||
AND NOT EXISTS (SELECT 1 FROM jobs WHERE foreign_id = msgs.id \
|
||||
AND action = ?)",
|
||||
paramsv![threshold_timestamp, Action::DeleteMsgOnImap],
|
||||
|row| row.get::<_, MsgId>(0),
|
||||
|ids| {
|
||||
ids.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Schedule IMAP deletion for expired messages.
|
||||
for msg_id in msg_ids {
|
||||
add(
|
||||
context,
|
||||
Action::DeleteMsgOnImap,
|
||||
msg_id.to_u32() as i32,
|
||||
Params::new(),
|
||||
0,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl<'a> fmt::Display for Connection<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
@@ -811,10 +858,7 @@ async fn perform_job_action(
|
||||
);
|
||||
|
||||
let try_res = match job.action {
|
||||
Action::Unknown => {
|
||||
warn!(context, "ignoring unknown job");
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))),
|
||||
Action::SendMsgToSmtp => job.send_msg_to_smtp(context, connection.smtp()).await,
|
||||
Action::SendMdn => job.send_mdn(context, connection.smtp()).await,
|
||||
Action::MaybeSendLocations => location::job_maybe_send_locations(context, job).await,
|
||||
@@ -822,9 +866,9 @@ async fn perform_job_action(
|
||||
location::job_maybe_send_locations_ended(context, job).await
|
||||
}
|
||||
Action::EmptyServer => job.empty_server(context, connection.inbox()).await,
|
||||
Action::OldDeleteMsgOnImap => job.delete_msg_on_imap(context, connection.inbox()).await,
|
||||
Action::DeleteMsgOnImap => job.delete_msg_on_imap(context, connection.inbox()).await,
|
||||
Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context, connection.inbox()).await,
|
||||
Action::MarkseenMdnOnImap => job.markseen_mdn_on_imap(context, connection.inbox()).await,
|
||||
Action::MoveMsg => job.move_msg(context, connection.inbox()).await,
|
||||
Action::Housekeeping => {
|
||||
sql::housekeeping(context).await;
|
||||
@@ -913,8 +957,8 @@ pub async fn add(
|
||||
Action::Unknown => unreachable!(),
|
||||
Action::Housekeeping
|
||||
| Action::EmptyServer
|
||||
| Action::OldDeleteMsgOnImap
|
||||
| Action::DeleteMsgOnImap
|
||||
| Action::MarkseenMdnOnImap
|
||||
| Action::MarkseenMsgOnImap
|
||||
| Action::MoveMsg => {
|
||||
context.interrupt_inbox().await;
|
||||
|
||||
Reference in New Issue
Block a user