Merge remote-tracking branch 'origin/master' into flub/send-backup

This commit is contained in:
link2xt
2023-03-19 11:10:07 +00:00
30 changed files with 379 additions and 264 deletions

View File

@@ -271,14 +271,14 @@ impl Accounts {
/// Notifies all accounts that the network may have become available.
pub async fn maybe_network(&self) {
for account in self.accounts.values() {
account.maybe_network().await;
account.scheduler.maybe_network().await;
}
}
/// Notifies all accounts that the network connection may have been lost.
pub async fn maybe_network_lost(&self) {
for account in self.accounts.values() {
account.maybe_network_lost().await;
account.scheduler.maybe_network_lost(account).await;
}
}

View File

@@ -639,7 +639,10 @@ impl ChatId {
context.emit_msgs_changed_without_ids();
context.set_config(Config::LastHousekeeping, None).await?;
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
if chat.is_self_talk() {
let mut msg = Message::new(Viewtype::Text);
@@ -1667,7 +1670,7 @@ impl Chat {
maybe_set_logging_xdc(context, msg, self.id).await?;
}
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
Ok(msg.id)
}
}
@@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
context.emit_event(EventType::LocationChanged(Some(ContactId::SELF)));
}
context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
Ok(msg.id)
@@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
.await?;
curr_timestamp += 1;
if create_send_msg_job(context, new_msg_id).await?.is_some() {
context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
}
created_chats.push(chat_id);
@@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
msg_id: msg.id,
});
if create_send_msg_job(context, msg.id).await?.is_some() {
context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
}
}

View File

@@ -443,7 +443,7 @@ impl Context {
Config::DeleteDeviceAfter => {
let ret = self.sql.set_raw_config(key.as_ref(), value).await;
// Interrupt ephemeral loop to delete old messages immediately.
self.interrupt_ephemeral_task().await;
self.scheduler.interrupt_ephemeral_task().await;
ret?
}
Config::Displayname => {

View File

@@ -59,7 +59,7 @@ impl Context {
/// Configures this account with the currently set parameters.
pub async fn configure(&self) -> Result<()> {
ensure!(
self.scheduler.read().await.is_none(),
!self.scheduler.is_running().await,
"cannot configure, already running"
);
ensure!(
@@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
ctx.set_config_bool(Config::FetchedExistingMsgs, false)
.await?;
ctx.interrupt_inbox(InterruptInfo::new(false)).await;
ctx.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
progress!(ctx, 940);
update_device_chats_handle.await??;

View File

@@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen(
> 0
&& timestamp > time() - SEEN_RECENTLY_SECONDS
{
context.interrupt_recently_seen(contact_id, timestamp).await;
context
.scheduler
.interrupt_recently_seen(contact_id, timestamp)
.await;
}
Ok(())
}

View File

@@ -25,7 +25,7 @@ use crate::login_param::LoginParam;
use crate::message::{self, MessageState, MsgId};
use crate::qr::Qr;
use crate::quota::QuotaInfo;
use crate::scheduler::Scheduler;
use crate::scheduler::SchedulerState;
use crate::sql::Sql;
use crate::stock_str::StockStrings;
use crate::timesmearing::SmearedTimestamp;
@@ -206,7 +206,7 @@ pub struct InnerContext {
pub(crate) translated_stockstrings: StockStrings,
pub(crate) events: Events,
pub(crate) scheduler: RwLock<Option<Scheduler>>,
pub(crate) scheduler: SchedulerState,
pub(crate) ratelimit: RwLock<Ratelimit>,
/// Recently loaded quota information, if any.
@@ -383,7 +383,7 @@ impl Context {
wrong_pw_warning_mutex: Mutex::new(()),
translated_stockstrings: stockstrings,
events,
scheduler: RwLock::new(None),
scheduler: SchedulerState::new(),
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds.
quota: RwLock::new(None),
quota_update_request: AtomicBool::new(false),
@@ -409,42 +409,23 @@ impl Context {
warn!(self, "can not start io on a context that is not configured");
return;
}
info!(self, "starting IO");
let mut lock = self.inner.scheduler.write().await;
if lock.is_none() {
match Scheduler::start(self.clone()).await {
Err(err) => error!(self, "Failed to start IO: {:#}", err),
Ok(scheduler) => *lock = Some(scheduler),
}
}
self.scheduler.start(self.clone()).await;
}
/// Stops the IO scheduler.
pub async fn stop_io(&self) {
// Sending an event wakes up event pollers (get_next_event)
// so the caller of stop_io() can arrange for proper termination.
// For this, the caller needs to instruct the event poller
// to terminate on receiving the next event and then call stop_io()
// which will emit the below event(s)
info!(self, "stopping IO");
if let Some(debug_logging) = self.debug_logging.read().await.as_ref() {
debug_logging.loop_handle.abort();
}
if let Some(scheduler) = self.inner.scheduler.write().await.take() {
scheduler.stop(self).await;
}
self.scheduler.stop(self).await;
}
/// Restarts the IO scheduler if it was running before
/// when it is not running this is an no-op
pub async fn restart_io_if_running(&self) {
info!(self, "restarting IO");
let is_running = { self.inner.scheduler.read().await.is_some() };
if is_running {
self.stop_io().await;
self.start_io().await;
}
self.scheduler.restart(self).await;
}
/// Indicate that the network likely has come back.
pub async fn maybe_network(&self) {
self.scheduler.maybe_network().await;
}
/// Returns a reference to the underlying SQL instance.

View File

@@ -317,7 +317,7 @@ impl MsgId {
paramsv![ephemeral_timestamp, ephemeral_timestamp, self],
)
.await?;
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
}
Ok(())
}
@@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids(
)
.await?;
if count > 0 {
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
}
Ok(())
}

View File

@@ -475,7 +475,7 @@ impl Imap {
// Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are
// fetched while the per-chat ephemeral timers start as soon as the messages are marked
// as noticed.
context.interrupt_ephemeral_task().await;
context.scheduler.interrupt_ephemeral_task().await;
}
let session = self
@@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str)
paramsv![message_id],
)
.await?;
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
Ok(())
}

