Mirror of https://github.com/chatmail/core.git, synced 2026-05-02 04:46:29 +03:00
Merge branch 'flub/pause-io'
PR: <https://github.com/deltachat/deltachat-core-rust/pull/4138>
@@ -8,6 +8,10 @@
 - Support non-persistent configuration with DELTACHAT_* env
 - Print deltachat-repl errors with causes. #4166
 - Increase MSRV to 1.64. #4167
+- Core takes care of stopping and re-starting IO itself where needed,
+  e.g. during backup creation. It is no longer needed to call
+  dc_stop_io(). dc_start_io() can now be called at any time without
+  harm. #4138

 ### Fixes
 - Fix segmentation fault if `dc_context_unref()` is called during
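With this change, callers no longer bracket backup creation with dc_stop_io()/dc_start_io(); core pauses and resumes its own IO scheduler. A minimal sketch of a Rust-API caller after this change (the destination path is made up for illustration, and the crate is assumed to be used as the `deltachat` library):

// Sketch only: assumes a configured Context; no stop_io()/start_io() needed.
use std::path::Path;

use deltachat::context::Context;
use deltachat::imex::{imex, ImexMode};

async fn export_backup(ctx: &Context) -> anyhow::Result<()> {
    // imex() pauses the IO scheduler itself and resumes it afterwards.
    imex(ctx, ImexMode::ExportBackup, Path::new("/tmp/backups"), None).await
}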
@@ -2100,8 +2100,7 @@ dc_contact_t* dc_get_contact (dc_context_t* context, uint32_t co

 /**
  * Import/export things.
- * During backup import/export IO must not be started,
- * if needed stop IO using dc_accounts_stop_io() or dc_stop_io() first.
+ *
  * What to do is defined by the _what_ parameter which may be one of the following:
  *
  * - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1`
@@ -1325,16 +1325,13 @@ impl CommandApi {
         passphrase: Option<String>,
     ) -> Result<()> {
         let ctx = self.get_context(account_id).await?;
-        ctx.stop_io().await;
-        let result = imex::imex(
+        imex::imex(
             &ctx,
             imex::ImexMode::ExportBackup,
             destination.as_ref(),
             passphrase,
         )
-        .await;
-        ctx.start_io().await;
-        result
+        .await
     }

     async fn import_backup(
@@ -271,14 +271,14 @@ impl Accounts {
     /// Notifies all accounts that the network may have become available.
     pub async fn maybe_network(&self) {
         for account in self.accounts.values() {
-            account.maybe_network().await;
+            account.scheduler.maybe_network().await;
         }
     }

     /// Notifies all accounts that the network connection may have been lost.
     pub async fn maybe_network_lost(&self) {
         for account in self.accounts.values() {
-            account.maybe_network_lost().await;
+            account.scheduler.maybe_network_lost(account).await;
         }
     }
src/chat.rs
@@ -639,7 +639,10 @@ impl ChatId {
         context.emit_msgs_changed_without_ids();

         context.set_config(Config::LastHousekeeping, None).await?;
-        context.interrupt_inbox(InterruptInfo::new(false)).await;
+        context
+            .scheduler
+            .interrupt_inbox(InterruptInfo::new(false))
+            .await;

         if chat.is_self_talk() {
             let mut msg = Message::new(Viewtype::Text);
@@ -1667,7 +1670,7 @@ impl Chat {

             maybe_set_logging_xdc(context, msg, self.id).await?;
         }
-        context.interrupt_ephemeral_task().await;
+        context.scheduler.interrupt_ephemeral_task().await;
         Ok(msg.id)
     }
 }
@@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
         context.emit_event(EventType::LocationChanged(Some(ContactId::SELF)));
     }

-        context.interrupt_smtp(InterruptInfo::new(false)).await;
+        context
+            .scheduler
+            .interrupt_smtp(InterruptInfo::new(false))
+            .await;
     }

     Ok(msg.id)
@@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
             .await?;
         curr_timestamp += 1;
         if create_send_msg_job(context, new_msg_id).await?.is_some() {
-            context.interrupt_smtp(InterruptInfo::new(false)).await;
+            context
+                .scheduler
+                .interrupt_smtp(InterruptInfo::new(false))
+                .await;
         }
     }
     created_chats.push(chat_id);
@@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
             msg_id: msg.id,
         });
         if create_send_msg_job(context, msg.id).await?.is_some() {
-            context.interrupt_smtp(InterruptInfo::new(false)).await;
+            context
+                .scheduler
+                .interrupt_smtp(InterruptInfo::new(false))
+                .await;
         }
     }
 }
@@ -443,7 +443,7 @@ impl Context {
             Config::DeleteDeviceAfter => {
                 let ret = self.sql.set_raw_config(key.as_ref(), value).await;
                 // Interrupt ephemeral loop to delete old messages immediately.
-                self.interrupt_ephemeral_task().await;
+                self.scheduler.interrupt_ephemeral_task().await;
                 ret?
             }
             Config::Displayname => {
@@ -59,7 +59,7 @@ impl Context {
     /// Configures this account with the currently set parameters.
     pub async fn configure(&self) -> Result<()> {
         ensure!(
-            self.scheduler.read().await.is_none(),
+            !self.scheduler.is_running().await,
             "cannot configure, already running"
         );
         ensure!(
@@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {

     ctx.set_config_bool(Config::FetchedExistingMsgs, false)
         .await?;
-    ctx.interrupt_inbox(InterruptInfo::new(false)).await;
+    ctx.scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;

     progress!(ctx, 940);
     update_device_chats_handle.await??;
@@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen(
             > 0
         && timestamp > time() - SEEN_RECENTLY_SECONDS
     {
-        context.interrupt_recently_seen(contact_id, timestamp).await;
+        context
+            .scheduler
+            .interrupt_recently_seen(contact_id, timestamp)
+            .await;
     }
     Ok(())
 }
@@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey};
 use crate::login_param::LoginParam;
 use crate::message::{self, MessageState, MsgId};
 use crate::quota::QuotaInfo;
-use crate::scheduler::Scheduler;
+use crate::scheduler::SchedulerState;
 use crate::sql::Sql;
 use crate::stock_str::StockStrings;
 use crate::timesmearing::SmearedTimestamp;
@@ -201,7 +201,7 @@ pub struct InnerContext {
     pub(crate) translated_stockstrings: StockStrings,
     pub(crate) events: Events,

-    pub(crate) scheduler: RwLock<Option<Scheduler>>,
+    pub(crate) scheduler: SchedulerState,
     pub(crate) ratelimit: RwLock<Ratelimit>,

     /// Recently loaded quota information, if any.
@@ -370,7 +370,7 @@ impl Context {
             wrong_pw_warning_mutex: Mutex::new(()),
             translated_stockstrings: stockstrings,
             events,
-            scheduler: RwLock::new(None),
+            scheduler: SchedulerState::new(),
             ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds.
             quota: RwLock::new(None),
             quota_update_request: AtomicBool::new(false),
@@ -395,42 +395,23 @@ impl Context {
             warn!(self, "can not start io on a context that is not configured");
             return;
         }
-        info!(self, "starting IO");
-        let mut lock = self.inner.scheduler.write().await;
-        if lock.is_none() {
-            match Scheduler::start(self.clone()).await {
-                Err(err) => error!(self, "Failed to start IO: {:#}", err),
-                Ok(scheduler) => *lock = Some(scheduler),
-            }
-        }
+        self.scheduler.start(self.clone()).await;
     }

     /// Stops the IO scheduler.
     pub async fn stop_io(&self) {
-        // Sending an event wakes up event pollers (get_next_event)
-        // so the caller of stop_io() can arrange for proper termination.
-        // For this, the caller needs to instruct the event poller
-        // to terminate on receiving the next event and then call stop_io()
-        // which will emit the below event(s)
-        info!(self, "stopping IO");
-        if let Some(debug_logging) = self.debug_logging.read().await.as_ref() {
-            debug_logging.loop_handle.abort();
-        }
-        if let Some(scheduler) = self.inner.scheduler.write().await.take() {
-            scheduler.stop(self).await;
-        }
+        self.scheduler.stop(self).await;
     }

     /// Restarts the IO scheduler if it was running before
     /// when it is not running this is an no-op
     pub async fn restart_io_if_running(&self) {
-        info!(self, "restarting IO");
-        let is_running = { self.inner.scheduler.read().await.is_some() };
-        if is_running {
-            self.stop_io().await;
-            self.start_io().await;
-        }
+        self.scheduler.restart(self).await;
+    }
+
+    /// Indicate that the network likely has come back.
+    pub async fn maybe_network(&self) {
+        self.scheduler.maybe_network().await;
     }

     /// Returns a reference to the underlying SQL instance.
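After this hunk the public IO entry points are thin wrappers around `SchedulerState`, so repeated calls are harmless. A hedged illustration of the resulting semantics for crate-internal code (assuming a configured `ctx: Context`; not part of the diff):

// Illustration of the new semantics only.
ctx.start_io().await;              // starts the scheduler
ctx.start_io().await;              // second call is a no-op: it is already running
ctx.stop_io().await;               // stops the scheduler and remembers "not started"
ctx.restart_io_if_running().await; // no-op here, because IO is currently stopped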
@@ -317,7 +317,7 @@ impl MsgId {
                 paramsv![ephemeral_timestamp, ephemeral_timestamp, self],
             )
             .await?;
-            context.interrupt_ephemeral_task().await;
+            context.scheduler.interrupt_ephemeral_task().await;
         }
         Ok(())
     }
@@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids(
         )
         .await?;
     if count > 0 {
-        context.interrupt_ephemeral_task().await;
+        context.scheduler.interrupt_ephemeral_task().await;
     }
     Ok(())
 }
@@ -475,7 +475,7 @@ impl Imap {
             // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are
             // fetched while the per-chat ephemeral timers start as soon as the messages are marked
             // as noticed.
-            context.interrupt_ephemeral_task().await;
+            context.scheduler.interrupt_ephemeral_task().await;
         }

         let session = self
@@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str)
         paramsv![message_id],
     )
     .await?;
-    context.interrupt_inbox(InterruptInfo::new(false)).await;
+    context
+        .scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;

     Ok(())
 }
src/imex.rs
@@ -86,13 +86,17 @@ pub async fn imex(
 ) -> Result<()> {
     let cancel = context.alloc_ongoing().await?;

-    let res = imex_inner(context, what, path, passphrase)
-        .race(async {
-            cancel.recv().await.ok();
-            Err(format_err!("canceled"))
-        })
-        .await;
-
+    let res = {
+        let mut guard = context.scheduler.pause(context).await;
+        let res = imex_inner(context, what, path, passphrase)
+            .race(async {
+                cancel.recv().await.ok();
+                Err(format_err!("canceled"))
+            })
+            .await;
+        guard.resume().await;
+        res
+    };
     context.free_ongoing().await;

     if let Err(err) = res.as_ref() {
@@ -413,7 +417,7 @@ async fn import_backup(
         "Cannot import backups to accounts in use."
     );
     ensure!(
-        context.scheduler.read().await.is_none(),
+        !context.scheduler.is_running().await,
         "cannot import backup, IO is running"
     );

@@ -523,7 +527,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
     sql::housekeeping(context).await.ok_or_log(context);

     ensure!(
-        context.scheduler.read().await.is_none(),
+        !context.scheduler.is_running().await,
         "cannot export backup, IO is running"
     );

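The first hunk above is the first user of the new pause guard: IO is paused for the duration of import/export and resumed afterwards, whether or not it was running before. As a hedged sketch, any other crate-internal operation that must run without IO could follow the same pattern; `do_exclusive_work` below is a made-up placeholder:

// Sketch of the pause-guard pattern, for illustration only.
async fn with_paused_io(context: &Context) -> Result<()> {
    // Stops the scheduler if it is running and records the paused state.
    let mut guard = context.scheduler.pause(context).await;
    let res = do_exclusive_work(context).await; // hypothetical exclusive operation
    // Restarts the scheduler only if it is supposed to be running,
    // i.e. start_io() was called and stop_io() was not.
    guard.resume().await;
    res
}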
@@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 {
 pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
     context.resync_request.store(true, Ordering::Relaxed);
     context
+        .scheduler
         .interrupt_inbox(InterruptInfo {
             probe_network: false,
         })
@@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
     job.save(context).await.context("failed to save job")?;

     info!(context, "interrupt: imap");
-    context.interrupt_inbox(InterruptInfo::new(false)).await;
+    context
+        .scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;
     Ok(())
 }

@@ -267,7 +267,7 @@ pub async fn send_locations_to_chat(
     }
     context.emit_event(EventType::ChatModified(chat_id));
     if 0 != seconds {
-        context.interrupt_location().await;
+        context.scheduler.interrupt_location().await;
     }
     Ok(())
 }
@@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
     }

     // Interrupt Inbox loop to start message deletion and run housekeeping.
-    context.interrupt_inbox(InterruptInfo::new(false)).await;
+    context
+        .scheduler
+        .interrupt_inbox(InterruptInfo::new(false))
+        .await;
     Ok(())
 }

@@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
                 )
                 .await
                 .context("failed to insert into smtp_mdns")?;
-                context.interrupt_smtp(InterruptInfo::new(false)).await;
+                context
+                    .scheduler
+                    .interrupt_smtp(InterruptInfo::new(false))
+                    .await;
             }
         }
         updated_chat_ids.insert(curr_chat_id);
@@ -115,7 +115,9 @@ impl Context {
         let requested = self.quota_update_request.swap(true, Ordering::Relaxed);
         if !requested {
             // Quota update was not requested before.
-            self.interrupt_inbox(InterruptInfo::new(false)).await;
+            self.scheduler
+                .interrupt_inbox(InterruptInfo::new(false))
+                .await;
         }
         Ok(())
     }
src/scheduler.rs
@@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result};
 use async_channel::{self as channel, Receiver, Sender};
 use futures::future::try_join_all;
 use futures_lite::FutureExt;
+use tokio::sync::{RwLock, RwLockWriteGuard};
 use tokio::task;

 use self::connectivity::ConnectivityStore;
@@ -23,10 +24,210 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings};

 pub(crate) mod connectivity;

+/// State of the IO scheduler, as stored on the [`Context`].
+///
+/// The IO scheduler can be stopped or started, but core can also pause it. After pausing
+/// the IO scheduler will be restarted only if it was running before paused or
+/// [`Context::start_io`] was called in the meantime while it was paused.
+#[derive(Debug, Default)]
+pub(crate) struct SchedulerState {
+    inner: RwLock<InnerSchedulerState>,
+}
+
+impl SchedulerState {
+    pub(crate) fn new() -> Self {
+        Default::default()
+    }
+
+    /// Whether the scheduler is currently running.
+    pub(crate) async fn is_running(&self) -> bool {
+        let inner = self.inner.read().await;
+        inner.scheduler.is_some()
+    }
+
+    /// Starts the scheduler if it is not yet started.
+    pub(crate) async fn start(&self, context: Context) {
+        let mut inner = self.inner.write().await;
+        inner.started = true;
+        if inner.scheduler.is_none() && !inner.paused {
+            Self::do_start(inner, context).await;
+        }
+    }
+
+    /// Starts the scheduler if it is not yet started.
+    async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
+        info!(context, "starting IO");
+        let ctx = context.clone();
+        match Scheduler::start(context).await {
+            Ok(scheduler) => inner.scheduler = Some(scheduler),
+            Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
+        }
+    }
+
+    /// Stops the scheduler if it is currently running.
+    pub(crate) async fn stop(&self, context: &Context) {
+        let mut inner = self.inner.write().await;
+        inner.started = false;
+        Self::do_stop(inner, context).await;
+    }
+
+    /// Stops the scheduler if it is currently running.
+    async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) {
+        // Sending an event wakes up event pollers (get_next_event)
+        // so the caller of stop_io() can arrange for proper termination.
+        // For this, the caller needs to instruct the event poller
+        // to terminate on receiving the next event and then call stop_io()
+        // which will emit the below event(s)
+        info!(context, "stopping IO");
+        if let Some(debug_logging) = context.debug_logging.read().await.as_ref() {
+            debug_logging.loop_handle.abort();
+        }
+        if let Some(scheduler) = inner.scheduler.take() {
+            scheduler.stop(context).await;
+        }
+    }
+
+    /// Pauses the IO scheduler.
+    ///
+    /// If it is currently running the scheduler will be stopped. When
+    /// [`IoPausedGuard::resume`] is called the scheduler is started again.
+    ///
+    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
+    /// resume will do the right thing and restore the scheduler to the state requested by
+    /// the last call.
+    pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> {
+        let mut inner = self.inner.write().await;
+        inner.paused = true;
+        Self::do_stop(inner, context).await;
+        IoPausedGuard {
+            context,
+            done: false,
+        }
+    }
+
+    /// Restarts the scheduler, only if it is running.
+    pub(crate) async fn restart(&self, context: &Context) {
+        info!(context, "restarting IO");
+        if self.is_running().await {
+            self.stop(context).await;
+            self.start(context.clone()).await;
+        }
+    }
+
+    /// Indicate that the network likely has come back.
+    pub(crate) async fn maybe_network(&self) {
+        let inner = self.inner.read().await;
+        let (inbox, oboxes) = match inner.scheduler {
+            Some(ref scheduler) => {
+                scheduler.maybe_network();
+                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
+                let oboxes = scheduler
+                    .oboxes
+                    .iter()
+                    .map(|b| b.conn_state.state.connectivity.clone())
+                    .collect::<Vec<_>>();
+                (inbox, oboxes)
+            }
+            None => return,
+        };
+        drop(inner);
+        connectivity::idle_interrupted(inbox, oboxes).await;
+    }
+
+    /// Indicate that the network likely is lost.
+    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
+        let inner = self.inner.read().await;
+        let stores = match inner.scheduler {
+            Some(ref scheduler) => {
+                scheduler.maybe_network_lost();
+                scheduler
+                    .boxes()
+                    .map(|b| b.conn_state.state.connectivity.clone())
+                    .collect()
+            }
+            None => return,
+        };
+        drop(inner);
+        connectivity::maybe_network_lost(context, stores).await;
+    }
+
+    pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
+            scheduler.interrupt_inbox(info);
+        }
+    }
+
+    pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
+            scheduler.interrupt_smtp(info);
+        }
+    }
+
+    pub(crate) async fn interrupt_ephemeral_task(&self) {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
+            scheduler.interrupt_ephemeral_task();
+        }
+    }
+
+    pub(crate) async fn interrupt_location(&self) {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
+            scheduler.interrupt_location();
+        }
+    }
+
+    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
+        let inner = self.inner.read().await;
+        if let Some(ref scheduler) = inner.scheduler {
+            scheduler.interrupt_recently_seen(contact_id, timestamp);
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct InnerSchedulerState {
+    scheduler: Option<Scheduler>,
+    started: bool,
+    paused: bool,
+}
+
+#[derive(Debug)]
+pub(crate) struct IoPausedGuard<'a> {
+    context: &'a Context,
+    done: bool,
+}
+
+impl<'a> IoPausedGuard<'a> {
+    pub(crate) async fn resume(&mut self) {
+        self.done = true;
+        let mut inner = self.context.scheduler.inner.write().await;
+        inner.paused = false;
+        if inner.started && inner.scheduler.is_none() {
+            SchedulerState::do_start(inner, self.context.clone()).await;
+        }
+    }
+}
+
+impl<'a> Drop for IoPausedGuard<'a> {
+    fn drop(&mut self) {
+        if self.done {
+            return;
+        }
+
+        // Async .resume() should be called manually due to lack of async drop.
+        error!(self.context, "Pause guard dropped without resuming.");
+    }
+}
+
 #[derive(Debug)]
 struct SchedBox {
     meaning: FolderMeaning,
     conn_state: ImapConnectionState,
+
+    /// IMAP loop task handle.
     handle: task::JoinHandle<()>,
 }

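The three fields of `InnerSchedulerState` encode a single invariant: the scheduler task should exist exactly while IO has been started and is not paused. A hedged illustration of that rule as a standalone predicate (hypothetical helper, not present in the diff; the real code enforces it inside start(), stop(), pause() and IoPausedGuard::resume()):

// Illustration only: hypothetical helper, not part of the codebase.
fn io_should_run(state: &InnerSchedulerState) -> bool {
    state.started && !state.paused
}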
@@ -46,56 +247,6 @@ pub(crate) struct Scheduler {
     recently_seen_loop: RecentlySeenLoop,
 }

-impl Context {
-    /// Indicate that the network likely has come back.
-    pub async fn maybe_network(&self) {
-        let lock = self.scheduler.read().await;
-        if let Some(scheduler) = &*lock {
-            scheduler.maybe_network();
-        }
-        connectivity::idle_interrupted(lock).await;
-    }
-
-    /// Indicate that the network likely is lost.
-    pub async fn maybe_network_lost(&self) {
-        let lock = self.scheduler.read().await;
-        if let Some(scheduler) = &*lock {
-            scheduler.maybe_network_lost();
-        }
-        connectivity::maybe_network_lost(self, lock).await;
-    }
-
-    pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
-            scheduler.interrupt_inbox(info);
-        }
-    }
-
-    pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
-            scheduler.interrupt_smtp(info);
-        }
-    }
-
-    pub(crate) async fn interrupt_ephemeral_task(&self) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
-            scheduler.interrupt_ephemeral_task();
-        }
-    }
-
-    pub(crate) async fn interrupt_location(&self) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
-            scheduler.interrupt_location();
-        }
-    }
-
-    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
-        if let Some(scheduler) = &*self.scheduler.read().await {
-            scheduler.interrupt_recently_seen(contact_id, timestamp);
-        }
-    }
-}
-
 async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
     use futures::future::FutureExt;
@@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc};

 use anyhow::{anyhow, Result};
 use humansize::{format_size, BINARY};
-use tokio::sync::{Mutex, RwLockReadGuard};
+use tokio::sync::Mutex;

 use crate::events::EventType;
 use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning};
@@ -12,7 +12,7 @@ use crate::quota::{
 };
 use crate::tools::time;
 use crate::{context::Context, log::LogExt};
-use crate::{scheduler::Scheduler, stock_str, tools};
+use crate::{stock_str, tools};

 #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)]
 pub enum Connectivity {
@@ -156,19 +156,7 @@ impl ConnectivityStore {
 /// Set all folder states to InterruptingIdle in case they were `Connected` before.
 /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()`
 /// returns false immediately after `dc_maybe_network()`.
-pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Scheduler>>) {
-    let (inbox, oboxes) = match &*scheduler {
-        Some(Scheduler { inbox, oboxes, .. }) => (
-            inbox.conn_state.state.connectivity.clone(),
-            oboxes
-                .iter()
-                .map(|b| b.conn_state.state.connectivity.clone())
-                .collect::<Vec<_>>(),
-        ),
-        None => return,
-    };
-    drop(scheduler);
-
+pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) {
     let mut connectivity_lock = inbox.0.lock().await;
     // For the inbox, we also have to set the connectivity to InterruptingIdle if it was
     // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity()
@@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Sched
 /// Set the connectivity to "Not connected" after a call to dc_maybe_network_lost().
 /// If we did not do this, the connectivity would stay "Connected" for quite a long time
 /// after `maybe_network_lost()` was called.
-pub(crate) async fn maybe_network_lost(
-    context: &Context,
-    scheduler: RwLockReadGuard<'_, Option<Scheduler>>,
-) {
-    let stores: Vec<_> = match &*scheduler {
-        Some(sched) => sched
-            .boxes()
-            .map(|b| b.conn_state.state.connectivity.clone())
-            .collect(),
-        None => return,
-    };
-    drop(scheduler);
-
+pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec<ConnectivityStore>) {
     for store in &stores {
         let mut connectivity_lock = store.0.lock().await;
         if !matches!(
@@ -249,9 +225,9 @@ impl Context {
     ///
     /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
     pub async fn get_connectivity(&self) -> Connectivity {
-        let lock = self.scheduler.read().await;
-        let stores: Vec<_> = match &*lock {
-            Some(sched) => sched
+        let lock = self.scheduler.inner.read().await;
+        let stores: Vec<_> = match lock.scheduler {
+            Some(ref sched) => sched
                 .boxes()
                 .map(|b| b.conn_state.state.connectivity.clone())
                 .collect(),
@@ -332,9 +308,9 @@ impl Context {
         // Get the states from the RwLock
         // =============================================================================================

-        let lock = self.scheduler.read().await;
-        let (folders_states, smtp) = match &*lock {
-            Some(sched) => (
+        let lock = self.scheduler.inner.read().await;
+        let (folders_states, smtp) = match lock.scheduler {
+            Some(ref sched) => (
                 sched
                     .boxes()
                     .map(|b| (b.meaning, b.conn_state.state.connectivity.clone()))
@@ -503,9 +479,9 @@ impl Context {

     /// Returns true if all background work is done.
     pub async fn all_work_done(&self) -> bool {
-        let lock = self.scheduler.read().await;
-        let stores: Vec<_> = match &*lock {
-            Some(sched) => sched
+        let lock = self.scheduler.inner.read().await;
+        let stores: Vec<_> = match lock.scheduler {
+            Some(ref sched) => sched
                 .boxes()
                 .map(|b| &b.conn_state.state)
                 .chain(once(&sched.smtp.state))
@@ -421,7 +421,9 @@ impl Context {
                 DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr",
                 paramsv![instance.id, status_update_serial, status_update_serial, descr],
             ).await?;
-            self.interrupt_smtp(InterruptInfo::new(false)).await;
+            self.scheduler
+                .interrupt_smtp(InterruptInfo::new(false))
+                .await;
         }
         Ok(())
     }