rustify dc_job_send and use a Delay enum for try_again later logic

This commit is contained in:
holger krekel
2019-08-19 11:34:08 +02:00
parent 8294b5eb28
commit 34f37ccd9c
2 changed files with 97 additions and 110 deletions

View File

@@ -122,6 +122,15 @@ pub const DC_CONTACT_ID_LAST_SPECIAL: usize = 9;
pub const DC_CREATE_MVBOX: usize = 1; pub const DC_CREATE_MVBOX: usize = 1;
#[repr(i32)]
#[derive(Debug, Display, Clone, Copy, PartialEq, Eq, FromPrimitive, ToPrimitive, ToSql, FromSql)]
pub enum Delay {
Do_not_try_again = 0,
At_once = -1,
Standard = 3,
Increation_poll = 2,
}
// Flags for configuring IMAP and SMTP servers. // Flags for configuring IMAP and SMTP servers.
// These flags are optional // These flags are optional
// and may be set together with the username, password etc. // and may be set together with the username, password etc.

View File

@@ -82,7 +82,7 @@ pub struct Job {
pub added_timestamp: i64, pub added_timestamp: i64,
pub tries: i32, pub tries: i32,
pub param: Params, pub param: Params,
pub try_again: i32, pub try_again: Delay,
pub pending_error: Option<String>, pub pending_error: Option<String>,
} }
@@ -111,114 +111,92 @@ impl Job {
#[allow(non_snake_case)] #[allow(non_snake_case)]
fn do_DC_JOB_SEND(&mut self, context: &Context) { fn do_DC_JOB_SEND(&mut self, context: &Context) {
let ok_to_continue;
let mut filename = ptr::null_mut();
let mut buf = ptr::null_mut();
let mut buf_bytes = 0;
/* connect to SMTP server, if not yet done */ /* connect to SMTP server, if not yet done */
if !context.smtp.lock().unwrap().is_connected() { if !context.smtp.lock().unwrap().is_connected() {
let loginparam = dc_loginparam_read(context, &context.sql, "configured_"); let loginparam = dc_loginparam_read(context, &context.sql, "configured_");
let connected = context.smtp.lock().unwrap().connect(context, &loginparam); let connected = context.smtp.lock().unwrap().connect(context, &loginparam);
if !connected { if !connected {
self.try_again_later(3i32, None); self.try_again_later(Delay::Standard, None);
ok_to_continue = false; return;
} else { }
ok_to_continue = true;
}
} else {
ok_to_continue = true;
} }
if ok_to_continue { let filename = self.param.get(Param::File).unwrap_or_default();
let filename_s = self.param.get(Param::File).unwrap_or_default(); let body = match dc_read_file_safe(context, filename) {
filename = unsafe { filename_s.strdup() }; Some(bytes) => bytes,
if unsafe { strlen(filename) } == 0 { None => {
warn!(context, 0, "Missing file name for job {}", self.job_id,); warn!(context, 0, "job {} error", self.job_id);
} else if 0 != unsafe { dc_read_file(context, filename, &mut buf, &mut buf_bytes) } { return;
let recipients = self.param.get(Param::Recipients); }
if recipients.is_none() { };
warn!(context, 0, "Missing recipients for job {}", self.job_id,);
} else {
let recipients_list = recipients
.unwrap()
.split("\x1e")
.filter_map(|addr| match lettre::EmailAddress::new(addr.to_string()) {
Ok(addr) => Some(addr),
Err(err) => {
eprintln!("WARNING: invalid recipient: {} {:?}", addr, err);
None
}
})
.collect::<Vec<_>>();
/* if there is a msg-id and it does not exist in the db, cancel sending.
this happends if dc_delete_msgs() was called
before the generated mime was sent out */
let ok_to_continue1;
if 0 != self.foreign_id {
if 0 == unsafe { dc_msg_exists(context, self.foreign_id) } {
warn!(
context,
0,
"Message {} for job {} does not exist",
self.foreign_id,
self.job_id,
);
ok_to_continue1 = false;
} else {
ok_to_continue1 = true;
}
} else {
ok_to_continue1 = true;
}
if ok_to_continue1 {
/* send message */
let body = unsafe {
std::slice::from_raw_parts(buf as *const u8, buf_bytes).to_vec()
};
// hold the smtp lock during sending of a job and let recipients = self.param.get(Param::Recipients);
// its ok/error response processing. Note that if a message if recipients.is_none() {
// was sent we need to mark it in the database as we error!(context, 0, "Missing recipients for job {}", self.job_id,);
// otherwise might send it twice. return;
let mut sock = context.smtp.lock().unwrap(); }
if 0 == sock.send(context, recipients_list, body) { let recipients_list = recipients
sock.disconnect(); .unwrap()
self.try_again_later(-1i32, Some(as_str(sock.error))); .split("\x1e")
} else { .filter_map(|addr| match lettre::EmailAddress::new(addr.to_string()) {
dc_delete_file(context, filename_s); Ok(addr) => Some(addr),
if 0 != self.foreign_id { Err(err) => {
dc_update_msg_state( eprintln!("WARNING: invalid recipient: {} {:?}", addr, err);
context, None
self.foreign_id,
MessageState::OutDelivered,
);
let chat_id: i32 = context
.sql
.query_row_col(
context,
"SELECT chat_id FROM msgs WHERE id=?",
params![self.foreign_id as i32],
0,
)
.unwrap_or_default();
context.call_cb(
Event::MSG_DELIVERED,
chat_id as uintptr_t,
self.foreign_id as uintptr_t,
);
}
}
}
} }
})
.collect::<Vec<_>>();
/* if there is a msg-id and it does not exist in the db, cancel sending.
this happends if dc_delete_msgs() was called
before the generated mime was sent out */
if 0 != self.foreign_id {
if 0 == unsafe { dc_msg_exists(context, self.foreign_id) } {
warn!(
context,
0,
"Message {} for job {} does not exist",
self.foreign_id,
self.job_id,
);
return;
} }
} }
unsafe { free(buf) }; /* send message while holding the smtp lock long enough
unsafe { free(filename.cast()) }; to also mark success in the database, to reduce chances
of a message getting sent twice.
*/
let mut sock = context.smtp.lock().unwrap();
if 0 == sock.send(context, recipients_list, body) {
sock.disconnect();
self.try_again_later(Delay::At_once, Some(as_str(sock.error)));
return;
}
dc_delete_file(context, filename);
if 0 != self.foreign_id {
dc_update_msg_state(
context,
self.foreign_id,
MessageState::OutDelivered,
);
let chat_id: i32 = context
.sql
.query_row_col(
context,
"SELECT chat_id FROM msgs WHERE id=?",
params![self.foreign_id as i32],
0,
)
.unwrap_or_default();
context.call_cb(
Event::MSG_DELIVERED,
chat_id as uintptr_t,
self.foreign_id as uintptr_t,
);
}
} }
// this value does not increase the number of tries // this value does not increase the number of tries
fn try_again_later(&mut self, try_again: libc::c_int, pending_error: Option<&str>) { fn try_again_later(&mut self, try_again: Delay, pending_error: Option<&str>) {
self.try_again = try_again; self.try_again = try_again;
self.pending_error = pending_error.map(|s| s.to_string()); self.pending_error = pending_error.map(|s| s.to_string());
} }
@@ -234,7 +212,7 @@ impl Job {
if !inbox.is_connected() { if !inbox.is_connected() {
connect_to_inbox(context, &inbox); connect_to_inbox(context, &inbox);
if !inbox.is_connected() { if !inbox.is_connected() {
self.try_again_later(3, None); self.try_again_later(Delay::Standard, None);
ok_to_continue = false; ok_to_continue = false;
} else { } else {
ok_to_continue = true; ok_to_continue = true;
@@ -268,7 +246,7 @@ impl Job {
) as libc::c_uint ) as libc::c_uint
{ {
1 => { 1 => {
self.try_again_later(3i32, None); self.try_again_later(Delay::Standard, None);
} }
3 => { 3 => {
dc_update_server_uid(context, msg.rfc724_mid, &dest_folder, dest_uid); dc_update_server_uid(context, msg.rfc724_mid, &dest_folder, dest_uid);
@@ -307,7 +285,7 @@ impl Job {
if !inbox.is_connected() { if !inbox.is_connected() {
connect_to_inbox(context, &inbox); connect_to_inbox(context, &inbox);
if !inbox.is_connected() { if !inbox.is_connected() {
self.try_again_later(3i32, None); self.try_again_later(Delay::Standard, None);
ok_to_continue = false; ok_to_continue = false;
} else { } else {
ok_to_continue = true; ok_to_continue = true;
@@ -321,7 +299,7 @@ impl Job {
if 0 == inbox.delete_msg(context, mid, server_folder, unsafe { if 0 == inbox.delete_msg(context, mid, server_folder, unsafe {
&mut (*msg).server_uid &mut (*msg).server_uid
}) { }) {
self.try_again_later(-1i32, None); self.try_again_later(Delay::At_once, None);
ok_to_continue1 = false; ok_to_continue1 = false;
} else { } else {
ok_to_continue1 = true; ok_to_continue1 = true;
@@ -348,7 +326,7 @@ impl Job {
if !inbox.is_connected() { if !inbox.is_connected() {
connect_to_inbox(context, &inbox); connect_to_inbox(context, &inbox);
if !inbox.is_connected() { if !inbox.is_connected() {
self.try_again_later(3i32, None); self.try_again_later(Delay::Standard, None);
ok_to_continue = false; ok_to_continue = false;
} else { } else {
ok_to_continue = true; ok_to_continue = true;
@@ -364,7 +342,7 @@ impl Job {
{ {
0 => {} 0 => {}
1 => { 1 => {
self.try_again_later(3i32, None); self.try_again_later(Delay::Standard, None);
} }
_ => { _ => {
if 0 != unsafe { (*msg).param.get_int(Param::WantsMdn).unwrap_or_default() } if 0 != unsafe { (*msg).param.get_int(Param::WantsMdn).unwrap_or_default() }
@@ -379,7 +357,7 @@ impl Job {
as libc::c_uint as libc::c_uint
{ {
1 => { 1 => {
self.try_again_later(3i32, None); self.try_again_later(Delay::Standard, None);
} }
3 => { 3 => {
send_mdn(context, unsafe { (*msg).id }); send_mdn(context, unsafe { (*msg).id });
@@ -409,7 +387,7 @@ impl Job {
if !inbox.is_connected() { if !inbox.is_connected() {
connect_to_inbox(context, &inbox); connect_to_inbox(context, &inbox);
if !inbox.is_connected() { if !inbox.is_connected() {
self.try_again_later(3, None); self.try_again_later(Delay::Standard, None);
ok_to_continue = false; ok_to_continue = false;
} else { } else {
ok_to_continue = true; ok_to_continue = true;
@@ -419,7 +397,7 @@ impl Job {
} }
if ok_to_continue { if ok_to_continue {
if inbox.set_seen(context, &folder, uid) == 0 { if inbox.set_seen(context, &folder, uid) == 0 {
self.try_again_later(3i32, None); self.try_again_later(Delay::Standard, None);
} }
if 0 != self.param.get_int(Param::AlsoMove).unwrap_or_default() { if 0 != self.param.get_int(Param::AlsoMove).unwrap_or_default() {
if context if context
@@ -435,7 +413,7 @@ impl Job {
if 1 == inbox.mv(context, folder, uid, dest_folder, &mut dest_uid) if 1 == inbox.mv(context, folder, uid, dest_folder, &mut dest_uid)
as libc::c_uint as libc::c_uint
{ {
self.try_again_later(3, None); self.try_again_later(Delay::Standard, None);
} }
} }
} }
@@ -840,7 +818,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
added_timestamp: row.get(4)?, added_timestamp: row.get(4)?,
tries: row.get(6)?, tries: row.get(6)?,
param: row.get::<_, String>(3)?.parse().unwrap_or_default(), param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
try_again: 0, try_again: Delay::Do_not_try_again,
pending_error: None, pending_error: None,
}; };
@@ -893,7 +871,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
let mut tries = 0; let mut tries = 0;
while tries <= 1 { while tries <= 1 {
// this can be modified by a job using dc_job_try_again_later() // this can be modified by a job using dc_job_try_again_later()
job.try_again = 0; job.try_again = Delay::Do_not_try_again;
match job.action { match job.action {
Action::SendMsgToSmtp => job.do_DC_JOB_SEND(context), Action::SendMsgToSmtp => job.do_DC_JOB_SEND(context),
@@ -914,7 +892,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
Action::SendMdnOld => {} Action::SendMdnOld => {}
Action::SendMsgToSmtpOld => {} Action::SendMsgToSmtpOld => {}
} }
if job.try_again != -1 { if job.try_again != Delay::At_once {
break; break;
} }
tries += 1 tries += 1
@@ -934,7 +912,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
.unsuspend(context); .unsuspend(context);
suspend_smtp_thread(context, false); suspend_smtp_thread(context, false);
break; break;
} else if job.try_again == 2 { } else if job.try_again == Delay::Increation_poll {
// just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready // just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready
info!( info!(
context, context,
@@ -947,7 +925,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
}, },
job.job_id job.job_id
); );
} else if job.try_again == -1 || job.try_again == 3 { } else if job.try_again == Delay::At_once || job.try_again == Delay::Standard {
let tries = job.tries + 1; let tries = job.tries + 1;
if tries < 17 { if tries < 17 {
job.tries = tries; job.tries = tries;