mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
2 Commits
d6dacdcd27
...
dc_event_e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69ea56eb75 | ||
|
|
eb3242e077 |
@@ -2,6 +2,9 @@
|
||||
|
||||
## 1.79.0
|
||||
|
||||
### API-Changes
|
||||
- add `dc_event_emitter_close()` to termitate event loops in a thread-safe way
|
||||
|
||||
### Changes
|
||||
- Send locations in the background regardless of SMTP loop activity #3247
|
||||
- refactorings #3268
|
||||
|
||||
@@ -5201,6 +5201,15 @@ dc_event_t* dc_get_next_event(dc_event_emitter_t* emitter);
|
||||
void dc_event_emitter_unref(dc_event_emitter_t* emitter);
|
||||
|
||||
|
||||
/**
|
||||
* Closes event emitter object.
|
||||
*
|
||||
* @memberof dc_event_emitter_t
|
||||
* @param emitter Event emitter object as returned from dc_get_event_emitter().
|
||||
*/
|
||||
void dc_event_emitter_close(dc_event_emitter_t* emitter);
|
||||
|
||||
|
||||
/**
|
||||
* @class dc_accounts_event_emitter_t
|
||||
*
|
||||
|
||||
@@ -640,6 +640,18 @@ pub unsafe extern "C" fn dc_event_emitter_unref(emitter: *mut dc_event_emitter_t
|
||||
Box::from_raw(emitter);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_event_emitter_close(emitter: *mut dc_event_emitter_t) {
|
||||
if emitter.is_null() {
|
||||
eprintln!("ignoring careless call to dc_event_emitter_close()");
|
||||
return;
|
||||
}
|
||||
|
||||
let emitter = &mut *emitter;
|
||||
|
||||
block_on(emitter.close())
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_get_next_event(events: *mut dc_event_emitter_t) -> *mut dc_event_t {
|
||||
if events.is_null() {
|
||||
|
||||
@@ -699,14 +699,7 @@ class Account(object):
|
||||
|
||||
self.log("remove dc_context references")
|
||||
|
||||
# if _dc_context is unref'ed the event thread should quickly
|
||||
# receive the termination signal. However, some python code might
|
||||
# still hold a reference and so we use a secondary signal
|
||||
# to make sure the even thread terminates if it receives any new
|
||||
# event, indepedently from waiting for the core to send NULL to
|
||||
# get_next_event().
|
||||
self._event_thread.mark_shutdown()
|
||||
self._dc_context = None
|
||||
|
||||
self.log("wait for event thread to finish")
|
||||
try:
|
||||
@@ -714,6 +707,8 @@ class Account(object):
|
||||
except RuntimeError as e:
|
||||
self.log("Waiting for event thread failed: {}".format(e))
|
||||
|
||||
self._dc_context = None
|
||||
|
||||
if self._event_thread.is_alive():
|
||||
self.log("WARN: event thread did not terminate yet, ignoring.")
|
||||
|
||||
|
||||
@@ -196,7 +196,7 @@ class EventThread(threading.Thread):
|
||||
self.account = account
|
||||
super(EventThread, self).__init__(name="events")
|
||||
self.daemon = True
|
||||
self._marked_for_shutdown = False
|
||||
self._event_emitter = None
|
||||
self.start()
|
||||
|
||||
@contextmanager
|
||||
@@ -206,7 +206,7 @@ class EventThread(threading.Thread):
|
||||
self.account.log(message + " FINISHED")
|
||||
|
||||
def mark_shutdown(self) -> None:
|
||||
self._marked_for_shutdown = True
|
||||
lib.dc_event_emitter_close(self._event_emitter)
|
||||
|
||||
def wait(self, timeout=None) -> None:
|
||||
if self == threading.current_thread():
|
||||
@@ -221,18 +221,16 @@ class EventThread(threading.Thread):
|
||||
self._inner_run()
|
||||
|
||||
def _inner_run(self):
|
||||
if self._marked_for_shutdown or self.account._dc_context is None:
|
||||
if self.account._dc_context is None:
|
||||
return
|
||||
event_emitter = ffi.gc(
|
||||
self._event_emitter = ffi.gc(
|
||||
lib.dc_get_event_emitter(self.account._dc_context),
|
||||
lib.dc_event_emitter_unref,
|
||||
)
|
||||
while not self._marked_for_shutdown:
|
||||
event = lib.dc_get_next_event(event_emitter)
|
||||
while True:
|
||||
event = lib.dc_get_next_event(self._event_emitter)
|
||||
if event == ffi.NULL:
|
||||
break
|
||||
if self._marked_for_shutdown:
|
||||
break
|
||||
evt = lib.dc_event_get_id(event)
|
||||
data1 = lib.dc_event_get_data1_int(event)
|
||||
# the following code relates to the deltachat/_build.py's helper
|
||||
@@ -245,15 +243,11 @@ class EventThread(threading.Thread):
|
||||
|
||||
lib.dc_event_unref(event)
|
||||
ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
|
||||
try:
|
||||
self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
|
||||
for name, kwargs in self._map_ffi_event(ffi_event):
|
||||
self.account.log("calling hook name={} kwargs={}".format(name, kwargs))
|
||||
hook = getattr(self.account._pm.hook, name)
|
||||
hook(**kwargs)
|
||||
except Exception:
|
||||
if not self._marked_for_shutdown and self.account._dc_context is not None:
|
||||
raise
|
||||
self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
|
||||
for name, kwargs in self._map_ffi_event(ffi_event):
|
||||
self.account.log("calling hook name={} kwargs={}".format(name, kwargs))
|
||||
hook = getattr(self.account._pm.hook, name)
|
||||
hook(**kwargs)
|
||||
|
||||
def _map_ffi_event(self, ffi_event: FFIEvent):
|
||||
name = ffi_event.name
|
||||
|
||||
@@ -64,15 +64,20 @@ impl Events {
|
||||
pub struct EventEmitter(Receiver<Event>);
|
||||
|
||||
impl EventEmitter {
|
||||
/// Blocking recv of an event. Return `None` if the `Sender` has been droped.
|
||||
/// Blocking recv of an event. Return `None` if the `Sender` has been dropped.
|
||||
pub fn recv_sync(&self) -> Option<Event> {
|
||||
async_std::task::block_on(self.recv())
|
||||
}
|
||||
|
||||
/// Async recv of an event. Return `None` if the `Sender` has been droped.
|
||||
/// Async recv of an event. Return `None` if the `Sender` has been dropped.
|
||||
pub async fn recv(&self) -> Option<Event> {
|
||||
self.0.recv().await.ok()
|
||||
}
|
||||
|
||||
/// Closes event emitter.
|
||||
pub async fn close(&mut self) {
|
||||
self.0.close();
|
||||
}
|
||||
}
|
||||
|
||||
impl async_std::stream::Stream for EventEmitter {
|
||||
|
||||
Reference in New Issue
Block a user