sql: switch from sqlx to rusqlite

This commit is contained in:
link2xt
2021-04-25 00:00:00 +00:00
parent d179dced4e
commit 8610b0c945
32 changed files with 2336 additions and 2425 deletions

View File

@@ -2,10 +2,8 @@
use std::convert::TryFrom;
use anyhow::{ensure, Error};
use async_std::prelude::*;
use bitflags::bitflags;
use quick_xml::events::{BytesEnd, BytesStart, BytesText};
use sqlx::Row;
use crate::chat::{self, ChatId};
use crate::config::Config;
@@ -201,15 +199,15 @@ pub async fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds:
if context
.sql
.execute(
sqlx::query(
"UPDATE chats \
"UPDATE chats \
SET locations_send_begin=?, \
locations_send_until=? \
WHERE id=?",
)
.bind(if 0 != seconds { now } else { 0 })
.bind(if 0 != seconds { now + seconds } else { 0 })
.bind(chat_id),
paramsv![
if 0 != seconds { now } else { 0 },
if 0 != seconds { now + seconds } else { 0 },
chat_id,
],
)
.await
.is_ok()
@@ -262,17 +260,16 @@ pub async fn is_sending_locations_to_chat(context: &Context, chat_id: Option<Cha
Some(chat_id) => context
.sql
.exists(
sqlx::query("SELECT COUNT(id) FROM chats WHERE id=? AND locations_send_until>?;")
.bind(chat_id)
.bind(time()),
"SELECT COUNT(id) FROM chats WHERE id=? AND locations_send_until>?;",
paramsv![chat_id, time()],
)
.await
.unwrap_or_default(),
None => context
.sql
.exists(
sqlx::query("SELECT COUNT(id) FROM chats WHERE locations_send_until>?;")
.bind(time()),
"SELECT COUNT(id) FROM chats WHERE locations_send_until>?;",
paramsv![time()],
)
.await
.unwrap_or_default(),
@@ -285,29 +282,28 @@ pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64
}
let mut continue_streaming = false;
if let Ok(mut chats) = context
if let Ok(chats) = context
.sql
.fetch(sqlx::query("SELECT id FROM chats WHERE locations_send_until>?;").bind(time()))
.query_map(
"SELECT id FROM chats WHERE locations_send_until>?;",
paramsv![time()],
|row| row.get::<_, i32>(0),
|chats| chats.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await
.map(|rows| rows.map(|row| row?.try_get::<i32, _>(0)))
{
while let Some(chat_id) = chats.next().await {
let chat_id = match chat_id {
Ok(id) => id,
Err(_) => break,
};
for chat_id in chats {
if let Err(err) = context.sql.execute(
sqlx::query(
"INSERT INTO locations \
(latitude, longitude, accuracy, timestamp, chat_id, from_id) VALUES (?,?,?,?,?,?);"
)
.bind(latitude)
.bind(longitude)
.bind(accuracy)
.bind(time())
.bind(chat_id)
.bind(DC_CONTACT_ID_SELF)
(latitude, longitude, accuracy, timestamp, chat_id, from_id) VALUES (?,?,?,?,?,?);",
paramsv![
latitude,
longitude,
accuracy,
time(),
chat_id,
DC_CONTACT_ID_SELF,
]
).await {
warn!(context, "failed to store location {:?}", err);
} else {
@@ -342,50 +338,54 @@ pub async fn get_range(
Some(contact_id) => (0, contact_id),
None => (1, 0), // this contact_id is unused
};
let list = context
.sql
.fetch(
sqlx::query(
"SELECT l.id, l.latitude, l.longitude, l.accuracy, l.timestamp, l.independent, \
.query_map(
"SELECT l.id, l.latitude, l.longitude, l.accuracy, l.timestamp, l.independent, \
COALESCE(m.id, 0) AS msg_id, l.from_id, l.chat_id, COALESCE(m.txt, '') AS txt \
FROM locations l LEFT JOIN msgs m ON l.id=m.location_id WHERE (? OR l.chat_id=?) \
AND (? OR l.from_id=?) \
AND (l.independent=1 OR (l.timestamp>=? AND l.timestamp<=?)) \
ORDER BY l.timestamp DESC, l.id DESC, msg_id DESC;",
)
.bind(disable_chat_id)
.bind(chat_id)
.bind(disable_contact_id)
.bind(contact_id as i64)
.bind(timestamp_from)
.bind(timestamp_to),
paramsv![
disable_chat_id,
chat_id,
disable_contact_id,
contact_id as i32,
timestamp_from,
timestamp_to,
],
|row| {
let msg_id = row.get(6)?;
let txt: String = row.get(9)?;
let marker = if msg_id != 0 && is_marker(&txt) {
Some(txt)
} else {
None
};
let loc = Location {
location_id: row.get(0)?,
latitude: row.get(1)?,
longitude: row.get(2)?,
accuracy: row.get(3)?,
timestamp: row.get(4)?,
independent: row.get(5)?,
msg_id,
contact_id: row.get(7)?,
chat_id: row.get(8)?,
marker,
};
Ok(loc)
},
|locations| {
let mut ret = Vec::new();
for location in locations {
ret.push(location?);
}
Ok(ret)
},
)
.await?
.map(|row| {
let row = row?;
let msg_id = row.try_get(6)?;
let txt: String = row.try_get(9)?;
let marker = if msg_id != 0 && is_marker(&txt) {
Some(txt)
} else {
None
};
let loc = Location {
location_id: row.try_get(0)?,
latitude: row.try_get(1)?,
longitude: row.try_get(2)?,
accuracy: row.try_get(3)?,
timestamp: row.try_get(4)?,
independent: row.try_get(5)?,
msg_id,
contact_id: row.try_get(7)?,
chat_id: row.try_get(8)?,
marker,
};
Ok(loc)
})
.collect::<sqlx::Result<_>>()
.await?;
Ok(list)
}
@@ -403,7 +403,7 @@ fn is_marker(txt: &str) -> bool {
pub async fn delete_all(context: &Context) -> Result<(), Error> {
context
.sql
.execute(sqlx::query("DELETE FROM locations;"))
.execute("DELETE FROM locations;", paramsv![])
.await?;
context.emit_event(EventType::LocationChanged(None));
Ok(())
@@ -417,65 +417,70 @@ pub async fn get_kml(context: &Context, chat_id: ChatId) -> Result<(String, u32)
.await?
.unwrap_or_default();
let (locations_send_begin, locations_send_until, locations_last_sent) = {
let row = context.sql.fetch_one(
sqlx::query(
"SELECT locations_send_begin, locations_send_until, locations_last_sent FROM chats WHERE id=?;"
)
.bind(chat_id)
).await?;
let (locations_send_begin, locations_send_until, locations_last_sent) = context.sql.query_row(
"SELECT locations_send_begin, locations_send_until, locations_last_sent FROM chats WHERE id=?;",
paramsv![chat_id], |row| {
let send_begin: i64 = row.get(0)?;
let send_until: i64 = row.get(1)?;
let last_sent: i64 = row.get(2)?;
let send_begin: i64 = row.try_get(0)?;
let send_until: i64 = row.try_get(1)?;
let last_sent: i64 = row.try_get(2)?;
(send_begin, send_until, last_sent)
};
Ok((send_begin, send_until, last_sent))
})
.await?;
let now = time();
let mut location_count = 0;
let mut ret = String::new();
if locations_send_begin != 0 && now <= locations_send_until {
ret += &format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<kml xmlns=\"http://www.opengis.net/kml/2.2\">\n<Document addr=\"{}\">\n",
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n\
<kml xmlns=\"http://www.opengis.net/kml/2.2\">\n<Document addr=\"{}\">\n",
self_addr,
);
let mut rows = context.sql.fetch(
sqlx::query(
context
.sql
.query_map(
"SELECT id, latitude, longitude, accuracy, timestamp \
FROM locations WHERE from_id=? \
AND timestamp>=? \
AND (timestamp>=? OR timestamp=(SELECT MAX(timestamp) FROM locations WHERE from_id=?)) \
AND (timestamp>=? OR \
timestamp=(SELECT MAX(timestamp) FROM locations WHERE from_id=?)) \
AND independent=0 \
GROUP BY timestamp \
ORDER BY timestamp;"
ORDER BY timestamp;",
paramsv![
DC_CONTACT_ID_SELF,
locations_send_begin,
locations_last_sent,
DC_CONTACT_ID_SELF
],
|row| {
let location_id: i32 = row.get(0)?;
let latitude: f64 = row.get(1)?;
let longitude: f64 = row.get(2)?;
let accuracy: f64 = row.get(3)?;
let timestamp = get_kml_timestamp(row.get(4)?);
Ok((location_id, latitude, longitude, accuracy, timestamp))
},
|rows| {
for row in rows {
let (location_id, latitude, longitude, accuracy, timestamp) = row?;
ret += &format!(
"<Placemark>\
<Timestamp><when>{}</when></Timestamp>\
<Point><coordinates accuracy=\"{}\">{},{}</coordinates></Point>\
</Placemark>\n",
timestamp, accuracy, longitude, latitude
);
location_count += 1;
last_added_location_id = location_id as u32;
}
Ok(())
},
)
.bind(DC_CONTACT_ID_SELF)
.bind(locations_send_begin)
.bind(locations_last_sent)
.bind(DC_CONTACT_ID_SELF)
).await?;
while let Some(row) = rows.next().await {
let row = row?;
let location_id: u32 = row.try_get(0)?;
let latitude: f64 = row.try_get(1)?;
let longitude: f64 = row.try_get(2)?;
let accuracy: f64 = row.try_get(3)?;
let timestamp = get_kml_timestamp(row.try_get(4)?);
ret += &format!(
"<Placemark><Timestamp><when>{}</when></Timestamp><Point><coordinates accuracy=\"{}\">{},{}</coordinates></Point></Placemark>\n",
timestamp,
accuracy,
longitude,
latitude
);
location_count += 1;
last_added_location_id = location_id;
}
.await?;
ret += "</Document>\n</kml>";
}
@@ -516,9 +521,8 @@ pub async fn set_kml_sent_timestamp(
context
.sql
.execute(
sqlx::query("UPDATE chats SET locations_last_sent=? WHERE id=?;")
.bind(timestamp)
.bind(chat_id),
"UPDATE chats SET locations_last_sent=? WHERE id=?;",
paramsv![timestamp, chat_id],
)
.await?;
Ok(())
@@ -532,9 +536,8 @@ pub async fn set_msg_location_id(
context
.sql
.execute(
sqlx::query("UPDATE msgs SET location_id=? WHERE id=?;")
.bind(location_id)
.bind(msg_id),
"UPDATE msgs SET location_id=? WHERE id=?;",
paramsv![location_id, msg_id],
)
.await?;
@@ -553,7 +556,6 @@ pub async fn save(
let mut newest_timestamp = 0;
let mut newest_location_id = 0;
let stmt_test = "SELECT COUNT(*) FROM locations WHERE timestamp=? AND from_id=?";
let stmt_insert = "INSERT INTO locations\
(timestamp, from_id, chat_id, latitude, longitude, accuracy, independent) \
VALUES (?,?,?,?,?,?,?);";
@@ -566,30 +568,39 @@ pub async fn save(
accuracy,
..
} = location;
let exists = context
let (loc_id, ts) = context
.sql
.exists(sqlx::query(stmt_test).bind(timestamp).bind(contact_id))
.await?;
if independent || !exists {
let row_id = context
.sql
.insert(
sqlx::query(stmt_insert)
.bind(timestamp)
.bind(contact_id)
.bind(chat_id)
.bind(latitude)
.bind(longitude)
.bind(accuracy)
.bind(independent),
)
.await?;
.with_conn(move |conn| {
let mut stmt_test = conn
.prepare_cached("SELECT id FROM locations WHERE timestamp=? AND from_id=?")?;
let mut stmt_insert = conn.prepare_cached(stmt_insert)?;
if timestamp > newest_timestamp {
newest_timestamp = timestamp;
newest_location_id = row_id;
}
}
let exists = stmt_test.exists(paramsv![timestamp, contact_id as i32])?;
if independent || !exists {
stmt_insert.execute(paramsv![
timestamp,
contact_id as i32,
chat_id,
latitude,
longitude,
accuracy,
independent,
])?;
if timestamp > newest_timestamp {
// okay to drop, as we use cached prepared statements
drop(stmt_test);
drop(stmt_insert);
newest_timestamp = timestamp;
newest_location_id = conn.last_insert_rowid();
}
}
Ok((newest_location_id, newest_timestamp))
})
.await?;
newest_timestamp = ts;
newest_location_id = loc_id;
}
Ok(u32::try_from(newest_location_id)?)
@@ -605,21 +616,15 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j
let rows = context
.sql
.fetch(
sqlx::query(
"SELECT id, locations_send_begin, locations_last_sent \
.query_map(
"SELECT id, locations_send_begin, locations_last_sent \
FROM chats \
WHERE locations_send_until>?;",
)
.bind(now),
)
.await
.map(|rows| {
rows.map(|row| -> sqlx::Result<Option<_>> {
let row = row?;
let chat_id: ChatId = row.try_get(0)?;
let locations_send_begin: i64 = row.try_get(1)?;
let locations_last_sent: i64 = row.try_get(2)?;
paramsv![now],
|row| {
let chat_id: ChatId = row.get(0)?;
let locations_send_begin: i64 = row.get(1)?;
let locations_last_sent: i64 = row.get(2)?;
continue_streaming = true;
// be a bit tolerant as the timer may not align exactly with time(NULL)
@@ -628,55 +633,64 @@ pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> j
} else {
Ok(Some((chat_id, locations_send_begin, locations_last_sent)))
}
})
.filter_map(|v| v.transpose())
});
},
|rows| {
rows.filter_map(|v| v.transpose())
.collect::<Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await;
let stmt = "SELECT COUNT(*) \
if rows.is_ok() {
let msgs = context
.sql
.with_conn(move |conn| {
let rows = rows.unwrap();
let mut stmt_locations = conn.prepare_cached(
"SELECT id \
FROM locations \
WHERE from_id=? \
AND timestamp>=? \
AND timestamp>? \
AND independent=0 \
ORDER BY timestamp;";
ORDER BY timestamp;",
)?;
if let Ok(mut rows) = rows {
let mut msgs = Vec::new();
while let Some(row) = rows.next().await {
let (chat_id, locations_send_begin, locations_last_sent) = match row {
Ok(row) => row,
Err(_) => break,
};
let exists = context
.sql
.exists(
sqlx::query(stmt)
.bind(DC_CONTACT_ID_SELF)
.bind(locations_send_begin)
.bind(locations_last_sent),
)
.await
.unwrap_or_default(); // TODO: better error handling
let mut msgs = Vec::new();
for (chat_id, locations_send_begin, locations_last_sent) in &rows {
if !stmt_locations
.exists(paramsv![
DC_CONTACT_ID_SELF,
*locations_send_begin,
*locations_last_sent,
])
.unwrap_or_default()
{
// if there is no new location, there's nothing to send.
// however, maybe we want to bypass this test eg. 15 minutes
} else {
// pending locations are attached automatically to every message,
// so also to this empty text message.
// DC_CMD_LOCATION is only needed to create a nicer subject.
//
// for optimisation and to avoid flooding the sending queue,
// we could sending these messages only if we're really online.
// the easiest way to determine this, is to check for an empty message queue.
// (might not be 100%, however, as positions are sent combined later
// and dc_set_location() is typically called periodically, this is ok)
let mut msg = Message::new(Viewtype::Text);
msg.hidden = true;
msg.param.set_cmd(SystemMessage::LocationOnly);
msgs.push((*chat_id, msg));
}
}
if !exists {
// if there is no new location, there's nothing to send.
// however, maybe we want to bypass this test eg. 15 minutes
} else {
// pending locations are attached automatically to every message,
// so also to this empty text message.
// DC_CMD_LOCATION is only needed to create a nicer subject.
//
// for optimisation and to avoid flooding the sending queue,
// we could sending these messages only if we're really online.
// the easiest way to determine this, is to check for an empty message queue.
// (might not be 100%, however, as positions are sent combined later
// and dc_set_location() is typically called periodically, this is ok)
let mut msg = Message::new(Viewtype::Text);
msg.hidden = true;
msg.param.set_cmd(SystemMessage::LocationOnly);
msgs.push((chat_id, msg));
}
}
Ok(msgs)
})
.await
.unwrap_or_default(); // TODO: better error handling
for (chat_id, mut msg) in msgs.into_iter() {
// TODO: better error handling
@@ -702,16 +716,16 @@ pub(crate) async fn job_maybe_send_locations_ended(
let chat_id = ChatId::new(job.foreign_id);
let (send_begin, send_until) = job_try!(context
.sql
.fetch_one(
sqlx::query(
let (send_begin, send_until) = job_try!(
context
.sql
.query_row(
"SELECT locations_send_begin, locations_send_until FROM chats WHERE id=?",
paramsv![chat_id],
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
)
.bind(chat_id)
)
.await
.and_then(|row| { Ok((row.try_get::<i64, _>(0)?, row.try_get::<i64, _>(1)?)) }));
.await
);
if !(send_begin != 0 && time() <= send_until) {
// still streaming -
@@ -723,12 +737,10 @@ pub(crate) async fn job_maybe_send_locations_ended(
context
.sql
.execute(
sqlx::query(
"UPDATE chats \
"UPDATE chats \
SET locations_send_begin=0, locations_send_until=0 \
WHERE id=?"
)
.bind(chat_id)
WHERE id=?",
paramsv![chat_id],
)
.await
);