start integration work

This commit is contained in:
dignifiedquire
2020-01-26 13:40:06 +01:00
parent 4dbcab9e6d
commit b4a5767347
11 changed files with 136 additions and 775 deletions

View File

@@ -538,307 +538,14 @@ int dc_is_configured (const dc_context_t* context);
/**
* Execute pending imap-jobs.
* This function and dc_perform_imap_fetch() and dc_perform_imap_idle()
* must be called from the same thread, typically in a loop.
*
* Example:
*
* void* imap_thread_func(void* context)
* {
* while (true) {
* dc_perform_imap_jobs(context);
* dc_perform_imap_fetch(context);
* dc_perform_imap_idle(context);
* }
* }
*
* // start imap-thread that runs forever
* pthread_t imap_thread;
* pthread_create(&imap_thread, NULL, imap_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_imap_idle() in the thread above
* // to return so that jobs are executed and messages are fetched.
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
* TODO: Document
*/
void dc_perform_imap_jobs (dc_context_t* context);
void dc_run (dc_context_t* context);
/**
* Fetch new messages, if any.
* This function and dc_perform_imap_jobs() and dc_perform_imap_idle() must be called from the same thread,
* typically in a loop.
*
* See dc_perform_imap_jobs() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
* TODO: Document
*/
void dc_perform_imap_fetch (dc_context_t* context);
/**
* Wait for messages or jobs.
* This function and dc_perform_imap_jobs() and dc_perform_imap_fetch() must be called from the same thread,
* typically in a loop.
*
* You should call this function directly after calling dc_perform_imap_fetch().
*
* See dc_perform_imap_jobs() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_imap_idle (dc_context_t* context);
/**
* Interrupt waiting for imap-jobs.
* If dc_perform_imap_jobs(), dc_perform_imap_fetch() and dc_perform_imap_idle() are called in a loop,
* calling this function causes imap-jobs to be executed and messages to be fetched.
*
* dc_interrupt_imap_idle() does _not_ interrupt dc_perform_imap_jobs() or dc_perform_imap_fetch().
* If the imap-thread is inside one of these functions when dc_interrupt_imap_idle() is called, however,
* the next call of the imap-thread to dc_perform_imap_idle() is interrupted immediately.
*
* Internally, this function is called whenever a imap-jobs should be processed
* (delete message, markseen etc.).
*
* When you need to call this function just because to get jobs done after network changes,
* use dc_maybe_network() instead.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_imap_idle (dc_context_t* context);
/**
* Execute pending mvbox-jobs.
* This function and dc_perform_mvbox_fetch() and dc_perform_mvbox_idle()
* must be called from the same thread, typically in a loop.
*
* Example:
*
* void* mvbox_thread_func(void* context)
* {
* while (true) {
* dc_perform_mvbox_jobs(context);
* dc_perform_mvbox_fetch(context);
* dc_perform_mvbox_idle(context);
* }
* }
*
* // start mvbox-thread that runs forever
* pthread_t mvbox_thread;
* pthread_create(&mvbox_thread, NULL, mvbox_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_mvbox_idle() in the thread above
* // to return so that jobs are executed and messages are fetched.
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_mvbox_jobs (dc_context_t* context);
/**
* Fetch new messages from the MVBOX, if any.
* The MVBOX is a folder on the account where chat messages are moved to.
* The moving is done to not disturb shared accounts that are used by both,
* Delta Chat and a classical MUA.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_mvbox_fetch (dc_context_t* context);
/**
* Wait for messages or jobs in the MVBOX-thread.
* This function and dc_perform_mvbox_fetch().
* must be called from the same thread, typically in a loop.
*
* You should call this function directly after calling dc_perform_mvbox_fetch().
*
* See dc_perform_mvbox_fetch() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_mvbox_idle (dc_context_t* context);
/**
* Interrupt waiting for MVBOX-fetch.
* dc_interrupt_mvbox_idle() does _not_ interrupt dc_perform_mvbox_fetch().
* If the MVBOX-thread is inside this function when dc_interrupt_mvbox_idle() is called, however,
* the next call of the MVBOX-thread to dc_perform_mvbox_idle() is interrupted immediately.
*
* Internally, this function is called whenever a imap-jobs should be processed.
*
* When you need to call this function just because to get jobs done after network changes,
* use dc_maybe_network() instead.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_mvbox_idle (dc_context_t* context);
/**
* Execute pending sentbox-jobs.
* This function and dc_perform_sentbox_fetch() and dc_perform_sentbox_idle()
* must be called from the same thread, typically in a loop.
*
* Example:
*
* void* sentbox_thread_func(void* context)
* {
* while (true) {
* dc_perform_sentbox_jobs(context);
* dc_perform_sentbox_fetch(context);
* dc_perform_sentbox_idle(context);
* }
* }
*
* // start sentbox-thread that runs forever
* pthread_t sentbox_thread;
* pthread_create(&sentbox_thread, NULL, sentbox_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_sentbox_idle() in the thread above
* // to return so that jobs are executed and messages are fetched.
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_sentbox_jobs (dc_context_t* context);
/**
* Fetch new messages from the Sent folder, if any.
* This function and dc_perform_sentbox_idle()
* must be called from the same thread, typically in a loop.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_sentbox_fetch (dc_context_t* context);
/**
* Wait for messages or jobs in the SENTBOX-thread.
* This function and dc_perform_sentbox_fetch()
* must be called from the same thread, typically in a loop.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_sentbox_idle (dc_context_t* context);
/**
* Interrupt waiting for messages or jobs in the SENTBOX-thread.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_sentbox_idle (dc_context_t* context);
/**
* Execute pending smtp-jobs.
* This function and dc_perform_smtp_idle() must be called from the same thread,
* typically in a loop.
*
* Example:
*
* void* smtp_thread_func(void* context)
* {
* while (true) {
* dc_perform_smtp_jobs(context);
* dc_perform_smtp_idle(context);
* }
* }
*
* // start smtp-thread that runs forever
* pthread_t smtp_thread;
* pthread_create(&smtp_thread, NULL, smtp_thread_func, context);
*
* ... program runs ...
*
* // network becomes available again -
* // the interrupt causes dc_perform_smtp_idle() in the thread above
* // to return so that jobs are executed
* dc_maybe_network(context);
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_smtp_jobs (dc_context_t* context);
/**
* Wait for smtp-jobs.
* This function and dc_perform_smtp_jobs() must be called from the same thread,
* typically in a loop.
*
* See dc_interrupt_smtp_idle() for an example.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_perform_smtp_idle (dc_context_t* context);
/**
* Interrupt waiting for smtp-jobs.
* If dc_perform_smtp_jobs() and dc_perform_smtp_idle() are called in a loop,
* calling this function causes jobs to be executed.
*
* dc_interrupt_smtp_idle() does _not_ interrupt dc_perform_smtp_jobs().
* If the smtp-thread is inside this function when dc_interrupt_smtp_idle() is called, however,
* the next call of the smtp-thread to dc_perform_smtp_idle() is interrupted immediately.
*
* Internally, this function is called whenever a message is to be sent.
*
* When you need to call this function just because to get jobs done after network changes,
* use dc_maybe_network() instead.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_interrupt_smtp_idle (dc_context_t* context);
void dc_shutdown (dc_context_t* context);
/**
* This function can be called whenever there is a hint

View File

@@ -456,7 +456,7 @@ pub unsafe extern "C" fn dc_configure(context: *mut dc_context_t) {
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| configure::configure(ctx))
.with_inner(|ctx| ctx.configure())
.unwrap_or(())
}
@@ -468,190 +468,34 @@ pub unsafe extern "C" fn dc_is_configured(context: *mut dc_context_t) -> libc::c
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| configure::dc_is_configured(ctx) as libc::c_int)
.with_inner(|ctx| ctx.is_configured() as libc::c_int)
.unwrap_or(0)
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_imap_jobs(context: *mut dc_context_t) {
pub unsafe extern "C" fn dc_run(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_imap_jobs()");
eprintln!("ignoring careless call to dc_run()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_inbox_jobs(ctx))
.with_inner(|ctx| ctx.run())
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_imap_fetch(context: *mut dc_context_t) {
pub unsafe extern "C" fn dc_shutdown(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_imap_fetch()");
eprintln!("ignoring careless call to dc_shutdown()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_inbox_fetch(ctx))
.with_inner(|ctx| ctx.shutdown())
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_imap_idle(context: *mut dc_context_t) {
// TODO rename function in co-ordination with UIs
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_imap_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_inbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_imap_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_imap_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_inbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_mvbox_fetch(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_mvbox_fetch()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_mvbox_fetch(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_mvbox_jobs(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_mvbox_jobs()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_mvbox_jobs(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_mvbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_mvbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_mvbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_mvbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_mvbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_mvbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_sentbox_fetch(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_sentbox_fetch()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_sentbox_fetch(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_sentbox_jobs(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_sentbox_jobs()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_sentbox_jobs(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_sentbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_sentbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_sentbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_sentbox_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_sentbox_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_sentbox_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_smtp_jobs(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_smtp_jobs()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_smtp_jobs(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_perform_smtp_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_perform_smtp_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::perform_smtp_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_interrupt_smtp_idle(context: *mut dc_context_t) {
if context.is_null() {
eprintln!("ignoring careless call to dc_interrupt_smtp_idle()");
return;
}
let ffi_context = &*context;
ffi_context
.with_inner(|ctx| job::interrupt_smtp_idle(ctx))
.unwrap_or(())
}
#[no_mangle]
pub unsafe extern "C" fn dc_maybe_network(context: *mut dc_context_t) {

View File

@@ -9,22 +9,17 @@ extern crate deltachat;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate rusqlite;
use std::borrow::Cow::{self, Borrowed, Owned};
use std::io::{self, Write};
use std::path::Path;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use deltachat::chat::ChatId;
use deltachat::config;
use deltachat::configure::*;
use deltachat::context::*;
use deltachat::job::*;
use deltachat::oauth2::*;
use deltachat::securejoin::*;
use deltachat::Event;
@@ -114,107 +109,6 @@ fn receive_event(_context: &Context, event: Event) {
}
}
// Threads for waiting for messages and for jobs
lazy_static! {
static ref HANDLE: Arc<Mutex<Option<Handle>>> = Arc::new(Mutex::new(None));
static ref IS_RUNNING: AtomicBool = AtomicBool::new(true);
}
struct Handle {
handle_imap: Option<std::thread::JoinHandle<()>>,
handle_mvbox: Option<std::thread::JoinHandle<()>>,
handle_sentbox: Option<std::thread::JoinHandle<()>>,
handle_smtp: Option<std::thread::JoinHandle<()>>,
}
macro_rules! while_running {
($code:block) => {
if IS_RUNNING.load(Ordering::Relaxed) {
$code
} else {
break;
}
};
}
fn start_threads(c: Arc<RwLock<Context>>) {
if HANDLE.clone().lock().unwrap().is_some() {
return;
}
println!("Starting threads");
IS_RUNNING.store(true, Ordering::Relaxed);
let ctx = c.clone();
let handle_imap = std::thread::spawn(move || loop {
while_running!({
perform_inbox_jobs(&ctx.read().unwrap());
perform_inbox_fetch(&ctx.read().unwrap());
while_running!({
let context = ctx.read().unwrap();
perform_inbox_idle(&context);
});
});
});
let ctx = c.clone();
let handle_mvbox = std::thread::spawn(move || loop {
while_running!({
perform_mvbox_fetch(&ctx.read().unwrap());
while_running!({
perform_mvbox_idle(&ctx.read().unwrap());
});
});
});
let ctx = c.clone();
let handle_sentbox = std::thread::spawn(move || loop {
while_running!({
perform_sentbox_fetch(&ctx.read().unwrap());
while_running!({
perform_sentbox_idle(&ctx.read().unwrap());
});
});
});
let ctx = c;
let handle_smtp = std::thread::spawn(move || loop {
while_running!({
perform_smtp_jobs(&ctx.read().unwrap());
while_running!({
perform_smtp_idle(&ctx.read().unwrap());
});
});
});
*HANDLE.clone().lock().unwrap() = Some(Handle {
handle_imap: Some(handle_imap),
handle_mvbox: Some(handle_mvbox),
handle_sentbox: Some(handle_sentbox),
handle_smtp: Some(handle_smtp),
});
}
fn stop_threads(context: &Context) {
if let Some(ref mut handle) = *HANDLE.clone().lock().unwrap() {
println!("Stopping threads");
IS_RUNNING.store(false, Ordering::Relaxed);
interrupt_inbox_idle(context);
interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context);
interrupt_smtp_idle(context);
handle.handle_imap.take().unwrap().join().unwrap();
handle.handle_mvbox.take().unwrap().join().unwrap();
handle.handle_sentbox.take().unwrap().join().unwrap();
handle.handle_smtp.take().unwrap().join().unwrap();
}
}
// === The main loop
struct DcHelper {
completer: FilenameCompleter,
highlighter: MatchingBracketHighlighter,
@@ -420,9 +314,7 @@ fn main_0(args: Vec<String>) -> Result<(), failure::Error> {
}
rl.save_history(".dc-history.txt")?;
println!("history saved");
{
stop_threads(&ctx.read().unwrap());
}
ctx.read().unwrap().shutdown();
Ok(())
}
@@ -440,28 +332,20 @@ fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult, failu
match arg0 {
"connect" => {
start_threads(ctx);
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
}
"disconnect" => {
stop_threads(&ctx.read().unwrap());
}
"smtp-jobs" => {
if HANDLE.clone().lock().unwrap().is_some() {
println!("smtp-jobs are already running in a thread.",);
} else {
perform_smtp_jobs(&ctx.read().unwrap());
}
}
"imap-jobs" => {
if HANDLE.clone().lock().unwrap().is_some() {
println!("inbox-jobs are already running in a thread.");
} else {
perform_inbox_jobs(&ctx.read().unwrap());
}
ctx.read().unwrap().shutdown();
}
"configure" => {
start_threads(ctx.clone());
configure(&ctx.read().unwrap());
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
ctx.read().unwrap().configure();
}
"oauth2" => {
if let Some(addr) = ctx.read().unwrap().get_config(config::Config::Addr) {
@@ -484,7 +368,11 @@ fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult, failu
print!("\x1b[1;1H\x1b[2J");
}
"getqr" | "getbadqr" => {
start_threads(ctx.clone());
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
if let Some(mut qr) = dc_get_securejoin_qr(
&ctx.read().unwrap(),
ChatId::new(arg1.parse().unwrap_or_default()),
@@ -504,8 +392,12 @@ fn handle_cmd(line: &str, ctx: Arc<RwLock<Context>>) -> Result<ExitResult, failu
}
}
"joinqr" => {
start_threads(ctx.clone());
if !arg0.is_empty() {
crossbeam::scope(|s| {
s.spawn(|_| ctx.read().unwrap().run());
})
.unwrap();
dc_join_securejoin(&ctx.read().unwrap(), arg1);
}
}

View File

@@ -1,19 +1,13 @@
extern crate deltachat;
use std::sync::{Arc, RwLock};
use std::{thread, time};
use tempfile::tempdir;
use deltachat::chat;
use deltachat::chatlist::*;
use deltachat::config;
use deltachat::configure::*;
use deltachat::contact::*;
use deltachat::context::*;
use deltachat::job::{
perform_inbox_fetch, perform_inbox_idle, perform_inbox_jobs, perform_smtp_idle,
perform_smtp_jobs,
};
use deltachat::Event;
fn cb(_ctx: &Context, event: Event) {
@@ -38,75 +32,51 @@ fn main() {
println!("creating database {:?}", dbfile);
let ctx =
Context::new(Box::new(cb), "FakeOs".into(), dbfile).expect("Failed to create context");
let running = Arc::new(RwLock::new(true));
let info = ctx.get_info();
let duration = time::Duration::from_millis(4000);
let duration = time::Duration::from_millis(8000);
println!("info: {:#?}", info);
let ctx = Arc::new(ctx);
let ctx1 = ctx.clone();
let r1 = running.clone();
let t1 = thread::spawn(move || {
while *r1.read().unwrap() {
perform_inbox_jobs(&ctx1);
if *r1.read().unwrap() {
perform_inbox_fetch(&ctx1);
crossbeam::scope(|s| {
let t1 = s.spawn(|_| {
ctx.run();
});
if *r1.read().unwrap() {
perform_inbox_idle(&ctx1);
}
}
println!("configuring");
let args = std::env::args().collect::<Vec<String>>();
assert_eq!(args.len(), 2, "missing password");
let pw = args[1].clone();
ctx.set_config(config::Config::Addr, Some("d@testrun.org"))
.unwrap();
ctx.set_config(config::Config::MailPw, Some(&pw)).unwrap();
ctx.configure();
thread::sleep(duration);
println!("sending a message");
let contact_id =
Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com").unwrap();
let chat_id = chat::create_by_contact_id(&ctx, contact_id).unwrap();
chat::send_text_msg(&ctx, chat_id, "Hi, here is my first message!".into()).unwrap();
println!("fetching chats..");
let chats = Chatlist::try_load(&ctx, 0, None, None).unwrap();
for i in 0..chats.len() {
let summary = chats.get_summary(&ctx, 0, None);
let text1 = summary.get_text1();
let text2 = summary.get_text2();
println!("chat: {} - {:?} - {:?}", i, text1, text2,);
}
});
let ctx1 = ctx.clone();
let r1 = running.clone();
let t2 = thread::spawn(move || {
while *r1.read().unwrap() {
perform_smtp_jobs(&ctx1);
if *r1.read().unwrap() {
perform_smtp_idle(&ctx1);
}
}
});
thread::sleep(duration);
println!("configuring");
let args = std::env::args().collect::<Vec<String>>();
assert_eq!(args.len(), 2, "missing password");
let pw = args[1].clone();
ctx.set_config(config::Config::Addr, Some("d@testrun.org"))
.unwrap();
ctx.set_config(config::Config::MailPw, Some(&pw)).unwrap();
configure(&ctx);
println!("stopping threads");
ctx.shutdown();
thread::sleep(duration);
println!("joining");
t1.join().unwrap();
println!("sending a message");
let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com").unwrap();
let chat_id = chat::create_by_contact_id(&ctx, contact_id).unwrap();
chat::send_text_msg(&ctx, chat_id, "Hi, here is my first message!".into()).unwrap();
println!("fetching chats..");
let chats = Chatlist::try_load(&ctx, 0, None, None).unwrap();
for i in 0..chats.len() {
let summary = chats.get_summary(&ctx, 0, None);
let text1 = summary.get_text1();
let text2 = summary.get_text2();
println!("chat: {} - {:?} - {:?}", i, text1, text2,);
}
thread::sleep(duration);
println!("stopping threads");
*running.write().unwrap() = false;
deltachat::job::interrupt_inbox_idle(&ctx);
deltachat::job::interrupt_smtp_idle(&ctx);
println!("joining");
t1.join().unwrap();
t2.join().unwrap();
println!("closing");
println!("closing");
})
.unwrap();
}

View File

@@ -380,8 +380,6 @@ class Account(object):
def _export(self, path, imex_cmd):
self._imex_events_clear()
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
if not self._threads.is_started():
lib.dc_perform_imap_jobs(self._dc_context)
files_written = []
while True:
ev = self._imex_events.get()
@@ -410,8 +408,6 @@ class Account(object):
def _import(self, path, imex_cmd):
self._imex_events_clear()
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
if not self._threads.is_started():
lib.dc_perform_imap_jobs(self._dc_context)
if not self._imex_events.get():
raise ValueError("import from path '{}' failed".format(path))
@@ -560,16 +556,9 @@ class IOThreads:
def is_started(self):
return len(self._name2thread) > 0
def start(self, imap=True, smtp=True, mvbox=False, sentbox=False):
def start(self):
assert not self.is_started()
if imap:
self._start_one_thread("inbox", self.imap_thread_run)
if mvbox:
self._start_one_thread("mvbox", self.mvbox_thread_run)
if sentbox:
self._start_one_thread("sentbox", self.sentbox_thread_run)
if smtp:
self._start_one_thread("smtp", self.smtp_thread_run)
self._start_one_thread("deltachat", self.dc_thread_run)
def _start_one_thread(self, name, func):
self._name2thread[name] = t = threading.Thread(target=func, name=name)
@@ -577,59 +566,19 @@ class IOThreads:
t.start()
def stop(self, wait=False):
self._thread_quitflag = True
# Workaround for a race condition. Make sure that thread is
# not in between checking for quitflag and entering idle.
time.sleep(0.5)
lib.dc_interrupt_imap_idle(self._dc_context)
lib.dc_interrupt_smtp_idle(self._dc_context)
lib.dc_interrupt_mvbox_idle(self._dc_context)
lib.dc_interrupt_sentbox_idle(self._dc_context)
lib.dc_shutdown(self._dc_context)
if wait:
for name, thread in self._name2thread.items():
thread.join()
def imap_thread_run(self):
self._log_event("py-bindings-info", 0, "INBOX THREAD START")
while not self._thread_quitflag:
lib.dc_perform_imap_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_idle(self._dc_context)
def dc_thread_run(self):
self._log_event("py-bindings-info", 0, "DC THREAD START")
lib.dc_run(self._dc_context)
self._log_event("py-bindings-info", 0, "INBOX THREAD FINISHED")
def mvbox_thread_run(self):
self._log_event("py-bindings-info", 0, "MVBOX THREAD START")
while not self._thread_quitflag:
lib.dc_perform_mvbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_idle(self._dc_context)
self._log_event("py-bindings-info", 0, "MVBOX THREAD FINISHED")
def sentbox_thread_run(self):
self._log_event("py-bindings-info", 0, "SENTBOX THREAD START")
while not self._thread_quitflag:
lib.dc_perform_sentbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_idle(self._dc_context)
self._log_event("py-bindings-info", 0, "SENTBOX THREAD FINISHED")
def smtp_thread_run(self):
self._log_event("py-bindings-info", 0, "SMTP THREAD START")
while not self._thread_quitflag:
lib.dc_perform_smtp_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_smtp_idle(self._dc_context)
self._log_event("py-bindings-info", 0, "SMTP THREAD FINISHED")
class EventLogger:
_loglock = threading.RLock()

View File

@@ -13,10 +13,9 @@ use crate::constants::*;
use crate::context::Context;
use crate::dc_tools::*;
use crate::e2ee;
use crate::job::{self, job_add, job_kill_action};
use crate::job;
use crate::login_param::{CertificateChecks, LoginParam};
use crate::oauth2::*;
use crate::param::Params;
use auto_mozilla::moz_autoconfigure;
use auto_outlook::outlk_autodiscover;
@@ -31,21 +30,6 @@ macro_rules! progress {
};
}
// connect
pub fn configure(context: &Context) {
if context.has_ongoing() {
warn!(context, "There is already another ongoing process running.",);
return;
}
job_kill_action(context, job::Action::ConfigureImap);
job_add(context, job::Action::ConfigureImap, 0, Params::new(), 0);
}
/// Check if the context is already configured.
pub fn dc_is_configured(context: &Context) -> bool {
context.sql.get_raw_config_bool(context, "configured")
}
/*******************************************************************************
* Configure JOB
******************************************************************************/

