api(deltachat-rpc-client): add futures

futures allow to call multiple methods in parallel
without threads.

This introduces RpcFuture class and futuremethod decorator.
This commit is contained in:
link2xt
2023-11-21 00:07:10 +00:00
parent 26400a9e4e
commit 2b8bf29fce
4 changed files with 86 additions and 29 deletions

View File

@@ -168,3 +168,33 @@ def parse_system_add_remove(text: str) -> Optional[Tuple[str, str, str]]:
return "removed", addr, addr
return None
class futuremethod: # noqa: N801
"""Decorator for async methods."""
def __init__(self, func):
self._func = func
def __get__(self, instance, owner=None):
if instance is None:
return self
def future(*args):
generator = self._func(instance, *args)
res = next(generator)
def f():
try:
generator.send(res())
except StopIteration as e:
return e.value
return f
def wrapper(*args):
f = future(*args)
return f()
wrapper.future = future
return wrapper

View File

@@ -2,7 +2,7 @@ from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from warnings import warn
from ._utils import AttrDict
from ._utils import AttrDict, futuremethod
from .chat import Chat
from .const import ChatlistFlag, ContactFlag, EventType, SpecialContactId
from .contact import Contact
@@ -76,9 +76,10 @@ class Account:
"""Get self avatar."""
return self.get_config("selfavatar")
def configure(self) -> None:
@futuremethod
def configure(self):
"""Configure an account."""
self._rpc.configure(self.id)
yield self._rpc.configure.future(self.id)
def create_contact(self, obj: Union[int, str, Contact], name: Optional[str] = None) -> Contact:
"""Create a new Contact or return an existing one.

View File

@@ -5,6 +5,7 @@ from typing import AsyncGenerator, List, Optional
import pytest
from . import Account, AttrDict, Bot, Client, DeltaChat, EventType, Message
from ._utils import futuremethod
from .rpc import Rpc
@@ -37,9 +38,10 @@ class ACFactory:
assert not account.is_configured()
return account
def new_configured_account(self) -> Account:
@futuremethod
def new_configured_account(self):
account = self.new_preconfigured_account()
account.configure()
yield account.configure.future()
assert account.is_configured()
return account
@@ -49,8 +51,9 @@ class ACFactory:
bot.configure(credentials["email"], credentials["password"])
return bot
def get_online_account(self) -> Account:
account = self.new_configured_account()
@futuremethod
def get_online_account(self):
account = yield self.new_configured_account.future()
account.start_io()
while True:
event = account.wait_for_event()
@@ -59,7 +62,8 @@ class ACFactory:
return account
def get_online_accounts(self, num: int) -> List[Account]:
return [self.get_online_account() for _ in range(num)]
futures = [self.get_online_account.future() for _ in range(num)]
return [f() for f in futures]
def resetup_account(self, ac: Account) -> Account:
"""Resetup account from scratch, losing the encryption key."""

View File

@@ -13,6 +13,48 @@ class JsonRpcError(Exception):
pass
class RpcFuture:
def __init__(self, rpc: "Rpc", request_id: int, event: Event):
self.rpc = rpc
self.request_id = request_id
self.event = event
def __call__(self):
self.event.wait()
response = self.rpc.request_results.pop(self.request_id)
if "error" in response:
raise JsonRpcError(response["error"])
if "result" in response:
return response["result"]
return None
class RpcMethod:
def __init__(self, rpc: "Rpc", name: str):
self.rpc = rpc
self.name = name
def __call__(self, *args) -> Any:
"""Synchronously calls JSON-RPC method."""
future = self.future(*args)
return future()
def future(self, *args) -> Any:
"""Asynchronously calls JSON-RPC method."""
request_id = next(self.rpc.id_iterator)
request = {
"jsonrpc": "2.0",
"method": self.name,
"params": args,
"id": request_id,
}
event = Event()
self.rpc.request_events[request_id] = event
self.rpc.request_queue.put(request)
return RpcFuture(self.rpc, request_id, event)
class Rpc:
def __init__(self, accounts_dir: Optional[str] = None, **kwargs):
"""The given arguments will be passed to subprocess.Popen()"""
@@ -145,24 +187,4 @@ class Rpc:
return queue.get()
def __getattr__(self, attr: str):
def method(*args) -> Any:
request_id = next(self.id_iterator)
request = {
"jsonrpc": "2.0",
"method": attr,
"params": args,
"id": request_id,
}
event = Event()
self.request_events[request_id] = event
self.request_queue.put(request)
event.wait()
response = self.request_results.pop(request_id)
if "error" in response:
raise JsonRpcError(response["error"])
if "result" in response:
return response["result"]
return None
return method
return RpcMethod(self, attr)