View File

@@ -90,13 +90,17 @@ pub async fn imex(
) -> Result<()> {
let cancel = context.alloc_ongoing().await?;
let res = imex_inner(context, what, path, passphrase)
.race(async {
cancel.recv().await.ok();
Err(format_err!("canceled"))
})
.await;
let res = {
let mut guard = context.scheduler.pause(context).await;
let res = imex_inner(context, what, path, passphrase)
.race(async {
cancel.recv().await.ok();
Err(format_err!("canceled"))
})
.await;
guard.resume().await;
res
};
context.free_ongoing().await;
if let Err(err) = res.as_ref() {
@@ -417,7 +421,7 @@ async fn import_backup(
"Cannot import backups to accounts in use."
);
ensure!(
context.scheduler.read().await.is_none(),
!context.scheduler.is_running().await,
"cannot import backup, IO is running"
);

View File

@@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 {
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
context.resync_request.store(true, Ordering::Relaxed);
context
.scheduler
.interrupt_inbox(InterruptInfo {
probe_network: false,
})
@@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
job.save(context).await.context("failed to save job")?;
info!(context, "interrupt: imap");
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
Ok(())
}

View File

@@ -267,7 +267,7 @@ pub async fn send_locations_to_chat(
}
context.emit_event(EventType::ChatModified(chat_id));
if 0 != seconds {
context.interrupt_location().await;
context.scheduler.interrupt_location().await;
}
Ok(())
}

View File

@@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
}
// Interrupt Inbox loop to start message deletion and run housekeeping.
context.interrupt_inbox(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
Ok(())
}
@@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
)
.await
.context("failed to insert into smtp_mdns")?;
context.interrupt_smtp(InterruptInfo::new(false)).await;
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
}
updated_chat_ids.insert(curr_chat_id);

View File

