Compare commits

..

1 Commits

Author SHA1 Message Date
link2xt
90087bde39 sql: release memory when returning connection to the pool 2023-02-17 22:09:24 +00:00
4 changed files with 42 additions and 41 deletions

View File

@@ -61,7 +61,7 @@ pretty_env_logger = { version = "0.4", optional = true }
quick-xml = "0.27"
rand = "0.8"
regex = "1.7"
rusqlite = { version = "0.28", features = ["sqlcipher"] }
rusqlite = { version = "0.28", features = ["sqlcipher", "release_memory"] }
rust-hsluv = "0.1"
sanitize-filename = "0.4"
serde_json = "1.0"

View File

@@ -1,6 +1,7 @@
//! # Account manager module.
use std::collections::BTreeMap;
use std::future::Future;
use std::path::{Path, PathBuf};
use anyhow::{ensure, Context as _, Result};
@@ -150,7 +151,7 @@ impl Accounts {
if let Some(cfg) = self.config.get_account(id) {
let account_path = self.dir.join(cfg.dir);
fs::remove_dir_all(&account_path)
try_many_times(|| fs::remove_dir_all(&account_path))
.await
.context("failed to remove account data")?;
}
@@ -186,10 +187,10 @@ impl Accounts {
fs::create_dir_all(self.dir.join(&account_config.dir))
.await
.context("failed to create dir")?;
fs::rename(&dbfile, &new_dbfile)
try_many_times(|| fs::rename(&dbfile, &new_dbfile))
.await
.context("failed to rename dbfile")?;
fs::rename(&blobdir, &new_blobdir)
try_many_times(|| fs::rename(&blobdir, &new_blobdir))
.await
.context("failed to rename blobdir")?;
if walfile.exists() {
@@ -214,7 +215,7 @@ impl Accounts {
}
Err(err) => {
let account_path = std::path::PathBuf::from(&account_config.dir);
fs::remove_dir_all(&account_path)
try_many_times(|| fs::remove_dir_all(&account_path))
.await
.context("failed to remove account data")?;
self.config.remove_account(account_config.id).await?;
@@ -471,6 +472,33 @@ impl Config {
}
}
/// Spend up to 1 minute trying to do the operation.
///
/// Files may remain locked up to 30 seconds due to r2d2 bug:
/// <https://github.com/sfackler/r2d2/issues/99>
async fn try_many_times<F, Fut, T>(f: F) -> std::result::Result<(), T>
where
F: Fn() -> Fut,
Fut: Future<Output = std::result::Result<(), T>>,
{
let mut counter = 0;
loop {
counter += 1;
if let Err(err) = f().await {
if counter > 60 {
return Err(err);
}
// Wait 1 second and try again.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
} else {
break;
}
}
Ok(())
}
/// Configuration of a single account.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct AccountConfig {

View File

@@ -196,12 +196,12 @@ impl Sql {
/// Creates a new connection pool.
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
let mut connections = [None, None, None]; // array from iter is not stable yet
for c in &mut connections {
let mut connections = Vec::new();
for _ in 0..3 {
let connection = new_connection(dbfile, &passphrase)?;
*c = Some(connection);
connections.push(connection);
}
let pool = Pool::new(connections);
Ok(pool)
}

View File

@@ -7,13 +7,10 @@ use std::sync::{Arc, Weak};
use parking_lot::{Condvar, Mutex};
use rusqlite::Connection;
/// Total size of the connection pool.
pub const POOL_SIZE: usize = 3;
/// Inner connection pool.
struct InnerPool {
/// Available connections.
connections: Mutex<[Option<Connection>; POOL_SIZE]>,
connections: Mutex<Vec<Connection>>,
/// Conditional variable to notify about added connections.
///
@@ -27,19 +24,7 @@ impl InnerPool {
/// The connection could be new or returned back.
fn put(&self, connection: Connection) {
let mut connections = self.connections.lock();
let mut found_one = false;
for c in &mut *connections {
if c.is_none() {
*c = Some(connection);
found_one = true;
break;
}
}
assert!(
found_one,
"attempted to put more connections than available"
);
connections.push(connection);
drop(connections);
self.cond.notify_one();
}
@@ -59,6 +44,7 @@ impl Drop for PooledConnection {
// Put the connection back unless the pool is already dropped.
if let Some(pool) = self.pool.upgrade() {
if let Some(conn) = self.conn.take() {
conn.release_memory().ok();
pool.put(conn);
}
}
@@ -94,7 +80,7 @@ impl fmt::Debug for Pool {
impl Pool {
/// Creates a new connection pool.
pub fn new(connections: [Option<Connection>; POOL_SIZE]) -> Self {
pub fn new(connections: Vec<Connection>) -> Self {
let inner = Arc::new(InnerPool {
connections: Mutex::new(connections),
cond: Condvar::new(),
@@ -107,7 +93,7 @@ impl Pool {
let mut connections = self.inner.connections.lock();
loop {
if let Some(conn) = get_next(&mut *connections) {
if let Some(conn) = connections.pop() {
return PooledConnection {
pool: Arc::downgrade(&self.inner),
conn: Some(conn),
@@ -118,16 +104,3 @@ impl Pool {
}
}
}
/// Returns the first available connection.
///
/// `None` if no connection is availble.
fn get_next(connections: &mut [Option<Connection>; POOL_SIZE]) -> Option<Connection> {
for c in &mut *connections {
if c.is_some() {
return c.take();
}
}
None
}