diff --git a/Cargo.lock b/Cargo.lock index b18e6b955..7da76f42e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -907,6 +907,7 @@ dependencies = [ "bitflags", "chrono", "criterion", + "crossbeam-queue", "deltachat_derive", "email", "encoded-words", @@ -928,7 +929,6 @@ dependencies = [ "num-traits", "num_cpus", "once_cell", - "parking_lot", "percent-encoding", "pgp", "pretty_env_logger", diff --git a/Cargo.toml b/Cargo.toml index 916aa8eaa..5f928ff98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ backtrace = "0.3" base64 = "0.21" bitflags = "1.3" chrono = { version = "0.4", default-features=false, features = ["clock", "std"] } +crossbeam-queue = "0.3" email = { git = "https://github.com/deltachat/rust-email", branch = "master" } encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" } escaper = "0.1" @@ -54,7 +55,6 @@ num_cpus = "1.15" num-derive = "0.3" num-traits = "0.2" once_cell = "1.17.0" -parking_lot = "0.12" percent-encoding = "2.2" pgp = { version = "0.9", default-features = false } pretty_env_logger = { version = "0.4", optional = true } diff --git a/src/sql.rs b/src/sql.rs index 718fa5cee..32f1cdff2 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -363,7 +363,7 @@ impl Sql { pub(crate) async fn get_conn(&self) -> Result { let lock = self.pool.read().await; let pool = lock.as_ref().context("no SQL connection")?; - let conn = pool.get(); + let conn = pool.get().await; Ok(conn) } diff --git a/src/sql/pool.rs b/src/sql/pool.rs index 477b7d57f..8e8f9602c 100644 --- a/src/sql/pool.rs +++ b/src/sql/pool.rs @@ -4,18 +4,19 @@ use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Weak}; -use parking_lot::{Condvar, Mutex}; +use crossbeam_queue::ArrayQueue; use rusqlite::Connection; +use tokio::sync::Notify; /// Inner connection pool. struct InnerPool { /// Available connections. - connections: Mutex>, + connections: ArrayQueue, - /// Conditional variable to notify about added connections. + /// Notifies about added connections. /// /// Used to wait for available connection when the pool is empty. - cond: Condvar, + notify: Notify, } impl InnerPool { @@ -23,10 +24,8 @@ impl InnerPool { /// /// The connection could be new or returned back. fn put(&self, connection: Connection) { - let mut connections = self.connections.lock(); - connections.push(connection); - drop(connections); - self.cond.notify_one(); + self.connections.force_push(connection); + self.notify.notify_one(); } } @@ -81,25 +80,26 @@ impl Pool { /// Creates a new connection pool. pub fn new(connections: Vec) -> Self { let inner = Arc::new(InnerPool { - connections: Mutex::new(connections), - cond: Condvar::new(), + connections: ArrayQueue::new(connections.len()), + notify: Notify::new(), }); + for connection in connections { + inner.connections.force_push(connection); + } Pool { inner } } /// Retrieves a connection from the pool. - pub fn get(&self) -> PooledConnection { - let mut connections = self.inner.connections.lock(); - + pub async fn get(&self) -> PooledConnection { loop { - if let Some(conn) = connections.pop() { + if let Some(conn) = self.inner.connections.pop() { return PooledConnection { pool: Arc::downgrade(&self.inner), conn: Some(conn), }; } - self.inner.cond.wait(&mut connections); + self.inner.notify.notified().await; } } }