diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 139d92e3f..d66e7cf69 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -189,7 +189,7 @@ pub unsafe extern "C" fn dc_perform_imap_jobs(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_imap_jobs(context) + job::perform_imap_jobs(context) } #[no_mangle] @@ -197,7 +197,7 @@ pub unsafe extern "C" fn dc_perform_imap_fetch(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_imap_fetch(context) + job::perform_imap_fetch(context) } #[no_mangle] @@ -205,7 +205,7 @@ pub unsafe extern "C" fn dc_perform_imap_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_imap_idle(context) + job::perform_imap_idle(context) } #[no_mangle] @@ -213,7 +213,7 @@ pub unsafe extern "C" fn dc_interrupt_imap_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_imap_idle(context) + job::interrupt_imap_idle(context) } #[no_mangle] @@ -221,7 +221,7 @@ pub unsafe extern "C" fn dc_perform_mvbox_fetch(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_mvbox_fetch(context) + job::perform_mvbox_fetch(context) } #[no_mangle] @@ -229,7 +229,7 @@ pub unsafe extern "C" fn dc_perform_mvbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_mvbox_idle(context) + job::perform_mvbox_idle(context) } #[no_mangle] @@ -237,7 +237,7 @@ pub unsafe extern "C" fn dc_interrupt_mvbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_mvbox_idle(context) + job::interrupt_mvbox_idle(context) } #[no_mangle] @@ -245,7 +245,7 @@ pub unsafe extern "C" fn dc_perform_sentbox_fetch(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_sentbox_fetch(context) + job::perform_sentbox_fetch(context) } #[no_mangle] @@ -253,7 +253,7 @@ pub unsafe extern "C" fn dc_perform_sentbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_sentbox_idle(context) + job::perform_sentbox_idle(context) } #[no_mangle] @@ -261,7 +261,7 @@ pub unsafe extern "C" fn dc_interrupt_sentbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_sentbox_idle(context) + job::interrupt_sentbox_idle(context) } #[no_mangle] @@ -269,7 +269,7 @@ pub unsafe extern "C" fn dc_perform_smtp_jobs(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_smtp_jobs(context) + job::perform_smtp_jobs(context) } #[no_mangle] @@ -277,7 +277,7 @@ pub unsafe extern "C" fn dc_perform_smtp_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_smtp_idle(context) + job::perform_smtp_idle(context) } #[no_mangle] @@ -285,7 +285,7 @@ pub unsafe extern "C" fn dc_interrupt_smtp_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_smtp_idle(context) + job::interrupt_smtp_idle(context) } #[no_mangle] @@ -293,7 +293,7 @@ pub unsafe extern "C" fn dc_maybe_network(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_maybe_network(context) + job::maybe_network(context) } #[no_mangle] @@ -1027,7 +1027,7 @@ pub unsafe extern "C" fn dc_send_locations_to_chat( assert!(!context.is_null()); let context = &*context; - dc_location::dc_send_locations_to_chat(context, chat_id, seconds) + dc_location::dc_send_locations_to_chat(context, chat_id, seconds as i64) } #[no_mangle] diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index 4048a0b1d..9cc46c1ba 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -10,11 +10,11 @@ use deltachat::context::*; use deltachat::dc_array::*; use deltachat::dc_configure::*; use deltachat::dc_imex::*; -use deltachat::dc_job::*; use deltachat::dc_location::*; use deltachat::dc_msg::*; use deltachat::dc_receive_imf::*; use deltachat::dc_tools::*; +use deltachat::job::*; use deltachat::lot::LotState; use deltachat::peerstate::*; use deltachat::qr::*; @@ -582,7 +582,7 @@ pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::E println!("{}", to_string(dc_get_info(context))); } "maybenetwork" => { - dc_maybe_network(context); + maybe_network(context); } "housekeeping" => { sql::housekeeping(context); diff --git a/examples/repl/main.rs b/examples/repl/main.rs index 1c7015332..358ffeaf4 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -21,9 +21,9 @@ use deltachat::config; use deltachat::constants::*; use deltachat::context::*; use deltachat::dc_configure::*; -use deltachat::dc_job::*; use deltachat::dc_securejoin::*; use deltachat::dc_tools::*; +use deltachat::job::*; use deltachat::oauth2::*; use deltachat::types::*; use deltachat::x::*; @@ -172,13 +172,11 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_imap = std::thread::spawn(move || loop { while_running!({ - unsafe { - dc_perform_imap_jobs(&ctx.read().unwrap()); - dc_perform_imap_fetch(&ctx.read().unwrap()); - } + perform_imap_jobs(&ctx.read().unwrap()); + perform_imap_fetch(&ctx.read().unwrap()); while_running!({ let context = ctx.read().unwrap(); - dc_perform_imap_idle(&context); + perform_imap_idle(&context); }); }); }); @@ -186,9 +184,9 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_mvbox = std::thread::spawn(move || loop { while_running!({ - dc_perform_mvbox_fetch(&ctx.read().unwrap()); + perform_mvbox_fetch(&ctx.read().unwrap()); while_running!({ - dc_perform_mvbox_idle(&ctx.read().unwrap()); + perform_mvbox_idle(&ctx.read().unwrap()); }); }); }); @@ -196,9 +194,9 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_sentbox = std::thread::spawn(move || loop { while_running!({ - dc_perform_sentbox_fetch(&ctx.read().unwrap()); + perform_sentbox_fetch(&ctx.read().unwrap()); while_running!({ - dc_perform_sentbox_idle(&ctx.read().unwrap()); + perform_sentbox_idle(&ctx.read().unwrap()); }); }); }); @@ -206,9 +204,9 @@ fn start_threads(c: Arc>) { let ctx = c; let handle_smtp = std::thread::spawn(move || loop { while_running!({ - unsafe { dc_perform_smtp_jobs(&ctx.read().unwrap()) }; + perform_smtp_jobs(&ctx.read().unwrap()); while_running!({ - unsafe { dc_perform_smtp_idle(&ctx.read().unwrap()) }; + perform_smtp_idle(&ctx.read().unwrap()); }); }); }); @@ -226,12 +224,10 @@ fn stop_threads(context: &Context) { println!("Stopping threads"); IS_RUNNING.store(false, Ordering::Relaxed); - unsafe { - dc_interrupt_imap_idle(context); - dc_interrupt_mvbox_idle(context); - dc_interrupt_sentbox_idle(context); - dc_interrupt_smtp_idle(context); - } + interrupt_imap_idle(context); + interrupt_mvbox_idle(context); + interrupt_sentbox_idle(context); + interrupt_smtp_idle(context); handle.handle_imap.take().unwrap().join().unwrap(); handle.handle_mvbox.take().unwrap().join().unwrap(); @@ -487,14 +483,14 @@ unsafe fn handle_cmd(line: &str, ctx: Arc>) -> Result { if HANDLE.clone().lock().unwrap().is_some() { println!("imap-jobs are already running in a thread."); } else { - dc_perform_imap_jobs(&ctx.read().unwrap()); + perform_imap_jobs(&ctx.read().unwrap()); } } "configure" => { diff --git a/examples/simple.rs b/examples/simple.rs index 9b39fbfdf..9db186533 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -12,9 +12,8 @@ use deltachat::constants::Event; use deltachat::contact::*; use deltachat::context::*; use deltachat::dc_configure::*; -use deltachat::dc_job::{ - dc_perform_imap_fetch, dc_perform_imap_idle, dc_perform_imap_jobs, dc_perform_smtp_idle, - dc_perform_smtp_jobs, +use deltachat::job::{ + perform_imap_fetch, perform_imap_idle, perform_imap_jobs, perform_smtp_idle, perform_smtp_jobs, }; extern "C" fn cb(_ctx: &Context, event: Event, data1: usize, data2: usize) -> usize { @@ -52,12 +51,12 @@ fn main() { let r1 = running.clone(); let t1 = thread::spawn(move || { while *r1.read().unwrap() { - dc_perform_imap_jobs(&ctx1); + perform_imap_jobs(&ctx1); if *r1.read().unwrap() { - dc_perform_imap_fetch(&ctx1); + perform_imap_fetch(&ctx1); if *r1.read().unwrap() { - dc_perform_imap_idle(&ctx1); + perform_imap_idle(&ctx1); } } } @@ -67,9 +66,9 @@ fn main() { let r1 = running.clone(); let t2 = thread::spawn(move || { while *r1.read().unwrap() { - dc_perform_smtp_jobs(&ctx1); + perform_smtp_jobs(&ctx1); if *r1.read().unwrap() { - dc_perform_smtp_idle(&ctx1); + perform_smtp_idle(&ctx1); } } }); @@ -123,8 +122,8 @@ fn main() { println!("stopping threads"); *running.clone().write().unwrap() = false; - deltachat::dc_job::dc_interrupt_imap_idle(&ctx); - deltachat::dc_job::dc_interrupt_smtp_idle(&ctx); + deltachat::job::interrupt_imap_idle(&ctx); + deltachat::job::interrupt_smtp_idle(&ctx); println!("joining"); t1.join().unwrap(); diff --git a/src/chat.rs b/src/chat.rs index c17d4bde0..56c180cb6 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -6,10 +6,10 @@ use crate::constants::*; use crate::contact::*; use crate::context::Context; use crate::dc_array::*; -use crate::dc_job::*; use crate::dc_msg::*; use crate::dc_tools::*; use crate::error::Error; +use crate::job::*; use crate::param::*; use crate::sql::{self, Sql}; use crate::stock::StockMessage; @@ -899,7 +899,7 @@ pub unsafe fn send_msg<'a>( } ensure!( - dc_job_send_msg(context, (*msg).id) != 0, + job_send_msg(context, (*msg).id) != 0, "Failed to initiate send job" ); @@ -1353,8 +1353,8 @@ pub fn delete(context: &Context, chat_id: u32) -> Result<(), Error> { context.call_cb(Event::MSGS_CHANGED, 0 as uintptr_t, 0 as uintptr_t); - dc_job_kill_action(context, 105); - unsafe { dc_job_add(context, 105, 0, Params::new(), 10) }; + job_kill_action(context, Action::Housekeeping); + job_add(context, Action::Housekeeping, 0, Params::new(), 10); Ok(()) } @@ -1941,7 +1941,7 @@ pub unsafe fn forward_msgs( new_msg_id = chat .prepare_msg_raw(context, msg, fresh10) .unwrap_or_default(); - dc_job_send_msg(context, new_msg_id); + job_send_msg(context, new_msg_id); } created_db_entries.push(chat_id); created_db_entries.push(new_msg_id); diff --git a/src/config.rs b/src/config.rs index df70b5076..0d23f1101 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,9 +3,9 @@ use strum_macros::{AsRefStr, Display, EnumIter, EnumProperty, EnumString}; use crate::constants::DC_VERSION_STR; use crate::context::Context; -use crate::dc_job::*; use crate::dc_tools::*; use crate::error::Error; +use crate::job::*; use crate::stock::StockMessage; /// The available configuration keys. @@ -102,17 +102,17 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_config(self, key, value); - unsafe { dc_interrupt_imap_idle(self) }; + interrupt_imap_idle(self); ret } Config::SentboxWatch => { let ret = self.sql.set_config(self, key, value); - dc_interrupt_sentbox_idle(self); + interrupt_sentbox_idle(self); ret } Config::MvboxWatch => { let ret = self.sql.set_config(self, key, value); - dc_interrupt_mvbox_idle(self); + interrupt_mvbox_idle(self); ret } Config::Selfstatus => { diff --git a/src/context.rs b/src/context.rs index c85e015b9..c18e972b4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,13 +3,13 @@ use std::sync::{Arc, Condvar, Mutex, RwLock}; use crate::chat::*; use crate::constants::*; use crate::contact::*; -use crate::dc_job::*; use crate::dc_loginparam::*; use crate::dc_move::*; use crate::dc_msg::*; use crate::dc_receive_imf::*; use crate::dc_tools::*; use crate::imap::*; +use crate::job::*; use crate::job_thread::JobThread; use crate::key::*; use crate::lot::Lot; @@ -230,7 +230,13 @@ unsafe fn cb_precheck_imf( } dc_do_heuristics_moves(context, server_folder, msg_id); if 0 != mark_seen { - dc_job_add(context, 130, msg_id as libc::c_int, Params::new(), 0); + job_add( + context, + Action::MarkseenMsgOnImap, + msg_id as libc::c_int, + Params::new(), + 0, + ); } } free(old_server_folder as *mut libc::c_void); diff --git a/src/dc_configure.rs b/src/dc_configure.rs index 2a94b4868..ef7860092 100644 --- a/src/dc_configure.rs +++ b/src/dc_configure.rs @@ -5,10 +5,10 @@ use quick_xml::events::{BytesEnd, BytesStart, BytesText}; use crate::constants::Event; use crate::context::Context; use crate::dc_e2ee::*; -use crate::dc_job::*; use crate::dc_loginparam::*; use crate::dc_tools::*; use crate::imap::*; +use crate::job::*; use crate::oauth2::*; use crate::param::Params; use crate::types::*; @@ -77,8 +77,8 @@ pub unsafe fn dc_configure(context: &Context) { ); return; } - dc_job_kill_action(context, 900); - dc_job_add(context, 900, 0, Params::new(), 0); + job_kill_action(context, Action::ConfigureImap); + job_add(context, Action::ConfigureImap, 0, Params::new(), 0); } unsafe fn dc_has_ongoing(context: &Context) -> libc::c_int { @@ -118,7 +118,7 @@ pub fn dc_stop_ongoing_process(context: &Context) { // the other dc_job_do_DC_JOB_*() functions are declared static in the c-file #[allow(non_snake_case, unused_must_use)] -pub unsafe fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, _job: *mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, _job: &Job) { let flags: libc::c_int; let mut success = false; let mut imap_connected_here = false; diff --git a/src/dc_imex.rs b/src/dc_imex.rs index f15cf2ec9..55858e759 100644 --- a/src/dc_imex.rs +++ b/src/dc_imex.rs @@ -12,10 +12,10 @@ use crate::constants::*; use crate::context::Context; use crate::dc_configure::*; use crate::dc_e2ee::*; -use crate::dc_job::*; use crate::dc_msg::*; use crate::dc_tools::*; use crate::error::*; +use crate::job::*; use crate::key::*; use crate::param::*; use crate::pgp::*; @@ -44,8 +44,8 @@ pub unsafe fn dc_imex( param.set(Param::Arg2, as_str(param2)); } - dc_job_kill_action(context, 910); - dc_job_add(context, 910, 0, param, 0); + job_kill_action(context, Action::ImexImap); + job_add(context, Action::ImexImap, 0, param, 0); } /// Returns the filename of the backup if found, nullptr otherwise. @@ -506,7 +506,7 @@ pub unsafe fn dc_normalize_setup_code( } #[allow(non_snake_case)] -pub unsafe fn dc_job_do_DC_JOB_IMEX_IMAP(context: &Context, job: *mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_IMEX_IMAP(context: &Context, job: &Job) { let mut ok_to_continue = true; let mut success: libc::c_int = 0; let mut ongoing_allocated_here: libc::c_int = 0; @@ -514,10 +514,10 @@ pub unsafe fn dc_job_do_DC_JOB_IMEX_IMAP(context: &Context, job: *mut dc_job_t) if !(0 == dc_alloc_ongoing(context)) { ongoing_allocated_here = 1; - what = (*job).param.get_int(Param::Cmd).unwrap_or_default(); - let param1_s = (*job).param.get(Param::Arg).unwrap_or_default(); + what = job.param.get_int(Param::Cmd).unwrap_or_default(); + let param1_s = job.param.get(Param::Arg).unwrap_or_default(); let param1 = CString::yolo(param1_s); - let _param2 = CString::yolo((*job).param.get(Param::Arg2).unwrap_or_default()); + let _param2 = CString::yolo(job.param.get(Param::Arg2).unwrap_or_default()); if strlen(param1.as_ptr()) == 0 { error!(context, 0, "No Import/export dir/file given.",); diff --git a/src/dc_location.rs b/src/dc_location.rs index 86a23908e..eb3744762 100644 --- a/src/dc_location.rs +++ b/src/dc_location.rs @@ -5,9 +5,9 @@ use crate::chat; use crate::constants::Event; use crate::constants::*; use crate::context::*; -use crate::dc_job::*; use crate::dc_msg::*; use crate::dc_tools::*; +use crate::job::*; use crate::param::*; use crate::sql; use crate::stock::StockMessage; @@ -68,15 +68,11 @@ impl dc_kml_t { } // location streaming -pub unsafe fn dc_send_locations_to_chat( - context: &Context, - chat_id: uint32_t, - seconds: libc::c_int, -) { +pub unsafe fn dc_send_locations_to_chat(context: &Context, chat_id: uint32_t, seconds: i64) { let now = time(); let mut msg: *mut dc_msg_t = 0 as *mut dc_msg_t; let is_sending_locations_before: bool; - if !(seconds < 0i32 || chat_id <= 9i32 as libc::c_uint) { + if !(seconds < 0 || chat_id <= 9i32 as libc::c_uint) { is_sending_locations_before = dc_is_sending_locations_to_chat(context, chat_id); if sql::execute( context, @@ -87,11 +83,7 @@ pub unsafe fn dc_send_locations_to_chat( WHERE id=?", params![ if 0 != seconds { now } else { 0 }, - if 0 != seconds { - now + seconds as i64 - } else { - 0 - }, + if 0 != seconds { now + seconds } else { 0 }, chat_id as i32, ], ) @@ -115,12 +107,12 @@ pub unsafe fn dc_send_locations_to_chat( ); if 0 != seconds { schedule_MAYBE_SEND_LOCATIONS(context, 0i32); - dc_job_add( + job_add( context, - 5007i32, + Action::MaybeSendLocationsEnded, chat_id as libc::c_int, Params::new(), - seconds + 1i32, + seconds + 1, ); } } @@ -133,8 +125,8 @@ pub unsafe fn dc_send_locations_to_chat( ******************************************************************************/ #[allow(non_snake_case)] unsafe fn schedule_MAYBE_SEND_LOCATIONS(context: &Context, flags: libc::c_int) { - if 0 != flags & 0x1 || !dc_job_action_exists(context, 5005) { - dc_job_add(context, 5005, 0, Params::new(), 60); + if 0 != flags & 0x1 || !job_action_exists(context, Action::MaybeSendLocations) { + job_add(context, Action::MaybeSendLocations, 0, Params::new(), 60); }; } @@ -625,7 +617,7 @@ pub unsafe fn dc_kml_unref(kml: &mut dc_kml_t) { } #[allow(non_snake_case)] -pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: *mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: &Job) { let now = time(); let mut continue_streaming: libc::c_int = 1; info!( @@ -707,7 +699,7 @@ pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: *mu } #[allow(non_snake_case)] -pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context: &Context, job: &mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context: &Context, job: &mut Job) { // this function is called when location-streaming _might_ have ended for a chat. // the function checks, if location-streaming is really ended; // if so, a device-message is added if not yet done. diff --git a/src/dc_move.rs b/src/dc_move.rs index 9c937db61..3767db86a 100644 --- a/src/dc_move.rs +++ b/src/dc_move.rs @@ -1,7 +1,7 @@ use crate::constants::*; use crate::context::*; -use crate::dc_job::*; use crate::dc_msg::*; +use crate::job::*; use crate::param::Params; pub unsafe fn dc_do_heuristics_moves(context: &Context, folder: &str, msg_id: u32) { @@ -32,7 +32,13 @@ pub unsafe fn dc_do_heuristics_moves(context: &Context, folder: &str, msg_id: u3 // 1 = dc message, 2 = reply to dc message if 0 != (*msg).is_dc_message { - dc_job_add(context, 200, (*msg).id as libc::c_int, Params::new(), 0); + job_add( + context, + Action::MoveMsg, + (*msg).id as libc::c_int, + Params::new(), + 0, + ); dc_update_msg_move_state(context, (*msg).rfc724_mid, MoveState::Moving); } diff --git a/src/dc_msg.rs b/src/dc_msg.rs index 4f374f92d..5bd739497 100644 --- a/src/dc_msg.rs +++ b/src/dc_msg.rs @@ -9,8 +9,8 @@ use crate::chat::{self, Chat}; use crate::constants::*; use crate::contact::*; use crate::context::*; -use crate::dc_job::*; use crate::dc_tools::*; +use crate::job::*; use crate::lot::{Lot, LotState, Meaning}; use crate::param::*; use crate::pgp::*; @@ -591,9 +591,9 @@ pub unsafe fn dc_delete_msgs(context: &Context, msg_ids: *const uint32_t, msg_cn let mut i: libc::c_int = 0i32; while i < msg_cnt { dc_update_msg_chat_id(context, *msg_ids.offset(i as isize), 3i32 as uint32_t); - dc_job_add( + job_add( context, - 110, + Action::DeleteMsgOnImap, *msg_ids.offset(i as isize) as libc::c_int, Params::new(), 0, @@ -603,8 +603,8 @@ pub unsafe fn dc_delete_msgs(context: &Context, msg_ids: *const uint32_t, msg_cn if 0 != msg_cnt { context.call_cb(Event::MSGS_CHANGED, 0 as uintptr_t, 0 as uintptr_t); - dc_job_kill_action(context, 105); - dc_job_add(context, 105, 0, Params::new(), 10); + job_kill_action(context, Action::Housekeeping); + job_add(context, Action::Housekeeping, 0, Params::new(), 10); }; } @@ -654,7 +654,13 @@ pub fn dc_markseen_msgs(context: &Context, msg_ids: *const u32, msg_cnt: usize) dc_update_msg_state(context, id, MessageState::InSeen); info!(context, 0, "Seen message #{}.", id); - unsafe { dc_job_add(context, 130, id as i32, Params::new(), 0) }; + job_add( + context, + Action::MarkseenMsgOnImap, + id as i32, + Params::new(), + 0, + ); send_event = true; } } else if curr_state == MessageState::InFresh { @@ -1150,16 +1156,17 @@ pub unsafe fn dc_msg_latefiling_mediasize( }; } -pub unsafe fn dc_msg_save_param_to_disk(msg: *mut dc_msg_t) -> bool { +pub fn dc_msg_save_param_to_disk(msg: *mut dc_msg_t) -> bool { if msg.is_null() { return false; } + let msg = unsafe { &*msg }; sql::execute( - (*msg).context, - &(*msg).context.sql, + msg.context, + &msg.context.sql, "UPDATE msgs SET param=? WHERE id=?;", - params![(*msg).param.to_string(), (*msg).id as i32], + params![msg.param.to_string(), msg.id as i32], ) .is_ok() } @@ -1235,16 +1242,16 @@ pub fn dc_update_msg_move_state( .is_ok() } -pub unsafe fn dc_set_msg_failed(context: &Context, msg_id: uint32_t, error: *const libc::c_char) { +pub unsafe fn dc_set_msg_failed(context: &Context, msg_id: u32, error: Option>) { let mut msg = dc_msg_new_untyped(context); if dc_msg_load_from_db(msg, context, msg_id) { if (*msg).state.can_fail() { (*msg).state = MessageState::OutFailed; } - if !error.is_null() { - (*msg).param.set(Param::Error, as_str(error)); - error!(context, 0, "{}", as_str(error),); + if let Some(error) = error { + (*msg).param.set(Param::Error, error.as_ref()); + error!(context, 0, "{}", error.as_ref()); } if sql::execute( diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index 95dc879bb..c81053001 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -14,7 +14,6 @@ use crate::chat::{self, Chat}; use crate::constants::*; use crate::contact::*; use crate::context::Context; -use crate::dc_job::*; use crate::dc_location::*; use crate::dc_mimeparser::*; use crate::dc_move::*; @@ -23,6 +22,7 @@ use crate::dc_securejoin::*; use crate::dc_strencode::*; use crate::dc_tools::*; use crate::error::Result; +use crate::job::*; use crate::param::*; use crate::peerstate::*; use crate::sql; @@ -232,9 +232,9 @@ pub unsafe fn dc_receive_imf( } if 0 != add_delete_job && !created_db_entries.is_empty() { - dc_job_add( + job_add( context, - DC_JOB_DELETE_MSG_ON_IMAP, + Action::DeleteMsgOnImap, created_db_entries[0].1 as i32, Params::new(), 0, @@ -920,7 +920,7 @@ unsafe fn handle_reports( { param.set_int(Param::AlsoMove, 1); } - dc_job_add(context, 120, 0, param, 0); + job_add(context, Action::MarkseenMdnOnImap, 0, param, 0); } } } diff --git a/src/dc_tools.rs b/src/dc_tools.rs index 5c2d680cb..b500fcc55 100644 --- a/src/dc_tools.rs +++ b/src/dc_tools.rs @@ -1284,6 +1284,14 @@ pub fn as_str<'a>(s: *const libc::c_char) -> &'a str { as_str_safe(s).unwrap_or_else(|err| panic!("{}", err)) } +/// Converts a C string to either a Rust `&str` or `None` if it is a null pointer. +pub fn as_opt_str<'a>(s: *const libc::c_char) -> Option<&'a str> { + if s.is_null() { + return None; + } + Some(as_str(s)) +} + fn as_str_safe<'a>(s: *const libc::c_char) -> Result<&'a str, Error> { assert!(!s.is_null(), "cannot be used on null pointers"); diff --git a/src/dc_job.rs b/src/job.rs similarity index 50% rename from src/dc_job.rs rename to src/job.rs index c1f77c9e9..7f52f7cba 100644 --- a/src/dc_job.rs +++ b/src/job.rs @@ -1,9 +1,8 @@ -use mmime::mmapstring::*; - use std::ffi::CStr; use std::ptr; use std::time::Duration; +use deltachat_derive::{FromSql, ToSql}; use rand::{thread_rng, Rng}; use crate::chat; @@ -22,781 +21,430 @@ use crate::sql; use crate::types::*; use crate::x::*; -const DC_IMAP_THREAD: libc::c_int = 100; -const DC_SMTP_THREAD: libc::c_int = 5000; +/// Thread IDs +#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)] +#[repr(i32)] +enum Thread { + Imap = 100, + Smtp = 5000, +} -// thread IDs -// jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999 -// low priority ... -// ... high priority -// jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999 -// low priority ... -// ... high priority -// timeouts until actions are aborted. -// this may also affects IDLE to return, so a re-connect may take this time. -// mailcore2 uses 30 seconds, k-9 uses 10 seconds -#[derive(Clone)] -#[repr(C)] -pub struct dc_job_t { - pub job_id: uint32_t, - pub action: libc::c_int, - pub foreign_id: uint32_t, +#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)] +#[repr(i32)] +pub enum Action { + // Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999 + Housekeeping = 105, // low priority ... + DeleteMsgOnImap = 110, + MarkseenMdnOnImap = 120, + MarkseenMsgOnImap = 130, + MoveMsg = 200, + ConfigureImap = 900, + ImexImap = 910, // ... high priority + + // Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999 + MaybeSendLocations = 5005, // low priority ... + MaybeSendLocationsEnded = 5007, + SendMdnOld = 5010, + SendMdn = 5011, + SendMsgToSmtpOld = 5900, + SendMsgToSmtp = 5901, // ... high priority +} + +impl From for Thread { + fn from(action: Action) -> Thread { + use Action::*; + + match action { + Housekeeping => Thread::Imap, + DeleteMsgOnImap => Thread::Imap, + MarkseenMdnOnImap => Thread::Imap, + MarkseenMsgOnImap => Thread::Imap, + MoveMsg => Thread::Imap, + ConfigureImap => Thread::Imap, + ImexImap => Thread::Imap, + + MaybeSendLocations => Thread::Smtp, + MaybeSendLocationsEnded => Thread::Smtp, + SendMdnOld => Thread::Smtp, + SendMdn => Thread::Smtp, + SendMsgToSmtpOld => Thread::Smtp, + SendMsgToSmtp => Thread::Smtp, + } + } +} + +#[derive(Debug, Clone)] +pub struct Job { + pub job_id: u32, + pub action: Action, + pub foreign_id: u32, pub desired_timestamp: i64, pub added_timestamp: i64, - pub tries: libc::c_int, + pub tries: i32, pub param: Params, - pub try_again: libc::c_int, - pub pending_error: *mut libc::c_char, + pub try_again: i32, + pub pending_error: Option, } -pub unsafe fn dc_perform_imap_jobs(context: &Context) { - info!(context, 0, "dc_perform_imap_jobs starting.",); - - let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); - *context.probe_imap_network.write().unwrap() = false; - *context.perform_inbox_jobs_needed.write().unwrap() = false; - - dc_job_perform(context, DC_IMAP_THREAD, probe_imap_network); - info!(context, 0, "dc_perform_imap_jobs ended.",); -} - -unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network: bool) { - let query = if !probe_network { - // processing for first-try and after backoff-timeouts: - // process jobs in the order they were added. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" - } else { - // processing after call to dc_maybe_network(): - // process _all_ pending jobs that failed before - // in the order of their backoff-times. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" - }; - - let params_no_probe = params![thread as i64, time()]; - let params_probe = params![thread as i64]; - let params: &[&dyn rusqlite::ToSql] = if !probe_network { - params_no_probe - } else { - params_probe - }; - - let jobs: Result, _> = context.sql.query_map( - query, - params, - |row| { - let job = dc_job_t { - job_id: row.get(0)?, - action: row.get(1)?, - foreign_id: row.get(2)?, - desired_timestamp: row.get(5)?, - added_timestamp: row.get(4)?, - tries: row.get(6)?, - param: row.get::<_, String>(3)?.parse().unwrap_or_default(), - try_again: 0, - pending_error: 0 as *mut libc::c_char, - }; - - Ok(job) - }, - |jobs| { - jobs.collect::, _>>() - .map_err(Into::into) - }, - ); - match jobs { - Ok(ref _res) => {} - Err(ref err) => { - info!(context, 0, "query failed: {:?}", err); - } +impl Job { + fn delete(&self, context: &Context) -> bool { + context + .sql + .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) + .is_ok() } - for mut job in jobs.unwrap_or_default() { - info!( + + fn update(&self, context: &Context) -> bool { + sql::execute( context, - 0, - "{}-job #{}, action {} started...", - if thread == DC_IMAP_THREAD { - "INBOX" - } else { - "SMTP" - }, - job.job_id, - job.action, - ); - - // some configuration jobs are "exclusive": - // - they are always executed in the imap-thread and the smtp-thread is suspended during execution - // - they may change the database handle change the database handle; we do not keep old pointers therefore - // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution - if 900 == job.action || 910 == job.action { - dc_job_kill_action(context, job.action); - &context - .sentbox_thread - .clone() - .read() - .unwrap() - .suspend(context); - &context - .mvbox_thread - .clone() - .read() - .unwrap() - .suspend(context); - dc_suspend_smtp_thread(context, true); - } - - let mut tries = 0; - while tries <= 1 { - // this can be modified by a job using dc_job_try_again_later() - job.try_again = 0; - - match job.action { - 5901 => dc_job_do_DC_JOB_SEND(context, &mut job), - 110 => dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job), - 130 => dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job), - 120 => dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job), - 200 => dc_job_do_DC_JOB_MOVE_MSG(context, &mut job), - 5011 => dc_job_do_DC_JOB_SEND(context, &mut job), - 900 => dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job), - 910 => dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job), - 5005 => dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job), - 5007 => dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job), - 105 => sql::housekeeping(context), - _ => {} - } - if job.try_again != -1 { - break; - } - tries += 1 - } - if 900 == job.action || 910 == job.action { - context - .sentbox_thread - .clone() - .read() - .unwrap() - .unsuspend(context); - context - .mvbox_thread - .clone() - .read() - .unwrap() - .unsuspend(context); - dc_suspend_smtp_thread(context, false); - break; - } else if job.try_again == 2 { - // just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready - info!( - context, - 0, - "{}-job #{} not yet ready and will be delayed.", - if thread == DC_IMAP_THREAD { - "INBOX" - } else { - "SMTP" - }, - job.job_id - ); - } else if job.try_again == -1 || job.try_again == 3 { - let tries = job.tries + 1; - if tries < 17 { - job.tries = tries; - let time_offset = get_backoff_time_offset(tries); - job.desired_timestamp = job.added_timestamp + time_offset; - dc_job_update(context, &mut job); - info!( - context, - 0, - "{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).", - if thread == DC_IMAP_THREAD { - "INBOX" - } else { - "SMTP" - }, - job.job_id as libc::c_int, - tries, - time_offset, - job.added_timestamp + time_offset - time() - ); - if thread == DC_SMTP_THREAD && tries < 17 - 1 { - context - .smtp_state - .clone() - .0 - .lock() - .unwrap() - .perform_jobs_needed = 2; - } - } else { - if job.action == 5901 { - dc_set_msg_failed(context, job.foreign_id, job.pending_error); - } - dc_job_delete(context, &mut job); - } - if !probe_network { - continue; - } - // on dc_maybe_network() we stop trying here; - // these jobs are already tried once. - // otherwise, we just continue with the next job - // to give other jobs a chance being tried at least once. - break; - } else { - dc_job_delete(context, &mut job); - } - free(job.pending_error as *mut libc::c_void); - } -} - -fn dc_job_delete(context: &Context, job: &dc_job_t) -> bool { - context - .sql - .execute("DELETE FROM jobs WHERE id=?;", params![job.job_id as i32]) + &context.sql, + "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", + params![ + self.desired_timestamp, + self.tries as i64, + self.param.to_string(), + self.job_id as i32, + ], + ) .is_ok() -} - -/* ****************************************************************************** - * Tools - ******************************************************************************/ -#[allow(non_snake_case)] -fn get_backoff_time_offset(c_tries: libc::c_int) -> i64 { - // results in ~3 weeks for the last backoff timespan - let mut N = 2_i32.pow((c_tries - 1) as u32); - N = N * 60; - let mut rng = thread_rng(); - let n: i32 = rng.gen(); - let mut seconds = n % (N + 1); - if seconds < 1 { - seconds = 1; } - seconds as i64 -} -fn dc_job_update(context: &Context, job: &dc_job_t) -> bool { - sql::execute( - context, - &context.sql, - "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", - params![ - job.desired_timestamp, - job.tries as i64, - job.param.to_string(), - job.job_id as i32, - ], - ) - .is_ok() -} + #[allow(non_snake_case)] + fn do_DC_JOB_SEND(&mut self, context: &Context) { + let ok_to_continue; + let mut filename = ptr::null_mut(); + let mut buf = ptr::null_mut(); + let mut buf_bytes = 0; -unsafe fn dc_suspend_smtp_thread(context: &Context, suspend: bool) { - context.smtp_state.0.lock().unwrap().suspended = suspend; - if suspend { - loop { - if !context.smtp_state.0.lock().unwrap().doing_jobs { - return; + /* connect to SMTP server, if not yet done */ + if !context.smtp.lock().unwrap().is_connected() { + let loginparam = dc_loginparam_read(context, &context.sql, "configured_"); + let connected = context.smtp.lock().unwrap().connect(context, &loginparam); + + if !connected { + self.try_again_later(3i32, None); + ok_to_continue = false; + } else { + ok_to_continue = true; } - std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); - } - } -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_SEND(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let mut filename: *mut libc::c_char = 0 as *mut libc::c_char; - let mut buf: *mut libc::c_void = ptr::null_mut(); - let mut buf_bytes: size_t = 0i32 as size_t; - - /* connect to SMTP server, if not yet done */ - if !context.smtp.lock().unwrap().is_connected() { - let loginparam = dc_loginparam_read(context, &context.sql, "configured_"); - let connected = context.smtp.lock().unwrap().connect(context, &loginparam); - - if !connected { - dc_job_try_again_later(job, 3i32, ptr::null()); - ok_to_continue = false; } else { ok_to_continue = true; } - } else { - ok_to_continue = true; - } - if ok_to_continue { - let filename_s = job.param.get(Param::File).unwrap_or_default(); - filename = filename_s.strdup(); - if strlen(filename) == 0 { - warn!(context, 0, "Missing file name for job {}", job.job_id,); - } else if !(0 == dc_read_file(context, filename, &mut buf, &mut buf_bytes)) { - let recipients = job.param.get(Param::Recipients); - if recipients.is_none() { - warn!(context, 0, "Missing recipients for job {}", job.job_id,); - } else { - let recipients_list = recipients - .unwrap() - .split("\x1e") - .filter_map(|addr| match lettre::EmailAddress::new(addr.to_string()) { - Ok(addr) => Some(addr), - Err(err) => { - eprintln!("WARNING: invalid recipient: {} {:?}", addr, err); - None + if ok_to_continue { + let filename_s = self.param.get(Param::File).unwrap_or_default(); + filename = unsafe { filename_s.strdup() }; + if unsafe { strlen(filename) } == 0 { + warn!(context, 0, "Missing file name for job {}", self.job_id,); + } else if 0 != unsafe { dc_read_file(context, filename, &mut buf, &mut buf_bytes) } { + let recipients = self.param.get(Param::Recipients); + if recipients.is_none() { + warn!(context, 0, "Missing recipients for job {}", self.job_id,); + } else { + let recipients_list = recipients + .unwrap() + .split("\x1e") + .filter_map(|addr| match lettre::EmailAddress::new(addr.to_string()) { + Ok(addr) => Some(addr), + Err(err) => { + eprintln!("WARNING: invalid recipient: {} {:?}", addr, err); + None + } + }) + .collect::>(); + /* if there is a msg-id and it does not exist in the db, cancel sending. + this happends if dc_delete_msgs() was called + before the generated mime was sent out */ + let ok_to_continue1; + if 0 != self.foreign_id { + if 0 == unsafe { dc_msg_exists(context, self.foreign_id) } { + warn!( + context, + 0, + "Message {} for job {} does not exist", + self.foreign_id, + self.job_id, + ); + ok_to_continue1 = false; + } else { + ok_to_continue1 = true; } - }) - .collect::>(); - /* if there is a msg-id and it does not exist in the db, cancel sending. - this happends if dc_delete_msgs() was called - before the generated mime was sent out */ - let ok_to_continue1; - if 0 != job.foreign_id { - if 0 == dc_msg_exists(context, job.foreign_id) { - warn!( - context, - 0, "Message {} for job {} does not exist", job.foreign_id, job.job_id, - ); + } else { + ok_to_continue1 = true; + } + if ok_to_continue1 { + /* send message */ + let body = unsafe { + std::slice::from_raw_parts(buf as *const u8, buf_bytes).to_vec() + }; + + // hold the smtp lock during sending of a job and + // its ok/error response processing. Note that if a message + // was sent we need to mark it in the database as we + // otherwise might send it twice. + let mut sock = context.smtp.lock().unwrap(); + if 0 == sock.send(context, recipients_list, body) { + sock.disconnect(); + self.try_again_later(-1i32, Some(as_str(sock.error))); + } else { + dc_delete_file(context, filename_s); + if 0 != self.foreign_id { + dc_update_msg_state( + context, + self.foreign_id, + MessageState::OutDelivered, + ); + let chat_id: i32 = context + .sql + .query_row_col( + context, + "SELECT chat_id FROM msgs WHERE id=?", + params![self.foreign_id as i32], + 0, + ) + .unwrap_or_default(); + context.call_cb( + Event::MSG_DELIVERED, + chat_id as uintptr_t, + self.foreign_id as uintptr_t, + ); + } + } + } + } + } + } + unsafe { free(buf) }; + unsafe { free(filename.cast()) }; + } + + // this value does not increase the number of tries + fn try_again_later(&mut self, try_again: libc::c_int, pending_error: Option<&str>) { + self.try_again = try_again; + self.pending_error = pending_error.map(|s| s.to_string()); + } + + #[allow(non_snake_case)] + fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) { + let ok_to_continue; + let msg = unsafe { dc_msg_new_untyped(context) }; + let mut dest_uid = 0; + + let inbox = context.inbox.read().unwrap(); + + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); + if !inbox.is_connected() { + self.try_again_later(3, None); + ok_to_continue = false; + } else { + ok_to_continue = true; + } + } else { + ok_to_continue = true; + } + if ok_to_continue { + if dc_msg_load_from_db(msg, context, self.foreign_id) { + if context + .sql + .get_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + inbox.configure_folders(context, 0x1i32); + } + let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); + + let msg = unsafe { &mut *msg }; + + if let Some(dest_folder) = dest_folder { + let server_folder = msg.server_folder.as_ref().unwrap(); + + match inbox.mv( + context, + server_folder, + msg.server_uid, + &dest_folder, + &mut dest_uid, + ) as libc::c_uint + { + 1 => { + self.try_again_later(3i32, None); + } + 3 => { + dc_update_server_uid(context, msg.rfc724_mid, &dest_folder, dest_uid); + } + 0 | 2 | _ => {} + } + } + } + } + + unsafe { dc_msg_unref(msg) }; + } + + #[allow(non_snake_case)] + fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) { + let mut delete_from_server = 1; + let msg = unsafe { dc_msg_new_untyped(context) }; + let inbox = context.inbox.read().unwrap(); + + if !(!dc_msg_load_from_db(msg, context, self.foreign_id) + || unsafe { (*msg).rfc724_mid.is_null() } + || unsafe { *(*msg).rfc724_mid.offset(0isize) as libc::c_int == 0 }) + { + let ok_to_continue1; + /* eg. device messages have no Message-ID */ + if dc_rfc724_mid_cnt(context, unsafe { (*msg).rfc724_mid }) != 1 { + info!( + context, + 0, "The message is deleted from the server when all parts are deleted.", + ); + delete_from_server = 0i32 + } + /* if this is the last existing part of the message, we delete the message from the server */ + if 0 != delete_from_server { + let ok_to_continue; + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); + if !inbox.is_connected() { + self.try_again_later(3i32, None); + ok_to_continue = false; + } else { + ok_to_continue = true; + } + } else { + ok_to_continue = true; + } + if ok_to_continue { + let mid = unsafe { CStr::from_ptr((*msg).rfc724_mid).to_str().unwrap() }; + let server_folder = unsafe { (*msg).server_folder.as_ref().unwrap() }; + if 0 == inbox.delete_msg(context, mid, server_folder, unsafe { + &mut (*msg).server_uid + }) { + self.try_again_later(-1i32, None); ok_to_continue1 = false; } else { ok_to_continue1 = true; } } else { - ok_to_continue1 = true; + ok_to_continue1 = false; } - if ok_to_continue1 { - /* send message */ - let body = std::slice::from_raw_parts(buf as *const u8, buf_bytes).to_vec(); - - // hold the smtp lock during sending of a job and - // its ok/error response processing. Note that if a message - // was sent we need to mark it in the database as we - // otherwise might send it twice. - let mut sock = context.smtp.lock().unwrap(); - if 0 == sock.send(context, recipients_list, body) { - sock.disconnect(); - dc_job_try_again_later(job, -1i32, sock.error); - } else { - dc_delete_file(context, filename_s); - if 0 != job.foreign_id { - dc_update_msg_state( - context, - job.foreign_id, - MessageState::OutDelivered, - ); - let chat_id: i32 = context - .sql - .query_row_col( - context, - "SELECT chat_id FROM msgs WHERE id=?", - params![job.foreign_id as i32], - 0, - ) - .unwrap_or_default(); - context.call_cb( - Event::MSG_DELIVERED, - chat_id as uintptr_t, - job.foreign_id as uintptr_t, - ); - } - } - } - } - } - } - free(buf); - free(filename as *mut libc::c_void); -} - -// this value does not increase the number of tries -unsafe fn dc_job_try_again_later( - job: &mut dc_job_t, - try_again: libc::c_int, - pending_error: *const libc::c_char, -) { - job.try_again = try_again; - free(job.pending_error as *mut libc::c_void); - job.pending_error = dc_strdup_keep_null(pending_error); -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_MOVE_MSG(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let msg = dc_msg_new_untyped(context); - let mut dest_uid: uint32_t = 0i32 as uint32_t; - - let inbox = context.inbox.read().unwrap(); - - if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3i32, ptr::null()); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - if dc_msg_load_from_db(msg, context, job.foreign_id) { - if context - .sql - .get_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - inbox.configure_folders(context, 0x1i32); - } - let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); - - if let Some(dest_folder) = dest_folder { - let server_folder = (*msg).server_folder.as_ref().unwrap(); - - match inbox.mv( - context, - server_folder, - (*msg).server_uid, - &dest_folder, - &mut dest_uid, - ) as libc::c_uint - { - 1 => { - dc_job_try_again_later(job, 3i32, ptr::null()); - } - 3 => { - dc_update_server_uid(context, (*msg).rfc724_mid, &dest_folder, dest_uid); - } - 0 | 2 | _ => {} - } - } - } - } - - dc_msg_unref(msg); -} - -/* ****************************************************************************** - * IMAP-jobs - ******************************************************************************/ -fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { - let ret_connected = dc_connect_to_configured_imap(context, inbox); - if 0 != ret_connected { - inbox.set_watch_folder("INBOX".into()); - } - ret_connected -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let folder = job - .param - .get(Param::ServerFolder) - .unwrap_or_default() - .to_string(); - let uid = job.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let mut dest_uid = 0; - let inbox = context.inbox.read().unwrap(); - - if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3, ptr::null()); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - if inbox.set_seen(context, &folder, uid) == 0 { - dc_job_try_again_later(job, 3i32, ptr::null()); - } - if 0 != job.param.get_int(Param::AlsoMove).unwrap_or_default() { - if context - .sql - .get_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - inbox.configure_folders(context, 0x1i32); - } - let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); - if let Some(dest_folder) = dest_folder { - if 1 == inbox.mv(context, folder, uid, dest_folder, &mut dest_uid) as libc::c_uint { - dc_job_try_again_later(job, 3, 0 as *const libc::c_char); - } - } - } - } -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let msg: *mut dc_msg_t = dc_msg_new_untyped(context); - let inbox = context.inbox.read().unwrap(); - - if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3i32, ptr::null()); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - if dc_msg_load_from_db(msg, context, job.foreign_id) { - let server_folder = (*msg).server_folder.as_ref().unwrap(); - match inbox.set_seen(context, server_folder, (*msg).server_uid) as libc::c_uint { - 0 => {} - 1 => { - dc_job_try_again_later(job, 3i32, ptr::null()); - } - _ => { - if 0 != (*msg).param.get_int(Param::WantsMdn).unwrap_or_default() - && 0 != context - .sql - .get_config_int(context, "mdns_enabled") - .unwrap_or_else(|| 1) - { - let folder = (*msg).server_folder.as_ref().unwrap(); - - match inbox.set_mdnsent(context, folder, (*msg).server_uid) as libc::c_uint - { - 1 => { - dc_job_try_again_later(job, 3i32, 0 as *const libc::c_char); - } - 3 => { - dc_send_mdn(context, (*msg).id); - } - 0 | 2 | _ => {} - } - } - } - } - } - } - dc_msg_unref(msg); -} -unsafe fn dc_send_mdn(context: &Context, msg_id: uint32_t) { - let mut mimefactory = dc_mimefactory_t { - from_addr: ptr::null_mut(), - from_displayname: ptr::null_mut(), - selfstatus: ptr::null_mut(), - recipients_names: ptr::null_mut(), - recipients_addr: ptr::null_mut(), - timestamp: 0, - rfc724_mid: ptr::null_mut(), - loaded: DC_MF_NOTHING_LOADED, - msg: ptr::null_mut(), - chat: None, - increation: 0, - in_reply_to: ptr::null_mut(), - references: ptr::null_mut(), - req_mdn: 0, - out: ptr::null_mut(), - out_encrypted: 0, - out_gossiped: 0, - out_last_added_location_id: 0, - error: ptr::null_mut(), - context, - }; - - if !(0 == dc_mimefactory_load_mdn(&mut mimefactory, msg_id) - || 0 == dc_mimefactory_render(&mut mimefactory)) - { - dc_add_smtp_job(context, 5011i32, &mut mimefactory); - } - dc_mimefactory_empty(&mut mimefactory); -} -/* ****************************************************************************** - * SMTP-jobs - ******************************************************************************/ -/* * - * Store the MIME message in a file and send it later with a new SMTP job. - * - * @param context The context object as created by dc_context_new() - * @param action One of the DC_JOB_SEND_ constants - * @param mimefactory An instance of dc_mimefactory_t with a loaded and rendered message or MDN - * @return 1=success, 0=error - */ -#[allow(non_snake_case)] -unsafe fn dc_add_smtp_job( - context: &Context, - action: libc::c_int, - mimefactory: *mut dc_mimefactory_t, -) -> libc::c_int { - let pathNfilename: *mut libc::c_char; - let mut success: libc::c_int = 0i32; - let mut recipients: *mut libc::c_char = 0 as *mut libc::c_char; - let mut param = Params::new(); - pathNfilename = dc_get_fine_pathNfilename( - context, - b"$BLOBDIR\x00" as *const u8 as *const libc::c_char, - (*mimefactory).rfc724_mid, - ); - if pathNfilename.is_null() { - error!( - context, - 0, - "Could not find free file name for message with ID <{}>.", - to_string((*mimefactory).rfc724_mid), - ); - } else if 0 - == dc_write_file( - context, - pathNfilename, - (*(*mimefactory).out).str_0 as *const libc::c_void, - (*(*mimefactory).out).len, - ) - { - error!( - context, - 0, - "Could not write message <{}> to \"{}\".", - to_string((*mimefactory).rfc724_mid), - as_str(pathNfilename), - ); - } else { - recipients = dc_str_from_clist( - (*mimefactory).recipients_addr, - b"\x1e\x00" as *const u8 as *const libc::c_char, - ); - param.set(Param::File, as_str(pathNfilename)); - param.set(Param::Recipients, as_str(recipients)); - dc_job_add( - context, - action, - (if (*mimefactory).loaded as libc::c_uint - == DC_MF_MSG_LOADED as libc::c_int as libc::c_uint - { - (*(*mimefactory).msg).id } else { - 0 - }) as libc::c_int, - param, - 0, - ); - success = 1i32 - } - free(recipients as *mut libc::c_void); - free(pathNfilename as *mut libc::c_void); - success -} - -pub unsafe fn dc_job_add( - context: &Context, - action: libc::c_int, - foreign_id: libc::c_int, - param: Params, - delay_seconds: libc::c_int, -) { - let timestamp = time(); - let thread = if action >= DC_IMAP_THREAD && action < DC_IMAP_THREAD + 1000 { - DC_IMAP_THREAD - } else if action >= DC_SMTP_THREAD && action < DC_SMTP_THREAD + 1000 { - DC_SMTP_THREAD - } else { - return; - }; - - sql::execute( - context, - &context.sql, - "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", - params![ - timestamp, - thread, - action, - foreign_id, - param.to_string(), - (timestamp + delay_seconds as i64) - ] - ).ok(); - - if thread == DC_IMAP_THREAD { - dc_interrupt_imap_idle(context); - } else { - dc_interrupt_smtp_idle(context); - } -} - -pub unsafe fn dc_interrupt_smtp_idle(context: &Context) { - info!(context, 0, "Interrupting SMTP-idle...",); - - let &(ref lock, ref cvar) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - - state.perform_jobs_needed = 1; - state.idle = true; - cvar.notify_one(); -} - -pub unsafe fn dc_interrupt_imap_idle(context: &Context) { - info!(context, 0, "Interrupting IMAP-IDLE...",); - - *context.perform_inbox_jobs_needed.write().unwrap() = true; - context.inbox.read().unwrap().interrupt_idle(); -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context: &Context, job: &mut dc_job_t) { - let mut delete_from_server: libc::c_int = 1i32; - let msg: *mut dc_msg_t = dc_msg_new_untyped(context); - let inbox = context.inbox.read().unwrap(); - - if !(!dc_msg_load_from_db(msg, context, job.foreign_id) - || (*msg).rfc724_mid.is_null() - || *(*msg).rfc724_mid.offset(0isize) as libc::c_int == 0i32) - { - let ok_to_continue1; - /* eg. device messages have no Message-ID */ - if dc_rfc724_mid_cnt(context, (*msg).rfc724_mid) != 1i32 { - info!( - context, - 0, "The message is deleted from the server when all parts are deleted.", - ); - delete_from_server = 0i32 + ok_to_continue1 = true; + } + if ok_to_continue1 { + unsafe { dc_delete_msg_from_db(context, (*msg).id) }; + } } - /* if this is the last existing part of the message, we delete the message from the server */ - if 0 != delete_from_server { - let ok_to_continue; + unsafe { dc_msg_unref(msg) } + } + + #[allow(non_snake_case)] + fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) { + let ok_to_continue; + let msg = unsafe { dc_msg_new_untyped(context) }; + let inbox = context.inbox.read().unwrap(); + + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3i32, 0 as *const libc::c_char); - ok_to_continue = false; - } else { - ok_to_continue = true; - } + self.try_again_later(3i32, None); + ok_to_continue = false; } else { ok_to_continue = true; } - if ok_to_continue { - let mid = CStr::from_ptr((*msg).rfc724_mid).to_str().unwrap(); - let server_folder = (*msg).server_folder.as_ref().unwrap(); - if 0 == inbox.delete_msg(context, mid, server_folder, &mut (*msg).server_uid) { - dc_job_try_again_later(job, -1i32, 0 as *const libc::c_char); - ok_to_continue1 = false; - } else { - ok_to_continue1 = true; + } else { + ok_to_continue = true; + } + if ok_to_continue { + if dc_msg_load_from_db(msg, context, self.foreign_id) { + let server_folder = unsafe { (*msg).server_folder.as_ref().unwrap() }; + match inbox.set_seen(context, server_folder, unsafe { (*msg).server_uid }) + as libc::c_uint + { + 0 => {} + 1 => { + self.try_again_later(3i32, None); + } + _ => { + if 0 != unsafe { (*msg).param.get_int(Param::WantsMdn).unwrap_or_default() } + && 0 != context + .sql + .get_config_int(context, "mdns_enabled") + .unwrap_or_else(|| 1) + { + let folder = unsafe { (*msg).server_folder.as_ref().unwrap() }; + + match inbox.set_mdnsent(context, folder, unsafe { (*msg).server_uid }) + as libc::c_uint + { + 1 => { + self.try_again_later(3i32, None); + } + 3 => { + send_mdn(context, unsafe { (*msg).id }); + } + 0 | 2 | _ => {} + } + } + } } + } + } + unsafe { dc_msg_unref(msg) }; + } + + #[allow(non_snake_case)] + fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context) { + let ok_to_continue; + let folder = self + .param + .get(Param::ServerFolder) + .unwrap_or_default() + .to_string(); + let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; + let mut dest_uid = 0; + let inbox = context.inbox.read().unwrap(); + + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); + if !inbox.is_connected() { + self.try_again_later(3, None); + ok_to_continue = false; } else { - ok_to_continue1 = false; + ok_to_continue = true; } } else { - ok_to_continue1 = true; + ok_to_continue = true; } - if ok_to_continue1 { - dc_delete_msg_from_db(context, (*msg).id); + if ok_to_continue { + if inbox.set_seen(context, &folder, uid) == 0 { + self.try_again_later(3i32, None); + } + if 0 != self.param.get_int(Param::AlsoMove).unwrap_or_default() { + if context + .sql + .get_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + inbox.configure_folders(context, 0x1i32); + } + let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); + if let Some(dest_folder) = dest_folder { + if 1 == inbox.mv(context, folder, uid, dest_folder, &mut dest_uid) + as libc::c_uint + { + self.try_again_later(3, None); + } + } + } } } - dc_msg_unref(msg); } /* delete all pending jobs with the given action */ -pub fn dc_job_kill_action(context: &Context, action: libc::c_int) -> bool { +pub fn job_kill_action(context: &Context, action: Action) -> bool { sql::execute( context, &context.sql, @@ -806,7 +454,7 @@ pub fn dc_job_kill_action(context: &Context, action: libc::c_int) -> bool { .is_ok() } -pub fn dc_perform_imap_fetch(context: &Context) { +pub fn perform_imap_fetch(context: &Context) { let inbox = context.inbox.read().unwrap(); let start = std::time::Instant::now(); @@ -836,7 +484,7 @@ pub fn dc_perform_imap_fetch(context: &Context) { ); } -pub fn dc_perform_imap_idle(context: &Context) { +pub fn perform_imap_idle(context: &Context) { let inbox = context.inbox.read().unwrap(); connect_to_inbox(context, &inbox); @@ -853,7 +501,7 @@ pub fn dc_perform_imap_idle(context: &Context) { info!(context, 0, "INBOX-IDLE ended."); } -pub fn dc_perform_mvbox_fetch(context: &Context) { +pub fn perform_mvbox_fetch(context: &Context) { let use_network = context .sql .get_config_int(context, "mvbox_watch") @@ -866,7 +514,7 @@ pub fn dc_perform_mvbox_fetch(context: &Context) { .fetch(context, use_network == 1); } -pub fn dc_perform_mvbox_idle(context: &Context) { +pub fn perform_mvbox_idle(context: &Context) { let use_network = context .sql .get_config_int(context, "mvbox_watch") @@ -879,11 +527,11 @@ pub fn dc_perform_mvbox_idle(context: &Context) { .idle(context, use_network == 1); } -pub fn dc_interrupt_mvbox_idle(context: &Context) { +pub fn interrupt_mvbox_idle(context: &Context) { context.mvbox_thread.read().unwrap().interrupt_idle(context); } -pub fn dc_perform_sentbox_fetch(context: &Context) { +pub fn perform_sentbox_fetch(context: &Context) { let use_network = context .sql .get_config_int(context, "sentbox_watch") @@ -896,7 +544,7 @@ pub fn dc_perform_sentbox_fetch(context: &Context) { .fetch(context, use_network == 1); } -pub fn dc_perform_sentbox_idle(context: &Context) { +pub fn perform_sentbox_idle(context: &Context) { let use_network = context .sql .get_config_int(context, "sentbox_watch") @@ -909,7 +557,7 @@ pub fn dc_perform_sentbox_idle(context: &Context) { .idle(context, use_network == 1); } -pub fn dc_interrupt_sentbox_idle(context: &Context) { +pub fn interrupt_sentbox_idle(context: &Context) { context .sentbox_thread .read() @@ -917,7 +565,7 @@ pub fn dc_interrupt_sentbox_idle(context: &Context) { .interrupt_idle(context); } -pub unsafe fn dc_perform_smtp_jobs(context: &Context) { +pub fn perform_smtp_jobs(context: &Context) { let probe_smtp_network = { let &(ref lock, _) = &*context.smtp_state.clone(); let mut state = lock.lock().unwrap(); @@ -935,7 +583,7 @@ pub unsafe fn dc_perform_smtp_jobs(context: &Context) { }; info!(context, 0, "SMTP-jobs started...",); - dc_job_perform(context, DC_SMTP_THREAD, probe_smtp_network); + job_perform(context, Thread::Smtp, probe_smtp_network); info!(context, 0, "SMTP-jobs ended."); { @@ -946,7 +594,7 @@ pub unsafe fn dc_perform_smtp_jobs(context: &Context) { } } -pub unsafe fn dc_perform_smtp_idle(context: &Context) { +pub fn perform_smtp_idle(context: &Context) { info!(context, 0, "SMTP-idle started...",); { let &(ref lock, ref cvar) = &*context.smtp_state.clone(); @@ -958,7 +606,7 @@ pub unsafe fn dc_perform_smtp_idle(context: &Context) { 0, "SMTP-idle will not be started because of waiting jobs.", ); } else { - let dur = get_next_wakeup_time(context, DC_SMTP_THREAD); + let dur = get_next_wakeup_time(context, Thread::Smtp); loop { let res = cvar.wait_timeout(state, dur).unwrap(); @@ -976,7 +624,7 @@ pub unsafe fn dc_perform_smtp_idle(context: &Context) { info!(context, 0, "SMTP-idle ended.",); } -unsafe fn get_next_wakeup_time(context: &Context, thread: libc::c_int) -> Duration { +fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration { let t: i64 = context .sql .query_row_col( @@ -1000,7 +648,7 @@ unsafe fn get_next_wakeup_time(context: &Context, thread: libc::c_int) -> Durati wakeup_time } -pub unsafe fn dc_maybe_network(context: &Context) { +pub fn maybe_network(context: &Context) { { let &(ref lock, _) = &*context.smtp_state.clone(); let mut state = lock.lock().unwrap(); @@ -1009,13 +657,13 @@ pub unsafe fn dc_maybe_network(context: &Context) { *context.probe_imap_network.write().unwrap() = true; } - dc_interrupt_smtp_idle(context); - dc_interrupt_imap_idle(context); - dc_interrupt_mvbox_idle(context); - dc_interrupt_sentbox_idle(context); + interrupt_smtp_idle(context); + interrupt_imap_idle(context); + interrupt_mvbox_idle(context); + interrupt_sentbox_idle(context); } -pub fn dc_job_action_exists(context: &Context, action: libc::c_int) -> bool { +pub fn job_action_exists(context: &Context, action: Action) -> bool { context .sql .exists("SELECT id FROM jobs WHERE action=?;", params![action]) @@ -1024,28 +672,28 @@ pub fn dc_job_action_exists(context: &Context, action: libc::c_int) -> bool { /* special case for DC_JOB_SEND_MSG_TO_SMTP */ #[allow(non_snake_case)] -pub unsafe fn dc_job_send_msg(context: &Context, msg_id: uint32_t) -> libc::c_int { +pub unsafe fn job_send_msg(context: &Context, msg_id: uint32_t) -> libc::c_int { let mut success = 0; let mut mimefactory = dc_mimefactory_t { - from_addr: 0 as *mut libc::c_char, - from_displayname: 0 as *mut libc::c_char, - selfstatus: 0 as *mut libc::c_char, - recipients_names: 0 as *mut clist, - recipients_addr: 0 as *mut clist, + from_addr: ptr::null_mut(), + from_displayname: ptr::null_mut(), + selfstatus: ptr::null_mut(), + recipients_names: ptr::null_mut(), + recipients_addr: ptr::null_mut(), timestamp: 0, - rfc724_mid: 0 as *mut libc::c_char, + rfc724_mid: ptr::null_mut(), loaded: DC_MF_NOTHING_LOADED, - msg: 0 as *mut dc_msg_t, + msg: ptr::null_mut(), chat: None, increation: 0, - in_reply_to: 0 as *mut libc::c_char, - references: 0 as *mut libc::c_char, + in_reply_to: ptr::null_mut(), + references: ptr::null_mut(), req_mdn: 0, - out: 0 as *mut MMAPString, + out: ptr::null_mut(), out_encrypted: 0, out_gossiped: 0, out_last_added_location_id: 0, - error: 0 as *mut libc::c_char, + error: ptr::null_mut(), context, }; @@ -1080,7 +728,7 @@ pub unsafe fn dc_job_send_msg(context: &Context, msg_id: uint32_t) -> libc::c_in } /* create message */ if 0 == dc_mimefactory_render(&mut mimefactory) { - dc_set_msg_failed(context, msg_id, mimefactory.error); + dc_set_msg_failed(context, msg_id, as_opt_str(mimefactory.error)); } else if 0 != (*mimefactory.msg) .param @@ -1098,8 +746,7 @@ pub unsafe fn dc_job_send_msg(context: &Context, msg_id: uint32_t) -> libc::c_in dc_set_msg_failed( context, msg_id, - b"End-to-end-encryption unavailable unexpectedly.\x00" as *const u8 - as *const libc::c_char, + Some("End-to-end-encryption unavailable unexpectedly."), ); } else { /* unrecoverable */ @@ -1140,10 +787,395 @@ pub unsafe fn dc_job_send_msg(context: &Context, msg_id: uint32_t) -> libc::c_in (*mimefactory.msg).param.set_int(Param::GuranteeE2ee, 1); dc_msg_save_param_to_disk(mimefactory.msg); } - success = dc_add_smtp_job(context, 5901i32, &mut mimefactory); + success = add_smtp_job(context, Action::SendMsgToSmtp, &mut mimefactory); } } dc_mimefactory_empty(&mut mimefactory); success } + +pub fn perform_imap_jobs(context: &Context) { + info!(context, 0, "dc_perform_imap_jobs starting.",); + + let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); + *context.probe_imap_network.write().unwrap() = false; + *context.perform_inbox_jobs_needed.write().unwrap() = false; + + job_perform(context, Thread::Imap, probe_imap_network); + info!(context, 0, "dc_perform_imap_jobs ended.",); +} + +fn job_perform(context: &Context, thread: Thread, probe_network: bool) { + let query = if !probe_network { + // processing for first-try and after backoff-timeouts: + // process jobs in the order they were added. + "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ + FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" + } else { + // processing after call to dc_maybe_network(): + // process _all_ pending jobs that failed before + // in the order of their backoff-times. + "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ + FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" + }; + + let params_no_probe = params![thread as i64, time()]; + let params_probe = params![thread as i64]; + let params: &[&dyn rusqlite::ToSql] = if !probe_network { + params_no_probe + } else { + params_probe + }; + + let jobs: Result, _> = context.sql.query_map( + query, + params, + |row| { + let job = Job { + job_id: row.get(0)?, + action: row.get(1)?, + foreign_id: row.get(2)?, + desired_timestamp: row.get(5)?, + added_timestamp: row.get(4)?, + tries: row.get(6)?, + param: row.get::<_, String>(3)?.parse().unwrap_or_default(), + try_again: 0, + pending_error: None, + }; + + Ok(job) + }, + |jobs| jobs.collect::, _>>().map_err(Into::into), + ); + match jobs { + Ok(ref _res) => {} + Err(ref err) => { + info!(context, 0, "query failed: {:?}", err); + } + } + + for mut job in jobs.unwrap_or_default() { + info!( + context, + 0, + "{}-job #{}, action {} started...", + if thread == Thread::Imap { + "INBOX" + } else { + "SMTP" + }, + job.job_id, + job.action, + ); + + // some configuration jobs are "exclusive": + // - they are always executed in the imap-thread and the smtp-thread is suspended during execution + // - they may change the database handle change the database handle; we do not keep old pointers therefore + // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution + if Action::ConfigureImap == job.action || Action::ImexImap == job.action { + job_kill_action(context, job.action); + &context + .sentbox_thread + .clone() + .read() + .unwrap() + .suspend(context); + &context + .mvbox_thread + .clone() + .read() + .unwrap() + .suspend(context); + suspend_smtp_thread(context, true); + } + + let mut tries = 0; + while tries <= 1 { + // this can be modified by a job using dc_job_try_again_later() + job.try_again = 0; + + match job.action { + Action::SendMsgToSmtp => job.do_DC_JOB_SEND(context), + Action::DeleteMsgOnImap => job.do_DC_JOB_DELETE_MSG_ON_IMAP(context), + Action::MarkseenMsgOnImap => job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context), + Action::MarkseenMdnOnImap => job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context), + Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context), + Action::SendMdn => job.do_DC_JOB_SEND(context), + Action::ConfigureImap => unsafe { dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &job) }, + Action::ImexImap => unsafe { dc_job_do_DC_JOB_IMEX_IMAP(context, &job) }, + Action::MaybeSendLocations => unsafe { + dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &job) + }, + Action::MaybeSendLocationsEnded => unsafe { + dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job) + }, + Action::Housekeeping => sql::housekeeping(context), + Action::SendMdnOld => {} + Action::SendMsgToSmtpOld => {} + } + if job.try_again != -1 { + break; + } + tries += 1 + } + if Action::ConfigureImap == job.action || Action::ImexImap == job.action { + context + .sentbox_thread + .clone() + .read() + .unwrap() + .unsuspend(context); + context + .mvbox_thread + .clone() + .read() + .unwrap() + .unsuspend(context); + suspend_smtp_thread(context, false); + break; + } else if job.try_again == 2 { + // just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready + info!( + context, + 0, + "{}-job #{} not yet ready and will be delayed.", + if thread == Thread::Imap { + "INBOX" + } else { + "SMTP" + }, + job.job_id + ); + } else if job.try_again == -1 || job.try_again == 3 { + let tries = job.tries + 1; + if tries < 17 { + job.tries = tries; + let time_offset = get_backoff_time_offset(tries); + job.desired_timestamp = job.added_timestamp + time_offset; + job.update(context); + info!( + context, + 0, + "{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).", + if thread == Thread::Imap { + "INBOX" + } else { + "SMTP" + }, + job.job_id as u32, + tries, + time_offset, + job.added_timestamp + time_offset - time() + ); + if thread == Thread::Smtp && tries < 17 - 1 { + context + .smtp_state + .clone() + .0 + .lock() + .unwrap() + .perform_jobs_needed = 2; + } + } else { + if job.action == Action::SendMsgToSmtp { + unsafe { + dc_set_msg_failed(context, job.foreign_id, job.pending_error.as_ref()) + }; + } + job.delete(context); + } + if !probe_network { + continue; + } + // on dc_maybe_network() we stop trying here; + // these jobs are already tried once. + // otherwise, we just continue with the next job + // to give other jobs a chance being tried at least once. + break; + } else { + job.delete(context); + } + } +} + +#[allow(non_snake_case)] +fn get_backoff_time_offset(c_tries: libc::c_int) -> i64 { + // results in ~3 weeks for the last backoff timespan + let mut N = 2_i32.pow((c_tries - 1) as u32); + N = N * 60; + let mut rng = thread_rng(); + let n: i32 = rng.gen(); + let mut seconds = n % (N + 1); + if seconds < 1 { + seconds = 1; + } + seconds as i64 +} + +fn suspend_smtp_thread(context: &Context, suspend: bool) { + context.smtp_state.0.lock().unwrap().suspended = suspend; + if suspend { + loop { + if !context.smtp_state.0.lock().unwrap().doing_jobs { + return; + } + std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); + } + } +} + +fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { + let ret_connected = dc_connect_to_configured_imap(context, inbox); + if 0 != ret_connected { + inbox.set_watch_folder("INBOX".into()); + } + ret_connected +} + +fn send_mdn(context: &Context, msg_id: uint32_t) { + let mut mimefactory = dc_mimefactory_t { + from_addr: ptr::null_mut(), + from_displayname: ptr::null_mut(), + selfstatus: ptr::null_mut(), + recipients_names: ptr::null_mut(), + recipients_addr: ptr::null_mut(), + timestamp: 0, + rfc724_mid: ptr::null_mut(), + loaded: DC_MF_NOTHING_LOADED, + msg: ptr::null_mut(), + chat: None, + increation: 0, + in_reply_to: ptr::null_mut(), + references: ptr::null_mut(), + req_mdn: 0, + out: ptr::null_mut(), + out_encrypted: 0, + out_gossiped: 0, + out_last_added_location_id: 0, + error: ptr::null_mut(), + context, + }; + + if !(0 == unsafe { dc_mimefactory_load_mdn(&mut mimefactory, msg_id) } + || 0 == unsafe { dc_mimefactory_render(&mut mimefactory) }) + { + add_smtp_job(context, Action::SendMdn, &mut mimefactory); + } +} + +#[allow(non_snake_case)] +fn add_smtp_job(context: &Context, action: Action, mimefactory: &dc_mimefactory_t) -> libc::c_int { + let pathNfilename: *mut libc::c_char; + let mut success: libc::c_int = 0i32; + let mut recipients: *mut libc::c_char = 0 as *mut libc::c_char; + let mut param = Params::new(); + pathNfilename = unsafe { + dc_get_fine_pathNfilename( + context, + b"$BLOBDIR\x00" as *const u8 as *const libc::c_char, + mimefactory.rfc724_mid, + ) + }; + if pathNfilename.is_null() { + error!( + context, + 0, + "Could not find free file name for message with ID <{}>.", + to_string(mimefactory.rfc724_mid), + ); + } else if 0 + == unsafe { + dc_write_file( + context, + pathNfilename, + (*mimefactory.out).str_0 as *const libc::c_void, + (*mimefactory.out).len, + ) + } + { + error!( + context, + 0, + "Could not write message <{}> to \"{}\".", + to_string(mimefactory.rfc724_mid), + as_str(pathNfilename), + ); + } else { + recipients = unsafe { + dc_str_from_clist( + mimefactory.recipients_addr, + b"\x1e\x00" as *const u8 as *const libc::c_char, + ) + }; + param.set(Param::File, as_str(pathNfilename)); + param.set(Param::Recipients, as_str(recipients)); + job_add( + context, + action, + (if mimefactory.loaded as libc::c_uint + == DC_MF_MSG_LOADED as libc::c_int as libc::c_uint + { + unsafe { (*mimefactory.msg).id } + } else { + 0 + }) as libc::c_int, + param, + 0, + ); + success = 1; + } + unsafe { + free(recipients.cast()); + free(pathNfilename.cast()); + } + success +} + +pub fn job_add( + context: &Context, + action: Action, + foreign_id: libc::c_int, + param: Params, + delay_seconds: i64, +) { + let timestamp = time(); + let thread: Thread = action.into(); + + sql::execute( + context, + &context.sql, + "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", + params![ + timestamp, + thread, + action, + foreign_id, + param.to_string(), + (timestamp + delay_seconds as i64) + ] + ).ok(); + + match thread { + Thread::Imap => interrupt_imap_idle(context), + Thread::Smtp => interrupt_smtp_idle(context), + } +} + +pub fn interrupt_smtp_idle(context: &Context) { + info!(context, 0, "Interrupting SMTP-idle...",); + + let &(ref lock, ref cvar) = &*context.smtp_state.clone(); + let mut state = lock.lock().unwrap(); + + state.perform_jobs_needed = 1; + state.idle = true; + cvar.notify_one(); +} + +pub fn interrupt_imap_idle(context: &Context) { + info!(context, 0, "Interrupting IMAP-IDLE...",); + + *context.perform_inbox_jobs_needed.write().unwrap() = true; + context.inbox.read().unwrap().interrupt_idle(); +} diff --git a/src/lib.rs b/src/lib.rs index 500cb86c5..bdebfb851 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ pub mod constants; pub mod contact; pub mod context; mod imap; +pub mod job; mod job_thread; pub mod key; pub mod keyring; @@ -50,7 +51,6 @@ pub mod dc_configure; mod dc_dehtml; mod dc_e2ee; pub mod dc_imex; -pub mod dc_job; pub mod dc_location; mod dc_loginparam; mod dc_mimefactory;