Compare commits

...

15 Commits

Author SHA1 Message Date
dignifiedquire
97afe582e1 fix: dont use self.error when it is not available 2020-01-29 18:54:43 +01:00
=
4498954918 simplify failure 2020-01-29 17:31:20 +00:00
=
61ff2c4bee add a failing test taht doesn't use pytest anymore 2020-01-29 17:14:17 +00:00
=
14902ecf15 another iteration 2020-01-29 17:01:47 +00:00
=
289d8fdabf simplify thread handling 2020-01-29 16:48:09 +00:00
=
26c39f876b correctly use dc_context_new 2020-01-29 16:43:01 +00:00
=
1f53d489cb snapshot to try to get test_lowlevel.py to pass 2020-01-29 16:36:00 +00:00
holger krekel
35d055ea62 fix lint 2020-01-29 16:25:39 +01:00
dignifiedquire
a2c94fc715 add threads start & stop 2020-01-29 16:05:39 +01:00
dignifiedquire
e93911effb update ffi 2020-01-29 15:44:20 +01:00
dignifiedquire
41a7e06332 refactor: make run take the callback and avoid boxing 2020-01-29 14:19:36 +01:00
dignifiedquire
29bb2ec58b some fixes 2020-01-29 14:09:21 +01:00
dignifiedquire
efcad4126d cargo fmt 2020-01-29 14:09:21 +01:00
dignifiedquire
b4a5767347 start integration work 2020-01-29 14:09:21 +01:00
dignifiedquire
4dbcab9e6d feat: implement minimal rust threads 2020-01-29 14:08:35 +01:00
21 changed files with 420 additions and 901 deletions

15
Cargo.lock generated
View File

