diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/message.py b/deltachat-rpc-client/src/deltachat_rpc_client/message.py index 4aac1d7f9..6d1d68ac4 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/message.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/message.py @@ -2,7 +2,7 @@ import json from dataclasses import dataclass from typing import TYPE_CHECKING, Optional, Union -from ._utils import AttrDict +from ._utils import AttrDict, futuremethod from .const import EventType from .contact import Contact @@ -71,8 +71,10 @@ class Message: if event.kind == EventType.MSG_DELIVERED and event.msg_id == self.id: break - def send_webxdc_realtime_advertisement(self) -> None: - self._rpc.send_webxdc_realtime_advertisement(self.account.id, self.id) + @futuremethod + def send_webxdc_realtime_advertisement(self): + yield self._rpc.send_webxdc_realtime_advertisement.future(self.account.id, self.id) + @futuremethod def send_webxdc_realtime_data(self, data) -> None: - self._rpc.send_webxdc_realtime_data(self.account.id, self.id, list(data)) + yield self._rpc.send_webxdc_realtime_data.future(self.account.id, self.id, list(data)) diff --git a/deltachat-rpc-client/tests/test_iroh_webxdc.py b/deltachat-rpc-client/tests/test_iroh_webxdc.py index b29a04661..cb24bc572 100644 --- a/deltachat-rpc-client/tests/test_iroh_webxdc.py +++ b/deltachat-rpc-client/tests/test_iroh_webxdc.py @@ -136,3 +136,43 @@ def test_two_parallel_realtime_simultaneously(acfactory, path_to_webxdc): wait_receive_realtime_data([(ac1_webxdc_msg, [30]), (ac1_webxdc_msg2, [40])]) wait_receive_realtime_data([(ac2_webxdc_msg, [10]), (ac2_webxdc_msg2, [20])]) + + +def test_no_duplicate_messages(acfactory, path_to_webxdc): + """Test that messages are received only once.""" + ac1, ac2 = acfactory.get_online_accounts(2) + + ac1_ac2_chat = ac1.create_chat(ac2) + + ac1_webxdc_msg = ac1_ac2_chat.send_message(text="webxdc", file=path_to_webxdc) + + ac1_webxdc_msg.send_webxdc_realtime_advertisement + + ac2_webxdc_msg = ac2.wait_for_incoming_msg() + ac2_webxdc_msg.get_snapshot().chat.accept() + assert ac2_webxdc_msg.get_snapshot().text == "webxdc" + + # Issue a "send" call in parallel with sending advertisement. + # Previously due to a bug this caused subscribing to the channel twice. + future = ac2_webxdc_msg.send_webxdc_realtime_data.future(b"foobar") + ac2_webxdc_msg.send_webxdc_realtime_advertisement() + + def thread_run(): + for i in range(10): + data = str(i).encode() + ac1_webxdc_msg.send_webxdc_realtime_data(data) + time.sleep(1) + + threading.Thread(target=thread_run, daemon=True).start() + + while 1: + event = ac2.wait_for_event() + if event.kind == EventType.WEBXDC_REALTIME_DATA: + n = int(bytes(event.data).decode()) + break + + while 1: + event = ac2.wait_for_event() + if event.kind == EventType.WEBXDC_REALTIME_DATA: + assert int(bytes(event.data).decode()) > n + break