From fa87d2e22577a4e7e821ee054a63d42a91820e79 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 12 Apr 2023 21:48:14 +0000 Subject: [PATCH] New APIs for message processing loops This patch adds new C APIs dc_get_next_msgs() and dc_wait_next_msgs(), and their JSON-RPC counterparts get_next_msgs() and wait_next_msgs(). New configuration "last_msg_id" tracks the last message ID processed by the bot. get_next_msgs() returns message IDs above the "last_msg_id". wait_next_msgs() waits for new message notification and calls get_next_msgs(). wait_next_msgs() can be used to build a separate message processing loop independent of the event loop. Async Python API get_fresh_messages_in_arrival_order() is deprecated in favor of get_next_messages(). Introduced Python APIs: - Account.wait_next_incoming_message() - Message.is_from_self() - Message.is_from_device() Introduced Rust APIs: - Context.set_config_u32() - Context.get_config_u32() --- CHANGELOG.md | 8 ++ deltachat-ffi/deltachat.h | 74 +++++++++++- deltachat-ffi/src/lib.rs | 44 +++++++ deltachat-jsonrpc/src/api/mod.rs | 48 ++++++++ .../examples/echobot_no_hooks.py | 6 +- .../src/deltachat_rpc_client/__init__.py | 3 +- .../src/deltachat_rpc_client/account.py | 16 +++ .../src/deltachat_rpc_client/client.py | 7 +- deltachat-rpc-client/tests/test_something.py | 30 ++++- python/src/deltachat/account.py | 16 +++ python/src/deltachat/message.py | 10 ++ python/tests/test_1_online.py | 6 +- src/chat.rs | 3 + src/config.rs | 14 +++ src/context.rs | 111 +++++++++++++++++- src/message.rs | 6 + src/receive_imf.rs | 1 + src/scheduler.rs | 10 ++ 18 files changed, 398 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 714cd3d22..21a540326 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,14 @@ Also terminate on ctrl-c. - Refactorings #4317 - Add JSON-RPC API `can_send()`. +- New `dc_get_next_msgs()` and `dc_wait_next_msgs()` C APIs. + New `get_next_msgs()` and `wait_next_msgs()` JSON-RPC API. + These APIs can be used by bots to get all unprocessed messages + in the order of their arrival and wait for them without relying on events. +- Async Python API `get_fresh_messages_in_arrival_order()` is deprecated + in favor of `get_next_msgs()` and `wait_next_msgs()`. +- New Python bindings API `Account.wait_next_incoming_message()`. +- New Python bindings APIs `Message.is_from_self()` and `Message.is_from_device()`. ### Fixes - Fix python bindings README documentation on installing the bindings from source. diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index c901ab738..c76a5891a 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -181,12 +181,17 @@ typedef struct _dc_event_emitter dc_accounts_event_emitter_t; * and check it in the event loop thread * every time before calling dc_get_next_event(). * To terminate the event loop, main thread should: - * 1. Notify event loop that it should terminate by atomically setting the - * boolean flag in the memory shared between the main thread and event loop. + * 1. Notify background threads, + * such as event loop (blocking in dc_get_next_event()) + * and message processing loop (blocking in dc_wait_next_msgs()), + * that they should terminate by atomically setting the + * boolean flag in the memory + * shared between the main thread and background loop threads. * 2. Call dc_stop_io() or dc_accounts_stop_io(), depending * on whether a single account or account manager is used. * Stopping I/O is guaranteed to emit at least one event * and interrupt the event loop even if it was blocked on dc_get_next_event(). + * Stopping I/O is guaranteed to interrupt a single dc_wait_next_msgs(). * 3. Wait until the event loop thread notices the flag, * exits the event loop and terminates. * 4. Call dc_context_unref() or dc_accounts_unref(). @@ -457,6 +462,16 @@ char* dc_get_blobdir (const dc_context_t* context); * Prevents adding the "Device messages" and "Saved messages" chats, * adds Auto-Submitted header to outgoing messages * and accepts contact requests automatically (calling dc_accept_chat() is not needed for bots). + * - `last_msg_id` = database ID of the last message processed by the bot. + * This ID and IDs below it are guaranteed not to be returned + * by dc_get_next_msgs() and dc_wait_next_msgs(). + * The value is updated automatically + * when dc_markseen_msgs() is called, + * but the bot can also set it manually if it processed + * the message but does not want to mark it as seen. + * For most bots calling `dc_markseen_msgs()` is the + * recommended way to update this value + * even for self-sent messages. * - `fetch_existing_msgs` = 1=fetch most recent existing messages on configure (default), * 0=do not fetch existing messages on configure. * In both cases, existing recipients are added to the contact database. @@ -1343,6 +1358,56 @@ int dc_estimate_deletion_cnt (dc_context_t* context, int from_ser dc_array_t* dc_get_fresh_msgs (dc_context_t* context); +/** + * Returns the message IDs of all messages of any chat + * with a database ID higher than `last_msg_id` config value. + * + * This function is intended for use by bots. + * Self-sent messages, device messages, + * messages from contact requests + * and muted chats are included, + * but messages from explicitly blocked contacts + * and chats are ignored. + * + * This function may be called as a part of event loop + * triggered by DC_EVENT_INCOMING_MSG if you are only interested + * in the incoming messages. + * Otherwise use a separate message processing loop + * calling dc_wait_next_msgs() in a separate thread. + * + * @memberof dc_context_t + * @param context The context object as returned from dc_context_new(). + * @return An array of message IDs, must be dc_array_unref()'d when no longer used. + * On errors, the list is empty. NULL is never returned. + */ +dc_array_t* dc_get_next_msgs (dc_context_t* context); + + +/** + * Waits for notification of new messages + * and returns an array of new message IDs. + * See the documentation for dc_get_next_msgs() + * for the details of return value. + * + * This function waits for internal notification of + * a new message in the database and returns afterwards. + * Notification is also sent when I/O is started + * to allow processing new messages + * and when I/O is stopped using dc_stop_io() or dc_accounts_stop_io() + * to allow for manual interruption of the message processing loop. + * The function may return an empty array if there are + * no messages after notification, + * which may happen on start or if the message is quickly deleted + * after adding it to the database. + * + * @memberof dc_context_t + * @param context The context object as returned from dc_context_new(). + * @return An array of message IDs, must be dc_array_unref()'d when no longer used. + * On errors, the list is empty. NULL is never returned. + */ +dc_array_t* dc_wait_next_msgs (dc_context_t* context); + + /** * Mark all messages in a chat as _noticed_. * _Noticed_ messages are no longer _fresh_ and do not count as being unseen @@ -1942,6 +2007,11 @@ int dc_resend_msgs (dc_context_t* context, const uint3 * Moreover, timer is started for incoming ephemeral messages. * This also happens for contact requests chats. * + * This function updates last_msg_id configuration value + * to the maximum of the current value and IDs passed to this function. + * Bots which mark messages as seen can rely on this side effect + * to avoid updating last_msg_id value manually. + * * One #DC_EVENT_MSGS_NOTICED event is emitted per modified chat. * * @memberof dc_context_t diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 7daa9d195..6872c6d38 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -1280,6 +1280,50 @@ pub unsafe extern "C" fn dc_get_fresh_msgs( }) } +#[no_mangle] +pub unsafe extern "C" fn dc_get_next_msgs(context: *mut dc_context_t) -> *mut dc_array::dc_array_t { + if context.is_null() { + eprintln!("ignoring careless call to dc_get_next_msgs()"); + return ptr::null_mut(); + } + let ctx = &*context; + + let msg_ids = block_on(ctx.get_next_msgs()) + .context("failed to get next messages") + .log_err(ctx) + .unwrap_or_default(); + let arr = dc_array_t::from( + msg_ids + .iter() + .map(|msg_id| msg_id.to_u32()) + .collect::>(), + ); + Box::into_raw(Box::new(arr)) +} + +#[no_mangle] +pub unsafe extern "C" fn dc_wait_next_msgs( + context: *mut dc_context_t, +) -> *mut dc_array::dc_array_t { + if context.is_null() { + eprintln!("ignoring careless call to dc_wait_next_msgs()"); + return ptr::null_mut(); + } + let ctx = &*context; + + let msg_ids = block_on(ctx.wait_next_msgs()) + .context("failed to wait for next messages") + .log_err(ctx) + .unwrap_or_default(); + let arr = dc_array_t::from( + msg_ids + .iter() + .map(|msg_id| msg_id.to_u32()) + .collect::>(), + ); + Box::into_raw(Box::new(arr)) +} + #[no_mangle] pub unsafe extern "C" fn dc_marknoticed_chat(context: *mut dc_context_t, chat_id: u32) { if context.is_null() { diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index eac81c312..cc994ed63 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -453,6 +453,49 @@ impl CommandApi { ChatId::new(chat_id).get_fresh_msg_cnt(&ctx).await } + /// Gets messages to be processed by the bot and returns their IDs. + /// + /// Only messages with database ID higher than `last_msg_id` config value + /// are returned. After processing the messages, the bot should + /// update `last_msg_id` by calling [`markseen_msgs`] + /// or manually updating the value to avoid getting already + /// processed messages. + /// + /// [`markseen_msgs`]: Self::markseen_msgs + async fn get_next_msgs(&self, account_id: u32) -> Result> { + let ctx = self.get_context(account_id).await?; + let msg_ids = ctx + .get_next_msgs() + .await? + .iter() + .map(|msg_id| msg_id.to_u32()) + .collect(); + Ok(msg_ids) + } + + /// Waits for messages to be processed by the bot and returns their IDs. + /// + /// This function is similar to [`get_next_msgs`], + /// but waits for internal new message notification before returning. + /// New message notification is sent when new message is added to the database, + /// on initialization, when I/O is started and when I/O is stopped. + /// This allows bots to use `wait_next_msgs` in a loop to process + /// old messages after initialization and during the bot runtime. + /// To shutdown the bot, stopping I/O can be used to interrupt + /// pending or next `wait_next_msgs` call. + /// + /// [`get_next_msgs`]: Self::get_next_msgs + async fn wait_next_msgs(&self, account_id: u32) -> Result> { + let ctx = self.get_context(account_id).await?; + let msg_ids = ctx + .wait_next_msgs() + .await? + .iter() + .map(|msg_id| msg_id.to_u32()) + .collect(); + Ok(msg_ids) + } + /// Estimate the number of messages that will be deleted /// by the set_config()-options `delete_device_after` or `delete_server_after`. /// This is typically used to show the estimated impact to the user @@ -944,6 +987,11 @@ impl CommandApi { /// Moreover, timer is started for incoming ephemeral messages. /// This also happens for contact requests chats. /// + /// This function updates `last_msg_id` configuration value + /// to the maximum of the current value and IDs passed to this function. + /// Bots which mark messages as seen can rely on this side effect + /// to avoid updating `last_msg_id` value manually. + /// /// One #DC_EVENT_MSGS_NOTICED event is emitted per modified chat. async fn markseen_msgs(&self, account_id: u32, msg_ids: Vec) -> Result<()> { let ctx = self.get_context(account_id).await?; diff --git a/deltachat-rpc-client/examples/echobot_no_hooks.py b/deltachat-rpc-client/examples/echobot_no_hooks.py index fadf2e560..77fda86e7 100644 --- a/deltachat-rpc-client/examples/echobot_no_hooks.py +++ b/deltachat-rpc-client/examples/echobot_no_hooks.py @@ -6,7 +6,7 @@ import asyncio import logging import sys -from deltachat_rpc_client import DeltaChat, EventType, Rpc +from deltachat_rpc_client import DeltaChat, EventType, Rpc, SpecialContactId async def main(): @@ -30,9 +30,9 @@ async def main(): await deltachat.start_io() async def process_messages(): - for message in await account.get_fresh_messages_in_arrival_order(): + for message in await account.get_next_messages(): snapshot = await message.get_snapshot() - if not snapshot.is_bot and not snapshot.is_info: + if snapshot.from_id != SpecialContactId.SELF and not snapshot.is_bot and not snapshot.is_info: await snapshot.chat.send_text(snapshot.text) await snapshot.message.mark_seen() diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py index 94edad50e..727c51c80 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py @@ -3,7 +3,7 @@ from ._utils import AttrDict, run_bot_cli, run_client_cli from .account import Account from .chat import Chat from .client import Bot, Client -from .const import EventType +from .const import EventType, SpecialContactId from .contact import Contact from .deltachat import DeltaChat from .message import Message @@ -19,6 +19,7 @@ __all__ = [ "DeltaChat", "EventType", "Message", + "SpecialContactId", "Rpc", "run_bot_cli", "run_client_cli", diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/account.py b/deltachat-rpc-client/src/deltachat_rpc_client/account.py index 6434ee966..4c44079f7 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/account.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/account.py @@ -1,5 +1,6 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, List, Optional, Tuple, Union +from warnings import warn from ._utils import AttrDict from .chat import Chat @@ -239,7 +240,22 @@ class Account: fresh_msg_ids = await self._rpc.get_fresh_msgs(self.id) return [Message(self, msg_id) for msg_id in fresh_msg_ids] + async def get_next_messages(self) -> List[Message]: + """Return a list of next messages.""" + next_msg_ids = await self._rpc.get_next_msgs(self.id) + return [Message(self, msg_id) for msg_id in next_msg_ids] + + async def wait_next_messages(self) -> List[Message]: + """Wait for new messages and return a list of them.""" + next_msg_ids = await self._rpc.wait_next_msgs(self.id) + return [Message(self, msg_id) for msg_id in next_msg_ids] + async def get_fresh_messages_in_arrival_order(self) -> List[Message]: """Return fresh messages list sorted in the order of their arrival, with ascending IDs.""" + warn( + "get_fresh_messages_in_arrival_order is deprecated, use get_next_messages instead.", + DeprecationWarning, + stacklevel=2, + ) fresh_msg_ids = sorted(await self._rpc.get_fresh_msgs(self.id)) return [Message(self, msg_id) for msg_id in fresh_msg_ids] diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/client.py b/deltachat-rpc-client/src/deltachat_rpc_client/client.py index 6f816e5de..5205a1ed9 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/client.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/client.py @@ -20,7 +20,7 @@ from ._utils import ( parse_system_image_changed, parse_system_title_changed, ) -from .const import COMMAND_PREFIX, EventType, SystemMessageType +from .const import COMMAND_PREFIX, EventType, SpecialContactId, SystemMessageType from .events import ( EventFilter, GroupImageChanged, @@ -189,9 +189,10 @@ class Client: async def _process_messages(self) -> None: if self._should_process_messages: - for message in await self.account.get_fresh_messages_in_arrival_order(): + for message in await self.account.get_next_messages(): snapshot = await message.get_snapshot() - await self._on_new_msg(snapshot) + if snapshot.from_id not in [SpecialContactId.SELF, SpecialContactId.DEVICE]: + await self._on_new_msg(snapshot) if snapshot.is_info and snapshot.system_message_type != SystemMessageType.WEBXDC_INFO_MESSAGE: await self._handle_info_msg(snapshot) await snapshot.message.mark_seen() diff --git a/deltachat-rpc-client/tests/test_something.py b/deltachat-rpc-client/tests/test_something.py index ee08791fe..d17c28a16 100644 --- a/deltachat-rpc-client/tests/test_something.py +++ b/deltachat-rpc-client/tests/test_something.py @@ -98,8 +98,8 @@ async def test_account(acfactory) -> None: assert await alice.get_chatlist() assert await alice.get_chatlist(snapshot=True) assert await alice.get_qr_code() - await alice.get_fresh_messages() - await alice.get_fresh_messages_in_arrival_order() + assert await alice.get_fresh_messages() + assert await alice.get_next_messages() group = await alice.create_group("test group") await group.add_contact(alice_contact_bob) @@ -305,3 +305,29 @@ async def test_bot(acfactory) -> None: await acfactory.process_message(from_account=user, to_client=bot, text="hello") event = await acfactory.process_message(from_account=user, to_client=bot, text="/help") mock.hook.assert_called_once_with(event.msg_id) + + +@pytest.mark.asyncio() +async def test_wait_next_messages(acfactory) -> None: + alice = await acfactory.new_configured_account() + + # Create a bot account so it does not receive device messages in the beginning. + bot = await acfactory.new_preconfigured_account() + await bot.set_config("bot", "1") + await bot.configure() + + # There are no old messages and the call returns immediately. + assert not await bot.wait_next_messages() + + # Bot starts waiting for messages. + next_messages_task = asyncio.create_task(bot.wait_next_messages()) + + bot_addr = await bot.get_config("addr") + alice_contact_bot = await alice.create_contact(bot_addr, "Bob") + alice_chat_bot = await alice_contact_bot.create_chat() + await alice_chat_bot.send_text("Hello!") + + next_messages = await next_messages_task + assert len(next_messages) == 1 + snapshot = await next_messages[0].get_snapshot() + assert snapshot.text == "Hello!" diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 9d80e35f7..6d067f4fb 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -376,6 +376,22 @@ class Account: dc_array = ffi.gc(lib.dc_get_fresh_msgs(self._dc_context), lib.dc_array_unref) return (x for x in iter_array(dc_array, lambda x: Message.from_db(self, x)) if x is not None) + def _wait_next_message_ids(self) -> List[int]: + """Return IDs of all next messages from all chats.""" + dc_array = ffi.gc(lib.dc_wait_next_msgs(self._dc_context), lib.dc_array_unref) + return [lib.dc_array_get_id(dc_array, i) for i in range(lib.dc_array_get_cnt(dc_array))] + + def wait_next_incoming_message(self) -> Message: + """Waits until the next incoming message + with ID higher than given is received and returns it.""" + while True: + message_ids = self._wait_next_message_ids() + for msg_id in message_ids: + message = Message.from_db(self, msg_id) + if message and not message.is_from_self() and not message.is_from_device(): + self.set_config("last_msg_id", str(msg_id)) + return message + def create_chat(self, obj) -> Chat: """Create a 1:1 chat with Account, Contact or e-mail address.""" return self.create_contact(obj).create_chat() diff --git a/python/src/deltachat/message.py b/python/src/deltachat/message.py index 1c2986728..e58d97d05 100644 --- a/python/src/deltachat/message.py +++ b/python/src/deltachat/message.py @@ -344,6 +344,16 @@ class Message: contact_id = lib.dc_msg_get_from_id(self._dc_msg) return Contact(self.account, contact_id) + def is_from_self(self): + """Return true if the message is sent by self.""" + contact_id = lib.dc_msg_get_from_id(self._dc_msg) + return contact_id == const.DC_CONTACT_ID_SELF + + def is_from_device(self): + """Return true if the message is sent by the device.""" + contact_id = lib.dc_msg_get_from_id(self._dc_msg) + return contact_id == const.DC_CONTACT_ID_DEVICE + # # Message State query methods # diff --git a/python/tests/test_1_online.py b/python/tests/test_1_online.py index cfa5655a1..85e1774bd 100644 --- a/python/tests/test_1_online.py +++ b/python/tests/test_1_online.py @@ -44,21 +44,21 @@ def test_configure_generate_key(acfactory, lp): lp.sec("ac1: send unencrypted message to ac2") chat.send_text("message1") lp.sec("ac2: waiting for message from ac1") - msg_in = ac2._evtracker.wait_next_incoming_message() + msg_in = ac2.wait_next_incoming_message() assert msg_in.text == "message1" assert not msg_in.is_encrypted() lp.sec("ac2: send encrypted message to ac1") msg_in.chat.send_text("message2") lp.sec("ac1: waiting for message from ac2") - msg2_in = ac1._evtracker.wait_next_incoming_message() + msg2_in = ac1.wait_next_incoming_message() assert msg2_in.text == "message2" assert msg2_in.is_encrypted() lp.sec("ac1: send encrypted message to ac2") msg2_in.chat.send_text("message3") lp.sec("ac2: waiting for message from ac1") - msg3_in = ac2._evtracker.wait_next_incoming_message() + msg3_in = ac2.wait_next_incoming_message() assert msg3_in.text == "message3" assert msg3_in.is_encrypted() diff --git a/src/chat.rs b/src/chat.rs index d61f93bfd..7561c511f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1666,6 +1666,7 @@ impl Chat { ], ) .await?; + context.new_msgs_notify.notify_one(); msg.id = MsgId::new(u32::try_from(raw_id)?); maybe_set_logging_xdc(context, msg, self.id).await?; @@ -3628,6 +3629,7 @@ pub async fn add_device_msg_with_importance( ), ) .await?; + context.new_msgs_notify.notify_one(); msg_id = MsgId::new(u32::try_from(row_id)?); if !msg.hidden { @@ -3741,6 +3743,7 @@ pub(crate) async fn add_info_msg_with_cmd( parent.map(|msg|msg.rfc724_mid.clone()).unwrap_or_default() ) ).await?; + context.new_msgs_notify.notify_one(); let msg_id = MsgId::new(row_id.try_into()?); context.emit_msgs_changed(chat_id, msg_id); diff --git a/src/config.rs b/src/config.rs index 035caf314..3527e0096 100644 --- a/src/config.rs +++ b/src/config.rs @@ -308,6 +308,9 @@ pub enum Config { /// This value is used internally to remember the MsgId of the logging xdc #[strum(props(default = "0"))] DebugLogging, + + /// Last message processed by the bot. + LastMsgId, } impl Context { @@ -358,6 +361,11 @@ impl Context { Ok(self.get_config_parsed(key).await?.unwrap_or_default()) } + /// Returns 32-bit unsigned integer configuration value for the given key. + pub async fn get_config_u32(&self, key: Config) -> Result { + Ok(self.get_config_parsed(key).await?.unwrap_or_default()) + } + /// Returns 64-bit signed integer configuration value for the given key. pub async fn get_config_i64(&self, key: Config) -> Result { Ok(self.get_config_parsed(key).await?.unwrap_or_default()) @@ -459,6 +467,12 @@ impl Context { Ok(()) } + /// Set the given config to an unsigned 32-bit integer value. + pub async fn set_config_u32(&self, key: Config, value: u32) -> Result<()> { + self.set_config(key, Some(&value.to_string())).await?; + Ok(()) + } + /// Set the given config to a boolean value. pub async fn set_config_bool(&self, key: Config, value: bool) -> Result<()> { self.set_config(key, if value { Some("1") } else { Some("0") }) diff --git a/src/context.rs b/src/context.rs index 2161d0855..3245a6ff7 100644 --- a/src/context.rs +++ b/src/context.rs @@ -11,7 +11,7 @@ use std::time::{Duration, Instant, SystemTime}; use anyhow::{bail, ensure, Context as _, Result}; use async_channel::{self as channel, Receiver, Sender}; use ratelimit::Ratelimit; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, Notify, RwLock}; use tokio::task; use crate::chat::{get_chat_cnt, ChatId}; @@ -218,6 +218,11 @@ pub struct InnerContext { /// IMAP UID resync request. pub(crate) resync_request: AtomicBool, + /// Notify about new messages. + /// + /// This causes [`Context::wait_next_msgs`] to wake up. + pub(crate) new_msgs_notify: Notify, + /// Server ID response if ID capability is supported /// and the server returned non-NIL on the inbox connection. /// @@ -363,6 +368,11 @@ impl Context { blobdir.display() ); + let new_msgs_notify = Notify::new(); + // Notify once immediately to allow processing old messages + // without starting I/O. + new_msgs_notify.notify_one(); + let inner = InnerContext { id, blobdir, @@ -379,6 +389,7 @@ impl Context { quota: RwLock::new(None), quota_update_request: AtomicBool::new(false), resync_request: AtomicBool::new(false), + new_msgs_notify, server_id: RwLock::new(None), creation_time: std::time::SystemTime::now(), last_full_folder_scan: Mutex::new(None), @@ -767,6 +778,10 @@ impl Context { "debug_logging", self.get_config_int(Config::DebugLogging).await?.to_string(), ); + res.insert( + "last_msg_id", + self.get_config_int(Config::LastMsgId).await?.to_string(), + ); let elapsed = self.creation_time.elapsed(); res.insert("uptime", duration_to_str(elapsed.unwrap_or_default())); @@ -813,6 +828,66 @@ impl Context { Ok(list) } + /// Returns a list of messages with database ID higher than requested. + /// + /// Blocked contacts and chats are excluded, + /// but self-sent messages and contact requests are included in the results. + pub async fn get_next_msgs(&self) -> Result> { + let last_msg_id = match self.get_config(Config::LastMsgId).await? { + Some(s) => MsgId::new(s.parse()?), + None => MsgId::new_unset(), + }; + + let list = self + .sql + .query_map( + "SELECT m.id + FROM msgs m + LEFT JOIN contacts ct + ON m.from_id=ct.id + LEFT JOIN chats c + ON m.chat_id=c.id + WHERE m.id>? + AND m.hidden=0 + AND m.chat_id>9 + AND ct.blocked=0 + AND c.blocked!=1 + ORDER BY m.id ASC", + ( + last_msg_id.to_u32(), // Explicitly convert to u32 because 0 is allowed. + ), + |row| { + let msg_id: MsgId = row.get(0)?; + Ok(msg_id) + }, + |rows| { + let mut list = Vec::new(); + for row in rows { + list.push(row?); + } + Ok(list) + }, + ) + .await?; + Ok(list) + } + + /// Returns a list of messages with database ID higher than last marked as seen. + /// + /// This function is supposed to be used by bot to request messages + /// that are not processed yet. + /// + /// Waits for notification and returns a result. + /// Note that the result may be empty if the message is deleted + /// shortly after notification or notification is manually triggered + /// to interrupt waiting. + /// Notification may be manually triggered by calling [`Self::stop_io`]. + pub async fn wait_next_msgs(&self) -> Result> { + self.new_msgs_notify.notified().await; + let list = self.get_next_msgs().await?; + Ok(list) + } + /// Searches for messages containing the query string. /// /// If `chat_id` is provided this searches only for messages in this chat, if `chat_id` @@ -1444,4 +1519,38 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_get_next_msgs() -> Result<()> { + let alice = TestContext::new_alice().await; + let bob = TestContext::new_bob().await; + + let alice_chat = alice.create_chat(&bob).await; + + assert!(alice.get_next_msgs().await?.is_empty()); + assert!(bob.get_next_msgs().await?.is_empty()); + + let sent_msg = alice.send_text(alice_chat.id, "Hi Bob").await; + let received_msg = bob.recv_msg(&sent_msg).await; + + let bob_next_msg_ids = bob.get_next_msgs().await?; + assert_eq!(bob_next_msg_ids.len(), 1); + assert_eq!(bob_next_msg_ids.get(0), Some(&received_msg.id)); + + bob.set_config_u32(Config::LastMsgId, received_msg.id.to_u32()) + .await?; + assert!(bob.get_next_msgs().await?.is_empty()); + + // Next messages include self-sent messages. + let alice_next_msg_ids = alice.get_next_msgs().await?; + assert_eq!(alice_next_msg_ids.len(), 1); + assert_eq!(alice_next_msg_ids.get(0), Some(&sent_msg.sender_msg_id)); + + alice + .set_config_u32(Config::LastMsgId, sent_msg.sender_msg_id.to_u32()) + .await?; + assert!(alice.get_next_msgs().await?.is_empty()); + + Ok(()) + } } diff --git a/src/message.rs b/src/message.rs index 563020b7c..ea72a8c99 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1468,6 +1468,12 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> return Ok(()); } + let old_last_msg_id = MsgId::new(context.get_config_u32(Config::LastMsgId).await?); + let last_msg_id = msg_ids.iter().fold(&old_last_msg_id, std::cmp::max); + context + .set_config_u32(Config::LastMsgId, last_msg_id.to_u32()) + .await?; + let msgs = context .sql .query_map( diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 9418a228f..c3b2712bf 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -363,6 +363,7 @@ pub(crate) async fn receive_imf_inner( chat_id.emit_msg_event(context, *msg_id, incoming && fresh); } } + context.new_msgs_notify.notify_one(); mime_parser .handle_reports(context, from_id, sent_timestamp, &mime_parser.parts) diff --git a/src/scheduler.rs b/src/scheduler.rs index 2404d7e22..840824f09 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -62,6 +62,11 @@ impl SchedulerState { /// Starts the scheduler if it is not yet started. async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { info!(context, "starting IO"); + + // Notify message processing loop + // to allow processing old messages after restart. + context.new_msgs_notify.notify_one(); + let ctx = context.clone(); match Scheduler::start(context).await { Ok(scheduler) => *inner = InnerSchedulerState::Started(scheduler), @@ -95,6 +100,11 @@ impl SchedulerState { // to terminate on receiving the next event and then call stop_io() // which will emit the below event(s) info!(context, "stopping IO"); + + // Wake up message processing loop even if there are no messages + // to allow for clean shutdown. + context.new_msgs_notify.notify_one(); + if let Some(debug_logging) = context.debug_logging.read().await.as_ref() { debug_logging.loop_handle.abort(); }