@@ -2,7 +2,7 @@ use async_native_tls::TlsStream;
use fast_socks5::client::Socks5Stream;
use std::pin::Pin;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, BufWriter};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream, BufWriter};
use tokio_io_timeout::TimeoutStream;
pub(crate) trait SessionStream:
@@ -22,6 +22,11 @@ impl<T: SessionStream> SessionStream for TlsStream<T> {
self.get_mut().set_read_timeout(timeout);
}
}
impl<T: SessionStream> SessionStream for BufStream<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout);
}
}
impl<T: SessionStream> SessionStream for BufWriter<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout);
@@ -39,3 +44,8 @@ impl<T: SessionStream> SessionStream for Socks5Stream<T> {
self.get_socket_mut().set_read_timeout(timeout)
}
}
/// Session stream with a read buffer.
pub(crate) trait SessionBufStream: SessionStream + AsyncBufRead {}
impl<T: SessionStream + AsyncBufRead> SessionBufStream for T {}

View File

@@ -1513,7 +1513,7 @@ static P_ZOHO: Lazy<Provider> = Lazy::new(|| Provider {
});
pub(crate) static PROVIDER_DATA: Lazy<HashMap<&'static str, &'static Provider>> = Lazy::new(|| {
[
HashMap::from([
("163.com", &*P_163),
("aktivix.org", &*P_AKTIVIX_ORG),
("aol.com", &*P_AOL),
@@ -1875,14 +1875,11 @@ pub(crate) static PROVIDER_DATA: Lazy<HashMap<&'static str, &'static Provider>>
("zohomail.eu", &*P_ZOHO),
("zohomail.com", &*P_ZOHO),
("zoho.com", &*P_ZOHO),
]
.iter()
.copied()
.collect()
])
});
pub(crate) static PROVIDER_IDS: Lazy<HashMap<&'static str, &'static Provider>> = Lazy::new(|| {
[
HashMap::from([
("163", &*P_163),
("aktivix.org", &*P_AKTIVIX_ORG),
("aol", &*P_AOL),
@@ -1945,10 +1942,7 @@ pub(crate) static PROVIDER_IDS: Lazy<HashMap<&'static str, &'static Provider>> =
("yggmail", &*P_YGGMAIL),
("ziggo.nl", &*P_ZIGGO_NL),
("zoho", &*P_ZOHO),
]
.iter()
.copied()
.collect()
])
});
pub static PROVIDER_UPDATED: Lazy<chrono::NaiveDate> =

View File

@@ -115,7 +115,9 @@ impl Context {
let requested = self.quota_update_request.swap(true, Ordering::Relaxed);
if !requested {
// Quota update was not requested before.
self.interrupt_inbox(InterruptInfo::new(false)).await;
self.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
}
Ok(())
}

View File

