mirror of
https://github.com/chatmail/core.git
synced 2026-05-09 01:46:30 +03:00
feat(sql): set PRAGMA query_only to avoid writing on read-only connections
Co-authored-by: iequidoo <dgreshilov@gmail.com>
This commit is contained in:
99
src/sql.rs
99
src/sql.rs
@@ -325,7 +325,8 @@ impl Sql {
|
|||||||
let mut lock = self.pool.write().await;
|
let mut lock = self.pool.write().await;
|
||||||
|
|
||||||
let pool = lock.take().context("SQL connection pool is not open")?;
|
let pool = lock.take().context("SQL connection pool is not open")?;
|
||||||
let conn = pool.get().await?;
|
let query_only = false;
|
||||||
|
let conn = pool.get(query_only).await?;
|
||||||
if !passphrase.is_empty() {
|
if !passphrase.is_empty() {
|
||||||
conn.pragma_update(None, "rekey", passphrase.clone())
|
conn.pragma_update(None, "rekey", passphrase.clone())
|
||||||
.context("Failed to set PRAGMA rekey")?;
|
.context("Failed to set PRAGMA rekey")?;
|
||||||
@@ -382,14 +383,14 @@ impl Sql {
|
|||||||
/// - or use `call_write()` instead.
|
/// - or use `call_write()` instead.
|
||||||
///
|
///
|
||||||
/// Returns the result of the function.
|
/// Returns the result of the function.
|
||||||
async fn call<'a, F, R>(&'a self, function: F) -> Result<R>
|
async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
|
||||||
where
|
where
|
||||||
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
|
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
|
||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
{
|
{
|
||||||
let lock = self.pool.read().await;
|
let lock = self.pool.read().await;
|
||||||
let pool = lock.as_ref().context("no SQL connection")?;
|
let pool = lock.as_ref().context("no SQL connection")?;
|
||||||
let mut conn = pool.get().await?;
|
let mut conn = pool.get(query_only).await?;
|
||||||
let res = tokio::task::block_in_place(move || function(&mut conn))?;
|
let res = tokio::task::block_in_place(move || function(&mut conn))?;
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
@@ -404,7 +405,8 @@ impl Sql {
|
|||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
{
|
{
|
||||||
let _lock = self.write_lock().await;
|
let _lock = self.write_lock().await;
|
||||||
self.call(function).await
|
let query_only = false;
|
||||||
|
self.call(query_only, function).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute `query` assuming it is a write query, returning the number of affected rows.
|
/// Execute `query` assuming it is a write query, returning the number of affected rows.
|
||||||
@@ -444,7 +446,8 @@ impl Sql {
|
|||||||
G: Send + FnMut(rusqlite::MappedRows<F>) -> Result<H>,
|
G: Send + FnMut(rusqlite::MappedRows<F>) -> Result<H>,
|
||||||
H: Send + 'static,
|
H: Send + 'static,
|
||||||
{
|
{
|
||||||
self.call(move |conn| {
|
let query_only = true;
|
||||||
|
self.call(query_only, move |conn| {
|
||||||
let mut stmt = conn.prepare(sql)?;
|
let mut stmt = conn.prepare(sql)?;
|
||||||
let res = stmt.query_map(params, f)?;
|
let res = stmt.query_map(params, f)?;
|
||||||
g(res)
|
g(res)
|
||||||
@@ -476,7 +479,8 @@ impl Sql {
|
|||||||
F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
|
F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
self.call(move |conn| {
|
let query_only = true;
|
||||||
|
self.call(query_only, move |conn| {
|
||||||
let res = conn.query_row(query, params, f)?;
|
let res = conn.query_row(query, params, f)?;
|
||||||
Ok(res)
|
Ok(res)
|
||||||
})
|
})
|
||||||
@@ -512,7 +516,8 @@ impl Sql {
|
|||||||
|
|
||||||
/// Query the database if the requested table already exists.
|
/// Query the database if the requested table already exists.
|
||||||
pub async fn table_exists(&self, name: &str) -> Result<bool> {
|
pub async fn table_exists(&self, name: &str) -> Result<bool> {
|
||||||
self.call(move |conn| {
|
let query_only = true;
|
||||||
|
self.call(query_only, move |conn| {
|
||||||
let mut exists = false;
|
let mut exists = false;
|
||||||
conn.pragma(None, "table_info", name.to_string(), |_row| {
|
conn.pragma(None, "table_info", name.to_string(), |_row| {
|
||||||
// will only be executed if the info was found
|
// will only be executed if the info was found
|
||||||
@@ -527,7 +532,8 @@ impl Sql {
|
|||||||
|
|
||||||
/// Check if a column exists in a given table.
|
/// Check if a column exists in a given table.
|
||||||
pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
|
pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
|
||||||
self.call(move |conn| {
|
let query_only = true;
|
||||||
|
self.call(query_only, move |conn| {
|
||||||
let mut exists = false;
|
let mut exists = false;
|
||||||
// `PRAGMA table_info` returns one row per column,
|
// `PRAGMA table_info` returns one row per column,
|
||||||
// each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
|
// each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
|
||||||
@@ -555,10 +561,13 @@ impl Sql {
|
|||||||
F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
|
F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
self.call(move |conn| match conn.query_row(sql.as_ref(), params, f) {
|
let query_only = true;
|
||||||
Ok(res) => Ok(Some(res)),
|
self.call(query_only, move |conn| {
|
||||||
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
|
match conn.query_row(sql.as_ref(), params, f) {
|
||||||
Err(err) => Err(err.into()),
|
Ok(res) => Ok(Some(res)),
|
||||||
|
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
|
||||||
|
Err(err) => Err(err.into()),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -1092,9 +1101,10 @@ mod tests {
|
|||||||
async fn test_auto_vacuum() -> Result<()> {
|
async fn test_auto_vacuum() -> Result<()> {
|
||||||
let t = TestContext::new().await;
|
let t = TestContext::new().await;
|
||||||
|
|
||||||
|
let query_only = true;
|
||||||
let auto_vacuum = t
|
let auto_vacuum = t
|
||||||
.sql
|
.sql
|
||||||
.call(|conn| {
|
.call(query_only, |conn| {
|
||||||
let auto_vacuum = conn.pragma_query_value(None, "auto_vacuum", |row| {
|
let auto_vacuum = conn.pragma_query_value(None, "auto_vacuum", |row| {
|
||||||
let auto_vacuum: i32 = row.get(0)?;
|
let auto_vacuum: i32 = row.get(0)?;
|
||||||
Ok(auto_vacuum)
|
Ok(auto_vacuum)
|
||||||
@@ -1320,8 +1330,9 @@ mod tests {
|
|||||||
{
|
{
|
||||||
let lock = sql.pool.read().await;
|
let lock = sql.pool.read().await;
|
||||||
let pool = lock.as_ref().unwrap();
|
let pool = lock.as_ref().unwrap();
|
||||||
let conn1 = pool.get().await?;
|
let query_only = true;
|
||||||
let conn2 = pool.get().await?;
|
let conn1 = pool.get(query_only).await?;
|
||||||
|
let conn2 = pool.get(query_only).await?;
|
||||||
conn1
|
conn1
|
||||||
.query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
|
.query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@@ -1346,4 +1357,62 @@ mod tests {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_query_only() -> Result<()> {
|
||||||
|
let t = TestContext::new().await;
|
||||||
|
|
||||||
|
// `query_row` does not acquire write lock
|
||||||
|
// and operates on read-only connection.
|
||||||
|
// Using it to `INSERT` should fail.
|
||||||
|
let res = t
|
||||||
|
.sql
|
||||||
|
.query_row(
|
||||||
|
"INSERT INTO config (keyname, value) VALUES (?, ?) RETURNING 1",
|
||||||
|
("xyz", "ijk"),
|
||||||
|
|row| {
|
||||||
|
let res: u32 = row.get(0)?;
|
||||||
|
Ok(res)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(res.is_err());
|
||||||
|
|
||||||
|
// If you want to `INSERT` and get value via `RETURNING`,
|
||||||
|
// use `call_write` or `transaction`.
|
||||||
|
|
||||||
|
let res: Result<u32> = t
|
||||||
|
.sql
|
||||||
|
.call_write(|conn| {
|
||||||
|
let val = conn.query_row(
|
||||||
|
"INSERT INTO config (keyname, value) VALUES (?, ?) RETURNING 2",
|
||||||
|
("foo", "bar"),
|
||||||
|
|row| {
|
||||||
|
let res: u32 = row.get(0)?;
|
||||||
|
Ok(res)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
Ok(val)
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
assert_eq!(res.unwrap(), 2);
|
||||||
|
|
||||||
|
let res = t
|
||||||
|
.sql
|
||||||
|
.transaction(|t| {
|
||||||
|
let val = t.query_row(
|
||||||
|
"INSERT INTO config (keyname, value) VALUES (?, ?) RETURNING 3",
|
||||||
|
("abc", "def"),
|
||||||
|
|row| {
|
||||||
|
let res: u32 = row.get(0)?;
|
||||||
|
Ok(res)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
Ok(val)
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
assert_eq!(res.unwrap(), 3);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,7 +93,13 @@ impl Pool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Retrieves a connection from the pool.
|
/// Retrieves a connection from the pool.
|
||||||
pub async fn get(&self) -> Result<PooledConnection> {
|
///
|
||||||
|
/// Sets `query_only` pragma to the provided value
|
||||||
|
/// to prevent accidentaly misuse of connection
|
||||||
|
/// for writing when reading is intended.
|
||||||
|
/// Only pass `query_only=false` if you want
|
||||||
|
/// to use the connection for writing.
|
||||||
|
pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
|
||||||
let permit = self.inner.semaphore.clone().acquire_owned().await?;
|
let permit = self.inner.semaphore.clone().acquire_owned().await?;
|
||||||
let mut connections = self.inner.connections.lock();
|
let mut connections = self.inner.connections.lock();
|
||||||
let conn = connections
|
let conn = connections
|
||||||
@@ -104,6 +110,15 @@ impl Pool {
|
|||||||
conn: Some(conn),
|
conn: Some(conn),
|
||||||
_permit: permit,
|
_permit: permit,
|
||||||
};
|
};
|
||||||
|
conn.pragma_update(
|
||||||
|
None,
|
||||||
|
"query_only",
|
||||||
|
if query_only {
|
||||||
|
"1".to_string()
|
||||||
|
} else {
|
||||||
|
"0".to_string()
|
||||||
|
},
|
||||||
|
)?;
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user