refactor: get rid of InterruptInfo

It was passed around, but the boolean inside was not used.
This commit is contained in:
link2xt
2023-11-10 03:13:18 +00:00
parent 765c95de39
commit 1a4c2953f7
9 changed files with 78 additions and 135 deletions

View File

@@ -35,7 +35,6 @@ use crate::mimeparser::SystemMessage;
use crate::param::{Param, Params};
use crate::peerstate::{Peerstate, PeerstateVerifiedStatus};
use crate::receive_imf::ReceivedMsg;
use crate::scheduler::InterruptInfo;
use crate::smtp::send_msg_to_smtp;
use crate::sql;
use crate::stock_str;
@@ -702,10 +701,7 @@ impl ChatId {
context.emit_msgs_changed_without_ids();
context.set_config(Config::LastHousekeeping, None).await?;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_inbox().await;
if chat.is_self_talk() {
let mut msg = Message::new(Viewtype::Text);
@@ -2503,10 +2499,7 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
context.emit_event(EventType::LocationChanged(Some(ContactId::SELF)));
}
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_smtp().await;
}
Ok(msg.id)
@@ -3736,10 +3729,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
.await?;
curr_timestamp += 1;
if create_send_msg_job(context, &mut msg).await?.is_some() {
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_smtp().await;
}
}
created_chats.push(chat_id);
@@ -3796,10 +3786,7 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
msg_id: msg.id,
});
if create_send_msg_job(context, &mut msg).await?.is_some() {
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_smtp().await;
}
}
Ok(())

View File

@@ -31,7 +31,6 @@ use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam};
use crate::message::{Message, Viewtype};
use crate::oauth2::get_oauth2_addr;
use crate::provider::{Protocol, Socket, UsernamePattern};
use crate::scheduler::InterruptInfo;
use crate::smtp::Smtp;
use crate::socks::Socks5Config;
use crate::stock_str;
@@ -481,9 +480,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
ctx.set_config_bool(Config::FetchedExistingMsgs, false)
.await?;
ctx.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
ctx.scheduler.interrupt_inbox().await;
progress!(ctx, 940);
update_device_chats_handle.await??;

View File

@@ -23,7 +23,7 @@ use crate::key::{load_self_public_key, DcKey as _};
use crate::login_param::LoginParam;
use crate::message::{self, MessageState, MsgId};
use crate::quota::QuotaInfo;
use crate::scheduler::{InterruptInfo, SchedulerState};
use crate::scheduler::SchedulerState;
use crate::sql::Sql;
use crate::stock_str::StockStrings;
use crate::timesmearing::SmearedTimestamp;
@@ -437,11 +437,7 @@ impl Context {
pub(crate) async fn schedule_resync(&self) -> Result<()> {
self.resync_request.store(true, Ordering::Relaxed);
self.scheduler
.interrupt_inbox(InterruptInfo {
probe_network: false,
})
.await;
self.scheduler.interrupt_inbox().await;
Ok(())
}

View File

@@ -12,7 +12,6 @@ use crate::context::Context;
use crate::imap::{Imap, ImapActionResult};
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::{MimeMessage, Part};
use crate::scheduler::InterruptInfo;
use crate::tools::time;
use crate::{stock_str, EventType};
@@ -93,10 +92,7 @@ impl MsgId {
.sql
.execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
.await?;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_inbox().await;
}
}
Ok(())

View File

