Reimplement connection pool on top of crossbeam

This commit is contained in:
link2xt
2023-02-19 01:08:17 +00:00
parent e88f21c010
commit f2b05ccc29
4 changed files with 18 additions and 18 deletions

2
Cargo.lock generated
View File

@@ -907,6 +907,7 @@ dependencies = [
"bitflags", "bitflags",
"chrono", "chrono",
"criterion", "criterion",
"crossbeam-queue",
"deltachat_derive", "deltachat_derive",
"email", "email",
"encoded-words", "encoded-words",
@@ -928,7 +929,6 @@ dependencies = [
"num-traits", "num-traits",
"num_cpus", "num_cpus",
"once_cell", "once_cell",
"parking_lot",
"percent-encoding", "percent-encoding",
"pgp", "pgp",
"pretty_env_logger", "pretty_env_logger",

View File

@@ -39,6 +39,7 @@ backtrace = "0.3"
base64 = "0.21" base64 = "0.21"
bitflags = "1.3" bitflags = "1.3"
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] } chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
crossbeam-queue = "0.3"
email = { git = "https://github.com/deltachat/rust-email", branch = "master" } email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" } encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
escaper = "0.1" escaper = "0.1"
@@ -54,7 +55,6 @@ num_cpus = "1.15"
num-derive = "0.3" num-derive = "0.3"
num-traits = "0.2" num-traits = "0.2"
once_cell = "1.17.0" once_cell = "1.17.0"
parking_lot = "0.12"
percent-encoding = "2.2" percent-encoding = "2.2"
pgp = { version = "0.9", default-features = false } pgp = { version = "0.9", default-features = false }
pretty_env_logger = { version = "0.4", optional = true } pretty_env_logger = { version = "0.4", optional = true }

View File

@@ -363,7 +363,7 @@ impl Sql {
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> { pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
let lock = self.pool.read().await; let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?; let pool = lock.as_ref().context("no SQL connection")?;
let conn = pool.get(); let conn = pool.get().await;
Ok(conn) Ok(conn)
} }

View File

@@ -4,18 +4,19 @@ use std::fmt;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use parking_lot::{Condvar, Mutex}; use crossbeam_queue::ArrayQueue;
use rusqlite::Connection; use rusqlite::Connection;
use tokio::sync::Notify;
/// Inner connection pool. /// Inner connection pool.
struct InnerPool { struct InnerPool {
/// Available connections. /// Available connections.
connections: Mutex<Vec<Connection>>, connections: ArrayQueue<Connection>,
/// Conditional variable to notify about added connections. /// Notifies about added connections.
/// ///
/// Used to wait for available connection when the pool is empty. /// Used to wait for available connection when the pool is empty.
cond: Condvar, notify: Notify,
} }
impl InnerPool { impl InnerPool {
@@ -23,10 +24,8 @@ impl InnerPool {
/// ///
/// The connection could be new or returned back. /// The connection could be new or returned back.
fn put(&self, connection: Connection) { fn put(&self, connection: Connection) {
let mut connections = self.connections.lock(); self.connections.force_push(connection);
connections.push(connection); self.notify.notify_one();
drop(connections);
self.cond.notify_one();
} }
} }
@@ -81,25 +80,26 @@ impl Pool {
/// Creates a new connection pool. /// Creates a new connection pool.
pub fn new(connections: Vec<Connection>) -> Self { pub fn new(connections: Vec<Connection>) -> Self {
let inner = Arc::new(InnerPool { let inner = Arc::new(InnerPool {
connections: Mutex::new(connections), connections: ArrayQueue::new(connections.len()),
cond: Condvar::new(), notify: Notify::new(),
}); });
for connection in connections {
inner.connections.force_push(connection);
}
Pool { inner } Pool { inner }
} }
/// Retrieves a connection from the pool. /// Retrieves a connection from the pool.
pub fn get(&self) -> PooledConnection { pub async fn get(&self) -> PooledConnection {
let mut connections = self.inner.connections.lock();
loop { loop {
if let Some(conn) = connections.pop() { if let Some(conn) = self.inner.connections.pop() {
return PooledConnection { return PooledConnection {
pool: Arc::downgrade(&self.inner), pool: Arc::downgrade(&self.inner),
conn: Some(conn), conn: Some(conn),
}; };
} }
self.inner.cond.wait(&mut connections); self.inner.notify.notified().await;
} }
} }
} }