Compare commits

...

4 Commits

Author SHA1 Message Date
holger krekel
177009a5aa implement timeout handling 2019-12-15 00:19:09 +01:00
holger krekel
4a5e99c48e make idle wait only as long as the next job wants to run 2019-12-15 00:19:09 +01:00
holger krekel
7c3fc251ff avoid inbox jobs_needed flag 2019-12-15 00:19:09 +01:00
holger krekel
b3495695d0 attempt to sort adding jobs 2019-12-15 00:19:09 +01:00
17 changed files with 127 additions and 110 deletions

View File

@@ -509,7 +509,7 @@ pub unsafe extern "C" fn dc_interrupt_imap_idle(context: *mut dc_context_t) {
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_inbox_idle(ctx, true))
.with_inner(|ctx| job::interrupt_inbox_idle(ctx))
.unwrap_or(())
}

View File

@@ -491,7 +491,7 @@ pub fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::Error> {
println!("{:#?}", context.get_info());
}
"interrupt" => {
interrupt_inbox_idle(context, true);
interrupt_inbox_idle(context);
}
"maybenetwork" => {
maybe_network(context);

View File

@@ -202,7 +202,7 @@ fn stop_threads(context: &Context) {
println!("Stopping threads");
IS_RUNNING.store(false, Ordering::Relaxed);
interrupt_inbox_idle(context, true);
interrupt_inbox_idle(context);
interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context);
interrupt_smtp_idle(context);

View File

@@ -104,7 +104,7 @@ fn main() {
println!("stopping threads");
*running.write().unwrap() = false;
deltachat::job::interrupt_inbox_idle(&ctx, true);
deltachat::job::interrupt_inbox_idle(&ctx);
deltachat::job::interrupt_smtp_idle(&ctx);
println!("joining");

View File

@@ -1352,7 +1352,7 @@ pub fn delete(context: &Context, chat_id: u32) -> Result<(), Error> {
});
job_kill_action(context, Action::Housekeeping);
job_add(context, Action::Housekeeping, 0, Params::new(), 10);
add_job_with_interrupt(context, Action::Housekeeping, 0, Params::new(), 10);
Ok(())
}

View File

@@ -143,7 +143,7 @@ impl Context {
}
Config::InboxWatch => {
let ret = self.sql.set_raw_config(self, key, value);
interrupt_inbox_idle(self, true);
interrupt_inbox_idle(self);
ret
}
Config::SentboxWatch => {

View File

@@ -38,7 +38,7 @@ pub fn configure(context: &Context) {
return;
}
job_kill_action(context, Action::ConfigureImap);
job_add(context, Action::ConfigureImap, 0, Params::new(), 0);
add_job_with_interrupt(context, Action::ConfigureImap, 0, Params::new(), 0);
}
/// Check if the context is already configured.

View File

