sql: make all queries persistent and update to upstream sqlx

&str queries are not persistent by default.  To make queries persistent,
they have to be constructed with sqlx::query.

Upstream sqlx does not contain the change that make all queries
persistent, but it is not needed anymore. but
This commit is contained in:
link2xt
2021-04-10 21:39:24 +03:00
committed by bjoern
parent 3707471266
commit 37f68459f6
14 changed files with 114 additions and 83 deletions

View File

@@ -2860,7 +2860,9 @@ pub(crate) async fn get_chat_cnt(context: &Context) -> Result<usize, Error> {
// no database, no chats - this is no error (needed eg. for information)
let count = context
.sql
.count("SELECT COUNT(*) FROM chats WHERE id>9 AND blocked=0;")
.count(sqlx::query(
"SELECT COUNT(*) FROM chats WHERE id>9 AND blocked=0;",
))
.await?;
Ok(count as usize)
} else {
@@ -3030,7 +3032,10 @@ pub(crate) async fn delete_and_reset_all_device_msgs(context: &Context) -> Resul
.sql
.execute(sqlx::query("DELETE FROM msgs WHERE from_id=?;").bind(DC_CONTACT_ID_DEVICE as i32))
.await?;
context.sql.execute("DELETE FROM devmsglabels;").await?;
context
.sql
.execute(sqlx::query("DELETE FROM devmsglabels;"))
.await?;
Ok(())
}

View File

@@ -410,7 +410,9 @@ impl Chatlist {
pub async fn dc_get_archived_cnt(context: &Context) -> Result<usize> {
let count = context
.sql
.count("SELECT COUNT(*) FROM chats WHERE blocked=0 AND archived=1;")
.count(sqlx::query(
"SELECT COUNT(*) FROM chats WHERE blocked=0 AND archived=1;",
))
.await?;
Ok(count)
}
@@ -420,7 +422,7 @@ async fn get_last_deaddrop_fresh_msg(context: &Context) -> Result<Option<MsgId>>
// sufficient as there are typically only few fresh messages.
let id = context
.sql
.query_get_value(concat!(
.query_get_value(sqlx::query(concat!(
"SELECT m.id",
" FROM msgs m",
" LEFT JOIN chats c",
@@ -429,7 +431,7 @@ async fn get_last_deaddrop_fresh_msg(context: &Context) -> Result<Option<MsgId>>
" AND m.hidden=0",
" AND c.blocked=2",
" ORDER BY m.timestamp DESC, m.id DESC;"
))
)))
.await?;
Ok(id)
}

View File

