refactor(sql): add query_map_vec()

This also replaces some cases where flatten()
was used, effectively ignoring the errors.
This commit is contained in:
link2xt
2025-10-24 00:58:58 +00:00
committed by l
parent 45a1d81805
commit 5f3948b462
16 changed files with 83 additions and 203 deletions

View File

@@ -925,9 +925,9 @@ impl ChatId {
/// Chat is considered active if something was posted there within the last 42 days.
pub async fn get_similar_chat_ids(self, context: &Context) -> Result<Vec<(ChatId, f64)>> {
// Count number of common members in this and other chats.
let intersection: Vec<(ChatId, f64)> = context
let intersection = context
.sql
.query_map(
.query_map_vec(
"SELECT y.chat_id, SUM(x.contact_id = y.contact_id)
FROM chats_contacts as x
JOIN chats_contacts as y
@@ -945,10 +945,6 @@ impl ChatId {
let intersection: f64 = row.get(1)?;
Ok((chat_id, intersection))
},
|rows| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await
.context("failed to calculate member set intersections")?;
@@ -2010,7 +2006,7 @@ impl Chat {
let self_fp = self_fingerprint(context).await?;
let fingerprint_addrs = context
.sql
.query_map(
.query_map_vec(
"SELECT c.id, c.fingerprint, c.addr
FROM contacts c INNER JOIN chats_contacts cc
ON c.id=cc.contact_id
@@ -2024,7 +2020,6 @@ impl Chat {
let addr = row.get(2)?;
Ok((fingerprint, addr))
},
|addrs| addrs.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
self.sync(context, SyncAction::SetPgpContacts(fingerprint_addrs))
@@ -2032,14 +2027,13 @@ impl Chat {
} else {
let addrs = context
.sql
.query_map(
.query_map_vec(
"SELECT c.addr \
FROM contacts c INNER JOIN chats_contacts cc \
ON c.id=cc.contact_id \
WHERE cc.chat_id=? AND cc.add_timestamp >= cc.remove_timestamp",
(self.id,),
|row| row.get::<_, String>(0),
|addrs| addrs.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
self.sync(context, SyncAction::SetContacts(addrs)).await?;
@@ -3125,13 +3119,12 @@ pub async fn marknoticed_chat(context: &Context, chat_id: ChatId) -> Result<()>
if chat_id.is_archived_link() {
let chat_ids_in_archive = context
.sql
.query_map(
.query_map_vec(
"SELECT DISTINCT(m.chat_id) FROM msgs m
LEFT JOIN chats c ON m.chat_id=c.id
WHERE m.state=10 AND m.hidden=0 AND m.chat_id>9 AND c.archived=1",
(),
|row| row.get::<_, ChatId>(0),
|ids| ids.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
if chat_ids_in_archive.is_empty() {
@@ -3175,7 +3168,7 @@ pub async fn marknoticed_chat(context: &Context, chat_id: ChatId) -> Result<()>
// locally (i.e. when the chat was opened locally).
let hidden_messages = context
.sql
.query_map(
.query_map_vec(
"SELECT id, rfc724_mid FROM msgs
WHERE state=?
AND hidden=1
@@ -3187,10 +3180,6 @@ pub async fn marknoticed_chat(context: &Context, chat_id: ChatId) -> Result<()>
let rfc724_mid: String = row.get(1)?;
Ok((msg_id, rfc724_mid))
},
|rows| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
for (msg_id, rfc724_mid) in &hidden_messages {
@@ -3299,7 +3288,7 @@ pub async fn get_chat_media(
{
context
.sql
.query_map(
.query_map_vec(
"SELECT id
FROM msgs
WHERE (1=? OR chat_id=?)
@@ -3314,13 +3303,12 @@ pub async fn get_chat_media(
Viewtype::Webxdc,
),
|row| row.get::<_, MsgId>(0),
|ids| Ok(ids.flatten().collect()),
)
.await?
} else {
context
.sql
.query_map(
.query_map_vec(
"SELECT id
FROM msgs
WHERE (1=? OR chat_id=?)
@@ -3345,7 +3333,6 @@ pub async fn get_chat_media(
},
),
|row| row.get::<_, MsgId>(0),
|ids| Ok(ids.flatten().collect()),
)
.await?
};
@@ -3356,10 +3343,9 @@ pub async fn get_chat_media(
pub async fn get_chat_contacts(context: &Context, chat_id: ChatId) -> Result<Vec<ContactId>> {
// Normal chats do not include SELF. Group chats do (as it may happen that one is deleted from a
// groupchat but the chats stays visible, moreover, this makes displaying lists easier)
let list = context
context
.sql
.query_map(
.query_map_vec(
"SELECT cc.contact_id
FROM chats_contacts cc
LEFT JOIN contacts c
@@ -3368,11 +3354,8 @@ pub async fn get_chat_contacts(context: &Context, chat_id: ChatId) -> Result<Vec
ORDER BY c.id=1, c.last_seen DESC, c.id DESC;",
(chat_id,),
|row| row.get::<_, ContactId>(0),
|ids| ids.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
Ok(list)
.await
}
/// Returns a vector of contact IDs for given chat ID that are no longer part of the group.
@@ -3380,9 +3363,9 @@ pub async fn get_chat_contacts(context: &Context, chat_id: ChatId) -> Result<Vec
/// Members that have been removed recently are in the beginning of the list.
pub async fn get_past_chat_contacts(context: &Context, chat_id: ChatId) -> Result<Vec<ContactId>> {
let now = time();
let list = context
context
.sql
.query_map(
.query_map_vec(
"SELECT cc.contact_id
FROM chats_contacts cc
LEFT JOIN contacts c
@@ -3393,11 +3376,8 @@ pub async fn get_past_chat_contacts(context: &Context, chat_id: ChatId) -> Resul
ORDER BY c.id=1, cc.remove_timestamp DESC, c.id DESC",
(chat_id, now.saturating_sub(60 * 24 * 3600)),
|row| row.get::<_, ContactId>(0),
|ids| ids.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
Ok(list)
.await
}
/// Creates an encrypted group chat.

View File

@@ -1282,14 +1282,10 @@ impl Contact {
let list = context
.sql
.query_map(
.query_map_vec(
"SELECT id FROM contacts WHERE id>? AND blocked!=0 ORDER BY last_seen DESC, id DESC;",
(ContactId::LAST_SPECIAL,),
|row| row.get::<_, ContactId>(0),
|ids| {
ids.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
Ok(list)

View File

@@ -1095,7 +1095,7 @@ impl Context {
pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
let list = self
.sql
.query_map(
.query_map_vec(
concat!(
"SELECT m.id",
" FROM msgs m",
@@ -1113,13 +1113,6 @@ impl Context {
),
(MessageState::InFresh, time()),
|row| row.get::<_, MsgId>(0),
|rows| {
let mut list = Vec::new();
for row in rows {
list.push(row?);
}
Ok(list)
},
)
.await?;
Ok(list)
@@ -1152,7 +1145,7 @@ impl Context {
let list = self
.sql
.query_map(
.query_map_vec(
"SELECT m.id
FROM msgs m
LEFT JOIN contacts ct
@@ -1172,13 +1165,6 @@ impl Context {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
|rows| {
let mut list = Vec::new();
for row in rows {
list.push(row?);
}
Ok(list)
},
)
.await?;
Ok(list)
@@ -1219,7 +1205,7 @@ impl Context {
let list = if let Some(chat_id) = chat_id {
self.sql
.query_map(
.query_map_vec(
"SELECT m.id AS id
FROM msgs m
LEFT JOIN contacts ct
@@ -1231,13 +1217,6 @@ impl Context {
ORDER BY m.timestamp,m.id;",
(chat_id, str_like_in_text),
|row| row.get::<_, MsgId>("id"),
|rows| {
let mut ret = Vec::new();
for id in rows {
ret.push(id?);
}
Ok(ret)
},
)
.await?
} else {
@@ -1252,7 +1231,7 @@ impl Context {
// According to some tests, this limit speeds up eg. 2 character searches by factor 10.
// The limit is documented and UI may add a hint when getting 1000 results.
self.sql
.query_map(
.query_map_vec(
"SELECT m.id AS id
FROM msgs m
LEFT JOIN contacts ct
@@ -1267,13 +1246,6 @@ impl Context {
ORDER BY m.id DESC LIMIT 1000",
(str_like_in_text,),
|row| row.get::<_, MsgId>("id"),
|rows| {
let mut ret = Vec::new();
for id in rows {
ret.push(id?);
}
Ok(ret)
},
)
.await?
};

View File

@@ -386,7 +386,7 @@ async fn select_expired_messages(
) -> Result<Vec<(MsgId, ChatId, Viewtype, u32)>> {
let mut rows = context
.sql
.query_map(
.query_map_vec(
r#"
SELECT id, chat_id, type, location_id
FROM msgs
@@ -407,7 +407,6 @@ WHERE
let location_id: u32 = row.get("location_id")?;
Ok((id, chat_id, viewtype, location_id))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
@@ -425,7 +424,7 @@ WHERE
let rows_expired = context
.sql
.query_map(
.query_map_vec(
r#"
SELECT id, chat_id, type, location_id
FROM msgs
@@ -453,7 +452,6 @@ WHERE
let location_id: u32 = row.get("location_id")?;
Ok((id, chat_id, viewtype, location_id))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;

View File

@@ -1036,7 +1036,7 @@ impl Session {
async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
let rows = context
.sql
.query_map(
.query_map_vec(
"SELECT id, uid, target FROM imap
WHERE folder = ?
AND target != folder
@@ -1048,7 +1048,6 @@ impl Session {
let target: String = row.get(2)?;
Ok((rowid, uid, target))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
@@ -1139,7 +1138,7 @@ impl Session {
pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
let rows = context
.sql
.query_map(
.query_map_vec(
"SELECT imap.id, uid, folder FROM imap, imap_markseen
WHERE imap.id = imap_markseen.id AND target = folder
ORDER BY folder, uid",
@@ -1150,7 +1149,6 @@ impl Session {
let folder: String = row.get(2)?;
Ok((rowid, uid, folder))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;

View File

@@ -648,7 +648,7 @@ async fn export_self_keys(context: &Context, dir: &Path) -> Result<()> {
let keys = context
.sql
.query_map(
.query_map_vec(
"SELECT id, public_key, private_key, id=(SELECT value FROM config WHERE keyname='key_id') FROM keypairs;",
(),
|row| {
@@ -661,10 +661,6 @@ async fn export_self_keys(context: &Context, dir: &Path) -> Result<()> {
Ok((id, public_key, private_key, is_default))
},
|keys| {
keys.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
let self_addr = context.get_primary_self_addr().await?;

View File

@@ -345,15 +345,10 @@ pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64
let chats = context
.sql
.query_map(
.query_map_vec(
"SELECT id FROM chats WHERE locations_send_until>?;",
(now,),
|row| row.get::<_, i32>(0),
|chats| {
chats
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
@@ -408,7 +403,7 @@ pub async fn get_range(
};
let list = context
.sql
.query_map(
.query_map_vec(
"SELECT l.id, l.latitude, l.longitude, l.accuracy, l.timestamp, l.independent, \
COALESCE(m.id, 0) AS msg_id, l.from_id, l.chat_id, COALESCE(m.txt, '') AS txt \
FROM locations l LEFT JOIN msgs m ON l.id=m.location_id WHERE (? OR l.chat_id=?) \
@@ -445,14 +440,6 @@ pub async fn get_range(
};
Ok(loc)
},
|locations| {
let mut ret = Vec::new();
for location in locations {
ret.push(location?);
}
Ok(ret)
},
)
.await?;
Ok(list)
@@ -768,7 +755,7 @@ async fn maybe_send_locations(context: &Context) -> Result<Option<u64>> {
let now = time();
let rows = context
.sql
.query_map(
.query_map_vec(
"SELECT id, locations_send_begin, locations_send_until, locations_last_sent
FROM chats
WHERE locations_send_until>0",
@@ -785,10 +772,6 @@ async fn maybe_send_locations(context: &Context) -> Result<Option<u64>> {
locations_last_sent,
))
},
|rows| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await
.context("failed to query location streaming chats")?;

View File

@@ -170,7 +170,7 @@ impl MsgId {
) -> Result<Vec<String>> {
context
.sql
.query_map(
.query_map_vec(
"SELECT folder, uid FROM imap WHERE rfc724_mid=?",
(rfc724_mid,),
|row| {
@@ -178,10 +178,6 @@ impl MsgId {
let uid: u32 = row.get("uid")?;
Ok(format!("</{folder}/;UID={uid}>"))
},
|rows| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await
}
@@ -240,7 +236,7 @@ impl MsgId {
if let Ok(rows) = context
.sql
.query_map(
.query_map_vec(
"SELECT contact_id, timestamp_sent FROM msgs_mdns WHERE msg_id=?",
(self,),
|row| {
@@ -248,7 +244,6 @@ impl MsgId {
let ts: i64 = row.get(1)?;
Ok((contact_id, ts))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await
{
@@ -1426,7 +1421,7 @@ pub async fn get_msg_read_receipts(
) -> Result<Vec<(ContactId, i64)>> {
context
.sql
.query_map(
.query_map_vec(
"SELECT contact_id, timestamp_sent FROM msgs_mdns WHERE msg_id=?",
(msg_id,),
|row| {
@@ -1434,7 +1429,6 @@ pub async fn get_msg_read_receipts(
let ts: i64 = row.get(1)?;
Ok((contact_id, ts))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await
}

View File

@@ -2432,9 +2432,9 @@ async fn handle_ndn(
// The NDN might be for a message-id that had attachments and was sent from a non-Delta Chat client.
// In this case we need to mark multiple "msgids" as failed that all refer to the same message-id.
let msgs: Vec<_> = context
let msg_ids = context
.sql
.query_map(
.query_map_vec(
"SELECT id FROM msgs
WHERE rfc724_mid=? AND from_id=1",
(&failed.rfc724_mid,),
@@ -2442,7 +2442,6 @@ async fn handle_ndn(
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
|rows| Ok(rows.collect::<Vec<_>>()),
)
.await?;
@@ -2453,8 +2452,7 @@ async fn handle_ndn(
};
let err_msg = &error;
for msg in msgs {
let msg_id = msg?;
for msg_id in msg_ids {
let mut message = Message::load_from_db(context, msg_id).await?;
let aggregated_error = message
.error

View File

@@ -630,7 +630,7 @@ async fn lookup_cache(
let mut res = Vec::new();
for cached_address in context
.sql
.query_map(
.query_map_vec(
"SELECT dns_cache.address
FROM dns_cache
LEFT JOIN connection_history
@@ -647,10 +647,6 @@ async fn lookup_cache(
let address: String = row.get(0)?;
Ok(address)
},
|rows| {
rows.collect::<std::result::Result<Vec<String>, _>>()
.map_err(Into::into)
},
)
.await?
{

View File

@@ -328,20 +328,11 @@ pub async fn get_msg_reactions(context: &Context, msg_id: MsgId) -> Result<React
|row| {
let contact_id: ContactId = row.get(0)?;
let reaction: String = row.get(1)?;
Ok((contact_id, reaction))
},
|rows| {
let mut reactions = Vec::new();
for row in rows {
let (contact_id, reaction) = row?;
reactions.push((contact_id, Reaction::from(reaction.as_str())));
}
Ok(reactions)
Ok((contact_id, Reaction::from(reaction.as_str())))
},
|rows| Ok(rows.collect::<rusqlite::Result<BTreeMap<_, _>>>()?),
)
.await?
.into_iter()
.collect();
.await?;
Ok(Reactions { reactions })
}

View File

@@ -349,19 +349,10 @@ pub(crate) struct Scheduler {
async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
let msg_ids = context
.sql
.query_map(
"SELECT msg_id FROM download",
(),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
|rowids| {
rowids
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.query_map_vec("SELECT msg_id FROM download", (), |row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
})
.await?;
for msg_id in msg_ids {

View File

@@ -167,19 +167,14 @@ pub(super) async fn handle_auth_required(
message: &MimeMessage,
) -> Result<HandshakeMessage> {
// Load all Bob states that expect `vc-auth-required` or `vg-auth-required`.
let bob_states: Vec<(i64, QrInvite, ChatId)> = context
let bob_states = context
.sql
.query_map(
"SELECT id, invite, chat_id FROM bobstate",
(),
|row| {
let row_id: i64 = row.get(0)?;
let invite: QrInvite = row.get(1)?;
let chat_id: ChatId = row.get(2)?;
Ok((row_id, invite, chat_id))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.query_map_vec("SELECT id, invite, chat_id FROM bobstate", (), |row| {
let row_id: i64 = row.get(0)?;
let invite: QrInvite = row.get(1)?;
let chat_id: ChatId = row.get(2)?;
Ok((row_id, invite, chat_id))
})
.await?;
info!(

View File

@@ -503,19 +503,10 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp)
let rowids = context
.sql
.query_map(
"SELECT id FROM smtp ORDER BY id ASC",
(),
|row| {
let rowid: i64 = row.get(0)?;
Ok(rowid)
},
|rowids| {
rowids
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.query_map_vec("SELECT id FROM smtp ORDER BY id ASC", (), |row| {
let rowid: i64 = row.get(0)?;
Ok(rowid)
})
.await?;
info!(context, "Selected rows from SMTP queue: {rowids:?}.");
@@ -557,9 +548,9 @@ async fn send_mdn_rfc724_mid(
}
// Try to aggregate additional MDNs into this MDN.
let additional_rfc724_mids: Vec<String> = context
let additional_rfc724_mids = context
.sql
.query_map(
.query_map_vec(
"SELECT rfc724_mid
FROM smtp_mdns
WHERE from_id=? AND rfc724_mid!=?",
@@ -568,11 +559,8 @@ async fn send_mdn_rfc724_mid(
let rfc724_mid: String = row.get(0)?;
Ok(rfc724_mid)
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?
.into_iter()
.collect();
.await?;
let mimefactory = MimeFactory::from_mdn(
context,

View File

@@ -386,6 +386,26 @@ impl Sql {
.await
}
/// Prepares and executes the statement and maps a function over the resulting rows.
///
/// Collects the resulting rows into a `Vec`.
pub async fn query_map_vec<T, F>(
&self,
sql: &str,
params: impl rusqlite::Params + Send,
f: F,
) -> Result<Vec<T>>
where
T: Send + 'static,
F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
{
self.query_map(sql, params, f, |rows| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
})
.await
}
/// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
let count: isize = self.query_row(query, params, |row| row.get(0)).await?;

View File

@@ -372,20 +372,14 @@ async fn get_stats(context: &Context) -> Result<String> {
}
async fn get_timestamps(context: &Context, sql_table: &str) -> Result<Vec<i64>> {
let res = context
context
.sql
.query_map(
.query_map_vec(
&format!("SELECT timestamp FROM {sql_table} LIMIT 1000"),
(),
|row| row.get(0),
|rows| {
rows.collect::<rusqlite::Result<Vec<i64>>>()
.map_err(Into::into)
},
)
.await?;
Ok(res)
.await
}
pub(crate) async fn stats_id(context: &Context) -> Result<String> {
@@ -424,9 +418,9 @@ async fn get_contact_stats(context: &Context, last_old_contact: u32) -> Result<V
let mut verified_by_map: BTreeMap<ContactId, ContactId> = BTreeMap::new();
let mut bot_ids: BTreeSet<ContactId> = BTreeSet::new();
let mut contacts: Vec<ContactStat> = context
let mut contacts = context
.sql
.query_map(
.query_map_vec(
"SELECT id, fingerprint<>'', verifier, last_seen, is_bot FROM contacts c
WHERE id>9 AND origin>? AND addr<>?",
(Origin::Hidden, STATISTICS_BOT_EMAIL),
@@ -462,10 +456,6 @@ async fn get_contact_stats(context: &Context, last_old_contact: u32) -> Result<V
new: id.to_u32() > last_old_contact,
})
},
|rows| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
@@ -866,9 +856,9 @@ pub(crate) async fn count_securejoin_invite(context: &Context, invite: &QrInvite
}
async fn get_securejoin_invite_stats(context: &Context) -> Result<Vec<JoinedInvite>> {
let qr_scans: Vec<JoinedInvite> = context
context
.sql
.query_map(
.query_map_vec(
"SELECT already_existed, already_verified, type FROM stats_securejoin_invites",
(),
|row| {
@@ -882,14 +872,8 @@ async fn get_securejoin_invite_stats(context: &Context) -> Result<Vec<JoinedInvi
typ,
})
},
|rows| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
Ok(qr_scans)
.await
}
#[cfg(test)]