mirror of
https://github.com/chatmail/core.git
synced 2026-05-17 05:46:30 +03:00
fix: avoid lock for probe_network, avoiding deadlock on startup
Closes #1532
This commit is contained in:
@@ -199,17 +199,17 @@ impl Context {
|
|||||||
}
|
}
|
||||||
Config::InboxWatch => {
|
Config::InboxWatch => {
|
||||||
let ret = self.sql.set_raw_config(self, key, value).await;
|
let ret = self.sql.set_raw_config(self, key, value).await;
|
||||||
self.interrupt_inbox().await;
|
self.interrupt_inbox(false).await;
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
Config::SentboxWatch => {
|
Config::SentboxWatch => {
|
||||||
let ret = self.sql.set_raw_config(self, key, value).await;
|
let ret = self.sql.set_raw_config(self, key, value).await;
|
||||||
self.interrupt_sentbox().await;
|
self.interrupt_sentbox(false).await;
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
Config::MvboxWatch => {
|
Config::MvboxWatch => {
|
||||||
let ret = self.sql.set_raw_config(self, key, value).await;
|
let ret = self.sql.set_raw_config(self, key, value).await;
|
||||||
self.interrupt_mvbox().await;
|
self.interrupt_mvbox(false).await;
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
Config::Selfstatus => {
|
Config::Selfstatus => {
|
||||||
|
|||||||
@@ -140,8 +140,10 @@ impl Context {
|
|||||||
info!(self, "starting IO");
|
info!(self, "starting IO");
|
||||||
assert!(!self.is_io_running().await, "context is already running");
|
assert!(!self.is_io_running().await, "context is already running");
|
||||||
|
|
||||||
let l = &mut *self.inner.scheduler.write().await;
|
{
|
||||||
l.start(self.clone()).await;
|
let l = &mut *self.inner.scheduler.write().await;
|
||||||
|
l.start(self.clone()).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns if the IO scheduler is running.
|
/// Returns if the IO scheduler is running.
|
||||||
|
|||||||
130
src/imap/idle.rs
130
src/imap/idle.rs
@@ -34,7 +34,7 @@ impl Imap {
|
|||||||
self.config.can_idle
|
self.config.can_idle
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<()> {
|
pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<bool> {
|
||||||
use futures::future::FutureExt;
|
use futures::future::FutureExt;
|
||||||
|
|
||||||
if !self.can_idle() {
|
if !self.can_idle() {
|
||||||
@@ -46,6 +46,7 @@ impl Imap {
|
|||||||
|
|
||||||
let session = self.session.take();
|
let session = self.session.take();
|
||||||
let timeout = Duration::from_secs(23 * 60);
|
let timeout = Duration::from_secs(23 * 60);
|
||||||
|
let mut probe_network = false;
|
||||||
|
|
||||||
if let Some(session) = session {
|
if let Some(session) = session {
|
||||||
let mut handle = session.idle();
|
let mut handle = session.idle();
|
||||||
@@ -55,6 +56,11 @@ impl Imap {
|
|||||||
|
|
||||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||||
|
|
||||||
|
enum Event {
|
||||||
|
IdleResponse(IdleResponse),
|
||||||
|
Interrupt(bool),
|
||||||
|
}
|
||||||
|
|
||||||
if self.skip_next_idle_wait {
|
if self.skip_next_idle_wait {
|
||||||
// interrupt_idle has happened before we
|
// interrupt_idle has happened before we
|
||||||
// provided self.interrupt
|
// provided self.interrupt
|
||||||
@@ -65,23 +71,27 @@ impl Imap {
|
|||||||
info!(context, "Idle wait was skipped");
|
info!(context, "Idle wait was skipped");
|
||||||
} else {
|
} else {
|
||||||
info!(context, "Idle entering wait-on-remote state");
|
info!(context, "Idle entering wait-on-remote state");
|
||||||
let fut = idle_wait.race(
|
let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(
|
||||||
self.idle_interrupt
|
self.idle_interrupt.recv().map(|probe_network| {
|
||||||
.recv()
|
Ok(Event::Interrupt(probe_network.unwrap_or_default()))
|
||||||
.map(|_| Ok(IdleResponse::ManualInterrupt)),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
match fut.await {
|
match fut.await {
|
||||||
Ok(IdleResponse::NewData(_)) => {
|
Ok(Event::IdleResponse(IdleResponse::NewData(_))) => {
|
||||||
info!(context, "Idle has NewData");
|
info!(context, "Idle has NewData");
|
||||||
}
|
}
|
||||||
// TODO: idle_wait does not distinguish manual interrupts
|
// TODO: idle_wait does not distinguish manual interrupts
|
||||||
// from Timeouts if we would know it's a Timeout we could bail
|
// from Timeouts if we would know it's a Timeout we could bail
|
||||||
// directly and reconnect .
|
// directly and reconnect .
|
||||||
Ok(IdleResponse::Timeout) => {
|
Ok(Event::IdleResponse(IdleResponse::Timeout)) => {
|
||||||
info!(context, "Idle-wait timeout or interruption");
|
info!(context, "Idle-wait timeout or interruption");
|
||||||
}
|
}
|
||||||
Ok(IdleResponse::ManualInterrupt) => {
|
Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => {
|
||||||
|
info!(context, "Idle wait was interrupted");
|
||||||
|
}
|
||||||
|
Ok(Event::Interrupt(probe)) => {
|
||||||
|
probe_network = probe;
|
||||||
info!(context, "Idle wait was interrupted");
|
info!(context, "Idle wait was interrupted");
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -115,16 +125,26 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(probe_network)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn fake_idle(&mut self, context: &Context, watch_folder: Option<String>) {
|
pub(crate) async fn fake_idle(
|
||||||
|
&mut self,
|
||||||
|
context: &Context,
|
||||||
|
watch_folder: Option<String>,
|
||||||
|
) -> bool {
|
||||||
// Idle using polling. This is also needed if we're not yet configured -
|
// Idle using polling. This is also needed if we're not yet configured -
|
||||||
// in this case, we're waiting for a configure job (and an interrupt).
|
// in this case, we're waiting for a configure job (and an interrupt).
|
||||||
|
|
||||||
let fake_idle_start_time = SystemTime::now();
|
let fake_idle_start_time = SystemTime::now();
|
||||||
info!(context, "IMAP-fake-IDLEing...");
|
info!(context, "IMAP-fake-IDLEing...");
|
||||||
|
|
||||||
|
// Do not poll, just wait for an interrupt when no folder is passed in.
|
||||||
|
if watch_folder.is_none() {
|
||||||
|
return self.idle_interrupt.recv().await.unwrap_or_default();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut probe_network = false;
|
||||||
if self.skip_next_idle_wait {
|
if self.skip_next_idle_wait {
|
||||||
// interrupt_idle has happened before we
|
// interrupt_idle has happened before we
|
||||||
// provided self.interrupt
|
// provided self.interrupt
|
||||||
@@ -135,53 +155,61 @@ impl Imap {
|
|||||||
// TODO: grow sleep durations / make them more flexible
|
// TODO: grow sleep durations / make them more flexible
|
||||||
let mut interval = async_std::stream::interval(Duration::from_secs(60));
|
let mut interval = async_std::stream::interval(Duration::from_secs(60));
|
||||||
|
|
||||||
|
enum Event {
|
||||||
|
Tick,
|
||||||
|
Interrupt(bool),
|
||||||
|
}
|
||||||
// loop until we are interrupted or if we fetched something
|
// loop until we are interrupted or if we fetched something
|
||||||
loop {
|
probe_network =
|
||||||
use futures::future::FutureExt;
|
loop {
|
||||||
match interval
|
use futures::future::FutureExt;
|
||||||
.next()
|
match interval
|
||||||
.race(self.idle_interrupt.recv().map(|_| None))
|
.next()
|
||||||
.await
|
.map(|_| Event::Tick)
|
||||||
{
|
.race(self.idle_interrupt.recv().map(|probe_network| {
|
||||||
Some(_) => {
|
Event::Interrupt(probe_network.unwrap_or_default())
|
||||||
// try to connect with proper login params
|
}))
|
||||||
// (setup_handle_if_needed might not know about them if we
|
.await
|
||||||
// never successfully connected)
|
{
|
||||||
if let Err(err) = self.connect_configured(context).await {
|
Event::Tick => {
|
||||||
warn!(context, "fake_idle: could not connect: {}", err);
|
// try to connect with proper login params
|
||||||
continue;
|
// (setup_handle_if_needed might not know about them if we
|
||||||
}
|
// never successfully connected)
|
||||||
if self.config.can_idle {
|
if let Err(err) = self.connect_configured(context).await {
|
||||||
// we only fake-idled because network was gone during IDLE, probably
|
warn!(context, "fake_idle: could not connect: {}", err);
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
info!(context, "fake_idle is connected");
|
if self.config.can_idle {
|
||||||
// we are connected, let's see if fetching messages results
|
// we only fake-idled because network was gone during IDLE, probably
|
||||||
// in anything. If so, we behave as if IDLE had data but
|
break false;
|
||||||
// will have already fetched the messages so perform_*_fetch
|
}
|
||||||
// will not find any new.
|
info!(context, "fake_idle is connected");
|
||||||
|
// we are connected, let's see if fetching messages results
|
||||||
|
// in anything. If so, we behave as if IDLE had data but
|
||||||
|
// will have already fetched the messages so perform_*_fetch
|
||||||
|
// will not find any new.
|
||||||
|
|
||||||
if let Some(ref watch_folder) = watch_folder {
|
if let Some(ref watch_folder) = watch_folder {
|
||||||
match self.fetch_new_messages(context, watch_folder).await {
|
match self.fetch_new_messages(context, watch_folder).await {
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
info!(context, "fetch_new_messages returned {:?}", res);
|
info!(context, "fetch_new_messages returned {:?}", res);
|
||||||
if res {
|
if res {
|
||||||
break;
|
break false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!(context, "could not fetch from folder: {}", err);
|
||||||
|
self.trigger_reconnect()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!(context, "could not fetch from folder: {}", err);
|
|
||||||
self.trigger_reconnect()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Event::Interrupt(probe_network) => {
|
||||||
|
// Interrupt
|
||||||
|
break probe_network;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None => {
|
};
|
||||||
// Interrupt
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
@@ -193,5 +221,7 @@ impl Imap {
|
|||||||
.as_millis() as f64
|
.as_millis() as f64
|
||||||
/ 1000.,
|
/ 1000.,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
probe_network
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -109,7 +109,7 @@ const SELECT_ALL: &str = "1:*";
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Imap {
|
pub struct Imap {
|
||||||
idle_interrupt: Receiver<()>,
|
idle_interrupt: Receiver<bool>,
|
||||||
config: ImapConfig,
|
config: ImapConfig,
|
||||||
session: Option<Session>,
|
session: Option<Session>,
|
||||||
connected: bool,
|
connected: bool,
|
||||||
@@ -181,7 +181,7 @@ impl Default for ImapConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Imap {
|
impl Imap {
|
||||||
pub fn new(idle_interrupt: Receiver<()>) -> Self {
|
pub fn new(idle_interrupt: Receiver<bool>) -> Self {
|
||||||
Imap {
|
Imap {
|
||||||
idle_interrupt,
|
idle_interrupt,
|
||||||
config: Default::default(),
|
config: Default::default(),
|
||||||
|
|||||||
70
src/job.rs
70
src/job.rs
@@ -1028,13 +1028,15 @@ pub async fn add(context: &Context, job: Job) {
|
|||||||
| Action::DeleteMsgOnImap
|
| Action::DeleteMsgOnImap
|
||||||
| Action::MarkseenMsgOnImap
|
| Action::MarkseenMsgOnImap
|
||||||
| Action::MoveMsg => {
|
| Action::MoveMsg => {
|
||||||
context.interrupt_inbox().await;
|
info!(context, "interrupt: imap");
|
||||||
|
context.interrupt_inbox(false).await;
|
||||||
}
|
}
|
||||||
Action::MaybeSendLocations
|
Action::MaybeSendLocations
|
||||||
| Action::MaybeSendLocationsEnded
|
| Action::MaybeSendLocationsEnded
|
||||||
| Action::SendMdn
|
| Action::SendMdn
|
||||||
| Action::SendMsgToSmtp => {
|
| Action::SendMsgToSmtp => {
|
||||||
context.interrupt_smtp().await;
|
info!(context, "interrupt: smtp");
|
||||||
|
context.interrupt_smtp(false).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1088,13 +1090,13 @@ LIMIT 1;
|
|||||||
.sql
|
.sql
|
||||||
.query_row_optional(query, params.clone(), |row| {
|
.query_row_optional(query, params.clone(), |row| {
|
||||||
let job = Job {
|
let job = Job {
|
||||||
job_id: row.get(0)?,
|
job_id: row.get("id")?,
|
||||||
action: row.get(1)?,
|
action: row.get("action")?,
|
||||||
foreign_id: row.get(2)?,
|
foreign_id: row.get("foreign_id")?,
|
||||||
desired_timestamp: row.get(5)?,
|
desired_timestamp: row.get("desired_timestamp")?,
|
||||||
added_timestamp: row.get(4)?,
|
added_timestamp: row.get("added_timestamp")?,
|
||||||
tries: row.get(6)?,
|
tries: row.get("tries")?,
|
||||||
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
|
param: row.get::<_, String>("param")?.parse().unwrap_or_default(),
|
||||||
pending_error: None,
|
pending_error: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1104,8 +1106,9 @@ LIMIT 1;
|
|||||||
|
|
||||||
match job_res {
|
match job_res {
|
||||||
Ok(job) => break job,
|
Ok(job) => break job,
|
||||||
Err(_) => {
|
Err(err) => {
|
||||||
// Remove invalid job from the DB
|
// Remove invalid job from the DB
|
||||||
|
info!(context, "cleaning up job, because of {}", err);
|
||||||
|
|
||||||
// TODO: improve by only doing a single query
|
// TODO: improve by only doing a single query
|
||||||
match context
|
match context
|
||||||
@@ -1116,7 +1119,7 @@ LIMIT 1;
|
|||||||
Ok(id) => {
|
Ok(id) => {
|
||||||
context
|
context
|
||||||
.sql
|
.sql
|
||||||
.execute("DELETE FROM jobs WHERE id=?", paramsv![id])
|
.execute("DELETE FROM jobs WHERE id=?;", paramsv![id])
|
||||||
.await
|
.await
|
||||||
.ok();
|
.ok();
|
||||||
}
|
}
|
||||||
@@ -1129,21 +1132,26 @@ LIMIT 1;
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if thread == Thread::Imap {
|
match thread {
|
||||||
if let Some(job) = job {
|
Thread::Unknown => {
|
||||||
if job.action < Action::DeleteMsgOnImap {
|
error!(context, "unknown thread for job");
|
||||||
load_imap_deletion_job(context)
|
None
|
||||||
.await
|
|
||||||
.unwrap_or_default()
|
|
||||||
.or(Some(job))
|
|
||||||
} else {
|
|
||||||
Some(job)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
load_imap_deletion_job(context).await.unwrap_or_default()
|
|
||||||
}
|
}
|
||||||
} else {
|
Thread::Imap => {
|
||||||
job
|
if let Some(job) = job {
|
||||||
|
if job.action < Action::DeleteMsgOnImap {
|
||||||
|
load_imap_deletion_job(context)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default()
|
||||||
|
.or(Some(job))
|
||||||
|
} else {
|
||||||
|
Some(job)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
load_imap_deletion_job(context).await.unwrap_or_default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Thread::Smtp => job,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1175,7 +1183,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_std::test]
|
#[async_std::test]
|
||||||
async fn test_load_next_job() {
|
async fn test_load_next_job_two() {
|
||||||
// We want to ensure that loading jobs skips over jobs which
|
// We want to ensure that loading jobs skips over jobs which
|
||||||
// fails to load from the database instead of failing to load
|
// fails to load from the database instead of failing to load
|
||||||
// all jobs.
|
// all jobs.
|
||||||
@@ -1188,4 +1196,14 @@ mod tests {
|
|||||||
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
|
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
|
||||||
assert!(jobs.is_some());
|
assert!(jobs.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_std::test]
|
||||||
|
async fn test_load_next_job_one() {
|
||||||
|
let t = dummy_context().await;
|
||||||
|
|
||||||
|
insert_job(&t.ctx, 1).await;
|
||||||
|
|
||||||
|
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
|
||||||
|
assert!(jobs.is_some());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
155
src/scheduler.rs
155
src/scheduler.rs
@@ -2,8 +2,6 @@ use async_std::prelude::*;
|
|||||||
use async_std::sync::{channel, Receiver, Sender};
|
use async_std::sync::{channel, Receiver, Sender};
|
||||||
use async_std::task;
|
use async_std::task;
|
||||||
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
use crate::imap::Imap;
|
use crate::imap::Imap;
|
||||||
use crate::job::{self, Thread};
|
use crate::job::{self, Thread};
|
||||||
@@ -25,30 +23,45 @@ pub(crate) enum Scheduler {
|
|||||||
sentbox_handle: Option<task::JoinHandle<()>>,
|
sentbox_handle: Option<task::JoinHandle<()>>,
|
||||||
smtp: SmtpConnectionState,
|
smtp: SmtpConnectionState,
|
||||||
smtp_handle: Option<task::JoinHandle<()>>,
|
smtp_handle: Option<task::JoinHandle<()>>,
|
||||||
probe_network: bool,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Context {
|
impl Context {
|
||||||
/// Indicate that the network likely has come back.
|
/// Indicate that the network likely has come back.
|
||||||
pub async fn maybe_network(&self) {
|
pub async fn maybe_network(&self) {
|
||||||
self.scheduler.write().await.maybe_network().await;
|
self.scheduler.read().await.maybe_network().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn interrupt_inbox(&self) {
|
pub(crate) async fn interrupt_inbox(&self, probe_network: bool) {
|
||||||
self.scheduler.read().await.interrupt_inbox().await;
|
self.scheduler
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.interrupt_inbox(probe_network)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn interrupt_sentbox(&self) {
|
pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) {
|
||||||
self.scheduler.read().await.interrupt_sentbox().await;
|
self.scheduler
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.interrupt_sentbox(probe_network)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn interrupt_mvbox(&self) {
|
pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) {
|
||||||
self.scheduler.read().await.interrupt_mvbox().await;
|
self.scheduler
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.interrupt_mvbox(probe_network)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn interrupt_smtp(&self) {
|
pub(crate) async fn interrupt_smtp(&self, probe_network: bool) {
|
||||||
self.scheduler.read().await.interrupt_smtp().await;
|
self.scheduler
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.interrupt_smtp(probe_network)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,26 +86,23 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
|
|||||||
|
|
||||||
// track number of continously executed jobs
|
// track number of continously executed jobs
|
||||||
let mut jobs_loaded = 0;
|
let mut jobs_loaded = 0;
|
||||||
|
let mut probe_network = false;
|
||||||
loop {
|
loop {
|
||||||
let probe_network = ctx.scheduler.read().await.get_probe_network();
|
match job::load_next(&ctx, Thread::Imap, probe_network).await {
|
||||||
match job::load_next(&ctx, Thread::Imap, probe_network)
|
Some(job) if jobs_loaded <= 20 => {
|
||||||
.timeout(Duration::from_millis(200))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(Some(job)) if jobs_loaded <= 20 => {
|
|
||||||
jobs_loaded += 1;
|
jobs_loaded += 1;
|
||||||
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
|
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
|
||||||
ctx.scheduler.write().await.set_probe_network(false);
|
probe_network = false;
|
||||||
}
|
}
|
||||||
Ok(Some(job)) => {
|
Some(job) => {
|
||||||
// Let the fetch run, but return back to the job afterwards.
|
// Let the fetch run, but return back to the job afterwards.
|
||||||
info!(ctx, "postponing imap-job {} to run fetch...", job);
|
info!(ctx, "postponing imap-job {} to run fetch...", job);
|
||||||
jobs_loaded = 0;
|
jobs_loaded = 0;
|
||||||
fetch(&ctx, &mut connection).await;
|
fetch(&ctx, &mut connection).await;
|
||||||
}
|
}
|
||||||
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
|
None => {
|
||||||
jobs_loaded = 0;
|
jobs_loaded = 0;
|
||||||
fetch_idle(&ctx, &mut connection).await;
|
probe_network = fetch_idle(&ctx, &mut connection).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -126,7 +136,7 @@ async fn fetch(ctx: &Context, connection: &mut Imap) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_idle(ctx: &Context, connection: &mut Imap) {
|
async fn fetch_idle(ctx: &Context, connection: &mut Imap) -> bool {
|
||||||
match get_watch_folder(&ctx, "configured_inbox_folder").await {
|
match get_watch_folder(&ctx, "configured_inbox_folder").await {
|
||||||
Some(watch_folder) => {
|
Some(watch_folder) => {
|
||||||
// fetch
|
// fetch
|
||||||
@@ -144,14 +154,15 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap) {
|
|||||||
.await
|
.await
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
error!(ctx, "{}", err);
|
error!(ctx, "{}", err);
|
||||||
});
|
false
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
connection.fake_idle(&ctx, Some(watch_folder)).await;
|
connection.fake_idle(&ctx, Some(watch_folder)).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!(ctx, "Can not watch inbox folder, not set");
|
warn!(ctx, "Can not watch inbox folder, not set");
|
||||||
connection.fake_idle(&ctx, None).await;
|
connection.fake_idle(&ctx, None).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -199,6 +210,7 @@ async fn simple_imap_loop(
|
|||||||
.await
|
.await
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
error!(ctx, "{}", err);
|
error!(ctx, "{}", err);
|
||||||
|
false
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
connection.fake_idle(&ctx, Some(watch_folder)).await;
|
connection.fake_idle(&ctx, Some(watch_folder)).await;
|
||||||
@@ -210,7 +222,7 @@ async fn simple_imap_loop(
|
|||||||
"No watch folder found for {}, skipping",
|
"No watch folder found for {}, skipping",
|
||||||
folder.as_ref()
|
folder.as_ref()
|
||||||
);
|
);
|
||||||
connection.fake_idle(&ctx, None).await
|
connection.fake_idle(&ctx, None).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -241,21 +253,20 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
|
|||||||
let fut = async move {
|
let fut = async move {
|
||||||
started.send(()).await;
|
started.send(()).await;
|
||||||
let ctx = ctx1;
|
let ctx = ctx1;
|
||||||
|
|
||||||
|
let mut probe_network = false;
|
||||||
loop {
|
loop {
|
||||||
let probe_network = ctx.scheduler.read().await.get_probe_network();
|
match job::load_next(&ctx, Thread::Smtp, probe_network).await {
|
||||||
match job::load_next(&ctx, Thread::Smtp, probe_network)
|
Some(job) => {
|
||||||
.timeout(Duration::from_millis(200))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(Some(job)) => {
|
|
||||||
info!(ctx, "executing smtp job");
|
info!(ctx, "executing smtp job");
|
||||||
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
|
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
|
||||||
ctx.scheduler.write().await.set_probe_network(false);
|
probe_network = false;
|
||||||
}
|
}
|
||||||
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
|
None => {
|
||||||
info!(ctx, "smtp fake idle");
|
|
||||||
// Fake Idle
|
// Fake Idle
|
||||||
idle_interrupt_receiver.recv().await.ok();
|
info!(ctx, "smtp fake idle - started");
|
||||||
|
probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default();
|
||||||
|
info!(ctx, "smtp fake idle - interrupted")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -284,7 +295,6 @@ impl Scheduler {
|
|||||||
mvbox,
|
mvbox,
|
||||||
sentbox,
|
sentbox,
|
||||||
smtp,
|
smtp,
|
||||||
probe_network: false,
|
|
||||||
inbox_handle: None,
|
inbox_handle: None,
|
||||||
mvbox_handle: None,
|
mvbox_handle: None,
|
||||||
sentbox_handle: None,
|
sentbox_handle: None,
|
||||||
@@ -349,58 +359,39 @@ impl Scheduler {
|
|||||||
info!(ctx, "scheduler is running");
|
info!(ctx, "scheduler is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_probe_network(&mut self, val: bool) {
|
async fn maybe_network(&self) {
|
||||||
match self {
|
|
||||||
Scheduler::Running {
|
|
||||||
ref mut probe_network,
|
|
||||||
..
|
|
||||||
} => {
|
|
||||||
*probe_network = val;
|
|
||||||
}
|
|
||||||
_ => panic!("set_probe_network can only be called when running"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_probe_network(&self) -> bool {
|
|
||||||
match self {
|
|
||||||
Scheduler::Running { probe_network, .. } => *probe_network,
|
|
||||||
_ => panic!("get_probe_network can only be called when running"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn maybe_network(&mut self) {
|
|
||||||
if !self.is_running() {
|
if !self.is_running() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.set_probe_network(true);
|
|
||||||
self.interrupt_inbox()
|
self.interrupt_inbox(true)
|
||||||
.join(self.interrupt_mvbox())
|
.join(self.interrupt_mvbox(true))
|
||||||
.join(self.interrupt_sentbox())
|
.join(self.interrupt_sentbox(true))
|
||||||
.join(self.interrupt_smtp())
|
.join(self.interrupt_smtp(true))
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt_inbox(&self) {
|
async fn interrupt_inbox(&self, probe_network: bool) {
|
||||||
if let Scheduler::Running { ref inbox, .. } = self {
|
if let Scheduler::Running { ref inbox, .. } = self {
|
||||||
inbox.interrupt().await;
|
inbox.interrupt(probe_network).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt_mvbox(&self) {
|
async fn interrupt_mvbox(&self, probe_network: bool) {
|
||||||
if let Scheduler::Running { ref mvbox, .. } = self {
|
if let Scheduler::Running { ref mvbox, .. } = self {
|
||||||
mvbox.interrupt().await;
|
mvbox.interrupt(probe_network).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt_sentbox(&self) {
|
async fn interrupt_sentbox(&self, probe_network: bool) {
|
||||||
if let Scheduler::Running { ref sentbox, .. } = self {
|
if let Scheduler::Running { ref sentbox, .. } = self {
|
||||||
sentbox.interrupt().await;
|
sentbox.interrupt(probe_network).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt_smtp(&self) {
|
async fn interrupt_smtp(&self, probe_network: bool) {
|
||||||
if let Scheduler::Running { ref smtp, .. } = self {
|
if let Scheduler::Running { ref smtp, .. } = self {
|
||||||
smtp.interrupt().await;
|
smtp.interrupt(probe_network).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -469,7 +460,7 @@ struct ConnectionState {
|
|||||||
/// Channel to interrupt the whole connection.
|
/// Channel to interrupt the whole connection.
|
||||||
stop_sender: Sender<()>,
|
stop_sender: Sender<()>,
|
||||||
/// Channel to interrupt idle.
|
/// Channel to interrupt idle.
|
||||||
idle_interrupt_sender: Sender<()>,
|
idle_interrupt_sender: Sender<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectionState {
|
impl ConnectionState {
|
||||||
@@ -481,11 +472,9 @@ impl ConnectionState {
|
|||||||
self.shutdown_receiver.recv().await.ok();
|
self.shutdown_receiver.recv().await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt(&self) {
|
async fn interrupt(&self, probe_network: bool) {
|
||||||
if !self.idle_interrupt_sender.is_full() {
|
// Use try_send to avoid blocking on interrupts.
|
||||||
// Use try_send to avoid blocking on interrupts.
|
self.idle_interrupt_sender.try_send(probe_network).ok();
|
||||||
self.idle_interrupt_sender.send(()).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -519,8 +508,8 @@ impl SmtpConnectionState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Interrupt any form of idle.
|
/// Interrupt any form of idle.
|
||||||
async fn interrupt(&self) {
|
async fn interrupt(&self, probe_network: bool) {
|
||||||
self.state.interrupt().await;
|
self.state.interrupt(probe_network).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shutdown this connection completely.
|
/// Shutdown this connection completely.
|
||||||
@@ -534,7 +523,7 @@ struct SmtpConnectionHandlers {
|
|||||||
connection: Smtp,
|
connection: Smtp,
|
||||||
stop_receiver: Receiver<()>,
|
stop_receiver: Receiver<()>,
|
||||||
shutdown_sender: Sender<()>,
|
shutdown_sender: Sender<()>,
|
||||||
idle_interrupt_receiver: Receiver<()>,
|
idle_interrupt_receiver: Receiver<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -546,8 +535,8 @@ impl ImapConnectionState {
|
|||||||
/// Construct a new connection.
|
/// Construct a new connection.
|
||||||
fn new() -> (Self, ImapConnectionHandlers) {
|
fn new() -> (Self, ImapConnectionHandlers) {
|
||||||
let (stop_sender, stop_receiver) = channel(1);
|
let (stop_sender, stop_receiver) = channel(1);
|
||||||
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
|
|
||||||
let (shutdown_sender, shutdown_receiver) = channel(1);
|
let (shutdown_sender, shutdown_receiver) = channel(1);
|
||||||
|
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
|
||||||
|
|
||||||
let handlers = ImapConnectionHandlers {
|
let handlers = ImapConnectionHandlers {
|
||||||
connection: Imap::new(idle_interrupt_receiver),
|
connection: Imap::new(idle_interrupt_receiver),
|
||||||
@@ -567,8 +556,8 @@ impl ImapConnectionState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Interrupt any form of idle.
|
/// Interrupt any form of idle.
|
||||||
async fn interrupt(&self) {
|
async fn interrupt(&self, probe_network: bool) {
|
||||||
self.state.interrupt().await;
|
self.state.interrupt(probe_network).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shutdown this connection completely.
|
/// Shutdown this connection completely.
|
||||||
|
|||||||
Reference in New Issue
Block a user