From 10066b2bc79ec0a902e41280e85048a8f59d569d Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Feb 2023 17:04:18 +0000 Subject: [PATCH] sql: use semaphore to limit access to the connection pool This ensures that if multiple connections are returned to the pool at the same time, waiters get them in the order they were placed in the queue. --- CHANGELOG.md | 1 + src/sql.rs | 2 +- src/sql/pool.rs | 39 +++++++++++++++++++++------------------ 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5eabb8177..a3b24e038 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - deltachat-rpc-server: do not block stdin while processing the request. #4041 deltachat-rpc-server now reads the next request as soon as previous request handler is spawned. - enable `auto_vacuum` on all SQL connections #2955 +- use semaphore for connection pool #4061 ### API-Changes diff --git a/src/sql.rs b/src/sql.rs index f6fe25418..6d6a53fec 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -338,7 +338,7 @@ impl Sql { pub(crate) async fn get_conn(&self) -> Result { let lock = self.pool.read().await; let pool = lock.as_ref().context("no SQL connection")?; - let conn = pool.get().await; + let conn = pool.get().await?; Ok(conn) } diff --git a/src/sql/pool.rs b/src/sql/pool.rs index 133c72ae2..fc7bf05bf 100644 --- a/src/sql/pool.rs +++ b/src/sql/pool.rs @@ -3,9 +3,10 @@ use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Weak}; +use anyhow::{Context, Result}; use crossbeam_queue::ArrayQueue; use rusqlite::Connection; -use tokio::sync::Notify; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// Inner connection pool. #[derive(Debug)] @@ -13,10 +14,8 @@ struct InnerPool { /// Available connections. connections: ArrayQueue, - /// Notifies about added connections. - /// - /// Used to wait for available connection when the pool is empty. - notify: Notify, + /// Counts the number of available connections. + semaphore: Arc, } impl InnerPool { @@ -25,7 +24,6 @@ impl InnerPool { /// The connection could be new or returned back. fn put(&self, connection: Connection) { self.connections.force_push(connection); - self.notify.notify_one(); } } @@ -36,6 +34,9 @@ pub struct PooledConnection { /// Only `None` right after moving the connection back to the pool. conn: Option, + + /// Semaphore permit, dropped after returning the connection to the pool. + _permit: OwnedSemaphorePermit, } impl Drop for PooledConnection { @@ -75,7 +76,7 @@ impl Pool { pub fn new(connections: Vec) -> Self { let inner = Arc::new(InnerPool { connections: ArrayQueue::new(connections.len()), - notify: Notify::new(), + semaphore: Arc::new(Semaphore::new(connections.len())), }); for connection in connections { inner.connections.force_push(connection); @@ -84,16 +85,18 @@ impl Pool { } /// Retrieves a connection from the pool. - pub async fn get(&self) -> PooledConnection { - loop { - if let Some(conn) = self.inner.connections.pop() { - return PooledConnection { - pool: Arc::downgrade(&self.inner), - conn: Some(conn), - }; - } - - self.inner.notify.notified().await; - } + pub async fn get(&self) -> Result { + let permit = self.inner.semaphore.clone().acquire_owned().await?; + let conn = self + .inner + .connections + .pop() + .context("got a permit when there are no connections in the pool")?; + let conn = PooledConnection { + pool: Arc::downgrade(&self.inner), + conn: Some(conn), + _permit: permit, + }; + Ok(conn) } }