Try to fix it. I dont like the code and it does not work.

This commit is contained in:
Hocuri
2020-05-31 18:07:40 +02:00
parent 76f4b9baa4
commit 6e05039a10
5 changed files with 137 additions and 100 deletions

View File

@@ -11,7 +11,7 @@ use crate::dc_tools::*;
use crate::events::Event;
use crate::message::MsgId;
use crate::mimefactory::RECOMMENDED_FILE_SIZE;
use crate::stock::StockMessage;
use crate::{scheduler::InterruptInfo, stock::StockMessage};
/// The available configuration keys.
#[derive(
@@ -203,17 +203,18 @@ impl Context {
}
Config::InboxWatch => {
let ret = self.sql.set_raw_config(self, key, value).await;
self.interrupt_inbox(false).await;
self.interrupt_inbox(InterruptInfo::new(false, None)).await;
ret
}
Config::SentboxWatch => {
let ret = self.sql.set_raw_config(self, key, value).await;
self.interrupt_sentbox(false).await;
self.interrupt_sentbox(InterruptInfo::new(false, None))
.await;
ret
}
Config::MvboxWatch => {
let ret = self.sql.set_raw_config(self, key, value).await;
self.interrupt_mvbox(false).await;
self.interrupt_mvbox(InterruptInfo::new(false, None)).await;
ret
}
Config::Selfstatus => {

View File

@@ -4,7 +4,7 @@ use async_imap::extensions::idle::IdleResponse;
use async_std::prelude::*;
use std::time::{Duration, SystemTime};
use crate::context::Context;
use crate::{context::Context, scheduler::InterruptInfo};
use super::select_folder;
use super::session::Session;
@@ -34,7 +34,11 @@ impl Imap {
self.config.can_idle
}
pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<bool> {
pub async fn idle(
&mut self,
context: &Context,
watch_folder: Option<String>,
) -> Result<InterruptInfo> {
use futures::future::FutureExt;
if !self.can_idle() {
@@ -46,7 +50,7 @@ impl Imap {
let session = self.session.take();
let timeout = Duration::from_secs(23 * 60);
let mut probe_network = false;
let mut info = Default::default();
if let Some(session) = session {
let mut handle = session.idle();
@@ -58,7 +62,7 @@ impl Imap {
enum Event {
IdleResponse(IdleResponse),
Interrupt(bool),
Interrupt(InterruptInfo),
}
if self.skip_next_idle_wait {
@@ -90,8 +94,8 @@ impl Imap {
Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => {
info!(context, "Idle wait was interrupted");
}
Ok(Event::Interrupt(probe)) => {
probe_network = probe;
Ok(Event::Interrupt(i)) => {
info = i;
info!(context, "Idle wait was interrupted");
}
Err(err) => {
@@ -125,14 +129,14 @@ impl Imap {
}
}
Ok(probe_network)
Ok(info)
}
pub(crate) async fn fake_idle(
&mut self,
context: &Context,
watch_folder: Option<String>,
) -> bool {
) -> InterruptInfo {
// Idle using polling. This is also needed if we're not yet configured -
// in this case, we're waiting for a configure job (and an interrupt).
@@ -144,7 +148,7 @@ impl Imap {
return self.idle_interrupt.recv().await.unwrap_or_default();
}
let mut probe_network = false;
let mut info: InterruptInfo = Default::default();
if self.skip_next_idle_wait {
// interrupt_idle has happened before we
// provided self.interrupt
@@ -157,10 +161,10 @@ impl Imap {
enum Event {
Tick,
Interrupt(bool),
Interrupt(InterruptInfo),
}
// loop until we are interrupted or if we fetched something
probe_network =
info =
loop {
use futures::future::FutureExt;
match interval
@@ -181,7 +185,7 @@ impl Imap {
}
if self.config.can_idle {
// we only fake-idled because network was gone during IDLE, probably
break false;
break InterruptInfo::new(false, None);
}
info!(context, "fake_idle is connected");
// we are connected, let's see if fetching messages results
@@ -194,7 +198,7 @@ impl Imap {
Ok(res) => {
info!(context, "fetch_new_messages returned {:?}", res);
if res {
break false;
break InterruptInfo::new(false, None);
}
}
Err(err) => {
@@ -204,9 +208,9 @@ impl Imap {
}
}
}
Event::Interrupt(probe_network) => {
Event::Interrupt(info) => {
// Interrupt
break probe_network;
break info;
}
}
};
@@ -222,6 +226,6 @@ impl Imap {
/ 1000.,
);
probe_network
info
}
}

View File

@@ -27,7 +27,7 @@ use crate::message::{self, update_server_uid};
use crate::mimeparser;
use crate::oauth2::dc_get_oauth2_access_token;
use crate::param::Params;
use crate::stock::StockMessage;
use crate::{scheduler::InterruptInfo, stock::StockMessage};
mod client;
mod idle;
@@ -109,7 +109,7 @@ const SELECT_ALL: &str = "1:*";
#[derive(Debug)]
pub struct Imap {
idle_interrupt: Receiver<bool>,
idle_interrupt: Receiver<InterruptInfo>,
config: ImapConfig,
session: Option<Session>,
connected: bool,
@@ -181,7 +181,7 @@ impl Default for ImapConfig {
}
impl Imap {
pub fn new(idle_interrupt: Receiver<bool>) -> Self {
pub fn new(idle_interrupt: Receiver<InterruptInfo>) -> Self {
Imap {
idle_interrupt,
config: Default::default(),
@@ -1391,6 +1391,10 @@ async fn precheck_imf(
if old_server_folder != server_folder || old_server_uid != server_uid {
update_server_uid(context, rfc724_mid, server_folder, server_uid).await;
context
.interrupt_inbox(InterruptInfo::new(false, Some(msg_id)))
.await;
info!(context, "Updating server_uid and interrupting")
}
Ok(true)
} else {

View File

@@ -31,7 +31,7 @@ use crate::message::{self, Message, MessageState};
use crate::mimefactory::MimeFactory;
use crate::param::*;
use crate::smtp::Smtp;
use crate::sql;
use crate::{scheduler::InterruptInfo, sql};
// results in ~3 weeks for the last backoff timespan
const JOB_RETRIES: u32 = 17;
@@ -1022,14 +1022,18 @@ pub async fn add(context: &Context, job: Job) {
| Action::MarkseenMsgOnImap
| Action::MoveMsg => {
info!(context, "interrupt: imap");
context.interrupt_inbox(false).await;
context
.interrupt_inbox(InterruptInfo::new(false, None))
.await;
}
Action::MaybeSendLocations
| Action::MaybeSendLocationsEnded
| Action::SendMdn
| Action::SendMsgToSmtp => {
info!(context, "interrupt: smtp");
context.interrupt_smtp(false).await;
context
.interrupt_smtp(InterruptInfo::new(false, None))
.await;
}
}
}
@@ -1044,38 +1048,49 @@ pub async fn add(context: &Context, job: Job) {
pub(crate) async fn load_next(
context: &Context,
thread: Thread,
probe_network: bool,
info: &InterruptInfo,
) -> Option<Job> {
info!(context, "loading job for {}-thread", thread);
let query = if !probe_network {
let query;
let params;
let t = time();
let m;
let thread_i = thread as i64;
if let Some(msg_id) = info.msg_id {
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE foreign_id=?
ORDER BY action DESC, added_timestamp
LIMIT 1;
"#;
m = msg_id;
params = paramsv![m];
} else if !info.probe_network {
// processing for first-try and after backoff-timeouts:
// process jobs in the order they were added.
r#"
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE thread=? AND desired_timestamp<=?
ORDER BY action DESC, added_timestamp
LIMIT 1;
"#
"#;
params = paramsv![thread_i, t];
} else {
// processing after call to dc_maybe_network():
// process _all_ pending jobs that failed before
// in the order of their backoff-times.
r#"
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE thread=? AND tries>0
ORDER BY desired_timestamp, action DESC
LIMIT 1;
"#
};
let thread_i = thread as i64;
let t = time();
let params = if !probe_network {
paramsv![thread_i, t]
} else {
paramsv![thread_i]
"#;
params = paramsv![thread_i];
};
let job = loop {
@@ -1182,11 +1197,21 @@ mod tests {
// all jobs.
let t = dummy_context().await;
insert_job(&t.ctx, -1).await; // This can not be loaded into Job struct.
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
let jobs = load_next(
&t.ctx,
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
assert!(jobs.is_none());
insert_job(&t.ctx, 1).await;
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
let jobs = load_next(
&t.ctx,
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
assert!(jobs.is_some());
}
@@ -1196,7 +1221,12 @@ mod tests {
insert_job(&t.ctx, 1).await;
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
let jobs = load_next(
&t.ctx,
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
assert!(jobs.is_some());
}
}

View File

@@ -5,7 +5,7 @@ use async_std::task;
use crate::context::Context;
use crate::imap::Imap;
use crate::job::{self, Thread};
use crate::{config::Config, smtp::Smtp};
use crate::{config::Config, message::MsgId, smtp::Smtp};
pub(crate) struct StopToken;
@@ -32,36 +32,20 @@ impl Context {
self.scheduler.read().await.maybe_network().await;
}
pub(crate) async fn interrupt_inbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_inbox(probe_network)
.await;
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_inbox(info).await;
}
pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_sentbox(probe_network)
.await;
pub(crate) async fn interrupt_sentbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_sentbox(info).await;
}
pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_mvbox(probe_network)
.await;
pub(crate) async fn interrupt_mvbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_mvbox(info).await;
}
pub(crate) async fn interrupt_smtp(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_smtp(probe_network)
.await;
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_smtp(info).await;
}
}
@@ -86,14 +70,14 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
started.send(()).await;
// track number of continously executed jobs
let mut jobs_loaded = 0;
let mut probe_network = false;
let mut jobs_loaded: i32 = 0;
let mut info: InterruptInfo = Default::default();
loop {
match job::load_next(&ctx, Thread::Imap, probe_network).await {
match job::load_next(&ctx, Thread::Imap, &info).await {
Some(job) if jobs_loaded <= 20 => {
jobs_loaded += 1;
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
probe_network = false;
info = Default::default();
}
Some(job) => {
// Let the fetch run, but return back to the job afterwards.
@@ -103,8 +87,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
}
None => {
jobs_loaded = 0;
probe_network =
fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await;
info = fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await;
}
}
}
@@ -136,7 +119,7 @@ async fn fetch(ctx: &Context, connection: &mut Imap) {
}
}
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> bool {
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo {
match ctx.get_config(folder).await {
Some(watch_folder) => {
// fetch
@@ -153,7 +136,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> boo
.unwrap_or_else(|err| {
connection.trigger_reconnect();
error!(ctx, "{}", err);
false
InterruptInfo::new(false, None)
})
} else {
connection.fake_idle(&ctx, Some(watch_folder)).await
@@ -223,18 +206,18 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
started.send(()).await;
let ctx = ctx1;
let mut probe_network = false;
let mut interrupt_info = Default::default();
loop {
match job::load_next(&ctx, Thread::Smtp, probe_network).await {
match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
Some(job) => {
info!(ctx, "executing smtp job");
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
probe_network = false;
interrupt_info = Default::default();
}
None => {
// Fake Idle
info!(ctx, "smtp fake idle - started");
probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default();
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
info!(ctx, "smtp fake idle - interrupted")
}
}
@@ -333,34 +316,34 @@ impl Scheduler {
return;
}
self.interrupt_inbox(true)
.join(self.interrupt_mvbox(true))
.join(self.interrupt_sentbox(true))
.join(self.interrupt_smtp(true))
self.interrupt_inbox(InterruptInfo::new(true, None))
.join(self.interrupt_mvbox(InterruptInfo::new(true, None)))
.join(self.interrupt_sentbox(InterruptInfo::new(true, None)))
.join(self.interrupt_smtp(InterruptInfo::new(true, None)))
.await;
}
async fn interrupt_inbox(&self, probe_network: bool) {
async fn interrupt_inbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref inbox, .. } = self {
inbox.interrupt(probe_network).await;
inbox.interrupt(info).await;
}
}
async fn interrupt_mvbox(&self, probe_network: bool) {
async fn interrupt_mvbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref mvbox, .. } = self {
mvbox.interrupt(probe_network).await;
mvbox.interrupt(info).await;
}
}
async fn interrupt_sentbox(&self, probe_network: bool) {
async fn interrupt_sentbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref sentbox, .. } = self {
sentbox.interrupt(probe_network).await;
sentbox.interrupt(info).await;
}
}
async fn interrupt_smtp(&self, probe_network: bool) {
async fn interrupt_smtp(&self, info: InterruptInfo) {
if let Scheduler::Running { ref smtp, .. } = self {
smtp.interrupt(probe_network).await;
smtp.interrupt(info).await;
}
}
@@ -429,7 +412,7 @@ struct ConnectionState {
/// Channel to interrupt the whole connection.
stop_sender: Sender<()>,
/// Channel to interrupt idle.
idle_interrupt_sender: Sender<bool>,
idle_interrupt_sender: Sender<InterruptInfo>,
}
impl ConnectionState {
@@ -441,9 +424,9 @@ impl ConnectionState {
self.shutdown_receiver.recv().await.ok();
}
async fn interrupt(&self, probe_network: bool) {
async fn interrupt(&self, info: InterruptInfo) {
// Use try_send to avoid blocking on interrupts.
self.idle_interrupt_sender.try_send(probe_network).ok();
self.idle_interrupt_sender.try_send(info).ok();
}
}
@@ -477,8 +460,8 @@ impl SmtpConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self, probe_network: bool) {
self.state.interrupt(probe_network).await;
async fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info).await;
}
/// Shutdown this connection completely.
@@ -492,7 +475,7 @@ struct SmtpConnectionHandlers {
connection: Smtp,
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
idle_interrupt_receiver: Receiver<bool>,
idle_interrupt_receiver: Receiver<InterruptInfo>,
}
#[derive(Debug)]
@@ -525,8 +508,8 @@ impl ImapConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self, probe_network: bool) {
self.state.interrupt(probe_network).await;
async fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info).await;
}
/// Shutdown this connection completely.
@@ -541,3 +524,18 @@ struct ImapConnectionHandlers {
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
}
#[derive(Default, Debug)]
pub struct InterruptInfo {
pub probe_network: bool,
pub msg_id: Option<MsgId>,
}
impl InterruptInfo {
pub fn new(probe_network: bool, msg_id: Option<MsgId>) -> Self {
Self {
probe_network,
msg_id,
}
}
}