@@ -35,7 +35,6 @@ use crate::receive_imf::{
from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner, ReceivedMsg,
};
use crate::scheduler::connectivity::ConnectivityStore;
use crate::scheduler::InterruptInfo;
use crate::socks::Socks5Config;
use crate::sql;
use crate::stock_str;
@@ -86,7 +85,7 @@ const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])";
#[derive(Debug)]
pub struct Imap {
pub(crate) idle_interrupt_receiver: Receiver<InterruptInfo>,
pub(crate) idle_interrupt_receiver: Receiver<()>,
config: ImapConfig,
pub(crate) session: Option<Session>,
login_failed_once: bool,
@@ -228,7 +227,7 @@ impl Imap {
socks5_config: Option<Socks5Config>,
addr: &str,
provider_strict_tls: bool,
idle_interrupt_receiver: Receiver<InterruptInfo>,
idle_interrupt_receiver: Receiver<()>,
) -> Result<Self> {
if lp.server.is_empty() || lp.user.is_empty() || lp.password.is_empty() {
bail!("Incomplete IMAP connection parameters");
@@ -261,7 +260,7 @@ impl Imap {
/// Creates new disconnected IMAP client using configured parameters.
pub async fn new_configured(
context: &Context,
idle_interrupt_receiver: Receiver<InterruptInfo>,
idle_interrupt_receiver: Receiver<()>,
) -> Result<Self> {
if !context.is_configured().await? {
bail!("IMAP Connect without configured params");
@@ -2290,10 +2289,7 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str)
(message_id,),
)
.await?;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_inbox().await;
Ok(())
}

View File

@@ -8,9 +8,9 @@ use futures_lite::FutureExt;
use super::session::Session;
use super::Imap;
use crate::config::Config;
use crate::context::Context;
use crate::imap::{client::IMAP_TIMEOUT, get_uid_next, FolderMeaning};
use crate::log::LogExt;
use crate::{context::Context, scheduler::InterruptInfo};
const IDLE_TIMEOUT: Duration = Duration::from_secs(23 * 60);
@@ -18,17 +18,15 @@ impl Session {
pub async fn idle(
mut self,
context: &Context,
idle_interrupt_receiver: Receiver<InterruptInfo>,
idle_interrupt_receiver: Receiver<()>,
folder: &str,
) -> Result<(Self, InterruptInfo)> {
) -> Result<Self> {
use futures::future::FutureExt;
let mut info = Default::default();
self.select_folder(context, Some(folder)).await?;
if self.server_sent_unsolicited_exists(context)? {
return Ok((self, info));
return Ok(self);
}
// Despite checking for unsolicited EXISTS above,
@@ -47,16 +45,16 @@ impl Session {
context,
"Skipping IDLE on {folder:?} because UIDNEXT {uid_next}>{expected_uid_next} indicates there are new messages."
);
return Ok((self, info));
return Ok(self);
}
} else {
warn!(context, "STATUS {folder} (UIDNEXT) did not return UIDNEXT");
// Go to IDLE anyway if STATUS is broken.
}
if let Ok(info) = idle_interrupt_receiver.try_recv() {
info!(context, "skip idle, got interrupt {:?}", info);
return Ok((self, info));
if let Ok(()) = idle_interrupt_receiver.try_recv() {
info!(context, "skip idle, got interrupt");
return Ok(self);
}
let mut handle = self.inner.idle();
@@ -73,17 +71,17 @@ impl Session {
enum Event {
IdleResponse(IdleResponse),
Interrupt(InterruptInfo),
Interrupt,
}
info!(context, "{folder}: Idle entering wait-on-remote state");
let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async {
let info = idle_interrupt_receiver.recv().await;
idle_interrupt_receiver.recv().await.ok();
// cancel imap idle connection properly
drop(interrupt);
Ok(Event::Interrupt(info.unwrap_or_default()))
Ok(Event::Interrupt)
});
match fut.await {
@@ -96,9 +94,8 @@ impl Session {
Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => {
info!(context, "{folder}: Idle wait was interrupted manually");
}
Ok(Event::Interrupt(i)) => {
info!(context, "{folder}: Idle wait was interrupted: {:?}", &i);
info = i;
Ok(Event::Interrupt) => {
info!(context, "{folder}: Idle wait was interrupted");
}
Err(err) => {
warn!(context, "{folder}: Idle wait errored: {err:?}");
@@ -112,7 +109,7 @@ impl Session {
session.as_mut().set_read_timeout(Some(IMAP_TIMEOUT));
self.inner = session;
Ok((self, info))
Ok(self)
}
}
@@ -122,7 +119,7 @@ impl Imap {
context: &Context,
watch_folder: Option<String>,
folder_meaning: FolderMeaning,
) -> InterruptInfo {
) {
// Idle using polling. This is also needed if we're not yet configured -
// in this case, we're waiting for a configure job (and an interrupt).
@@ -133,11 +130,8 @@ impl Imap {
watch_folder
} else {
info!(context, "IMAP-fake-IDLE: no folder, waiting for interrupt");
return self
.idle_interrupt_receiver
.recv()
.await
.unwrap_or_default();
self.idle_interrupt_receiver.recv().await.ok();
return;
};
info!(context, "IMAP-fake-IDLEing folder={:?}", watch_folder);
@@ -147,10 +141,10 @@ impl Imap {
enum Event {
Tick,
Interrupt(InterruptInfo),
Interrupt,
}
// loop until we are interrupted or if we fetched something
let info = loop {
loop {
use futures::future::FutureExt;
match interval
.tick()
@@ -158,7 +152,7 @@ impl Imap {
.race(
self.idle_interrupt_receiver
.recv()
.map(|probe_network| Event::Interrupt(probe_network.unwrap_or_default())),
.map(|_| Event::Interrupt),
)
.await
{
@@ -180,7 +174,7 @@ impl Imap {
.unwrap_or_default()
{
// we only fake-idled because network was gone during IDLE, probably
break InterruptInfo::new(false);
break;
}
}
info!(context, "fake_idle is connected");
@@ -195,7 +189,7 @@ impl Imap {
Ok(res) => {
info!(context, "fetch_new_messages returned {:?}", res);
if res {
break InterruptInfo::new(false);
break;
}
}
Err(err) => {
@@ -204,13 +198,12 @@ impl Imap {
}
}
}
Event::Interrupt(info) => {
// Interrupt
Event::Interrupt => {
info!(context, "Fake IDLE interrupted");
break info;
break;
}
}
};
}
info!(
context,
@@ -221,7 +214,5 @@ impl Imap {
.as_millis() as f64
/ 1000.,
);
info
}
}

View File

@@ -24,7 +24,6 @@ use crate::mimeparser::{parse_message_id, SystemMessage};
use crate::param::{Param, Params};
use crate::pgp::split_armored_data;
use crate::reaction::get_msg_reactions;
use crate::scheduler::InterruptInfo;
use crate::sql;
use crate::summary::Summary;
use crate::tools::{
@@ -1527,10 +1526,7 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
}
// Interrupt Inbox loop to start message deletion and run housekeeping.
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_inbox().await;
Ok(())
}
@@ -1648,10 +1644,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
)
.await
.context("failed to insert into smtp_mdns")?;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
context.scheduler.interrupt_smtp().await;
}
}
updated_chat_ids.insert(curr_chat_id);

View File

@@ -233,17 +233,17 @@ impl SchedulerState {
connectivity::maybe_network_lost(context, stores).await;
}
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
pub(crate) async fn interrupt_inbox(&self) {
let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner {
scheduler.interrupt_inbox(info);
scheduler.interrupt_inbox();
}
}
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
pub(crate) async fn interrupt_smtp(&self) {
let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner {
scheduler.interrupt_smtp(info);
scheduler.interrupt_smtp();
}
}
@@ -463,18 +463,15 @@ async fn inbox_loop(
/// handling all the errors. In case of an error, it is logged, but not propagated upwards. If
/// critical operation fails such as fetching new messages fails, connection is reset via
/// `trigger_reconnect`, so a fresh one can be opened.
async fn fetch_idle(
ctx: &Context,
connection: &mut Imap,
folder_meaning: FolderMeaning,
) -> InterruptInfo {
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: FolderMeaning) {
let folder_config = match folder_meaning.to_config() {
Some(c) => c,
None => {
error!(ctx, "Bad folder meaning: {}", folder_meaning);
return connection
connection
.fake_idle(ctx, None, FolderMeaning::Unknown)
.await;
return;
}
};
let folder = match ctx.get_config(folder_config).await {
@@ -484,9 +481,10 @@ async fn fetch_idle(
ctx,
"Can not watch {} folder, failed to retrieve config: {:#}", folder_config, err
);
return connection
connection
.fake_idle(ctx, None, FolderMeaning::Unknown)
.await;
return;
}
};
@@ -495,9 +493,10 @@ async fn fetch_idle(
} else {
connection.connectivity.set_not_configured(ctx).await;
info!(ctx, "Can not watch {} folder, not set", folder_config);
return connection
connection
.fake_idle(ctx, None, FolderMeaning::Unknown)
.await;
return;
};
// connect and fake idle if unable to connect
@@ -508,9 +507,10 @@ async fn fetch_idle(
{
warn!(ctx, "{:#}", err);
connection.trigger_reconnect(ctx);
return connection
connection
.fake_idle(ctx, Some(watch_folder), folder_meaning)
.await;
return;
}
if folder_config == Config::ConfiguredInboxFolder {
@@ -534,7 +534,7 @@ async fn fetch_idle(
{
connection.trigger_reconnect(ctx);
warn!(ctx, "{:#}", err);
return InterruptInfo::new(false);
return;
}
// Mark expired messages for deletion. Marked messages will be deleted from the server
@@ -573,7 +573,7 @@ async fn fetch_idle(
{
connection.trigger_reconnect(ctx);
warn!(ctx, "{:#}", err);
return InterruptInfo::new(false);
return;
}
}
Ok(false) => {}
@@ -593,9 +593,10 @@ async fn fetch_idle(
ctx.emit_event(EventType::ImapInboxIdle);
let Some(session) = connection.session.take() else {
warn!(ctx, "No IMAP session, going to fake idle.");
return connection
connection
.fake_idle(ctx, Some(watch_folder), folder_meaning)
.await;
return;
};
if !session.can_idle() {
@@ -603,9 +604,10 @@ async fn fetch_idle(
ctx,
"IMAP session does not support IDLE, going to fake idle."
);
return connection
connection
.fake_idle(ctx, Some(watch_folder), folder_meaning)
.await;
return;
}
if ctx
@@ -616,9 +618,10 @@ async fn fetch_idle(
.unwrap_or_default()
{
info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
return connection
connection
.fake_idle(ctx, Some(watch_folder), folder_meaning)
.await;
return;
}
info!(ctx, "IMAP session supports IDLE, using it.");
@@ -631,14 +634,12 @@ async fn fetch_idle(
.await
.context("idle")
{
Ok((session, info)) => {
Ok(session) => {
connection.session = Some(session);
info
}
Err(err) => {
connection.trigger_reconnect(ctx);
warn!(ctx, "{:#}", err);
InterruptInfo::new(false)
}
}
}
@@ -860,24 +861,24 @@ impl Scheduler {
fn maybe_network(&self) {
for b in self.boxes() {
b.conn_state.interrupt(InterruptInfo::new(true));
b.conn_state.interrupt();
}
self.interrupt_smtp(InterruptInfo::new(true));
self.interrupt_smtp();
}
fn maybe_network_lost(&self) {
for b in self.boxes() {
b.conn_state.interrupt(InterruptInfo::new(false));
b.conn_state.interrupt();
}
self.interrupt_smtp(InterruptInfo::new(false));
self.interrupt_smtp();
}
fn interrupt_inbox(&self, info: InterruptInfo) {
self.inbox.conn_state.interrupt(info);
fn interrupt_inbox(&self) {
self.inbox.conn_state.interrupt();
}
fn interrupt_smtp(&self, info: InterruptInfo) {
self.smtp.interrupt(info);
fn interrupt_smtp(&self) {
self.smtp.interrupt();
}
fn interrupt_ephemeral_task(&self) {
@@ -927,7 +928,7 @@ struct ConnectionState {
/// Channel to interrupt the whole connection.
stop_sender: Sender<()>,
/// Channel to interrupt idle.
idle_interrupt_sender: Sender<InterruptInfo>,
idle_interrupt_sender: Sender<()>,
/// Mutex to pass connectivity info between IMAP/SMTP threads and the API
connectivity: ConnectivityStore,
}
@@ -943,9 +944,9 @@ impl ConnectionState {
Ok(())
}
fn interrupt(&self, info: InterruptInfo) {
fn interrupt(&self) {
// Use try_send to avoid blocking on interrupts.
self.idle_interrupt_sender.try_send(info).ok();
self.idle_interrupt_sender.try_send(()).ok();
}
}
@@ -977,8 +978,8 @@ impl SmtpConnectionState {
}
/// Interrupt any form of idle.
fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info);
fn interrupt(&self) {
self.state.interrupt();
}
/// Shutdown this connection completely.
@@ -991,7 +992,7 @@ impl SmtpConnectionState {
struct SmtpConnectionHandlers {
connection: Smtp,
stop_receiver: Receiver<()>,
idle_interrupt_receiver: Receiver<InterruptInfo>,
idle_interrupt_receiver: Receiver<()>,
}
#[derive(Debug)]
@@ -1022,8 +1023,8 @@ impl ImapConnectionState {
}
/// Interrupt any form of idle.
fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info);
fn interrupt(&self) {
self.state.interrupt();
}
/// Shutdown this connection completely.
@@ -1038,14 +1039,3 @@ struct ImapConnectionHandlers {
connection: Imap,
stop_receiver: Receiver<()>,
}
#[derive(Default, Debug)]
pub struct InterruptInfo {
pub probe_network: bool,
}
impl InterruptInfo {
pub fn new(probe_network: bool) -> Self {
Self { probe_network }
}
}

View File

@@ -37,7 +37,6 @@ use crate::mimefactory::wrapped_base64_encode;
use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::param::Params;
use crate::scheduler::InterruptInfo;
use crate::tools::strip_rtlo_characters;
use crate::tools::{create_smeared_timestamp, get_abs_path};
@@ -485,9 +484,7 @@ impl Context {
DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr",
(instance.id, status_update_serial, status_update_serial, descr),
).await?;
self.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
self.scheduler.interrupt_smtp().await;
}
Ok(())
}