Compare commits

...

11 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
ac78f76602 Fix unused variable warning and run cargo fmt
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-07 00:07:51 +00:00
copilot-swe-agent[bot]
94c373368c Simplify calls table with single sdp column instead of offer_sdp and answer_sdp
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 23:10:34 +00:00
copilot-swe-agent[bot]
76db7853ff Use SystemTime::shift instead of manually rewinding timestamps
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 22:44:31 +00:00
copilot-swe-agent[bot]
d946979b88 Use UPSERT for ended_timestamp to avoid replacing SDP data
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 22:34:57 +00:00
copilot-swe-agent[bot]
12b21c6cb3 Fix test to use ended_timestamp instead of timestamp_sent
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 22:31:12 +00:00
copilot-swe-agent[bot]
1a04fc5db3 Address review feedback: use Message field for SDP, remove table id, store ended_timestamp
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 22:28:26 +00:00
copilot-swe-agent[bot]
55cf576e13 Add clarifying comments and improve documentation based on code review
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 20:17:01 +00:00
copilot-swe-agent[bot]
c6a871c64a Improve calls table with FOREIGN KEY, STRICT mode, and better housekeeping query
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 20:14:49 +00:00
copilot-swe-agent[bot]
19be18dcbf Add test for housekeeping cleanup of old call SDPs
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 20:11:06 +00:00
copilot-swe-agent[bot]
c556b07380 Add calls table to store SDPs separately from message params
Co-authored-by: link2xt <18373967+link2xt@users.noreply.github.com>
2025-11-06 20:06:47 +00:00
copilot-swe-agent[bot]
abece73db1 Initial plan 2025-11-06 19:33:55 +00:00
6 changed files with 257 additions and 30 deletions

View File

