mirror of
https://github.com/chatmail/core.git
synced 2026-05-20 07:16:31 +03:00
feat: improve internal sql interface
Switches from rusqlite to sqlx to have a fully async based interface to sqlite. Co-authored-by: B. Petersen <r10s@b44t.com> Co-authored-by: Hocuri <hocuri@gmx.de> Co-authored-by: link2xt <link2xt@testrun.org>
This commit is contained in:
committed by
dignifiedquire
parent
4dedc2d8ce
commit
6bb5721f29
359
src/job.rs
359
src/job.rs
@@ -7,10 +7,11 @@ use std::{fmt, time::Duration};
|
||||
|
||||
use anyhow::{bail, ensure, format_err, Context as _, Error, Result};
|
||||
use async_smtp::smtp::response::{Category, Code, Detail};
|
||||
use async_std::prelude::*;
|
||||
use async_std::task::sleep;
|
||||
use deltachat_derive::{FromSql, ToSql};
|
||||
use itertools::Itertools;
|
||||
use rand::{thread_rng, Rng};
|
||||
use sqlx::Row;
|
||||
|
||||
use crate::dc_tools::{dc_delete_file, dc_read_file, time};
|
||||
use crate::ephemeral::load_imap_deletion_msgid;
|
||||
@@ -36,10 +37,8 @@ use crate::{scheduler::InterruptInfo, sql};
|
||||
const JOB_RETRIES: u32 = 17;
|
||||
|
||||
/// Thread IDs
|
||||
#[derive(
|
||||
Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql,
|
||||
)]
|
||||
#[repr(i32)]
|
||||
#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, sqlx::Type)]
|
||||
#[repr(u32)]
|
||||
pub(crate) enum Thread {
|
||||
Unknown = 0,
|
||||
Imap = 100,
|
||||
@@ -76,19 +75,9 @@ impl Default for Thread {
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Display,
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
FromPrimitive,
|
||||
ToPrimitive,
|
||||
FromSql,
|
||||
ToSql,
|
||||
Debug, Display, Copy, Clone, PartialEq, Eq, PartialOrd, FromPrimitive, ToPrimitive, sqlx::Type,
|
||||
)]
|
||||
#[repr(i32)]
|
||||
#[repr(u32)]
|
||||
pub enum Action {
|
||||
Unknown = 0,
|
||||
|
||||
@@ -184,7 +173,7 @@ impl Job {
|
||||
if self.job_id != 0 {
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM jobs WHERE id=?;", paramsv![self.job_id as i32])
|
||||
.execute(sqlx::query("DELETE FROM jobs WHERE id=?;").bind(self.job_id as i32))
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -203,26 +192,24 @@ impl Job {
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
|
||||
paramsv![
|
||||
self.desired_timestamp,
|
||||
self.tries as i64,
|
||||
self.param.to_string(),
|
||||
self.job_id as i32,
|
||||
],
|
||||
sqlx::query(
|
||||
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
|
||||
)
|
||||
.bind(self.desired_timestamp)
|
||||
.bind(self.tries as i64)
|
||||
.bind(self.param.to_string())
|
||||
.bind(self.job_id as i32),
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
context.sql.execute(
|
||||
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
|
||||
paramsv![
|
||||
self.added_timestamp,
|
||||
thread,
|
||||
self.action,
|
||||
self.foreign_id,
|
||||
self.param.to_string(),
|
||||
self.desired_timestamp
|
||||
]
|
||||
sqlx::query("INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);")
|
||||
.bind(self.added_timestamp)
|
||||
.bind(thread)
|
||||
.bind(self.action)
|
||||
.bind(self.foreign_id)
|
||||
.bind(self.param.to_string())
|
||||
.bind(self.desired_timestamp)
|
||||
).await?;
|
||||
}
|
||||
|
||||
@@ -253,7 +240,7 @@ impl Job {
|
||||
let status = match smtp.send(context, recipients, message, job_id).await {
|
||||
Err(crate::smtp::send::Error::SendError(err)) => {
|
||||
// Remote error, retry later.
|
||||
warn!(context, "SMTP failed to send: {}", err);
|
||||
warn!(context, "SMTP failed to send: {:?}", err);
|
||||
self.pending_error = Some(err.to_string());
|
||||
|
||||
let res = match err {
|
||||
@@ -339,6 +326,12 @@ impl Job {
|
||||
error!(context, "SMTP job failed because SMTP has no transport");
|
||||
Status::Finished(Err(format_err!("SMTP has not transport")))
|
||||
}
|
||||
Err(crate::smtp::send::Error::Other(err)) => {
|
||||
// Local error, job is invalid, do not retry.
|
||||
smtp.disconnect().await;
|
||||
warn!(context, "unable to load job: {}", err);
|
||||
Status::Finished(Err(err))
|
||||
}
|
||||
Ok(()) => {
|
||||
job_try!(success_cb().await);
|
||||
Status::Finished(Ok(()))
|
||||
@@ -387,11 +380,21 @@ impl Job {
|
||||
/* if there is a msg-id and it does not exist in the db, cancel sending.
|
||||
this happends if dc_delete_msgs() was called
|
||||
before the generated mime was sent out */
|
||||
if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)).await {
|
||||
return Status::Finished(Err(format_err!(
|
||||
"Not sending Message {} as it was deleted",
|
||||
self.foreign_id
|
||||
)));
|
||||
if 0 != self.foreign_id {
|
||||
match message::exists(context, MsgId::new(self.foreign_id)).await {
|
||||
Ok(exists) => {
|
||||
if !exists {
|
||||
return Status::Finished(Err(format_err!(
|
||||
"Not sending Message {} as it was deleted",
|
||||
self.foreign_id
|
||||
)));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(context, "failed to check message existence: {:?}", err);
|
||||
return Status::RetryLater;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let foreign_id = self.foreign_id;
|
||||
@@ -399,7 +402,7 @@ impl Job {
|
||||
async move {
|
||||
// smtp success, update db ASAP, then delete smtp file
|
||||
if 0 != foreign_id {
|
||||
set_delivered(context, MsgId::new(foreign_id)).await;
|
||||
set_delivered(context, MsgId::new(foreign_id)).await?;
|
||||
}
|
||||
// now also delete the generated file
|
||||
dc_delete_file(context, filename).await;
|
||||
@@ -416,44 +419,38 @@ impl Job {
|
||||
contact_id: u32,
|
||||
) -> sql::Result<(Vec<u32>, Vec<String>)> {
|
||||
// Extract message IDs from job parameters
|
||||
let res: Vec<(u32, MsgId)> = context
|
||||
let mut rows = context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
|
||||
paramsv![contact_id, self.job_id],
|
||||
|row| {
|
||||
let job_id: u32 = row.get(0)?;
|
||||
let params_str: String = row.get(1)?;
|
||||
let params: Params = params_str.parse().unwrap_or_default();
|
||||
Ok((job_id, params))
|
||||
},
|
||||
|jobs| {
|
||||
let res = jobs
|
||||
.filter_map(|row| {
|
||||
let (job_id, params) = row.ok()?;
|
||||
let msg_id = params.get_msg_id()?;
|
||||
Some((job_id, msg_id))
|
||||
})
|
||||
.collect();
|
||||
Ok(res)
|
||||
},
|
||||
.fetch(
|
||||
sqlx::query("SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?")
|
||||
.bind(contact_id)
|
||||
.bind(self.job_id),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Load corresponding RFC724 message IDs
|
||||
let mut job_ids = Vec::new();
|
||||
let mut rfc724_mids = Vec::new();
|
||||
for (job_id, msg_id) in res {
|
||||
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id).await {
|
||||
job_ids.push(job_id);
|
||||
rfc724_mids.push(rfc724_mid);
|
||||
|
||||
while let Some(row) = rows.next().await {
|
||||
let row = row?;
|
||||
let job_id: u32 = row.try_get(0)?;
|
||||
let params_str: String = row.try_get(1)?;
|
||||
let params: Params = params_str.parse().unwrap_or_default();
|
||||
if let Some(msg_id) = params.get_msg_id() {
|
||||
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id).await
|
||||
{
|
||||
job_ids.push(job_id);
|
||||
rfc724_mids.push(rfc724_mid);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((job_ids, rfc724_mids))
|
||||
}
|
||||
|
||||
async fn send_mdn(&mut self, context: &Context, smtp: &mut Smtp) -> Status {
|
||||
if !context.get_config_bool(Config::MdnsEnabled).await {
|
||||
let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await);
|
||||
if !mdns_enabled {
|
||||
// User has disabled MDNs after job scheduling but before
|
||||
// execution.
|
||||
return Status::Finished(Err(format_err!("MDNs are disabled")));
|
||||
@@ -539,7 +536,13 @@ impl Job {
|
||||
);
|
||||
return Status::Finished(Ok(()));
|
||||
}
|
||||
Ok(Some(config)) => context.get_config(config).await,
|
||||
Ok(Some(config)) => match context.get_config(config).await {
|
||||
Ok(folder) => folder,
|
||||
Err(err) => {
|
||||
warn!(context, "failed to load config: {}", err);
|
||||
return Status::RetryLater;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if let Some(dest_folder) = dest_folder {
|
||||
@@ -657,7 +660,7 @@ impl Job {
|
||||
/// Then, Fetch the last messages DC_FETCH_EXISTING_MSGS_COUNT emails from the server
|
||||
/// and show them in the chat list.
|
||||
async fn fetch_existing_msgs(&mut self, context: &Context, imap: &mut Imap) -> Status {
|
||||
if context.get_config_bool(Config::Bot).await {
|
||||
if job_try!(context.get_config_bool(Config::Bot).await) {
|
||||
return Status::Finished(Ok(())); // Bots don't want those messages
|
||||
}
|
||||
if let Err(err) = imap.connect_configured(context).await {
|
||||
@@ -669,13 +672,13 @@ impl Job {
|
||||
add_all_recipients_as_contacts(context, imap, Config::ConfiguredMvboxFolder).await;
|
||||
add_all_recipients_as_contacts(context, imap, Config::ConfiguredInboxFolder).await;
|
||||
|
||||
if context.get_config_bool(Config::FetchExistingMsgs).await {
|
||||
if job_try!(context.get_config_bool(Config::FetchExistingMsgs).await) {
|
||||
for config in &[
|
||||
Config::ConfiguredMvboxFolder,
|
||||
Config::ConfiguredInboxFolder,
|
||||
Config::ConfiguredSentboxFolder,
|
||||
] {
|
||||
if let Some(folder) = context.get_config(*config).await {
|
||||
if let Some(folder) = job_try!(context.get_config(*config).await) {
|
||||
if let Err(e) = imap.fetch_new_messages(context, folder, true).await {
|
||||
// We are using Anyhow's .context() and to show the inner error, too, we need the {:#}:
|
||||
warn!(context, "Could not fetch messages, retrying: {:#}", e);
|
||||
@@ -688,7 +691,7 @@ impl Job {
|
||||
// Make sure that if there now is a chat with a contact (created by an outgoing
|
||||
// message), then group contact requests from this contact should also be unblocked.
|
||||
// See https://github.com/deltachat/deltachat-core-rust/issues/2097.
|
||||
for item in chat::get_chat_msgs(context, ChatId::new(DC_CHAT_ID_DEADDROP), 0, None).await {
|
||||
for item in job_try!(chat::get_chat_msgs(context, DC_CHAT_ID_DEADDROP, 0, None).await) {
|
||||
if let ChatItem::Message { msg_id } = item {
|
||||
let msg = match Message::load_from_db(context, msg_id).await {
|
||||
Err(e) => {
|
||||
@@ -736,26 +739,21 @@ impl Job {
|
||||
return Status::RetryLater;
|
||||
}
|
||||
|
||||
if let Some(sentbox_folder) = &context.get_config(Config::ConfiguredSentboxFolder).await {
|
||||
job_try!(
|
||||
imap.resync_folder_uids(context, sentbox_folder.to_string())
|
||||
.await
|
||||
);
|
||||
let sentbox_folder = job_try!(context.get_config(Config::ConfiguredSentboxFolder).await);
|
||||
if let Some(sentbox_folder) = sentbox_folder {
|
||||
job_try!(imap.resync_folder_uids(context, sentbox_folder).await);
|
||||
}
|
||||
|
||||
if let Some(inbox_folder) = &context.get_config(Config::ConfiguredInboxFolder).await {
|
||||
job_try!(
|
||||
imap.resync_folder_uids(context, inbox_folder.to_string())
|
||||
.await
|
||||
);
|
||||
let inbox_folder = job_try!(context.get_config(Config::ConfiguredInboxFolder).await);
|
||||
if let Some(inbox_folder) = inbox_folder {
|
||||
job_try!(imap.resync_folder_uids(context, inbox_folder).await);
|
||||
}
|
||||
|
||||
if let Some(mvbox_folder) = &context.get_config(Config::ConfiguredMvboxFolder).await {
|
||||
job_try!(
|
||||
imap.resync_folder_uids(context, mvbox_folder.to_string())
|
||||
.await
|
||||
);
|
||||
let mvbox_folder = job_try!(context.get_config(Config::ConfiguredMvboxFolder).await);
|
||||
if let Some(mvbox_folder) = mvbox_folder {
|
||||
job_try!(imap.resync_folder_uids(context, mvbox_folder).await);
|
||||
}
|
||||
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
|
||||
@@ -803,11 +801,13 @@ impl Job {
|
||||
// the name sent in the From field by the user.
|
||||
if msg.param.get_bool(Param::WantsMdn).unwrap_or_default()
|
||||
&& !msg.is_system_message()
|
||||
&& context.get_config_bool(Config::MdnsEnabled).await
|
||||
{
|
||||
if let Err(err) = send_mdn(context, &msg).await {
|
||||
warn!(context, "could not send out mdn for {}: {}", msg.id, err);
|
||||
return Status::Finished(Err(err));
|
||||
let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await);
|
||||
if mdns_enabled {
|
||||
if let Err(err) = send_mdn(context, &msg).await {
|
||||
warn!(context, "could not send out mdn for {}: {}", msg.id, err);
|
||||
return Status::Finished(Err(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
Status::Finished(Ok(()))
|
||||
@@ -820,50 +820,46 @@ impl Job {
|
||||
pub async fn kill_action(context: &Context, action: Action) -> bool {
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM jobs WHERE action=?;", paramsv![action])
|
||||
.execute(sqlx::query("DELETE FROM jobs WHERE action=?;").bind(action))
|
||||
.await
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
/// Remove jobs with specified IDs.
|
||||
async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
format!(
|
||||
"DELETE FROM jobs WHERE id IN({})",
|
||||
job_ids.iter().map(|_| "?").join(",")
|
||||
),
|
||||
job_ids.iter().map(|i| i as &dyn crate::ToSql).collect(),
|
||||
)
|
||||
.await?;
|
||||
let q = format!(
|
||||
"DELETE FROM jobs WHERE id IN({})",
|
||||
job_ids.iter().map(|_| "?").join(",")
|
||||
);
|
||||
let mut query = sqlx::query(&q);
|
||||
for id in job_ids {
|
||||
query = query.bind(*id);
|
||||
}
|
||||
context.sql.execute(query).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn action_exists(context: &Context, action: Action) -> bool {
|
||||
context
|
||||
.sql
|
||||
.exists("SELECT id FROM jobs WHERE action=?;", paramsv![action])
|
||||
.exists(sqlx::query("SELECT COUNT(*) FROM jobs WHERE action=?;").bind(action))
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
async fn set_delivered(context: &Context, msg_id: MsgId) {
|
||||
async fn set_delivered(context: &Context, msg_id: MsgId) -> Result<()> {
|
||||
message::update_msg_state(context, msg_id, MessageState::OutDelivered).await;
|
||||
let chat_id: ChatId = context
|
||||
.sql
|
||||
.query_get_value(
|
||||
context,
|
||||
"SELECT chat_id FROM msgs WHERE id=?",
|
||||
paramsv![msg_id],
|
||||
)
|
||||
.await
|
||||
.query_get_value(sqlx::query("SELECT chat_id FROM msgs WHERE id=?").bind(msg_id))
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
context.emit_event(EventType::MsgDelivered { chat_id, msg_id });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, folder: Config) {
|
||||
let mailbox = if let Some(m) = context.get_config(folder).await {
|
||||
let mailbox = if let Ok(Some(m)) = context.get_config(folder).await {
|
||||
m
|
||||
} else {
|
||||
return;
|
||||
@@ -933,14 +929,14 @@ pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<Job
|
||||
|
||||
let from = context
|
||||
.get_config(Config::ConfiguredAddr)
|
||||
.await
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
let lowercase_from = from.to_lowercase();
|
||||
|
||||
// Send BCC to self if it is enabled and we are not going to
|
||||
// delete it immediately.
|
||||
if context.get_config_bool(Config::BccSelf).await
|
||||
&& context.get_config_delete_server_after().await != Some(0)
|
||||
if context.get_config_bool(Config::BccSelf).await?
|
||||
&& context.get_config_delete_server_after().await? != Some(0)
|
||||
&& !recipients
|
||||
.iter()
|
||||
.any(|x| x.to_lowercase() == lowercase_from)
|
||||
@@ -954,7 +950,7 @@ pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<Job
|
||||
context,
|
||||
"message {} has no recipient, skipping smtp-send", msg_id
|
||||
);
|
||||
set_delivered(context, msg_id).await;
|
||||
set_delivered(context, msg_id).await?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -1022,7 +1018,7 @@ pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<Job
|
||||
msg.subject = rendered_msg.subject.clone();
|
||||
msg.update_subject(context).await;
|
||||
|
||||
let job = create(Action::SendMsgToSmtp, msg_id.to_u32() as i32, param, 0)?;
|
||||
let job = create(Action::SendMsgToSmtp, msg_id.to_u32(), param, 0)?;
|
||||
|
||||
Ok(Some(job))
|
||||
}
|
||||
@@ -1200,13 +1196,13 @@ pub(crate) async fn schedule_resync(context: &Context) {
|
||||
}
|
||||
|
||||
/// Creates a job.
|
||||
pub fn create(action: Action, foreign_id: i32, param: Params, delay_seconds: i64) -> Result<Job> {
|
||||
pub fn create(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Result<Job> {
|
||||
ensure!(
|
||||
action != Action::Unknown,
|
||||
"Invalid action passed to job_add"
|
||||
);
|
||||
|
||||
Ok(Job::new(action, foreign_id as u32, param, delay_seconds))
|
||||
Ok(Job::new(action, foreign_id, param, delay_seconds))
|
||||
}
|
||||
|
||||
/// Adds a job to the database, scheduling it.
|
||||
@@ -1245,7 +1241,13 @@ pub async fn add(context: &Context, job: Job) {
|
||||
}
|
||||
|
||||
async fn load_housekeeping_job(context: &Context) -> Option<Job> {
|
||||
let last_time = context.get_config_i64(Config::LastHousekeeping).await;
|
||||
let last_time = match context.get_config_i64(Config::LastHousekeeping).await {
|
||||
Ok(last_time) => last_time,
|
||||
Err(err) => {
|
||||
warn!(context, "failed to load housekeeping config: {:?}", err);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let next_time = last_time + (60 * 60 * 24);
|
||||
if next_time <= time() {
|
||||
@@ -1280,65 +1282,77 @@ pub(crate) async fn load_next(
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
|
||||
let query;
|
||||
let params;
|
||||
let t = time();
|
||||
let m;
|
||||
let thread_i = thread as i64;
|
||||
|
||||
if let Some(msg_id) = info.msg_id {
|
||||
query = r#"
|
||||
let get_query = || {
|
||||
if let Some(msg_id) = info.msg_id {
|
||||
sqlx::query(
|
||||
r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND foreign_id=?
|
||||
ORDER BY action DESC, added_timestamp
|
||||
LIMIT 1;
|
||||
"#;
|
||||
m = msg_id;
|
||||
params = paramsv![thread_i, m];
|
||||
} else if !info.probe_network {
|
||||
// processing for first-try and after backoff-timeouts:
|
||||
// process jobs in the order they were added.
|
||||
query = r#"
|
||||
"#,
|
||||
)
|
||||
.bind(thread_i)
|
||||
.bind(msg_id)
|
||||
} else if !info.probe_network {
|
||||
// processing for first-try and after backoff-timeouts:
|
||||
// process jobs in the order they were added.
|
||||
sqlx::query(
|
||||
r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND desired_timestamp<=?
|
||||
ORDER BY action DESC, added_timestamp
|
||||
LIMIT 1;
|
||||
"#;
|
||||
params = paramsv![thread_i, t];
|
||||
} else {
|
||||
// processing after call to dc_maybe_network():
|
||||
// process _all_ pending jobs that failed before
|
||||
// in the order of their backoff-times.
|
||||
query = r#"
|
||||
"#,
|
||||
)
|
||||
.bind(thread_i)
|
||||
.bind(t)
|
||||
} else {
|
||||
// processing after call to dc_maybe_network():
|
||||
// process _all_ pending jobs that failed before
|
||||
// in the order of their backoff-times.
|
||||
sqlx::query(
|
||||
r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND tries>0
|
||||
ORDER BY desired_timestamp, action DESC
|
||||
LIMIT 1;
|
||||
"#;
|
||||
params = paramsv![thread_i];
|
||||
"#,
|
||||
)
|
||||
.bind(thread_i)
|
||||
}
|
||||
};
|
||||
|
||||
let job = loop {
|
||||
let job_res = context
|
||||
.sql
|
||||
.query_row_optional(query, params.clone(), |row| {
|
||||
let job = Job {
|
||||
job_id: row.get("id")?,
|
||||
action: row.get("action")?,
|
||||
foreign_id: row.get("foreign_id")?,
|
||||
desired_timestamp: row.get("desired_timestamp")?,
|
||||
added_timestamp: row.get("added_timestamp")?,
|
||||
tries: row.get("tries")?,
|
||||
param: row.get::<_, String>("param")?.parse().unwrap_or_default(),
|
||||
pending_error: None,
|
||||
};
|
||||
|
||||
Ok(job)
|
||||
})
|
||||
.await;
|
||||
.fetch_optional(get_query())
|
||||
.await
|
||||
.and_then(|row| {
|
||||
if let Some(row) = row {
|
||||
Ok(Some(Job {
|
||||
job_id: row.try_get("id")?,
|
||||
action: row.try_get("action")?,
|
||||
foreign_id: row.try_get("foreign_id")?,
|
||||
desired_timestamp: row.try_get("desired_timestamp")?,
|
||||
added_timestamp: row.try_get("added_timestamp")?,
|
||||
tries: row.try_get::<i64, _>("tries")? as u32,
|
||||
param: row
|
||||
.try_get::<String, _>("param")?
|
||||
.parse()
|
||||
.unwrap_or_default(),
|
||||
pending_error: None,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
});
|
||||
|
||||
match job_res {
|
||||
Ok(job) => break job,
|
||||
@@ -1349,15 +1363,18 @@ LIMIT 1;
|
||||
// TODO: improve by only doing a single query
|
||||
match context
|
||||
.sql
|
||||
.query_row(query, params.clone(), |row| row.get::<_, i32>(0))
|
||||
.fetch_one(get_query())
|
||||
.await
|
||||
.and_then(|row| row.try_get::<i32, _>(0).map_err(Into::into))
|
||||
{
|
||||
Ok(id) => {
|
||||
context
|
||||
if let Err(err) = context
|
||||
.sql
|
||||
.execute("DELETE FROM jobs WHERE id=?;", paramsv![id])
|
||||
.execute(sqlx::query("DELETE FROM jobs WHERE id=?;").bind(id))
|
||||
.await
|
||||
.ok();
|
||||
{
|
||||
warn!(context, "failed to delete job {}: {:?}", id, err);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(context, "failed to retrieve invalid job from DB: {}", err);
|
||||
@@ -1399,22 +1416,22 @@ mod tests {
|
||||
|
||||
use crate::test_utils::TestContext;
|
||||
|
||||
async fn insert_job(context: &Context, foreign_id: i64) {
|
||||
async fn insert_job(context: &Context, foreign_id: i64, valid: bool) {
|
||||
let now = time();
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO jobs
|
||||
sqlx::query(
|
||||
"INSERT INTO jobs
|
||||
(added_timestamp, thread, action, foreign_id, param, desired_timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?);",
|
||||
paramsv![
|
||||
now,
|
||||
Thread::from(Action::MoveMsg),
|
||||
Action::MoveMsg,
|
||||
foreign_id,
|
||||
Params::new().to_string(),
|
||||
now
|
||||
],
|
||||
)
|
||||
.bind(now)
|
||||
.bind(Thread::from(Action::MoveMsg))
|
||||
.bind(if valid { Action::MoveMsg as i32 } else { -1 })
|
||||
.bind(foreign_id)
|
||||
.bind(Params::new().to_string())
|
||||
.bind(now),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1426,7 +1443,7 @@ mod tests {
|
||||
// fails to load from the database instead of failing to load
|
||||
// all jobs.
|
||||
let t = TestContext::new().await;
|
||||
insert_job(&t, -1).await; // This can not be loaded into Job struct.
|
||||
insert_job(&t, 1, false).await; // This can not be loaded into Job struct.
|
||||
let jobs = load_next(
|
||||
&t,
|
||||
Thread::from(Action::MoveMsg),
|
||||
@@ -1436,7 +1453,7 @@ mod tests {
|
||||
// The housekeeping job should be loaded as we didn't run housekeeping in the last day:
|
||||
assert!(jobs.unwrap().action == Action::Housekeeping);
|
||||
|
||||
insert_job(&t, 1).await;
|
||||
insert_job(&t, 1, true).await;
|
||||
let jobs = load_next(
|
||||
&t,
|
||||
Thread::from(Action::MoveMsg),
|
||||
@@ -1450,7 +1467,7 @@ mod tests {
|
||||
async fn test_load_next_job_one() {
|
||||
let t = TestContext::new().await;
|
||||
|
||||
insert_job(&t, 1).await;
|
||||
insert_job(&t, 1, true).await;
|
||||
|
||||
let jobs = load_next(
|
||||
&t,
|
||||
|
||||
Reference in New Issue
Block a user