Simplify SQL error handling (#2415)

* Remove sql::error submodule

Use anyhow errors instead.

* Remove explicit checks for open SQL connection

An error will be thrown anyway during attempt to execute query.

* Don't use `with_conn()` and remove it

* Remove unused `with_conn_async`

* Resultify markseen_msgs
This commit is contained in:
link2xt
2021-05-03 23:01:06 +03:00
committed by GitHub
parent d421670477
commit f42da17a78
21 changed files with 295 additions and 398 deletions

View File

@@ -612,8 +612,6 @@ pub enum BlobError {
WrongBlobdir { blobdir: PathBuf, src: PathBuf },
#[error("Blob has a badname {}", .blobname.display())]
WrongName { blobname: PathBuf },
#[error("Sql: {0}")]
Sql(#[from] crate::sql::Error),
#[error("{0}")]
Other(#[from] anyhow::Error),
}

View File

@@ -38,7 +38,6 @@ use crate::message::{self, InvalidMsgId, Message, MessageState, MsgId};
use crate::mimeparser::SystemMessage;
use crate::param::{Param, Params};
use crate::peerstate::{Peerstate, PeerstateVerifiedStatus};
use crate::sql;
use crate::stock_str;
/// An chat item, such as a message or a marker.
@@ -599,7 +598,7 @@ impl ChatId {
async fn get_parent_mime_headers(
self,
context: &Context,
) -> sql::Result<Option<(String, String, String)>> {
) -> Result<Option<(String, String, String)>> {
if let Some((rfc724_mid, mime_in_reply_to, mime_references, error)) = self
.parent_query(
context,
@@ -2808,7 +2807,7 @@ pub(crate) async fn get_chat_cnt(context: &Context) -> Result<usize> {
pub(crate) async fn get_chat_id_by_grpid(
context: &Context,
grpid: impl AsRef<str>,
) -> Result<(ChatId, bool, Blocked), sql::Error> {
) -> Result<(ChatId, bool, Blocked)> {
context
.sql
.query_row(
@@ -3882,7 +3881,7 @@ mod tests {
}
#[async_std::test]
async fn test_marknoticed_chat() -> anyhow::Result<()> {
async fn test_marknoticed_chat() -> Result<()> {
let t = TestContext::new_alice().await;
let chat = t.create_chat_with_contact("bob", "bob@example.org").await;
@@ -3930,7 +3929,7 @@ mod tests {
}
#[async_std::test]
async fn test_marknoticed_deaddrop_chat() -> anyhow::Result<()> {
async fn test_marknoticed_deaddrop_chat() -> Result<()> {
let t = TestContext::new_alice().await;
let chats = Chatlist::try_load(&t, 0, None, None).await?;

View File

@@ -305,7 +305,7 @@ impl Context {
}
}
pub async fn set_config_bool(&self, key: Config, value: bool) -> crate::sql::Result<()> {
pub async fn set_config_bool(&self, key: Config, value: bool) -> Result<()> {
self.set_config(key, if value { Some("1") } else { None })
.await?;
Ok(())

View File

@@ -175,7 +175,7 @@ pub enum VerifiedStatus {
}
impl Contact {
pub async fn load_from_db(context: &Context, contact_id: u32) -> crate::sql::Result<Self> {
pub async fn load_from_db(context: &Context, contact_id: u32) -> Result<Self> {
let mut contact = context
.sql
.query_row(
@@ -915,7 +915,7 @@ impl Contact {
}
Err(err) => {
error!(context, "delete_contact {} failed ({})", contact_id, err);
return Err(err.into());
return Err(err);
}
}
}

View File

@@ -943,16 +943,15 @@ async fn add_parts(
// TODO: can this clone be avoided?
let rfc724_mid = rfc724_mid.to_string();
let (new_parts, ids, is_hidden) = context
.sql
.with_conn(move |conn| {
let mut ids = Vec::with_capacity(parts.len());
let mut is_hidden = is_hidden;
let mut is_hidden = is_hidden;
let mut ids = Vec::with_capacity(parts.len());
for part in &mut parts {
let mut txt_raw = "".to_string();
let mut stmt = conn.prepare_cached(
r#"
let conn = context.sql.get_conn().await?;
for part in &mut parts {
let mut txt_raw = "".to_string();
let mut stmt = conn.prepare_cached(
r#"
INSERT INTO msgs
(
rfc724_mid, server_folder, server_uid, chat_id,
@@ -973,92 +972,87 @@ INSERT INTO msgs
?
);
"#,
)?;
)?;
let is_location_kml = location_kml_is
&& icnt == 1
&& (part.msg == "-location-" || part.msg.is_empty());
let is_location_kml =
location_kml_is && icnt == 1 && (part.msg == "-location-" || part.msg.is_empty());
if is_mdn || is_location_kml {
is_hidden = true;
if incoming {
state = MessageState::InSeen; // Set the state to InSeen so that precheck_imf() adds a markseen job after we moved the message
}
}
let mime_modified = save_mime_modified && !part.msg.is_empty();
if mime_modified {
// Avoid setting mime_modified for more than one part.
save_mime_modified = false;
}
if part.typ == Viewtype::Text {
let msg_raw = part.msg_raw.as_ref().cloned().unwrap_or_default();
txt_raw = format!("{}\n\n{}", subject, msg_raw);
}
if is_system_message != SystemMessage::Unknown {
part.param.set_int(Param::Cmd, is_system_message as i32);
}
let ephemeral_timestamp = if in_fresh {
0
} else {
match ephemeral_timer {
EphemeralTimer::Disabled => 0,
EphemeralTimer::Enabled { duration } => {
rcvd_timestamp + i64::from(duration)
}
}
};
// If you change which information is skipped if the message is trashed,
// also change `MsgId::trash()` and `delete_expired_messages()`
let trash = chat_id.is_trash();
stmt.execute(paramsv![
rfc724_mid,
server_folder,
server_uid as i32,
chat_id,
if trash { 0 } else { from_id as i32 },
if trash { 0 } else { to_id as i32 },
sort_timestamp,
sent_timestamp,
rcvd_timestamp,
part.typ,
state,
is_dc_message,
if trash { "" } else { &part.msg },
if trash { "" } else { &subject },
// txt_raw might contain invalid utf8
if trash { "" } else { &txt_raw },
if trash {
"".to_string()
} else {
part.param.to_string()
},
part.bytes as isize,
is_hidden,
if (save_mime_headers || mime_modified) && !trash {
mime_headers.to_string()
} else {
"".to_string()
},
mime_in_reply_to,
mime_references,
mime_modified,
part.error.take().unwrap_or_default(),
ephemeral_timer,
ephemeral_timestamp
])?;
let row_id = conn.last_insert_rowid();
drop(stmt);
ids.push(MsgId::new(u32::try_from(row_id)?));
if is_mdn || is_location_kml {
is_hidden = true;
if incoming {
state = MessageState::InSeen; // Set the state to InSeen so that precheck_imf() adds a markseen job after we moved the message
}
Ok((parts, ids, is_hidden))
})
.await?;
}
let mime_modified = save_mime_modified && !part.msg.is_empty();
if mime_modified {
// Avoid setting mime_modified for more than one part.
save_mime_modified = false;
}
if part.typ == Viewtype::Text {
let msg_raw = part.msg_raw.as_ref().cloned().unwrap_or_default();
txt_raw = format!("{}\n\n{}", subject, msg_raw);
}
if is_system_message != SystemMessage::Unknown {
part.param.set_int(Param::Cmd, is_system_message as i32);
}
let ephemeral_timestamp = if in_fresh {
0
} else {
match ephemeral_timer {
EphemeralTimer::Disabled => 0,
EphemeralTimer::Enabled { duration } => rcvd_timestamp + i64::from(duration),
}
};
// If you change which information is skipped if the message is trashed,
// also change `MsgId::trash()` and `delete_expired_messages()`
let trash = chat_id.is_trash();
stmt.execute(paramsv![
rfc724_mid,
server_folder,
server_uid as i32,
chat_id,
if trash { 0 } else { from_id as i32 },
if trash { 0 } else { to_id as i32 },
sort_timestamp,
sent_timestamp,
rcvd_timestamp,
part.typ,
state,
is_dc_message,
if trash { "" } else { &part.msg },
if trash { "" } else { &subject },
// txt_raw might contain invalid utf8
if trash { "" } else { &txt_raw },
if trash {
"".to_string()
} else {
part.param.to_string()
},
part.bytes as isize,
is_hidden,
if (save_mime_headers || mime_modified) && !trash {
mime_headers.to_string()
} else {
"".to_string()
},
mime_in_reply_to,
mime_references,
mime_modified,
part.error.take().unwrap_or_default(),
ephemeral_timer,
ephemeral_timestamp
])?;
let row_id = conn.last_insert_rowid();
drop(stmt);
ids.push(MsgId::new(u32::try_from(row_id)?));
}
drop(conn);
if let Some(id) = ids.iter().last() {
*insert_msg_id = *id;
@@ -1070,7 +1064,7 @@ INSERT INTO msgs
*hidden = is_hidden;
created_db_entries.extend(ids.iter().map(|id| (chat_id, *id)));
mime_parser.parts = new_parts;
mime_parser.parts = parts;
info!(
context,

View File

@@ -61,7 +61,7 @@ use std::num::ParseIntError;
use std::str::FromStr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{ensure, Context as _, Error};
use anyhow::{ensure, Context as _, Result};
use async_std::task;
use serde::{Deserialize, Serialize};
@@ -73,7 +73,6 @@ use crate::dc_tools::time;
use crate::events::EventType;
use crate::message::{Message, MessageState, MsgId};
use crate::mimeparser::SystemMessage;
use crate::sql;
use crate::stock_str;
use crate::{
chat::{lookup_by_contact_id, send_msg, ChatId},
@@ -150,7 +149,7 @@ impl rusqlite::types::FromSql for Timer {
impl ChatId {
/// Get ephemeral message timer value in seconds.
pub async fn get_ephemeral_timer(self, context: &Context) -> Result<Timer, Error> {
pub async fn get_ephemeral_timer(self, context: &Context) -> Result<Timer> {
let timer = context
.sql
.query_get_value(
@@ -169,7 +168,7 @@ impl ChatId {
self,
context: &Context,
timer: Timer,
) -> Result<(), Error> {
) -> Result<()> {
ensure!(!self.is_special(), "Invalid chat ID");
context
@@ -192,7 +191,7 @@ impl ChatId {
/// Set ephemeral message timer value in seconds.
///
/// If timer value is 0, disable ephemeral message timer.
pub async fn set_ephemeral_timer(self, context: &Context, timer: Timer) -> Result<(), Error> {
pub async fn set_ephemeral_timer(self, context: &Context, timer: Timer) -> Result<()> {
if timer == self.get_ephemeral_timer(context).await? {
return Ok(());
}
@@ -307,7 +306,7 @@ impl MsgId {
/// false. This function does not emit the MsgsChanged event itself,
/// because it is also called when chatlist is reloaded, and emitting
/// MsgsChanged there will cause infinite reload loop.
pub(crate) async fn delete_expired_messages(context: &Context) -> Result<bool, Error> {
pub(crate) async fn delete_expired_messages(context: &Context) -> Result<bool> {
let mut updated = context
.sql
.execute(
@@ -484,7 +483,7 @@ pub(crate) async fn load_imap_deletion_msgid(context: &Context) -> anyhow::Resul
///
/// This function is supposed to be called in the background,
/// e.g. from housekeeping task.
pub(crate) async fn start_ephemeral_timers(context: &Context) -> sql::Result<()> {
pub(crate) async fn start_ephemeral_timers(context: &Context) -> Result<()> {
context
.sql
.execute(

View File

@@ -430,7 +430,7 @@ impl Job {
&self,
context: &Context,
contact_id: u32,
) -> sql::Result<(Vec<u32>, Vec<String>)> {
) -> Result<(Vec<u32>, Vec<String>)> {
// Extract message IDs from job parameters
let res: Vec<(u32, MsgId)> = context
.sql
@@ -846,7 +846,7 @@ pub async fn kill_action(context: &Context, action: Action) -> bool {
}
/// Remove jobs with specified IDs.
async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
async fn kill_ids(context: &Context, job_ids: &[u32]) -> Result<()> {
let q = format!(
"DELETE FROM jobs WHERE id IN({})",
job_ids.iter().map(|_| "?").join(",")
@@ -1050,7 +1050,7 @@ pub(crate) enum Connection<'a> {
Smtp(&'a mut Smtp),
}
pub(crate) async fn load_imap_deletion_job(context: &Context) -> sql::Result<Option<Job>> {
pub(crate) async fn load_imap_deletion_job(context: &Context) -> Result<Option<Job>> {
let res = if let Some(msg_id) = load_imap_deletion_msgid(context).await? {
Some(Job::new(
Action::DeleteMsgOnImap,

View File

@@ -15,7 +15,6 @@ use crate::config::Config;
use crate::constants::KeyGenType;
use crate::context::Context;
use crate::dc_tools::{time, EmailAddress, InvalidEmailError};
use crate::sql;
// Re-export key types
pub use crate::pgp::KeyPair;
@@ -31,8 +30,6 @@ pub enum Error {
Pgp(#[from] pgp::errors::Error),
#[error("Failed to generate PGP key: {}", _0)]
Keygen(#[from] crate::pgp::PgpKeygenError),
#[error("Failed to load key: {}", _0)]
LoadKey(#[from] sql::Error),
#[error("Failed to save generated key: {}", _0)]
StoreKey(#[from] SaveKeyError),
#[error("No address configured")]

View File

@@ -568,39 +568,33 @@ pub async fn save(
accuracy,
..
} = location;
let (loc_id, ts) = context
.sql
.with_conn(move |conn| {
let mut stmt_test = conn
.prepare_cached("SELECT id FROM locations WHERE timestamp=? AND from_id=?")?;
let mut stmt_insert = conn.prepare_cached(stmt_insert)?;
let exists = stmt_test.exists(paramsv![timestamp, contact_id as i32])?;
let conn = context.sql.get_conn().await?;
let mut stmt_test =
conn.prepare_cached("SELECT id FROM locations WHERE timestamp=? AND from_id=?")?;
let mut stmt_insert = conn.prepare_cached(stmt_insert)?;
if independent || !exists {
stmt_insert.execute(paramsv![
timestamp,
contact_id as i32,
chat_id,
latitude,
longitude,
accuracy,
independent,
])?;
let exists = stmt_test.exists(paramsv![timestamp, contact_id as i32])?;
if timestamp > newest_timestamp {
// okay to drop, as we use cached prepared statements
drop(stmt_test);
drop(stmt_insert);
newest_timestamp = timestamp;
newest_location_id = conn.last_insert_rowid();
}
}
Ok((newest_location_id, newest_timestamp))
})
.await?;
newest_timestamp = ts;
newest_location_id = loc_id;
if independent || !exists {
stmt_insert.execute(paramsv![
timestamp,
contact_id as i32,
chat_id,
latitude,
longitude,
accuracy,
independent,
])?;
if timestamp > newest_timestamp {
// okay to drop, as we use cached prepared statements
drop(stmt_test);
drop(stmt_insert);
newest_timestamp = timestamp;
newest_location_id = conn.last_insert_rowid();
}
}
}
Ok(u32::try_from(newest_location_id)?)
@@ -642,55 +636,50 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j
)
.await;
if rows.is_ok() {
let msgs = context
.sql
.with_conn(move |conn| {
let rows = rows.unwrap();
if let Ok(rows) = rows {
let mut msgs = Vec::new();
let mut stmt_locations = conn.prepare_cached(
"SELECT id \
FROM locations \
WHERE from_id=? \
AND timestamp>=? \
AND timestamp>? \
AND independent=0 \
ORDER BY timestamp;",
)?;
{
let conn = job_try!(context.sql.get_conn().await);
let mut msgs = Vec::new();
for (chat_id, locations_send_begin, locations_last_sent) in &rows {
if !stmt_locations
.exists(paramsv![
DC_CONTACT_ID_SELF,
*locations_send_begin,
*locations_last_sent,
])
.unwrap_or_default()
{
// if there is no new location, there's nothing to send.
// however, maybe we want to bypass this test eg. 15 minutes
} else {
// pending locations are attached automatically to every message,
// so also to this empty text message.
// DC_CMD_LOCATION is only needed to create a nicer subject.
//
// for optimisation and to avoid flooding the sending queue,
// we could sending these messages only if we're really online.
// the easiest way to determine this, is to check for an empty message queue.
// (might not be 100%, however, as positions are sent combined later
// and dc_set_location() is typically called periodically, this is ok)
let mut msg = Message::new(Viewtype::Text);
msg.hidden = true;
msg.param.set_cmd(SystemMessage::LocationOnly);
msgs.push((*chat_id, msg));
}
let mut stmt_locations = job_try!(conn.prepare_cached(
"SELECT id \
FROM locations \
WHERE from_id=? \
AND timestamp>=? \
AND timestamp>? \
AND independent=0 \
ORDER BY timestamp;",
));
for (chat_id, locations_send_begin, locations_last_sent) in &rows {
if !stmt_locations
.exists(paramsv![
DC_CONTACT_ID_SELF,
*locations_send_begin,
*locations_last_sent,
])
.unwrap_or_default()
{
// if there is no new location, there's nothing to send.
// however, maybe we want to bypass this test eg. 15 minutes
} else {
// pending locations are attached automatically to every message,
// so also to this empty text message.
// DC_CMD_LOCATION is only needed to create a nicer subject.
//
// for optimisation and to avoid flooding the sending queue,
// we could sending these messages only if we're really online.
// the easiest way to determine this, is to check for an empty message queue.
// (might not be 100%, however, as positions are sent combined later
// and dc_set_location() is typically called periodically, this is ok)
let mut msg = Message::new(Viewtype::Text);
msg.hidden = true;
msg.param.set_cmd(SystemMessage::LocationOnly);
msgs.push((*chat_id, msg));
}
Ok(msgs)
})
.await
.unwrap_or_default(); // TODO: better error handling
}
}
for (chat_id, mut msg) in msgs.into_iter() {
// TODO: better error handling

View File

@@ -5,6 +5,7 @@ use std::fmt;
use crate::provider::{get_provider_by_id, Provider};
use crate::{context::Context, provider::Socket};
use anyhow::Result;
#[derive(Copy, Clone, Debug, Display, FromPrimitive, PartialEq, Eq)]
#[repr(u32)]
@@ -54,10 +55,7 @@ pub struct LoginParam {
impl LoginParam {
/// Read the login parameters from the database.
pub async fn from_database(
context: &Context,
prefix: impl AsRef<str>,
) -> crate::sql::Result<Self> {
pub async fn from_database(context: &Context, prefix: impl AsRef<str>) -> Result<Self> {
let prefix = prefix.as_ref();
let sql = &context.sql;
@@ -156,11 +154,7 @@ impl LoginParam {
}
/// Save this loginparam to the database.
pub async fn save_to_database(
&self,
context: &Context,
prefix: impl AsRef<str>,
) -> crate::sql::Result<()> {
pub async fn save_to_database(&self, context: &Context, prefix: impl AsRef<str>) -> Result<()> {
let prefix = prefix.as_ref();
let sql = &context.sql;
@@ -317,7 +311,7 @@ mod tests {
}
#[async_std::test]
async fn test_save_load_login_param() -> anyhow::Result<()> {
async fn test_save_load_login_param() -> Result<()> {
let t = TestContext::new().await;
let param = LoginParam {

View File

@@ -3,7 +3,7 @@
use std::collections::BTreeMap;
use std::convert::TryInto;
use anyhow::{ensure, Error};
use anyhow::{ensure, format_err, Result};
use async_std::path::{Path, PathBuf};
use deltachat_derive::{FromSql, ToSql};
use itertools::Itertools;
@@ -78,7 +78,7 @@ impl MsgId {
}
/// Returns message state.
pub async fn get_state(self, context: &Context) -> crate::sql::Result<MessageState> {
pub async fn get_state(self, context: &Context) -> Result<MessageState> {
let result = context
.sql
.query_get_value("SELECT state FROM msgs WHERE id=?", paramsv![self])
@@ -90,11 +90,7 @@ impl MsgId {
/// Returns Some if the message needs to be moved from `folder`.
/// If yes, returns `ConfiguredInboxFolder`, `ConfiguredMvboxFolder` or `ConfiguredSentboxFolder`,
/// depending on where the message should be moved
pub async fn needs_move(
self,
context: &Context,
folder: &str,
) -> Result<Option<Config>, Error> {
pub async fn needs_move(self, context: &Context, folder: &str) -> Result<Option<Config>> {
use Config::*;
if context.is_mvbox(folder).await? {
return Ok(None);
@@ -133,7 +129,7 @@ impl MsgId {
}
}
async fn needs_move_to_mvbox(self, context: &Context, msg: &Message) -> Result<bool, Error> {
async fn needs_move_to_mvbox(self, context: &Context, msg: &Message) -> Result<bool> {
if !context.get_config_bool(Config::MvboxMove).await? {
return Ok(false);
}
@@ -156,7 +152,7 @@ impl MsgId {
/// We keep some infos to
/// 1. not download the same message again
/// 2. be able to delete the message on the server if we want to
pub async fn trash(self, context: &Context) -> crate::sql::Result<()> {
pub async fn trash(self, context: &Context) -> Result<()> {
let chat_id = DC_CHAT_ID_TRASH;
context
.sql
@@ -181,7 +177,7 @@ WHERE id=?;
}
/// Deletes a message and corresponding MDNs from the database.
pub async fn delete_from_db(self, context: &Context) -> crate::sql::Result<()> {
pub async fn delete_from_db(self, context: &Context) -> Result<()> {
// We don't use transactions yet, so remove MDNs first to make
// sure they are not left while the message is deleted.
context
@@ -200,7 +196,7 @@ WHERE id=?;
/// It is used to avoid trying to remove the message from the
/// server multiple times when there are multiple message records
/// pointing to the same server UID.
pub(crate) async fn unlink(self, context: &Context) -> crate::sql::Result<()> {
pub(crate) async fn unlink(self, context: &Context) -> Result<()> {
context
.sql
.execute(
@@ -342,7 +338,7 @@ impl Message {
}
}
pub async fn load_from_db(context: &Context, id: MsgId) -> Result<Message, Error> {
pub async fn load_from_db(context: &Context, id: MsgId) -> Result<Message> {
ensure!(
!id.is_special(),
"Can not load special message ID {} from DB.",
@@ -456,7 +452,7 @@ impl Message {
self.param.get_path(Param::File, context).unwrap_or(None)
}
pub async fn try_calc_and_set_dimensions(&mut self, context: &Context) -> Result<(), Error> {
pub async fn try_calc_and_set_dimensions(&mut self, context: &Context) -> Result<()> {
if chat::msgtype_has_file(self.viewtype) {
let file_param = self.param.get_path(Param::File, context)?;
if let Some(path_and_filename) = file_param {
@@ -875,7 +871,7 @@ impl Message {
///
/// The message itself is not required to exist in the database,
/// it may even be deleted from the database by the time the message is prepared.
pub async fn set_quote(&mut self, context: &Context, quote: &Message) -> Result<(), Error> {
pub async fn set_quote(&mut self, context: &Context, quote: &Message) -> Result<()> {
ensure!(
!quote.rfc724_mid.is_empty(),
"Message without Message-Id cannot be quoted"
@@ -908,7 +904,7 @@ impl Message {
self.param.get(Param::Quote).map(|s| s.to_string())
}
pub async fn quoted_message(&self, context: &Context) -> Result<Option<Message>, Error> {
pub async fn quoted_message(&self, context: &Context) -> Result<Option<Message>> {
if self.param.get(Param::Quote).is_some() {
if let Some(in_reply_to) = &self.in_reply_to {
if let Some((_, _, msg_id)) = rfc724_mid_exists(context, in_reply_to).await? {
@@ -1222,7 +1218,7 @@ pub async fn decide_on_contact_request(
created_chat_id
}
pub async fn get_msg_info(context: &Context, msg_id: MsgId) -> Result<String, Error> {
pub async fn get_msg_info(context: &Context, msg_id: MsgId) -> Result<String> {
let msg = Message::load_from_db(context, msg_id).await?;
let rawtxt: Option<String> = context
.sql
@@ -1443,7 +1439,7 @@ pub fn guess_msgtype_from_suffix(path: &Path) -> Option<(Viewtype, &str)> {
/// Returns an empty string if there are no headers saved for the given message,
/// e.g. because of save_mime_headers is not set
/// or the message is not incoming.
pub async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result<String, Error> {
pub async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result<String> {
let headers = context
.sql
.query_get_value(
@@ -1497,44 +1493,39 @@ async fn delete_poi_location(context: &Context, location_id: u32) -> bool {
.is_ok()
}
pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> bool {
pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()> {
if msg_ids.is_empty() {
return false;
return Ok(());
}
let msgs = context
.sql
.with_conn(move |conn| {
let mut stmt = conn.prepare_cached(concat!(
"SELECT",
" m.chat_id AS chat_id,",
" m.state AS state,",
" c.blocked AS blocked",
" FROM msgs m LEFT JOIN chats c ON c.id=m.chat_id",
" WHERE m.id=? AND m.chat_id>9"
))?;
let conn = context.sql.get_conn().await?;
let mut stmt = conn.prepare_cached(concat!(
"SELECT",
" m.chat_id AS chat_id,",
" m.state AS state,",
" c.blocked AS blocked",
" FROM msgs m LEFT JOIN chats c ON c.id=m.chat_id",
" WHERE m.id=? AND m.chat_id>9"
))?;
let mut msgs = Vec::with_capacity(msg_ids.len());
for id in msg_ids.into_iter() {
let query_res = stmt.query_row(paramsv![id], |row| {
Ok((
row.get::<_, ChatId>("chat_id")?,
row.get::<_, MessageState>("state")?,
row.get::<_, Option<Blocked>>("blocked")?
.unwrap_or_default(),
))
});
if let Err(rusqlite::Error::QueryReturnedNoRows) = query_res {
continue;
}
let (chat_id, state, blocked) = query_res.map_err(Into::<anyhow::Error>::into)?;
msgs.push((id, chat_id, state, blocked));
}
Ok(msgs)
})
.await
.unwrap_or_default();
let mut msgs = Vec::with_capacity(msg_ids.len());
for id in msg_ids.into_iter() {
let query_res = stmt.query_row(paramsv![id], |row| {
Ok((
row.get::<_, ChatId>("chat_id")?,
row.get::<_, MessageState>("state")?,
row.get::<_, Option<Blocked>>("blocked")?
.unwrap_or_default(),
))
});
if let Err(rusqlite::Error::QueryReturnedNoRows) = query_res {
continue;
}
let (chat_id, state, blocked) = query_res.map_err(Into::<anyhow::Error>::into)?;
msgs.push((id, chat_id, state, blocked));
}
drop(stmt);
drop(conn);
let mut updated_chat_ids = BTreeMap::new();
@@ -1566,7 +1557,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> bool {
context.emit_event(EventType::MsgsNoticed(*updated_chat_id));
}
true
Ok(())
}
pub async fn update_msg_state(context: &Context, msg_id: MsgId, state: MessageState) -> bool {
@@ -1669,7 +1660,7 @@ pub async fn get_summarytext_by_raw(
// Context functions to work with messages
pub async fn exists(context: &Context, msg_id: MsgId) -> anyhow::Result<bool> {
pub async fn exists(context: &Context, msg_id: MsgId) -> Result<bool> {
if msg_id.is_special() {
return Ok(false);
}
@@ -1724,7 +1715,7 @@ pub async fn handle_mdn(
from_id: u32,
rfc724_mid: &str,
timestamp_sent: i64,
) -> anyhow::Result<Option<(ChatId, MsgId)>> {
) -> Result<Option<(ChatId, MsgId)>> {
if from_id <= DC_CONTACT_ID_LAST_SPECIAL || rfc724_mid.is_empty() {
return Ok(None);
}
@@ -1829,7 +1820,7 @@ pub(crate) async fn handle_ndn(
context: &Context,
failed: &FailureReport,
error: Option<impl AsRef<str>>,
) -> anyhow::Result<()> {
) -> Result<()> {
if failed.rfc724_mid.is_empty() {
return Ok(());
}
@@ -1878,7 +1869,7 @@ async fn ndn_maybe_add_info_msg(
failed: &FailureReport,
chat_id: ChatId,
chat_type: Chattype,
) -> anyhow::Result<()> {
) -> Result<()> {
match chat_type {
Chattype::Group => {
if let Some(failed_recipient) = &failed.failed_recipient {
@@ -1886,7 +1877,7 @@ async fn ndn_maybe_add_info_msg(
Contact::lookup_id_by_addr(context, failed_recipient, Origin::Unknown)
.await?
.ok_or_else(|| {
Error::msg("ndn_maybe_add_info_msg: Contact ID not found")
format_err!("ndn_maybe_add_info_msg: Contact ID not found")
})?;
let contact = Contact::load_from_db(context, contact_id).await?;
// Tell the user which of the recipients failed if we know that (because in
@@ -1949,7 +1940,7 @@ pub async fn estimate_deletion_cnt(
context: &Context,
from_server: bool,
seconds: i64,
) -> Result<usize, Error> {
) -> Result<usize> {
let self_chat_id = chat::lookup_by_contact_id(context, DC_CONTACT_ID_SELF)
.await
.unwrap_or_default()
@@ -2016,7 +2007,7 @@ pub async fn rfc724_mid_cnt(context: &Context, rfc724_mid: &str) -> usize {
pub(crate) async fn rfc724_mid_exists(
context: &Context,
rfc724_mid: &str,
) -> Result<Option<(String, u32, MsgId)>, Error> {
) -> Result<Option<(String, u32, MsgId)>> {
let rfc724_mid = rfc724_mid.trim_start_matches('<').trim_end_matches('>');
if rfc724_mid.is_empty() {
warn!(context, "Empty rfc724_mid passed to rfc724_mid_exists");
@@ -2739,7 +2730,7 @@ mod tests {
}
#[async_std::test]
async fn test_markseen_msgs() -> anyhow::Result<()> {
async fn test_markseen_msgs() -> Result<()> {
let alice = TestContext::new_alice().await;
let bob = TestContext::new_bob().await;
let alice_chat = alice.create_chat(&bob).await;
@@ -2765,7 +2756,7 @@ mod tests {
assert_eq!(bob.get_fresh_msgs().await?.len(), 0);
// that has no effect in deaddrop
markseen_msgs(&bob, vec![msg1.id, msg2.id]).await;
markseen_msgs(&bob, vec![msg1.id, msg2.id]).await?;
assert_eq!(Chatlist::try_load(&bob, 0, None, None).await?.len(), 1);
let msgs = chat::get_chat_msgs(&bob, DC_CHAT_ID_DEADDROP, 0, None).await?;
@@ -2794,22 +2785,22 @@ mod tests {
assert_eq!(alice.get_fresh_msgs().await?.len(), 2);
// no message-ids, that should have no effect
markseen_msgs(&alice, vec![]).await;
markseen_msgs(&alice, vec![]).await?;
// bad message-id, that should have no effect
markseen_msgs(&alice, vec![MsgId::new(123456)]).await;
markseen_msgs(&alice, vec![MsgId::new(123456)]).await?;
assert_eq!(alice_chat.id.get_fresh_msg_cnt(&alice).await?, 2);
assert_eq!(alice.get_fresh_msgs().await?.len(), 2);
// mark the most recent as seen
markseen_msgs(&alice, vec![msg2.id]).await;
markseen_msgs(&alice, vec![msg2.id]).await?;
assert_eq!(alice_chat.id.get_fresh_msg_cnt(&alice).await?, 1);
assert_eq!(alice.get_fresh_msgs().await?.len(), 1);
// user scrolled up - mark both as seen
markseen_msgs(&alice, vec![msg1.id, msg2.id]).await;
markseen_msgs(&alice, vec![msg1.id, msg2.id]).await?;
assert_eq!(alice_chat.id.get_fresh_msg_cnt(&alice).await?, 0);
assert_eq!(alice.get_fresh_msgs().await?.len(), 0);
@@ -2818,7 +2809,7 @@ mod tests {
}
#[async_std::test]
async fn test_get_state() -> anyhow::Result<()> {
async fn test_get_state() -> Result<()> {
let alice = TestContext::new_alice().await;
let bob = TestContext::new_bob().await;
let alice_chat = alice.create_chat(&bob).await;
@@ -2868,7 +2859,7 @@ mod tests {
marknoticed_chat(&bob, bob_msg.chat_id).await?;
assert_state(&bob, bob_msg.id, MessageState::InNoticed).await;
markseen_msgs(&bob, vec![bob_msg.id]).await;
markseen_msgs(&bob, vec![bob_msg.id]).await?;
assert_state(&bob, bob_msg.id, MessageState::InSeen).await;
Ok(())

View File

@@ -309,7 +309,7 @@ impl Oauth2 {
}
}
async fn is_expired(context: &Context) -> Result<bool, crate::sql::Error> {
async fn is_expired(context: &Context) -> Result<bool> {
let expire_timestamp = context
.sql
.get_raw_config_int64("oauth2_timestamp_expires")

View File

@@ -421,7 +421,7 @@ impl Peerstate {
}
}
pub async fn save_to_db(&self, sql: &Sql, create: bool) -> crate::sql::Result<()> {
pub async fn save_to_db(&self, sql: &Sql, create: bool) -> Result<()> {
if self.to_save == Some(ToSave::All) || create {
sql.execute(
if create {

View File

@@ -23,7 +23,6 @@ use crate::mimeparser::{MimeMessage, SystemMessage};
use crate::param::Param;
use crate::peerstate::{Peerstate, PeerstateKeyType, PeerstateVerifiedStatus, ToSave};
use crate::qr::check_qr;
use crate::sql;
use crate::stock_str;
use crate::token;
@@ -267,8 +266,6 @@ pub enum JoinError {
#[error("Unknown contact (this is a bug)")]
UnknownContact(#[source] anyhow::Error),
// Note that this can only occur if we failed to create the chat correctly.
#[error("No Chat found for group (this is a bug)")]
MissingChat(#[source] sql::Error),
#[error("Ongoing sender dropped (this is a bug)")]
OngoingSenderDropped,
#[error("Other")]
@@ -335,7 +332,9 @@ async fn securejoin(context: &Context, qr: &str) -> Result<ChatId, JoinError> {
Err(err) => {
if start.elapsed() > Duration::from_secs(7) {
context.free_ongoing().await;
return Err(JoinError::MissingChat(err));
return Err(err
.context("Ongoing sender dropped (this is a bug)")
.into());
}
}
}
@@ -669,8 +668,9 @@ pub(crate) async fn handle_securejoin_handshake(
}
Err(err) => {
error!(context, "Chat {} not found: {}", &field_grpid, err);
return Err(Error::new(err)
.context(format!("Chat for group {} not found", &field_grpid)));
return Err(
err.context(format!("Chat for group {} not found", &field_grpid))
);
}
}
} else {
@@ -745,8 +745,9 @@ pub(crate) async fn handle_securejoin_handshake(
.unwrap_or_else(|| "");
if let Err(err) = chat::get_chat_id_by_grpid(context, &field_grpid).await {
warn!(context, "Failed to lookup chat_id from grpid: {}", err);
return Err(Error::new(err)
.context(format!("Chat for group {} not found", &field_grpid)));
return Err(
err.context(format!("Chat for group {} not found", &field_grpid))
);
}
}
Ok(HandshakeMessage::Ignore) // "Done" deletes the message and breaks multi-device

View File

@@ -36,8 +36,6 @@ pub enum Error {
Oauth2Error { address: String },
#[error("TLS error {0}")]
Tls(#[from] async_native_tls::Error),
#[error("Sql {0}")]
Sql(#[from] crate::sql::Error),
#[error("{0}")]
Other(#[from] anyhow::Error),
}

View File

@@ -7,7 +7,7 @@ use std::convert::TryFrom;
use std::path::Path;
use std::time::Duration;
use anyhow::Context as _;
use anyhow::{bail, format_err, Context as _, Result};
use async_std::prelude::*;
use rusqlite::OpenFlags;
@@ -33,11 +33,8 @@ macro_rules! paramsv {
};
}
mod error;
mod migrations;
pub use self::error::*;
/// A wrapper around the underlying Sqlite3 object.
#[derive(Debug)]
pub struct Sql {
@@ -101,7 +98,7 @@ impl Sql {
.max_size(10)
.connection_timeout(Duration::from_secs(60))
.build(mgr)
.map_err(Error::ConnectionPool)?;
.context("Can't build SQL connection pool")?;
Ok(pool)
}
@@ -119,21 +116,20 @@ impl Sql {
"Cannot open, database \"{:?}\" already opened.",
dbfile.as_ref(),
);
return Err(Error::SqlAlreadyOpen.into());
bail!("SQL database is already opened.");
}
*self.pool.write().await = Some(Self::new_pool(dbfile.as_ref(), readonly)?);
if !readonly {
self.with_conn(move |conn| {
{
let conn = self.get_conn().await?;
// journal_mode is persisted, it is sufficient to change it only for one handle.
conn.pragma_update(None, "journal_mode", &"WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", &"NORMAL".to_string())?;
Ok(())
})
.await?;
}
// (1) update low-level database structure.
// this should be done before updates that use high-level objects that
@@ -256,40 +252,14 @@ impl Sql {
&self,
) -> Result<r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>> {
let lock = self.pool.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
let pool = lock
.as_ref()
.ok_or_else(|| format_err!("No SQL connection"))?;
let conn = pool.get()?;
Ok(conn)
}
pub async fn with_conn<G, H>(&self, g: G) -> anyhow::Result<H>
where
H: Send + 'static,
G: Send
+ 'static
+ FnOnce(
r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>,
) -> anyhow::Result<H>,
{
let lock = self.pool.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
let conn = pool.get()?;
g(conn)
}
pub async fn with_conn_async<G, H, Fut>(&self, mut g: G) -> Result<H>
where
G: FnMut(r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>) -> Fut,
Fut: Future<Output = Result<H>> + Send,
{
let lock = self.pool.read().await;
let pool = lock.as_ref().ok_or(Error::SqlNoConnection)?;
let conn = pool.get()?;
g(conn).await
}
/// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
pub async fn count(
&self,
@@ -331,39 +301,34 @@ impl Sql {
H: Send + 'static,
G: Send + 'static + FnOnce(&mut rusqlite::Transaction<'_>) -> anyhow::Result<H>,
{
self.with_conn(move |mut conn| {
let conn2 = &mut conn;
let mut transaction = conn2.transaction()?;
let ret = callback(&mut transaction);
let mut conn = self.get_conn().await?;
let mut transaction = conn.transaction()?;
let ret = callback(&mut transaction);
match ret {
Ok(ret) => {
transaction.commit()?;
Ok(ret)
}
Err(err) => {
transaction.rollback()?;
Err(err)
}
match ret {
Ok(ret) => {
transaction.commit()?;
Ok(ret)
}
})
.await
Err(err) => {
transaction.rollback()?;
Err(err)
}
}
}
/// Query the database if the requested table already exists.
pub async fn table_exists(&self, name: impl AsRef<str>) -> anyhow::Result<bool> {
let name = name.as_ref().to_string();
self.with_conn(move |conn| {
let mut exists = false;
conn.pragma(None, "table_info", &name, |_row| {
// will only be executed if the info was found
exists = true;
Ok(())
})?;
let conn = self.get_conn().await?;
let mut exists = false;
conn.pragma(None, "table_info", &name, |_row| {
// will only be executed if the info was found
exists = true;
Ok(())
})?;
Ok(exists)
})
.await
Ok(exists)
}
/// Check if a column exists in a given table.
@@ -374,21 +339,19 @@ impl Sql {
) -> anyhow::Result<bool> {
let table_name = table_name.as_ref().to_string();
let col_name = col_name.as_ref().to_string();
self.with_conn(move |conn| {
let mut exists = false;
// `PRAGMA table_info` returns one row per column,
// each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
conn.pragma(None, "table_info", &table_name, |row| {
let curr_name: String = row.get(1)?;
if col_name == curr_name {
exists = true;
}
Ok(())
})?;
let conn = self.get_conn().await?;
let mut exists = false;
// `PRAGMA table_info` returns one row per column,
// each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
conn.pragma(None, "table_info", &table_name, |row| {
let curr_name: String = row.get(1)?;
if col_name == curr_name {
exists = true;
}
Ok(())
})?;
Ok(exists)
})
.await
Ok(exists)
}
/// Execute a query which is expected to return zero or one row.
@@ -401,14 +364,11 @@ impl Sql {
where
F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
{
let res = match self.query_row(sql, params, f).await {
let conn = self.get_conn().await?;
let res = match conn.query_row(sql.as_ref(), params, f) {
Ok(res) => Ok(Some(res)),
Err(Error::Sql(rusqlite::Error::QueryReturnedNoRows)) => Ok(None),
Err(Error::Sql(rusqlite::Error::InvalidColumnType(
_,
_,
rusqlite::types::Type::Null,
))) => Ok(None),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(rusqlite::Error::InvalidColumnType(_, _, rusqlite::types::Type::Null)) => Ok(None),
Err(err) => Err(err),
}?;
Ok(res)
@@ -434,10 +394,6 @@ impl Sql {
/// Setting `None` deletes the value. On failure an error message
/// will already have been logged.
pub async fn set_raw_config(&self, key: impl AsRef<str>, value: Option<&str>) -> Result<()> {
if !self.is_open().await {
return Err(Error::SqlNoConnection);
}
let key = key.as_ref();
if let Some(value) = value {
let exists = self
@@ -470,9 +426,6 @@ impl Sql {
/// Get configuration options from the database.
pub async fn get_raw_config(&self, key: impl AsRef<str>) -> Result<Option<String>> {
if !self.is_open().await || key.as_ref().is_empty() {
return Err(Error::SqlNoConnection);
}
let value = self
.query_get_value(
"SELECT value FROM config WHERE keyname=?;",

View File

@@ -1,21 +0,0 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Sqlite error: {0:?}")]
Sql(#[from] rusqlite::Error),
#[error("Sqlite Connection Pool Error: {0:?}")]
ConnectionPool(#[from] r2d2::Error),
#[error("Sqlite: Connection closed")]
SqlNoConnection,
#[error("Sqlite: Already open")]
SqlAlreadyOpen,
#[error("Sqlite: Failed to open")]
SqlFailedToOpen,
#[error("{0}")]
Io(#[from] std::io::Error),
// #[error("{0:?}")]
// BlobError(#[from] crate::blob::BlobError),
#[error("{0}")]
Other(#[from] anyhow::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -1,10 +1,12 @@
use super::{Result, Sql};
use anyhow::Result;
use crate::config::Config;
use crate::constants::ShowEmails;
use crate::context::Context;
use crate::dc_tools::EmailAddress;
use crate::imap;
use crate::provider::get_provider_by_domain;
use crate::sql::Sql;
const DBVERSION: i32 = 68;
const VERSION_CFG: &str = "dbversion";

View File

@@ -4,6 +4,7 @@
//!
//! Tokens are used in countermitm verification protocols.
use anyhow::Result;
use deltachat_derive::{FromSql, ToSql};
use crate::chat::ChatId;
@@ -58,7 +59,7 @@ pub async fn lookup(
context: &Context,
namespace: Namespace,
chat: Option<ChatId>,
) -> crate::sql::Result<Option<String>> {
) -> Result<Option<String>> {
let token = match chat {
Some(chat_id) => {
context