@@ -135,8 +135,19 @@ impl CallInfo {
}
async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
let now = time();
self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
self.msg.update_param(context).await?;
// Store ended timestamp in calls table. If no entry exists yet, create one.
context
.sql
.execute(
"INSERT INTO calls (msg_id, ended_timestamp) VALUES (?, ?)
ON CONFLICT(msg_id) DO UPDATE SET ended_timestamp=excluded.ended_timestamp",
(self.msg.id, now),
)
.await?;
Ok(())
}
@@ -152,6 +163,16 @@ impl CallInfo {
self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
self.msg.update_param(context).await?;
// Store ended timestamp in calls table. If no entry exists yet, create one.
context
.sql
.execute(
"INSERT INTO calls (msg_id, ended_timestamp) VALUES (?, ?)
ON CONFLICT(msg_id) DO UPDATE SET ended_timestamp=excluded.ended_timestamp",
(self.msg.id, now),
)
.await?;
Ok(())
}
@@ -193,11 +214,15 @@ impl Context {
let mut call = Message {
viewtype: Viewtype::Call,
text: "Outgoing call".into(),
call_sdp_offer: Some(place_call_info.clone()),
..Default::default()
};
call.param.set(Param::WebrtcRoom, &place_call_info);
call.id = send_msg(self, chat_id, &mut call).await?;
// For outgoing calls, we don't store our own offer SDP in the database.
// It's only kept in memory for sending the message.
let wait = RINGING_SECONDS;
task::spawn(Context::emit_end_call_if_unaccepted(
self.clone(),
@@ -229,6 +254,14 @@ impl Context {
chat.id.accept(self).await?;
}
// Store our answer SDP in calls table (replacing the offer from the other side)
self.sql
.execute(
"UPDATE calls SET sdp=? WHERE msg_id=?",
(&accept_call_info, call_id),
)
.await?;
// send an acceptance message around: to the caller as well as to the other devices of the callee
let mut msg = Message {
viewtype: Viewtype::Text,
@@ -237,8 +270,6 @@ impl Context {
};
msg.param.set_cmd(SystemMessage::CallAccepted);
msg.hidden = true;
msg.param
.set(Param::WebrtcAccepted, accept_call_info.to_string());
msg.set_quote(self, Some(&call.msg)).await?;
msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
self.emit_event(EventType::IncomingCallAccepted {
@@ -327,6 +358,16 @@ impl Context {
from_id: ContactId,
) -> Result<()> {
if mime_message.is_call() {
// Extract SDP offer from message headers and store in calls table for incoming calls
if let Some(offer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcRoom) {
self.sql
.execute(
"INSERT OR IGNORE INTO calls (msg_id, sdp) VALUES (?, ?)",
(call_id, offer_sdp),
)
.await?;
}
let Some(call) = self.load_call_by_id(call_id).await? else {
warn!(self, "{call_id} does not refer to a call message");
return Ok(());
@@ -391,6 +432,18 @@ impl Context {
return Ok(());
}
// Store SDP answer in calls table for outgoing calls
// (for incoming calls, we've already replaced our offer with our answer in accept_incoming_call)
if let Some(answer_sdp) = mime_message.get_header(HeaderDef::ChatWebrtcAccepted)
{
self.sql
.execute(
"INSERT OR REPLACE INTO calls (msg_id, sdp) VALUES (?, ?)",
(call.msg.id, answer_sdp),
)
.await?;
}
call.mark_as_accepted(self).await?;
self.emit_msgs_changed(call.msg.chat_id, call_id);
if call.is_incoming() {
@@ -463,33 +516,49 @@ impl Context {
/// not a call message, returns `None`.
pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
let call = Message::load_from_db(self, call_id).await?;
Ok(self.load_call_by_message(call))
self.load_call_by_message(call).await
}
// Loads information about the call given the `Message`.
//
// If the `Message` is not a call message, returns `None`
fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
// If the `Message` is not a call message, returns `None`.
//
// This function is async because it queries the calls table
// to retrieve SDP offers and answers.
async fn load_call_by_message(&self, call: Message) -> Result<Option<CallInfo>> {
if call.viewtype != Viewtype::Call {
// This can happen e.g. if a "call accepted"
// or "call ended" message is received
// with `In-Reply-To` referring to non-call message.
return None;
return Ok(None);
}
Some(CallInfo {
place_call_info: call
.param
.get(Param::WebrtcRoom)
.unwrap_or_default()
.to_string(),
accept_call_info: call
.param
.get(Param::WebrtcAccepted)
.unwrap_or_default()
.to_string(),
// Load SDP from calls table. Returns empty strings if no record exists,
// which can happen for old messages from before the migration or for
// calls where SDPs have been cleaned up by housekeeping.
// For incoming calls, the SDP is the offer from the other side.
// For outgoing calls (after acceptance), the SDP is the answer from the other side.
let sdp = self
.sql
.query_row_optional("SELECT sdp FROM calls WHERE msg_id=?", (call.id,), |row| {
row.get::<_, String>(0)
})
.await?
.unwrap_or_default();
let (place_call_info, accept_call_info) = if call.from_id == ContactId::SELF {
// Outgoing call: the stored SDP (if any) is the answer from the other side
(String::new(), sdp)
} else {
// Incoming call: the stored SDP is the offer from the other side
(sdp, String::new())
};
Ok(Some(CallInfo {
place_call_info,
accept_call_info,
msg: call,
})
}))
}
}

View File

@@ -4,6 +4,8 @@ use crate::config::Config;
use crate::constants::DC_CHAT_ID_TRASH;
use crate::receive_imf::{receive_imf, receive_imf_from_inbox};
use crate::test_utils::{TestContext, TestContextManager};
use crate::tools::SystemTime;
use std::time::Duration;
struct CallSetup {
pub alice: TestContext,
@@ -672,3 +674,74 @@ async fn test_no_partial_calls() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_housekeeping_deletes_old_call_sdps() -> Result<()> {
use crate::sql::housekeeping;
let alice = TestContext::new_alice().await;
// Simulate receiving an incoming call from Bob
let received_call = receive_imf(
&alice,
b"From: bob@example.net\n\
To: alice@example.org\n\
Message-ID: <incoming-call@example.net>\n\
Chat-Version: 1.0\n\
Chat-Content: call\n\
Chat-Webrtc-Room: dGVzdC1zZHAtb2ZmZXI=\n\
\n\
Hello, this is a call\n",
false,
)
.await?
.unwrap();
let call_id = received_call.msg_ids[0];
// Verify SDP is stored in calls table for incoming call
let sdp_before: Option<String> = alice
.sql
.query_row_optional("SELECT sdp FROM calls WHERE msg_id=?", (call_id,), |row| {
row.get(0)
})
.await?;
assert!(sdp_before.is_some());
// End the call
alice.end_call(call_id).await?;
// Verify the call message is marked as ended
let call = alice.load_call_by_id(call_id).await?.unwrap();
assert!(call.is_ended());
// SDP should still be there after ending
let sdp_after_end: Option<String> = alice
.sql
.query_row_optional("SELECT sdp FROM calls WHERE msg_id=?", (call_id,), |row| {
row.get(0)
})
.await?;
assert!(sdp_after_end.is_some());
// Simulate passage of time - shift forward by 24 hours + 1 second
SystemTime::shift(Duration::from_secs(86400 + 1));
// Run housekeeping
housekeeping(&alice).await?;
// Verify SDP has been deleted from calls table
let sdp_after_housekeeping: Option<String> = alice
.sql
.query_row_optional("SELECT sdp FROM calls WHERE msg_id=?", (call_id,), |row| {
row.get(0)
})
.await?;
assert_eq!(sdp_after_housekeeping, None);
// The call message should still exist
let msg = Message::load_from_db(&alice, call_id).await?;
assert_eq!(msg.viewtype, Viewtype::Call);
Ok(())
}

View File

@@ -443,6 +443,13 @@ pub struct Message {
pub(crate) location_id: u32,
pub(crate) error: Option<String>,
pub(crate) param: Params,
/// SDP offer for outgoing calls.
/// This field is used to pass the SDP offer to the database
/// without storing it in message parameters.
/// It is not persisted in the msgs table, only in the calls table.
#[serde(skip)]
pub(crate) call_sdp_offer: Option<String>,
}
impl Message {
@@ -572,6 +579,7 @@ impl Message {
chat_blocked: row
.get::<_, Option<Blocked>>("blocked")?
.unwrap_or_default(),
call_sdp_offer: None,
};
Ok(msg)
},

View File

@@ -1680,6 +1680,42 @@ impl MimeFactory {
"Chat-Content",
mail_builder::headers::raw::Raw::new("call-accepted").into(),
));
// Get SDP answer from the referenced call message in calls table,
// or fall back to params if not yet migrated
if let Some(ref quoted_rfc724_mid) = msg.in_reply_to {
// Look up msg_id from rfc724_mid first
let quoted_msg_id: Option<MsgId> = context
.sql
.query_row_optional(
"SELECT id FROM msgs WHERE rfc724_mid=?",
(quoted_rfc724_mid,),
|row| row.get(0),
)
.await?;
if let Some(quoted_msg_id) = quoted_msg_id {
// For CallAccepted messages, retrieve the SDP (which is our answer)
let answer_sdp = context
.sql
.query_row_optional(
"SELECT sdp FROM calls WHERE msg_id=?",
(quoted_msg_id,),
|row| row.get::<_, Option<String>>(0),
)
.await?
.flatten()
.or_else(|| {
msg.param.get(Param::WebrtcAccepted).map(|s| s.to_string())
});
if let Some(answer_sdp) = answer_sdp {
headers.push((
"Chat-Webrtc-Accepted",
mail_builder::headers::raw::Raw::new(b_encode(&answer_sdp)).into(),
));
}
}
}
}
SystemMessage::CallEnded => {
headers.push((
@@ -1716,16 +1752,23 @@ impl MimeFactory {
);
}
if let Some(offer) = msg.param.get(Param::WebrtcRoom) {
headers.push((
"Chat-Webrtc-Room",
mail_builder::headers::raw::Raw::new(b_encode(offer)).into(),
));
} else if let Some(answer) = msg.param.get(Param::WebrtcAccepted) {
headers.push((
"Chat-Webrtc-Accepted",
mail_builder::headers::raw::Raw::new(b_encode(answer)).into(),
));
if msg.viewtype == Viewtype::Call {
// Get SDP offer from the message field (if being sent) or params (for old messages).
// For outgoing calls, we don't store the offer in the database, only in memory.
// For incoming calls that are stored, we could query the database, but we typically
// only render outgoing call messages where we use the call_sdp_offer field.
let offer_sdp = if let Some(ref offer) = msg.call_sdp_offer {
Some(offer.clone())
} else {
msg.param.get(Param::WebrtcRoom).map(|s| s.to_string())
};
if let Some(offer_sdp) = offer_sdp {
headers.push((
"Chat-Webrtc-Room",
mail_builder::headers::raw::Raw::new(b_encode(&offer_sdp)).into(),
));
}
}
if msg.viewtype == Viewtype::Voice

View File

@@ -921,6 +921,27 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
.log_err(context)
.ok();
// Delete call SDPs for ended calls (older than 24 hours) or trashed calls.
// We clean up calls that ended more than 24 hours ago to protect privacy
// as SDPs contain IP addresses.
// The ON DELETE CASCADE foreign key handles orphaned entries automatically.
context
.sql
.execute(
"DELETE FROM calls WHERE msg_id IN (
SELECT calls.msg_id FROM calls
INNER JOIN msgs ON calls.msg_id = msgs.id
WHERE msgs.chat_id = ?
OR (calls.ended_timestamp IS NOT NULL
AND calls.ended_timestamp < ?)
)",
(DC_CHAT_ID_TRASH, time() - 86400),
)
.await
.context("Failed to delete old call SDPs")
.log_err(context)
.ok();
info!(context, "Housekeeping done.");
Ok(())
}

View File

@@ -1339,6 +1339,19 @@ CREATE INDEX gossip_timestamp_index ON gossip_timestamp (chat_id, fingerprint);
.await?;
}
inc_and_check(&mut migration_version, 139)?;
if dbversion < migration_version {
sql.execute_migration(
"CREATE TABLE calls(
msg_id INTEGER PRIMARY KEY REFERENCES msgs(id) ON DELETE CASCADE,
sdp TEXT NOT NULL,
ended_timestamp INTEGER
) STRICT;",
migration_version,
)
.await?;
}
let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?