Merge branch 'master' into flub/ongoing-guard

This commit is contained in:
Floris Bruynooghe
2023-04-16 17:11:05 +02:00
70 changed files with 1775 additions and 903 deletions

View File

@@ -14,7 +14,7 @@ use std::time::{Duration, Instant, SystemTime};
use anyhow::{bail, ensure, Context as _, Result};
use async_channel::Sender;
use ratelimit::Ratelimit;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::sync::{oneshot, Mutex, Notify, RwLock};
use tokio::task;
use crate::chat::{get_chat_cnt, ChatId};
@@ -221,6 +221,11 @@ pub struct InnerContext {
/// IMAP UID resync request.
pub(crate) resync_request: AtomicBool,
/// Notify about new messages.
///
/// This causes [`Context::wait_next_msgs`] to wake up.
pub(crate) new_msgs_notify: Notify,
/// Server ID response if ID capability is supported
/// and the server returned non-NIL on the inbox connection.
/// <https://datatracker.ietf.org/doc/html/rfc2971>
@@ -251,8 +256,8 @@ pub(crate) struct DebugLogging {
pub(crate) msg_id: MsgId,
/// Handle to the background task responsible for sending
pub(crate) loop_handle: task::JoinHandle<()>,
/// Channel that log events should be send to
/// A background loop will receive and handle them
/// Channel that log events should be sent to.
/// A background loop will receive and handle them.
pub(crate) sender: Sender<DebugEventLogData>,
}
@@ -366,6 +371,11 @@ impl Context {
blobdir.display()
);
let new_msgs_notify = Notify::new();
// Notify once immediately to allow processing old messages
// without starting I/O.
new_msgs_notify.notify_one();
let inner = InnerContext {
id,
blobdir,
@@ -382,6 +392,7 @@ impl Context {
quota: RwLock::new(None),
quota_update_request: AtomicBool::new(false),
resync_request: AtomicBool::new(false),
new_msgs_notify,
server_id: RwLock::new(None),
creation_time: std::time::SystemTime::now(),
last_full_folder_scan: Mutex::new(None),
@@ -782,6 +793,10 @@ impl Context {
"debug_logging",
self.get_config_int(Config::DebugLogging).await?.to_string(),
);
res.insert(
"last_msg_id",
self.get_config_int(Config::LastMsgId).await?.to_string(),
);
let elapsed = self.creation_time.elapsed();
res.insert("uptime", duration_to_str(elapsed.unwrap_or_default()));
@@ -814,7 +829,7 @@ impl Context {
" AND NOT(c.muted_until=-1 OR c.muted_until>?)",
" ORDER BY m.timestamp DESC,m.id DESC;"
),
paramsv![MessageState::InFresh, time()],
(MessageState::InFresh, time()),
|row| row.get::<_, MsgId>(0),
|rows| {
let mut list = Vec::new();
@@ -828,6 +843,66 @@ impl Context {
Ok(list)
}
/// Returns a list of messages with database ID higher than requested.
///
/// Blocked contacts and chats are excluded,
/// but self-sent messages and contact requests are included in the results.
pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
let last_msg_id = match self.get_config(Config::LastMsgId).await? {
Some(s) => MsgId::new(s.parse()?),
None => MsgId::new_unset(),
};
let list = self
.sql
.query_map(
"SELECT m.id
FROM msgs m
LEFT JOIN contacts ct
ON m.from_id=ct.id
LEFT JOIN chats c
ON m.chat_id=c.id
WHERE m.id>?
AND m.hidden=0
AND m.chat_id>9
AND ct.blocked=0
AND c.blocked!=1
ORDER BY m.id ASC",
(
last_msg_id.to_u32(), // Explicitly convert to u32 because 0 is allowed.
),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
|rows| {
let mut list = Vec::new();
for row in rows {
list.push(row?);
}
Ok(list)
},
)
.await?;
Ok(list)
}
/// Returns a list of messages with database ID higher than last marked as seen.
///
/// This function is supposed to be used by bot to request messages
/// that are not processed yet.
///
/// Waits for notification and returns a result.
/// Note that the result may be empty if the message is deleted
/// shortly after notification or notification is manually triggered
/// to interrupt waiting.
/// Notification may be manually triggered by calling [`Self::stop_io`].
pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
self.new_msgs_notify.notified().await;
let list = self.get_next_msgs().await?;
Ok(list)
}
/// Searches for messages containing the query string.
///
/// If `chat_id` is provided this searches only for messages in this chat, if `chat_id`
@@ -839,24 +914,10 @@ impl Context {
}
let str_like_in_text = format!("%{real_query}%");
let do_query = |query, params| {
self.sql.query_map(
query,
params,
|row| row.get::<_, MsgId>("id"),
|rows| {
let mut ret = Vec::new();
for id in rows {
ret.push(id?);
}
Ok(ret)
},
)
};
let list = if let Some(chat_id) = chat_id {
do_query(
"SELECT m.id AS id
self.sql
.query_map(
"SELECT m.id AS id
FROM msgs m
LEFT JOIN contacts ct
ON m.from_id=ct.id
@@ -865,9 +926,17 @@ impl Context {
AND ct.blocked=0
AND txt LIKE ?
ORDER BY m.timestamp,m.id;",
paramsv![chat_id, str_like_in_text],
)
.await?
(chat_id, str_like_in_text),
|row| row.get::<_, MsgId>("id"),
|rows| {
let mut ret = Vec::new();
for id in rows {
ret.push(id?);
}
Ok(ret)
},
)
.await?
} else {
// For performance reasons results are sorted only by `id`, that is in the order of
// message reception.
@@ -879,8 +948,9 @@ impl Context {
// of unwanted results that are discarded moments later, we added `LIMIT 1000`.
// According to some tests, this limit speeds up eg. 2 character searches by factor 10.
// The limit is documented and UI may add a hint when getting 1000 results.
do_query(
"SELECT m.id AS id
self.sql
.query_map(
"SELECT m.id AS id
FROM msgs m
LEFT JOIN contacts ct
ON m.from_id=ct.id
@@ -892,9 +962,17 @@ impl Context {
AND ct.blocked=0
AND m.txt LIKE ?
ORDER BY m.id DESC LIMIT 1000",
paramsv![str_like_in_text],
)
.await?
(str_like_in_text,),
|row| row.get::<_, MsgId>("id"),
|rows| {
let mut ret = Vec::new();
for id in rows {
ret.push(id?);
}
Ok(ret)
},
)
.await?
};
Ok(list)
@@ -1152,7 +1230,7 @@ mod tests {
t.sql
.execute(
"UPDATE chats SET muted_until=? WHERE id=?;",
paramsv![time() - 3600, bob.id],
(time() - 3600, bob.id),
)
.await
.unwrap();
@@ -1169,10 +1247,7 @@ mod tests {
// to test get_fresh_msgs() with invalid mute_until (everything < -1),
// that results in "muted forever" by definition.
t.sql
.execute(
"UPDATE chats SET muted_until=-2 WHERE id=?;",
paramsv![bob.id],
)
.execute("UPDATE chats SET muted_until=-2 WHERE id=?;", (bob.id,))
.await
.unwrap();
let bob = Chat::load_from_db(&t, bob.id).await.unwrap();
@@ -1521,4 +1596,38 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_next_msgs() -> Result<()> {
let alice = TestContext::new_alice().await;
let bob = TestContext::new_bob().await;
let alice_chat = alice.create_chat(&bob).await;
assert!(alice.get_next_msgs().await?.is_empty());
assert!(bob.get_next_msgs().await?.is_empty());
let sent_msg = alice.send_text(alice_chat.id, "Hi Bob").await;
let received_msg = bob.recv_msg(&sent_msg).await;
let bob_next_msg_ids = bob.get_next_msgs().await?;
assert_eq!(bob_next_msg_ids.len(), 1);
assert_eq!(bob_next_msg_ids.get(0), Some(&received_msg.id));
bob.set_config_u32(Config::LastMsgId, received_msg.id.to_u32())
.await?;
assert!(bob.get_next_msgs().await?.is_empty());
// Next messages include self-sent messages.
let alice_next_msg_ids = alice.get_next_msgs().await?;
assert_eq!(alice_next_msg_ids.len(), 1);
assert_eq!(alice_next_msg_ids.get(0), Some(&sent_msg.sender_msg_id));
alice
.set_config_u32(Config::LastMsgId, sent_msg.sender_msg_id.to_u32())
.await?;
assert!(alice.get_next_msgs().await?.is_empty());
Ok(())
}
}