mirror of
https://github.com/chatmail/core.git
synced 2026-05-19 23:06:32 +03:00
Simplify calls table with single sdp column instead of offer_sdp and answer_sdp
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
This commit is contained in:
44
src/calls.rs
44
src/calls.rs
@@ -220,13 +220,8 @@ impl Context {
|
|||||||
|
|
||||||
call.id = send_msg(self, chat_id, &mut call).await?;
|
call.id = send_msg(self, chat_id, &mut call).await?;
|
||||||
|
|
||||||
// Store SDP offer in calls table
|
// For outgoing calls, we don't store our own offer SDP in the database.
|
||||||
self.sql
|
// It's only kept in memory for sending the message.
|
||||||
.execute(
|
|
||||||
"INSERT OR REPLACE INTO calls (msg_id, offer_sdp) VALUES (?, ?)",
|
|
||||||
(call.id, &place_call_info),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let wait = RINGING_SECONDS;
|
let wait = RINGING_SECONDS;
|
||||||
task::spawn(Context::emit_end_call_if_unaccepted(
|
task::spawn(Context::emit_end_call_if_unaccepted(
|
||||||
@@ -259,10 +254,10 @@ impl Context {
|
|||||||
chat.id.accept(self).await?;
|
chat.id.accept(self).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store SDP answer in calls table
|
// Store our answer SDP in calls table (replacing the offer from the other side)
|
||||||
self.sql
|
self.sql
|
||||||
.execute(
|
.execute(
|
||||||
"UPDATE calls SET answer_sdp=? WHERE msg_id=?",
|
"UPDATE calls SET sdp=? WHERE msg_id=?",
|
||||||
(&accept_call_info, call_id),
|
(&accept_call_info, call_id),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -363,11 +358,11 @@ impl Context {
|
|||||||
from_id: ContactId,
|
from_id: ContactId,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if mime_message.is_call() {
|
if mime_message.is_call() {
|
||||||
// Extract SDP from message headers and store in calls table
|
// Extract SDP offer from message headers and store in calls table for incoming calls
|
||||||
if let Some(offer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcRoom) {
|
if let Some(offer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcRoom) {
|
||||||
self.sql
|
self.sql
|
||||||
.execute(
|
.execute(
|
||||||
"INSERT OR IGNORE INTO calls (msg_id, offer_sdp) VALUES (?, ?)",
|
"INSERT OR IGNORE INTO calls (msg_id, sdp) VALUES (?, ?)",
|
||||||
(call_id, offer_sdp),
|
(call_id, offer_sdp),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -437,12 +432,13 @@ impl Context {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store SDP answer in calls table
|
// Store SDP answer in calls table for outgoing calls
|
||||||
|
// (for incoming calls, we've already replaced our offer with our answer in accept_incoming_call)
|
||||||
if let Some(answer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcAccepted) {
|
if let Some(answer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcAccepted) {
|
||||||
self.sql
|
self.sql
|
||||||
.execute(
|
.execute(
|
||||||
"UPDATE calls SET answer_sdp=? WHERE msg_id=?",
|
"INSERT OR REPLACE INTO calls (msg_id, sdp) VALUES (?, ?)",
|
||||||
(answer_sdp, call.msg.id),
|
(call.msg.id, answer_sdp),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
@@ -539,20 +535,26 @@ impl Context {
|
|||||||
// Load SDP from calls table. Returns empty strings if no record exists,
|
// Load SDP from calls table. Returns empty strings if no record exists,
|
||||||
// which can happen for old messages from before the migration or for
|
// which can happen for old messages from before the migration or for
|
||||||
// calls where SDPs have been cleaned up by housekeeping.
|
// calls where SDPs have been cleaned up by housekeeping.
|
||||||
let (place_call_info, accept_call_info) = self
|
// For incoming calls, the SDP is the offer from the other side.
|
||||||
|
// For outgoing calls (after acceptance), the SDP is the answer from the other side.
|
||||||
|
let sdp = self
|
||||||
.sql
|
.sql
|
||||||
.query_row_optional(
|
.query_row_optional(
|
||||||
"SELECT offer_sdp, answer_sdp FROM calls WHERE msg_id=?",
|
"SELECT sdp FROM calls WHERE msg_id=?",
|
||||||
(call.id,),
|
(call.id,),
|
||||||
|row| {
|
|row| row.get::<_, String>(0),
|
||||||
let offer: Option<String> = row.get(0)?;
|
|
||||||
let answer: Option<String> = row.get(1)?;
|
|
||||||
Ok((offer.unwrap_or_default(), answer.unwrap_or_default()))
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let (place_call_info, accept_call_info) = if call.from_id == ContactId::SELF {
|
||||||
|
// Outgoing call: the stored SDP (if any) is the answer from the other side
|
||||||
|
(String::new(), sdp)
|
||||||
|
} else {
|
||||||
|
// Incoming call: the stored SDP is the offer from the other side
|
||||||
|
(sdp, String::new())
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Some(CallInfo {
|
Ok(Some(CallInfo {
|
||||||
place_call_info,
|
place_call_info,
|
||||||
accept_call_info,
|
accept_call_info,
|
||||||
|
|||||||
@@ -682,21 +682,34 @@ async fn test_housekeeping_deletes_old_call_sdps() -> Result<()> {
|
|||||||
let alice = TestContext::new_alice().await;
|
let alice = TestContext::new_alice().await;
|
||||||
let bob = alice.create_chat_with_contact("", "bob@example.net").await;
|
let bob = alice.create_chat_with_contact("", "bob@example.net").await;
|
||||||
|
|
||||||
// Place a call
|
// Simulate receiving an incoming call from Bob
|
||||||
let call_id = alice
|
let received_call = receive_imf(
|
||||||
.place_outgoing_call(bob.id, PLACE_INFO.to_string())
|
&alice,
|
||||||
.await?;
|
b"From: bob@example.net\n\
|
||||||
|
To: alice@example.org\n\
|
||||||
|
Message-ID: <incoming-call@example.net>\n\
|
||||||
|
Chat-Version: 1.0\n\
|
||||||
|
Chat-Content: call\n\
|
||||||
|
Chat-Webrtc-Room: dGVzdC1zZHAtb2ZmZXI=\n\
|
||||||
|
\n\
|
||||||
|
Hello, this is a call\n",
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let call_id = received_call.msg_ids[0];
|
||||||
|
|
||||||
// Verify SDP is stored in calls table
|
// Verify SDP is stored in calls table for incoming call
|
||||||
let sdp_before: Option<String> = alice
|
let sdp_before: Option<String> = alice
|
||||||
.sql
|
.sql
|
||||||
.query_row_optional(
|
.query_row_optional(
|
||||||
"SELECT offer_sdp FROM calls WHERE msg_id=?",
|
"SELECT sdp FROM calls WHERE msg_id=?",
|
||||||
(call_id,),
|
(call_id,),
|
||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
assert_eq!(sdp_before, Some(PLACE_INFO.to_string()));
|
assert!(sdp_before.is_some());
|
||||||
|
|
||||||
// End the call
|
// End the call
|
||||||
alice.end_call(call_id).await?;
|
alice.end_call(call_id).await?;
|
||||||
@@ -709,12 +722,12 @@ async fn test_housekeeping_deletes_old_call_sdps() -> Result<()> {
|
|||||||
let sdp_after_end: Option<String> = alice
|
let sdp_after_end: Option<String> = alice
|
||||||
.sql
|
.sql
|
||||||
.query_row_optional(
|
.query_row_optional(
|
||||||
"SELECT offer_sdp FROM calls WHERE msg_id=?",
|
"SELECT sdp FROM calls WHERE msg_id=?",
|
||||||
(call_id,),
|
(call_id,),
|
||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
assert_eq!(sdp_after_end, Some(PLACE_INFO.to_string()));
|
assert!(sdp_after_end.is_some());
|
||||||
|
|
||||||
// Simulate passage of time - shift forward by 24 hours + 1 second
|
// Simulate passage of time - shift forward by 24 hours + 1 second
|
||||||
SystemTime::shift(Duration::from_secs(86400 + 1));
|
SystemTime::shift(Duration::from_secs(86400 + 1));
|
||||||
@@ -726,7 +739,7 @@ async fn test_housekeeping_deletes_old_call_sdps() -> Result<()> {
|
|||||||
let sdp_after_housekeeping: Option<String> = alice
|
let sdp_after_housekeeping: Option<String> = alice
|
||||||
.sql
|
.sql
|
||||||
.query_row_optional(
|
.query_row_optional(
|
||||||
"SELECT offer_sdp FROM calls WHERE msg_id=?",
|
"SELECT sdp FROM calls WHERE msg_id=?",
|
||||||
(call_id,),
|
(call_id,),
|
||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1694,10 +1694,11 @@ impl MimeFactory {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(quoted_msg_id) = quoted_msg_id {
|
if let Some(quoted_msg_id) = quoted_msg_id {
|
||||||
|
// For CallAccepted messages, retrieve the SDP (which is our answer)
|
||||||
let answer_sdp = context
|
let answer_sdp = context
|
||||||
.sql
|
.sql
|
||||||
.query_row_optional(
|
.query_row_optional(
|
||||||
"SELECT answer_sdp FROM calls WHERE msg_id=?",
|
"SELECT sdp FROM calls WHERE msg_id=?",
|
||||||
(quoted_msg_id,),
|
(quoted_msg_id,),
|
||||||
|row| row.get::<_, Option<String>>(0),
|
|row| row.get::<_, Option<String>>(0),
|
||||||
)
|
)
|
||||||
@@ -1750,24 +1751,15 @@ impl MimeFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if msg.viewtype == Viewtype::Call {
|
if msg.viewtype == Viewtype::Call {
|
||||||
// Get SDP offer from the message field (if being sent), calls table, or params (for old messages)
|
// Get SDP offer from the message field (if being sent) or params (for old messages).
|
||||||
|
// For outgoing calls, we don't store the offer in the database, only in memory.
|
||||||
|
// For incoming calls that are stored, we could query the database, but we typically
|
||||||
|
// only render outgoing call messages where we use the call_sdp_offer field.
|
||||||
let offer_sdp = if let Some(ref offer) = msg.call_sdp_offer {
|
let offer_sdp = if let Some(ref offer) = msg.call_sdp_offer {
|
||||||
Some(offer.clone())
|
Some(offer.clone())
|
||||||
} else if !msg.id.is_unset() {
|
|
||||||
// Try to get from calls table if message is already stored
|
|
||||||
context
|
|
||||||
.sql
|
|
||||||
.query_row_optional(
|
|
||||||
"SELECT offer_sdp FROM calls WHERE msg_id=?",
|
|
||||||
(msg.id,),
|
|
||||||
|row| row.get::<_, Option<String>>(0),
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
.flatten()
|
|
||||||
} else {
|
} else {
|
||||||
None
|
msg.param.get(Param::WebrtcRoom).map(|s| s.to_string())
|
||||||
}
|
};
|
||||||
.or_else(|| msg.param.get(Param::WebrtcRoom).map(|s| s.to_string()));
|
|
||||||
|
|
||||||
if let Some(offer_sdp) = offer_sdp {
|
if let Some(offer_sdp) = offer_sdp {
|
||||||
headers.push((
|
headers.push((
|
||||||
|
|||||||
@@ -1344,8 +1344,7 @@ CREATE INDEX gossip_timestamp_index ON gossip_timestamp (chat_id, fingerprint);
|
|||||||
sql.execute_migration(
|
sql.execute_migration(
|
||||||
"CREATE TABLE calls(
|
"CREATE TABLE calls(
|
||||||
msg_id INTEGER PRIMARY KEY REFERENCES msgs(id) ON DELETE CASCADE,
|
msg_id INTEGER PRIMARY KEY REFERENCES msgs(id) ON DELETE CASCADE,
|
||||||
offer_sdp TEXT,
|
sdp TEXT NOT NULL,
|
||||||
answer_sdp TEXT,
|
|
||||||
ended_timestamp INTEGER
|
ended_timestamp INTEGER
|
||||||
) STRICT;",
|
) STRICT;",
|
||||||
migration_version,
|
migration_version,
|
||||||
|
|||||||
Reference in New Issue
Block a user