Compare commits

..

1 Commits

Author SHA1 Message Date
holger krekel
81f85488de use async-imap response handling branch 2019-11-25 23:45:03 +01:00
6 changed files with 134 additions and 24 deletions

6
Cargo.lock generated
View File

@@ -90,7 +90,7 @@ dependencies = [
[[package]]
name = "async-imap"
version = "0.1.1"
source = "git+https://github.com/async-email/async-imap#1327f678cf5515842fc309636459372e6ab40db2"
source = "git+https://github.com/async-email/async-imap?branch=improved-response-handling#a79795381622e536f035a96c2c249272e9fbcece"
dependencies = [
"async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -610,7 +610,7 @@ dependencies = [
name = "deltachat"
version = "1.0.0-beta.8"
dependencies = [
"async-imap 0.1.1 (git+https://github.com/async-email/async-imap)",
"async-imap 0.1.1 (git+https://github.com/async-email/async-imap?branch=improved-response-handling)",
"async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"async-tls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"backtrace 0.3.40 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3245,7 +3245,7 @@ dependencies = [
"checksum arrayvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
"checksum ascii_utils 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "71938f30533e4d95a6d17aa530939da3842c2ab6f4f84b9dae68447e4129f74a"
"checksum async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423"
"checksum async-imap 0.1.1 (git+https://github.com/async-email/async-imap)" = "<none>"
"checksum async-imap 0.1.1 (git+https://github.com/async-email/async-imap?branch=improved-response-handling)" = "<none>"
"checksum async-macros 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "644a5a8de80f2085a1e7e57cd1544a2a7438f6e003c0790999bd43b92a77cdb2"
"checksum async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "56933da6903b273923d13f4746d829f66ff9b444173f6743d831e80f4da15446"
"checksum async-task 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de6bd58f7b9cc49032559422595c81cbfcf04db2f2133592f70af19e258a1ced"

View File

@@ -19,7 +19,7 @@ reqwest = { version = "0.9.15", default-features = false, features = ["rustls-tl
num-derive = "0.2.5"
num-traits = "0.2.6"
lettre = { git = "https://github.com/deltachat/lettre", branch = "feat/rustls" }
async-imap = { git = "https://github.com/async-email/async-imap", branch="master" }
async-imap = { git = "https://github.com/async-email/async-imap", branch="improved-response-handling" }
async-tls = "0.6"
async-std = { version = "1.0", features = ["unstable"] }
base64 = "0.10"

View File

@@ -69,6 +69,18 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
.unwrap()
.imap
.disconnect(context);
context
.sentbox_thread
.read()
.unwrap()
.imap
.disconnect(context);
context
.mvbox_thread
.read()
.unwrap()
.imap
.disconnect(context);
context.smtp.clone().lock().unwrap().disconnect();
info!(context, "Configure ...",);

View File

@@ -498,6 +498,8 @@ pub struct BobStatus {
#[derive(Default, Debug)]
pub struct SmtpState {
pub idle: bool,
pub suspended: bool,
pub doing_jobs: bool,
pub perform_jobs_needed: i32,
pub probe_network: bool,
}

View File

@@ -31,13 +31,6 @@ enum Thread {
Smtp = 5000,
}
#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive)]
enum TryAgain {
Dont,
AtOnce,
StandardDelay,
}
impl Default for Thread {
fn default() -> Self {
Thread::Unknown
@@ -109,7 +102,7 @@ pub struct Job {
pub added_timestamp: i64,
pub tries: i32,
pub param: Params,
try_again: TryAgain,
pub try_again: i32,
pub pending_error: Option<String>,
}
@@ -143,7 +136,7 @@ impl Job {
let loginparam = LoginParam::from_database(context, "configured_");
let connected = context.smtp.lock().unwrap().connect(context, &loginparam);
if connected.is_err() {
self.try_again_later(TryAgain::StandardDelay, None);
self.try_again_later(3, None);
return;
}
}
@@ -184,7 +177,7 @@ impl Job {
Err(err) => {
smtp.disconnect();
warn!(context, "smtp failed: {}", err);
self.try_again_later(TryAgain::AtOnce, Some(err.to_string()));
self.try_again_later(-1, Some(err.to_string()));
}
Ok(()) => {
// smtp success, update db ASAP, then delete smtp file
@@ -203,7 +196,7 @@ impl Job {
}
// this value does not increase the number of tries
fn try_again_later(&mut self, try_again: TryAgain, pending_error: Option<String>) {
fn try_again_later(&mut self, try_again: i32, pending_error: Option<String>) {
self.try_again = try_again;
self.pending_error = pending_error;
}
@@ -237,7 +230,7 @@ impl Job {
&mut dest_uid,
) {
ImapActionResult::RetryLater => {
self.try_again_later(TryAgain::StandardDelay, None);
self.try_again_later(3i32, None);
}
ImapActionResult::Success => {
message::update_server_uid(
@@ -273,7 +266,7 @@ impl Job {
let res =
imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid);
if res == ImapActionResult::RetryLater {
self.try_again_later(TryAgain::AtOnce, None);
self.try_again_later(-1i32, None);
return;
}
}
@@ -306,7 +299,7 @@ impl Job {
let folder = msg.server_folder.as_ref().unwrap();
match imap_inbox.set_seen(context, folder, msg.server_uid) {
ImapActionResult::RetryLater => {
self.try_again_later(TryAgain::StandardDelay, None);
self.try_again_later(3i32, None);
}
ImapActionResult::AlreadyDone => {}
ImapActionResult::Success | ImapActionResult::Failed => {
@@ -336,7 +329,7 @@ impl Job {
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater {
self.try_again_later(TryAgain::StandardDelay, None);
self.try_again_later(3i32, None);
return;
}
if 0 != self.param.get_int(Param::AlsoMove).unwrap_or_default() {
@@ -356,7 +349,7 @@ impl Job {
if ImapActionResult::RetryLater
== imap_inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid)
{
self.try_again_later(TryAgain::StandardDelay, None);
self.try_again_later(3, None);
}
}
}
@@ -479,12 +472,24 @@ pub fn perform_smtp_jobs(context: &Context) {
state.probe_network = false;
state.perform_jobs_needed = 0;
if state.suspended {
info!(context, "SMTP-jobs suspended.",);
return;
}
state.doing_jobs = true;
probe_smtp_network
};
info!(context, "SMTP-jobs started...",);
job_perform(context, Thread::Smtp, probe_smtp_network);
info!(context, "SMTP-jobs ended.");
{
let &(ref lock, _) = &*context.smtp_state.clone();
let mut state = lock.lock().unwrap();
state.doing_jobs = false;
}
}
pub fn perform_smtp_idle(context: &Context) {
@@ -737,7 +742,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
added_timestamp: row.get(4)?,
tries: row.get(6)?,
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
try_again: TryAgain::Dont,
try_again: 0,
pending_error: None,
};
@@ -763,15 +768,30 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
);
// some configuration jobs are "exclusive":
// - they are always executed in the imap-thread and the smtp-thread is suspended during execution
// - they may change the database handle change the database handle; we do not keep old pointers therefore
// - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
job_kill_action(context, job.action);
context
.sentbox_thread
.clone()
.read()
.unwrap()
.suspend(context);
context
.mvbox_thread
.clone()
.read()
.unwrap()
.suspend(context);
suspend_smtp_thread(context, true);
}
let mut tries = 0;
while tries <= 1 {
// this can be modified by a job using dc_job_try_again_later()
job.try_again = TryAgain::Dont;
job.try_again = 0;
match job.action {
Action::Unknown => {
@@ -801,14 +821,39 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
Action::SendMdnOld => {}
Action::SendMsgToSmtpOld => {}
}
if job.try_again != TryAgain::AtOnce {
if job.try_again != -1 {
break;
}
tries += 1
}
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
context
.sentbox_thread
.clone()
.read()
.unwrap()
.unsuspend(context);
context
.mvbox_thread
.clone()
.read()
.unwrap()
.unsuspend(context);
suspend_smtp_thread(context, false);
break;
} else if job.try_again == TryAgain::AtOnce || job.try_again == TryAgain::StandardDelay {
} else if job.try_again == 2 {
// just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready
info!(
context,
"{}-job #{} not yet ready and will be delayed.",
if thread == Thread::Imap {
"INBOX"
} else {
"SMTP"
},
job.job_id
);
} else if job.try_again == -1 || job.try_again == 3 {
let tries = job.tries + 1;
if tries < 17 {
job.tries = tries;
@@ -874,6 +919,18 @@ fn get_backoff_time_offset(c_tries: libc::c_int) -> i64 {
seconds as i64
}
fn suspend_smtp_thread(context: &Context, suspend: bool) {
context.smtp_state.0.lock().unwrap().suspended = suspend;
if suspend {
loop {
if !context.smtp_state.0.lock().unwrap().doing_jobs {
return;
}
std::thread::sleep(std::time::Duration::from_micros(300 * 1000));
}
}
}
fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> {
let mut mimefactory = MimeFactory::load_mdn(context, msg_id)?;
unsafe { mimefactory.render()? };

View File

@@ -16,6 +16,7 @@ pub struct JobThread {
pub struct JobState {
idle: bool,
jobs_needed: bool,
suspended: bool,
using_handle: bool,
}
@@ -29,6 +30,32 @@ impl JobThread {
}
}
pub fn suspend(&self, context: &Context) {
info!(context, "Suspending {}-thread.", self.name,);
{
self.state.0.lock().unwrap().suspended = true;
}
self.interrupt_idle(context);
loop {
let using_handle = self.state.0.lock().unwrap().using_handle;
if !using_handle {
return;
}
std::thread::sleep(std::time::Duration::from_micros(300 * 1000));
}
}
pub fn unsuspend(&self, context: &Context) {
info!(context, "Unsuspending {}-thread.", self.name);
let &(ref lock, ref cvar) = &*self.state.clone();
let mut state = lock.lock().unwrap();
state.suspended = false;
state.idle = true;
cvar.notify_one();
}
pub fn interrupt_idle(&self, context: &Context) {
{
self.state.0.lock().unwrap().jobs_needed = true;
@@ -50,6 +77,10 @@ impl JobThread {
let &(ref lock, _) = &*self.state.clone();
let mut state = lock.lock().unwrap();
if state.suspended {
return;
}
state.using_handle = true;
}
@@ -114,6 +145,14 @@ impl JobThread {
return;
}
if state.suspended {
while !state.idle {
state = cvar.wait(state).unwrap();
}
state.idle = false;
return;
}
state.using_handle = true;
if !use_network {