diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 63b9e1060..4493bb633 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -572,6 +572,8 @@ int dc_is_configured (const dc_context_t* context); */ void dc_context_run (dc_context_t* context); +int dc_is_running (const dc_context_t* context); + /** * TODO: Document */ diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 8ae9d34e8..73c08dcc5 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -119,7 +119,7 @@ impl ContextWrapper { match guard.as_ref() { Some(ref ctx) => Ok(ctxfn(ctx)), None => { - eprintln!("context not open"); + eprintln!("ignoring careless call to non open context"); Err(()) } } @@ -142,7 +142,7 @@ macro_rules! with_inner_async { Ok(res) } None => { - eprintln!("context not open"); + eprintln!("ignoring careless call to non open context"); Err(()) } } @@ -150,6 +150,23 @@ macro_rules! with_inner_async { }}; } +macro_rules! with_inner_spawn { + ($ctx:expr, $name:ident, $block:expr) => {{ + let l = $ctx.inner.clone(); + let lock = l.read().unwrap(); + match lock.as_ref() { + Some(ctx) => { + let $name = ctx.clone(); + async_std::task::spawn(async move { $block.await }); + Ok(()) + } + None => { + eprintln!("ignoring careless call to non open context"); + Err(()) + } + } + }}; +} macro_rules! try_inner_async { ($ctx:expr, $name:ident, $block:expr) => {{ let l = $ctx.inner.clone(); @@ -443,9 +460,15 @@ pub unsafe extern "C" fn dc_configure(context: *mut dc_context_t) { eprintln!("ignoring careless call to dc_configure()"); return; } - // TODO: this is now blocking, maybe change this + let ffi_context = &*context; - with_inner_async!(ffi_context, ctx, async move { ctx.configure().await }).ok(); + + with_inner_spawn!(ffi_context, ctx, async move { + ctx.configure() + .await + .log_err(ffi_context, "Configure failed") + }) + .ok(); } #[no_mangle] @@ -472,6 +495,16 @@ pub unsafe extern "C" fn dc_context_run(context: *mut dc_context_t) { with_inner_async!(ffi_context, ctx, { ctx.run() }).unwrap_or(()) } +#[no_mangle] +pub unsafe extern "C" fn dc_is_running(context: *mut dc_context_t) -> libc::c_int { + if context.is_null() { + return 0; + } + let ffi_context = &*context; + + with_inner_async!(ffi_context, ctx, { ctx.is_running() }).unwrap_or_default() as libc::c_int +} + #[no_mangle] pub unsafe extern "C" fn dc_has_next_event(context: *mut dc_context_t) -> libc::c_int { if context.is_null() { @@ -1833,7 +1866,7 @@ pub unsafe extern "C" fn dc_get_contact( #[no_mangle] pub unsafe extern "C" fn dc_imex( context: *mut dc_context_t, - what: libc::c_int, + what_raw: libc::c_int, param1: *const libc::c_char, _param2: *const libc::c_char, ) { @@ -1841,21 +1874,24 @@ pub unsafe extern "C" fn dc_imex( eprintln!("ignoring careless call to dc_imex()"); return; } - let what = match imex::ImexMode::from_i32(what as i32) { + let what = match imex::ImexMode::from_i32(what_raw as i32) { Some(what) => what, None => { - eprintln!("ignoring invalid argument {} to dc_imex", what); + eprintln!("ignoring invalid argument {} to dc_imex", what_raw); return; } }; // TODO: this is now blocking, figure out if that is okay let ffi_context = &*context; - with_inner_async!( - ffi_context, - ctx, - imex::imex(&ctx, what, to_opt_string_lossy(param1)) - ) + + let param1 = to_opt_string_lossy(param1); + + with_inner_spawn!(ffi_context, ctx, async move { + imex::imex(&ctx, what, param1) + .await + .log_err(ffi_context, "IMEX failed") + }) .ok(); } diff --git a/examples/simple.rs b/examples/simple.rs index a94eab63e..354a9b7d3 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -46,7 +46,7 @@ async fn main() { cb(event); } } else { - async_std::task::sleep(time::Duration::from_millis(50)); + async_std::task::sleep(time::Duration::from_millis(50)).await; } } }); diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 313395fe9..24746064f 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -565,15 +565,18 @@ class IOThreads: self._thread_quitflag = False self._name2thread = {} self._log_event = log_event - self._running = False + self._log_running = True + # Make sure the current + self._start_one_thread("deltachat-log", self.dc_thread_run) + def is_started(self): - return self._running + return lib.dc_is_open(self._dc_context) and lib.dc_is_running(self._dc_context) def start(self, imap=True, smtp=True, mvbox=False, sentbox=False): assert not self.is_started() - self._running = True - self._start_one_thread("deltachat", self.dc_thread_run) + + lib.dc_context_run(self._dc_context) def _start_one_thread(self, name, func): self._name2thread[name] = t = threading.Thread(target=func, name=name) @@ -581,18 +584,14 @@ class IOThreads: t.start() def stop(self, wait=False): - lib.dc_context_shutdown(self._dc_context) - if wait: - for name, thread in self._name2thread.items(): - thread.join() - self._running = False + if self.is_started(): + lib.dc_context_shutdown(self._dc_context) def dc_thread_run(self): - self._log_event("py-bindings-info", 0, "DC THREAD START") + self._log_event("py-bindings-info", 0, "DC LOG THREAD START") - lib.dc_context_run(self._dc_context) - while self._running: - if lib.dc_has_next_event(self._dc_context): + while self._log_running: + if lib.dc_is_open(self._dc_context) and lib.dc_has_next_event(self._dc_context): event = lib.dc_get_next_event(self._dc_context) if event != ffi.NULL: deltachat.py_dc_callback( @@ -605,7 +604,7 @@ class IOThreads: else: time.sleep(0.05) - self._log_event("py-bindings-info", 0, "DC THREAD FINISHED") + self._log_event("py-bindings-info", 0, "DC LOG THREAD FINISHED") def _destroy_dc_context(dc_context, dc_context_unref=lib.dc_context_unref): # destructor for dc_context diff --git a/python/tests/conftest.py b/python/tests/conftest.py index b6868bc2c..4dc3781af 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -223,7 +223,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, datadir): pre_generated_key=pre_generated_key) configdict.update(config) ac.configure(**configdict) - ac.start_threads(mvbox=mvbox, sentbox=sentbox) + ac.start_threads() return ac def get_one_online_account(self, pre_generated_key=True): diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 1b868e191..d95486e03 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -178,7 +178,6 @@ class TestOfflineChat: assert d["draft"] == "" if chat.get_draft() is None else chat.get_draft() def test_group_chat_creation_with_translation(self, ac1): - ac1.start_threads() ac1.set_stock_translation(const.DC_STR_NEWGROUPDRAFT, "xyz %1$s") ac1._evlogger.consume_events() with pytest.raises(ValueError): @@ -198,7 +197,6 @@ class TestOfflineChat: assert not chat.is_promoted() msg = chat.get_draft() assert msg.text == "xyz title1" - ac1.stop_threads() @pytest.mark.parametrize("verified", [True, False]) def test_group_chat_qr(self, acfactory, ac1, verified): @@ -351,6 +349,7 @@ class TestOfflineChat: ac1.configure(addr="123@example.org") def test_import_export_one_contact(self, acfactory, tmpdir): + print("START") backupdir = tmpdir.mkdir("backup") ac1 = acfactory.get_configured_offline_account() contact1 = ac1.create_contact("some1@hello.com", name="some1") @@ -362,24 +361,27 @@ class TestOfflineChat: with bin.open("w") as f: f.write("\00123" * 10000) msg = chat.send_file(bin.strpath) - + print("L1") contact = msg.get_sender_contact() assert contact == ac1.get_self_contact() assert not backupdir.listdir() - + print("L2") path = ac1.export_all(backupdir.strpath) assert os.path.exists(path) ac2 = acfactory.get_unconfigured_account() ac2.import_all(path) contacts = ac2.get_contacts(query="some1") assert len(contacts) == 1 + print("L3") contact2 = contacts[0] assert contact2.addr == "some1@hello.com" chat2 = ac2.create_chat_by_contact(contact2) messages = chat2.get_messages() assert len(messages) == 2 + print("L4") assert messages[0].text == "msg1" assert os.path.exists(messages[1].filename) + print("STOP") def test_ac_setup_message_fails(self, ac1): with pytest.raises(RuntimeError): diff --git a/python/tests/test_lowlevel.py b/python/tests/test_lowlevel.py index 60640a228..70cdc2b6c 100644 --- a/python/tests/test_lowlevel.py +++ b/python/tests/test_lowlevel.py @@ -131,8 +131,7 @@ def test_sig(): def test_markseen_invalid_message_ids(acfactory): ac1 = acfactory.get_configured_offline_account() - - ac1.start_threads() + contact1 = ac1.create_contact(email="some1@example.com", name="some1") chat = ac1.create_chat_by_contact(contact1) chat.send_text("one messae") @@ -140,7 +139,6 @@ def test_markseen_invalid_message_ids(acfactory): msg_ids = [9] lib.dc_markseen_msgs(ac1._dc_context, msg_ids, len(msg_ids)) ac1._evlogger.ensure_event_not_queued("DC_EVENT_WARNING|DC_EVENT_ERROR") - ac1.stop_threads() def test_get_special_message_id_returns_empty_message(acfactory): diff --git a/src/scheduler.rs b/src/scheduler.rs index 3af84f15b..46c196756 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -48,6 +48,8 @@ impl Context { } async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) { + use futures::future::FutureExt; + info!(ctx, "starting inbox loop"); let ImapConnectionHandlers { mut connection, @@ -56,7 +58,10 @@ async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) { } = inbox_handlers; let fut = async move { - connection.connect_configured(&ctx).await.unwrap(); + if let Err(err) = connection.connect_configured(&ctx).await { + error!(ctx, "{}", err); + return; + } loop { let probe_network = ctx.scheduler.read().await.get_probe_network(); @@ -101,7 +106,7 @@ async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) { } }; - fut.race(stop_receiver.recv()).await; + fut.race(stop_receiver.recv().map(|_| ())).await; shutdown_sender.send(()).await; } @@ -110,6 +115,8 @@ async fn simple_imap_loop( inbox_handlers: ImapConnectionHandlers, folder: impl AsRef, ) { + use futures::future::FutureExt; + info!(ctx, "starting simple loop for {}", folder.as_ref()); let ImapConnectionHandlers { mut connection, @@ -118,7 +125,10 @@ async fn simple_imap_loop( } = inbox_handlers; let fut = async move { - connection.connect_configured(&ctx).await.unwrap(); + if let Err(err) = connection.connect_configured(&ctx).await { + error!(ctx, "{}", err); + return; + } loop { match get_watch_folder(&ctx, folder.as_ref()).await { @@ -155,7 +165,7 @@ async fn simple_imap_loop( } }; - fut.race(stop_receiver.recv()).await; + fut.race(stop_receiver.recv().map(|_| ())).await; shutdown_sender.send(()).await; }