hook up scheduler with jobs

This commit is contained in:
dignifiedquire
2020-03-18 00:01:59 +01:00
parent ce5b95f8e5
commit 846ef043d5
7 changed files with 604 additions and 722 deletions

View File

@@ -1,9 +1,12 @@
use async_std::prelude::*;
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
const MAX_JOBS_WAITING: usize = 50;
use std::time::Duration;
use crate::context::Context;
use crate::imap::Imap;
use crate::job::{self, Thread};
use crate::smtp::Smtp;
/// Job and connection scheduler.
@@ -11,16 +14,34 @@ use crate::smtp::Smtp;
pub(crate) enum Scheduler {
Stopped,
Running {
inbox: ImapConnectionState<InboxJob>,
mvbox: ImapConnectionState<MvboxJob>,
sentbox: ImapConnectionState<SentboxJob>,
inbox: ImapConnectionState,
mvbox: ImapConnectionState,
sentbox: ImapConnectionState,
smtp: SmtpConnectionState,
},
}
impl Context {
pub(crate) async fn interrupt_inbox(&self) {
self.scheduler.read().await.interrupt_inbox().await;
}
pub(crate) async fn interrupt_sentbox(&self) {
self.scheduler.read().await.interrupt_sentbox().await;
}
pub(crate) async fn interrupt_mvbox(&self) {
self.scheduler.read().await.interrupt_mvbox().await;
}
pub(crate) async fn interrupt_smtp(&self) {
self.scheduler.read().await.interrupt_smtp().await;
}
}
impl Scheduler {
/// Start the scheduler, panics if it is already running.
pub async fn run(&mut self) {
pub async fn run(&mut self, ctx: Context) {
match self {
Scheduler::Stopped => {
let (
@@ -34,13 +55,96 @@ impl Scheduler {
.join(ImapConnectionState::new())
.join(SmtpConnectionState::new())
.await;
*self = Scheduler::Running {
inbox,
mvbox,
sentbox,
smtp,
};
let ctx1 = ctx.clone();
task::spawn(async move {
let ImapConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
} = inbox_handlers;
let fut = async move {
loop {
// TODO: correct value
let probe_network = false;
match job::load_next(&ctx1, Thread::Imap, probe_network)
.timeout(Duration::from_millis(200))
.await
{
Ok(Some(job)) => {
job::perform_job(
&ctx1,
job::Connection::Inbox(&mut connection),
job,
)
.await;
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
// fetch
connection.fetch(&ctx1, "TODO").await;
// idle
connection.idle(&ctx1, Some("TODO".into())).await;
}
}
}
};
fut.race(stop_receiver.recv()).await;
shutdown_sender.send(()).await;
});
// TODO: mvbox
// TODO: sentbox
let ctx1 = ctx.clone();
task::spawn(async move {
let SmtpConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
idle_interrupt_receiver,
} = smtp_handlers;
let fut = async move {
loop {
// TODO: correct value
let probe_network = false;
match job::load_next(&ctx1, Thread::Smtp, probe_network)
.timeout(Duration::from_millis(200))
.await
{
Ok(Some(job)) => {
job::perform_job(
&ctx1,
job::Connection::Smtp(&mut connection),
job,
)
.await;
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
use futures::future::FutureExt;
// Fake Idle
async_std::task::sleep(Duration::from_millis(500))
.race(idle_interrupt_receiver.recv().map(|_| ()))
.await;
}
}
}
};
fut.race(stop_receiver.recv()).await;
shutdown_sender.send(()).await;
});
}
Scheduler::Running { .. } => {
// TODO: return an error
@@ -49,6 +153,41 @@ impl Scheduler {
}
}
fn inbox(&self) -> Option<&ImapConnectionState> {
match self {
Scheduler::Running { ref inbox, .. } => Some(inbox),
_ => None,
}
}
async fn interrupt_inbox(&self) {
match self {
Scheduler::Running { ref inbox, .. } => inbox.interrupt().await,
_ => panic!("interrupt_imap must be called in running mode"),
}
}
async fn interrupt_mvbox(&self) {
match self {
Scheduler::Running { ref mvbox, .. } => mvbox.interrupt().await,
_ => panic!("interrupt_mvbox must be called in running mode"),
}
}
async fn interrupt_sentbox(&self) {
match self {
Scheduler::Running { ref sentbox, .. } => sentbox.interrupt().await,
_ => panic!("interrupt_sentbox must be called in running mode"),
}
}
async fn interrupt_smtp(&self) {
match self {
Scheduler::Running { ref smtp, .. } => smtp.interrupt().await,
_ => panic!("interrupt_smtp must be called in running mode"),
}
}
/// Halt the scheduler, panics if it is already stopped.
pub async fn stop(&mut self) {
match self {
@@ -90,54 +229,51 @@ impl Scheduler {
/// Connection state logic shared between imap and smtp connections.
#[derive(Debug)]
struct ConnectionState<T> {
struct ConnectionState {
/// Channel to notify that shutdown has completed.
shutdown_receiver: Receiver<()>,
/// Channel to interrupt the whole connection.
stop_sender: Sender<()>,
/// Channel to receive new jobs.
jobs_receiver: Receiver<T>,
/// Channel to schedule new jobs.
jobs_sender: Sender<T>,
/// Channel to interrupt idle.
idle_interrupt_sender: Sender<()>,
}
impl<T> ConnectionState<T> {
/// Send a new job.
pub async fn send_job(&self, job: T) {
self.jobs_sender.send(job).await;
}
impl ConnectionState {
/// Shutdown this connection completely.
pub async fn stop(&self) {
async fn stop(&self) {
// Trigger shutdown of the run loop.
self.stop_sender.send(()).await;
// Wait for a notification that the run loop has been shutdown.
self.shutdown_receiver.recv().await;
}
async fn interrupt(&self) {
self.idle_interrupt_sender.send(()).await;
}
}
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
state: ConnectionState<SmtpJob>,
state: ConnectionState,
}
impl SmtpConnectionState {
async fn new() -> (Self, SmtpConnectionHandlers) {
let (jobs_sender, jobs_receiver) = channel(50);
let (stop_sender, stop_receiver) = channel(1);
let (shutdown_sender, shutdown_receiver) = channel(1);
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
let handlers = SmtpConnectionHandlers {
connection: Smtp::new(),
stop_receiver,
shutdown_sender,
idle_interrupt_receiver,
};
let state = ConnectionState {
idle_interrupt_sender,
shutdown_receiver,
stop_sender,
jobs_sender,
jobs_receiver,
};
let conn = SmtpConnectionState { state };
@@ -145,9 +281,9 @@ impl SmtpConnectionState {
(conn, handlers)
}
/// Send a new job.
async fn send_job(&self, job: SmtpJob) {
self.state.send_job(job).await;
/// Interrupt any form of idle.
async fn interrupt(&self) {
self.state.interrupt().await;
}
/// Shutdown this connection completely.
@@ -161,19 +297,17 @@ struct SmtpConnectionHandlers {
connection: Smtp,
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
idle_interrupt_receiver: Receiver<()>,
}
#[derive(Debug)]
pub(crate) struct ImapConnectionState<T> {
/// Channel to interrupt idle.
idle_interrupt_sender: Sender<()>,
state: ConnectionState<T>,
pub(crate) struct ImapConnectionState {
state: ConnectionState,
}
impl<T> ImapConnectionState<T> {
impl ImapConnectionState {
/// Construct a new connection.
async fn new() -> (Self, ImapConnectionHandlers) {
let (jobs_sender, jobs_receiver) = channel(MAX_JOBS_WAITING);
let (stop_sender, stop_receiver) = channel(1);
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
let (shutdown_sender, shutdown_receiver) = channel(1);
@@ -185,26 +319,19 @@ impl<T> ImapConnectionState<T> {
};
let state = ConnectionState {
idle_interrupt_sender,
shutdown_receiver,
stop_sender,
jobs_sender,
jobs_receiver,
};
let conn = ImapConnectionState {
idle_interrupt_sender,
state,
};
let conn = ImapConnectionState { state };
(conn, handlers)
}
/// Send a new job.
async fn send_job(&self, job: T) {
self.state
.send_job(job)
.join(self.idle_interrupt_sender.send(()))
.await;
/// Interrupt any form of idle.
async fn interrupt(&self) {
self.state.interrupt().await;
}
/// Shutdown this connection completely.
@@ -219,19 +346,3 @@ struct ImapConnectionHandlers {
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
}
/// Jobs handled by the inbox connection.
#[derive(Debug)]
pub enum InboxJob {}
/// Jobs handled by the mvbox connection.
#[derive(Debug)]
pub enum MvboxJob {}
/// Jobs handled by the sentbox connection.
#[derive(Debug)]
pub enum SentboxJob {}
/// Jobs handled by the smtp connection.
#[derive(Debug)]
pub enum SmtpJob {}