(dig,hpk) pull out job collection from sql query/lock logic

This commit is contained in:
holger krekel
2019-07-18 14:03:57 +02:00
parent 7d0b5d8abb
commit 8f240f7153
3 changed files with 87 additions and 81 deletions

View File

@@ -1962,7 +1962,7 @@ pub unsafe fn dc_forward_msgs(
curr_timestamp = dc_create_smeared_timestamps(context, msg_cnt); curr_timestamp = dc_create_smeared_timestamps(context, msg_cnt);
idsstr = dc_arr_to_string(msg_ids, msg_cnt); idsstr = dc_arr_to_string(msg_ids, msg_cnt);
context let ids = context
.sql .sql
.query_map( .query_map(
format!( format!(
@@ -1972,63 +1972,63 @@ pub unsafe fn dc_forward_msgs(
params![], params![],
|row| row.get::<_, i32>(0), |row| row.get::<_, i32>(0),
|ids| { |ids| {
for id in ids { ids.collect::<Result<Vec<_>, _>>().map_err(Into::into)
let src_msg_id = id?; }
if !dc_msg_load_from_db(msg, context, src_msg_id as u32) { );
break;
} for id in ids.unwrap() {
dc_param_set_packed(original_param, (*(*msg).param).packed); let src_msg_id = id;
if (*msg).from_id != 1i32 as libc::c_uint { if !dc_msg_load_from_db(msg, context, src_msg_id as u32) {
dc_param_set_int((*msg).param, 'a' as i32, 1i32); break;
} }
dc_param_set((*msg).param, 'c' as i32, 0 as *const libc::c_char); dc_param_set_packed(original_param, (*(*msg).param).packed);
dc_param_set((*msg).param, 'u' as i32, 0 as *const libc::c_char); if (*msg).from_id != 1i32 as libc::c_uint {
dc_param_set((*msg).param, 'S' as i32, 0 as *const libc::c_char); dc_param_set_int((*msg).param, 'a' as i32, 1i32);
let new_msg_id: uint32_t; }
if (*msg).state == 18i32 { dc_param_set((*msg).param, 'c' as i32, 0 as *const libc::c_char);
let fresh9 = curr_timestamp; dc_param_set((*msg).param, 'u' as i32, 0 as *const libc::c_char);
curr_timestamp = curr_timestamp + 1; dc_param_set((*msg).param, 'S' as i32, 0 as *const libc::c_char);
new_msg_id = prepare_msg_raw(context, chat, msg, fresh9); let new_msg_id: uint32_t;
let save_param: *mut dc_param_t = (*msg).param; if (*msg).state == 18i32 {
(*msg).param = original_param; let fresh9 = curr_timestamp;
(*msg).id = src_msg_id as uint32_t; curr_timestamp = curr_timestamp + 1;
let old_fwd: *mut libc::c_char = dc_param_get( new_msg_id = prepare_msg_raw(context, chat, msg, fresh9);
(*msg).param, let save_param: *mut dc_param_t = (*msg).param;
'P' as i32, (*msg).param = original_param;
b"\x00" as *const u8 as *const libc::c_char, (*msg).id = src_msg_id as uint32_t;
); let old_fwd: *mut libc::c_char = dc_param_get(
let new_fwd: *mut libc::c_char = dc_mprintf( (*msg).param,
b"%s %d\x00" as *const u8 as *const libc::c_char, 'P' as i32,
old_fwd, b"\x00" as *const u8 as *const libc::c_char,
new_msg_id, );
); let new_fwd: *mut libc::c_char = dc_mprintf(
dc_param_set((*msg).param, 'P' as i32, new_fwd); b"%s %d\x00" as *const u8 as *const libc::c_char,
dc_msg_save_param_to_disk(msg); old_fwd,
free(new_fwd as *mut libc::c_void); new_msg_id,
free(old_fwd as *mut libc::c_void); );
(*msg).param = save_param dc_param_set((*msg).param, 'P' as i32, new_fwd);
} else { dc_msg_save_param_to_disk(msg);
(*msg).state = 20i32; free(new_fwd as *mut libc::c_void);
let fresh10 = curr_timestamp; free(old_fwd as *mut libc::c_void);
curr_timestamp = curr_timestamp + 1; (*msg).param = save_param
new_msg_id = prepare_msg_raw(context, chat, msg, fresh10); } else {
dc_job_send_msg(context, new_msg_id); (*msg).state = 20i32;
} let fresh10 = curr_timestamp;
carray_add( curr_timestamp = curr_timestamp + 1;
created_db_entries, new_msg_id = prepare_msg_raw(context, chat, msg, fresh10);
chat_id as uintptr_t as *mut libc::c_void, dc_job_send_msg(context, new_msg_id);
0 as *mut libc::c_uint, }
); carray_add(
carray_add( created_db_entries,
created_db_entries, chat_id as uintptr_t as *mut libc::c_void,
new_msg_id as uintptr_t as *mut libc::c_void, 0 as *mut libc::c_uint,
0 as *mut libc::c_uint, );
); carray_add(
} created_db_entries,
Ok(()) new_msg_id as uintptr_t as *mut libc::c_void,
}, 0 as *mut libc::c_uint,
) );
.unwrap(); // TODO: better error handling }
} }
if !created_db_entries.is_null() { if !created_db_entries.is_null() {

View File

@@ -80,12 +80,14 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network:
params_probe params_probe
}; };
let jobs: Vec<dc_job_t> = context info!(context, 0, "dc_job_perform before query");
let jobs: Result<Vec<dc_job_t>,_> = context
.sql .sql
.query_map( .query_map(
query, query,
params, params,
|row| { |row| {
info!(context, 0, "START jobs query_maps");
let job = dc_job_t { let job = dc_job_t {
job_id: row.get(0)?, job_id: row.get(0)?,
action: row.get(1)?, action: row.get(1)?,
@@ -100,16 +102,22 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network:
let packed: String = row.get(3)?; let packed: String = row.get(3)?;
dc_param_set_packed(job.param, to_cstring(packed).as_ptr()); dc_param_set_packed(job.param, to_cstring(packed).as_ptr());
info!(context, 0, "DONE jobs query_maps row");
Ok(job) Ok(job)
}, },
|jobs| { |jobs| {
jobs.collect::<Result<Vec<dc_job_t>, _>>() info!(context, 0, "collecting jobs");
.map_err(Into::into) let res = jobs.collect::<Result<Vec<dc_job_t>, _>>()
.map_err(Into::into);
info!(context, 0, "collecting jobs done");
res
}, },
) );
.unwrap_or_default(); match jobs {
Ok(ref res) => {info!(context, 0, "query done, {:?}", res.len()); },
for mut job in jobs { Err(ref err) => {info!(context, 0, "query failed: {:?}", err); }
}
for mut job in jobs.unwrap_or_default() {
info!( info!(
context, context,
0, 0,

View File

@@ -471,24 +471,9 @@ impl Imap {
fn unsetup_handle(&self, context: &Context) { fn unsetup_handle(&self, context: &Context) {
info!(context, 0, "IMAP unsetup_handle starts"); info!(context, 0, "IMAP unsetup_handle starts");
// XXX the next line currently can block even if all threads
// terminated already
let session = self.session.lock().unwrap().take();
info!( info!(
context, context,
0, "IMAP unsetup_handle step1 (acquired session.lock)" 0, "IMAP unsetup_handle step 1 (closing down stream)."
);
if session.is_some() {
match session.unwrap().close() {
Ok(_) => {}
Err(err) => {
eprintln!("failed to close connection: {:?}", err);
}
}
}
info!(
context,
0, "IMAP unsetup_handle step 2 (closing down stream)."
); );
let stream = self.stream.write().unwrap().take(); let stream = self.stream.write().unwrap().take();
if stream.is_some() { if stream.is_some() {
@@ -499,6 +484,19 @@ impl Imap {
} }
} }
} }
info!(
context,
0, "IMAP unsetup_handle step 2 (acquired session.lock)"
);
let session = self.session.lock().unwrap().take();
if session.is_some() {
match session.unwrap().close() {
Ok(_) => {}
Err(err) => {
eprintln!("failed to close connection: {:?}", err);
}
}
}
info!(context, 0, "IMAP unsetup_handle step 3 (clearing config)."); info!(context, 0, "IMAP unsetup_handle step 3 (clearing config).");
self.config.write().unwrap().selected_folder = None; self.config.write().unwrap().selected_folder = None;