refactor(sql): do not expose rusqlite Error type in query_map methods

We use query_and_then() instead of query_map() function now.
The difference is that row processing function
returns anyhow::Result, so simple fallible processing
like JSON parsing can be done inside of it
when calling query_map_vec() and query_map_collect()
without having to resort to query_map()
and iterating over all rows again afterwards.
This commit is contained in:
link2xt
2025-11-03 15:51:17 +00:00
parent 1db6ea70cc
commit 9c2a13b88e
12 changed files with 102 additions and 64 deletions

View File

@@ -2036,7 +2036,10 @@ impl Chat {
ON c.id=cc.contact_id \
WHERE cc.chat_id=? AND cc.add_timestamp >= cc.remove_timestamp",
(self.id,),
|row| row.get::<_, String>(0),
|row| {
let addr: String = row.get(0)?;
Ok(addr)
},
)
.await?;
self.sync(context, SyncAction::SetContacts(addrs)).await?;
@@ -3050,7 +3053,7 @@ pub async fn get_chat_msgs_ex(
))
}
};
let process_rows = |rows: rusqlite::MappedRows<_>| {
let process_rows = |rows: rusqlite::AndThenRows<_>| {
// It is faster to sort here rather than
// let sqlite execute an ORDER BY clause.
let mut sorted_rows = Vec::new();
@@ -3132,7 +3135,10 @@ pub async fn marknoticed_chat(context: &Context, chat_id: ChatId) -> Result<()>
LEFT JOIN chats c ON m.chat_id=c.id
WHERE m.state=10 AND m.hidden=0 AND m.chat_id>9 AND c.archived=1",
(),
|row| row.get::<_, ChatId>(0),
|row| {
let chat_id: ChatId = row.get(0)?;
Ok(chat_id)
},
)
.await?;
if chat_ids_in_archive.is_empty() {
@@ -3310,7 +3316,10 @@ pub async fn get_chat_media(
DC_CHAT_ID_TRASH,
Viewtype::Webxdc,
),
|row| row.get::<_, MsgId>(0),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
)
.await?
} else {
@@ -3340,7 +3349,10 @@ pub async fn get_chat_media(
msg_type
},
),
|row| row.get::<_, MsgId>(0),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
)
.await?
};
@@ -3361,7 +3373,10 @@ pub async fn get_chat_contacts(context: &Context, chat_id: ChatId) -> Result<Vec
WHERE cc.chat_id=? AND cc.add_timestamp >= cc.remove_timestamp
ORDER BY c.id=1, c.last_seen DESC, c.id DESC;",
(chat_id,),
|row| row.get::<_, ContactId>(0),
|row| {
let contact_id: ContactId = row.get(0)?;
Ok(contact_id)
},
)
.await
}
@@ -3383,7 +3398,10 @@ pub async fn get_past_chat_contacts(context: &Context, chat_id: ChatId) -> Resul
AND ? < cc.remove_timestamp
ORDER BY c.id=1, cc.remove_timestamp DESC, c.id DESC",
(chat_id, now.saturating_sub(60 * 24 * 3600)),
|row| row.get::<_, ContactId>(0),
|row| {
let contact_id: ContactId = row.get(0)?;
Ok(contact_id)
},
)
.await
}
@@ -3822,11 +3840,13 @@ pub(crate) async fn shall_attach_selfavatar(context: &Context, chat_id: ChatId)
LEFT JOIN contacts c ON c.id=cc.contact_id
WHERE cc.chat_id=? AND cc.contact_id!=? AND cc.add_timestamp >= cc.remove_timestamp",
(chat_id, ContactId::SELF),
|row| Ok(row.get::<_, i64>(0)),
|row| {
let selfavatar_sent: i64 = row.get(0)?;
Ok(selfavatar_sent)
},
|rows| {
let mut needs_attach = false;
for row in rows {
let row = row?;
let selfavatar_sent = row?;
if selfavatar_sent < timestamp_some_days_ago {
needs_attach = true;

View File

@@ -107,11 +107,6 @@ impl Chatlist {
Ok((chat_id, msg_id))
};
let process_rows = |rows: rusqlite::MappedRows<_>| {
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
};
let skip_id = if flag_for_forwarding {
ChatId::lookup_by_contact(context, ContactId::DEVICE)
.await?
@@ -132,7 +127,7 @@ impl Chatlist {
// groups. Otherwise it would be hard to follow conversations.
let ids = if let Some(query_contact_id) = query_contact_id {
// show chats shared with a given contact
context.sql.query_map(
context.sql.query_map_vec(
"SELECT c.id, m.id
FROM chats c
LEFT JOIN msgs m
@@ -150,7 +145,6 @@ impl Chatlist {
ORDER BY c.archived=?3 DESC, IFNULL(m.timestamp,c.created_timestamp) DESC, m.id DESC;",
(MessageState::OutDraft, query_contact_id, ChatVisibility::Pinned),
process_row,
process_rows,
).await?
} else if flag_archived_only {
// show archived chats
@@ -159,7 +153,7 @@ impl Chatlist {
// and adapting the number requires larger refactorings and seems not to be worth the effort)
context
.sql
.query_map(
.query_map_vec(
"SELECT c.id, m.id
FROM chats c
LEFT JOIN msgs m
@@ -177,7 +171,6 @@ impl Chatlist {
ORDER BY IFNULL(m.timestamp,c.created_timestamp) DESC, m.id DESC;",
(MessageState::OutDraft,),
process_row,
process_rows,
)
.await?
} else if let Some(query) = query {
@@ -195,7 +188,7 @@ impl Chatlist {
let str_like_cmd = format!("%{query}%");
context
.sql
.query_map(
.query_map_vec(
"SELECT c.id, m.id
FROM chats c
LEFT JOIN msgs m
@@ -214,7 +207,6 @@ impl Chatlist {
ORDER BY IFNULL(m.timestamp,c.created_timestamp) DESC, m.id DESC;",
(MessageState::OutDraft, skip_id, str_like_cmd, only_unread, MessageState::InFresh),
process_row,
process_rows,
)
.await?
} else {
@@ -229,7 +221,7 @@ impl Chatlist {
let msg_id: Option<MsgId> = row.get(3)?;
Ok((chat_id, typ, param, msg_id))
};
let process_rows = |rows: rusqlite::MappedRows<_>| {
let process_rows = |rows: rusqlite::AndThenRows<_>| {
rows.filter_map(|row: std::result::Result<(_, _, Params, _), _>| match row {
Ok((chat_id, typ, param, msg_id)) => {
if typ == Chattype::Mailinglist
@@ -243,7 +235,6 @@ impl Chatlist {
Err(e) => Some(Err(e)),
})
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
};
context.sql.query_map(
"SELECT c.id, c.type, c.param, m.id
@@ -272,7 +263,7 @@ impl Chatlist {
).await?
} else {
// show normal chatlist
context.sql.query_map(
context.sql.query_map_vec(
"SELECT c.id, m.id
FROM chats c
LEFT JOIN msgs m
@@ -290,7 +281,6 @@ impl Chatlist {
ORDER BY c.id=0 DESC, c.archived=? DESC, IFNULL(m.timestamp,c.created_timestamp) DESC, m.id DESC;",
(MessageState::OutDraft, skip_id, ChatVisibility::Archived, ChatVisibility::Pinned),
process_row,
process_rows,
).await?
};
if !flag_no_specials && get_archived_cnt(context).await? > 0 {

View File

@@ -194,16 +194,11 @@ impl Context {
pub async fn list_transports(&self) -> Result<Vec<EnteredLoginParam>> {
let transports = self
.sql
.query_map(
"SELECT entered_param FROM transports",
(),
|row| row.get::<_, String>(0),
|rows| {
rows.flatten()
.map(|s| Ok(serde_json::from_str(&s)?))
.collect::<Result<Vec<EnteredLoginParam>>>()
},
)
.query_map_vec("SELECT entered_param FROM transports", (), |row| {
let entered_param: String = row.get(0)?;
let transport: EnteredLoginParam = serde_json::from_str(&entered_param)?;
Ok(transport)
})
.await?;
Ok(transports)

View File

@@ -1285,7 +1285,10 @@ impl Contact {
.query_map_vec(
"SELECT id FROM contacts WHERE id>? AND blocked!=0 ORDER BY last_seen DESC, id DESC;",
(ContactId::LAST_SPECIAL,),
|row| row.get::<_, ContactId>(0),
|row| {
let contact_id: ContactId = row.get(0)?;
Ok(contact_id)
}
)
.await?;
Ok(list)
@@ -2044,7 +2047,7 @@ impl RecentlySeenLoop {
// become unseen in the future.
let mut unseen_queue: BinaryHeap<MyHeapElem> = context
.sql
.query_map(
.query_map_collect(
"SELECT id, last_seen FROM contacts
WHERE last_seen > ?",
(now_ts - SEEN_RECENTLY_SECONDS,),
@@ -2053,10 +2056,6 @@ impl RecentlySeenLoop {
let last_seen: i64 = row.get("last_seen")?;
Ok((Reverse(last_seen + SEEN_RECENTLY_SECONDS), contact_id))
},
|rows| {
rows.collect::<std::result::Result<BinaryHeap<MyHeapElem>, _>>()
.map_err(Into::into)
},
)
.await
.unwrap_or_default();

View File

@@ -1101,7 +1101,10 @@ impl Context {
" ORDER BY m.timestamp DESC,m.id DESC;"
),
(MessageState::InFresh, time()),
|row| row.get::<_, MsgId>(0),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
)
.await?;
Ok(list)
@@ -1205,7 +1208,10 @@ impl Context {
AND IFNULL(txt_normalized, txt) LIKE ?
ORDER BY m.timestamp,m.id;",
(chat_id, str_like_in_text),
|row| row.get::<_, MsgId>("id"),
|row| {
let msg_id: MsgId = row.get("id")?;
Ok(msg_id)
},
)
.await?
} else {
@@ -1234,7 +1240,10 @@ impl Context {
AND IFNULL(txt_normalized, txt) LIKE ?
ORDER BY m.id DESC LIMIT 1000",
(str_like_in_text,),
|row| row.get::<_, MsgId>("id"),
|row| {
let msg_id: MsgId = row.get("id")?;
Ok(msg_id)
},
)
.await?
};

View File

@@ -159,13 +159,15 @@ pub(crate) async fn load_self_public_key(context: &Context) -> Result<SignedPubl
pub(crate) async fn load_self_public_keyring(context: &Context) -> Result<Vec<SignedPublicKey>> {
let keys = context
.sql
.query_map(
.query_map_vec(
r#"SELECT public_key
FROM keypairs
ORDER BY id=(SELECT value FROM config WHERE keyname='key_id') DESC"#,
(),
|row| row.get::<_, Vec<u8>>(0),
|keys| keys.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|row| {
let public_key_bytes: Vec<u8> = row.get(0)?;
Ok(public_key_bytes)
},
)
.await?
.into_iter()
@@ -232,13 +234,15 @@ pub(crate) async fn load_self_secret_key(context: &Context) -> Result<SignedSecr
pub(crate) async fn load_self_secret_keyring(context: &Context) -> Result<Vec<SignedSecretKey>> {
let keys = context
.sql
.query_map(
.query_map_vec(
r#"SELECT private_key
FROM keypairs
ORDER BY id=(SELECT value FROM config WHERE keyname='key_id') DESC"#,
(),
|row| row.get::<_, Vec<u8>>(0),
|keys| keys.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(bytes)
},
)
.await?
.into_iter()

View File

@@ -348,7 +348,10 @@ pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64
.query_map_vec(
"SELECT id FROM chats WHERE locations_send_until>?;",
(now,),
|row| row.get::<_, i32>(0),
|row| {
let id: i32 = row.get(0)?;
Ok(id)
},
)
.await?;

View File

@@ -357,7 +357,10 @@ impl MimeMessage {
let decrypted_msg; // Decrypted signed OpenPGP message.
let secrets: Vec<String> = context
.sql
.query_map_vec("SELECT secret FROM broadcast_secrets", (), |row| row.get(0))
.query_map_vec("SELECT secret FROM broadcast_secrets", (), |row| {
let secret: String = row.get(0)?;
Ok(secret)
})
.await?;
let (mail, is_encrypted) =

View File

@@ -373,14 +373,14 @@ impl Sql {
g: G,
) -> Result<H>
where
F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
G: Send + FnOnce(rusqlite::MappedRows<F>) -> Result<H>,
F: Send + FnMut(&rusqlite::Row) -> Result<T>,
G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
H: Send + 'static,
{
let query_only = true;
self.call(query_only, move |conn| {
let mut stmt = conn.prepare(sql)?;
let res = stmt.query_map(params, f)?;
let res = stmt.query_and_then(params, f)?;
g(res)
})
.await
@@ -398,11 +398,10 @@ impl Sql {
where
T: Send + 'static,
C: Send + 'static + std::iter::FromIterator<T>,
F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
F: Send + FnMut(&rusqlite::Row) -> Result<T>,
{
self.query_map(sql, params, f, |rows| {
rows.collect::<std::result::Result<C, _>>()
.map_err(Into::into)
})
.await
}
@@ -418,7 +417,7 @@ impl Sql {
) -> Result<Vec<T>>
where
T: Send + 'static,
F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
F: Send + FnMut(&rusqlite::Row) -> Result<T>,
{
self.query_map_collect(sql, params, f).await
}
@@ -969,7 +968,10 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
.query_map(
"SELECT value FROM config;",
(),
|row| row.get::<_, String>(0),
|row| {
let row: String = row.get(0)?;
Ok(row)
},
|rows| {
for row in rows {
maybe_add_file(&mut files_in_use, &row?);
@@ -985,7 +987,10 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
.query_map(
"SELECT blobname FROM http_cache",
(),
|row| row.get::<_, String>(0),
|row| {
let row: String = row.get(0)?;
Ok(row)
},
|rows| {
for row in rows {
maybe_add_file(&mut files_in_use, &row?);
@@ -1125,7 +1130,10 @@ async fn maybe_add_from_param(
sql.query_map(
query,
(),
|row| row.get::<_, String>(0),
|row| {
let row: String = row.get(0)?;
Ok(row)
},
|rows| {
for row in rows {
let param: Params = row?.parse().unwrap_or_default();

View File

@@ -378,7 +378,10 @@ async fn get_timestamps(context: &Context, sql_table: &str) -> Result<Vec<i64>>
.query_map_vec(
&format!("SELECT timestamp FROM {sql_table} LIMIT 1000"),
(),
|row| row.get(0),
|row| {
let timestamp: i64 = row.get(0)?;
Ok(timestamp)
},
)
.await
}

View File

@@ -202,7 +202,11 @@ impl Context {
.query_map(
"SELECT id, item FROM multi_device_sync ORDER BY id;",
(),
|row| Ok((row.get::<_, u32>(0)?, row.get::<_, String>(1)?)),
|row| {
let id: u32 = row.get(0)?;
let item: String = row.get(1)?;
Ok((id, item))
},
|rows| {
let mut ids = vec![];
let mut serialized = String::default();

View File

@@ -732,8 +732,8 @@ impl Context {
"SELECT update_item, id FROM msgs_status_updates WHERE msg_id=? AND id>? ORDER BY id",
(instance_msg_id, last_known_serial),
|row| {
let update_item_str = row.get::<_, String>(0)?;
let serial = row.get::<_, StatusUpdateSerial>(1)?;
let update_item_str: String = row.get(0)?;
let serial: StatusUpdateSerial = row.get(1)?;
Ok((update_item_str, serial))
},
|rows| {