diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 5dc6d343c..d084dfc68 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -802,6 +802,23 @@ uint32_t dc_prepare_msg (dc_context_t* context, uint32_t ch */ uint32_t dc_send_msg (dc_context_t* context, uint32_t chat_id, dc_msg_t* msg); +/** + * Send a message defined by a dc_msg_t object to a chat, synchronously. + * This bypasses the IO scheduler and creates its own SMTP connection. Which means + * this is useful when the scheduler is not running. + * + * @memberof dc_context_t + * @param context The context object as returned from dc_context_new(). + * @param chat_id Chat ID to send the message to. + * If dc_prepare_msg() was called before, this parameter can be 0. + * @param msg Message object to send to the chat defined by the chat ID. + * On succcess, msg_id of the object is set up, + * The function does not take ownership of the object, + * so you have to free it using dc_msg_unref() as usual. + * @return The ID of the message that is about to be sent. 0 in case of errors. + */ +uint32_t dc_send_msg_sync (dc_context_t* context, uint32_t chat_id, dc_msg_t* msg); + /** * Send a simple text message a given chat. diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index f996db069..5e4a6b51b 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -670,6 +670,27 @@ pub unsafe extern "C" fn dc_send_msg( .to_u32() } +#[no_mangle] +pub unsafe extern "C" fn dc_send_msg_sync( + context: *mut dc_context_t, + chat_id: u32, + msg: *mut dc_msg_t, +) -> u32 { + if context.is_null() || msg.is_null() { + eprintln!("ignoring careless call to dc_send_msg_sync()"); + return 0; + } + let ctx = &mut *context; + let ffi_msg = &mut *msg; + + block_on(async move { + chat::send_msg_sync(&ctx, ChatId::new(chat_id), &mut ffi_msg.message) + .await + .unwrap_or_log_default(&ctx, "Failed to send message") + }) + .to_u32() +} + #[no_mangle] pub unsafe extern "C" fn dc_send_text_msg( context: *mut dc_context_t, diff --git a/src/chat.rs b/src/chat.rs index 00921bc6c..ae1efd74f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -237,7 +237,8 @@ impl ChatId { }); job::kill_action(context, Action::Housekeeping).await; - job::add(context, Action::Housekeeping, 0, Params::new(), 10).await; + let j = job::Job::new(Action::Housekeeping, 0, Params::new(), 10); + job::add(context, j).await; Ok(()) } @@ -1454,11 +1455,73 @@ pub async fn send_msg( send_msg_inner(context, chat_id, msg).await } +/// Tries to send a message synchronously. +/// +/// Directly opens an smtp +/// connection and sends the message, bypassing the job system. If this fails, it writes a send job to +/// the database. +pub async fn send_msg_sync( + context: &Context, + chat_id: ChatId, + msg: &mut Message, +) -> Result { + if context.is_io_running().await { + return send_msg(context, chat_id, msg).await; + } + + if let Some(mut job) = prepare_send_msg(context, chat_id, msg).await? { + let mut smtp = crate::smtp::Smtp::new(); + + let status = job.send_msg_to_smtp(context, &mut smtp).await; + + match status { + job::Status::Finished(Ok(_)) => { + context.emit_event(Event::MsgsChanged { + chat_id: msg.chat_id, + msg_id: msg.id, + }); + + Ok(msg.id) + } + _ => { + job.save(context).await?; + Err(format_err!( + "failed to send message, queued for later sending" + )) + } + } + } else { + // Nothing to do + Ok(msg.id) + } +} + async fn send_msg_inner( context: &Context, chat_id: ChatId, msg: &mut Message, ) -> Result { + if let Some(send_job) = prepare_send_msg(context, chat_id, msg).await? { + job::add(context, send_job).await; + + context.emit_event(Event::MsgsChanged { + chat_id: msg.chat_id, + msg_id: msg.id, + }); + + if msg.param.exists(Param::SetLatitude) { + context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); + } + } + + Ok(msg.id) +} + +async fn prepare_send_msg( + context: &Context, + chat_id: ChatId, + msg: &mut Message, +) -> Result, Error> { // dc_prepare_msg() leaves the message state to OutPreparing, we // only have to change the state to OutPending in this case. // Otherwise we still have to prepare the message, which will set @@ -1474,18 +1537,9 @@ async fn send_msg_inner( ); message::update_msg_state(context, msg.id, MessageState::OutPending).await; } - job::send_msg(context, msg.id).await?; + let job = job::send_msg_job(context, msg.id).await?; - context.emit_event(Event::MsgsChanged { - chat_id: msg.chat_id, - msg_id: msg.id, - }); - - if msg.param.exists(Param::SetLatitude) { - context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); - } - - Ok(msg.id) + Ok(job) } pub async fn send_text_msg( @@ -2533,7 +2587,9 @@ pub async fn forward_msgs( let fresh10 = curr_timestamp; curr_timestamp += 1; new_msg_id = chat.prepare_msg_raw(context, &mut msg, fresh10).await?; - job::send_msg(context, new_msg_id).await?; + if let Some(send_job) = job::send_msg_job(context, new_msg_id).await? { + job::add(context, send_job).await; + } } created_chats.push(chat_id); created_msgs.push(new_msg_id); diff --git a/src/context.rs b/src/context.rs index 1fa3d6c8f..2eafdb22d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -481,10 +481,7 @@ impl Context { MessengerMessage::Yes | MessengerMessage::Reply => { job::add( self, - Action::MoveMsg, - msg.id.to_u32() as i32, - Params::new(), - 0, + job::Job::new(Action::MoveMsg, msg.id.to_u32(), Params::new(), 0), ) .await; } diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index bcb6e39a2..813ed6732 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -213,10 +213,12 @@ pub async fn dc_receive_imf( for db_entry in &created_db_entries { job::add( context, - Action::DeleteMsgOnImap, - db_entry.1.to_u32() as i32, - Params::new(), - 0, + job::Job::new( + Action::DeleteMsgOnImap, + db_entry.1.to_u32(), + Params::new(), + 0, + ), ) .await; } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 794e01d8c..d0e98cb36 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -1316,10 +1316,7 @@ async fn precheck_imf( .await; job::add( context, - Action::MarkseenMsgOnImap, - msg_id.to_u32() as i32, - Params::new(), - 0, + job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0), ) .await; } diff --git a/src/job.rs b/src/job.rs index 98a7a3ad4..e8f6f0d25 100644 --- a/src/job.rs +++ b/src/job.rs @@ -28,7 +28,7 @@ use crate::location; use crate::login_param::LoginParam; use crate::message::MsgId; use crate::message::{self, Message, MessageState}; -use crate::mimefactory::{MimeFactory, RenderedEmail}; +use crate::mimefactory::MimeFactory; use crate::param::*; use crate::smtp::Smtp; use crate::sql; @@ -156,7 +156,7 @@ impl fmt::Display for Job { } impl Job { - fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self { + pub fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self { let timestamp = time(); Self { @@ -171,6 +171,10 @@ impl Job { } } + pub fn delay_seconds(&self) -> i64 { + self.desired_timestamp - self.added_timestamp + } + /// Deletes the job from the database. async fn delete(self, context: &Context) -> Result<()> { if self.job_id != 0 { @@ -186,7 +190,7 @@ impl Job { /// Saves the job to the database, creating a new entry if necessary. /// /// The Job is consumed by this method. - async fn save(self, context: &Context) -> Result<()> { + pub async fn save(self, context: &Context) -> Result<()> { let thread: Thread = self.action.into(); info!(context, "saving job for {}-thread: {:?}", thread, self); @@ -325,7 +329,7 @@ impl Job { } } - async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status { + pub async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status { // SMTP server, if not yet done if !smtp.is_connected().await { let loginparam = LoginParam::from_database(context, "configured_").await; @@ -694,8 +698,12 @@ async fn set_delivered(context: &Context, msg_id: MsgId) { context.emit_event(Event::MsgDelivered { chat_id, msg_id }); } -// special case for DC_JOB_SEND_MSG_TO_SMTP -pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> { +/// Constructs a job for sending a message. +/// +/// Returns `None` if no messages need to be sent out. +/// +/// In order to be processed, must be `add`ded. +pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result> { let mut msg = Message::load_from_db(context, msg_id).await?; msg.try_calc_and_set_dimensions(context).await.ok(); @@ -738,7 +746,7 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> { "message {} has no recipient, skipping smtp-send", msg_id ); set_delivered(context, msg_id).await; - return Ok(()); + return Ok(None); } let rendered_msg = match mimefactory.render().await { @@ -793,16 +801,18 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> { msg.save_param_to_disk(context).await; } - add_smtp_job( - context, - Action::SendMsgToSmtp, - msg.id, - recipients, - &rendered_msg, - ) - .await?; + ensure!(!recipients.is_empty(), "no recipients for smtp job set"); + let mut param = Params::new(); + let bytes = &rendered_msg.message; + let blob = BlobObject::create(context, &rendered_msg.rfc724_mid, bytes).await?; - Ok(()) + let recipients = recipients.join("\x1e"); + param.set(Param::File, blob.as_name()); + param.set(Param::Recipients, &recipients); + + let job = create(Action::SendMsgToSmtp, msg_id.to_u32() as i32, param, 0)?; + + Ok(Some(job)) } #[derive(Debug)] @@ -986,46 +996,25 @@ async fn send_mdn(context: &Context, msg: &Message) -> Result<()> { let mut param = Params::new(); param.set(Param::MsgId, msg.id.to_u32().to_string()); - add(context, Action::SendMdn, msg.from_id as i32, param, 0).await; + add(context, Job::new(Action::SendMdn, msg.from_id, param, 0)).await; Ok(()) } -async fn add_smtp_job( - context: &Context, - action: Action, - msg_id: MsgId, - recipients: Vec, - rendered_msg: &RenderedEmail, -) -> Result<()> { - ensure!(!recipients.is_empty(), "no recipients for smtp job set"); - let mut param = Params::new(); - let bytes = &rendered_msg.message; - let blob = BlobObject::create(context, &rendered_msg.rfc724_mid, bytes).await?; +/// Creates a job. +pub fn create(action: Action, foreign_id: i32, param: Params, delay_seconds: i64) -> Result { + ensure!( + action != Action::Unknown, + "Invalid action passed to job_add" + ); - let recipients = recipients.join("\x1e"); - param.set(Param::File, blob.as_name()); - param.set(Param::Recipients, &recipients); - - add(context, action, msg_id.to_u32() as i32, param, 0).await; - - Ok(()) + Ok(Job::new(action, foreign_id as u32, param, delay_seconds)) } -/// Adds a job to the database, scheduling it `delay_seconds` after the current time. -pub async fn add( - context: &Context, - action: Action, - foreign_id: i32, - param: Params, - delay_seconds: i64, -) { - if action == Action::Unknown { - error!(context, "Invalid action passed to job_add"); - return; - } - - let job = Job::new(action, foreign_id as u32, param, delay_seconds); +/// Adds a job to the database, scheduling it. +pub async fn add(context: &Context, job: Job) { + let action = job.action; + let delay_seconds = job.delay_seconds(); job.save(context).await.unwrap_or_else(|err| { error!(context, "failed to save job: {}", err); }); diff --git a/src/location.rs b/src/location.rs index d36450bbf..c27f90c41 100644 --- a/src/location.rs +++ b/src/location.rs @@ -232,10 +232,12 @@ pub async fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds: schedule_maybe_send_locations(context, false).await; job::add( context, - job::Action::MaybeSendLocationsEnded, - chat_id.to_u32() as i32, - Params::new(), - seconds + 1, + job::Job::new( + job::Action::MaybeSendLocationsEnded, + chat_id.to_u32(), + Params::new(), + seconds + 1, + ), ) .await; } @@ -247,10 +249,7 @@ async fn schedule_maybe_send_locations(context: &Context, force_schedule: bool) if force_schedule || !job::action_exists(context, job::Action::MaybeSendLocations).await { job::add( context, - job::Action::MaybeSendLocations, - 0, - Params::new(), - 60, + job::Job::new(job::Action::MaybeSendLocations, 0, Params::new(), 60), ) .await; }; diff --git a/src/message.rs b/src/message.rs index 59951b265..1c77b4017 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1021,10 +1021,7 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) { } job::add( context, - Action::DeleteMsgOnImap, - msg_id.to_u32() as i32, - Params::new(), - 0, + job::Job::new(Action::DeleteMsgOnImap, msg_id.to_u32(), Params::new(), 0), ) .await; } @@ -1035,7 +1032,11 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) { msg_id: MsgId::new(0), }); job::kill_action(context, Action::Housekeeping).await; - job::add(context, Action::Housekeeping, 0, Params::new(), 10).await; + job::add( + context, + job::Job::new(Action::Housekeeping, 0, Params::new(), 10), + ) + .await; } } @@ -1097,10 +1098,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> bool { job::add( context, - Action::MarkseenMsgOnImap, - id.to_u32() as i32, - Params::new(), - 0, + job::Job::new(Action::MarkseenMsgOnImap, id.to_u32(), Params::new(), 0), ) .await; send_event = true; @@ -1550,7 +1548,11 @@ pub async fn update_server_uid( #[allow(dead_code)] pub async fn dc_empty_server(context: &Context, flags: u32) { job::kill_action(context, Action::EmptyServer).await; - job::add(context, Action::EmptyServer, flags as i32, Params::new(), 0).await; + job::add( + context, + job::Job::new(Action::EmptyServer, flags, Params::new(), 0), + ) + .await; } #[cfg(test)]