docs: add missing documentation to deltachat-rpc-client

This commit is contained in:
link2xt
2025-05-09 21:02:42 +00:00
parent a981573e48
commit c1471bdbd9
12 changed files with 160 additions and 77 deletions

View File

@@ -1,4 +1,4 @@
"""Delta Chat JSON-RPC high-level API"""
"""Delta Chat JSON-RPC high-level API."""
from ._utils import AttrDict, run_bot_cli, run_client_cli
from .account import Account

View File

@@ -115,7 +115,7 @@ def _run_cli(
def extract_addr(text: str) -> str:
"""extract email address from the given text."""
"""Extract email address from the given text."""
match = re.match(r".*\((.+@.+)\)", text)
if match:
text = match.group(1)
@@ -124,7 +124,7 @@ def extract_addr(text: str) -> str:
def parse_system_image_changed(text: str) -> Optional[Tuple[str, bool]]:
"""return image changed/deleted info from parsing the given system message text."""
"""Return image changed/deleted info from parsing the given system message text."""
text = text.lower()
match = re.match(r"group image (changed|deleted) by (.+).", text)
if match:
@@ -143,7 +143,7 @@ def parse_system_title_changed(text: str) -> Optional[Tuple[str, str]]:
def parse_system_add_remove(text: str) -> Optional[Tuple[str, str, str]]:
"""return add/remove info from parsing the given system message text.
"""Return add/remove info from parsing the given system message text.
returns a (action, affected, actor) tuple.
"""

View File

@@ -1,3 +1,5 @@
"""Account module."""
from __future__ import annotations
from dataclasses import dataclass
@@ -34,7 +36,10 @@ class Account:
return next_event
def clear_all_events(self):
"""Removes all queued-up events for a given account. Useful for tests."""
"""Remove all queued-up events for a given account.
Useful for tests.
"""
self._rpc.clear_all_events(self.id)
def remove(self) -> None:
@@ -43,7 +48,9 @@ class Account:
def clone(self) -> "Account":
"""Clone given account.
This uses backup-transfer via iroh, i.e. the 'Add second device' feature."""
This uses backup-transfer via iroh, i.e. the 'Add second device' feature.
"""
future = self._rpc.provide_backup.future(self.id)
qr = self._rpc.get_backup_qr(self.id)
new_account = self.manager.add_account()
@@ -80,7 +87,7 @@ class Account:
return self._rpc.get_config(self.id, key)
def update_config(self, **kwargs) -> None:
"""update config values."""
"""Update config values."""
for key, value in kwargs.items():
self.set_config(key, value)
@@ -99,10 +106,12 @@ class Account:
"""Parse QR code contents.
This function takes the raw text scanned
and checks what can be done with it."""
and checks what can be done with it.
"""
return self._rpc.check_qr(self.id, qr)
def set_config_from_qr(self, qr: str):
"""Set configuration values from a QR code."""
self._rpc.set_config_from_qr(self.id, qr)
@futuremethod
@@ -117,7 +126,7 @@ class Account:
@futuremethod
def list_transports(self):
"""Returns the list of all email accounts that are used as a transport in the current profile."""
"""Return the list of all email accounts that are used as a transport in the current profile."""
transports = yield self._rpc.list_transports.future(self.id)
return transports
@@ -158,7 +167,8 @@ class Account:
def import_vcard(self, vcard: str) -> list[Contact]:
"""Import vCard.
Return created or modified contacts in the order they appear in vCard."""
Return created or modified contacts in the order they appear in vCard.
"""
contact_ids = self._rpc.import_vcard_contents(self.id, vcard)
return [Contact(self, contact_id) for contact_id in contact_ids]
@@ -227,12 +237,12 @@ class Account:
@property
def self_contact(self) -> Contact:
"""This account's identity as a Contact."""
"""Account's identity as a Contact."""
return Contact(self, SpecialContactId.SELF)
@property
def device_contact(self) -> Chat:
"""This account's device contact."""
"""Account's device contact."""
return Contact(self, SpecialContactId.DEVICE)
def get_chatlist(
@@ -290,8 +300,7 @@ class Account:
return Chat(self, chat_id)
def secure_join(self, qrdata: str) -> Chat:
"""Continue a Setup-Contact or Verified-Group-Invite protocol started on
another device.
"""Continue a Setup-Contact or Verified-Group-Invite protocol started on another device.
The function returns immediately and the handshake runs in background, sending
and receiving several messages.
@@ -361,22 +370,26 @@ class Account:
def wait_for_incoming_msg(self):
"""Wait for incoming message and return it.
Consumes all events before the next incoming message event."""
Consumes all events before the next incoming message event.
"""
return self.get_message_by_id(self.wait_for_incoming_msg_event().msg_id)
def wait_for_securejoin_inviter_success(self):
"""Wait until SecureJoin process finishes successfully on the inviter side."""
while True:
event = self.wait_for_event()
if event["kind"] == "SecurejoinInviterProgress" and event["progress"] == 1000:
break
def wait_for_securejoin_joiner_success(self):
"""Wait until SecureJoin process finishes successfully on the joiner side."""
while True:
event = self.wait_for_event()
if event["kind"] == "SecurejoinJoinerProgress" and event["progress"] == 1000:
break
def wait_for_reactions_changed(self):
"""Wait for reaction change event."""
return self.wait_for_event(EventType.REACTIONS_CHANGED)
def get_fresh_messages_in_arrival_order(self) -> list[Message]:

View File

@@ -1,3 +1,5 @@
"""Chat module."""
from __future__ import annotations
import calendar
@@ -89,7 +91,8 @@ class Chat:
def set_ephemeral_timer(self, timer: int) -> None:
"""Set ephemeral timer of this chat in seconds.
0 means the timer is disabled, use 1 for immediate deletion."""
0 means the timer is disabled, use 1 for immediate deletion.
"""
self._rpc.set_chat_ephemeral_timer(self.account.id, self.id, timer)
def get_encryption_info(self) -> str:
@@ -199,12 +202,12 @@ class Chat:
return snapshot
def get_messages(self, info_only: bool = False, add_daymarker: bool = False) -> list[Message]:
"""get the list of messages in this chat."""
"""Get the list of messages in this chat."""
msgs = self._rpc.get_message_ids(self.account.id, self.id, info_only, add_daymarker)
return [Message(self.account, msg_id) for msg_id in msgs]
def get_fresh_message_count(self) -> int:
"""Get number of fresh messages in this chat"""
"""Get number of fresh messages in this chat."""
return self._rpc.get_fresh_msg_cnt(self.account.id, self.id)
def mark_noticed(self) -> None:

View File

@@ -48,6 +48,7 @@ class Client:
self.add_hooks(hooks or [])
def add_hooks(self, hooks: Iterable[tuple[Callable, Union[type, EventFilter]]]) -> None:
"""Register multiple hooks."""
for hook, event in hooks:
self.add_hook(hook, event)
@@ -77,9 +78,11 @@ class Client:
self._hooks.get(type(event), set()).remove((hook, event))
def is_configured(self) -> bool:
"""Return True if the client is configured."""
return self.account.is_configured()
def configure(self, email: str, password: str, **kwargs) -> None:
"""Configure the client."""
self.account.set_config("addr", email)
self.account.set_config("mail_pw", password)
for key, value in kwargs.items():
@@ -198,5 +201,6 @@ class Bot(Client):
"""Simple bot implementation that listens to events of a single account."""
def configure(self, email: str, password: str, **kwargs) -> None:
"""Configure the bot."""
kwargs.setdefault("bot", "1")
super().configure(email, password, **kwargs)

View File

@@ -1,14 +1,20 @@
"""Constants module."""
from enum import Enum, IntEnum
COMMAND_PREFIX = "/"
class ContactFlag(IntEnum):
"""Bit flags for get_contacts() method."""
VERIFIED_ONLY = 0x01
ADD_SELF = 0x02
class ChatlistFlag(IntEnum):
"""Bit flags for get_chatlist() method."""
ARCHIVED_ONLY = 0x01
NO_SPECIALS = 0x02
ADD_ALLDONE_HINT = 0x04
@@ -16,6 +22,8 @@ class ChatlistFlag(IntEnum):
class SpecialContactId(IntEnum):
"""Special contact IDs."""
SELF = 1
INFO = 2 # centered messages as "member added", used in all chats
DEVICE = 5 # messages "update info" in the device-chat
@@ -23,7 +31,7 @@ class SpecialContactId(IntEnum):
class EventType(str, Enum):
"""Core event types"""
"""Core event types."""
INFO = "Info"
SMTP_CONNECTED = "SmtpConnected"
@@ -71,7 +79,7 @@ class EventType(str, Enum):
class ChatId(IntEnum):
"""Special chat ids"""
"""Special chat IDs."""
TRASH = 3
ARCHIVED_LINK = 6
@@ -80,7 +88,7 @@ class ChatId(IntEnum):
class ChatType(IntEnum):
"""Chat types"""
"""Chat type."""
UNDEFINED = 0
SINGLE = 100
@@ -90,7 +98,7 @@ class ChatType(IntEnum):
class ChatVisibility(str, Enum):
"""Chat visibility types"""
"""Chat visibility types."""
NORMAL = "Normal"
ARCHIVED = "Archived"
@@ -98,7 +106,7 @@ class ChatVisibility(str, Enum):
class DownloadState(str, Enum):
"""Message download state"""
"""Message download state."""
DONE = "Done"
AVAILABLE = "Available"
@@ -159,14 +167,14 @@ class MessageState(IntEnum):
class MessageId(IntEnum):
"""Special message ids"""
"""Special message IDs."""
DAYMARKER = 9
LAST_SPECIAL = 9
class CertificateChecks(IntEnum):
"""Certificate checks mode"""
"""Certificate checks mode."""
AUTOMATIC = 0
STRICT = 1
@@ -174,7 +182,7 @@ class CertificateChecks(IntEnum):
class Connectivity(IntEnum):
"""Connectivity states"""
"""Connectivity states."""
NOT_CONNECTED = 1000
CONNECTING = 2000
@@ -183,7 +191,7 @@ class Connectivity(IntEnum):
class KeyGenType(IntEnum):
"""Type of the key to generate"""
"""Type of the key to generate."""
DEFAULT = 0
RSA2048 = 1
@@ -193,21 +201,21 @@ class KeyGenType(IntEnum):
# "Lp" means "login parameters"
class LpAuthFlag(IntEnum):
"""Authorization flags"""
"""Authorization flags."""
OAUTH2 = 0x2
NORMAL = 0x4
class MediaQuality(IntEnum):
"""Media quality setting"""
"""Media quality setting."""
BALANCED = 0
WORSE = 1
class ProviderStatus(IntEnum):
"""Provider status according to manual testing"""
"""Provider status according to manual testing."""
OK = 1
PREPARATION = 2
@@ -215,7 +223,7 @@ class ProviderStatus(IntEnum):
class PushNotifyState(IntEnum):
"""Push notifications state"""
"""Push notifications state."""
NOT_CONNECTED = 0
HEARTBEAT = 1
@@ -223,7 +231,7 @@ class PushNotifyState(IntEnum):
class ShowEmails(IntEnum):
"""Show emails mode"""
"""Show emails mode."""
OFF = 0
ACCEPTED_CONTACTS = 1
@@ -231,7 +239,7 @@ class ShowEmails(IntEnum):
class SocketSecurity(IntEnum):
"""Socket security"""
"""Socket security."""
AUTOMATIC = 0
SSL = 1
@@ -240,7 +248,7 @@ class SocketSecurity(IntEnum):
class VideochatType(IntEnum):
"""Video chat URL type"""
"""Video chat URL type."""
UNKNOWN = 0
BASICWEBRTC = 1

View File

@@ -1,3 +1,5 @@
"""Contact module."""
from dataclasses import dataclass
from typing import TYPE_CHECKING
@@ -11,8 +13,7 @@ if TYPE_CHECKING:
@dataclass
class Contact:
"""
Contact API.
"""Contact API.
Essentially a wrapper for RPC, account ID and a contact ID.
"""
@@ -45,8 +46,9 @@ class Contact:
self._rpc.change_contact_name(self.account.id, self.id, name)
def get_encryption_info(self) -> str:
"""Get a multi-line encryption info, containing your fingerprint and
the fingerprint of the contact.
"""Get a multi-line encryption info.
Encryption info contains your fingerprint and the fingerprint of the contact.
"""
return self._rpc.get_contact_encryption_info(self.account.id, self.id)
@@ -66,4 +68,5 @@ class Contact:
)
def make_vcard(self) -> str:
"""Make a vCard for the contact."""
return self.account.make_vcard([self])

View File

@@ -1,3 +1,5 @@
"""Account manager module."""
from __future__ import annotations
from typing import TYPE_CHECKING
@@ -10,12 +12,13 @@ if TYPE_CHECKING:
class DeltaChat:
"""
Delta Chat accounts manager.
"""Delta Chat accounts manager.
This is the root of the object oriented API.
"""
def __init__(self, rpc: "Rpc") -> None:
"""Initialize account manager."""
self.rpc = rpc
def add_account(self) -> Account:
@@ -37,9 +40,7 @@ class DeltaChat:
self.rpc.stop_io_for_all_accounts()
def maybe_network(self) -> None:
"""Indicate that the network likely has come back or just that the network
conditions might have changed.
"""
"""Indicate that the network conditions might have changed."""
self.rpc.maybe_network()
def get_system_info(self) -> AttrDict:

View File

@@ -36,7 +36,7 @@ class EventFilter(ABC):
@abstractmethod
def __hash__(self) -> int:
"""Object's unique hash"""
"""Object's unique hash."""
@abstractmethod
def __eq__(self, other) -> bool:
@@ -52,9 +52,7 @@ class EventFilter(ABC):
@abstractmethod
def filter(self, event):
"""Return True-like value if the event passed the filter and should be
used, or False-like value otherwise.
"""
"""Return True-like value if the event passed the filter."""
class RawEvent(EventFilter):
@@ -82,31 +80,17 @@ class RawEvent(EventFilter):
return False
def filter(self, event: "AttrDict") -> bool:
"""Filter an event.
Return true if the event should be processed.
"""
if self.types and event.kind not in self.types:
return False
return self._call_func(event)
class NewMessage(EventFilter):
"""Matches whenever a new message arrives.
Warning: registering a handler for this event will cause the messages
to be marked as read. Its usage is mainly intended for bots.
:param pattern: if set, this Pattern will be used to filter the message by its text
content.
:param command: If set, only match messages with the given command (ex. /help).
Setting this property implies `is_info==False`.
:param is_bot: If set to True only match messages sent by bots, if set to None
match messages from bots and users. If omitted or set to False
only messages from users will be matched.
:param is_info: If set to True only match info/system messages, if set to False
only match messages that are not info/system messages. If omitted
info/system messages as well as normal messages will be matched.
:param func: A Callable function that should accept the event as input
parameter, and return a bool value indicating whether the event
should be dispatched or not.
"""
"""Matches whenever a new message arrives."""
def __init__(
self,
@@ -121,6 +105,25 @@ class NewMessage(EventFilter):
is_info: Optional[bool] = None,
func: Optional[Callable[["AttrDict"], bool]] = None,
) -> None:
"""Initialize a new message filter.
Warning: registering a handler for this event will cause the messages
to be marked as read. Its usage is mainly intended for bots.
:param pattern: if set, this Pattern will be used to filter the message by its text
content.
:param command: If set, only match messages with the given command (ex. /help).
Setting this property implies `is_info==False`.
:param is_bot: If set to True only match messages sent by bots, if set to None
match messages from bots and users. If omitted or set to False
only messages from users will be matched.
:param is_info: If set to True only match info/system messages, if set to False
only match messages that are not info/system messages. If omitted
info/system messages as well as normal messages will be matched.
:param func: A Callable function that should accept the event as input
parameter, and return a bool value indicating whether the event
should be dispatched or not.
"""
super().__init__(func=func)
self.is_bot = is_bot
self.is_info = is_info
@@ -159,6 +162,7 @@ class NewMessage(EventFilter):
return False
def filter(self, event: "AttrDict") -> bool:
"""Return true if if the event is a new message event."""
if self.is_bot is not None and self.is_bot != event.message_snapshot.is_bot:
return False
if self.is_info is not None and self.is_info != event.message_snapshot.is_info:
@@ -199,6 +203,7 @@ class MemberListChanged(EventFilter):
return False
def filter(self, event: "AttrDict") -> bool:
"""Return true if if the event is a member addition event."""
if self.added is not None and self.added != event.member_added:
return False
return self._call_func(event)
@@ -231,6 +236,7 @@ class GroupImageChanged(EventFilter):
return False
def filter(self, event: "AttrDict") -> bool:
"""Return True if event is matched."""
if self.deleted is not None and self.deleted != event.image_deleted:
return False
return self._call_func(event)
@@ -256,13 +262,12 @@ class GroupNameChanged(EventFilter):
return False
def filter(self, event: "AttrDict") -> bool:
"""Return True if event is matched."""
return self._call_func(event)
class HookCollection:
"""
Helper class to collect event hooks that can later be added to a Delta Chat client.
"""
"""Helper class to collect event hooks that can later be added to a Delta Chat client."""
def __init__(self) -> None:
self._hooks: set[tuple[Callable, Union[type, EventFilter]]] = set()

View File

@@ -1,3 +1,5 @@
"""Message module."""
import json
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, Union
@@ -45,6 +47,7 @@ class Message:
return None
def get_sender_contact(self) -> Contact:
"""Return sender contact."""
from_id = self.get_snapshot().from_id
return self.account.get_contact_by_id(from_id)
@@ -53,6 +56,11 @@ class Message:
self._rpc.markseen_msgs(self.account.id, [self.id])
def continue_autocrypt_key_transfer(self, setup_code: str) -> None:
"""Continue the Autocrypt Setup Message key transfer.
This function can be called on received Autocrypt Setup Message
to import the key encrypted with the provided setup code.
"""
self._rpc.continue_autocrypt_key_transfer(self.account.id, self.id, setup_code)
def send_webxdc_status_update(self, update: Union[dict, str], description: str) -> None:
@@ -62,6 +70,7 @@ class Message:
self._rpc.send_webxdc_status_update(self.account.id, self.id, update, description)
def get_webxdc_status_updates(self, last_known_serial: int = 0) -> list:
"""Return a list of Webxdc status updates for Webxdc instance message."""
return json.loads(self._rpc.get_webxdc_status_updates(self.account.id, self.id, last_known_serial))
def get_info(self) -> str:
@@ -69,6 +78,7 @@ class Message:
return self._rpc.get_message_info(self.account.id, self.id)
def get_webxdc_info(self) -> dict:
"""Get info from a Webxdc message in JSON format."""
return self._rpc.get_webxdc_info(self.account.id, self.id)
def wait_until_delivered(self) -> None:
@@ -80,8 +90,10 @@ class Message:
@futuremethod
def send_webxdc_realtime_advertisement(self):
"""Send an advertisement to join the realtime channel."""
yield self._rpc.send_webxdc_realtime_advertisement.future(self.account.id, self.id)
@futuremethod
def send_webxdc_realtime_data(self, data) -> None:
"""Send data to the realtime channel."""
yield self._rpc.send_webxdc_realtime_data.future(self.account.id, self.id, list(data))

View File

@@ -1,3 +1,5 @@
"""Pytest plugin module."""
from __future__ import annotations
import os
@@ -13,24 +15,30 @@ from .rpc import Rpc
class ACFactory:
"""Test account factory."""
def __init__(self, deltachat: DeltaChat) -> None:
self.deltachat = deltachat
def get_unconfigured_account(self) -> Account:
"""Create a new unconfigured account."""
account = self.deltachat.add_account()
account.set_config("verified_one_on_one_chats", "1")
return account
def get_unconfigured_bot(self) -> Bot:
"""Create a new unconfigured bot."""
return Bot(self.get_unconfigured_account())
def get_credentials(self) -> (str, str):
"""Generate new credentials for chatmail account."""
domain = os.getenv("CHATMAIL_DOMAIN")
username = "ci-" + "".join(random.choice("2345789acdefghjkmnpqrstuvwxyz") for i in range(6))
return f"{username}@{domain}", f"{username}${username}"
@futuremethod
def new_configured_account(self):
"""Create a new configured account."""
addr, password = self.get_credentials()
account = self.get_unconfigured_account()
params = {"addr": addr, "password": password}
@@ -40,6 +48,7 @@ class ACFactory:
return account
def new_configured_bot(self) -> Bot:
"""Create a new configured bot."""
addr, password = self.get_credentials()
bot = self.get_unconfigured_bot()
bot.configure(addr, password)
@@ -47,11 +56,13 @@ class ACFactory:
@futuremethod
def get_online_account(self):
"""Create a new account and start I/O."""
account = yield self.new_configured_account.future()
account.bring_online()
return account
def get_online_accounts(self, num: int) -> list[Account]:
"""Create multiple online accounts."""
futures = [self.get_online_account.future() for _ in range(num)]
return [f() for f in futures]
@@ -66,6 +77,10 @@ class ACFactory:
return ac_clone
def get_accepted_chat(self, ac1: Account, ac2: Account) -> Chat:
"""Create a new 1:1 chat between ac1 and ac2 accepted on both sides.
Returned chat is a chat with ac2 from ac1 point of view.
"""
ac2.create_chat(ac1)
return ac1.create_chat(ac2)
@@ -77,6 +92,7 @@ class ACFactory:
file: Optional[str] = None,
group: Optional[str] = None,
) -> Message:
"""Send a message."""
if not from_account:
from_account = (self.get_online_accounts(1))[0]
to_contact = from_account.create_contact(to_account)
@@ -95,6 +111,7 @@ class ACFactory:
file: Optional[str] = None,
group: Optional[str] = None,
) -> AttrDict:
"""Send a message and wait until recipient processes it."""
self.send_message(
to_account=to_client.account,
from_account=from_account,
@@ -108,6 +125,7 @@ class ACFactory:
@pytest.fixture
def rpc(tmp_path) -> AsyncGenerator:
"""RPC client fixture."""
rpc_server = Rpc(accounts_dir=str(tmp_path / "accounts"))
with rpc_server:
yield rpc_server
@@ -115,6 +133,7 @@ def rpc(tmp_path) -> AsyncGenerator:
@pytest.fixture
def acfactory(rpc) -> AsyncGenerator:
"""Return account factory fixture."""
return ACFactory(DeltaChat(rpc))
@@ -132,7 +151,7 @@ def data():
raise Exception("Data path cannot be found")
def get_path(self, bn):
"""return path of file or None if it doesn't exist."""
"""Return path of file or None if it doesn't exist."""
fn = os.path.join(self.path, *bn.split("/"))
assert os.path.exists(fn)
return fn

View File

@@ -1,3 +1,5 @@
"""JSON-RPC client module."""
from __future__ import annotations
import itertools
@@ -12,16 +14,19 @@ from typing import Any, Iterator, Optional
class JsonRpcError(Exception):
pass
"""JSON-RPC error."""
class RpcFuture:
"""RPC future waiting for RPC call result."""
def __init__(self, rpc: "Rpc", request_id: int, event: Event):
self.rpc = rpc
self.request_id = request_id
self.event = event
def __call__(self):
"""Wait for the future to return the result."""
self.event.wait()
response = self.rpc.request_results.pop(self.request_id)
if "error" in response:
@@ -32,17 +37,19 @@ class RpcFuture:
class RpcMethod:
"""RPC method."""
def __init__(self, rpc: "Rpc", name: str):
self.rpc = rpc
self.name = name
def __call__(self, *args) -> Any:
"""Synchronously calls JSON-RPC method."""
"""Call JSON-RPC method synchronously."""
future = self.future(*args)
return future()
def future(self, *args) -> Any:
"""Asynchronously calls JSON-RPC method."""
"""Call JSON-RPC method asynchronously."""
request_id = next(self.rpc.id_iterator)
request = {
"jsonrpc": "2.0",
@@ -58,8 +65,13 @@ class RpcMethod:
class Rpc:
"""RPC client."""
def __init__(self, accounts_dir: Optional[str] = None, **kwargs):
"""The given arguments will be passed to subprocess.Popen()"""
"""Initialize RPC client.
The given arguments will be passed to subprocess.Popen().
"""
if accounts_dir:
kwargs["env"] = {
**kwargs.get("env", os.environ),
@@ -81,6 +93,7 @@ class Rpc:
self.events_thread: Thread
def start(self) -> None:
"""Start RPC server subprocess."""
if sys.version_info >= (3, 11):
self.process = subprocess.Popen(
"deltachat-rpc-server",
@@ -130,6 +143,7 @@ class Rpc:
self.close()
def reader_loop(self) -> None:
"""Process JSON-RPC responses from the RPC server process output."""
try:
while line := self.process.stdout.readline():
response = json.loads(line)
@@ -157,12 +171,13 @@ class Rpc:
logging.exception("Exception in the writer loop")
def get_queue(self, account_id: int) -> Queue:
"""Get event queue corresponding to the given account ID."""
if account_id not in self.event_queues:
self.event_queues[account_id] = Queue()
return self.event_queues[account_id]
def events_loop(self) -> None:
"""Requests new events and distributes them between queues."""
"""Request new events and distributes them between queues."""
try:
while True:
if self.closing:
@@ -178,12 +193,12 @@ class Rpc:
logging.exception("Exception in the event loop")
def wait_for_event(self, account_id: int) -> Optional[dict]:
"""Waits for the next event from the given account and returns it."""
"""Wait for the next event from the given account and returns it."""
queue = self.get_queue(account_id)
return queue.get()
def clear_all_events(self, account_id: int):
"""Removes all queued-up events for a given account. Useful for tests."""
"""Remove all queued-up events for a given account. Useful for tests."""
queue = self.get_queue(account_id)
try:
while True: