mirror of
https://github.com/chatmail/core.git
synced 2026-06-19 14:17:03 +03:00
Compare commits
1 Commits
remove-sus
...
response_h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81f85488de |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -90,7 +90,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "async-imap"
|
||||
version = "0.1.1"
|
||||
source = "git+https://github.com/async-email/async-imap#1327f678cf5515842fc309636459372e6ab40db2"
|
||||
source = "git+https://github.com/async-email/async-imap?branch=improved-response-handling#a79795381622e536f035a96c2c249272e9fbcece"
|
||||
dependencies = [
|
||||
"async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@@ -610,7 +610,7 @@ dependencies = [
|
||||
name = "deltachat"
|
||||
version = "1.0.0-beta.8"
|
||||
dependencies = [
|
||||
"async-imap 0.1.1 (git+https://github.com/async-email/async-imap)",
|
||||
"async-imap 0.1.1 (git+https://github.com/async-email/async-imap?branch=improved-response-handling)",
|
||||
"async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"async-tls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"backtrace 0.3.40 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@@ -3245,7 +3245,7 @@ dependencies = [
|
||||
"checksum arrayvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
|
||||
"checksum ascii_utils 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "71938f30533e4d95a6d17aa530939da3842c2ab6f4f84b9dae68447e4129f74a"
|
||||
"checksum async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423"
|
||||
"checksum async-imap 0.1.1 (git+https://github.com/async-email/async-imap)" = "<none>"
|
||||
"checksum async-imap 0.1.1 (git+https://github.com/async-email/async-imap?branch=improved-response-handling)" = "<none>"
|
||||
"checksum async-macros 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "644a5a8de80f2085a1e7e57cd1544a2a7438f6e003c0790999bd43b92a77cdb2"
|
||||
"checksum async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "56933da6903b273923d13f4746d829f66ff9b444173f6743d831e80f4da15446"
|
||||
"checksum async-task 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de6bd58f7b9cc49032559422595c81cbfcf04db2f2133592f70af19e258a1ced"
|
||||
|
||||
@@ -19,7 +19,7 @@ reqwest = { version = "0.9.15", default-features = false, features = ["rustls-tl
|
||||
num-derive = "0.2.5"
|
||||
num-traits = "0.2.6"
|
||||
lettre = { git = "https://github.com/deltachat/lettre", branch = "feat/rustls" }
|
||||
async-imap = { git = "https://github.com/async-email/async-imap", branch="master" }
|
||||
async-imap = { git = "https://github.com/async-email/async-imap", branch="improved-response-handling" }
|
||||
async-tls = "0.6"
|
||||
async-std = { version = "1.0", features = ["unstable"] }
|
||||
base64 = "0.10"
|
||||
|
||||
@@ -69,6 +69,18 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
|
||||
.unwrap()
|
||||
.imap
|
||||
.disconnect(context);
|
||||
context
|
||||
.sentbox_thread
|
||||
.read()
|
||||
.unwrap()
|
||||
.imap
|
||||
.disconnect(context);
|
||||
context
|
||||
.mvbox_thread
|
||||
.read()
|
||||
.unwrap()
|
||||
.imap
|
||||
.disconnect(context);
|
||||
context.smtp.clone().lock().unwrap().disconnect();
|
||||
info!(context, "Configure ...",);
|
||||
|
||||
|
||||
@@ -498,6 +498,8 @@ pub struct BobStatus {
|
||||
#[derive(Default, Debug)]
|
||||
pub struct SmtpState {
|
||||
pub idle: bool,
|
||||
pub suspended: bool,
|
||||
pub doing_jobs: bool,
|
||||
pub perform_jobs_needed: i32,
|
||||
pub probe_network: bool,
|
||||
}
|
||||
|
||||
97
src/job.rs
97
src/job.rs
@@ -31,13 +31,6 @@ enum Thread {
|
||||
Smtp = 5000,
|
||||
}
|
||||
|
||||
#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive)]
|
||||
enum TryAgain {
|
||||
Dont,
|
||||
AtOnce,
|
||||
StandardDelay,
|
||||
}
|
||||
|
||||
impl Default for Thread {
|
||||
fn default() -> Self {
|
||||
Thread::Unknown
|
||||
@@ -109,7 +102,7 @@ pub struct Job {
|
||||
pub added_timestamp: i64,
|
||||
pub tries: i32,
|
||||
pub param: Params,
|
||||
try_again: TryAgain,
|
||||
pub try_again: i32,
|
||||
pub pending_error: Option<String>,
|
||||
}
|
||||
|
||||
@@ -143,7 +136,7 @@ impl Job {
|
||||
let loginparam = LoginParam::from_database(context, "configured_");
|
||||
let connected = context.smtp.lock().unwrap().connect(context, &loginparam);
|
||||
if connected.is_err() {
|
||||
self.try_again_later(TryAgain::StandardDelay, None);
|
||||
self.try_again_later(3, None);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -184,7 +177,7 @@ impl Job {
|
||||
Err(err) => {
|
||||
smtp.disconnect();
|
||||
warn!(context, "smtp failed: {}", err);
|
||||
self.try_again_later(TryAgain::AtOnce, Some(err.to_string()));
|
||||
self.try_again_later(-1, Some(err.to_string()));
|
||||
}
|
||||
Ok(()) => {
|
||||
// smtp success, update db ASAP, then delete smtp file
|
||||
@@ -203,7 +196,7 @@ impl Job {
|
||||
}
|
||||
|
||||
// this value does not increase the number of tries
|
||||
fn try_again_later(&mut self, try_again: TryAgain, pending_error: Option<String>) {
|
||||
fn try_again_later(&mut self, try_again: i32, pending_error: Option<String>) {
|
||||
self.try_again = try_again;
|
||||
self.pending_error = pending_error;
|
||||
}
|
||||
@@ -237,7 +230,7 @@ impl Job {
|
||||
&mut dest_uid,
|
||||
) {
|
||||
ImapActionResult::RetryLater => {
|
||||
self.try_again_later(TryAgain::StandardDelay, None);
|
||||
self.try_again_later(3i32, None);
|
||||
}
|
||||
ImapActionResult::Success => {
|
||||
message::update_server_uid(
|
||||
@@ -273,7 +266,7 @@ impl Job {
|
||||
let res =
|
||||
imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid);
|
||||
if res == ImapActionResult::RetryLater {
|
||||
self.try_again_later(TryAgain::AtOnce, None);
|
||||
self.try_again_later(-1i32, None);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -306,7 +299,7 @@ impl Job {
|
||||
let folder = msg.server_folder.as_ref().unwrap();
|
||||
match imap_inbox.set_seen(context, folder, msg.server_uid) {
|
||||
ImapActionResult::RetryLater => {
|
||||
self.try_again_later(TryAgain::StandardDelay, None);
|
||||
self.try_again_later(3i32, None);
|
||||
}
|
||||
ImapActionResult::AlreadyDone => {}
|
||||
ImapActionResult::Success | ImapActionResult::Failed => {
|
||||
@@ -336,7 +329,7 @@ impl Job {
|
||||
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
|
||||
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
||||
if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater {
|
||||
self.try_again_later(TryAgain::StandardDelay, None);
|
||||
self.try_again_later(3i32, None);
|
||||
return;
|
||||
}
|
||||
if 0 != self.param.get_int(Param::AlsoMove).unwrap_or_default() {
|
||||
@@ -356,7 +349,7 @@ impl Job {
|
||||
if ImapActionResult::RetryLater
|
||||
== imap_inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid)
|
||||
{
|
||||
self.try_again_later(TryAgain::StandardDelay, None);
|
||||
self.try_again_later(3, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -479,12 +472,24 @@ pub fn perform_smtp_jobs(context: &Context) {
|
||||
state.probe_network = false;
|
||||
state.perform_jobs_needed = 0;
|
||||
|
||||
if state.suspended {
|
||||
info!(context, "SMTP-jobs suspended.",);
|
||||
return;
|
||||
}
|
||||
state.doing_jobs = true;
|
||||
probe_smtp_network
|
||||
};
|
||||
|
||||
info!(context, "SMTP-jobs started...",);
|
||||
job_perform(context, Thread::Smtp, probe_smtp_network);
|
||||
info!(context, "SMTP-jobs ended.");
|
||||
|
||||
{
|
||||
let &(ref lock, _) = &*context.smtp_state.clone();
|
||||
let mut state = lock.lock().unwrap();
|
||||
|
||||
state.doing_jobs = false;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn perform_smtp_idle(context: &Context) {
|
||||
@@ -737,7 +742,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
|
||||
added_timestamp: row.get(4)?,
|
||||
tries: row.get(6)?,
|
||||
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
|
||||
try_again: TryAgain::Dont,
|
||||
try_again: 0,
|
||||
pending_error: None,
|
||||
};
|
||||
|
||||
@@ -763,15 +768,30 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
|
||||
);
|
||||
|
||||
// some configuration jobs are "exclusive":
|
||||
// - they are always executed in the imap-thread and the smtp-thread is suspended during execution
|
||||
// - they may change the database handle change the database handle; we do not keep old pointers therefore
|
||||
// - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution
|
||||
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
|
||||
job_kill_action(context, job.action);
|
||||
context
|
||||
.sentbox_thread
|
||||
.clone()
|
||||
.read()
|
||||
.unwrap()
|
||||
.suspend(context);
|
||||
context
|
||||
.mvbox_thread
|
||||
.clone()
|
||||
.read()
|
||||
.unwrap()
|
||||
.suspend(context);
|
||||
suspend_smtp_thread(context, true);
|
||||
}
|
||||
|
||||
let mut tries = 0;
|
||||
while tries <= 1 {
|
||||
// this can be modified by a job using dc_job_try_again_later()
|
||||
job.try_again = TryAgain::Dont;
|
||||
job.try_again = 0;
|
||||
|
||||
match job.action {
|
||||
Action::Unknown => {
|
||||
@@ -801,14 +821,39 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
|
||||
Action::SendMdnOld => {}
|
||||
Action::SendMsgToSmtpOld => {}
|
||||
}
|
||||
if job.try_again != TryAgain::AtOnce {
|
||||
if job.try_again != -1 {
|
||||
break;
|
||||
}
|
||||
tries += 1
|
||||
}
|
||||
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
|
||||
context
|
||||
.sentbox_thread
|
||||
.clone()
|
||||
.read()
|
||||
.unwrap()
|
||||
.unsuspend(context);
|
||||
context
|
||||
.mvbox_thread
|
||||
.clone()
|
||||
.read()
|
||||
.unwrap()
|
||||
.unsuspend(context);
|
||||
suspend_smtp_thread(context, false);
|
||||
break;
|
||||
} else if job.try_again == TryAgain::AtOnce || job.try_again == TryAgain::StandardDelay {
|
||||
} else if job.try_again == 2 {
|
||||
// just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready
|
||||
info!(
|
||||
context,
|
||||
"{}-job #{} not yet ready and will be delayed.",
|
||||
if thread == Thread::Imap {
|
||||
"INBOX"
|
||||
} else {
|
||||
"SMTP"
|
||||
},
|
||||
job.job_id
|
||||
);
|
||||
} else if job.try_again == -1 || job.try_again == 3 {
|
||||
let tries = job.tries + 1;
|
||||
if tries < 17 {
|
||||
job.tries = tries;
|
||||
@@ -874,6 +919,18 @@ fn get_backoff_time_offset(c_tries: libc::c_int) -> i64 {
|
||||
seconds as i64
|
||||
}
|
||||
|
||||
fn suspend_smtp_thread(context: &Context, suspend: bool) {
|
||||
context.smtp_state.0.lock().unwrap().suspended = suspend;
|
||||
if suspend {
|
||||
loop {
|
||||
if !context.smtp_state.0.lock().unwrap().doing_jobs {
|
||||
return;
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_micros(300 * 1000));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> {
|
||||
let mut mimefactory = MimeFactory::load_mdn(context, msg_id)?;
|
||||
unsafe { mimefactory.render()? };
|
||||
|
||||
@@ -16,6 +16,7 @@ pub struct JobThread {
|
||||
pub struct JobState {
|
||||
idle: bool,
|
||||
jobs_needed: bool,
|
||||
suspended: bool,
|
||||
using_handle: bool,
|
||||
}
|
||||
|
||||
@@ -29,6 +30,32 @@ impl JobThread {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn suspend(&self, context: &Context) {
|
||||
info!(context, "Suspending {}-thread.", self.name,);
|
||||
{
|
||||
self.state.0.lock().unwrap().suspended = true;
|
||||
}
|
||||
self.interrupt_idle(context);
|
||||
loop {
|
||||
let using_handle = self.state.0.lock().unwrap().using_handle;
|
||||
if !using_handle {
|
||||
return;
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_micros(300 * 1000));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unsuspend(&self, context: &Context) {
|
||||
info!(context, "Unsuspending {}-thread.", self.name);
|
||||
|
||||
let &(ref lock, ref cvar) = &*self.state.clone();
|
||||
let mut state = lock.lock().unwrap();
|
||||
|
||||
state.suspended = false;
|
||||
state.idle = true;
|
||||
cvar.notify_one();
|
||||
}
|
||||
|
||||
pub fn interrupt_idle(&self, context: &Context) {
|
||||
{
|
||||
self.state.0.lock().unwrap().jobs_needed = true;
|
||||
@@ -50,6 +77,10 @@ impl JobThread {
|
||||
let &(ref lock, _) = &*self.state.clone();
|
||||
let mut state = lock.lock().unwrap();
|
||||
|
||||
if state.suspended {
|
||||
return;
|
||||
}
|
||||
|
||||
state.using_handle = true;
|
||||
}
|
||||
|
||||
@@ -114,6 +145,14 @@ impl JobThread {
|
||||
return;
|
||||
}
|
||||
|
||||
if state.suspended {
|
||||
while !state.idle {
|
||||
state = cvar.wait(state).unwrap();
|
||||
}
|
||||
state.idle = false;
|
||||
return;
|
||||
}
|
||||
|
||||
state.using_handle = true;
|
||||
|
||||
if !use_network {
|
||||
|
||||
Reference in New Issue
Block a user