scheduler: use oneshot channel for "started" notifications

The senders are not used more than once.
This commit is contained in:
link2xt
2023-04-17 17:38:49 +00:00
parent f267f6f756
commit 2ef5f2eb52

View File

@@ -323,7 +323,11 @@ pub(crate) struct Scheduler {
recently_seen_loop: RecentlySeenLoop, recently_seen_loop: RecentlySeenLoop,
} }
async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { async fn inbox_loop(
ctx: Context,
started: oneshot::Sender<()>,
inbox_handlers: ImapConnectionHandlers,
) {
use futures::future::FutureExt; use futures::future::FutureExt;
info!(ctx, "starting inbox loop"); info!(ctx, "starting inbox loop");
@@ -335,8 +339,8 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
let ctx = ctx1; let ctx = ctx1;
if let Err(err) = started.send(()).await { if let Err(()) = started.send(()) {
warn!(ctx, "inbox loop, missing started receiver: {}", err); warn!(ctx, "inbox loop, missing started receiver");
return; return;
}; };
@@ -600,7 +604,7 @@ async fn fetch_idle(
async fn simple_imap_loop( async fn simple_imap_loop(
ctx: Context, ctx: Context,
started: Sender<()>, started: oneshot::Sender<()>,
inbox_handlers: ImapConnectionHandlers, inbox_handlers: ImapConnectionHandlers,
folder_meaning: FolderMeaning, folder_meaning: FolderMeaning,
) { ) {
@@ -616,8 +620,8 @@ async fn simple_imap_loop(
let fut = async move { let fut = async move {
let ctx = ctx1; let ctx = ctx1;
if let Err(err) = started.send(()).await { if let Err(()) = started.send(()) {
warn!(&ctx, "simple imap loop, missing started receiver: {}", err); warn!(&ctx, "simple imap loop, missing started receiver");
return; return;
} }
@@ -635,7 +639,11 @@ async fn simple_imap_loop(
.await; .await;
} }
async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) { async fn smtp_loop(
ctx: Context,
started: oneshot::Sender<()>,
smtp_handlers: SmtpConnectionHandlers,
) {
use futures::future::FutureExt; use futures::future::FutureExt;
info!(ctx, "starting smtp loop"); info!(ctx, "starting smtp loop");
@@ -648,8 +656,8 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
let ctx = ctx1; let ctx = ctx1;
if let Err(err) = started.send(()).await { if let Err(()) = started.send(()) {
warn!(&ctx, "smtp loop, missing started receiver: {}", err); warn!(&ctx, "smtp loop, missing started receiver");
return; return;
} }
@@ -721,7 +729,7 @@ impl Scheduler {
pub async fn start(ctx: Context) -> Result<Self> { pub async fn start(ctx: Context) -> Result<Self> {
let (smtp, smtp_handlers) = SmtpConnectionState::new(); let (smtp, smtp_handlers) = SmtpConnectionState::new();
let (smtp_start_send, smtp_start_recv) = channel::bounded(1); let (smtp_start_send, smtp_start_recv) = oneshot::channel();
let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1); let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1); let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
@@ -729,7 +737,7 @@ impl Scheduler {
let mut start_recvs = Vec::new(); let mut start_recvs = Vec::new();
let (conn_state, inbox_handlers) = ImapConnectionState::new(&ctx).await?; let (conn_state, inbox_handlers) = ImapConnectionState::new(&ctx).await?;
let (inbox_start_send, inbox_start_recv) = channel::bounded(1); let (inbox_start_send, inbox_start_recv) = oneshot::channel();
let handle = { let handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers)) task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
@@ -750,7 +758,7 @@ impl Scheduler {
] { ] {
if should_watch? { if should_watch? {
let (conn_state, handlers) = ImapConnectionState::new(&ctx).await?; let (conn_state, handlers) = ImapConnectionState::new(&ctx).await?;
let (start_send, start_recv) = channel::bounded(1); let (start_send, start_recv) = oneshot::channel();
let ctx = ctx.clone(); let ctx = ctx.clone();
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning)); let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
oboxes.push(SchedBox { oboxes.push(SchedBox {
@@ -797,7 +805,7 @@ impl Scheduler {
}; };
// wait for all loops to be started // wait for all loops to be started
if let Err(err) = try_join_all(start_recvs.iter().map(|r| r.recv())).await { if let Err(err) = try_join_all(start_recvs).await {
bail!("failed to start scheduler: {}", err); bail!("failed to start scheduler: {}", err);
} }