blocking for with_conn

This commit is contained in:
dignifiedquire
2020-03-08 18:34:00 +01:00
parent 818e921192
commit 4c9d049b10
5 changed files with 130 additions and 108 deletions

View File

@@ -1225,11 +1225,11 @@ pub(crate) async fn create_or_lookup_by_contact_id(
} }
let contact = Contact::load_from_db(context, contact_id).await?; let contact = Contact::load_from_db(context, contact_id).await?;
let chat_name = contact.get_display_name(); let chat_name = contact.get_display_name().to_string();
context context
.sql .sql
.with_conn(|mut conn| { .with_conn(move |mut conn| {
let conn2 = &mut conn; let conn2 = &mut conn;
let tx = conn2.transaction()?; let tx = conn2.transaction()?;
tx.execute( tx.execute(

View File

@@ -586,13 +586,32 @@ async fn add_parts(
// (eg. one per attachment)) // (eg. one per attachment))
let icnt = mime_parser.parts.len(); let icnt = mime_parser.parts.len();
context let subject = mime_parser.get_subject().unwrap_or_default();
.sql
.with_conn(|mut conn| { let mut parts = std::mem::replace(&mut mime_parser.parts, Vec::new());
let subject = mime_parser.get_subject().unwrap_or_default(); let server_folder = server_folder.as_ref().to_string();
let mut txt_raw = None; let location_kml_is = mime_parser.location_kml.is_some();
let is_system_message = mime_parser.is_system_message;
let mime_headers = if save_mime_headers {
Some(String::from_utf8_lossy(imf_raw).to_string())
} else {
None
};
let sent_timestamp = *sent_timestamp;
let is_hidden = *hidden;
let chat_id = *chat_id;
// TODO: can this clone be avoided?
let rfc724_mid = rfc724_mid.to_string();
let (new_parts, ids, is_hidden) = context
.sql
.with_conn(move |mut conn| {
let mut ids = Vec::with_capacity(parts.len());
for part in &mut parts {
let mut txt_raw = "".to_string();
let mut is_hidden = is_hidden;
for part in mime_parser.parts.iter_mut() {
let mut stmt = conn.prepare_cached( let mut stmt = conn.prepare_cached(
"INSERT INTO msgs \ "INSERT INTO msgs \
(rfc724_mid, server_folder, server_uid, chat_id, from_id, to_id, timestamp, \ (rfc724_mid, server_folder, server_uid, chat_id, from_id, to_id, timestamp, \
@@ -601,11 +620,9 @@ async fn add_parts(
VALUES (?,?,?,?,?,?, ?,?,?,?,?,?, ?,?,?,?,?,?, ?,?);", VALUES (?,?,?,?,?,?, ?,?,?,?,?,?, ?,?,?,?,?,?, ?,?);",
)?; )?;
if mime_parser.location_kml.is_some() if location_kml_is && icnt == 1 && (part.msg == "-location-" || part.msg.is_empty())
&& icnt == 1
&& (part.msg == "-location-" || part.msg.is_empty())
{ {
*hidden = true; is_hidden = true;
if state == MessageState::InFresh { if state == MessageState::InFresh {
state = MessageState::InNoticed; state = MessageState::InNoticed;
} }
@@ -613,57 +630,59 @@ async fn add_parts(
if part.typ == Viewtype::Text { if part.typ == Viewtype::Text {
let msg_raw = part.msg_raw.as_ref().cloned().unwrap_or_default(); let msg_raw = part.msg_raw.as_ref().cloned().unwrap_or_default();
txt_raw = Some(format!("{}\n\n{}", subject, msg_raw)); txt_raw = format!("{}\n\n{}", subject, msg_raw);
} }
if mime_parser.is_system_message != SystemMessage::Unknown { if is_system_message != SystemMessage::Unknown {
part.param part.param.set_int(Param::Cmd, is_system_message as i32);
.set_int(Param::Cmd, mime_parser.is_system_message as i32);
} }
stmt.execute(paramsv![ stmt.execute(paramsv![
rfc724_mid, rfc724_mid,
server_folder.as_ref().to_string(), server_folder,
server_uid as i32, server_uid as i32,
*chat_id, chat_id,
from_id as i32, from_id as i32,
to_id as i32, to_id as i32,
sort_timestamp, sort_timestamp,
*sent_timestamp, sent_timestamp,
rcvd_timestamp, rcvd_timestamp,
part.typ, part.typ,
state, state,
msgrmsg, msgrmsg,
part.msg, part.msg,
// txt_raw might contain invalid utf8 // txt_raw might contain invalid utf8
txt_raw.unwrap_or_default(), txt_raw,
part.param.to_string(), part.param.to_string(),
part.bytes as isize, part.bytes as isize,
*hidden, is_hidden,
if save_mime_headers { mime_headers,
Some(String::from_utf8_lossy(imf_raw))
} else {
None
},
mime_in_reply_to, mime_in_reply_to,
mime_references, mime_references,
])?; ])?;
txt_raw = None;
// This is okay, as we use a cached prepared statement.
drop(stmt); drop(stmt);
let row_id = ids.push(MsgId::new(crate::sql::get_rowid(
crate::sql::get_rowid(context, &mut conn, "msgs", "rfc724_mid", &rfc724_mid)?; &mut conn,
*insert_msg_id = MsgId::new(row_id); "msgs",
created_db_entries.push((*chat_id, *insert_msg_id)); "rfc724_mid",
&rfc724_mid,
)?));
} }
Ok(()) Ok((parts, ids, is_hidden))
}) })
.await?; .await?;
if let Some(id) = ids.iter().last() {
*insert_msg_id = *id;
}
*hidden = is_hidden;
created_db_entries.extend(ids.iter().map(|id| (chat_id, *id)));
mime_parser.parts = new_parts;
info!( info!(
context, context,
"Message has {} parts and is assigned to chat #{}.", icnt, *chat_id, "Message has {} parts and is assigned to chat #{}.", icnt, chat_id,
); );
// check event to send // check event to send

View File

@@ -521,13 +521,15 @@ pub async fn save(
) -> Result<u32, Error> { ) -> Result<u32, Error> {
ensure!(!chat_id.is_special(), "Invalid chat id"); ensure!(!chat_id.is_special(), "Invalid chat id");
let newest_location_id = context let mut newest_timestamp = 0;
.sql let mut newest_location_id = 0;
.with_conn(|mut conn| {
let mut newest_timestamp = 0;
let mut newest_location_id = 0;
for location in locations { for location in locations {
// TODO: can this clone be avoided?
let location = location.clone();
context
.sql
.with_conn(move |mut conn| {
let mut stmt_test = conn let mut stmt_test = conn
.prepare_cached("SELECT id FROM locations WHERE timestamp=? AND from_id=?")?; .prepare_cached("SELECT id FROM locations WHERE timestamp=? AND from_id=?")?;
let mut stmt_insert = conn.prepare_cached( let mut stmt_insert = conn.prepare_cached(
@@ -555,7 +557,6 @@ pub async fn save(
drop(stmt_insert); drop(stmt_insert);
newest_timestamp = location.timestamp; newest_timestamp = location.timestamp;
newest_location_id = crate::sql::get_rowid2( newest_location_id = crate::sql::get_rowid2(
context,
&mut conn, &mut conn,
"locations", "locations",
"timestamp", "timestamp",
@@ -565,10 +566,11 @@ pub async fn save(
)?; )?;
} }
} }
} Ok(())
Ok(newest_location_id) })
}) .await?;
.await?; }
Ok(newest_location_id) Ok(newest_location_id)
} }
@@ -580,7 +582,7 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j
" ----------------- MAYBE_SEND_LOCATIONS -------------- ", " ----------------- MAYBE_SEND_LOCATIONS -------------- ",
); );
if let Ok(ref rows) = context let rows = context
.sql .sql
.query_map( .query_map(
"SELECT id, locations_send_begin, locations_last_sent \ "SELECT id, locations_send_begin, locations_last_sent \
@@ -606,11 +608,14 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j
.map_err(Into::into) .map_err(Into::into)
}, },
) )
.await .await;
{
if rows.is_ok() {
let msgs = context let msgs = context
.sql .sql
.with_conn(|conn| { .with_conn(move |conn| {
let rows = rows.unwrap();
let mut stmt_locations = conn.prepare_cached( let mut stmt_locations = conn.prepare_cached(
"SELECT id \ "SELECT id \
FROM locations \ FROM locations \
@@ -621,37 +626,34 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j
ORDER BY timestamp;", ORDER BY timestamp;",
)?; )?;
let msgs = rows let mut msgs = Vec::new();
.iter() for (chat_id, locations_send_begin, locations_last_sent) in &rows {
.filter_map(|(chat_id, locations_send_begin, locations_last_sent)| { if !stmt_locations
if !stmt_locations .exists(paramsv![
.exists(paramsv![ DC_CONTACT_ID_SELF,
DC_CONTACT_ID_SELF, *locations_send_begin,
*locations_send_begin, *locations_last_sent,
*locations_last_sent, ])
]) .unwrap_or_default()
.unwrap_or_default() {
{ // if there is no new location, there's nothing to send.
// if there is no new location, there's nothing to send. // however, maybe we want to bypass this test eg. 15 minutes
// however, maybe we want to bypass this test eg. 15 minutes } else {
None // pending locations are attached automatically to every message,
} else { // so also to this empty text message.
// pending locations are attached automatically to every message, // DC_CMD_LOCATION is only needed to create a nicer subject.
// so also to this empty text message. //
// DC_CMD_LOCATION is only needed to create a nicer subject. // for optimisation and to avoid flooding the sending queue,
// // we could sending these messages only if we're really online.
// for optimisation and to avoid flooding the sending queue, // the easiest way to determine this, is to check for an empty message queue.
// we could sending these messages only if we're really online. // (might not be 100%, however, as positions are sent combined later
// the easiest way to determine this, is to check for an empty message queue. // and dc_set_location() is typically called periodically, this is ok)
// (might not be 100%, however, as positions are sent combined later let mut msg = Message::new(Viewtype::Text);
// and dc_set_location() is typically called periodically, this is ok) msg.hidden = true;
let mut msg = Message::new(Viewtype::Text); msg.param.set_cmd(SystemMessage::LocationOnly);
msg.hidden = true; msgs.push((*chat_id, msg));
msg.param.set_cmd(SystemMessage::LocationOnly); }
Some((chat_id, msg)) }
}
})
.collect::<Vec<_>>();
Ok(msgs) Ok(msgs)
}) })
@@ -660,11 +662,12 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j
for (chat_id, mut msg) in msgs.into_iter() { for (chat_id, mut msg) in msgs.into_iter() {
// TODO: better error handling // TODO: better error handling
chat::send_msg(context, *chat_id, &mut msg) chat::send_msg(context, chat_id, &mut msg)
.await .await
.unwrap_or_default(); .unwrap_or_default();
} }
} }
if continue_streaming { if continue_streaming {
schedule_maybe_send_locations(context, true).await; schedule_maybe_send_locations(context, true).await;
} }

View File

@@ -1026,14 +1026,14 @@ async fn delete_poi_location(context: &Context, location_id: u32) -> bool {
.is_ok() .is_ok()
} }
pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> bool {
if msg_ids.is_empty() { if msg_ids.is_empty() {
return false; return false;
} }
let msgs = context let msgs = context
.sql .sql
.with_conn(|conn| { .with_conn(move |conn| {
let mut stmt = conn.prepare_cached(concat!( let mut stmt = conn.prepare_cached(concat!(
"SELECT", "SELECT",
" m.state AS state,", " m.state AS state,",
@@ -1043,8 +1043,8 @@ pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool {
))?; ))?;
let mut msgs = Vec::with_capacity(msg_ids.len()); let mut msgs = Vec::with_capacity(msg_ids.len());
for id in msg_ids.iter() { for id in msg_ids.into_iter() {
let query_res = stmt.query_row(paramsv![*id], |row| { let query_res = stmt.query_row(paramsv![id], |row| {
Ok(( Ok((
row.get::<_, MessageState>("state")?, row.get::<_, MessageState>("state")?,
row.get::<_, Option<Blocked>>("blocked")? row.get::<_, Option<Blocked>>("blocked")?
@@ -1070,7 +1070,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool {
for (id, curr_state, curr_blocked) in msgs.into_iter() { for (id, curr_state, curr_blocked) in msgs.into_iter() {
if curr_blocked == Blocked::Not { if curr_blocked == Blocked::Not {
if curr_state == MessageState::InFresh || curr_state == MessageState::InNoticed { if curr_state == MessageState::InFresh || curr_state == MessageState::InNoticed {
update_msg_state(context, *id, MessageState::InSeen).await; update_msg_state(context, id, MessageState::InSeen).await;
info!(context, "Seen message {}.", id); info!(context, "Seen message {}.", id);
job::add( job::add(
@@ -1084,7 +1084,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool {
send_event = true; send_event = true;
} }
} else if curr_state == MessageState::InFresh { } else if curr_state == MessageState::InFresh {
update_msg_state(context, *id, MessageState::InNoticed).await; update_msg_state(context, id, MessageState::InNoticed).await;
send_event = true; send_event = true;
} }
} }
@@ -1110,16 +1110,16 @@ pub async fn update_msg_state(context: &Context, msg_id: MsgId, state: MessageSt
.is_ok() .is_ok()
} }
pub async fn star_msgs(context: &Context, msg_ids: &[MsgId], star: bool) -> bool { pub async fn star_msgs(context: &Context, msg_ids: Vec<MsgId>, star: bool) -> bool {
if msg_ids.is_empty() { if msg_ids.is_empty() {
return false; return false;
} }
context context
.sql .sql
.with_conn(|conn| { .with_conn(move |conn| {
let mut stmt = conn.prepare("UPDATE msgs SET starred=? WHERE id=?;")?; let mut stmt = conn.prepare("UPDATE msgs SET starred=? WHERE id=?;")?;
for msg_id in msg_ids.iter() { for msg_id in msg_ids.into_iter() {
stmt.execute(paramsv![star as i32, *msg_id])?; stmt.execute(paramsv![star as i32, msg_id])?;
} }
Ok(()) Ok(())
}) })

View File

@@ -162,19 +162,20 @@ impl Sql {
Ok(conn) Ok(conn)
} }
pub async fn with_conn<G, H>(&self, mut g: G) -> Result<H> pub async fn with_conn<G, H>(&self, g: G) -> Result<H>
where where
G: FnMut(r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>) -> Result<H>, H: Send + 'static,
G: Send
+ 'static
+ FnOnce(r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>) -> Result<H>,
{ {
let lock = self.pool.read().await; let lock = self.pool.read().await;
let pool = lock.as_ref().ok_or_else(|| Error::SqlNoConnection)?; let pool = lock.as_ref().ok_or_else(|| Error::SqlNoConnection)?;
let conn = pool.get()?;
let res = async_std::task::spawn_blocking(move || g(conn)).await;
self.in_use.remove();
let res = {
let conn = pool.get()?;
let res = g(conn);
self.in_use.remove();
res
};
res res
} }
@@ -234,9 +235,10 @@ impl Sql {
pub async fn table_exists(&self, name: impl AsRef<str>) -> Result<bool> { pub async fn table_exists(&self, name: impl AsRef<str>) -> Result<bool> {
self.start_stmt("table_exists"); self.start_stmt("table_exists");
self.with_conn(|conn| { let name = name.as_ref().to_string();
self.with_conn(move |conn| {
let mut exists = false; let mut exists = false;
conn.pragma(None, "table_info", &name.as_ref().to_string(), |_row| { conn.pragma(None, "table_info", &name, |_row| {
// will only be executed if the info was found // will only be executed if the info was found
exists = true; exists = true;
Ok(()) Ok(())
@@ -422,7 +424,7 @@ impl Sql {
/// eg. if a Message-ID is split into different messages. /// eg. if a Message-ID is split into different messages.
pub async fn get_rowid( pub async fn get_rowid(
&self, &self,
context: &Context, _context: &Context,
table: impl AsRef<str>, table: impl AsRef<str>,
field: impl AsRef<str>, field: impl AsRef<str>,
value: impl AsRef<str>, value: impl AsRef<str>,
@@ -431,7 +433,7 @@ impl Sql {
let res = { let res = {
let mut conn = self.get_conn().await?; let mut conn = self.get_conn().await?;
let res = get_rowid(context, &mut conn, table, field, value); let res = get_rowid(&mut conn, table, field, value);
self.in_use.remove(); self.in_use.remove();
res res
}; };
@@ -441,7 +443,7 @@ impl Sql {
pub async fn get_rowid2( pub async fn get_rowid2(
&self, &self,
context: &Context, _context: &Context,
table: impl AsRef<str>, table: impl AsRef<str>,
field: impl AsRef<str>, field: impl AsRef<str>,
value: i64, value: i64,
@@ -452,7 +454,7 @@ impl Sql {
let res = { let res = {
let mut conn = self.get_conn().await?; let mut conn = self.get_conn().await?;
let res = get_rowid2(context, &mut conn, table, field, value, field2, value2); let res = get_rowid2(&mut conn, table, field, value, field2, value2);
self.in_use.remove(); self.in_use.remove();
res res
}; };
@@ -462,7 +464,6 @@ impl Sql {
} }
pub fn get_rowid( pub fn get_rowid(
_context: &Context,
conn: &mut Connection, conn: &mut Connection,
table: impl AsRef<str>, table: impl AsRef<str>,
field: impl AsRef<str>, field: impl AsRef<str>,
@@ -481,7 +482,6 @@ pub fn get_rowid(
} }
pub fn get_rowid2( pub fn get_rowid2(
_context: &Context,
conn: &mut Connection, conn: &mut Connection,
table: impl AsRef<str>, table: impl AsRef<str>,
field: impl AsRef<str>, field: impl AsRef<str>,