some cleanup

This commit is contained in:
dignifiedquire
2020-03-18 02:23:22 +01:00
parent 563b550f08
commit 94c6a01420
3 changed files with 117 additions and 125 deletions

View File

@@ -67,7 +67,8 @@ async fn main() {
println!("------ RUN ------");
ctx.run().await;
println!("sending a message");
println!("--- SENDING A MESSAGE ---");
let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com")
.await
.unwrap();

View File

@@ -128,7 +128,12 @@ impl Context {
}
pub async fn run(&self) {
self.inner.scheduler.write().await.run(self.clone()).await
let ctx = self.clone();
println!("RUN LOCK START");
let l = &mut *self.inner.scheduler.write().await;
println!("RUN LOCK AQ");
l.run(ctx);
println!("RUN LOCK DONE");
}
pub async fn stop(&self) {

View File

@@ -40,22 +40,116 @@ impl Context {
}
}
async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) {
info!(ctx, "starting inbox loop");
let ImapConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
} = inbox_handlers;
let fut = async move {
connection.connect_configured(&ctx).await.unwrap();
loop {
// TODO: correct value
let probe_network = false;
match job::load_next(&ctx, Thread::Imap, probe_network)
.timeout(Duration::from_millis(200))
.await
{
Ok(Some(job)) => {
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
let watch_folder = get_watch_folder(&ctx, "configured_inbox_folder")
.await
.ok_or_else(|| Error::WatchFolderNotFound("not-set".to_string()))
.unwrap();
// fetch
connection
.fetch(&ctx, &watch_folder)
.await
.unwrap_or_else(|err| {
error!(ctx, "{}", err);
});
// idle
if connection.can_idle() {
connection
.idle(&ctx, Some(watch_folder))
.await
.unwrap_or_else(|err| {
error!(ctx, "{}", err);
});
} else {
connection.fake_idle(&ctx, Some(watch_folder)).await;
}
}
}
}
};
fut.race(stop_receiver.recv()).await;
shutdown_sender.send(()).await;
}
async fn smtp_loop(ctx: Context, smtp_handlers: SmtpConnectionHandlers) {
info!(ctx, "starting smtp loop");
let SmtpConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
idle_interrupt_receiver,
} = smtp_handlers;
let fut = async move {
loop {
// TODO: correct value
let probe_network = false;
match job::load_next(&ctx, Thread::Smtp, probe_network)
.timeout(Duration::from_millis(200))
.await
{
Ok(Some(job)) => {
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
use futures::future::FutureExt;
// Fake Idle
async_std::task::sleep(Duration::from_millis(500))
.race(idle_interrupt_receiver.recv().map(|_| ()))
.await;
}
}
}
};
fut.race(stop_receiver.recv()).await;
shutdown_sender.send(()).await;
}
impl Scheduler {
/// Start the scheduler, panics if it is already running.
pub async fn run(&mut self, ctx: Context) {
pub fn run(&mut self, ctx: Context) {
match self {
Scheduler::Stopped => {
let (
(
((inbox, inbox_handlers), (mvbox, mvbox_handlers)),
(sentbox, sentbox_handlers),
),
(smtp, smtp_handlers),
) = ImapConnectionState::new()
.join(ImapConnectionState::new())
.join(ImapConnectionState::new())
.join(SmtpConnectionState::new())
.await;
let (mvbox, mvbox_handlers) = ImapConnectionState::new();
let (sentbox, sentbox_handlers) = ImapConnectionState::new();
let (smtp, smtp_handlers) = SmtpConnectionState::new();
let (inbox, inbox_handlers) = ImapConnectionState::new();
let ctx1 = ctx.clone();
let _ = task::spawn(async move { inbox_loop(ctx1, inbox_handlers).await });
// TODO: mvbox
// TODO: sentbox
let ctx1 = ctx.clone();
let _ = task::spawn(async move { smtp_loop(ctx1, smtp_handlers).await });
*self = Scheduler::Running {
inbox,
mvbox,
@@ -63,116 +157,8 @@ impl Scheduler {
smtp,
};
let ctx1 = ctx.clone();
task::spawn(async move {
info!(ctx1, "starting inbox loop");
let ImapConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
} = inbox_handlers;
let fut = async move {
connection.connect_configured(&ctx1).await.unwrap();
loop {
// TODO: correct value
let probe_network = false;
match job::load_next(&ctx1, Thread::Imap, probe_network)
.timeout(Duration::from_millis(200))
.await
{
Ok(Some(job)) => {
job::perform_job(
&ctx1,
job::Connection::Inbox(&mut connection),
job,
)
.await;
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
let watch_folder =
get_watch_folder(&ctx1, "configured_inbox_folder")
.await
.ok_or_else(|| {
Error::WatchFolderNotFound("not-set".to_string())
})
.unwrap();
// fetch
connection.fetch(&ctx1, &watch_folder).await.unwrap_or_else(
|err| {
error!(ctx1, "{}", err);
},
);
// idle
if connection.can_idle() {
connection
.idle(&ctx1, Some(watch_folder))
.await
.unwrap_or_else(|err| {
error!(ctx1, "{}", err);
});
} else {
connection.fake_idle(&ctx1, Some(watch_folder)).await;
}
}
}
}
};
fut.race(stop_receiver.recv()).await;
shutdown_sender.send(()).await;
});
// TODO: mvbox
// TODO: sentbox
let ctx1 = ctx.clone();
task::spawn(async move {
info!(ctx1, "starting smtp loop");
let SmtpConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
idle_interrupt_receiver,
} = smtp_handlers;
let fut = async move {
loop {
// TODO: correct value
let probe_network = false;
match job::load_next(&ctx1, Thread::Smtp, probe_network)
.timeout(Duration::from_millis(200))
.await
{
Ok(Some(job)) => {
job::perform_job(
&ctx1,
job::Connection::Smtp(&mut connection),
job,
)
.await;
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
use futures::future::FutureExt;
// Fake Idle
async_std::task::sleep(Duration::from_millis(500))
.race(idle_interrupt_receiver.recv().map(|_| ()))
.await;
}
}
}
};
fut.race(stop_receiver.recv()).await;
shutdown_sender.send(()).await;
});
info!(ctx, "scheduler is running");
println!("RUN DONE");
}
Scheduler::Running { .. } => {
// TODO: return an error
@@ -271,7 +257,7 @@ pub(crate) struct SmtpConnectionState {
}
impl SmtpConnectionState {
async fn new() -> (Self, SmtpConnectionHandlers) {
fn new() -> (Self, SmtpConnectionHandlers) {
let (stop_sender, stop_receiver) = channel(1);
let (shutdown_sender, shutdown_receiver) = channel(1);
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
@@ -320,7 +306,7 @@ pub(crate) struct ImapConnectionState {
impl ImapConnectionState {
/// Construct a new connection.
async fn new() -> (Self, ImapConnectionHandlers) {
fn new() -> (Self, ImapConnectionHandlers) {
let (stop_sender, stop_receiver) = channel(1);
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
let (shutdown_sender, shutdown_receiver) = channel(1);