@@ -498,6 +498,19 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.0"
@@ -641,6 +654,7 @@ dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"charset 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"debug_stub_derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"deltachat_derive 2.0.0",
"email 0.0.21 (git+https://github.com/deltachat/rust-email)",
@@ -3346,6 +3360,7 @@ dependencies = [
"checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b"
"checksum crc24 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "fd121741cf3eb82c08dd3023eb55bf2665e5f60ec20f89760cf836ae4562e6a0"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
"checksum crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c"
"checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca"
"checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac"

View File

@@ -6,7 +6,7 @@ edition = "2018"
license = "MPL-2.0"
[profile.release]
lto = true
# lto = true
[dependencies]
deltachat_derive = { path = "./deltachat_derive" }
@@ -58,8 +58,8 @@ encoded-words = { git = "https://github.com/async-email/encoded-words", branch="
native-tls = "0.2.3"
image = { version = "0.22.4", default-features=false, features = ["gif_codec", "jpeg", "ico", "png_codec", "pnm", "webp", "bmp"] }
pretty_env_logger = "0.3.1"
rustyline = { version = "4.1.0", optional = true }
crossbeam = "0.7.3"
[dev-dependencies]
tempfile = "3.0"

View File

@@ -243,7 +243,7 @@ typedef uintptr_t (*dc_callback_t) (dc_context_t* context, int event, uintptr_t
* The object must be passed to the other context functions
* and must be freed using dc_context_unref() after usage.
*/
dc_context_t* dc_context_new (dc_callback_t cb, void* userdata, const char* os_name);
dc_context_t* dc_context_new (void* userdata, const char* os_name);
/**
@@ -538,307 +538,14 @@ int dc_is_configured (const dc_context_t* context);
/**
* Execute pending imap-jobs.
* This function and dc_perform_imap_fetch() and dc_perform_imap_idle()
* must be called from the same thread, typically in a loop.
*
* Example:
*
* void* imap_thread_func(void* context)
* {
* while (true) {
* dc_perform_imap_jobs(context);
* dc_perform_imap_fetch(context);
* dc_perform_imap_idle(context);
* }
* }
*
* // start imap-thread that runs forever
* pthread_t imap_thread;
* pthread_create(&imap_thread, NULL, imap_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_imap_idle() in the thread above
* // to return so that jobs are executed and messages are fetched.
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
* TODO: Document
*/
void dc_perform_imap_jobs (dc_context_t* context);
void dc_context_run (dc_context_t* context, dc_callback_t cb);
/**
* Fetch new messages, if any.
* This function and dc_perform_imap_jobs() and dc_perform_imap_idle() must be called from the same thread,
* typically in a loop.
*
* See dc_perform_imap_jobs() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
* TODO: Document
*/
void dc_perform_imap_fetch (dc_context_t* context);
/**
* Wait for messages or jobs.
* This function and dc_perform_imap_jobs() and dc_perform_imap_fetch() must be called from the same thread,
* typically in a loop.
*
* You should call this function directly after calling dc_perform_imap_fetch().
*
* See dc_perform_imap_jobs() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_imap_idle (dc_context_t* context);
/**
* Interrupt waiting for imap-jobs.
* If dc_perform_imap_jobs(), dc_perform_imap_fetch() and dc_perform_imap_idle() are called in a loop,
* calling this function causes imap-jobs to be executed and messages to be fetched.
*
* dc_interrupt_imap_idle() does _not_ interrupt dc_perform_imap_jobs() or dc_perform_imap_fetch().
* If the imap-thread is inside one of these functions when dc_interrupt_imap_idle() is called, however,
* the next call of the imap-thread to dc_perform_imap_idle() is interrupted immediately.
*
* Internally, this function is called whenever a imap-jobs should be processed
* (delete message, markseen etc.).
*
* When you need to call this function just because to get jobs done after network changes,
* use dc_maybe_network() instead.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_imap_idle (dc_context_t* context);
/**
* Execute pending mvbox-jobs.
* This function and dc_perform_mvbox_fetch() and dc_perform_mvbox_idle()
* must be called from the same thread, typically in a loop.
*
* Example:
*
* void* mvbox_thread_func(void* context)
* {
* while (true) {
* dc_perform_mvbox_jobs(context);
* dc_perform_mvbox_fetch(context);
* dc_perform_mvbox_idle(context);
* }
* }
*
* // start mvbox-thread that runs forever
* pthread_t mvbox_thread;
* pthread_create(&mvbox_thread, NULL, mvbox_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_mvbox_idle() in the thread above
* // to return so that jobs are executed and messages are fetched.
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_mvbox_jobs (dc_context_t* context);
/**
* Fetch new messages from the MVBOX, if any.
* The MVBOX is a folder on the account where chat messages are moved to.
* The moving is done to not disturb shared accounts that are used by both,
* Delta Chat and a classical MUA.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_mvbox_fetch (dc_context_t* context);
/**
* Wait for messages or jobs in the MVBOX-thread.
* This function and dc_perform_mvbox_fetch().
* must be called from the same thread, typically in a loop.
*
* You should call this function directly after calling dc_perform_mvbox_fetch().
*
* See dc_perform_mvbox_fetch() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_mvbox_idle (dc_context_t* context);
/**
* Interrupt waiting for MVBOX-fetch.
* dc_interrupt_mvbox_idle() does _not_ interrupt dc_perform_mvbox_fetch().
* If the MVBOX-thread is inside this function when dc_interrupt_mvbox_idle() is called, however,
* the next call of the MVBOX-thread to dc_perform_mvbox_idle() is interrupted immediately.
*
* Internally, this function is called whenever a imap-jobs should be processed.
*
* When you need to call this function just because to get jobs done after network changes,
* use dc_maybe_network() instead.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_mvbox_idle (dc_context_t* context);
/**
* Execute pending sentbox-jobs.
* This function and dc_perform_sentbox_fetch() and dc_perform_sentbox_idle()
* must be called from the same thread, typically in a loop.
*
* Example:
*
* void* sentbox_thread_func(void* context)
* {
* while (true) {
* dc_perform_sentbox_jobs(context);
* dc_perform_sentbox_fetch(context);
* dc_perform_sentbox_idle(context);
* }
* }
*
* // start sentbox-thread that runs forever
* pthread_t sentbox_thread;
* pthread_create(&sentbox_thread, NULL, sentbox_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_sentbox_idle() in the thread above
* // to return so that jobs are executed and messages are fetched.
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_sentbox_jobs (dc_context_t* context);
/**
* Fetch new messages from the Sent folder, if any.
* This function and dc_perform_sentbox_idle()
* must be called from the same thread, typically in a loop.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_sentbox_fetch (dc_context_t* context);
/**
* Wait for messages or jobs in the SENTBOX-thread.
* This function and dc_perform_sentbox_fetch()
* must be called from the same thread, typically in a loop.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_sentbox_idle (dc_context_t* context);
/**
* Interrupt waiting for messages or jobs in the SENTBOX-thread.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_sentbox_idle (dc_context_t* context);
/**
* Execute pending smtp-jobs.
* This function and dc_perform_smtp_idle() must be called from the same thread,
* typically in a loop.
*
* Example:
*
* void* smtp_thread_func(void* context)
* {
* while (true) {
* dc_perform_smtp_jobs(context);
* dc_perform_smtp_idle(context);
* }
* }
*
* // start smtp-thread that runs forever
* pthread_t smtp_thread;
* pthread_create(&smtp_thread, NULL, smtp_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_smtp_idle() in the thread above
* // to return so that jobs are executed
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_smtp_jobs (dc_context_t* context);
/**
* Wait for smtp-jobs.
* This function and dc_perform_smtp_jobs() must be called from the same thread,
* typically in a loop.
*
* See dc_interrupt_smtp_idle() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_smtp_idle (dc_context_t* context);
/**
* Interrupt waiting for smtp-jobs.
* If dc_perform_smtp_jobs() and dc_perform_smtp_idle() are called in a loop,
* calling this function causes jobs to be executed.
*
* dc_interrupt_smtp_idle() does _not_ interrupt dc_perform_smtp_jobs().
* If the smtp-thread is inside this function when dc_interrupt_smtp_idle() is called, however,
* the next call of the smtp-thread to dc_perform_smtp_idle() is interrupted immediately.
*
* Internally, this function is called whenever a message is to be sent.
*
* When you need to call this function just because to get jobs done after network changes,
* use dc_maybe_network() instead.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_smtp_idle (dc_context_t* context);
void dc_context_shutdown (dc_context_t* context);
/**
* This function can be called whenever there is a hint

View File

@@ -57,7 +57,6 @@ use self::string::*;
/// and protected by an [RwLock]. Other than that it needs to store
/// the data which is passed into [dc_context_new].
pub struct ContextWrapper {
cb: Option<dc_callback_t>,
userdata: *mut libc::c_void,
os_name: String,
inner: RwLock<Option<context::Context>>,
@@ -91,7 +90,10 @@ impl ContextWrapper {
///
/// This function makes it easy to log an error.
unsafe fn error(&self, msg: &str) {
self.translate_cb(Event::Error(msg.to_string()));
self.with_inner(|ctx| {
ctx.call_cb(Event::Error(msg.to_string()));
})
.unwrap();
}
/// Unlock the context and execute a closure with it.
@@ -117,93 +119,19 @@ impl ContextWrapper {
match guard.as_ref() {
Some(ref ctx) => Ok(ctxfn(ctx)),
None => {
unsafe { self.error("context not open") };
eprintln!("context not open");
Err(())
}
}
}
/// Translates the callback from the rust style to the C-style version.
unsafe fn translate_cb(&self, event: Event) {
if let Some(ffi_cb) = self.cb {
let event_id = event.as_id();
match event {
Event::Info(msg)
| Event::SmtpConnected(msg)
| Event::ImapConnected(msg)
| Event::SmtpMessageSent(msg)
| Event::ImapMessageDeleted(msg)
| Event::ImapMessageMoved(msg)
| Event::ImapFolderEmptied(msg)
| Event::NewBlobFile(msg)
| Event::DeletedBlobFile(msg)
| Event::Warning(msg)
| Event::Error(msg)
| Event::ErrorNetwork(msg)
| Event::ErrorSelfNotInGroup(msg) => {
let data2 = CString::new(msg).unwrap_or_default();
ffi_cb(self, event_id, 0, data2.as_ptr() as uintptr_t);
}
Event::MsgsChanged { chat_id, msg_id }
| Event::IncomingMsg { chat_id, msg_id }
| Event::MsgDelivered { chat_id, msg_id }
| Event::MsgFailed { chat_id, msg_id }
| Event::MsgRead { chat_id, msg_id } => {
ffi_cb(
self,
event_id,
chat_id.to_u32() as uintptr_t,
msg_id.to_u32() as uintptr_t,
);
}
Event::ChatModified(chat_id) => {
ffi_cb(self, event_id, chat_id.to_u32() as uintptr_t, 0);
}
Event::ContactsChanged(id) | Event::LocationChanged(id) => {
let id = id.unwrap_or_default();
ffi_cb(self, event_id, id as uintptr_t, 0);
}
Event::ConfigureProgress(progress) | Event::ImexProgress(progress) => {
ffi_cb(self, event_id, progress as uintptr_t, 0);
}
Event::ImexFileWritten(file) => {
let data1 = file.to_c_string().unwrap_or_default();
ffi_cb(self, event_id, data1.as_ptr() as uintptr_t, 0);
}
Event::SecurejoinInviterProgress {
contact_id,
progress,
}
| Event::SecurejoinJoinerProgress {
contact_id,
progress,
} => {
ffi_cb(
self,
event_id,
contact_id as uintptr_t,
progress as uintptr_t,
);
}
Event::SecurejoinMemberAdded {
chat_id,
contact_id,
} => {
ffi_cb(
self,
event_id,
chat_id.to_u32() as uintptr_t,
contact_id as uintptr_t,
);
}
}
}
fn is_open(&self) -> bool {
self.inner.read().unwrap().is_some()
}
}
#[no_mangle]
pub unsafe extern "C" fn dc_context_new(
cb: Option<dc_callback_t>,
userdata: *mut libc::c_void,
os_name: *const libc::c_char,
) -> *mut dc_context_t {
@@ -215,11 +143,11 @@ pub unsafe extern "C" fn dc_context_new(
to_string_lossy(os_name)
};
let ffi_ctx = ContextWrapper {
cb,
userdata,
os_name,
inner: RwLock::new(None),
};
Box::into_raw(Box::new(ffi_ctx))
}
@@ -257,17 +185,11 @@ pub unsafe extern "C" fn dc_open(
return 0;
}
let ffi_context = &*context;
let rust_cb = move |_ctx: &Context, evt: Event| ffi_context.translate_cb(evt);
let ctx = if blobdir.is_null() || *blobdir == 0 {
Context::new(
Box::new(rust_cb),
ffi_context.os_name.clone(),
as_path(dbfile).to_path_buf(),
)
Context::new(ffi_context.os_name.clone(), as_path(dbfile).to_path_buf())
} else {
Context::with_blobdir(
Box::new(rust_cb),
ffi_context.os_name.clone(),
as_path(dbfile).to_path_buf(),
as_path(blobdir).to_path_buf(),
@@ -300,11 +222,7 @@ pub unsafe extern "C" fn dc_is_open(context: *mut dc_context_t) -> libc::c_int {
return 0;
}
let ffi_context = &*context;
let inner_guard = ffi_context.inner.read().unwrap();
match *inner_guard {
Some(_) => 1,
None => 0,
}
ffi_context.is_open() as libc::c_int
}
#[no_mangle]
@@ -455,9 +373,7 @@ pub unsafe extern "C" fn dc_configure(context: *mut dc_context_t) {
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| configure::configure(ctx))
.unwrap_or(())
ffi_context.with_inner(|ctx| ctx.configure()).unwrap_or(())
}
#[no_mangle]
@@ -468,188 +384,120 @@ pub unsafe extern "C" fn dc_is_configured(context: *mut dc_context_t) -> libc::c
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| configure::dc_is_configured(ctx) as libc::c_int)
.with_inner(|ctx| ctx.is_configured() as libc::c_int)
.unwrap_or(0)
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_imap_jobs(context: *mut dc_context_t) {
pub unsafe extern "C" fn dc_context_run(context: *mut dc_context_t, cb: Option<dc_callback_t>) {
eprintln!("dc_context_run");
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_imap_jobs()");
eprintln!("ignoring careless call to dc_run()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_inbox_jobs(ctx))
.with_inner(|ctx| {
eprintln!("RUN");
ctx.run(|_ctx, event| {
translate_cb(ffi_context, cb, event);
});
eprintln!("RUN DONE");
})
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_imap_fetch(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_imap_fetch()");
return;
/// Translates the callback from the rust style to the C-style version.
unsafe fn translate_cb(ctx: &ContextWrapper, ffi_cb: Option<dc_callback_t>, event: Event) {
if let Some(ffi_cb) = ffi_cb {
let event_id = event.as_id();
match event {
Event::Info(msg)
| Event::SmtpConnected(msg)
| Event::ImapConnected(msg)
| Event::SmtpMessageSent(msg)
| Event::ImapMessageDeleted(msg)
| Event::ImapMessageMoved(msg)
| Event::ImapFolderEmptied(msg)
| Event::NewBlobFile(msg)
| Event::DeletedBlobFile(msg)
| Event::Warning(msg)
| Event::Error(msg)
| Event::ErrorNetwork(msg)
| Event::ErrorSelfNotInGroup(msg) => {
let data2 = CString::new(msg).unwrap_or_default();
ffi_cb(ctx, event_id, 0, data2.as_ptr() as uintptr_t);
}
Event::MsgsChanged { chat_id, msg_id }
| Event::IncomingMsg { chat_id, msg_id }
| Event::MsgDelivered { chat_id, msg_id }
| Event::MsgFailed { chat_id, msg_id }
| Event::MsgRead { chat_id, msg_id } => {
ffi_cb(
ctx,
event_id,
chat_id.to_u32() as uintptr_t,
msg_id.to_u32() as uintptr_t,
);
}
Event::ChatModified(chat_id) => {
ffi_cb(ctx, event_id, chat_id.to_u32() as uintptr_t, 0);
}
Event::ContactsChanged(id) | Event::LocationChanged(id) => {
let id = id.unwrap_or_default();
ffi_cb(ctx, event_id, id as uintptr_t, 0);
}
Event::ConfigureProgress(progress) | Event::ImexProgress(progress) => {
ffi_cb(ctx, event_id, progress as uintptr_t, 0);
}
Event::ImexFileWritten(file) => {
let data1 = file.to_c_string().unwrap_or_default();
ffi_cb(ctx, event_id, data1.as_ptr() as uintptr_t, 0);
}
Event::SecurejoinInviterProgress {
contact_id,
progress,
}
| Event::SecurejoinJoinerProgress {
contact_id,
progress,
} => {
ffi_cb(
ctx,
event_id,
contact_id as uintptr_t,
progress as uintptr_t,
);
}
Event::SecurejoinMemberAdded {
chat_id,
contact_id,
} => {
ffi_cb(
ctx,
event_id,
chat_id.to_u32() as uintptr_t,
contact_id as uintptr_t,
);
}
}
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_inbox_fetch(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_imap_idle(context: *mut dc_context_t) {
// TODO rename function in co-ordination with UIs
pub unsafe extern "C" fn dc_context_shutdown(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_imap_idle()");
eprintln!("ignoring careless call to dc_shutdown()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_inbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_imap_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_imap_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_inbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_mvbox_fetch(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_mvbox_fetch()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_mvbox_fetch(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_mvbox_jobs(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_mvbox_jobs()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_mvbox_jobs(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_mvbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_mvbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_mvbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_mvbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_mvbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_mvbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_sentbox_fetch(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_sentbox_fetch()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_sentbox_fetch(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_sentbox_jobs(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_sentbox_jobs()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_sentbox_jobs(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_sentbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_sentbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_sentbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_sentbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_sentbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_sentbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_smtp_jobs(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_smtp_jobs()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_smtp_jobs(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_smtp_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_smtp_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_smtp_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_smtp_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_smtp_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_smtp_idle(ctx))
.with_inner(|ctx| {
eprintln!("SHUTDOWN");
ctx.shutdown();
eprintln!("SHUTDOWN:DONE")
})
.unwrap_or(())
}

View File

@@ -9,22 +9,17 @@ extern crate deltachat;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate rusqlite;
use std::borrow::Cow::{self, Borrowed, Owned};
use std::io::{self, Write};
use std::path::Path;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use deltachat::chat::ChatId;
use deltachat::config;
use deltachat::configure::*;
use deltachat::context::*;
use deltachat::job::*;
use deltachat::oauth2::*;
use deltachat::securejoin::*;
use deltachat::Event;
@@ -114,107 +109,6 @@ fn receive_event(_context: &Context, event: Event) {
}
}
// Threads for waiting for messages and for jobs
lazy_static! {
static ref HANDLE: Arc<Mutex<Option<Handle>>> = Arc::new(Mutex::new(None));
static ref IS_RUNNING: AtomicBool = AtomicBool::new(true);
}
struct Handle {
handle_imap: Option<std::thread::JoinHandle<()>>,
handle_mvbox: Option<std::thread::JoinHandle<()>>,
handle_sentbox: Option<std::thread::JoinHandle<()>>,
handle_smtp: Option<std::thread::JoinHandle<()>>,
}
macro_rules! while_running {
($code:block) => {
if IS_RUNNING.load(Ordering::Relaxed) {
$code
} else {
break;
}
};
}
fn start_threads(c: Arc<RwLock<Context>>) {
if HANDLE.clone().lock().unwrap().is_some() {
return;
}
println!("Starting threads");
IS_RUNNING.store(true, Ordering::Relaxed);
let ctx = c.clone();
let handle_imap = std::thread::spawn(move || loop {
while_running!({
perform_inbox_jobs(&ctx.read().unwrap());
perform_inbox_fetch(&ctx.read().unwrap());
while_running!({
let context = ctx.read().unwrap();
perform_inbox_idle(&context);
});
});
});
let ctx = c.clone();
let handle_mvbox = std::thread::spawn(move || loop {
while_running!({
perform_mvbox_fetch(&ctx.read().unwrap());
while_running!({
perform_mvbox_idle(&ctx.read().unwrap());
});
});
});
let ctx = c.clone();
let handle_sentbox = std::thread::spawn(move || loop {
while_running!({
perform_sentbox_fetch(&ctx.read().unwrap());
while_running!({
perform_sentbox_idle(&ctx.read().unwrap());
});
});
});
let ctx = c;
let handle_smtp = std::thread::spawn(move || loop {
while_running!({
perform_smtp_jobs(&ctx.read().unwrap());
while_running!({
perform_smtp_idle(&ctx.read().unwrap());
});
});
});
*HANDLE.clone().lock().unwrap() = Some(Handle {
handle_imap: Some(handle_imap),
handle_mvbox: Some(handle_mvbox),
handle_sentbox: Some(handle_sentbox),
handle_smtp: Some(handle_smtp),
});
}
fn stop_threads(context: &Context) {
if let Some(ref mut handle) = *HANDLE.clone().lock().unwrap() {
println!("Stopping threads");
IS_RUNNING.store(false, Ordering::Relaxed);
interrupt_inbox_idle(context);
interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context);
interrupt_smtp_idle(context);
handle.handle_imap.take().unwrap().join().unwrap();
handle.handle_mvbox.take().unwrap().join().unwrap();
handle.handle_sentbox.take().unwrap().join().unwrap();
handle.handle_smtp.take().unwrap().join().unwrap();
}
}
// === The main loop
struct DcHelper {
completer: FilenameCompleter,
highlighter: MatchingBracketHighlighter,
@@ -420,9 +314,7 @@ fn main_0(args: Vec<String>) -> Result<(), failure::Error> {
}
rl.save_history(".dc-history.txt")?;
println!("history saved");
{
stop_threads(&ctx.read().unwrap());
}
ctx.read().unwrap().shutdown();
Ok(())
}
@@ -440,28 +332,20 @@ fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult, failu
match arg0 {
"connect" => {
start_threads(ctx);
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
}
"disconnect" => {
stop_threads(&ctx.read().unwrap());
}
"smtp-jobs" => {
if HANDLE.clone().lock().unwrap().is_some() {
println!("smtp-jobs are already running in a thread.",);
} else {
perform_smtp_jobs(&ctx.read().unwrap());
}
}
"imap-jobs" => {
if HANDLE.clone().lock().unwrap().is_some() {
println!("inbox-jobs are already running in a thread.");
} else {
perform_inbox_jobs(&ctx.read().unwrap());
}
ctx.read().unwrap().shutdown();
}
"configure" => {
start_threads(ctx.clone());
configure(&ctx.read().unwrap());
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
ctx.read().unwrap().configure();
}
"oauth2" => {
if let Some(addr) = ctx.read().unwrap().get_config(config::Config::Addr) {
@@ -484,7 +368,11 @@ fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult, failu
print!("\x1b[1;1H\x1b[2J");
}
"getqr" | "getbadqr" => {
start_threads(ctx.clone());
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
if let Some(mut qr) = dc_get_securejoin_qr(
&ctx.read().unwrap(),
ChatId::new(arg1.parse().unwrap_or_default()),
@@ -504,8 +392,12 @@ fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult, failu
}
}
"joinqr" => {
start_threads(ctx.clone());
if !arg0.is_empty() {
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
dc_join_securejoin(&ctx.read().unwrap(), arg1);
}
}

View File

@@ -1,19 +1,13 @@
extern crate deltachat;
use std::sync::{Arc, RwLock};
use std::{thread, time};
use tempfile::tempdir;
use deltachat::chat;
use deltachat::chatlist::*;
use deltachat::config;
use deltachat::configure::*;
use deltachat::contact::*;
use deltachat::context::*;
use deltachat::job::{
perform_inbox_fetch, perform_inbox_idle, perform_inbox_jobs, perform_smtp_idle,
perform_smtp_jobs,
};
use deltachat::Event;
fn cb(_ctx: &Context, event: Event) {
@@ -36,77 +30,52 @@ fn main() {
let dir = tempdir().unwrap();
let dbfile = dir.path().join("db.sqlite");
println!("creating database {:?}", dbfile);
let ctx =
Context::new(Box::new(cb), "FakeOs".into(), dbfile).expect("Failed to create context");
let running = Arc::new(RwLock::new(true));
let ctx = Context::new("FakeOs".into(), dbfile).expect("Failed to create context");
let info = ctx.get_info();
let duration = time::Duration::from_millis(4000);
let duration = time::Duration::from_millis(8000);
println!("info: {:#?}", info);
let ctx = Arc::new(ctx);
let ctx1 = ctx.clone();
let r1 = running.clone();
let t1 = thread::spawn(move || {
while *r1.read().unwrap() {
perform_inbox_jobs(&ctx1);
if *r1.read().unwrap() {
perform_inbox_fetch(&ctx1);
crossbeam::scope(|s| {
let t1 = s.spawn(|_| {
ctx.run(cb);
});
if *r1.read().unwrap() {
perform_inbox_idle(&ctx1);
}
}
println!("configuring");
let args = std::env::args().collect::<Vec<String>>();
assert_eq!(args.len(), 2, "missing password");
let pw = args[1].clone();
ctx.set_config(config::Config::Addr, Some("d@testrun.org"))
.unwrap();
ctx.set_config(config::Config::MailPw, Some(&pw)).unwrap();
ctx.configure();
thread::sleep(duration);
println!("sending a message");
let contact_id =
Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com").unwrap();
let chat_id = chat::create_by_contact_id(&ctx, contact_id).unwrap();
chat::send_text_msg(&ctx, chat_id, "Hi, here is my first message!".into()).unwrap();
println!("fetching chats..");
let chats = Chatlist::try_load(&ctx, 0, None, None).unwrap();
for i in 0..chats.len() {
let summary = chats.get_summary(&ctx, 0, None);
let text1 = summary.get_text1();
let text2 = summary.get_text2();
println!("chat: {} - {:?} - {:?}", i, text1, text2,);
}
});
let ctx1 = ctx.clone();
let r1 = running.clone();
let t2 = thread::spawn(move || {
while *r1.read().unwrap() {
perform_smtp_jobs(&ctx1);
if *r1.read().unwrap() {
perform_smtp_idle(&ctx1);
}
}
});
thread::sleep(duration);
println!("configuring");
let args = std::env::args().collect::<Vec<String>>();
assert_eq!(args.len(), 2, "missing password");
let pw = args[1].clone();
ctx.set_config(config::Config::Addr, Some("d@testrun.org"))
.unwrap();
ctx.set_config(config::Config::MailPw, Some(&pw)).unwrap();
configure(&ctx);
println!("stopping threads");
ctx.shutdown();
thread::sleep(duration);
println!("joining");
t1.join().unwrap();
println!("sending a message");
let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com").unwrap();
let chat_id = chat::create_by_contact_id(&ctx, contact_id).unwrap();
chat::send_text_msg(&ctx, chat_id, "Hi, here is my first message!".into()).unwrap();
println!("fetching chats..");
let chats = Chatlist::try_load(&ctx, 0, None, None).unwrap();
for i in 0..chats.len() {
let summary = chats.get_summary(&ctx, 0, None);
let text1 = summary.get_text1();
let text2 = summary.get_text2();
println!("chat: {} - {:?} - {:?}", i, text1, text2,);
}
thread::sleep(duration);
println!("stopping threads");
*running.write().unwrap() = false;
deltachat::job::interrupt_inbox_idle(&ctx);
deltachat::job::interrupt_smtp_idle(&ctx);
println!("joining");
t1.join().unwrap();
t2.join().unwrap();
println!("closing");
println!("closing");
})
.unwrap();
}

7
python/fail_test.py Normal file
View File

@@ -0,0 +1,7 @@
from __future__ import print_function
from deltachat import capi
from deltachat.capi import ffi, lib
if __name__ == "__main__":
ctx = capi.lib.dc_context_new(ffi.NULL, ffi.NULL)
lib.dc_context_shutdown(ctx)

View File

@@ -38,7 +38,7 @@ class Account(object):
:param debug: turn on debug logging for events.
"""
self._dc_context = ffi.gc(
lib.dc_context_new(lib.py_dc_callback, ffi.NULL, as_dc_charpointer(os_name)),
lib.dc_context_new(ffi.NULL, as_dc_charpointer(os_name)),
_destroy_dc_context,
)
if eventlogging:
@@ -380,8 +380,6 @@ class Account(object):
def _export(self, path, imex_cmd):
self._imex_events_clear()
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
if not self._threads.is_started():
lib.dc_perform_imap_jobs(self._dc_context)
files_written = []
while True:
ev = self._imex_events.get()
@@ -410,8 +408,6 @@ class Account(object):
def _import(self, path, imex_cmd):
self._imex_events_clear()
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
if not self._threads.is_started():
lib.dc_perform_imap_jobs(self._dc_context)
if not self._imex_events.get():
raise ValueError("import from path '{}' failed".format(path))
@@ -490,7 +486,7 @@ class Account(object):
ev = self._evlogger.get_matching("DC_EVENT_INCOMING_MSG")
return self.get_message_by_id(ev[2])
def start_threads(self, mvbox=False, sentbox=False):
def start_threads(self):
""" start IMAP/SMTP threads (and configure account if it hasn't happened).
:raises: ValueError if 'addr' or 'mail_pw' are not configured.
@@ -498,7 +494,7 @@ class Account(object):
"""
if not self.is_configured():
self.configure()
self._threads.start(mvbox=mvbox, sentbox=sentbox)
self._threads.start()
def stop_threads(self, wait=True):
""" stop IMAP/SMTP threads. """
@@ -560,16 +556,9 @@ class IOThreads:
def is_started(self):
return len(self._name2thread) > 0
def start(self, imap=True, smtp=True, mvbox=False, sentbox=False):
def start(self):
assert not self.is_started()
if imap:
self._start_one_thread("inbox", self.imap_thread_run)
if mvbox:
self._start_one_thread("mvbox", self.mvbox_thread_run)
if sentbox:
self._start_one_thread("sentbox", self.sentbox_thread_run)
if smtp:
self._start_one_thread("smtp", self.smtp_thread_run)
self._start_one_thread("deltachat", self.dc_thread_run)
def _start_one_thread(self, name, func):
self._name2thread[name] = t = threading.Thread(target=func, name=name)
@@ -577,58 +566,18 @@ class IOThreads:
t.start()
def stop(self, wait=False):
self._thread_quitflag = True
# Workaround for a race condition. Make sure that thread is
# not in between checking for quitflag and entering idle.
time.sleep(0.5)
lib.dc_interrupt_imap_idle(self._dc_context)
lib.dc_interrupt_smtp_idle(self._dc_context)
lib.dc_interrupt_mvbox_idle(self._dc_context)
lib.dc_interrupt_sentbox_idle(self._dc_context)
lib.dc_context_shutdown(self._dc_context)
if wait:
for name, thread in self._name2thread.items():
thread.join()
def imap_thread_run(self):
self._log_event("py-bindings-info", 0, "INBOX THREAD START")
while not self._thread_quitflag:
lib.dc_perform_imap_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_idle(self._dc_context)
def dc_thread_run(self):
self._log_event("py-bindings-info", 0, "DC THREAD START")
lib.dc_context_run(self._dc_context, lib.py_dc_callback)
self._log_event("py-bindings-info", 0, "INBOX THREAD FINISHED")
def mvbox_thread_run(self):
self._log_event("py-bindings-info", 0, "MVBOX THREAD START")
while not self._thread_quitflag:
lib.dc_perform_mvbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_idle(self._dc_context)
self._log_event("py-bindings-info", 0, "MVBOX THREAD FINISHED")
def sentbox_thread_run(self):
self._log_event("py-bindings-info", 0, "SENTBOX THREAD START")
while not self._thread_quitflag:
lib.dc_perform_sentbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_idle(self._dc_context)
self._log_event("py-bindings-info", 0, "SENTBOX THREAD FINISHED")
def smtp_thread_run(self):
self._log_event("py-bindings-info", 0, "SMTP THREAD START")
while not self._thread_quitflag:
lib.dc_perform_smtp_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_smtp_idle(self._dc_context)
self._log_event("py-bindings-info", 0, "SMTP THREAD FINISHED")
class EventLogger:
_loglock = threading.RLock()

View File

@@ -182,7 +182,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig):
def get_online_configuring_account(self, mvbox=False, sentbox=False):
ac, configdict = self.get_online_config()
ac.configure(**configdict)
ac.start_threads(mvbox=mvbox, sentbox=sentbox)
ac.start_threads()
return ac
def get_one_online_account(self):

View File

@@ -176,6 +176,7 @@ class TestOfflineChat:
assert d["draft"] == "" if chat.get_draft() is None else chat.get_draft()
def test_group_chat_creation_with_translation(self, ac1):
ac1.start_threads()
ac1.set_stock_translation(const.DC_STR_NEWGROUPDRAFT, "xyz %1$s")
ac1._evlogger.consume_events()
with pytest.raises(ValueError):
@@ -195,6 +196,7 @@ class TestOfflineChat:
assert not chat.is_promoted()
msg = chat.get_draft()
assert msg.text == "xyz title1"
ac1.stop_threads()
@pytest.mark.parametrize("verified", [True, False])
def test_group_chat_qr(self, acfactory, ac1, verified):

View File

@@ -1,25 +1,48 @@
from __future__ import print_function
import threading
from deltachat import capi, cutil, const, set_context_callback, clear_context_callback
from deltachat.capi import ffi
from deltachat.capi import lib
from deltachat.account import EventLogger
class EventThread(threading.Thread):
def __init__(self, dc_context):
self.dc_context = dc_context
super(EventThread, self).__init__()
self.setDaemon(1)
def run(self):
lib.dc_context_run(self.dc_context, lib.py_dc_callback)
def stop(self):
lib.dc_context_shutdown(self.dc_context)
def test_empty_context():
ctx = capi.lib.dc_context_new(capi.ffi.NULL, capi.ffi.NULL, capi.ffi.NULL)
ctx = capi.lib.dc_context_new(capi.ffi.NULL, capi.ffi.NULL)
capi.lib.dc_close(ctx)
def test_callback_None2int():
ctx = capi.lib.dc_context_new(capi.lib.py_dc_callback, ffi.NULL, ffi.NULL)
ctx = capi.lib.dc_context_new(ffi.NULL, ffi.NULL)
set_context_callback(ctx, lambda *args: None)
capi.lib.dc_close(ctx)
clear_context_callback(ctx)
def test_start_stop_event_thread_basic():
print("1")
ctx = capi.lib.dc_context_new(ffi.NULL, ffi.NULL)
print("2")
ev_thread = EventThread(ctx)
print("3 -- starting event thread")
ev_thread.start()
print("4 -- stopping event thread")
ev_thread.stop()
def test_dc_close_events(tmpdir):
ctx = ffi.gc(
capi.lib.dc_context_new(capi.lib.py_dc_callback, ffi.NULL, ffi.NULL),
capi.lib.dc_context_new(ffi.NULL, ffi.NULL),
lib.dc_context_unref,
)
evlog = EventLogger(ctx)
@@ -28,6 +51,9 @@ def test_dc_close_events(tmpdir):
ctx,
lambda ctx, evt_name, data1, data2: evlog(evt_name, data1, data2)
)
ev_thread = EventThread(ctx)
ev_thread.start()
p = tmpdir.join("hello.db")
lib.dc_open(ctx, p.strpath.encode("ascii"), ffi.NULL)
capi.lib.dc_close(ctx)
@@ -46,11 +72,12 @@ def test_dc_close_events(tmpdir):
find("disconnecting mvbox-thread")
find("disconnecting SMTP")
find("Database closed")
ev_thread.stop()
def test_wrong_db(tmpdir):
dc_context = ffi.gc(
lib.dc_context_new(lib.py_dc_callback, ffi.NULL, ffi.NULL),
lib.dc_context_new(ffi.NULL, ffi.NULL),
lib.dc_context_unref,
)
p = tmpdir.join("hello.db")
@@ -62,7 +89,7 @@ def test_wrong_db(tmpdir):
def test_empty_blobdir(tmpdir):
# Apparently some client code expects this to be the same as passing NULL.
ctx = ffi.gc(
lib.dc_context_new(lib.py_dc_callback, ffi.NULL, ffi.NULL),
lib.dc_context_new(ffi.NULL, ffi.NULL),
lib.dc_context_unref,
)
db_fname = tmpdir.join("hello.db")
@@ -119,7 +146,7 @@ def test_provider_info_none():
def test_get_info_closed():
ctx = ffi.gc(
lib.dc_context_new(lib.py_dc_callback, ffi.NULL, ffi.NULL),
lib.dc_context_new(ffi.NULL, ffi.NULL),
lib.dc_context_unref,
)
info = cutil.from_dc_charpointer(lib.dc_get_info(ctx))
@@ -129,7 +156,7 @@ def test_get_info_closed():
def test_get_info_open(tmpdir):
ctx = ffi.gc(
lib.dc_context_new(lib.py_dc_callback, ffi.NULL, ffi.NULL),
lib.dc_context_new(ffi.NULL, ffi.NULL),
lib.dc_context_unref,
)
db_fname = tmpdir.join("test.db")
@@ -141,7 +168,7 @@ def test_get_info_open(tmpdir):
def test_is_open_closed():
ctx = ffi.gc(
lib.dc_context_new(lib.py_dc_callback, ffi.NULL, ffi.NULL),
lib.dc_context_new(ffi.NULL, ffi.NULL),
lib.dc_context_unref,
)
assert lib.dc_is_open(ctx) == 0
@@ -149,7 +176,7 @@ def test_is_open_closed():
def test_is_open_actually_open(tmpdir):
ctx = ffi.gc(
lib.dc_context_new(lib.py_dc_callback, ffi.NULL, ffi.NULL),
lib.dc_context_new(ffi.NULL, ffi.NULL),
lib.dc_context_unref,
)
db_fname = tmpdir.join("test.db")

View File

@@ -1 +1 @@
nightly-2019-11-06
nightly-2020-01-26

View File

@@ -13,10 +13,9 @@ use crate::constants::*;
use crate::context::Context;
use crate::dc_tools::*;
use crate::e2ee;
use crate::job::{self, job_add, job_kill_action};
use crate::job;
use crate::login_param::{CertificateChecks, LoginParam};
use crate::oauth2::*;
use crate::param::Params;
use auto_mozilla::moz_autoconfigure;
use auto_outlook::outlk_autodiscover;
@@ -31,21 +30,6 @@ macro_rules! progress {
};
}
// connect
pub fn configure(context: &Context) {
if context.has_ongoing() {
warn!(context, "There is already another ongoing process running.",);
return;
}
job_kill_action(context, job::Action::ConfigureImap);
job_add(context, job::Action::ConfigureImap, 0, Params::new(), 0);
}
/// Check if the context is already configured.
pub fn dc_is_configured(context: &Context) -> bool {
context.sql.get_raw_config_bool(context, "configured")
}
/*******************************************************************************
* Configure JOB
******************************************************************************/

View File

@@ -3,7 +3,13 @@
use std::collections::HashMap;
use std::ffi::OsString;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Condvar, Mutex, RwLock,
};
use crossbeam::channel::select;
use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
use crate::chat::*;
use crate::config::Config;
@@ -12,7 +18,7 @@ use crate::contact::*;
use crate::error::*;
use crate::events::Event;
use crate::imap::*;
use crate::job::*;
use crate::job;
use crate::job_thread::JobThread;
use crate::key::*;
use crate::login_param::LoginParam;
@@ -32,7 +38,7 @@ use crate::sql::Sql;
/// * `data2` - Depends on the event parameter, see [Event].
pub type ContextCallback = dyn Fn(&Context, Event) -> () + Send + Sync;
#[derive(DebugStub)]
#[derive(Debug)]
pub struct Context {
/// Database file path
dbfile: PathBuf,
@@ -47,8 +53,6 @@ pub struct Context {
pub smtp: Arc<Mutex<Smtp>>,
pub smtp_state: Arc<(Mutex<SmtpState>, Condvar)>,
pub oauth2_critical: Arc<Mutex<()>>,
#[debug_stub = "Callback"]
cb: Box<ContextCallback>,
pub os_name: Option<String>,
pub cmdline_sel_chat_id: Arc<RwLock<ChatId>>,
pub bob: Arc<RwLock<BobStatus>>,
@@ -57,6 +61,13 @@ pub struct Context {
/// Mutex to avoid generating the key for the user more than once.
pub generating_key_mutex: Mutex<()>,
pub translated_stockstrings: RwLock<HashMap<usize, String>>,
event_sender: Sender<Event>,
event_receiver: Receiver<Event>,
shutdown_sender: Sender<()>,
shutdown_receiver: Receiver<()>,
is_running: AtomicBool,
}
#[derive(Debug, PartialEq, Eq)]
@@ -80,11 +91,19 @@ pub fn get_info() -> HashMap<&'static str, String> {
res
}
macro_rules! while_running {
($self:expr, $code:block) => {
if $self.is_running.load(Ordering::Relaxed) {
$code
} else {
break;
}
};
}
impl Context {
/// Creates new context.
pub fn new(cb: Box<ContextCallback>, os_name: String, dbfile: PathBuf) -> Result<Context> {
pretty_env_logger::try_init_timed().ok();
pub fn new(os_name: String, dbfile: PathBuf) -> Result<Context> {
let mut blob_fname = OsString::new();
blob_fname.push(dbfile.file_name().unwrap_or_default());
blob_fname.push("-blobs");
@@ -92,24 +111,22 @@ impl Context {
if !blobdir.exists() {
std::fs::create_dir_all(&blobdir)?;
}
Context::with_blobdir(cb, os_name, dbfile, blobdir)
Context::with_blobdir(os_name, dbfile, blobdir)
}
pub fn with_blobdir(
cb: Box<ContextCallback>,
os_name: String,
dbfile: PathBuf,
blobdir: PathBuf,
) -> Result<Context> {
pub fn with_blobdir(os_name: String, dbfile: PathBuf, blobdir: PathBuf) -> Result<Context> {
ensure!(
blobdir.is_dir(),
"Blobdir does not exist: {}",
blobdir.display()
);
let (event_sender, event_receiver) = unbounded();
let (shutdown_sender, shutdown_receiver) = bounded(0);
let ctx = Context {
blobdir,
dbfile,
cb,
os_name: Some(os_name),
running_state: Arc::new(RwLock::new(Default::default())),
sql: Sql::new(),
@@ -138,6 +155,11 @@ impl Context {
perform_inbox_jobs_needed: Arc::new(RwLock::new(false)),
generating_key_mutex: Mutex::new(()),
translated_stockstrings: RwLock::new(HashMap::new()),
event_sender,
event_receiver,
shutdown_sender,
shutdown_receiver,
is_running: Default::default(),
};
ensure!(
@@ -159,7 +181,98 @@ impl Context {
}
pub fn call_cb(&self, event: Event) {
(*self.cb)(self, event);
self.event_sender.send(event).unwrap();
}
/// Start the run loop.
pub fn run<F>(&self, cb: F)
where
F: Fn(&Context, Event) -> () + Send + Sync,
{
if self.is_running.swap(true, Ordering::Relaxed) {
panic!("Run can only be called once");
}
crossbeam::scope(|s| {
let imap_handle = s.spawn(|_| loop {
while_running!(self, {
job::perform_inbox_jobs(self);
while_running!(self, {
job::perform_inbox_fetch(self);
while_running!(self, { job::perform_inbox_idle(self) });
});
});
});
let mvbox_handle = s.spawn(|_| loop {
while_running!(self, {
job::perform_mvbox_fetch(self);
while_running!(self, {
job::perform_mvbox_idle(self);
});
});
});
let sentbox_handle = s.spawn(|_| loop {
while_running!(self, {
job::perform_sentbox_fetch(self);
while_running!(self, {
job::perform_sentbox_idle(self);
});
});
});
let smtp_handle = s.spawn(|_| loop {
while_running!(self, {
job::perform_smtp_jobs(self);
while_running!(self, {
job::perform_smtp_idle(self);
});
});
});
loop {
select! {
recv(self.event_receiver) -> event => {
// This gurantees that the callback is always called from the thread
// that called `run`.
cb(self, event.unwrap())
},
recv(self.shutdown_receiver) -> _ => break,
}
}
imap_handle.join().unwrap();
mvbox_handle.join().unwrap();
sentbox_handle.join().unwrap();
smtp_handle.join().unwrap();
})
.unwrap()
}
/// Stop the run loop. Blocks until all threads have shutdown.
pub fn shutdown(&self) {
if !self.is_running.swap(false, Ordering::Relaxed) {
// already shutdown
return;
}
self.shutdown_sender.send(()).unwrap();
job::interrupt_inbox_idle(self);
job::interrupt_mvbox_idle(self);
job::interrupt_sentbox_idle(self);
job::interrupt_smtp_idle(self);
}
pub fn configure(&self) {
if self.has_ongoing() {
warn!(self, "There is already another ongoing process running.");
return;
}
job::job_kill_action(self, job::Action::ConfigureImap);
job::job_add(self, job::Action::ConfigureImap, 0, Params::new(), 0);
}
/// Check if the context is already configured.
pub fn is_configured(&self) -> bool {
self.sql.get_raw_config_bool(self, "configured")
}
/*******************************************************************************
@@ -441,9 +554,9 @@ impl Context {
match msg.is_dc_message {
MessengerMessage::No => {}
MessengerMessage::Yes | MessengerMessage::Reply => {
job_add(
job::job_add(
self,
Action::MoveMsg,
job::Action::MoveMsg,
msg.id.to_u32() as i32,
Params::new(),
0,
@@ -521,7 +634,7 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
std::fs::write(&dbfile, b"123").unwrap();
let res = Context::new(Box::new(|_, _| ()), "FakeOs".into(), dbfile);
let res = Context::new("FakeOs".into(), dbfile);
assert!(res.is_err());
}
@@ -536,7 +649,7 @@ mod tests {
fn test_blobdir_exists() {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
Context::new(Box::new(|_, _| ()), "FakeOS".into(), dbfile).unwrap();
Context::new("FakeOS".into(), dbfile).unwrap();
let blobdir = tmp.path().join("db.sqlite-blobs");
assert!(blobdir.is_dir());
}
@@ -547,7 +660,7 @@ mod tests {
let dbfile = tmp.path().join("db.sqlite");
let blobdir = tmp.path().join("db.sqlite-blobs");
std::fs::write(&blobdir, b"123").unwrap();
let res = Context::new(Box::new(|_, _| ()), "FakeOS".into(), dbfile);
let res = Context::new("FakeOS".into(), dbfile);
assert!(res.is_err());
}
@@ -557,7 +670,7 @@ mod tests {
let subdir = tmp.path().join("subdir");
let dbfile = subdir.join("db.sqlite");
let dbfile2 = dbfile.clone();
Context::new(Box::new(|_, _| ()), "FakeOS".into(), dbfile).unwrap();
Context::new("FakeOS".into(), dbfile).unwrap();
assert!(subdir.is_dir());
assert!(dbfile2.is_file());
}
@@ -567,7 +680,7 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
let blobdir = PathBuf::new();
let res = Context::with_blobdir(Box::new(|_, _| ()), "FakeOS".into(), dbfile, blobdir);
let res = Context::with_blobdir("FakeOS".into(), dbfile, blobdir);
assert!(res.is_err());
}
@@ -576,7 +689,7 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dbfile = tmp.path().join("db.sqlite");
let blobdir = tmp.path().join("blobs");
let res = Context::with_blobdir(Box::new(|_, _| ()), "FakeOS".into(), dbfile, blobdir);
let res = Context::with_blobdir("FakeOS".into(), dbfile, blobdir);
assert!(res.is_err());
}

View File

@@ -10,7 +10,6 @@ use crate::blob::BlobObject;
use crate::chat;
use crate::chat::delete_and_reset_all_device_msgs;
use crate::config::Config;
use crate::configure::*;
use crate::constants::*;
use crate::context::Context;
use crate::dc_tools::*;
@@ -423,7 +422,7 @@ fn import_backup(context: &Context, backup_to_import: impl AsRef<Path>) -> Resul
);
ensure!(
!dc_is_configured(context),
!context.is_configured(),
"Cannot import backups to accounts in use."
);
context.sql.close(&context);

View File

@@ -553,7 +553,7 @@ pub fn job_kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
)
}
pub fn perform_inbox_fetch(context: &Context) {
pub(crate) fn perform_inbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::InboxWatch);
task::block_on(
@@ -565,7 +565,7 @@ pub fn perform_inbox_fetch(context: &Context) {
);
}
pub fn perform_mvbox_fetch(context: &Context) {
pub(crate) fn perform_mvbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
task::block_on(
@@ -577,7 +577,7 @@ pub fn perform_mvbox_fetch(context: &Context) {
);
}
pub fn perform_sentbox_fetch(context: &Context) {
pub(crate) fn perform_sentbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch);
task::block_on(
@@ -589,7 +589,7 @@ pub fn perform_sentbox_fetch(context: &Context) {
);
}
pub fn perform_inbox_idle(context: &Context) {
pub(crate) fn perform_inbox_idle(context: &Context) {
if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
info!(
context,
@@ -606,7 +606,7 @@ pub fn perform_inbox_idle(context: &Context) {
.idle(context, use_network);
}
pub fn perform_mvbox_idle(context: &Context) {
pub(crate) fn perform_mvbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
context
@@ -616,7 +616,7 @@ pub fn perform_mvbox_idle(context: &Context) {
.idle(context, use_network);
}
pub fn perform_sentbox_idle(context: &Context) {
pub(crate) fn perform_sentbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch);
context
@@ -626,7 +626,7 @@ pub fn perform_sentbox_idle(context: &Context) {
.idle(context, use_network);
}
pub fn interrupt_inbox_idle(context: &Context) {
pub(crate) fn interrupt_inbox_idle(context: &Context) {
info!(context, "interrupt_inbox_idle called");
// we do not block on trying to obtain the thread lock
// because we don't know in which state the thread is.
@@ -643,11 +643,11 @@ pub fn interrupt_inbox_idle(context: &Context) {
}
}
pub fn interrupt_mvbox_idle(context: &Context) {
pub(crate) fn interrupt_mvbox_idle(context: &Context) {
context.mvbox_thread.read().unwrap().interrupt_idle(context);
}
pub fn interrupt_sentbox_idle(context: &Context) {
pub(crate) fn interrupt_sentbox_idle(context: &Context) {
context
.sentbox_thread
.read()
@@ -655,7 +655,7 @@ pub fn interrupt_sentbox_idle(context: &Context) {
.interrupt_idle(context);
}
pub fn perform_smtp_jobs(context: &Context) {
pub(crate) fn perform_smtp_jobs(context: &Context) {
let probe_smtp_network = {
let &(ref lock, _) = &*context.smtp_state.clone();
let mut state = lock.lock().unwrap();
@@ -684,7 +684,7 @@ pub fn perform_smtp_jobs(context: &Context) {
}
}
pub fn perform_smtp_idle(context: &Context) {
pub(crate) fn perform_smtp_idle(context: &Context) {
info!(context, "SMTP-idle started...",);
{
let &(ref lock, ref cvar) = &*context.smtp_state.clone();
@@ -864,7 +864,7 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
Ok(())
}
pub fn perform_inbox_jobs(context: &Context) {
pub(crate) fn perform_inbox_jobs(context: &Context) {
info!(context, "dc_perform_inbox_jobs starting.",);
let probe_imap_network = *context.probe_imap_network.clone().read().unwrap();
@@ -875,14 +875,6 @@ pub fn perform_inbox_jobs(context: &Context) {
info!(context, "dc_perform_inbox_jobs ended.",);
}
pub fn perform_mvbox_jobs(context: &Context) {
info!(context, "dc_perform_mbox_jobs EMPTY (for now).",);
}
pub fn perform_sentbox_jobs(context: &Context) {
info!(context, "dc_perform_sentbox_jobs EMPTY (for now).",);
}
fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
while let Some(mut job) = load_next_job(context, thread, probe_network) {
info!(context, "{}-job {} started...", thread, job);
@@ -1137,7 +1129,7 @@ pub fn job_add(
}
}
pub fn interrupt_smtp_idle(context: &Context) {
pub(crate) fn interrupt_smtp_idle(context: &Context) {
info!(context, "Interrupting SMTP-idle...",);
let &(ref lock, ref cvar) = &*context.smtp_state.clone();

View File

@@ -1,4 +1,3 @@
#![forbid(unsafe_code)]
#![deny(clippy::correctness, missing_debug_implementations, clippy::all)]
#![allow(clippy::match_bool)]

View File

@@ -203,7 +203,8 @@ impl Message {
pub fn load_from_db(context: &Context, id: MsgId) -> Result<Message, Error> {
ensure!(
!id.is_special(),
"Can not load special message IDs from DB."
"Can not load special message IDs from DB. ({:?})",
id
);
context
.sql

View File

@@ -1120,7 +1120,7 @@ fn is_file_size_okay(context: &Context, msg: &Message) -> bool {
fn render_rfc724_mid(rfc724_mid: &str) -> String {
let rfc724_mid = rfc724_mid.trim().to_string();
if rfc724_mid.chars().nth(0).unwrap_or_default() == '<' {
if rfc724_mid.chars().next().unwrap_or_default() == '<' {
rfc724_mid
} else {
format!("<{}>", rfc724_mid)

View File

@@ -2,6 +2,8 @@
//!
//! This module is only compiled for test runs.
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use crate::config::Config;
@@ -15,7 +17,7 @@ use crate::key;
/// The temporary directory can be used to store the SQLite database,
/// see e.g. [test_context] which does this.
pub struct TestContext {
pub ctx: Context,
pub ctx: Arc<Context>,
pub dir: TempDir,
}
@@ -32,7 +34,13 @@ pub fn test_context(callback: Option<Box<ContextCallback>>) -> TestContext {
Some(cb) => cb,
None => Box::new(|_, _| ()),
};
let ctx = Context::new(cb, "FakeOs".into(), dbfile).unwrap();
let ctx = Arc::new(Context::new("FakeOs".into(), dbfile).unwrap());
let ctx_1 = ctx.clone();
std::thread::spawn(move || {
ctx_1.run(|context, event| (*cb)(context, event));
});
TestContext { ctx, dir }
}

View File

@@ -1,6 +1,7 @@
//! Stress some functions for testing; if used as a lib, this file is obsolete.
use std::collections::HashSet;
use std::sync::Arc;
use deltachat::config;
use deltachat::context::*;
@@ -207,14 +208,20 @@ fn cb(_context: &Context, _event: Event) {}
#[allow(dead_code)]
struct TestContext {
ctx: Context,
ctx: Arc<Context>,
dir: TempDir,
}
fn create_test_context() -> TestContext {
let dir = tempdir().unwrap();
let dbfile = dir.path().join("db.sqlite");
let ctx = Context::new(Box::new(cb), "FakeOs".into(), dbfile).unwrap();
let ctx = Arc::new(Context::new("FakeOs".into(), dbfile).unwrap());
let ctx_1 = ctx.clone();
std::thread::spawn(move || {
ctx_1.run(cb);
});
TestContext { ctx, dir }
}