mirror of
https://github.com/chatmail/core.git
synced 2026-05-22 16:26:31 +03:00
Add calls table to store SDPs separately from message params
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
This commit is contained in:
82
src/calls.rs
82
src/calls.rs
@@ -195,9 +195,24 @@ impl Context {
|
|||||||
text: "Outgoing call".into(),
|
text: "Outgoing call".into(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Set a placeholder parameter so the message is recognized as a call
|
||||||
|
// This will be used by mimefactory until the DB entry is available
|
||||||
call.param.set(Param::WebrtcRoom, &place_call_info);
|
call.param.set(Param::WebrtcRoom, &place_call_info);
|
||||||
call.id = send_msg(self, chat_id, &mut call).await?;
|
call.id = send_msg(self, chat_id, &mut call).await?;
|
||||||
|
|
||||||
|
// Store SDP offer in calls table and remove from params
|
||||||
|
self.sql
|
||||||
|
.execute(
|
||||||
|
"INSERT OR REPLACE INTO calls (msg_id, offer_sdp) VALUES (?, ?)",
|
||||||
|
(call.id, &place_call_info),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Remove SDP from message params for privacy
|
||||||
|
call.param.remove(Param::WebrtcRoom);
|
||||||
|
call.update_param(self).await?;
|
||||||
|
|
||||||
let wait = RINGING_SECONDS;
|
let wait = RINGING_SECONDS;
|
||||||
task::spawn(Context::emit_end_call_if_unaccepted(
|
task::spawn(Context::emit_end_call_if_unaccepted(
|
||||||
self.clone(),
|
self.clone(),
|
||||||
@@ -229,6 +244,14 @@ impl Context {
|
|||||||
chat.id.accept(self).await?;
|
chat.id.accept(self).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Store SDP answer in calls table
|
||||||
|
self.sql
|
||||||
|
.execute(
|
||||||
|
"UPDATE calls SET answer_sdp=? WHERE msg_id=?",
|
||||||
|
(&accept_call_info, call_id),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// send an acceptance message around: to the caller as well as to the other devices of the callee
|
// send an acceptance message around: to the caller as well as to the other devices of the callee
|
||||||
let mut msg = Message {
|
let mut msg = Message {
|
||||||
viewtype: Viewtype::Text,
|
viewtype: Viewtype::Text,
|
||||||
@@ -237,8 +260,6 @@ impl Context {
|
|||||||
};
|
};
|
||||||
msg.param.set_cmd(SystemMessage::CallAccepted);
|
msg.param.set_cmd(SystemMessage::CallAccepted);
|
||||||
msg.hidden = true;
|
msg.hidden = true;
|
||||||
msg.param
|
|
||||||
.set(Param::WebrtcAccepted, accept_call_info.to_string());
|
|
||||||
msg.set_quote(self, Some(&call.msg)).await?;
|
msg.set_quote(self, Some(&call.msg)).await?;
|
||||||
msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
|
msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
|
||||||
self.emit_event(EventType::IncomingCallAccepted {
|
self.emit_event(EventType::IncomingCallAccepted {
|
||||||
@@ -327,6 +348,16 @@ impl Context {
|
|||||||
from_id: ContactId,
|
from_id: ContactId,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if mime_message.is_call() {
|
if mime_message.is_call() {
|
||||||
|
// Extract SDP from message headers and store in calls table
|
||||||
|
if let Some(offer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcRoom) {
|
||||||
|
self.sql
|
||||||
|
.execute(
|
||||||
|
"INSERT OR IGNORE INTO calls (msg_id, offer_sdp) VALUES (?, ?)",
|
||||||
|
(call_id, offer_sdp),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let Some(call) = self.load_call_by_id(call_id).await? else {
|
let Some(call) = self.load_call_by_id(call_id).await? else {
|
||||||
warn!(self, "{call_id} does not refer to a call message");
|
warn!(self, "{call_id} does not refer to a call message");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -391,6 +422,16 @@ impl Context {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Store SDP answer in calls table
|
||||||
|
if let Some(answer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcAccepted) {
|
||||||
|
self.sql
|
||||||
|
.execute(
|
||||||
|
"UPDATE calls SET answer_sdp=? WHERE msg_id=?",
|
||||||
|
(answer_sdp, call.msg.id),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
call.mark_as_accepted(self).await?;
|
call.mark_as_accepted(self).await?;
|
||||||
self.emit_msgs_changed(call.msg.chat_id, call_id);
|
self.emit_msgs_changed(call.msg.chat_id, call_id);
|
||||||
if call.is_incoming() {
|
if call.is_incoming() {
|
||||||
@@ -463,33 +504,40 @@ impl Context {
|
|||||||
/// not a call message, returns `None`.
|
/// not a call message, returns `None`.
|
||||||
pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
|
pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
|
||||||
let call = Message::load_from_db(self, call_id).await?;
|
let call = Message::load_from_db(self, call_id).await?;
|
||||||
Ok(self.load_call_by_message(call))
|
self.load_call_by_message(call).await
|
||||||
}
|
}
|
||||||
|
|
||||||
// Loads information about the call given the `Message`.
|
// Loads information about the call given the `Message`.
|
||||||
//
|
//
|
||||||
// If the `Message` is not a call message, returns `None`
|
// If the `Message` is not a call message, returns `None`
|
||||||
fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
|
async fn load_call_by_message(&self, call: Message) -> Result<Option<CallInfo>> {
|
||||||
if call.viewtype != Viewtype::Call {
|
if call.viewtype != Viewtype::Call {
|
||||||
// This can happen e.g. if a "call accepted"
|
// This can happen e.g. if a "call accepted"
|
||||||
// or "call ended" message is received
|
// or "call ended" message is received
|
||||||
// with `In-Reply-To` referring to non-call message.
|
// with `In-Reply-To` referring to non-call message.
|
||||||
return None;
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(CallInfo {
|
// Load SDP from calls table
|
||||||
place_call_info: call
|
let (place_call_info, accept_call_info) = self
|
||||||
.param
|
.sql
|
||||||
.get(Param::WebrtcRoom)
|
.query_row_optional(
|
||||||
.unwrap_or_default()
|
"SELECT offer_sdp, answer_sdp FROM calls WHERE msg_id=?",
|
||||||
.to_string(),
|
(call.id,),
|
||||||
accept_call_info: call
|
|row| {
|
||||||
.param
|
let offer: Option<String> = row.get(0)?;
|
||||||
.get(Param::WebrtcAccepted)
|
let answer: Option<String> = row.get(1)?;
|
||||||
.unwrap_or_default()
|
Ok((offer.unwrap_or_default(), answer.unwrap_or_default()))
|
||||||
.to_string(),
|
},
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
Ok(Some(CallInfo {
|
||||||
|
place_call_info,
|
||||||
|
accept_call_info,
|
||||||
msg: call,
|
msg: call,
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1680,6 +1680,27 @@ impl MimeFactory {
|
|||||||
"Chat-Content",
|
"Chat-Content",
|
||||||
mail_builder::headers::raw::Raw::new("call-accepted").into(),
|
mail_builder::headers::raw::Raw::new("call-accepted").into(),
|
||||||
));
|
));
|
||||||
|
// Get SDP answer from the referenced call message in calls table,
|
||||||
|
// or fall back to params if not yet migrated
|
||||||
|
if let Some(ref quoted_msg_id) = msg.in_reply_to {
|
||||||
|
let answer_sdp = context
|
||||||
|
.sql
|
||||||
|
.query_row_optional(
|
||||||
|
"SELECT answer_sdp FROM calls WHERE msg_id=?",
|
||||||
|
(quoted_msg_id,),
|
||||||
|
|row| row.get::<_, Option<String>>(0),
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.flatten()
|
||||||
|
.or_else(|| msg.param.get(Param::WebrtcAccepted).map(|s| s.to_string()));
|
||||||
|
|
||||||
|
if let Some(answer_sdp) = answer_sdp {
|
||||||
|
headers.push((
|
||||||
|
"Chat-Webrtc-Accepted",
|
||||||
|
mail_builder::headers::raw::Raw::new(b_encode(&answer_sdp)).into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
SystemMessage::CallEnded => {
|
SystemMessage::CallEnded => {
|
||||||
headers.push((
|
headers.push((
|
||||||
@@ -1716,16 +1737,25 @@ impl MimeFactory {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(offer) = msg.param.get(Param::WebrtcRoom) {
|
if msg.viewtype == Viewtype::Call {
|
||||||
headers.push((
|
// Get SDP offer from calls table, or fall back to params if not yet migrated
|
||||||
"Chat-Webrtc-Room",
|
let offer_sdp = context
|
||||||
mail_builder::headers::raw::Raw::new(b_encode(offer)).into(),
|
.sql
|
||||||
));
|
.query_row_optional(
|
||||||
} else if let Some(answer) = msg.param.get(Param::WebrtcAccepted) {
|
"SELECT offer_sdp FROM calls WHERE msg_id=?",
|
||||||
headers.push((
|
(msg.id,),
|
||||||
"Chat-Webrtc-Accepted",
|
|row| row.get::<_, Option<String>>(0),
|
||||||
mail_builder::headers::raw::Raw::new(b_encode(answer)).into(),
|
)
|
||||||
));
|
.await?
|
||||||
|
.flatten()
|
||||||
|
.or_else(|| msg.param.get(Param::WebrtcRoom).map(|s| s.to_string()));
|
||||||
|
|
||||||
|
if let Some(offer_sdp) = offer_sdp {
|
||||||
|
headers.push((
|
||||||
|
"Chat-Webrtc-Room",
|
||||||
|
mail_builder::headers::raw::Raw::new(b_encode(&offer_sdp)).into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.viewtype == Viewtype::Voice
|
if msg.viewtype == Viewtype::Voice
|
||||||
|
|||||||
22
src/sql.rs
22
src/sql.rs
@@ -921,6 +921,28 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
|
|||||||
.log_err(context)
|
.log_err(context)
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
|
// Delete call SDPs for ended calls (older than 24 hours) or orphaned calls.
|
||||||
|
// Ended calls have Param::Arg4 (H=timestamp) set in their params.
|
||||||
|
// We clean up calls that ended more than 24 hours ago to protect privacy
|
||||||
|
// as SDPs contain IP addresses.
|
||||||
|
context
|
||||||
|
.sql
|
||||||
|
.execute(
|
||||||
|
"DELETE FROM calls WHERE msg_id IN (
|
||||||
|
SELECT calls.msg_id FROM calls
|
||||||
|
LEFT JOIN msgs ON calls.msg_id = msgs.id
|
||||||
|
WHERE msgs.id IS NULL
|
||||||
|
OR msgs.chat_id = ?
|
||||||
|
OR (msgs.param LIKE '%H=%'
|
||||||
|
AND msgs.timestamp_sent < ?)
|
||||||
|
)",
|
||||||
|
(DC_CHAT_ID_TRASH, time() - 86400),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.context("Failed to delete old call SDPs")
|
||||||
|
.log_err(context)
|
||||||
|
.ok();
|
||||||
|
|
||||||
info!(context, "Housekeeping done.");
|
info!(context, "Housekeeping done.");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1339,6 +1339,21 @@ CREATE INDEX gossip_timestamp_index ON gossip_timestamp (chat_id, fingerprint);
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inc_and_check(&mut migration_version, 139)?;
|
||||||
|
if dbversion < migration_version {
|
||||||
|
sql.execute_migration(
|
||||||
|
"CREATE TABLE calls(
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
msg_id INTEGER NOT NULL UNIQUE,
|
||||||
|
offer_sdp TEXT,
|
||||||
|
answer_sdp TEXT
|
||||||
|
);
|
||||||
|
CREATE INDEX calls_msg_id_index ON calls (msg_id);",
|
||||||
|
migration_version,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let new_version = sql
|
let new_version = sql
|
||||||
.get_raw_config_int(VERSION_CFG)
|
.get_raw_config_int(VERSION_CFG)
|
||||||
.await?
|
.await?
|
||||||
|
|||||||
Reference in New Issue
Block a user