New APIs for message processing loops

This patch adds new C APIs
dc_get_next_msgs() and dc_wait_next_msgs(),
and their JSON-RPC counterparts
get_next_msgs() and wait_next_msgs().

New configuration "last_msg_id"
tracks the last message ID processed by the bot.
get_next_msgs() returns message IDs above
the "last_msg_id".
wait_next_msgs() waits for new message notification
and calls get_next_msgs().
wait_next_msgs() can be used to build
a separate message processing loop
independent of the event loop.

Async Python API get_fresh_messages_in_arrival_order()
is deprecated in favor of get_next_messages().

Introduced Python APIs:
- Account.wait_next_incoming_message()
- Message.is_from_self()
- Message.is_from_device()

Introduced Rust APIs:
- Context.set_config_u32()
- Context.get_config_u32()
This commit is contained in:
link2xt
2023-04-12 21:48:14 +00:00
parent 28a13e98a6
commit fa87d2e225
18 changed files with 398 additions and 15 deletions

View File

@@ -14,6 +14,14 @@
Also terminate on ctrl-c. Also terminate on ctrl-c.
- Refactorings #4317 - Refactorings #4317
- Add JSON-RPC API `can_send()`. - Add JSON-RPC API `can_send()`.
- New `dc_get_next_msgs()` and `dc_wait_next_msgs()` C APIs.
New `get_next_msgs()` and `wait_next_msgs()` JSON-RPC API.
These APIs can be used by bots to get all unprocessed messages
in the order of their arrival and wait for them without relying on events.
- Async Python API `get_fresh_messages_in_arrival_order()` is deprecated
in favor of `get_next_msgs()` and `wait_next_msgs()`.
- New Python bindings API `Account.wait_next_incoming_message()`.
- New Python bindings APIs `Message.is_from_self()` and `Message.is_from_device()`.
### Fixes ### Fixes
- Fix python bindings README documentation on installing the bindings from source. - Fix python bindings README documentation on installing the bindings from source.

View File