@@ -242,7 +242,7 @@ impl Context {
match key {
Config::Selfavatar => {
self.sql
.execute("UPDATE contacts SET selfavatar_sent=0;")
.execute(sqlx::query("UPDATE contacts SET selfavatar_sent=0;"))
.await?;
self.sql
.set_raw_config_bool("attach_selfavatar", true)

View File

@@ -290,16 +290,22 @@ impl Context {
.unwrap_or_default();
let journal_mode = self
.sql
.query_get_value("PRAGMA journal_mode;")
.query_get_value(sqlx::query("PRAGMA journal_mode;"))
.await?
.unwrap_or_else(|| "unknown".to_string());
let e2ee_enabled = self.get_config_int(Config::E2eeEnabled).await?;
let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
let bcc_self = self.get_config_int(Config::BccSelf).await?;
let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;").await?;
let prv_key_cnt = self
.sql
.count(sqlx::query("SELECT COUNT(*) FROM keypairs;"))
.await?;
let pub_key_cnt = self.sql.count("SELECT COUNT(*) FROM acpeerstates;").await?;
let pub_key_cnt = self
.sql
.count(sqlx::query("SELECT COUNT(*) FROM acpeerstates;"))
.await?;
let fingerprint_str = match SignedPublicKey::load_self(self).await {
Ok(key) => key.fingerprint().hex(),
Err(err) => format!("<key failure: {}>", err),

View File

@@ -595,7 +595,7 @@ async fn import_backup_old(context: &Context, backup_to_import: impl AsRef<Path>
let total_files_cnt = context
.sql
.count("SELECT COUNT(*) FROM backup_blobs;")
.count(sqlx::query("SELECT COUNT(*) FROM backup_blobs;"))
.await?;
info!(
@@ -607,7 +607,7 @@ async fn import_backup_old(context: &Context, backup_to_import: impl AsRef<Path>
// consuming too much memory.
let file_ids = context
.sql
.fetch("SELECT id FROM backup_blobs ORDER BY id")
.fetch(sqlx::query("SELECT id FROM backup_blobs ORDER BY id"))
.await?
.map(|row| row?.try_get(0))
.collect::<sqlx::Result<Vec<i64>>>()
@@ -648,8 +648,11 @@ async fn import_backup_old(context: &Context, backup_to_import: impl AsRef<Path>
if all_files_extracted {
// only delete backup_blobs if all files were successfully extracted
context.sql.execute("DROP TABLE backup_blobs;").await?;
context.sql.execute("VACUUM;").await.ok();
context
.sql
.execute(sqlx::query("DROP TABLE backup_blobs;"))
.await?;
context.sql.execute(sqlx::query("VACUUM;")).await.ok();
Ok(())
} else {
bail!("received stop signal");
@@ -674,7 +677,7 @@ async fn export_backup(context: &Context, dir: impl AsRef<Path>) -> Result<()> {
context
.sql
.execute("VACUUM;")
.execute(sqlx::query("VACUUM;"))
.await
.map_err(|e| warn!(context, "Vacuum failed, exporting anyway {}", e));
@@ -829,7 +832,9 @@ async fn export_self_keys(context: &Context, dir: impl AsRef<Path>) -> Result<()
let mut keys = context
.sql
.fetch("SELECT id, public_key, private_key, is_default FROM keypairs;")
.fetch(sqlx::query(
"SELECT id, public_key, private_key, is_default FROM keypairs;",
))
.await?
.map(|row| -> sqlx::Result<_> {
let row = row?;

View File

@@ -123,14 +123,14 @@ impl DcKey for SignedPublicKey {
async fn load_self(context: &Context) -> Result<Self::KeyType> {
match context
.sql
.fetch_optional(
.fetch_optional(sqlx::query(
r#"
SELECT public_key
FROM keypairs
WHERE addr=(SELECT value FROM config WHERE keyname="configured_addr")
AND is_default=1;
"#,
)
))
.await?
{
Some(row) => Self::from_slice(row.try_get(0)?),
@@ -165,14 +165,14 @@ impl DcKey for SignedSecretKey {
async fn load_self(context: &Context) -> Result<Self::KeyType> {
match context
.sql
.fetch_optional(
.fetch_optional(sqlx::query(
r#"
SELECT private_key
FROM keypairs
WHERE addr=(SELECT value FROM config WHERE keyname="configured_addr")
AND is_default=1;
"#,
)
))
.await?
{
Some(row) => Self::from_slice(row.try_get(0)?),
@@ -328,7 +328,7 @@ pub async fn store_self_keypair(
if default == KeyPairUse::Default {
context
.sql
.execute("UPDATE keypairs SET is_default=0;")
.execute(sqlx::query("UPDATE keypairs SET is_default=0;"))
.await
.map_err(|err| SaveKeyError::new("failed to clear default", err))?;
}
@@ -625,7 +625,7 @@ i8pcjGO+IZffvyZJVRWfVooBJmWWbPB1pueo3tx8w3+fcuzpxz+RLFKaPyqXO+dD
let nrows = || async {
ctx.sql
.count("SELECT COUNT(*) FROM keypairs;")
.count(sqlx::query("SELECT COUNT(*) FROM keypairs;"))
.await
.unwrap()
};

View File

@@ -401,7 +401,10 @@ fn is_marker(txt: &str) -> bool {
/// Deletes all locations from the database.
pub async fn delete_all(context: &Context) -> Result<(), Error> {
context.sql.execute("DELETE FROM locations;").await?;
context
.sql
.execute(sqlx::query("DELETE FROM locations;"))
.await?;
context.emit_event(EventType::LocationChanged(None));
Ok(())
}

View File

@@ -1878,11 +1878,11 @@ async fn ndn_maybe_add_info_msg(
pub async fn get_real_msg_cnt(context: &Context) -> usize {
match context
.sql
.count(
.count(sqlx::query(
"SELECT COUNT(*) \
FROM msgs m LEFT JOIN chats c ON c.id=m.chat_id \
WHERE m.id>9 AND m.chat_id>9 AND c.blocked=0;",
)
))
.await
{
Ok(res) => res,
@@ -1896,11 +1896,11 @@ pub async fn get_real_msg_cnt(context: &Context) -> usize {
pub async fn get_deaddrop_msg_cnt(context: &Context) -> usize {
match context
.sql
.count(
.count(sqlx::query(
"SELECT COUNT(*) \
FROM msgs m LEFT JOIN chats c ON c.id=m.chat_id \
WHERE c.blocked=2;",
)
))
.await
{
Ok(res) => res,

View File

@@ -5,7 +5,7 @@ use std::fmt;
use anyhow::{bail, Result};
use num_traits::FromPrimitive;
use sqlx::Row;
use sqlx::{query::Query, sqlite::Sqlite, Row};
use crate::aheader::{Aheader, EncryptPreference};
use crate::chat;
@@ -173,10 +173,12 @@ impl Peerstate {
Self::from_stmt(context, query).await
}
async fn from_stmt<'e, 'q, E>(context: &Context, query: E) -> Result<Option<Peerstate>>
async fn from_stmt<'q, E>(
context: &Context,
query: Query<'q, Sqlite, E>,
) -> Result<Option<Peerstate>>
where
'q: 'e,
E: 'q + sqlx::Execute<'q, sqlx::Sqlite>,
E: 'q + sqlx::IntoArguments<'q, sqlx::Sqlite>,
{
if let Some(row) = context.sql.fetch_optional(query).await? {
// all the above queries start with this: SELECT

View File

@@ -417,9 +417,9 @@ ALTER TABLE msgs ADD COLUMN mime_modified INTEGER DEFAULT 0;"#,
if dbversion < 73 {
use Config::*;
info!(context, "[migration] v73");
sql.execute(
sql.execute(sqlx::query(
r#"
CREATE TABLE imap_sync (folder TEXT PRIMARY KEY, uidvalidity INTEGER DEFAULT 0, uid_next INTEGER DEFAULT 0);"#,
CREATE TABLE imap_sync (folder TEXT PRIMARY KEY, uidvalidity INTEGER DEFAULT 0, uid_next INTEGER DEFAULT 0);"#),
)
.await?;
for c in &[

View File

@@ -10,8 +10,9 @@ use async_std::prelude::*;
use async_std::sync::RwLock;
use sqlx::{
pool::PoolOptions,
query::Query,
sqlite::{Sqlite, SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqliteSynchronous},
Execute, Executor, Row,
Executor, IntoArguments, Row,
};
use crate::chat::{add_device_msg, update_device_icon, update_saved_messages_icon};
@@ -171,7 +172,9 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
if recalc_fingerprints {
info!(context, "[migration] recalc fingerprints");
let mut rows = self.fetch("SELECT addr FROM acpeerstates;").await?;
let mut rows = self
.fetch(sqlx::query("SELECT addr FROM acpeerstates;"))
.await?;
while let Some(row) = rows.next().await {
let row = row?;
@@ -208,10 +211,9 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
}
/// Execute the given query, returning the number of affected rows.
pub async fn execute<'e, 'q, E>(&self, query: E) -> Result<u64>
pub async fn execute<'q, E>(&self, query: Query<'q, Sqlite, E>) -> Result<u64>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
{
let lock = self.writer.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
@@ -221,10 +223,9 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
}
/// Execute many queries.
pub async fn execute_many<'e, 'q, E>(&self, query: E) -> Result<()>
pub async fn execute_many<'q, E>(&self, query: Query<'q, Sqlite, E>) -> Result<()>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
{
let lock = self.writer.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
@@ -236,13 +237,12 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
}
/// Fetch the given query.
pub async fn fetch<'e, 'q, E>(
pub async fn fetch<'q, E>(
&self,
query: E,
) -> Result<impl Stream<Item = sqlx::Result<<Sqlite as sqlx::Database>::Row>> + 'e + Send>
query: Query<'q, Sqlite, E>,
) -> Result<impl Stream<Item = sqlx::Result<<Sqlite as sqlx::Database>::Row>> + Send + 'q>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
{
let lock = self.reader.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
@@ -252,10 +252,12 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
}
/// Fetch exactly one row, errors if no row is found.
pub async fn fetch_one<'e, 'q, E>(&self, query: E) -> Result<<Sqlite as sqlx::Database>::Row>
pub async fn fetch_one<'q, E>(
&self,
query: Query<'q, Sqlite, E>,
) -> Result<<Sqlite as sqlx::Database>::Row>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
{
let lock = self.reader.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
@@ -267,11 +269,10 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
/// Fetches at most one row.
pub async fn fetch_optional<'e, 'q, E>(
&self,
query: E,
query: Query<'q, Sqlite, E>,
) -> Result<Option<<Sqlite as sqlx::Database>::Row>>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
{
let lock = self.reader.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
@@ -281,10 +282,9 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
}
/// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
pub async fn count<'e, 'q, E>(&self, query: E) -> Result<usize>
pub async fn count<'e, 'q, E>(&self, query: Query<'q, Sqlite, E>) -> Result<usize>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
{
use std::convert::TryFrom;
@@ -296,10 +296,9 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
/// Used for executing `SELECT COUNT` statements only. Returns `true`, if the count is at least
/// one, `false` otherwise.
pub async fn exists<'e, 'q, E>(&self, query: E) -> Result<bool>
pub async fn exists<'e, 'q, E>(&self, query: Query<'q, Sqlite, E>) -> Result<bool>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
{
let count = self.count(query).await?;
Ok(count > 0)
@@ -383,10 +382,12 @@ PRAGMA query_only=1; -- Protect against writes even in read-write mode
/// Executes a query which is expected to return one row and one
/// column. If the query does not return a value or returns SQL
/// `NULL`, returns `Ok(None)`.
pub async fn query_get_value<'e, 'q, E, T>(&self, query: E) -> Result<Option<T>>
pub async fn query_get_value<'e, 'q, E, T>(
&self,
query: Query<'q, Sqlite, E>,
) -> Result<Option<T>>
where
'q: 'e,
E: 'q + Execute<'q, Sqlite>,
E: 'q + IntoArguments<'q, Sqlite>,
T: for<'r> sqlx::Decode<'r, Sqlite> + sqlx::Type<Sqlite>,
{
let res = self
@@ -569,7 +570,10 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
)
.await?;
let mut rows = context.sql.fetch("SELECT value FROM config;").await?;
let mut rows = context
.sql
.fetch(sqlx::query("SELECT value FROM config;"))
.await?;
while let Some(row) = rows.next().await {
let row: String = row?.try_get(0)?;
maybe_add_file(&mut files_in_use, row);
@@ -692,7 +696,7 @@ async fn maybe_add_from_param(
query: &str,
param_id: Param,
) -> Result<()> {
let mut rows = sql.fetch(query).await?;
let mut rows = sql.fetch(sqlx::query(query)).await?;
while let Some(row) = rows.next().await {
let row: String = row?.try_get(0)?;
let param: Params = row.parse().unwrap_or_default();