@@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result};
use async_channel::{self as channel, Receiver, Sender};
use futures::future::try_join_all;
use futures_lite::FutureExt;
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio::task;
use self::connectivity::ConnectivityStore;
@@ -23,10 +24,210 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings};
pub(crate) mod connectivity;
/// State of the IO scheduler, as stored on the [`Context`].
///
/// The IO scheduler can be stopped or started, but core can also pause it. After pausing
/// the IO scheduler will be restarted only if it was running before paused or
/// [`Context::start_io`] was called in the meantime while it was paused.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
inner: RwLock<InnerSchedulerState>,
}
impl SchedulerState {
pub(crate) fn new() -> Self {
Default::default()
}
/// Whether the scheduler is currently running.
pub(crate) async fn is_running(&self) -> bool {
let inner = self.inner.read().await;
inner.scheduler.is_some()
}
/// Starts the scheduler if it is not yet started.
pub(crate) async fn start(&self, context: Context) {
let mut inner = self.inner.write().await;
inner.started = true;
if inner.scheduler.is_none() && !inner.paused {
Self::do_start(inner, context).await;
}
}
/// Starts the scheduler if it is not yet started.
async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
info!(context, "starting IO");
let ctx = context.clone();
match Scheduler::start(context).await {
Ok(scheduler) => inner.scheduler = Some(scheduler),
Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
}
}
/// Stops the scheduler if it is currently running.
pub(crate) async fn stop(&self, context: &Context) {
let mut inner = self.inner.write().await;
inner.started = false;
Self::do_stop(inner, context).await;
}
/// Stops the scheduler if it is currently running.
async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) {
// Sending an event wakes up event pollers (get_next_event)
// so the caller of stop_io() can arrange for proper termination.
// For this, the caller needs to instruct the event poller
// to terminate on receiving the next event and then call stop_io()
// which will emit the below event(s)
info!(context, "stopping IO");
if let Some(debug_logging) = context.debug_logging.read().await.as_ref() {
debug_logging.loop_handle.abort();
}
if let Some(scheduler) = inner.scheduler.take() {
scheduler.stop(context).await;
}
}
/// Pauses the IO scheduler.
///
/// If it is currently running the scheduler will be stopped. When
/// [`IoPausedGuard::resume`] is called the scheduler is started again.
///
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
/// resume will do the right thing and restore the scheduler to the state requested by
/// the last call.
pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> {
let mut inner = self.inner.write().await;
inner.paused = true;
Self::do_stop(inner, context).await;
IoPausedGuard {
context,
done: false,
}
}
/// Restarts the scheduler, only if it is running.
pub(crate) async fn restart(&self, context: &Context) {
info!(context, "restarting IO");
if self.is_running().await {
self.stop(context).await;
self.start(context.clone()).await;
}
}
/// Indicate that the network likely has come back.
pub(crate) async fn maybe_network(&self) {
let inner = self.inner.read().await;
let (inbox, oboxes) = match inner.scheduler {
Some(ref scheduler) => {
scheduler.maybe_network();
let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
let oboxes = scheduler
.oboxes
.iter()
.map(|b| b.conn_state.state.connectivity.clone())
.collect::<Vec<_>>();
(inbox, oboxes)
}
None => return,
};
drop(inner);
connectivity::idle_interrupted(inbox, oboxes).await;
}
/// Indicate that the network likely is lost.
pub(crate) async fn maybe_network_lost(&self, context: &Context) {
let inner = self.inner.read().await;
let stores = match inner.scheduler {
Some(ref scheduler) => {
scheduler.maybe_network_lost();
scheduler
.boxes()
.map(|b| b.conn_state.state.connectivity.clone())
.collect()
}
None => return,
};
drop(inner);
connectivity::maybe_network_lost(context, stores).await;
}
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
let inner = self.inner.read().await;
if let Some(ref scheduler) = inner.scheduler {
scheduler.interrupt_inbox(info);
}
}
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
let inner = self.inner.read().await;
if let Some(ref scheduler) = inner.scheduler {
scheduler.interrupt_smtp(info);
}
}
pub(crate) async fn interrupt_ephemeral_task(&self) {
let inner = self.inner.read().await;
if let Some(ref scheduler) = inner.scheduler {
scheduler.interrupt_ephemeral_task();
}
}
pub(crate) async fn interrupt_location(&self) {
let inner = self.inner.read().await;
if let Some(ref scheduler) = inner.scheduler {
scheduler.interrupt_location();
}
}
pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
let inner = self.inner.read().await;
if let Some(ref scheduler) = inner.scheduler {
scheduler.interrupt_recently_seen(contact_id, timestamp);
}
}
}
#[derive(Debug, Default)]
struct InnerSchedulerState {
scheduler: Option<Scheduler>,
started: bool,
paused: bool,
}
#[derive(Debug)]
pub(crate) struct IoPausedGuard<'a> {
context: &'a Context,
done: bool,
}
impl<'a> IoPausedGuard<'a> {
pub(crate) async fn resume(&mut self) {
self.done = true;
let mut inner = self.context.scheduler.inner.write().await;
inner.paused = false;
if inner.started && inner.scheduler.is_none() {
SchedulerState::do_start(inner, self.context.clone()).await;
}
}
}
impl<'a> Drop for IoPausedGuard<'a> {
fn drop(&mut self) {
if self.done {
return;
}
// Async .resume() should be called manually due to lack of async drop.
error!(self.context, "Pause guard dropped without resuming.");
}
}
#[derive(Debug)]
struct SchedBox {
meaning: FolderMeaning,
conn_state: ImapConnectionState,
/// IMAP loop task handle.
handle: task::JoinHandle<()>,
}
@@ -46,56 +247,6 @@ pub(crate) struct Scheduler {
recently_seen_loop: RecentlySeenLoop,
}
impl Context {
/// Indicate that the network likely has come back.
pub async fn maybe_network(&self) {
let lock = self.scheduler.read().await;
if let Some(scheduler) = &*lock {
scheduler.maybe_network();
}
connectivity::idle_interrupted(lock).await;
}
/// Indicate that the network likely is lost.
pub async fn maybe_network_lost(&self) {
let lock = self.scheduler.read().await;
if let Some(scheduler) = &*lock {
scheduler.maybe_network_lost();
}
connectivity::maybe_network_lost(self, lock).await;
}
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_inbox(info);
}
}
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_smtp(info);
}
}
pub(crate) async fn interrupt_ephemeral_task(&self) {
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_ephemeral_task();
}
}
pub(crate) async fn interrupt_location(&self) {
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_location();
}
}
pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_recently_seen(contact_id, timestamp);
}
}
}
async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
use futures::future::FutureExt;