@@ -14,13 +14,11 @@ use crate::contact::*;
use crate::error::*;
use crate::events::Event;
use crate::imap::*;
use crate::job::*;
use crate::job_thread::JobThread;
use crate::key::*;
use crate::login_param::LoginParam;
use crate::lot::Lot;
use crate::message::{self, Message, MsgId};
use crate::param::Params;
use crate::message::{self, MsgId};
use crate::smtp::Smtp;
use crate::sql::Sql;
@@ -46,7 +44,6 @@ pub struct Context {
/// Blob directory path
blobdir: PathBuf,
pub sql: Sql,
pub perform_inbox_jobs_needed: Arc<RwLock<bool>>,
pub probe_imap_network: Arc<RwLock<bool>>,
pub inbox_thread: Arc<RwLock<JobThread>>,
pub sentbox_thread: Arc<RwLock<JobThread>>,
@@ -149,7 +146,6 @@ impl Context {
Imap::new(),
))),
probe_imap_network: Arc::new(RwLock::new(false)),
perform_inbox_jobs_needed: Arc::new(RwLock::new(false)),
generating_key_mutex: Mutex::new(()),
translated_stockstrings: RwLock::new(HashMap::new()),
};
@@ -436,34 +432,6 @@ impl Context {
false
}
}
pub fn do_heuristics_moves(&self, folder: &str, msg_id: MsgId) {
if !self.get_config_bool(Config::MvboxMove) {
return;
}
if self.is_mvbox(folder) {
return;
}
if let Ok(msg) = Message::load_from_db(self, msg_id) {
if msg.is_setupmessage() {
// do not move setup messages;
// there may be a non-delta device that wants to handle it
return;
}
// 1 = dc message, 2 = reply to dc message
if 0 != msg.is_dc_message {
job_add(
self,
Action::MoveMsg,
msg.id.to_u32() as i32,
Params::new(),
0,
);
}
}
}
}
impl Drop for Context {

View File

@@ -250,7 +250,7 @@ pub fn dc_receive_imf(
// if we delete we don't need to try moving messages
if needs_delete_job && !created_db_entries.is_empty() {
job_add(
add_job_no_interrupt(
context,
Action::DeleteMsgOnImap,
created_db_entries[0].1.to_u32() as i32,
@@ -258,7 +258,7 @@ pub fn dc_receive_imf(
0,
);
} else {
context.do_heuristics_moves(server_folder.as_ref(), insert_msg_id);
crate::imap::do_heuristics_moves(context, server_folder.as_ref(), insert_msg_id);
}
info!(

View File

@@ -3,6 +3,7 @@ use super::Imap;
use async_imap::extensions::idle::IdleResponse;
use async_std::prelude::*;
use async_std::task;
use core::cmp::min;
use std::sync::atomic::Ordering;
use std::time::{Duration, SystemTime};
@@ -45,7 +46,12 @@ impl Imap {
task::block_on(async move { self.config.read().await.can_idle })
}
pub fn idle(&self, context: &Context, watch_folder: Option<String>) -> Result<()> {
pub fn idle(
&self,
context: &Context,
watch_folder: Option<String>,
until: SystemTime,
) -> Result<()> {
task::block_on(async move {
if !self.can_idle() {
return Err(Error::IdleAbilityMissing);
@@ -57,8 +63,18 @@ impl Imap {
self.select_folder(context, watch_folder.clone()).await?;
let timeout = match until.duration_since(SystemTime::now()) {
Ok(timeout) => timeout,
Err(_) => {
info!(context, "idle called with negative timeout");
return Ok(());
}
};
let timeout = min(timeout, Duration::from_secs(23 * 60));
info!(context, "idle-timeout is {:?}", timeout);
let session = self.session.lock().await.take();
let timeout = Duration::from_secs(23 * 60);
if let Some(session) = session {
match session.idle() {
// BEWARE: If you change the Secure branch you
@@ -67,7 +83,6 @@ impl Imap {
if let Err(err) = handle.init().await {
return Err(Error::IdleProtocolFailed(err));
}
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt);
@@ -178,7 +193,12 @@ impl Imap {
})
}
pub(crate) fn fake_idle(&self, context: &Context, watch_folder: Option<String>) {
pub(crate) fn fake_idle(
&self,
context: &Context,
watch_folder: Option<String>,
until: SystemTime,
) {
// Idle using polling. This is also needed if we're not yet configured -
// in this case, we're waiting for a configure job (and an interrupt).
task::block_on(async move {
@@ -213,6 +233,11 @@ impl Imap {
break;
}
info!(context, "fake_idle is connected");
if SystemTime::now() > until {
info!(context, "fake_idle stopping as jobs need running");
break;
}
// we are connected, let's see if fetching messages results
// in anything. If so, we behave as if IDLE had data but
// will have already fetched the messages so perform_*_fetch

View File

@@ -17,9 +17,9 @@ use crate::context::Context;
use crate::dc_receive_imf::dc_receive_imf;
use crate::events::Event;
use crate::imap_client::*;
use crate::job::{job_add, Action};
use crate::job::{add_job_no_interrupt, Action};
use crate::login_param::{CertificateChecks, LoginParam};
use crate::message::{self, update_server_uid};
use crate::message::{self, update_server_uid, Message, MsgId};
use crate::oauth2::dc_get_oauth2_access_token;
use crate::param::Params;
use crate::stock::StockMessage;
@@ -1202,8 +1202,8 @@ fn precheck_imf(context: &Context, rfc724_mid: &str, server_folder: &str, server
{
if old_server_folder.is_empty() && old_server_uid == 0 {
info!(context, "[move] detected bbc-self {}", rfc724_mid,);
context.do_heuristics_moves(server_folder.as_ref(), msg_id);
job_add(
do_heuristics_moves(context, server_folder, msg_id);
add_job_no_interrupt(
context,
Action::MarkseenMsgOnImap,
msg_id.to_u32() as i32,
@@ -1237,3 +1237,32 @@ fn prefetch_get_message_id(prefetch_msg: &Fetch) -> Result<String> {
wrapmime::parse_message_id(&message_id.unwrap()).map_err(Into::into)
}
pub fn do_heuristics_moves(context: &Context, folder: &str, msg_id: MsgId) {
if !context.get_config_bool(crate::config::Config::MvboxMove) {
return;
}
if context.is_mvbox(folder) {
return;
}
if let Ok(msg) = Message::load_from_db(context, msg_id) {
if msg.is_setupmessage() {
// do not move setup messages;
// there may be a non-delta device that wants to handle it
return;
}
// 1 = dc message, 2 = reply to dc message
if 0 != msg.is_dc_message {
// we are called from receive_imf so there is no idle running
add_job_no_interrupt(
context,
Action::MoveMsg,
msg.id.to_u32() as i32,
Params::new(),
0,
);
}
}
}

View File

@@ -74,7 +74,7 @@ pub fn imex(context: &Context, what: ImexMode, param1: Option<impl AsRef<Path>>)
}
job_kill_action(context, Action::ImexImap);
job_add(context, Action::ImexImap, 0, param, 0);
add_job_with_interrupt(context, Action::ImexImap, 0, param, 0);
}
/// Returns the filename of the backup found (otherwise an error)

View File

@@ -35,7 +35,7 @@ const JOB_RETRIES: u32 = 17;
/// Thread IDs
#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)]
#[repr(i32)]
enum Thread {
pub enum Thread {
Unknown = 0,
Imap = 100,
Smtp = 5000,
@@ -134,7 +134,7 @@ impl Job {
/// Updates the job already stored in the database.
///
/// To add a new job, use [job_add].
/// To add a new job, use [add_job_*].
fn update(&self, context: &Context) -> bool {
sql::execute(
context,
@@ -436,13 +436,6 @@ pub fn perform_sentbox_fetch(context: &Context) {
}
pub fn perform_inbox_idle(context: &Context) {
if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
info!(
context,
"INBOX-IDLE will not be started because of waiting jobs."
);
return;
}
let use_network = context.get_config_bool(Config::InboxWatch);
context
@@ -472,21 +465,8 @@ pub fn perform_sentbox_idle(context: &Context) {
.idle(context, use_network);
}
pub fn interrupt_inbox_idle(context: &Context, block: bool) {
info!(context, "interrupt_inbox_idle called blocking={}", block);
if block {
context.inbox_thread.read().unwrap().interrupt_idle(context);
} else {
match context.inbox_thread.try_read() {
Ok(inbox_thread) => {
inbox_thread.interrupt_idle(context);
}
Err(err) => {
*context.perform_inbox_jobs_needed.write().unwrap() = true;
warn!(context, "could not interrupt idle: {}", err);
}
}
}
pub fn interrupt_inbox_idle(context: &Context) {
context.inbox_thread.read().unwrap().interrupt_idle(context);
}
pub fn interrupt_mvbox_idle(context: &Context) {
@@ -563,7 +543,7 @@ pub fn perform_smtp_idle(context: &Context) {
info!(context, "SMTP-idle ended.",);
}
fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration {
pub(crate) fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration {
let t: i64 = context
.sql
.query_get_value(
@@ -573,17 +553,16 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration {
)
.unwrap_or_default();
let mut wakeup_time = Duration::new(10 * 60, 0);
let now = time();
if t > 0 {
let now = time();
if t > now {
wakeup_time = Duration::new((t - now) as u64, 0);
Duration::new((t - now) as u64, 0)
} else {
wakeup_time = Duration::new(0, 0);
Duration::new(0, 0)
}
} else {
Duration::new(30 * 60, 0)
}
wakeup_time
}
pub fn maybe_network(context: &Context) {
@@ -596,7 +575,7 @@ pub fn maybe_network(context: &Context) {
}
interrupt_smtp_idle(context);
interrupt_inbox_idle(context, true);
interrupt_inbox_idle(context);
interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context);
}
@@ -715,7 +694,6 @@ pub fn perform_inbox_jobs(context: &Context) {
let probe_imap_network = *context.probe_imap_network.clone().read().unwrap();
*context.probe_imap_network.write().unwrap() = false;
*context.perform_inbox_jobs_needed.write().unwrap() = false;
job_perform(context, Thread::Imap, probe_imap_network);
info!(context, "dc_perform_inbox_jobs ended.",);
@@ -918,7 +896,7 @@ fn add_smtp_job(
param.set(Param::File, blob.as_name());
param.set(Param::Recipients, &recipients);
job_add(
add_job_with_interrupt(
context,
action,
rendered_msg
@@ -934,7 +912,7 @@ fn add_smtp_job(
/// Adds a job to the database, scheduling it `delay_seconds`
/// after the current time.
pub fn job_add(
pub fn add_job_no_interrupt(
context: &Context,
action: Action,
foreign_id: i32,
@@ -942,7 +920,7 @@ pub fn job_add(
delay_seconds: i64,
) {
if action == Action::Unknown {
error!(context, "Invalid action passed to job_add");
error!(context, "Invalid action passed to add_job_no_interrupt");
return;
}
@@ -962,9 +940,20 @@ pub fn job_add(
(timestamp + delay_seconds as i64)
]
).ok();
}
pub fn add_job_with_interrupt(
context: &Context,
action: Action,
foreign_id: i32,
param: Params,
delay_seconds: i64,
) {
let thread: Thread = action.into();
add_job_no_interrupt(context, action, foreign_id, param, delay_seconds);
match thread {
Thread::Imap => interrupt_inbox_idle(context, false),
Thread::Imap => interrupt_inbox_idle(context),
Thread::Smtp => interrupt_smtp_idle(context),
Thread::Unknown => {}
}

View File

@@ -3,6 +3,8 @@ use std::sync::{Arc, Condvar, Mutex};
use crate::context::Context;
use crate::error::{Error, Result};
use crate::imap::Imap;
use crate::job::{get_next_wakeup_time, Thread};
use std::time::{Duration, SystemTime};
#[derive(Debug)]
pub struct JobThread {
@@ -57,10 +59,6 @@ impl JobThread {
}
pub fn interrupt_idle(&self, context: &Context) {
{
self.state.0.lock().unwrap().jobs_needed = true;
}
info!(context, "Interrupting {}-IDLE...", self.name);
self.imap.interrupt_idle(context);
@@ -136,18 +134,23 @@ impl JobThread {
}
pub fn idle(&self, context: &Context, use_network: bool) {
// standard idle wait timeout
let mut timeout = Duration::from_secs(23 * 60);
{
let &(ref lock, ref cvar) = &*self.state.clone();
let mut state = lock.lock().unwrap();
if state.jobs_needed {
info!(
context,
"{}-IDLE will not be started as it was interrupted while not ideling.",
self.name,
);
state.jobs_needed = false;
return;
// if we are in the inbox (job) thread we only want to wait
// until the next job is due.
if self.folder_config_name == "configured_inbox_folder" {
timeout = get_next_wakeup_time(context, Thread::Imap);
if timeout <= Duration::from_millis(20) {
info!(
context,
"INBOX-IDLE will not be started because of waiting jobs."
);
return;
}
}
if state.suspended {
@@ -171,6 +174,8 @@ impl JobThread {
}
}
let until = SystemTime::now() + timeout;
let prefix = format!("{}-IDLE", self.name);
let do_fake_idle = match self.imap.connect_configured(context) {
Ok(()) => {
@@ -179,7 +184,7 @@ impl JobThread {
} else {
let watch_folder = self.get_watch_folder(context);
info!(context, "{} started...", prefix);
let res = self.imap.idle(context, watch_folder);
let res = self.imap.idle(context, watch_folder, until);
info!(context, "{} ended...", prefix);
if let Err(err) = res {
warn!(context, "{} failed: {} -> reconnecting", prefix, err);
@@ -199,7 +204,7 @@ impl JobThread {
};
if do_fake_idle {
let watch_folder = self.get_watch_folder(context);
self.imap.fake_idle(context, watch_folder);
self.imap.fake_idle(context, watch_folder, until);
}
self.state.0.lock().unwrap().using_handle = false;

View File

@@ -226,7 +226,7 @@ pub fn send_locations_to_chat(context: &Context, chat_id: u32, seconds: i64) {
context.call_cb(Event::ChatModified(chat_id));
if 0 != seconds {
schedule_MAYBE_SEND_LOCATIONS(context, false);
job_add(
add_job_with_interrupt(
context,
Action::MaybeSendLocationsEnded,
chat_id as i32,
@@ -241,7 +241,8 @@ pub fn send_locations_to_chat(context: &Context, chat_id: u32, seconds: i64) {
#[allow(non_snake_case)]
fn schedule_MAYBE_SEND_LOCATIONS(context: &Context, force_schedule: bool) {
if force_schedule || !job_action_exists(context, Action::MaybeSendLocations) {
job_add(context, Action::MaybeSendLocations, 0, Params::new(), 60);
// XXX questionable to interrupt but set a +60secs target
add_job_with_interrupt(context, Action::MaybeSendLocations, 0, Params::new(), 60);
};
}

View File

@@ -875,7 +875,7 @@ pub fn delete_msgs(context: &Context, msg_ids: &[MsgId]) {
}
}
update_msg_chat_id(context, *msg_id, DC_CHAT_ID_TRASH);
job_add(
add_job_with_interrupt(
context,
Action::DeleteMsgOnImap,
msg_id.to_u32() as i32,
@@ -890,7 +890,7 @@ pub fn delete_msgs(context: &Context, msg_ids: &[MsgId]) {
msg_id: MsgId::new(0),
});
job_kill_action(context, Action::Housekeeping);
job_add(context, Action::Housekeeping, 0, Params::new(), 10);
add_job_with_interrupt(context, Action::Housekeeping, 0, Params::new(), 10);
};
}
@@ -961,7 +961,7 @@ pub fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool {
update_msg_state(context, *id, MessageState::InSeen);
info!(context, "Seen message {}.", id);
job_add(
add_job_with_interrupt(
context,
Action::MarkseenMsgOnImap,
id.to_u32() as i32,
@@ -1319,7 +1319,7 @@ pub fn update_server_uid(
#[allow(dead_code)]
pub fn dc_empty_server(context: &Context, flags: u32) {
job_kill_action(context, Action::EmptyServer);
job_add(context, Action::EmptyServer, flags as i32, Params::new(), 0);
add_job_with_interrupt(context, Action::EmptyServer, flags as i32, Params::new(), 0);
}
#[cfg(test)]

View File

@@ -15,7 +15,7 @@ use crate::dc_tools::*;
use crate::e2ee;
use crate::error::Result;
use crate::headerdef::HeaderDef;
use crate::job::{job_add, Action};
use crate::job::{add_job_no_interrupt, Action};
use crate::location;
use crate::message;
use crate::message::MsgId;
@@ -782,7 +782,7 @@ impl<'a> MimeParser<'a> {
if self.has_chat_version() && self.context.get_config_bool(Config::MvboxMove) {
param.set_int(Param::AlsoMove, 1);
}
job_add(self.context, Action::MarkseenMdnOnImap, 0, param, 0);
add_job_no_interrupt(self.context, Action::MarkseenMdnOnImap, 0, param, 0);
}
}
}