From 64b00fce7de035c759adf9e569d3af638b8c1553 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 11 Sep 2019 17:46:05 +0200 Subject: [PATCH 1/3] fix(location): do not sql recurse in location sending --- src/location.rs | 125 +++++++++++++++++++++++------------------------- 1 file changed, 61 insertions(+), 64 deletions(-) diff --git a/src/location.rs b/src/location.rs index 504276086..44f122e7e 100644 --- a/src/location.rs +++ b/src/location.rs @@ -548,73 +548,70 @@ pub fn job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: &Job) { " ----------------- MAYBE_SEND_LOCATIONS -------------- ", ); - context - .sql - .query_map( - "SELECT id, locations_send_begin, locations_last_sent \ - FROM chats \ - WHERE locations_send_until>?;", - params![now], - |row| { - let chat_id: i32 = row.get(0)?; - let locations_send_begin: i64 = row.get(1)?; - let locations_last_sent: i64 = row.get(2)?; - continue_streaming = 1; + if let Ok(rows) = context.sql.query_map( + "SELECT id, locations_send_begin, locations_last_sent \ + FROM chats \ + WHERE locations_send_until>?;", + params![now], + |row| { + let chat_id: i32 = row.get(0)?; + let locations_send_begin: i64 = row.get(1)?; + let locations_last_sent: i64 = row.get(2)?; + continue_streaming = 1; - // be a bit tolerant as the timer may not align exactly with time(NULL) - if now - locations_last_sent < (60 - 3) { - Ok(None) - } else { - Ok(Some((chat_id, locations_send_begin, locations_last_sent))) - } - }, - |rows| { - context.sql.prepare( - "SELECT id \ - FROM locations \ - WHERE from_id=? \ - AND timestamp>=? \ - AND timestamp>? \ - AND independent=0 \ - ORDER BY timestamp;", - |mut stmt_locations, _| { - for (chat_id, locations_send_begin, locations_last_sent) in - rows.filter_map(|r| match r { - Ok(Some(v)) => Some(v), - _ => None, - }) + // be a bit tolerant as the timer may not align exactly with time(NULL) + if now - locations_last_sent < (60 - 3) { + Ok(None) + } else { + Ok(Some((chat_id, locations_send_begin, locations_last_sent))) + } + }, + |rows| { + rows.filter_map(|v| v.transpose()) + .collect::, _>>() + .map_err(Into::into) + }, + ) { + context + .sql + .prepare( + "SELECT id \ + FROM locations \ + WHERE from_id=? \ + AND timestamp>=? \ + AND timestamp>? \ + AND independent=0 \ + ORDER BY timestamp;", + |mut stmt_locations, _| { + for (chat_id, locations_send_begin, locations_last_sent) in rows { + if !stmt_locations + .exists(params![1, locations_send_begin, locations_last_sent,]) + .unwrap_or_default() { - // TODO: do I need to reset? - if !stmt_locations - .exists(params![1, locations_send_begin, locations_last_sent,]) - .unwrap_or_default() - { - // if there is no new location, there's nothing to send. - // however, maybe we want to bypass this test eg. 15 minutes - continue; - } - // pending locations are attached automatically to every message, - // so also to this empty text message. - // DC_CMD_LOCATION is only needed to create a nicer subject. - // - // for optimisation and to avoid flooding the sending queue, - // we could sending these messages only if we're really online. - // the easiest way to determine this, is to check for an empty message queue. - // (might not be 100%, however, as positions are sent combined later - // and dc_set_location() is typically called periodically, this is ok) - let mut msg = dc_msg_new(context, Viewtype::Text); - msg.hidden = true; - msg.param.set_int(Param::Cmd, 9); - // TODO: handle cleanup on error - chat::send_msg(context, chat_id as u32, &mut msg).unwrap(); + // if there is no new location, there's nothing to send. + // however, maybe we want to bypass this test eg. 15 minutes + continue; } - Ok(()) - }, - ) - }, - ) - .unwrap(); // TODO: Better error handling - + // pending locations are attached automatically to every message, + // so also to this empty text message. + // DC_CMD_LOCATION is only needed to create a nicer subject. + // + // for optimisation and to avoid flooding the sending queue, + // we could sending these messages only if we're really online. + // the easiest way to determine this, is to check for an empty message queue. + // (might not be 100%, however, as positions are sent combined later + // and dc_set_location() is typically called periodically, this is ok) + let mut msg = dc_msg_new(context, Viewtype::Text); + msg.hidden = true; + msg.param.set_int(Param::Cmd, 9); + // TODO: handle cleanup on error + chat::send_msg(context, chat_id as u32, &mut msg).unwrap(); + } + Ok(()) + }, + ) + .unwrap(); // TODO: Better error handling + } if 0 != continue_streaming { schedule_MAYBE_SEND_LOCATIONS(context, 0x1); } From ca9dccfcd7cd111caf4f3ffc1b2a4cd79c279361 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 11 Sep 2019 17:57:59 +0200 Subject: [PATCH 2/3] fix(location): another nested sql --- src/location.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/location.rs b/src/location.rs index 44f122e7e..aae7c39c5 100644 --- a/src/location.rs +++ b/src/location.rs @@ -265,16 +265,16 @@ pub fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64) -> l if latitude == 0.0 && longitude == 0.0 { return 1; } + let mut continue_streaming = false; - context.sql.query_map( + if let Ok(chats) = context.sql.query_map( "SELECT id FROM chats WHERE locations_send_until>?;", - params![time()], |row| row.get::<_, i32>(0), - |chats| { - let mut continue_streaming = false; - - for chat in chats { - let chat_id = chat?; - context.sql.execute( + params![time()], + |row| row.get::<_, i32>(0), + |chats| chats.collect::, _>>().map_err(Into::into), + ) { + for chat_id in chats { + if let Err(err) = context.sql.execute( "INSERT INTO locations \ (latitude, longitude, accuracy, timestamp, chat_id, from_id) VALUES (?,?,?,?,?,?);", params![ @@ -285,16 +285,19 @@ pub fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64) -> l chat_id, 1, ] - )?; + ) { + warn!(context, "failed to store location {:?}", err); + } else { continue_streaming = true; } - if continue_streaming { - context.call_cb(Event::LOCATION_CHANGED, 1, 0); - }; - schedule_MAYBE_SEND_LOCATIONS(context, 0); - Ok(continue_streaming as libc::c_int) } - ).unwrap_or_default() + if continue_streaming { + context.call_cb(Event::LOCATION_CHANGED, 1, 0); + }; + schedule_MAYBE_SEND_LOCATIONS(context, 0); + } + + continue_streaming as libc::c_int } pub fn get_range( From dd2e3d35fd866b5ef7300a3bd9e78935bf5ab3e1 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 11 Sep 2019 18:13:21 +0200 Subject: [PATCH 3/3] fix(location): another nesting --- src/location.rs | 62 ++++++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/src/location.rs b/src/location.rs index aae7c39c5..31be5ab27 100644 --- a/src/location.rs +++ b/src/location.rs @@ -575,7 +575,7 @@ pub fn job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: &Job) { .map_err(Into::into) }, ) { - context + let msgs = context .sql .prepare( "SELECT id \ @@ -586,34 +586,42 @@ pub fn job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: &Job) { AND independent=0 \ ORDER BY timestamp;", |mut stmt_locations, _| { - for (chat_id, locations_send_begin, locations_last_sent) in rows { - if !stmt_locations - .exists(params![1, locations_send_begin, locations_last_sent,]) - .unwrap_or_default() - { - // if there is no new location, there's nothing to send. - // however, maybe we want to bypass this test eg. 15 minutes - continue; - } - // pending locations are attached automatically to every message, - // so also to this empty text message. - // DC_CMD_LOCATION is only needed to create a nicer subject. - // - // for optimisation and to avoid flooding the sending queue, - // we could sending these messages only if we're really online. - // the easiest way to determine this, is to check for an empty message queue. - // (might not be 100%, however, as positions are sent combined later - // and dc_set_location() is typically called periodically, this is ok) - let mut msg = dc_msg_new(context, Viewtype::Text); - msg.hidden = true; - msg.param.set_int(Param::Cmd, 9); - // TODO: handle cleanup on error - chat::send_msg(context, chat_id as u32, &mut msg).unwrap(); - } - Ok(()) + let msgs = rows + .into_iter() + .filter_map(|(chat_id, locations_send_begin, locations_last_sent)| { + if !stmt_locations + .exists(params![1, locations_send_begin, locations_last_sent,]) + .unwrap_or_default() + { + // if there is no new location, there's nothing to send. + // however, maybe we want to bypass this test eg. 15 minutes + None + } else { + // pending locations are attached automatically to every message, + // so also to this empty text message. + // DC_CMD_LOCATION is only needed to create a nicer subject. + // + // for optimisation and to avoid flooding the sending queue, + // we could sending these messages only if we're really online. + // the easiest way to determine this, is to check for an empty message queue. + // (might not be 100%, however, as positions are sent combined later + // and dc_set_location() is typically called periodically, this is ok) + let mut msg = dc_msg_new(context, Viewtype::Text); + msg.hidden = true; + msg.param.set_int(Param::Cmd, 9); + Some((chat_id, msg)) + } + }) + .collect::>(); + Ok(msgs) }, ) - .unwrap(); // TODO: Better error handling + .unwrap_or_default(); // TODO: Better error handling + + for (chat_id, mut msg) in msgs.into_iter() { + // TODO: better error handling + chat::send_msg(context, chat_id as u32, &mut msg).unwrap(); + } } if 0 != continue_streaming { schedule_MAYBE_SEND_LOCATIONS(context, 0x1);