diff --git a/src/chat.rs b/src/chat.rs index af4ef43d4..4b4868b7c 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1225,11 +1225,11 @@ pub(crate) async fn create_or_lookup_by_contact_id( } let contact = Contact::load_from_db(context, contact_id).await?; - let chat_name = contact.get_display_name(); + let chat_name = contact.get_display_name().to_string(); context .sql - .with_conn(|mut conn| { + .with_conn(move |mut conn| { let conn2 = &mut conn; let tx = conn2.transaction()?; tx.execute( diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index 91c4b3f23..688ee5611 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -586,13 +586,32 @@ async fn add_parts( // (eg. one per attachment)) let icnt = mime_parser.parts.len(); - context - .sql - .with_conn(|mut conn| { - let subject = mime_parser.get_subject().unwrap_or_default(); - let mut txt_raw = None; + let subject = mime_parser.get_subject().unwrap_or_default(); + + let mut parts = std::mem::replace(&mut mime_parser.parts, Vec::new()); + let server_folder = server_folder.as_ref().to_string(); + let location_kml_is = mime_parser.location_kml.is_some(); + let is_system_message = mime_parser.is_system_message; + let mime_headers = if save_mime_headers { + Some(String::from_utf8_lossy(imf_raw).to_string()) + } else { + None + }; + let sent_timestamp = *sent_timestamp; + let is_hidden = *hidden; + let chat_id = *chat_id; + + // TODO: can this clone be avoided? + let rfc724_mid = rfc724_mid.to_string(); + + let (new_parts, ids, is_hidden) = context + .sql + .with_conn(move |mut conn| { + let mut ids = Vec::with_capacity(parts.len()); + for part in &mut parts { + let mut txt_raw = "".to_string(); + let mut is_hidden = is_hidden; - for part in mime_parser.parts.iter_mut() { let mut stmt = conn.prepare_cached( "INSERT INTO msgs \ (rfc724_mid, server_folder, server_uid, chat_id, from_id, to_id, timestamp, \ @@ -601,11 +620,9 @@ async fn add_parts( VALUES (?,?,?,?,?,?, ?,?,?,?,?,?, ?,?,?,?,?,?, ?,?);", )?; - if mime_parser.location_kml.is_some() - && icnt == 1 - && (part.msg == "-location-" || part.msg.is_empty()) + if location_kml_is && icnt == 1 && (part.msg == "-location-" || part.msg.is_empty()) { - *hidden = true; + is_hidden = true; if state == MessageState::InFresh { state = MessageState::InNoticed; } @@ -613,57 +630,59 @@ async fn add_parts( if part.typ == Viewtype::Text { let msg_raw = part.msg_raw.as_ref().cloned().unwrap_or_default(); - txt_raw = Some(format!("{}\n\n{}", subject, msg_raw)); + txt_raw = format!("{}\n\n{}", subject, msg_raw); } - if mime_parser.is_system_message != SystemMessage::Unknown { - part.param - .set_int(Param::Cmd, mime_parser.is_system_message as i32); + if is_system_message != SystemMessage::Unknown { + part.param.set_int(Param::Cmd, is_system_message as i32); } stmt.execute(paramsv![ rfc724_mid, - server_folder.as_ref().to_string(), + server_folder, server_uid as i32, - *chat_id, + chat_id, from_id as i32, to_id as i32, sort_timestamp, - *sent_timestamp, + sent_timestamp, rcvd_timestamp, part.typ, state, msgrmsg, part.msg, // txt_raw might contain invalid utf8 - txt_raw.unwrap_or_default(), + txt_raw, part.param.to_string(), part.bytes as isize, - *hidden, - if save_mime_headers { - Some(String::from_utf8_lossy(imf_raw)) - } else { - None - }, + is_hidden, + mime_headers, mime_in_reply_to, mime_references, ])?; - txt_raw = None; - - // This is okay, as we use a cached prepared statement. drop(stmt); - let row_id = - crate::sql::get_rowid(context, &mut conn, "msgs", "rfc724_mid", &rfc724_mid)?; - *insert_msg_id = MsgId::new(row_id); - created_db_entries.push((*chat_id, *insert_msg_id)); + ids.push(MsgId::new(crate::sql::get_rowid( + &mut conn, + "msgs", + "rfc724_mid", + &rfc724_mid, + )?)); } - Ok(()) + Ok((parts, ids, is_hidden)) }) .await?; + if let Some(id) = ids.iter().last() { + *insert_msg_id = *id; + } + + *hidden = is_hidden; + created_db_entries.extend(ids.iter().map(|id| (chat_id, *id))); + mime_parser.parts = new_parts; + info!( context, - "Message has {} parts and is assigned to chat #{}.", icnt, *chat_id, + "Message has {} parts and is assigned to chat #{}.", icnt, chat_id, ); // check event to send diff --git a/src/location.rs b/src/location.rs index 728f8c654..9e3f43205 100644 --- a/src/location.rs +++ b/src/location.rs @@ -521,13 +521,15 @@ pub async fn save( ) -> Result { ensure!(!chat_id.is_special(), "Invalid chat id"); - let newest_location_id = context - .sql - .with_conn(|mut conn| { - let mut newest_timestamp = 0; - let mut newest_location_id = 0; + let mut newest_timestamp = 0; + let mut newest_location_id = 0; - for location in locations { + for location in locations { + // TODO: can this clone be avoided? + let location = location.clone(); + context + .sql + .with_conn(move |mut conn| { let mut stmt_test = conn .prepare_cached("SELECT id FROM locations WHERE timestamp=? AND from_id=?")?; let mut stmt_insert = conn.prepare_cached( @@ -555,7 +557,6 @@ pub async fn save( drop(stmt_insert); newest_timestamp = location.timestamp; newest_location_id = crate::sql::get_rowid2( - context, &mut conn, "locations", "timestamp", @@ -565,10 +566,11 @@ pub async fn save( )?; } } - } - Ok(newest_location_id) - }) - .await?; + Ok(()) + }) + .await?; + } + Ok(newest_location_id) } @@ -580,7 +582,7 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j " ----------------- MAYBE_SEND_LOCATIONS -------------- ", ); - if let Ok(ref rows) = context + let rows = context .sql .query_map( "SELECT id, locations_send_begin, locations_last_sent \ @@ -606,11 +608,14 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j .map_err(Into::into) }, ) - .await - { + .await; + + if rows.is_ok() { let msgs = context .sql - .with_conn(|conn| { + .with_conn(move |conn| { + let rows = rows.unwrap(); + let mut stmt_locations = conn.prepare_cached( "SELECT id \ FROM locations \ @@ -621,37 +626,34 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j ORDER BY timestamp;", )?; - let msgs = rows - .iter() - .filter_map(|(chat_id, locations_send_begin, locations_last_sent)| { - if !stmt_locations - .exists(paramsv![ - DC_CONTACT_ID_SELF, - *locations_send_begin, - *locations_last_sent, - ]) - .unwrap_or_default() - { - // if there is no new location, there's nothing to send. - // however, maybe we want to bypass this test eg. 15 minutes - None - } else { - // pending locations are attached automatically to every message, - // so also to this empty text message. - // DC_CMD_LOCATION is only needed to create a nicer subject. - // - // for optimisation and to avoid flooding the sending queue, - // we could sending these messages only if we're really online. - // the easiest way to determine this, is to check for an empty message queue. - // (might not be 100%, however, as positions are sent combined later - // and dc_set_location() is typically called periodically, this is ok) - let mut msg = Message::new(Viewtype::Text); - msg.hidden = true; - msg.param.set_cmd(SystemMessage::LocationOnly); - Some((chat_id, msg)) - } - }) - .collect::>(); + let mut msgs = Vec::new(); + for (chat_id, locations_send_begin, locations_last_sent) in &rows { + if !stmt_locations + .exists(paramsv![ + DC_CONTACT_ID_SELF, + *locations_send_begin, + *locations_last_sent, + ]) + .unwrap_or_default() + { + // if there is no new location, there's nothing to send. + // however, maybe we want to bypass this test eg. 15 minutes + } else { + // pending locations are attached automatically to every message, + // so also to this empty text message. + // DC_CMD_LOCATION is only needed to create a nicer subject. + // + // for optimisation and to avoid flooding the sending queue, + // we could sending these messages only if we're really online. + // the easiest way to determine this, is to check for an empty message queue. + // (might not be 100%, however, as positions are sent combined later + // and dc_set_location() is typically called periodically, this is ok) + let mut msg = Message::new(Viewtype::Text); + msg.hidden = true; + msg.param.set_cmd(SystemMessage::LocationOnly); + msgs.push((*chat_id, msg)); + } + } Ok(msgs) }) @@ -660,11 +662,12 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j for (chat_id, mut msg) in msgs.into_iter() { // TODO: better error handling - chat::send_msg(context, *chat_id, &mut msg) + chat::send_msg(context, chat_id, &mut msg) .await .unwrap_or_default(); } } + if continue_streaming { schedule_maybe_send_locations(context, true).await; } diff --git a/src/message.rs b/src/message.rs index 8ae5cd2ec..e1e874667 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1026,14 +1026,14 @@ async fn delete_poi_location(context: &Context, location_id: u32) -> bool { .is_ok() } -pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { +pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> bool { if msg_ids.is_empty() { return false; } let msgs = context .sql - .with_conn(|conn| { + .with_conn(move |conn| { let mut stmt = conn.prepare_cached(concat!( "SELECT", " m.state AS state,", @@ -1043,8 +1043,8 @@ pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { ))?; let mut msgs = Vec::with_capacity(msg_ids.len()); - for id in msg_ids.iter() { - let query_res = stmt.query_row(paramsv![*id], |row| { + for id in msg_ids.into_iter() { + let query_res = stmt.query_row(paramsv![id], |row| { Ok(( row.get::<_, MessageState>("state")?, row.get::<_, Option>("blocked")? @@ -1070,7 +1070,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { for (id, curr_state, curr_blocked) in msgs.into_iter() { if curr_blocked == Blocked::Not { if curr_state == MessageState::InFresh || curr_state == MessageState::InNoticed { - update_msg_state(context, *id, MessageState::InSeen).await; + update_msg_state(context, id, MessageState::InSeen).await; info!(context, "Seen message {}.", id); job::add( @@ -1084,7 +1084,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { send_event = true; } } else if curr_state == MessageState::InFresh { - update_msg_state(context, *id, MessageState::InNoticed).await; + update_msg_state(context, id, MessageState::InNoticed).await; send_event = true; } } @@ -1110,16 +1110,16 @@ pub async fn update_msg_state(context: &Context, msg_id: MsgId, state: MessageSt .is_ok() } -pub async fn star_msgs(context: &Context, msg_ids: &[MsgId], star: bool) -> bool { +pub async fn star_msgs(context: &Context, msg_ids: Vec, star: bool) -> bool { if msg_ids.is_empty() { return false; } context .sql - .with_conn(|conn| { + .with_conn(move |conn| { let mut stmt = conn.prepare("UPDATE msgs SET starred=? WHERE id=?;")?; - for msg_id in msg_ids.iter() { - stmt.execute(paramsv![star as i32, *msg_id])?; + for msg_id in msg_ids.into_iter() { + stmt.execute(paramsv![star as i32, msg_id])?; } Ok(()) }) diff --git a/src/sql.rs b/src/sql.rs index 30abf1da4..b32d6dfbb 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -162,19 +162,20 @@ impl Sql { Ok(conn) } - pub async fn with_conn(&self, mut g: G) -> Result + pub async fn with_conn(&self, g: G) -> Result where - G: FnMut(r2d2::PooledConnection) -> Result, + H: Send + 'static, + G: Send + + 'static + + FnOnce(r2d2::PooledConnection) -> Result, { let lock = self.pool.read().await; let pool = lock.as_ref().ok_or_else(|| Error::SqlNoConnection)?; + let conn = pool.get()?; + + let res = async_std::task::spawn_blocking(move || g(conn)).await; + self.in_use.remove(); - let res = { - let conn = pool.get()?; - let res = g(conn); - self.in_use.remove(); - res - }; res } @@ -234,9 +235,10 @@ impl Sql { pub async fn table_exists(&self, name: impl AsRef) -> Result { self.start_stmt("table_exists"); - self.with_conn(|conn| { + let name = name.as_ref().to_string(); + self.with_conn(move |conn| { let mut exists = false; - conn.pragma(None, "table_info", &name.as_ref().to_string(), |_row| { + conn.pragma(None, "table_info", &name, |_row| { // will only be executed if the info was found exists = true; Ok(()) @@ -422,7 +424,7 @@ impl Sql { /// eg. if a Message-ID is split into different messages. pub async fn get_rowid( &self, - context: &Context, + _context: &Context, table: impl AsRef, field: impl AsRef, value: impl AsRef, @@ -431,7 +433,7 @@ impl Sql { let res = { let mut conn = self.get_conn().await?; - let res = get_rowid(context, &mut conn, table, field, value); + let res = get_rowid(&mut conn, table, field, value); self.in_use.remove(); res }; @@ -441,7 +443,7 @@ impl Sql { pub async fn get_rowid2( &self, - context: &Context, + _context: &Context, table: impl AsRef, field: impl AsRef, value: i64, @@ -452,7 +454,7 @@ impl Sql { let res = { let mut conn = self.get_conn().await?; - let res = get_rowid2(context, &mut conn, table, field, value, field2, value2); + let res = get_rowid2(&mut conn, table, field, value, field2, value2); self.in_use.remove(); res }; @@ -462,7 +464,6 @@ impl Sql { } pub fn get_rowid( - _context: &Context, conn: &mut Connection, table: impl AsRef, field: impl AsRef, @@ -481,7 +482,6 @@ pub fn get_rowid( } pub fn get_rowid2( - _context: &Context, conn: &mut Connection, table: impl AsRef, field: impl AsRef,