Replace location jobs with async location loop

Locations are now sent in the background regardless
of whether SMTP loop is interrupted or not.
This commit is contained in:
link2xt
2022-04-24 05:41:12 +00:00
parent 3a10f0155f
commit 904e8966c0
4 changed files with 146 additions and 148 deletions

View File

@@ -2,6 +2,9 @@
## Unreleased
### Changes
- Send locations in the background regardless of SMTP loop activity #3247
### Fixes
- simplify `dc_stop_io()` and remove potential panics and race conditions #3273

View File

@@ -14,7 +14,6 @@ use crate::context::Context;
use crate::dc_tools::time;
use crate::events::EventType;
use crate::imap::Imap;
use crate::location;
use crate::message::{Message, MsgId};
use crate::mimefactory::MimeFactory;
use crate::param::{Param, Params};
@@ -90,8 +89,6 @@ pub enum Action {
ResyncFolders = 300,
// Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999
MaybeSendLocations = 5005, // low priority ...
MaybeSendLocationsEnded = 5007,
SendMdn = 5010,
}
@@ -105,8 +102,6 @@ impl From<Action> for Thread {
UpdateRecentQuota => Thread::Imap,
DownloadMsg => Thread::Imap,
MaybeSendLocations => Thread::Smtp,
MaybeSendLocationsEnded => Thread::Smtp,
SendMdn => Thread::Smtp,
}
}
@@ -571,10 +566,6 @@ async fn perform_job_action(
let try_res = match job.action {
Action::SendMdn => job.send_mdn(context, connection.smtp()).await,
Action::MaybeSendLocations => location::job_maybe_send_locations(context, job).await,
Action::MaybeSendLocationsEnded => {
location::job_maybe_send_locations_ended(context, job).await
}
Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await,
Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await,
Action::UpdateRecentQuota => match context.update_recent_quota(connection.inbox()).await {
@@ -647,7 +638,7 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
info!(context, "interrupt: imap");
context.interrupt_inbox(InterruptInfo::new(false)).await;
}
Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn => {
Action::SendMdn => {
info!(context, "interrupt: smtp");
context.interrupt_smtp(InterruptInfo::new(false)).await;
}

View File

@@ -1,19 +1,20 @@
//! Location handling.
use std::convert::TryFrom;
use anyhow::{ensure, Result};
use anyhow::{ensure, Context as _, Result};
use async_std::channel::Receiver;
use async_std::future::timeout;
use bitflags::bitflags;
use quick_xml::events::{BytesEnd, BytesStart, BytesText};
use std::time::Duration;
use crate::chat::{self, ChatId};
use crate::contact::ContactId;
use crate::context::Context;
use crate::dc_tools::time;
use crate::dc_tools::{duration_to_str, time};
use crate::events::EventType;
use crate::job::{self, Job};
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::param::Params;
use crate::stock_str;
/// Location record
@@ -226,32 +227,11 @@ pub async fn send_locations_to_chat(
}
context.emit_event(EventType::ChatModified(chat_id));
if 0 != seconds {
schedule_maybe_send_locations(context, false).await?;
job::add(
context,
job::Job::new(
job::Action::MaybeSendLocationsEnded,
chat_id.to_u32(),
Params::new(),
seconds + 1,
),
)
.await?;
context.interrupt_location().await;
}
Ok(())
}
async fn schedule_maybe_send_locations(context: &Context, force_schedule: bool) -> Result<()> {
if force_schedule || !job::action_exists(context, job::Action::MaybeSendLocations).await? {
job::add(
context,
job::Job::new(job::Action::MaybeSendLocations, 0, Params::new(), 60),
)
.await?;
};
Ok(())
}
/// Returns whether `chat_id` or any chat is sending locations.
///
/// If `chat_id` is `Some` only that chat is checked, otherwise returns `true` if any chat
@@ -318,13 +298,13 @@ pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64
).await {
warn!(context, "failed to store location {:?}", err);
} else {
info!(context, "stored location for chat {}", chat_id);
continue_streaming = true;
}
}
if continue_streaming {
context.emit_event(EventType::LocationChanged(Some(ContactId::SELF)));
};
schedule_maybe_send_locations(context, false).await.ok();
}
continue_streaming
@@ -607,147 +587,140 @@ pub(crate) async fn save(
Ok(newest_location_id)
}
pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> job::Status {
let now = time();
let mut continue_streaming = false;
info!(
context,
" ----------------- MAYBE_SEND_LOCATIONS -------------- ",
);
pub(crate) async fn location_loop(context: &Context, interrupt_receiver: Receiver<()>) {
loop {
let next_event = match maybe_send_locations(context).await {
Err(err) => {
warn!(context, "maybe_send_locations failed: {}", err);
Some(60) // Retry one minute later.
}
Ok(next_event) => next_event,
};
let duration = if let Some(next_event) = next_event {
Duration::from_secs(next_event)
} else {
Duration::from_secs(86400)
};
info!(
context,
"Location loop is waiting for {} or interrupt",
duration_to_str(duration)
);
timeout(duration, interrupt_receiver.recv()).await.ok();
}
}
/// Returns number of seconds until the next time location streaming for some chat ends
/// automatically.
async fn maybe_send_locations(context: &Context) -> Result<Option<u64>> {
let mut next_event: Option<u64> = None;
let now = time();
let rows = context
.sql
.query_map(
"SELECT id, locations_send_begin, locations_last_sent \
FROM chats \
WHERE locations_send_until>?;",
paramsv![now],
"SELECT id, locations_send_begin, locations_send_until, locations_last_sent
FROM chats
WHERE locations_send_until>0",
[],
|row| {
let chat_id: ChatId = row.get(0)?;
let locations_send_begin: i64 = row.get(1)?;
let locations_last_sent: i64 = row.get(2)?;
continue_streaming = true;
// be a bit tolerant as the timer may not align exactly with time(NULL)
if now - locations_last_sent < (60 - 3) {
Ok(None)
} else {
Ok(Some((chat_id, locations_send_begin, locations_last_sent)))
}
let locations_send_until: i64 = row.get(2)?;
let locations_last_sent: i64 = row.get(3)?;
Ok((
chat_id,
locations_send_begin,
locations_send_until,
locations_last_sent,
))
},
|rows| {
rows.filter_map(|v| v.transpose())
.collect::<std::result::Result<Vec<_>, _>>()
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await;
.await
.context("failed to query location streaming chats")?;
if let Ok(rows) = rows {
let mut msgs = Vec::new();
{
let conn = job_try!(context.sql.get_conn().await);
let mut stmt_locations = job_try!(conn.prepare_cached(
"SELECT id \
for (chat_id, locations_send_begin, locations_send_until, locations_last_sent) in rows {
if locations_send_begin > 0 && locations_send_until > now {
let can_send = now > locations_last_sent + 60;
let has_locations = context
.sql
.exists(
"SELECT COUNT(id) \
FROM locations \
WHERE from_id=? \
AND timestamp>=? \
AND timestamp>? \
AND independent=0 \
ORDER BY timestamp;",
));
AND independent=0",
paramsv![ContactId::SELF, locations_send_begin, locations_last_sent,],
)
.await?;
for (chat_id, locations_send_begin, locations_last_sent) in &rows {
if !stmt_locations
.exists(paramsv![
ContactId::SELF,
*locations_send_begin,
*locations_last_sent,
])
.unwrap_or_default()
{
// if there is no new location, there's nothing to send.
// however, maybe we want to bypass this test eg. 15 minutes
} else {
// pending locations are attached automatically to every message,
next_event = next_event
.into_iter()
.chain(u64::try_from(locations_send_until - now).into_iter())
.min();
if has_locations {
if can_send {
// Send location-only message.
// Pending locations are attached automatically to every message,
// so also to this empty text message.
// DC_CMD_LOCATION is only needed to create a nicer subject.
//
// for optimisation and to avoid flooding the sending queue,
// we could sending these messages only if we're really online.
// the easiest way to determine this, is to check for an empty message queue.
// (might not be 100%, however, as positions are sent combined later
// and dc_set_location() is typically called periodically, this is ok)
info!(
context,
"Chat {} has pending locations, sending them.", chat_id
);
let mut msg = Message::new(Viewtype::Text);
msg.hidden = true;
msg.param.set_cmd(SystemMessage::LocationOnly);
msgs.push((*chat_id, msg));
chat::send_msg(context, chat_id, &mut msg).await?;
} else {
// Wait until pending locations can be sent.
info!(
context,
"Chat {} has pending locations, but they can't be sent yet.", chat_id
);
next_event = next_event
.into_iter()
.chain(u64::try_from(locations_last_sent + 61 - now).into_iter())
.min();
}
} else {
info!(
context,
"Chat {} has location streaming enabled, but no pending locations.", chat_id
);
}
}
for (chat_id, mut msg) in msgs.into_iter() {
// TODO: better error handling
chat::send_msg(context, chat_id, &mut msg)
.await
.unwrap_or_default();
}
}
if continue_streaming {
job_try!(schedule_maybe_send_locations(context, true).await);
}
job::Status::Finished(Ok(()))
}
pub(crate) async fn job_maybe_send_locations_ended(
context: &Context,
job: &mut Job,
) -> job::Status {
// this function is called when location-streaming _might_ have ended for a chat.
// the function checks, if location-streaming is really ended;
// if so, a device-message is added if not yet done.
let chat_id = ChatId::new(job.foreign_id);
let (send_begin, send_until) = job_try!(
context
.sql
.query_row(
"SELECT locations_send_begin, locations_send_until FROM chats WHERE id=?",
paramsv![chat_id],
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
)
.await
);
let now = time();
if !(send_begin != 0 && now <= send_until) {
// still streaming -
// may happen as several calls to dc_send_locations_to_chat()
// do not un-schedule pending DC_MAYBE_SEND_LOC_ENDED jobs
if !(send_begin == 0 && send_until == 0) {
// not streaming, device-message already sent
job_try!(
context
.sql
.execute(
"UPDATE chats \
SET locations_send_begin=0, locations_send_until=0 \
WHERE id=?",
paramsv![chat_id],
)
.await
} else {
// Location streaming was either explicitly disabled (locations_send_begin = 0) or
// locations_send_until is in the past.
info!(
context,
"Disabling location streaming for chat {}.", chat_id
);
context
.sql
.execute(
"UPDATE chats \
SET locations_send_begin=0, locations_send_until=0 \
WHERE id=?",
paramsv![chat_id],
)
.await
.context("failed to disable location streaming")?;
let stock_str = stock_str::msg_location_disabled(context).await;
job_try!(chat::add_info_msg(context, chat_id, &stock_str, now).await);
chat::add_info_msg(context, chat_id, &stock_str, now).await?;
context.emit_event(EventType::ChatModified(chat_id));
}
}
job::Status::Finished(Ok(()))
Ok(next_event)
}
#[cfg(test)]

View File

@@ -12,6 +12,7 @@ use crate::dc_tools::time;
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::imap::Imap;
use crate::job::{self, Thread};
use crate::location;
use crate::log::LogExt;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
@@ -36,6 +37,8 @@ pub(crate) enum Scheduler {
smtp_handle: Option<task::JoinHandle<()>>,
ephemeral_handle: Option<task::JoinHandle<()>>,
ephemeral_interrupt_send: Sender<()>,
location_handle: Option<task::JoinHandle<()>>,
location_interrupt_send: Sender<()>,
},
}
@@ -65,6 +68,10 @@ impl Context {
pub(crate) async fn interrupt_ephemeral_task(&self) {
self.scheduler.read().await.interrupt_ephemeral_task().await;
}
pub(crate) async fn interrupt_location(&self) {
self.scheduler.read().await.interrupt_location().await;
}
}
async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
@@ -386,6 +393,7 @@ impl Scheduler {
let mut sentbox_handle = None;
let (smtp_start_send, smtp_start_recv) = channel::bounded(1);
let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
let inbox_handle = {
let ctx = ctx.clone();
@@ -454,6 +462,13 @@ impl Scheduler {
}))
};
let location_handle = {
let ctx = ctx.clone();
Some(task::spawn(async move {
location::location_loop(&ctx, location_interrupt_recv).await;
}))
};
*self = Scheduler::Running {
inbox,
mvbox,
@@ -465,6 +480,8 @@ impl Scheduler {
smtp_handle,
ephemeral_handle,
ephemeral_interrupt_send,
location_handle,
location_interrupt_send,
};
// wait for all loops to be started
@@ -540,6 +557,16 @@ impl Scheduler {
}
}
async fn interrupt_location(&self) {
if let Scheduler::Running {
ref location_interrupt_send,
..
} = self
{
location_interrupt_send.try_send(()).ok();
}
}
/// Halt the scheduler.
pub(crate) async fn stop(&mut self) -> Result<()> {
match self {
@@ -556,6 +583,7 @@ impl Scheduler {
smtp,
smtp_handle,
ephemeral_handle,
location_handle,
..
} => {
if inbox_handle.is_some() {
@@ -586,6 +614,9 @@ impl Scheduler {
if let Some(handle) = ephemeral_handle.take() {
handle.cancel().await;
}
if let Some(handle) = location_handle.take() {
handle.cancel().await;
}
*self = Scheduler::Stopped;
Ok(())