diff --git a/CHANGELOG.md b/CHANGELOG.md index 5eabb8177..a3b24e038 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - deltachat-rpc-server: do not block stdin while processing the request. #4041 deltachat-rpc-server now reads the next request as soon as previous request handler is spawned. - enable `auto_vacuum` on all SQL connections #2955 +- use semaphore for connection pool #4061 ### API-Changes diff --git a/src/sql.rs b/src/sql.rs index f6fe25418..6d6a53fec 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -338,7 +338,7 @@ impl Sql { pub(crate) async fn get_conn(&self) -> Result { let lock = self.pool.read().await; let pool = lock.as_ref().context("no SQL connection")?; - let conn = pool.get().await; + let conn = pool.get().await?; Ok(conn) } diff --git a/src/sql/pool.rs b/src/sql/pool.rs index 133c72ae2..fc7bf05bf 100644 --- a/src/sql/pool.rs +++ b/src/sql/pool.rs @@ -3,9 +3,10 @@ use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Weak}; +use anyhow::{Context, Result}; use crossbeam_queue::ArrayQueue; use rusqlite::Connection; -use tokio::sync::Notify; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// Inner connection pool. #[derive(Debug)] @@ -13,10 +14,8 @@ struct InnerPool { /// Available connections. connections: ArrayQueue, - /// Notifies about added connections. - /// - /// Used to wait for available connection when the pool is empty. - notify: Notify, + /// Counts the number of available connections. + semaphore: Arc, } impl InnerPool { @@ -25,7 +24,6 @@ impl InnerPool { /// The connection could be new or returned back. fn put(&self, connection: Connection) { self.connections.force_push(connection); - self.notify.notify_one(); } } @@ -36,6 +34,9 @@ pub struct PooledConnection { /// Only `None` right after moving the connection back to the pool. conn: Option, + + /// Semaphore permit, dropped after returning the connection to the pool. + _permit: OwnedSemaphorePermit, } impl Drop for PooledConnection { @@ -75,7 +76,7 @@ impl Pool { pub fn new(connections: Vec) -> Self { let inner = Arc::new(InnerPool { connections: ArrayQueue::new(connections.len()), - notify: Notify::new(), + semaphore: Arc::new(Semaphore::new(connections.len())), }); for connection in connections { inner.connections.force_push(connection); @@ -84,16 +85,18 @@ impl Pool { } /// Retrieves a connection from the pool. - pub async fn get(&self) -> PooledConnection { - loop { - if let Some(conn) = self.inner.connections.pop() { - return PooledConnection { - pool: Arc::downgrade(&self.inner), - conn: Some(conn), - }; - } - - self.inner.notify.notified().await; - } + pub async fn get(&self) -> Result { + let permit = self.inner.semaphore.clone().acquire_owned().await?; + let conn = self + .inner + .connections + .pop() + .context("got a permit when there are no connections in the pool")?; + let conn = PooledConnection { + pool: Arc::downgrade(&self.inner), + conn: Some(conn), + _permit: permit, + }; + Ok(conn) } }