mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
Replace r2d2 with an own connection pool
New connection pool does not use threads and does not remove idle connections or create new connections at runtime.
This commit is contained in:
@@ -1,73 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use r2d2::ManageConnection;
|
||||
use rusqlite::{Connection, Error, OpenFlags};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionManager {
|
||||
/// Database file path.
|
||||
path: PathBuf,
|
||||
|
||||
/// SQLite open flags.
|
||||
flags: rusqlite::OpenFlags,
|
||||
|
||||
/// SQLCipher database passphrase.
|
||||
/// Empty string if database is not encrypted.
|
||||
passphrase: String,
|
||||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
/// Creates new connection manager.
|
||||
pub fn new(path: PathBuf, passphrase: String) -> Self {
|
||||
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
|
||||
|
||||
Self {
|
||||
path,
|
||||
flags,
|
||||
passphrase,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ManageConnection for ConnectionManager {
|
||||
type Connection = Connection;
|
||||
type Error = Error;
|
||||
|
||||
fn connect(&self) -> Result<Connection, Error> {
|
||||
let conn = Connection::open_with_flags(&self.path, self.flags)?;
|
||||
conn.execute_batch(&format!(
|
||||
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
|
||||
PRAGMA secure_delete=on;
|
||||
PRAGMA busy_timeout = {};
|
||||
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
|
||||
PRAGMA foreign_keys=on;
|
||||
",
|
||||
Duration::from_secs(60).as_millis()
|
||||
))?;
|
||||
conn.pragma_update(None, "key", &self.passphrase)?;
|
||||
// Try to enable auto_vacuum. This will only be
|
||||
// applied if the database is new or after successful
|
||||
// VACUUM, which usually happens before backup export.
|
||||
// When auto_vacuum is INCREMENTAL, it is possible to
|
||||
// use PRAGMA incremental_vacuum to return unused
|
||||
// database pages to the filesystem.
|
||||
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
|
||||
|
||||
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
|
||||
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
|
||||
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
fn is_valid(&self, _conn: &mut Connection) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_broken(&self, _conn: &mut Connection) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
105
src/sql/pool.rs
Normal file
105
src/sql/pool.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
//! Connection pool.
|
||||
|
||||
use std::fmt;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use rusqlite::Connection;
|
||||
|
||||
/// Inner connection pool.
|
||||
struct InnerPool {
|
||||
/// Available connections.
|
||||
connections: Mutex<Vec<Connection>>,
|
||||
|
||||
/// Conditional variable to notify about added connections.
|
||||
///
|
||||
/// Used to wait for available connection when the pool is empty.
|
||||
cond: Condvar,
|
||||
}
|
||||
|
||||
impl InnerPool {
|
||||
/// Puts a connection into the pool.
|
||||
///
|
||||
/// The connection could be new or returned back.
|
||||
fn put(&self, connection: Connection) {
|
||||
let mut connections = self.connections.lock();
|
||||
connections.push(connection);
|
||||
drop(connections);
|
||||
self.cond.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
/// Pooled connection.
|
||||
pub struct PooledConnection {
|
||||
/// Weak reference to the pool used to return the connection back.
|
||||
pool: Weak<InnerPool>,
|
||||
|
||||
/// Only `None` right after moving the connection back to the pool.
|
||||
conn: Option<Connection>,
|
||||
}
|
||||
|
||||
impl Drop for PooledConnection {
|
||||
fn drop(&mut self) {
|
||||
// Put the connection back unless the pool is already dropped.
|
||||
if let Some(pool) = self.pool.upgrade() {
|
||||
if let Some(conn) = self.conn.take() {
|
||||
pool.put(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for PooledConnection {
|
||||
type Target = Connection;
|
||||
|
||||
fn deref(&self) -> &Connection {
|
||||
self.conn.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for PooledConnection {
|
||||
fn deref_mut(&mut self) -> &mut Connection {
|
||||
self.conn.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection pool.
|
||||
#[derive(Clone)]
|
||||
pub struct Pool {
|
||||
/// Reference to the actual connection pool.
|
||||
inner: Arc<InnerPool>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Pool {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "Pool")
|
||||
}
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
/// Creates a new connection pool.
|
||||
pub fn new(connections: Vec<Connection>) -> Self {
|
||||
let inner = Arc::new(InnerPool {
|
||||
connections: Mutex::new(connections),
|
||||
cond: Condvar::new(),
|
||||
});
|
||||
Pool { inner }
|
||||
}
|
||||
|
||||
/// Retrieves a connection from the pool.
|
||||
pub fn get(&self) -> PooledConnection {
|
||||
let mut connections = self.inner.connections.lock();
|
||||
|
||||
loop {
|
||||
if let Some(conn) = connections.pop() {
|
||||
return PooledConnection {
|
||||
pool: Arc::downgrade(&self.inner),
|
||||
conn: Some(conn),
|
||||
};
|
||||
}
|
||||
|
||||
self.inner.cond.wait(&mut connections);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user