feat: add chat::send_msg_sync

This commit is contained in:
dignifiedquire
2020-05-23 18:56:45 +02:00
parent e55dc2213a
commit 0ea442ca36
9 changed files with 172 additions and 92 deletions

View File

@@ -802,6 +802,23 @@ uint32_t dc_prepare_msg (dc_context_t* context, uint32_t ch
*/
uint32_t dc_send_msg (dc_context_t* context, uint32_t chat_id, dc_msg_t* msg);
/**
* Send a message defined by a dc_msg_t object to a chat, synchronously.
* This bypasses the IO scheduler and creates its own SMTP connection. Which means
* this is useful when the scheduler is not running.
*
* @memberof dc_context_t
* @param context The context object as returned from dc_context_new().
* @param chat_id Chat ID to send the message to.
* If dc_prepare_msg() was called before, this parameter can be 0.
* @param msg Message object to send to the chat defined by the chat ID.
* On succcess, msg_id of the object is set up,
* The function does not take ownership of the object,
* so you have to free it using dc_msg_unref() as usual.
* @return The ID of the message that is about to be sent. 0 in case of errors.
*/
uint32_t dc_send_msg_sync (dc_context_t* context, uint32_t chat_id, dc_msg_t* msg);
/**
* Send a simple text message a given chat.

View File

@@ -670,6 +670,27 @@ pub unsafe extern "C" fn dc_send_msg(
.to_u32()
}
#[no_mangle]
pub unsafe extern "C" fn dc_send_msg_sync(
context: *mut dc_context_t,
chat_id: u32,
msg: *mut dc_msg_t,
) -> u32 {
if context.is_null() || msg.is_null() {
eprintln!("ignoring careless call to dc_send_msg_sync()");
return 0;
}
let ctx = &mut *context;
let ffi_msg = &mut *msg;
block_on(async move {
chat::send_msg_sync(&ctx, ChatId::new(chat_id), &mut ffi_msg.message)
.await
.unwrap_or_log_default(&ctx, "Failed to send message")
})
.to_u32()
}
#[no_mangle]
pub unsafe extern "C" fn dc_send_text_msg(
context: *mut dc_context_t,

View File

@@ -237,7 +237,8 @@ impl ChatId {
});
job::kill_action(context, Action::Housekeeping).await;
job::add(context, Action::Housekeeping, 0, Params::new(), 10).await;
let j = job::Job::new(Action::Housekeeping, 0, Params::new(), 10);
job::add(context, j).await;
Ok(())
}
@@ -1454,11 +1455,73 @@ pub async fn send_msg(
send_msg_inner(context, chat_id, msg).await
}
/// Tries to send a message synchronously.
///
/// Directly opens an smtp
/// connection and sends the message, bypassing the job system. If this fails, it writes a send job to
/// the database.
pub async fn send_msg_sync(
context: &Context,
chat_id: ChatId,
msg: &mut Message,
) -> Result<MsgId, Error> {
if context.is_io_running().await {
return send_msg(context, chat_id, msg).await;
}
if let Some(mut job) = prepare_send_msg(context, chat_id, msg).await? {
let mut smtp = crate::smtp::Smtp::new();
let status = job.send_msg_to_smtp(context, &mut smtp).await;
match status {
job::Status::Finished(Ok(_)) => {
context.emit_event(Event::MsgsChanged {
chat_id: msg.chat_id,
msg_id: msg.id,
});
Ok(msg.id)
}
_ => {
job.save(context).await?;
Err(format_err!(
"failed to send message, queued for later sending"
))
}
}
} else {
// Nothing to do
Ok(msg.id)
}
}
async fn send_msg_inner(
context: &Context,
chat_id: ChatId,
msg: &mut Message,
) -> Result<MsgId, Error> {
if let Some(send_job) = prepare_send_msg(context, chat_id, msg).await? {
job::add(context, send_job).await;
context.emit_event(Event::MsgsChanged {
chat_id: msg.chat_id,
msg_id: msg.id,
});
if msg.param.exists(Param::SetLatitude) {
context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF)));
}
}
Ok(msg.id)
}
async fn prepare_send_msg(
context: &Context,
chat_id: ChatId,
msg: &mut Message,
) -> Result<Option<crate::job::Job>, Error> {
// dc_prepare_msg() leaves the message state to OutPreparing, we
// only have to change the state to OutPending in this case.
// Otherwise we still have to prepare the message, which will set
@@ -1474,18 +1537,9 @@ async fn send_msg_inner(
);
message::update_msg_state(context, msg.id, MessageState::OutPending).await;
}
job::send_msg(context, msg.id).await?;
let job = job::send_msg_job(context, msg.id).await?;
context.emit_event(Event::MsgsChanged {
chat_id: msg.chat_id,
msg_id: msg.id,
});
if msg.param.exists(Param::SetLatitude) {
context.emit_event(Event::LocationChanged(Some(DC_CONTACT_ID_SELF)));
}
Ok(msg.id)
Ok(job)
}
pub async fn send_text_msg(
@@ -2533,7 +2587,9 @@ pub async fn forward_msgs(
let fresh10 = curr_timestamp;
curr_timestamp += 1;
new_msg_id = chat.prepare_msg_raw(context, &mut msg, fresh10).await?;
job::send_msg(context, new_msg_id).await?;
if let Some(send_job) = job::send_msg_job(context, new_msg_id).await? {
job::add(context, send_job).await;
}
}
created_chats.push(chat_id);
created_msgs.push(new_msg_id);

View File

@@ -481,10 +481,7 @@ impl Context {
MessengerMessage::Yes | MessengerMessage::Reply => {
job::add(
self,
Action::MoveMsg,
msg.id.to_u32() as i32,
Params::new(),
0,
job::Job::new(Action::MoveMsg, msg.id.to_u32(), Params::new(), 0),
)
.await;
}

View File

@@ -213,10 +213,12 @@ pub async fn dc_receive_imf(
for db_entry in &created_db_entries {
job::add(
context,
Action::DeleteMsgOnImap,
db_entry.1.to_u32() as i32,
Params::new(),
0,
job::Job::new(
Action::DeleteMsgOnImap,
db_entry.1.to_u32(),
Params::new(),
0,
),
)
.await;
}

View File

@@ -1316,10 +1316,7 @@ async fn precheck_imf(
.await;
job::add(
context,
Action::MarkseenMsgOnImap,
msg_id.to_u32() as i32,
Params::new(),
0,
job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0),
)
.await;
}

View File

@@ -28,7 +28,7 @@ use crate::location;
use crate::login_param::LoginParam;
use crate::message::MsgId;
use crate::message::{self, Message, MessageState};
use crate::mimefactory::{MimeFactory, RenderedEmail};
use crate::mimefactory::MimeFactory;
use crate::param::*;
use crate::smtp::Smtp;
use crate::sql;
@@ -156,7 +156,7 @@ impl fmt::Display for Job {
}
impl Job {
fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self {
pub fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self {
let timestamp = time();
Self {
@@ -171,6 +171,10 @@ impl Job {
}
}
pub fn delay_seconds(&self) -> i64 {
self.desired_timestamp - self.added_timestamp
}
/// Deletes the job from the database.
async fn delete(self, context: &Context) -> Result<()> {
if self.job_id != 0 {
@@ -186,7 +190,7 @@ impl Job {
/// Saves the job to the database, creating a new entry if necessary.
///
/// The Job is consumed by this method.
async fn save(self, context: &Context) -> Result<()> {
pub async fn save(self, context: &Context) -> Result<()> {
let thread: Thread = self.action.into();
info!(context, "saving job for {}-thread: {:?}", thread, self);
@@ -325,7 +329,7 @@ impl Job {
}
}
async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status {
pub async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status {
// SMTP server, if not yet done
if !smtp.is_connected().await {
let loginparam = LoginParam::from_database(context, "configured_").await;
@@ -694,8 +698,12 @@ async fn set_delivered(context: &Context, msg_id: MsgId) {
context.emit_event(Event::MsgDelivered { chat_id, msg_id });
}
// special case for DC_JOB_SEND_MSG_TO_SMTP
pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
/// Constructs a job for sending a message.
///
/// Returns `None` if no messages need to be sent out.
///
/// In order to be processed, must be `add`ded.
pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<Job>> {
let mut msg = Message::load_from_db(context, msg_id).await?;
msg.try_calc_and_set_dimensions(context).await.ok();
@@ -738,7 +746,7 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
"message {} has no recipient, skipping smtp-send", msg_id
);
set_delivered(context, msg_id).await;
return Ok(());
return Ok(None);
}
let rendered_msg = match mimefactory.render().await {
@@ -793,16 +801,18 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
msg.save_param_to_disk(context).await;
}
add_smtp_job(
context,
Action::SendMsgToSmtp,
msg.id,
recipients,
&rendered_msg,
)
.await?;
ensure!(!recipients.is_empty(), "no recipients for smtp job set");
let mut param = Params::new();
let bytes = &rendered_msg.message;
let blob = BlobObject::create(context, &rendered_msg.rfc724_mid, bytes).await?;
Ok(())
let recipients = recipients.join("\x1e");
param.set(Param::File, blob.as_name());
param.set(Param::Recipients, &recipients);
let job = create(Action::SendMsgToSmtp, msg_id.to_u32() as i32, param, 0)?;
Ok(Some(job))
}
#[derive(Debug)]
@@ -986,46 +996,25 @@ async fn send_mdn(context: &Context, msg: &Message) -> Result<()> {
let mut param = Params::new();
param.set(Param::MsgId, msg.id.to_u32().to_string());
add(context, Action::SendMdn, msg.from_id as i32, param, 0).await;
add(context, Job::new(Action::SendMdn, msg.from_id, param, 0)).await;
Ok(())
}
async fn add_smtp_job(
context: &Context,
action: Action,
msg_id: MsgId,
recipients: Vec<String>,
rendered_msg: &RenderedEmail,
) -> Result<()> {
ensure!(!recipients.is_empty(), "no recipients for smtp job set");
let mut param = Params::new();
let bytes = &rendered_msg.message;
let blob = BlobObject::create(context, &rendered_msg.rfc724_mid, bytes).await?;
/// Creates a job.
pub fn create(action: Action, foreign_id: i32, param: Params, delay_seconds: i64) -> Result<Job> {
ensure!(
action != Action::Unknown,
"Invalid action passed to job_add"
);
let recipients = recipients.join("\x1e");
param.set(Param::File, blob.as_name());
param.set(Param::Recipients, &recipients);
add(context, action, msg_id.to_u32() as i32, param, 0).await;
Ok(())
Ok(Job::new(action, foreign_id as u32, param, delay_seconds))
}
/// Adds a job to the database, scheduling it `delay_seconds` after the current time.
pub async fn add(
context: &Context,
action: Action,
foreign_id: i32,
param: Params,
delay_seconds: i64,
) {
if action == Action::Unknown {
error!(context, "Invalid action passed to job_add");
return;
}
let job = Job::new(action, foreign_id as u32, param, delay_seconds);
/// Adds a job to the database, scheduling it.
pub async fn add(context: &Context, job: Job) {
let action = job.action;
let delay_seconds = job.delay_seconds();
job.save(context).await.unwrap_or_else(|err| {
error!(context, "failed to save job: {}", err);
});

View File

@@ -232,10 +232,12 @@ pub async fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds:
schedule_maybe_send_locations(context, false).await;
job::add(
context,
job::Action::MaybeSendLocationsEnded,
chat_id.to_u32() as i32,
Params::new(),
seconds + 1,
job::Job::new(
job::Action::MaybeSendLocationsEnded,
chat_id.to_u32(),
Params::new(),
seconds + 1,
),
)
.await;
}
@@ -247,10 +249,7 @@ async fn schedule_maybe_send_locations(context: &Context, force_schedule: bool)
if force_schedule || !job::action_exists(context, job::Action::MaybeSendLocations).await {
job::add(
context,
job::Action::MaybeSendLocations,
0,
Params::new(),
60,
job::Job::new(job::Action::MaybeSendLocations, 0, Params::new(), 60),
)
.await;
};

View File

@@ -1021,10 +1021,7 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) {
}
job::add(
context,
Action::DeleteMsgOnImap,
msg_id.to_u32() as i32,
Params::new(),
0,
job::Job::new(Action::DeleteMsgOnImap, msg_id.to_u32(), Params::new(), 0),
)
.await;
}
@@ -1035,7 +1032,11 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) {
msg_id: MsgId::new(0),
});
job::kill_action(context, Action::Housekeeping).await;
job::add(context, Action::Housekeeping, 0, Params::new(), 10).await;
job::add(
context,
job::Job::new(Action::Housekeeping, 0, Params::new(), 10),
)
.await;
}
}
@@ -1097,10 +1098,7 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> bool {
job::add(
context,
Action::MarkseenMsgOnImap,
id.to_u32() as i32,
Params::new(),
0,
job::Job::new(Action::MarkseenMsgOnImap, id.to_u32(), Params::new(), 0),
)
.await;
send_event = true;
@@ -1550,7 +1548,11 @@ pub async fn update_server_uid(
#[allow(dead_code)]
pub async fn dc_empty_server(context: &Context, flags: u32) {
job::kill_action(context, Action::EmptyServer).await;
job::add(context, Action::EmptyServer, flags as i32, Params::new(), 0).await;
job::add(
context,
job::Job::new(Action::EmptyServer, flags, Params::new(), 0),
)
.await;
}
#[cfg(test)]