Merge pull request #490 from deltachat/fix/location-sql

fix(location): do not sql recurse in location sending
This commit is contained in:
björn petersen
2019-09-11 19:40:46 +02:00
committed by GitHub

View File

@@ -265,16 +265,16 @@ pub fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64) -> l
if latitude == 0.0 && longitude == 0.0 { if latitude == 0.0 && longitude == 0.0 {
return 1; return 1;
} }
let mut continue_streaming = false;
context.sql.query_map( if let Ok(chats) = context.sql.query_map(
"SELECT id FROM chats WHERE locations_send_until>?;", "SELECT id FROM chats WHERE locations_send_until>?;",
params![time()], |row| row.get::<_, i32>(0), params![time()],
|chats| { |row| row.get::<_, i32>(0),
let mut continue_streaming = false; |chats| chats.collect::<Result<Vec<_>, _>>().map_err(Into::into),
) {
for chat in chats { for chat_id in chats {
let chat_id = chat?; if let Err(err) = context.sql.execute(
context.sql.execute(
"INSERT INTO locations \ "INSERT INTO locations \
(latitude, longitude, accuracy, timestamp, chat_id, from_id) VALUES (?,?,?,?,?,?);", (latitude, longitude, accuracy, timestamp, chat_id, from_id) VALUES (?,?,?,?,?,?);",
params![ params![
@@ -285,16 +285,19 @@ pub fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64) -> l
chat_id, chat_id,
1, 1,
] ]
)?; ) {
warn!(context, "failed to store location {:?}", err);
} else {
continue_streaming = true; continue_streaming = true;
} }
if continue_streaming {
context.call_cb(Event::LOCATION_CHANGED, 1, 0);
};
schedule_MAYBE_SEND_LOCATIONS(context, 0);
Ok(continue_streaming as libc::c_int)
} }
).unwrap_or_default() if continue_streaming {
context.call_cb(Event::LOCATION_CHANGED, 1, 0);
};
schedule_MAYBE_SEND_LOCATIONS(context, 0);
}
continue_streaming as libc::c_int
} }
pub fn get_range( pub fn get_range(
@@ -548,73 +551,78 @@ pub fn job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: &Job) {
" ----------------- MAYBE_SEND_LOCATIONS -------------- ", " ----------------- MAYBE_SEND_LOCATIONS -------------- ",
); );
context if let Ok(rows) = context.sql.query_map(
.sql "SELECT id, locations_send_begin, locations_last_sent \
.query_map( FROM chats \
"SELECT id, locations_send_begin, locations_last_sent \ WHERE locations_send_until>?;",
FROM chats \ params![now],
WHERE locations_send_until>?;", |row| {
params![now], let chat_id: i32 = row.get(0)?;
|row| { let locations_send_begin: i64 = row.get(1)?;
let chat_id: i32 = row.get(0)?; let locations_last_sent: i64 = row.get(2)?;
let locations_send_begin: i64 = row.get(1)?; continue_streaming = 1;
let locations_last_sent: i64 = row.get(2)?;
continue_streaming = 1;
// be a bit tolerant as the timer may not align exactly with time(NULL) // be a bit tolerant as the timer may not align exactly with time(NULL)
if now - locations_last_sent < (60 - 3) { if now - locations_last_sent < (60 - 3) {
Ok(None) Ok(None)
} else { } else {
Ok(Some((chat_id, locations_send_begin, locations_last_sent))) Ok(Some((chat_id, locations_send_begin, locations_last_sent)))
} }
}, },
|rows| { |rows| {
context.sql.prepare( rows.filter_map(|v| v.transpose())
"SELECT id \ .collect::<Result<Vec<_>, _>>()
FROM locations \ .map_err(Into::into)
WHERE from_id=? \ },
AND timestamp>=? \ ) {
AND timestamp>? \ let msgs = context
AND independent=0 \ .sql
ORDER BY timestamp;", .prepare(
|mut stmt_locations, _| { "SELECT id \
for (chat_id, locations_send_begin, locations_last_sent) in FROM locations \
rows.filter_map(|r| match r { WHERE from_id=? \
Ok(Some(v)) => Some(v), AND timestamp>=? \
_ => None, AND timestamp>? \
}) AND independent=0 \
{ ORDER BY timestamp;",
// TODO: do I need to reset? |mut stmt_locations, _| {
let msgs = rows
.into_iter()
.filter_map(|(chat_id, locations_send_begin, locations_last_sent)| {
if !stmt_locations if !stmt_locations
.exists(params![1, locations_send_begin, locations_last_sent,]) .exists(params![1, locations_send_begin, locations_last_sent,])
.unwrap_or_default() .unwrap_or_default()
{ {
// if there is no new location, there's nothing to send. // if there is no new location, there's nothing to send.
// however, maybe we want to bypass this test eg. 15 minutes // however, maybe we want to bypass this test eg. 15 minutes
continue; None
} else {
// pending locations are attached automatically to every message,
// so also to this empty text message.
// DC_CMD_LOCATION is only needed to create a nicer subject.
//
// for optimisation and to avoid flooding the sending queue,
// we could sending these messages only if we're really online.
// the easiest way to determine this, is to check for an empty message queue.
// (might not be 100%, however, as positions are sent combined later
// and dc_set_location() is typically called periodically, this is ok)
let mut msg = dc_msg_new(context, Viewtype::Text);
msg.hidden = true;
msg.param.set_int(Param::Cmd, 9);
Some((chat_id, msg))
} }
// pending locations are attached automatically to every message, })
// so also to this empty text message. .collect::<Vec<_>>();
// DC_CMD_LOCATION is only needed to create a nicer subject. Ok(msgs)
// },
// for optimisation and to avoid flooding the sending queue, )
// we could sending these messages only if we're really online. .unwrap_or_default(); // TODO: Better error handling
// the easiest way to determine this, is to check for an empty message queue.
// (might not be 100%, however, as positions are sent combined later
// and dc_set_location() is typically called periodically, this is ok)
let mut msg = dc_msg_new(context, Viewtype::Text);
msg.hidden = true;
msg.param.set_int(Param::Cmd, 9);
// TODO: handle cleanup on error
chat::send_msg(context, chat_id as u32, &mut msg).unwrap();
}
Ok(())
},
)
},
)
.unwrap(); // TODO: Better error handling
for (chat_id, mut msg) in msgs.into_iter() {
// TODO: better error handling
chat::send_msg(context, chat_id as u32, &mut msg).unwrap();
}
}
if 0 != continue_streaming { if 0 != continue_streaming {
schedule_MAYBE_SEND_LOCATIONS(context, 0x1); schedule_MAYBE_SEND_LOCATIONS(context, 0x1);
} }