@@ -181,12 +181,17 @@ typedef struct _dc_event_emitter dc_accounts_event_emitter_t;
* and check it in the event loop thread * and check it in the event loop thread
* every time before calling dc_get_next_event(). * every time before calling dc_get_next_event().
* To terminate the event loop, main thread should: * To terminate the event loop, main thread should:
* 1. Notify event loop that it should terminate by atomically setting the * 1. Notify background threads,
* boolean flag in the memory shared between the main thread and event loop. * such as event loop (blocking in dc_get_next_event())
* and message processing loop (blocking in dc_wait_next_msgs()),
* that they should terminate by atomically setting the
* boolean flag in the memory
* shared between the main thread and background loop threads.
* 2. Call dc_stop_io() or dc_accounts_stop_io(), depending * 2. Call dc_stop_io() or dc_accounts_stop_io(), depending
* on whether a single account or account manager is used. * on whether a single account or account manager is used.
* Stopping I/O is guaranteed to emit at least one event * Stopping I/O is guaranteed to emit at least one event
* and interrupt the event loop even if it was blocked on dc_get_next_event(). * and interrupt the event loop even if it was blocked on dc_get_next_event().
* Stopping I/O is guaranteed to interrupt a single dc_wait_next_msgs().
* 3. Wait until the event loop thread notices the flag, * 3. Wait until the event loop thread notices the flag,
* exits the event loop and terminates. * exits the event loop and terminates.
* 4. Call dc_context_unref() or dc_accounts_unref(). * 4. Call dc_context_unref() or dc_accounts_unref().
@@ -457,6 +462,16 @@ char* dc_get_blobdir (const dc_context_t* context);
* Prevents adding the "Device messages" and "Saved messages" chats, * Prevents adding the "Device messages" and "Saved messages" chats,
* adds Auto-Submitted header to outgoing messages * adds Auto-Submitted header to outgoing messages
* and accepts contact requests automatically (calling dc_accept_chat() is not needed for bots). * and accepts contact requests automatically (calling dc_accept_chat() is not needed for bots).
* - `last_msg_id` = database ID of the last message processed by the bot.
* This ID and IDs below it are guaranteed not to be returned
* by dc_get_next_msgs() and dc_wait_next_msgs().
* The value is updated automatically
* when dc_markseen_msgs() is called,
* but the bot can also set it manually if it processed
* the message but does not want to mark it as seen.
* For most bots calling `dc_markseen_msgs()` is the
* recommended way to update this value
* even for self-sent messages.
* - `fetch_existing_msgs` = 1=fetch most recent existing messages on configure (default), * - `fetch_existing_msgs` = 1=fetch most recent existing messages on configure (default),
* 0=do not fetch existing messages on configure. * 0=do not fetch existing messages on configure.
* In both cases, existing recipients are added to the contact database. * In both cases, existing recipients are added to the contact database.
@@ -1343,6 +1358,56 @@ int dc_estimate_deletion_cnt (dc_context_t* context, int from_ser
dc_array_t* dc_get_fresh_msgs (dc_context_t* context); dc_array_t* dc_get_fresh_msgs (dc_context_t* context);
/**
* Returns the message IDs of all messages of any chat
* with a database ID higher than `last_msg_id` config value.
*
* This function is intended for use by bots.
* Self-sent messages, device messages,
* messages from contact requests
* and muted chats are included,
* but messages from explicitly blocked contacts
* and chats are ignored.
*
* This function may be called as a part of event loop
* triggered by DC_EVENT_INCOMING_MSG if you are only interested
* in the incoming messages.
* Otherwise use a separate message processing loop
* calling dc_wait_next_msgs() in a separate thread.
*
* @memberof dc_context_t
* @param context The context object as returned from dc_context_new().
* @return An array of message IDs, must be dc_array_unref()'d when no longer used.
* On errors, the list is empty. NULL is never returned.
*/
dc_array_t* dc_get_next_msgs (dc_context_t* context);
/**
* Waits for notification of new messages
* and returns an array of new message IDs.
* See the documentation for dc_get_next_msgs()
* for the details of return value.
*
* This function waits for internal notification of
* a new message in the database and returns afterwards.
* Notification is also sent when I/O is started
* to allow processing new messages
* and when I/O is stopped using dc_stop_io() or dc_accounts_stop_io()
* to allow for manual interruption of the message processing loop.
* The function may return an empty array if there are
* no messages after notification,
* which may happen on start or if the message is quickly deleted
* after adding it to the database.
*
* @memberof dc_context_t
* @param context The context object as returned from dc_context_new().
* @return An array of message IDs, must be dc_array_unref()'d when no longer used.
* On errors, the list is empty. NULL is never returned.
*/
dc_array_t* dc_wait_next_msgs (dc_context_t* context);
/** /**
* Mark all messages in a chat as _noticed_. * Mark all messages in a chat as _noticed_.
* _Noticed_ messages are no longer _fresh_ and do not count as being unseen * _Noticed_ messages are no longer _fresh_ and do not count as being unseen
@@ -1942,6 +2007,11 @@ int dc_resend_msgs (dc_context_t* context, const uint3
* Moreover, timer is started for incoming ephemeral messages. * Moreover, timer is started for incoming ephemeral messages.
* This also happens for contact requests chats. * This also happens for contact requests chats.
* *
* This function updates last_msg_id configuration value
* to the maximum of the current value and IDs passed to this function.
* Bots which mark messages as seen can rely on this side effect
* to avoid updating last_msg_id value manually.
*
* One #DC_EVENT_MSGS_NOTICED event is emitted per modified chat. * One #DC_EVENT_MSGS_NOTICED event is emitted per modified chat.
* *
* @memberof dc_context_t * @memberof dc_context_t

View File

@@ -1280,6 +1280,50 @@ pub unsafe extern "C" fn dc_get_fresh_msgs(
}) })
} }
#[no_mangle]
pub unsafe extern "C" fn dc_get_next_msgs(context: *mut dc_context_t) -> *mut dc_array::dc_array_t {
if context.is_null() {
eprintln!("ignoring careless call to dc_get_next_msgs()");
return ptr::null_mut();
}
let ctx = &*context;
let msg_ids = block_on(ctx.get_next_msgs())
.context("failed to get next messages")
.log_err(ctx)
.unwrap_or_default();
let arr = dc_array_t::from(
msg_ids
.iter()
.map(|msg_id| msg_id.to_u32())
.collect::<Vec<u32>>(),
);
Box::into_raw(Box::new(arr))
}
#[no_mangle]
pub unsafe extern "C" fn dc_wait_next_msgs(
context: *mut dc_context_t,
) -> *mut dc_array::dc_array_t {
if context.is_null() {
eprintln!("ignoring careless call to dc_wait_next_msgs()");
return ptr::null_mut();
}
let ctx = &*context;
let msg_ids = block_on(ctx.wait_next_msgs())
.context("failed to wait for next messages")
.log_err(ctx)
.unwrap_or_default();
let arr = dc_array_t::from(
msg_ids
.iter()
.map(|msg_id| msg_id.to_u32())
.collect::<Vec<u32>>(),
);
Box::into_raw(Box::new(arr))
}
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn dc_marknoticed_chat(context: *mut dc_context_t, chat_id: u32) { pub unsafe extern "C" fn dc_marknoticed_chat(context: *mut dc_context_t, chat_id: u32) {
if context.is_null() { if context.is_null() {

View File

@@ -453,6 +453,49 @@ impl CommandApi {
ChatId::new(chat_id).get_fresh_msg_cnt(&ctx).await ChatId::new(chat_id).get_fresh_msg_cnt(&ctx).await
} }
/// Gets messages to be processed by the bot and returns their IDs.
///
/// Only messages with database ID higher than `last_msg_id` config value
/// are returned. After processing the messages, the bot should
/// update `last_msg_id` by calling [`markseen_msgs`]
/// or manually updating the value to avoid getting already
/// processed messages.
///
/// [`markseen_msgs`]: Self::markseen_msgs
async fn get_next_msgs(&self, account_id: u32) -> Result<Vec<u32>> {
let ctx = self.get_context(account_id).await?;
let msg_ids = ctx
.get_next_msgs()
.await?
.iter()
.map(|msg_id| msg_id.to_u32())
.collect();
Ok(msg_ids)
}
/// Waits for messages to be processed by the bot and returns their IDs.
///
/// This function is similar to [`get_next_msgs`],
/// but waits for internal new message notification before returning.
/// New message notification is sent when new message is added to the database,
/// on initialization, when I/O is started and when I/O is stopped.
/// This allows bots to use `wait_next_msgs` in a loop to process
/// old messages after initialization and during the bot runtime.
/// To shutdown the bot, stopping I/O can be used to interrupt
/// pending or next `wait_next_msgs` call.
///
/// [`get_next_msgs`]: Self::get_next_msgs
async fn wait_next_msgs(&self, account_id: u32) -> Result<Vec<u32>> {
let ctx = self.get_context(account_id).await?;
let msg_ids = ctx
.wait_next_msgs()
.await?
.iter()
.map(|msg_id| msg_id.to_u32())
.collect();
Ok(msg_ids)
}
/// Estimate the number of messages that will be deleted /// Estimate the number of messages that will be deleted
/// by the set_config()-options `delete_device_after` or `delete_server_after`. /// by the set_config()-options `delete_device_after` or `delete_server_after`.
/// This is typically used to show the estimated impact to the user /// This is typically used to show the estimated impact to the user
@@ -944,6 +987,11 @@ impl CommandApi {
/// Moreover, timer is started for incoming ephemeral messages. /// Moreover, timer is started for incoming ephemeral messages.
/// This also happens for contact requests chats. /// This also happens for contact requests chats.
/// ///
/// This function updates `last_msg_id` configuration value
/// to the maximum of the current value and IDs passed to this function.
/// Bots which mark messages as seen can rely on this side effect
/// to avoid updating `last_msg_id` value manually.
///
/// One #DC_EVENT_MSGS_NOTICED event is emitted per modified chat. /// One #DC_EVENT_MSGS_NOTICED event is emitted per modified chat.
async fn markseen_msgs(&self, account_id: u32, msg_ids: Vec<u32>) -> Result<()> { async fn markseen_msgs(&self, account_id: u32, msg_ids: Vec<u32>) -> Result<()> {
let ctx = self.get_context(account_id).await?; let ctx = self.get_context(account_id).await?;

View File

@@ -6,7 +6,7 @@ import asyncio
import logging import logging
import sys import sys
from deltachat_rpc_client import DeltaChat, EventType, Rpc from deltachat_rpc_client import DeltaChat, EventType, Rpc, SpecialContactId
async def main(): async def main():
@@ -30,9 +30,9 @@ async def main():
await deltachat.start_io() await deltachat.start_io()
async def process_messages(): async def process_messages():
for message in await account.get_fresh_messages_in_arrival_order(): for message in await account.get_next_messages():
snapshot = await message.get_snapshot() snapshot = await message.get_snapshot()
if not snapshot.is_bot and not snapshot.is_info: if snapshot.from_id != SpecialContactId.SELF and not snapshot.is_bot and not snapshot.is_info:
await snapshot.chat.send_text(snapshot.text) await snapshot.chat.send_text(snapshot.text)
await snapshot.message.mark_seen() await snapshot.message.mark_seen()

View File

@@ -3,7 +3,7 @@ from ._utils import AttrDict, run_bot_cli, run_client_cli
from .account import Account from .account import Account
from .chat import Chat from .chat import Chat
from .client import Bot, Client from .client import Bot, Client
from .const import EventType from .const import EventType, SpecialContactId
from .contact import Contact from .contact import Contact
from .deltachat import DeltaChat from .deltachat import DeltaChat
from .message import Message from .message import Message
@@ -19,6 +19,7 @@ __all__ = [
"DeltaChat", "DeltaChat",
"EventType", "EventType",
"Message", "Message",
"SpecialContactId",
"Rpc", "Rpc",
"run_bot_cli", "run_bot_cli",
"run_client_cli", "run_client_cli",

View File

@@ -1,5 +1,6 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Tuple, Union from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from warnings import warn
from ._utils import AttrDict from ._utils import AttrDict
from .chat import Chat from .chat import Chat
@@ -239,7 +240,22 @@ class Account:
fresh_msg_ids = await self._rpc.get_fresh_msgs(self.id) fresh_msg_ids = await self._rpc.get_fresh_msgs(self.id)
return [Message(self, msg_id) for msg_id in fresh_msg_ids] return [Message(self, msg_id) for msg_id in fresh_msg_ids]
async def get_next_messages(self) -> List[Message]:
"""Return a list of next messages."""
next_msg_ids = await self._rpc.get_next_msgs(self.id)
return [Message(self, msg_id) for msg_id in next_msg_ids]
async def wait_next_messages(self) -> List[Message]:
"""Wait for new messages and return a list of them."""
next_msg_ids = await self._rpc.wait_next_msgs(self.id)
return [Message(self, msg_id) for msg_id in next_msg_ids]
async def get_fresh_messages_in_arrival_order(self) -> List[Message]: async def get_fresh_messages_in_arrival_order(self) -> List[Message]:
"""Return fresh messages list sorted in the order of their arrival, with ascending IDs.""" """Return fresh messages list sorted in the order of their arrival, with ascending IDs."""
warn(
"get_fresh_messages_in_arrival_order is deprecated, use get_next_messages instead.",
DeprecationWarning,
stacklevel=2,
)
fresh_msg_ids = sorted(await self._rpc.get_fresh_msgs(self.id)) fresh_msg_ids = sorted(await self._rpc.get_fresh_msgs(self.id))
return [Message(self, msg_id) for msg_id in fresh_msg_ids] return [Message(self, msg_id) for msg_id in fresh_msg_ids]

View File

@@ -20,7 +20,7 @@ from ._utils import (
parse_system_image_changed, parse_system_image_changed,
parse_system_title_changed, parse_system_title_changed,
) )
from .const import COMMAND_PREFIX, EventType, SystemMessageType from .const import COMMAND_PREFIX, EventType, SpecialContactId, SystemMessageType
from .events import ( from .events import (
EventFilter, EventFilter,
GroupImageChanged, GroupImageChanged,
@@ -189,9 +189,10 @@ class Client:
async def _process_messages(self) -> None: async def _process_messages(self) -> None:
if self._should_process_messages: if self._should_process_messages:
for message in await self.account.get_fresh_messages_in_arrival_order(): for message in await self.account.get_next_messages():
snapshot = await message.get_snapshot() snapshot = await message.get_snapshot()
await self._on_new_msg(snapshot) if snapshot.from_id not in [SpecialContactId.SELF, SpecialContactId.DEVICE]:
await self._on_new_msg(snapshot)
if snapshot.is_info and snapshot.system_message_type != SystemMessageType.WEBXDC_INFO_MESSAGE: if snapshot.is_info and snapshot.system_message_type != SystemMessageType.WEBXDC_INFO_MESSAGE:
await self._handle_info_msg(snapshot) await self._handle_info_msg(snapshot)
await snapshot.message.mark_seen() await snapshot.message.mark_seen()

View File

@@ -98,8 +98,8 @@ async def test_account(acfactory) -> None:
assert await alice.get_chatlist() assert await alice.get_chatlist()
assert await alice.get_chatlist(snapshot=True) assert await alice.get_chatlist(snapshot=True)
assert await alice.get_qr_code() assert await alice.get_qr_code()
await alice.get_fresh_messages() assert await alice.get_fresh_messages()
await alice.get_fresh_messages_in_arrival_order() assert await alice.get_next_messages()
group = await alice.create_group("test group") group = await alice.create_group("test group")
await group.add_contact(alice_contact_bob) await group.add_contact(alice_contact_bob)
@@ -305,3 +305,29 @@ async def test_bot(acfactory) -> None:
await acfactory.process_message(from_account=user, to_client=bot, text="hello") await acfactory.process_message(from_account=user, to_client=bot, text="hello")
event = await acfactory.process_message(from_account=user, to_client=bot, text="/help") event = await acfactory.process_message(from_account=user, to_client=bot, text="/help")
mock.hook.assert_called_once_with(event.msg_id) mock.hook.assert_called_once_with(event.msg_id)
@pytest.mark.asyncio()
async def test_wait_next_messages(acfactory) -> None:
alice = await acfactory.new_configured_account()
# Create a bot account so it does not receive device messages in the beginning.
bot = await acfactory.new_preconfigured_account()
await bot.set_config("bot", "1")
await bot.configure()
# There are no old messages and the call returns immediately.
assert not await bot.wait_next_messages()
# Bot starts waiting for messages.
next_messages_task = asyncio.create_task(bot.wait_next_messages())
bot_addr = await bot.get_config("addr")
alice_contact_bot = await alice.create_contact(bot_addr, "Bob")
alice_chat_bot = await alice_contact_bot.create_chat()
await alice_chat_bot.send_text("Hello!")
next_messages = await next_messages_task
assert len(next_messages) == 1
snapshot = await next_messages[0].get_snapshot()
assert snapshot.text == "Hello!"

View File

@@ -376,6 +376,22 @@ class Account:
dc_array = ffi.gc(lib.dc_get_fresh_msgs(self._dc_context), lib.dc_array_unref) dc_array = ffi.gc(lib.dc_get_fresh_msgs(self._dc_context), lib.dc_array_unref)
return (x for x in iter_array(dc_array, lambda x: Message.from_db(self, x)) if x is not None) return (x for x in iter_array(dc_array, lambda x: Message.from_db(self, x)) if x is not None)
def _wait_next_message_ids(self) -> List[int]:
"""Return IDs of all next messages from all chats."""
dc_array = ffi.gc(lib.dc_wait_next_msgs(self._dc_context), lib.dc_array_unref)
return [lib.dc_array_get_id(dc_array, i) for i in range(lib.dc_array_get_cnt(dc_array))]
def wait_next_incoming_message(self) -> Message:
"""Waits until the next incoming message
with ID higher than given is received and returns it."""
while True:
message_ids = self._wait_next_message_ids()
for msg_id in message_ids:
message = Message.from_db(self, msg_id)
if message and not message.is_from_self() and not message.is_from_device():
self.set_config("last_msg_id", str(msg_id))
return message
def create_chat(self, obj) -> Chat: def create_chat(self, obj) -> Chat:
"""Create a 1:1 chat with Account, Contact or e-mail address.""" """Create a 1:1 chat with Account, Contact or e-mail address."""
return self.create_contact(obj).create_chat() return self.create_contact(obj).create_chat()

View File

@@ -344,6 +344,16 @@ class Message:
contact_id = lib.dc_msg_get_from_id(self._dc_msg) contact_id = lib.dc_msg_get_from_id(self._dc_msg)
return Contact(self.account, contact_id) return Contact(self.account, contact_id)
def is_from_self(self):
"""Return true if the message is sent by self."""
contact_id = lib.dc_msg_get_from_id(self._dc_msg)
return contact_id == const.DC_CONTACT_ID_SELF
def is_from_device(self):
"""Return true if the message is sent by the device."""
contact_id = lib.dc_msg_get_from_id(self._dc_msg)
return contact_id == const.DC_CONTACT_ID_DEVICE
# #
# Message State query methods # Message State query methods
# #

View File

@@ -44,21 +44,21 @@ def test_configure_generate_key(acfactory, lp):
lp.sec("ac1: send unencrypted message to ac2") lp.sec("ac1: send unencrypted message to ac2")
chat.send_text("message1") chat.send_text("message1")
lp.sec("ac2: waiting for message from ac1") lp.sec("ac2: waiting for message from ac1")
msg_in = ac2._evtracker.wait_next_incoming_message() msg_in = ac2.wait_next_incoming_message()
assert msg_in.text == "message1" assert msg_in.text == "message1"
assert not msg_in.is_encrypted() assert not msg_in.is_encrypted()
lp.sec("ac2: send encrypted message to ac1") lp.sec("ac2: send encrypted message to ac1")
msg_in.chat.send_text("message2") msg_in.chat.send_text("message2")
lp.sec("ac1: waiting for message from ac2") lp.sec("ac1: waiting for message from ac2")
msg2_in = ac1._evtracker.wait_next_incoming_message() msg2_in = ac1.wait_next_incoming_message()
assert msg2_in.text == "message2" assert msg2_in.text == "message2"
assert msg2_in.is_encrypted() assert msg2_in.is_encrypted()
lp.sec("ac1: send encrypted message to ac2") lp.sec("ac1: send encrypted message to ac2")
msg2_in.chat.send_text("message3") msg2_in.chat.send_text("message3")
lp.sec("ac2: waiting for message from ac1") lp.sec("ac2: waiting for message from ac1")
msg3_in = ac2._evtracker.wait_next_incoming_message() msg3_in = ac2.wait_next_incoming_message()
assert msg3_in.text == "message3" assert msg3_in.text == "message3"
assert msg3_in.is_encrypted() assert msg3_in.is_encrypted()

View File

@@ -1666,6 +1666,7 @@ impl Chat {
], ],
) )
.await?; .await?;
context.new_msgs_notify.notify_one();
msg.id = MsgId::new(u32::try_from(raw_id)?); msg.id = MsgId::new(u32::try_from(raw_id)?);
maybe_set_logging_xdc(context, msg, self.id).await?; maybe_set_logging_xdc(context, msg, self.id).await?;
@@ -3628,6 +3629,7 @@ pub async fn add_device_msg_with_importance(
), ),
) )
.await?; .await?;
context.new_msgs_notify.notify_one();
msg_id = MsgId::new(u32::try_from(row_id)?); msg_id = MsgId::new(u32::try_from(row_id)?);
if !msg.hidden { if !msg.hidden {
@@ -3741,6 +3743,7 @@ pub(crate) async fn add_info_msg_with_cmd(
parent.map(|msg|msg.rfc724_mid.clone()).unwrap_or_default() parent.map(|msg|msg.rfc724_mid.clone()).unwrap_or_default()
) )
).await?; ).await?;
context.new_msgs_notify.notify_one();
let msg_id = MsgId::new(row_id.try_into()?); let msg_id = MsgId::new(row_id.try_into()?);
context.emit_msgs_changed(chat_id, msg_id); context.emit_msgs_changed(chat_id, msg_id);

View File

@@ -308,6 +308,9 @@ pub enum Config {
/// This value is used internally to remember the MsgId of the logging xdc /// This value is used internally to remember the MsgId of the logging xdc
#[strum(props(default = "0"))] #[strum(props(default = "0"))]
DebugLogging, DebugLogging,
/// Last message processed by the bot.
LastMsgId,
} }
impl Context { impl Context {
@@ -358,6 +361,11 @@ impl Context {
Ok(self.get_config_parsed(key).await?.unwrap_or_default()) Ok(self.get_config_parsed(key).await?.unwrap_or_default())
} }
/// Returns 32-bit unsigned integer configuration value for the given key.
pub async fn get_config_u32(&self, key: Config) -> Result<u32> {
Ok(self.get_config_parsed(key).await?.unwrap_or_default())
}
/// Returns 64-bit signed integer configuration value for the given key. /// Returns 64-bit signed integer configuration value for the given key.
pub async fn get_config_i64(&self, key: Config) -> Result<i64> { pub async fn get_config_i64(&self, key: Config) -> Result<i64> {
Ok(self.get_config_parsed(key).await?.unwrap_or_default()) Ok(self.get_config_parsed(key).await?.unwrap_or_default())
@@ -459,6 +467,12 @@ impl Context {
Ok(()) Ok(())
} }
/// Set the given config to an unsigned 32-bit integer value.
pub async fn set_config_u32(&self, key: Config, value: u32) -> Result<()> {
self.set_config(key, Some(&value.to_string())).await?;
Ok(())
}
/// Set the given config to a boolean value. /// Set the given config to a boolean value.
pub async fn set_config_bool(&self, key: Config, value: bool) -> Result<()> { pub async fn set_config_bool(&self, key: Config, value: bool) -> Result<()> {
self.set_config(key, if value { Some("1") } else { Some("0") }) self.set_config(key, if value { Some("1") } else { Some("0") })

View File

@@ -11,7 +11,7 @@ use std::time::{Duration, Instant, SystemTime};
use anyhow::{bail, ensure, Context as _, Result}; use anyhow::{bail, ensure, Context as _, Result};
use async_channel::{self as channel, Receiver, Sender}; use async_channel::{self as channel, Receiver, Sender};
use ratelimit::Ratelimit; use ratelimit::Ratelimit;
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{Mutex, Notify, RwLock};
use tokio::task; use tokio::task;
use crate::chat::{get_chat_cnt, ChatId}; use crate::chat::{get_chat_cnt, ChatId};
@@ -218,6 +218,11 @@ pub struct InnerContext {
/// IMAP UID resync request. /// IMAP UID resync request.
pub(crate) resync_request: AtomicBool, pub(crate) resync_request: AtomicBool,
/// Notify about new messages.
///
/// This causes [`Context::wait_next_msgs`] to wake up.
pub(crate) new_msgs_notify: Notify,
/// Server ID response if ID capability is supported /// Server ID response if ID capability is supported
/// and the server returned non-NIL on the inbox connection. /// and the server returned non-NIL on the inbox connection.
/// <https://datatracker.ietf.org/doc/html/rfc2971> /// <https://datatracker.ietf.org/doc/html/rfc2971>
@@ -363,6 +368,11 @@ impl Context {
blobdir.display() blobdir.display()
); );
let new_msgs_notify = Notify::new();
// Notify once immediately to allow processing old messages
// without starting I/O.
new_msgs_notify.notify_one();
let inner = InnerContext { let inner = InnerContext {
id, id,
blobdir, blobdir,
@@ -379,6 +389,7 @@ impl Context {
quota: RwLock::new(None), quota: RwLock::new(None),
quota_update_request: AtomicBool::new(false), quota_update_request: AtomicBool::new(false),
resync_request: AtomicBool::new(false), resync_request: AtomicBool::new(false),
new_msgs_notify,
server_id: RwLock::new(None), server_id: RwLock::new(None),
creation_time: std::time::SystemTime::now(), creation_time: std::time::SystemTime::now(),
last_full_folder_scan: Mutex::new(None), last_full_folder_scan: Mutex::new(None),
@@ -767,6 +778,10 @@ impl Context {
"debug_logging", "debug_logging",
self.get_config_int(Config::DebugLogging).await?.to_string(), self.get_config_int(Config::DebugLogging).await?.to_string(),
); );
res.insert(
"last_msg_id",
self.get_config_int(Config::LastMsgId).await?.to_string(),
);
let elapsed = self.creation_time.elapsed(); let elapsed = self.creation_time.elapsed();
res.insert("uptime", duration_to_str(elapsed.unwrap_or_default())); res.insert("uptime", duration_to_str(elapsed.unwrap_or_default()));
@@ -813,6 +828,66 @@ impl Context {
Ok(list) Ok(list)
} }
/// Returns a list of messages with database ID higher than requested.
///
/// Blocked contacts and chats are excluded,
/// but self-sent messages and contact requests are included in the results.
pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
let last_msg_id = match self.get_config(Config::LastMsgId).await? {
Some(s) => MsgId::new(s.parse()?),
None => MsgId::new_unset(),
};
let list = self
.sql
.query_map(
"SELECT m.id
FROM msgs m
LEFT JOIN contacts ct
ON m.from_id=ct.id
LEFT JOIN chats c
ON m.chat_id=c.id
WHERE m.id>?
AND m.hidden=0
AND m.chat_id>9
AND ct.blocked=0
AND c.blocked!=1
ORDER BY m.id ASC",
(
last_msg_id.to_u32(), // Explicitly convert to u32 because 0 is allowed.
),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
|rows| {
let mut list = Vec::new();
for row in rows {
list.push(row?);
}
Ok(list)
},
)
.await?;
Ok(list)
}
/// Returns a list of messages with database ID higher than last marked as seen.
///
/// This function is supposed to be used by bot to request messages
/// that are not processed yet.
///
/// Waits for notification and returns a result.
/// Note that the result may be empty if the message is deleted
/// shortly after notification or notification is manually triggered
/// to interrupt waiting.
/// Notification may be manually triggered by calling [`Self::stop_io`].
pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
self.new_msgs_notify.notified().await;
let list = self.get_next_msgs().await?;
Ok(list)
}
/// Searches for messages containing the query string. /// Searches for messages containing the query string.
/// ///
/// If `chat_id` is provided this searches only for messages in this chat, if `chat_id` /// If `chat_id` is provided this searches only for messages in this chat, if `chat_id`
@@ -1444,4 +1519,38 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_next_msgs() -> Result<()> {
let alice = TestContext::new_alice().await;
let bob = TestContext::new_bob().await;
let alice_chat = alice.create_chat(&bob).await;
assert!(alice.get_next_msgs().await?.is_empty());
assert!(bob.get_next_msgs().await?.is_empty());
let sent_msg = alice.send_text(alice_chat.id, "Hi Bob").await;
let received_msg = bob.recv_msg(&sent_msg).await;
let bob_next_msg_ids = bob.get_next_msgs().await?;
assert_eq!(bob_next_msg_ids.len(), 1);
assert_eq!(bob_next_msg_ids.get(0), Some(&received_msg.id));
bob.set_config_u32(Config::LastMsgId, received_msg.id.to_u32())
.await?;
assert!(bob.get_next_msgs().await?.is_empty());
// Next messages include self-sent messages.
let alice_next_msg_ids = alice.get_next_msgs().await?;
assert_eq!(alice_next_msg_ids.len(), 1);
assert_eq!(alice_next_msg_ids.get(0), Some(&sent_msg.sender_msg_id));
alice
.set_config_u32(Config::LastMsgId, sent_msg.sender_msg_id.to_u32())
.await?;
assert!(alice.get_next_msgs().await?.is_empty());
Ok(())
}
} }

View File

@@ -1468,6 +1468,12 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
return Ok(()); return Ok(());
} }
let old_last_msg_id = MsgId::new(context.get_config_u32(Config::LastMsgId).await?);
let last_msg_id = msg_ids.iter().fold(&old_last_msg_id, std::cmp::max);
context
.set_config_u32(Config::LastMsgId, last_msg_id.to_u32())
.await?;
let msgs = context let msgs = context
.sql .sql
.query_map( .query_map(

View File

@@ -363,6 +363,7 @@ pub(crate) async fn receive_imf_inner(
chat_id.emit_msg_event(context, *msg_id, incoming && fresh); chat_id.emit_msg_event(context, *msg_id, incoming && fresh);
} }
} }
context.new_msgs_notify.notify_one();
mime_parser mime_parser
.handle_reports(context, from_id, sent_timestamp, &mime_parser.parts) .handle_reports(context, from_id, sent_timestamp, &mime_parser.parts)

View File

@@ -62,6 +62,11 @@ impl SchedulerState {
/// Starts the scheduler if it is not yet started. /// Starts the scheduler if it is not yet started.
async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
info!(context, "starting IO"); info!(context, "starting IO");
// Notify message processing loop
// to allow processing old messages after restart.
context.new_msgs_notify.notify_one();
let ctx = context.clone(); let ctx = context.clone();
match Scheduler::start(context).await { match Scheduler::start(context).await {
Ok(scheduler) => *inner = InnerSchedulerState::Started(scheduler), Ok(scheduler) => *inner = InnerSchedulerState::Started(scheduler),
@@ -95,6 +100,11 @@ impl SchedulerState {
// to terminate on receiving the next event and then call stop_io() // to terminate on receiving the next event and then call stop_io()
// which will emit the below event(s) // which will emit the below event(s)
info!(context, "stopping IO"); info!(context, "stopping IO");
// Wake up message processing loop even if there are no messages
// to allow for clean shutdown.
context.new_msgs_notify.notify_one();
if let Some(debug_logging) = context.debug_logging.read().await.as_ref() { if let Some(debug_logging) = context.debug_logging.read().await.as_ref() {
debug_logging.loop_handle.abort(); debug_logging.loop_handle.abort();
} }