View File

@@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc};
use anyhow::{anyhow, Result};
use humansize::{format_size, BINARY};
use tokio::sync::{Mutex, RwLockReadGuard};
use tokio::sync::Mutex;
use crate::events::EventType;
use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning};
@@ -12,7 +12,7 @@ use crate::quota::{
};
use crate::tools::time;
use crate::{context::Context, log::LogExt};
use crate::{scheduler::Scheduler, stock_str, tools};
use crate::{stock_str, tools};
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)]
pub enum Connectivity {
@@ -156,19 +156,7 @@ impl ConnectivityStore {
/// Set all folder states to InterruptingIdle in case they were `Connected` before.
/// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()`
/// returns false immediately after `dc_maybe_network()`.
pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Scheduler>>) {
let (inbox, oboxes) = match &*scheduler {
Some(Scheduler { inbox, oboxes, .. }) => (
inbox.conn_state.state.connectivity.clone(),
oboxes
.iter()
.map(|b| b.conn_state.state.connectivity.clone())
.collect::<Vec<_>>(),
),
None => return,
};
drop(scheduler);
pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) {
let mut connectivity_lock = inbox.0.lock().await;
// For the inbox, we also have to set the connectivity to InterruptingIdle if it was
// NotConfigured before: If all folders are NotConfigured, dc_get_connectivity()
@@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Sched
/// Set the connectivity to "Not connected" after a call to dc_maybe_network_lost().
/// If we did not do this, the connectivity would stay "Connected" for quite a long time
/// after `maybe_network_lost()` was called.
pub(crate) async fn maybe_network_lost(
context: &Context,
scheduler: RwLockReadGuard<'_, Option<Scheduler>>,
) {
let stores: Vec<_> = match &*scheduler {
Some(sched) => sched
.boxes()
.map(|b| b.conn_state.state.connectivity.clone())
.collect(),
None => return,
};
drop(scheduler);
pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec<ConnectivityStore>) {
for store in &stores {
let mut connectivity_lock = store.0.lock().await;
if !matches!(
@@ -249,9 +225,9 @@ impl Context {
///
/// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
pub async fn get_connectivity(&self) -> Connectivity {
let lock = self.scheduler.read().await;
let stores: Vec<_> = match &*lock {
Some(sched) => sched
let lock = self.scheduler.inner.read().await;
let stores: Vec<_> = match lock.scheduler {
Some(ref sched) => sched
.boxes()
.map(|b| b.conn_state.state.connectivity.clone())
.collect(),
@@ -332,9 +308,9 @@ impl Context {
// Get the states from the RwLock
// =============================================================================================
let lock = self.scheduler.read().await;
let (folders_states, smtp) = match &*lock {
Some(sched) => (
let lock = self.scheduler.inner.read().await;
let (folders_states, smtp) = match lock.scheduler {
Some(ref sched) => (
sched
.boxes()
.map(|b| (b.meaning, b.conn_state.state.connectivity.clone()))
@@ -503,9 +479,9 @@ impl Context {
/// Returns true if all background work is done.
pub async fn all_work_done(&self) -> bool {
let lock = self.scheduler.read().await;
let stores: Vec<_> = match &*lock {
Some(sched) => sched
let lock = self.scheduler.inner.read().await;
let stores: Vec<_> = match lock.scheduler {
Some(ref sched) => sched
.boxes()
.map(|b| &b.conn_state.state)
.chain(once(&sched.smtp.state))

View File

@@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail};
use async_smtp::{self as smtp, EmailAddress, SmtpTransport};
use tokio::io::BufWriter;
use tokio::io::BufStream;
use tokio::task;
use crate::config::Config;
@@ -18,7 +18,7 @@ use crate::message::Message;
use crate::message::{self, MsgId};
use crate::mimefactory::MimeFactory;
use crate::net::connect_tcp;
use crate::net::session::SessionStream;
use crate::net::session::SessionBufStream;
use crate::net::tls::wrap_tls;
use crate::oauth2::get_oauth2_access_token;
use crate::provider::Socket;
@@ -32,7 +32,7 @@ const SMTP_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Default)]
pub(crate) struct Smtp {
/// SMTP connection.
transport: Option<SmtpTransport<Box<dyn SessionStream>>>,
transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,
/// Email address we are sending from.
from: Option<EmailAddress>,
@@ -116,13 +116,13 @@ impl Smtp {
port: u16,
strict_tls: bool,
socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
.await?;
let tls_stream = wrap_tls(strict_tls, hostname, socks5_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
@@ -135,20 +135,20 @@ impl Smtp {
port: u16,
strict_tls: bool,
socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
.await?;
// Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, socks5_stream).await?;
let transport = SmtpTransport::new(client, BufStream::new(socks5_stream)).await?;
let tcp_stream = transport.starttls().await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
.await
.context("STARTTLS upgrade failed")?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting();
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
@@ -160,12 +160,12 @@ impl Smtp {
hostname: &str,
port: u16,
socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, false)
.await?;
let buffered_stream = BufWriter::new(socks5_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let buffered_stream = BufStream::new(socks5_stream);
let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
@@ -177,11 +177,11 @@ impl Smtp {
hostname: &str,
port: u16,
strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
@@ -193,18 +193,18 @@ impl Smtp {
hostname: &str,
port: u16,
strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?;
// Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, tcp_stream).await?;
let transport = SmtpTransport::new(client, BufStream::new(tcp_stream)).await?;
let tcp_stream = transport.starttls().await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
.await
.context("STARTTLS upgrade failed")?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting();
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
@@ -215,10 +215,10 @@ impl Smtp {
context: &Context,
hostname: &str,
port: u16,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?;
let buffered_stream = BufWriter::new(tcp_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let buffered_stream = BufStream::new(tcp_stream);
let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)

View File

@@ -109,8 +109,8 @@ impl Context {
Ok(())
}
// Add deleted qr-code token to the list of items to be synced
// so that the token also gets deleted on the other devices.
/// Adds deleted qr-code token to the list of items to be synced
/// so that the token also gets deleted on the other devices.
pub(crate) async fn sync_qr_code_token_deletion(
&self,
invitenumber: String,

View File

@@ -421,7 +421,9 @@ impl Context {
DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr",
paramsv![instance.id, status_update_serial, status_update_serial, descr],
).await?;
self.interrupt_smtp(InterruptInfo::new(false)).await;
self.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
Ok(())
}