View File

@@ -3,7 +3,10 @@
use std::collections::HashMap;
use std::ffi::OsString;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex, RwLock, atomic::{Ordering, AtomicBool}};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Condvar, Mutex, RwLock,
};
use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
@@ -14,7 +17,7 @@ use crate::contact::*;
use crate::error::*;
use crate::events::Event;
use crate::imap::*;
use crate::job::*;
use crate::job;
use crate::job_thread::JobThread;
use crate::key::*;
use crate::login_param::LoginParam;
@@ -194,40 +197,40 @@ impl Context {
/// Start the run loop.
pub fn run(&self) {
use crossbeam::channel::select;
self.is_running.store(true, Ordering::Relaxed);
crossbeam::scope(|s| {
let imap_handle = s.spawn(|_| loop {
while_running!(self, {
perform_inbox_jobs(self);
job::perform_inbox_jobs(self);
while_running!(self, {
perform_inbox_fetch(self);
while_running!(self, { perform_inbox_idle(self) });
job::perform_inbox_fetch(self);
while_running!(self, { job::perform_inbox_idle(self) });
});
});
});
let mvbox_handle = s.spawn(|_| loop {
while_running!(self, {
perform_mvbox_fetch(self);
job::perform_mvbox_fetch(self);
while_running!(self, {
perform_mvbox_idle(self);
job::perform_mvbox_idle(self);
});
});
});
let sentbox_handle = s.spawn(|_| loop {
while_running!(self, {
perform_sentbox_fetch(self);
job::perform_sentbox_fetch(self);
while_running!(self, {
perform_sentbox_idle(self);
job::perform_sentbox_idle(self);
});
});
});
let smtp_handle = s.spawn(|_| loop {
while_running!(self, {
perform_smtp_jobs(self);
job::perform_smtp_jobs(self);
while_running!(self, {
perform_smtp_idle(self);
job::perform_smtp_idle(self);
});
});
});
@@ -255,6 +258,26 @@ impl Context {
pub fn shutdown(&self) {
self.is_running.store(false, Ordering::Relaxed);
self.shutdown_sender.send(()).unwrap();
job::interrupt_inbox_idle(self);
job::interrupt_mvbox_idle(self);
job::interrupt_sentbox_idle(self);
job::interrupt_smtp_idle(self);
}
// connect
pub fn configure(&self) {
if self.has_ongoing() {
warn!(self, "There is already another ongoing process running.");
return;
}
job::job_kill_action(self, job::Action::ConfigureImap);
job::job_add(self, job::Action::ConfigureImap, 0, Params::new(), 0);
}
/// Check if the context is already configured.
pub fn is_configured(&self) -> bool {
self.sql.get_raw_config_bool(self, "configured")
}
/*******************************************************************************
@@ -536,9 +559,9 @@ impl Context {
match msg.is_dc_message {
MessengerMessage::No => {}
MessengerMessage::Yes | MessengerMessage::Reply => {
job_add(
job::job_add(
self,
Action::MoveMsg,
job::Action::MoveMsg,
msg.id.to_u32() as i32,
Params::new(),
0,

View File

@@ -10,7 +10,6 @@ use crate::blob::BlobObject;
use crate::chat;
use crate::chat::delete_and_reset_all_device_msgs;
use crate::config::Config;
use crate::configure::*;
use crate::constants::*;
use crate::context::Context;
use crate::dc_tools::*;
@@ -423,7 +422,7 @@ fn import_backup(context: &Context, backup_to_import: impl AsRef<Path>) -> Resul
);
ensure!(
!dc_is_configured(context),
!context.is_configured(),
"Cannot import backups to accounts in use."
);
context.sql.close(&context);

View File

@@ -553,7 +553,7 @@ pub fn job_kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
)
}
pub fn perform_inbox_fetch(context: &Context) {
pub(crate) fn perform_inbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::InboxWatch);
task::block_on(
@@ -565,7 +565,7 @@ pub fn perform_inbox_fetch(context: &Context) {
);
}
pub fn perform_mvbox_fetch(context: &Context) {
pub(crate) fn perform_mvbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
task::block_on(
@@ -577,7 +577,7 @@ pub fn perform_mvbox_fetch(context: &Context) {
);
}
pub fn perform_sentbox_fetch(context: &Context) {
pub(crate) fn perform_sentbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch);
task::block_on(
@@ -589,7 +589,7 @@ pub fn perform_sentbox_fetch(context: &Context) {
);
}
pub fn perform_inbox_idle(context: &Context) {
pub(crate) fn perform_inbox_idle(context: &Context) {
if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
info!(
context,
@@ -606,7 +606,7 @@ pub fn perform_inbox_idle(context: &Context) {
.idle(context, use_network);
}
pub fn perform_mvbox_idle(context: &Context) {
pub(crate) fn perform_mvbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
context
@@ -616,7 +616,7 @@ pub fn perform_mvbox_idle(context: &Context) {
.idle(context, use_network);
}
pub fn perform_sentbox_idle(context: &Context) {
pub(crate) fn perform_sentbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch);
context
@@ -626,7 +626,7 @@ pub fn perform_sentbox_idle(context: &Context) {
.idle(context, use_network);
}
pub fn interrupt_inbox_idle(context: &Context) {
pub(crate) fn interrupt_inbox_idle(context: &Context) {
info!(context, "interrupt_inbox_idle called");
// we do not block on trying to obtain the thread lock
// because we don't know in which state the thread is.
@@ -643,11 +643,11 @@ pub fn interrupt_inbox_idle(context: &Context) {
}
}
pub fn interrupt_mvbox_idle(context: &Context) {
pub(crate) fn interrupt_mvbox_idle(context: &Context) {
context.mvbox_thread.read().unwrap().interrupt_idle(context);
}
pub fn interrupt_sentbox_idle(context: &Context) {
pub(crate) fn interrupt_sentbox_idle(context: &Context) {
context
.sentbox_thread
.read()
@@ -655,7 +655,7 @@ pub fn interrupt_sentbox_idle(context: &Context) {
.interrupt_idle(context);
}
pub fn perform_smtp_jobs(context: &Context) {
pub(crate) fn perform_smtp_jobs(context: &Context) {
let probe_smtp_network = {
let &(ref lock, _) = &*context.smtp_state.clone();
let mut state = lock.lock().unwrap();
@@ -684,7 +684,7 @@ pub fn perform_smtp_jobs(context: &Context) {
}
}
pub fn perform_smtp_idle(context: &Context) {
pub(crate) fn perform_smtp_idle(context: &Context) {
info!(context, "SMTP-idle started...",);
{
let &(ref lock, ref cvar) = &*context.smtp_state.clone();
@@ -864,7 +864,7 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
Ok(())
}
pub fn perform_inbox_jobs(context: &Context) {
pub(crate) fn perform_inbox_jobs(context: &Context) {
info!(context, "dc_perform_inbox_jobs starting.",);
let probe_imap_network = *context.probe_imap_network.clone().read().unwrap();
@@ -875,14 +875,6 @@ pub fn perform_inbox_jobs(context: &Context) {
info!(context, "dc_perform_inbox_jobs ended.",);
}
pub fn perform_mvbox_jobs(context: &Context) {
info!(context, "dc_perform_mbox_jobs EMPTY (for now).",);
}
pub fn perform_sentbox_jobs(context: &Context) {
info!(context, "dc_perform_sentbox_jobs EMPTY (for now).",);
}
fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
while let Some(mut job) = load_next_job(context, thread, probe_network) {
info!(context, "{}-job {} started...", thread, job);
@@ -1137,7 +1129,7 @@ pub fn job_add(
}
}
pub fn interrupt_smtp_idle(context: &Context) {
pub(crate) fn interrupt_smtp_idle(context: &Context) {
info!(context, "Interrupting SMTP-idle...",);
let &(ref lock, ref cvar) = &*context.smtp_state.clone();

View File

@@ -203,7 +203,8 @@ impl Message {
pub fn load_from_db(context: &Context, id: MsgId) -> Result<Message, Error> {
ensure!(
!id.is_special(),
"Can not load special message IDs from DB."
"Can not load special message IDs from DB. ({:?})",
id
);
context
.sql

View File

@@ -1120,7 +1120,7 @@ fn is_file_size_okay(context: &Context, msg: &Message) -> bool {
fn render_rfc724_mid(rfc724_mid: &str) -> String {
let rfc724_mid = rfc724_mid.trim().to_string();
if rfc724_mid.chars().nth(0).unwrap_or_default() == '<' {
if rfc724_mid.chars().next().unwrap_or_default() == '<' {
rfc724_mid
} else {
format!("<{}>", rfc724_mid)