diff --git a/CHANGELOG.md b/CHANGELOG.md index f4670a963..f58163a97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +### Changes +- Send locations in the background regardless of SMTP loop activity #3247 + ### Fixes - simplify `dc_stop_io()` and remove potential panics and race conditions #3273 diff --git a/src/job.rs b/src/job.rs index 20856eef0..3a28d10c5 100644 --- a/src/job.rs +++ b/src/job.rs @@ -14,7 +14,6 @@ use crate::context::Context; use crate::dc_tools::time; use crate::events::EventType; use crate::imap::Imap; -use crate::location; use crate::message::{Message, MsgId}; use crate::mimefactory::MimeFactory; use crate::param::{Param, Params}; @@ -90,8 +89,6 @@ pub enum Action { ResyncFolders = 300, // Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999 - MaybeSendLocations = 5005, // low priority ... - MaybeSendLocationsEnded = 5007, SendMdn = 5010, } @@ -105,8 +102,6 @@ impl From for Thread { UpdateRecentQuota => Thread::Imap, DownloadMsg => Thread::Imap, - MaybeSendLocations => Thread::Smtp, - MaybeSendLocationsEnded => Thread::Smtp, SendMdn => Thread::Smtp, } } @@ -571,10 +566,6 @@ async fn perform_job_action( let try_res = match job.action { Action::SendMdn => job.send_mdn(context, connection.smtp()).await, - Action::MaybeSendLocations => location::job_maybe_send_locations(context, job).await, - Action::MaybeSendLocationsEnded => { - location::job_maybe_send_locations_ended(context, job).await - } Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await, Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await, Action::UpdateRecentQuota => match context.update_recent_quota(connection.inbox()).await { @@ -647,7 +638,7 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { info!(context, "interrupt: imap"); context.interrupt_inbox(InterruptInfo::new(false)).await; } - Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn => { + Action::SendMdn => { info!(context, "interrupt: smtp"); context.interrupt_smtp(InterruptInfo::new(false)).await; } diff --git a/src/location.rs b/src/location.rs index a85d1b8c8..8ca110693 100644 --- a/src/location.rs +++ b/src/location.rs @@ -1,19 +1,20 @@ //! Location handling. use std::convert::TryFrom; -use anyhow::{ensure, Result}; +use anyhow::{ensure, Context as _, Result}; +use async_std::channel::Receiver; +use async_std::future::timeout; use bitflags::bitflags; use quick_xml::events::{BytesEnd, BytesStart, BytesText}; +use std::time::Duration; use crate::chat::{self, ChatId}; use crate::contact::ContactId; use crate::context::Context; -use crate::dc_tools::time; +use crate::dc_tools::{duration_to_str, time}; use crate::events::EventType; -use crate::job::{self, Job}; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; -use crate::param::Params; use crate::stock_str; /// Location record @@ -226,32 +227,11 @@ pub async fn send_locations_to_chat( } context.emit_event(EventType::ChatModified(chat_id)); if 0 != seconds { - schedule_maybe_send_locations(context, false).await?; - job::add( - context, - job::Job::new( - job::Action::MaybeSendLocationsEnded, - chat_id.to_u32(), - Params::new(), - seconds + 1, - ), - ) - .await?; + context.interrupt_location().await; } Ok(()) } -async fn schedule_maybe_send_locations(context: &Context, force_schedule: bool) -> Result<()> { - if force_schedule || !job::action_exists(context, job::Action::MaybeSendLocations).await? { - job::add( - context, - job::Job::new(job::Action::MaybeSendLocations, 0, Params::new(), 60), - ) - .await?; - }; - Ok(()) -} - /// Returns whether `chat_id` or any chat is sending locations. /// /// If `chat_id` is `Some` only that chat is checked, otherwise returns `true` if any chat @@ -318,13 +298,13 @@ pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64 ).await { warn!(context, "failed to store location {:?}", err); } else { + info!(context, "stored location for chat {}", chat_id); continue_streaming = true; } } if continue_streaming { context.emit_event(EventType::LocationChanged(Some(ContactId::SELF))); }; - schedule_maybe_send_locations(context, false).await.ok(); } continue_streaming @@ -607,147 +587,140 @@ pub(crate) async fn save( Ok(newest_location_id) } -pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> job::Status { - let now = time(); - let mut continue_streaming = false; - info!( - context, - " ----------------- MAYBE_SEND_LOCATIONS -------------- ", - ); +pub(crate) async fn location_loop(context: &Context, interrupt_receiver: Receiver<()>) { + loop { + let next_event = match maybe_send_locations(context).await { + Err(err) => { + warn!(context, "maybe_send_locations failed: {}", err); + Some(60) // Retry one minute later. + } + Ok(next_event) => next_event, + }; + let duration = if let Some(next_event) = next_event { + Duration::from_secs(next_event) + } else { + Duration::from_secs(86400) + }; + + info!( + context, + "Location loop is waiting for {} or interrupt", + duration_to_str(duration) + ); + timeout(duration, interrupt_receiver.recv()).await.ok(); + } +} + +/// Returns number of seconds until the next time location streaming for some chat ends +/// automatically. +async fn maybe_send_locations(context: &Context) -> Result> { + let mut next_event: Option = None; + + let now = time(); let rows = context .sql .query_map( - "SELECT id, locations_send_begin, locations_last_sent \ - FROM chats \ - WHERE locations_send_until>?;", - paramsv![now], + "SELECT id, locations_send_begin, locations_send_until, locations_last_sent + FROM chats + WHERE locations_send_until>0", + [], |row| { let chat_id: ChatId = row.get(0)?; let locations_send_begin: i64 = row.get(1)?; - let locations_last_sent: i64 = row.get(2)?; - continue_streaming = true; - - // be a bit tolerant as the timer may not align exactly with time(NULL) - if now - locations_last_sent < (60 - 3) { - Ok(None) - } else { - Ok(Some((chat_id, locations_send_begin, locations_last_sent))) - } + let locations_send_until: i64 = row.get(2)?; + let locations_last_sent: i64 = row.get(3)?; + Ok(( + chat_id, + locations_send_begin, + locations_send_until, + locations_last_sent, + )) }, |rows| { - rows.filter_map(|v| v.transpose()) - .collect::, _>>() + rows.collect::, _>>() .map_err(Into::into) }, ) - .await; + .await + .context("failed to query location streaming chats")?; - if let Ok(rows) = rows { - let mut msgs = Vec::new(); - - { - let conn = job_try!(context.sql.get_conn().await); - - let mut stmt_locations = job_try!(conn.prepare_cached( - "SELECT id \ + for (chat_id, locations_send_begin, locations_send_until, locations_last_sent) in rows { + if locations_send_begin > 0 && locations_send_until > now { + let can_send = now > locations_last_sent + 60; + let has_locations = context + .sql + .exists( + "SELECT COUNT(id) \ FROM locations \ WHERE from_id=? \ AND timestamp>=? \ AND timestamp>? \ - AND independent=0 \ - ORDER BY timestamp;", - )); + AND independent=0", + paramsv![ContactId::SELF, locations_send_begin, locations_last_sent,], + ) + .await?; - for (chat_id, locations_send_begin, locations_last_sent) in &rows { - if !stmt_locations - .exists(paramsv![ - ContactId::SELF, - *locations_send_begin, - *locations_last_sent, - ]) - .unwrap_or_default() - { - // if there is no new location, there's nothing to send. - // however, maybe we want to bypass this test eg. 15 minutes - } else { - // pending locations are attached automatically to every message, + next_event = next_event + .into_iter() + .chain(u64::try_from(locations_send_until - now).into_iter()) + .min(); + + if has_locations { + if can_send { + // Send location-only message. + // Pending locations are attached automatically to every message, // so also to this empty text message. - // DC_CMD_LOCATION is only needed to create a nicer subject. - // - // for optimisation and to avoid flooding the sending queue, - // we could sending these messages only if we're really online. - // the easiest way to determine this, is to check for an empty message queue. - // (might not be 100%, however, as positions are sent combined later - // and dc_set_location() is typically called periodically, this is ok) + info!( + context, + "Chat {} has pending locations, sending them.", chat_id + ); let mut msg = Message::new(Viewtype::Text); msg.hidden = true; msg.param.set_cmd(SystemMessage::LocationOnly); - msgs.push((*chat_id, msg)); + chat::send_msg(context, chat_id, &mut msg).await?; + } else { + // Wait until pending locations can be sent. + info!( + context, + "Chat {} has pending locations, but they can't be sent yet.", chat_id + ); + next_event = next_event + .into_iter() + .chain(u64::try_from(locations_last_sent + 61 - now).into_iter()) + .min(); } + } else { + info!( + context, + "Chat {} has location streaming enabled, but no pending locations.", chat_id + ); } - } - - for (chat_id, mut msg) in msgs.into_iter() { - // TODO: better error handling - chat::send_msg(context, chat_id, &mut msg) - .await - .unwrap_or_default(); - } - } - - if continue_streaming { - job_try!(schedule_maybe_send_locations(context, true).await); - } - job::Status::Finished(Ok(())) -} - -pub(crate) async fn job_maybe_send_locations_ended( - context: &Context, - job: &mut Job, -) -> job::Status { - // this function is called when location-streaming _might_ have ended for a chat. - // the function checks, if location-streaming is really ended; - // if so, a device-message is added if not yet done. - - let chat_id = ChatId::new(job.foreign_id); - - let (send_begin, send_until) = job_try!( - context - .sql - .query_row( - "SELECT locations_send_begin, locations_send_until FROM chats WHERE id=?", - paramsv![chat_id], - |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), - ) - .await - ); - - let now = time(); - if !(send_begin != 0 && now <= send_until) { - // still streaming - - // may happen as several calls to dc_send_locations_to_chat() - // do not un-schedule pending DC_MAYBE_SEND_LOC_ENDED jobs - if !(send_begin == 0 && send_until == 0) { - // not streaming, device-message already sent - job_try!( - context - .sql - .execute( - "UPDATE chats \ - SET locations_send_begin=0, locations_send_until=0 \ - WHERE id=?", - paramsv![chat_id], - ) - .await + } else { + // Location streaming was either explicitly disabled (locations_send_begin = 0) or + // locations_send_until is in the past. + info!( + context, + "Disabling location streaming for chat {}.", chat_id ); + context + .sql + .execute( + "UPDATE chats \ + SET locations_send_begin=0, locations_send_until=0 \ + WHERE id=?", + paramsv![chat_id], + ) + .await + .context("failed to disable location streaming")?; let stock_str = stock_str::msg_location_disabled(context).await; - job_try!(chat::add_info_msg(context, chat_id, &stock_str, now).await); + chat::add_info_msg(context, chat_id, &stock_str, now).await?; context.emit_event(EventType::ChatModified(chat_id)); } } - job::Status::Finished(Ok(())) + + Ok(next_event) } #[cfg(test)] diff --git a/src/scheduler.rs b/src/scheduler.rs index 5c2142263..26b60b43e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -12,6 +12,7 @@ use crate::dc_tools::time; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::imap::Imap; use crate::job::{self, Thread}; +use crate::location; use crate::log::LogExt; use crate::smtp::{send_smtp_messages, Smtp}; use crate::sql; @@ -36,6 +37,8 @@ pub(crate) enum Scheduler { smtp_handle: Option>, ephemeral_handle: Option>, ephemeral_interrupt_send: Sender<()>, + location_handle: Option>, + location_interrupt_send: Sender<()>, }, } @@ -65,6 +68,10 @@ impl Context { pub(crate) async fn interrupt_ephemeral_task(&self) { self.scheduler.read().await.interrupt_ephemeral_task().await; } + + pub(crate) async fn interrupt_location(&self) { + self.scheduler.read().await.interrupt_location().await; + } } async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { @@ -386,6 +393,7 @@ impl Scheduler { let mut sentbox_handle = None; let (smtp_start_send, smtp_start_recv) = channel::bounded(1); let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1); + let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1); let inbox_handle = { let ctx = ctx.clone(); @@ -454,6 +462,13 @@ impl Scheduler { })) }; + let location_handle = { + let ctx = ctx.clone(); + Some(task::spawn(async move { + location::location_loop(&ctx, location_interrupt_recv).await; + })) + }; + *self = Scheduler::Running { inbox, mvbox, @@ -465,6 +480,8 @@ impl Scheduler { smtp_handle, ephemeral_handle, ephemeral_interrupt_send, + location_handle, + location_interrupt_send, }; // wait for all loops to be started @@ -540,6 +557,16 @@ impl Scheduler { } } + async fn interrupt_location(&self) { + if let Scheduler::Running { + ref location_interrupt_send, + .. + } = self + { + location_interrupt_send.try_send(()).ok(); + } + } + /// Halt the scheduler. pub(crate) async fn stop(&mut self) -> Result<()> { match self { @@ -556,6 +583,7 @@ impl Scheduler { smtp, smtp_handle, ephemeral_handle, + location_handle, .. } => { if inbox_handle.is_some() { @@ -586,6 +614,9 @@ impl Scheduler { if let Some(handle) = ephemeral_handle.take() { handle.cancel().await; } + if let Some(handle) = location_handle.take() { + handle.cancel().await; + } *self = Scheduler::Stopped; Ok(())