mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 15:02:11 +03:00
Compare commits
6 Commits
v2.42.0
...
hpk/rpc-pi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51e58583e0 | ||
|
|
5542dcfa0a | ||
|
|
00a6f41441 | ||
|
|
a77e9cb8d5 | ||
|
|
7dc717fa62 | ||
|
|
8b9f5c7795 |
@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
|
||||
from .contact import Contact
|
||||
from .deltachat import DeltaChat
|
||||
from .message import Message
|
||||
from .rpc import Rpc
|
||||
from .rpc import Rpc, RpcFIFO
|
||||
|
||||
__all__ = [
|
||||
"Account",
|
||||
@@ -22,6 +22,7 @@ __all__ = [
|
||||
"Message",
|
||||
"SpecialContactId",
|
||||
"Rpc",
|
||||
"RpcFIFO",
|
||||
"run_bot_cli",
|
||||
"run_client_cli",
|
||||
]
|
||||
|
||||
@@ -47,7 +47,6 @@ class AttrDict(dict):
|
||||
def run_client_cli(
|
||||
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
|
||||
argv: Optional[list] = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""Run a simple command line app, using the given hooks.
|
||||
|
||||
@@ -55,13 +54,12 @@ def run_client_cli(
|
||||
"""
|
||||
from .client import Client
|
||||
|
||||
_run_cli(Client, hooks, argv, **kwargs)
|
||||
_run_cli(Client, hooks, argv)
|
||||
|
||||
|
||||
def run_bot_cli(
|
||||
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
|
||||
argv: Optional[list] = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""Run a simple bot command line using the given hooks.
|
||||
|
||||
@@ -69,14 +67,13 @@ def run_bot_cli(
|
||||
"""
|
||||
from .client import Bot
|
||||
|
||||
_run_cli(Bot, hooks, argv, **kwargs)
|
||||
_run_cli(Bot, hooks, argv)
|
||||
|
||||
|
||||
def _run_cli(
|
||||
client_type: Type["Client"],
|
||||
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
|
||||
argv: Optional[list] = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
from .deltachat import DeltaChat
|
||||
from .rpc import Rpc
|
||||
@@ -94,7 +91,7 @@ def _run_cli(
|
||||
parser.add_argument("--password", action="store", help="password", default=os.getenv("DELTACHAT_PASSWORD"))
|
||||
args = parser.parse_args(argv[1:])
|
||||
|
||||
with Rpc(accounts_dir=args.accounts_dir, **kwargs) as rpc:
|
||||
with Rpc(accounts_dir=args.accounts_dir) as rpc:
|
||||
deltachat = DeltaChat(rpc)
|
||||
core_version = (deltachat.get_system_info()).deltachat_core_version
|
||||
accounts = deltachat.get_all_accounts()
|
||||
|
||||
@@ -10,17 +10,24 @@ import subprocess
|
||||
import sys
|
||||
from queue import Empty, Queue
|
||||
from threading import Thread
|
||||
from typing import Any, Iterator, Optional
|
||||
from typing import TYPE_CHECKING, Any, Iterator, Optional
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import io
|
||||
|
||||
|
||||
class JsonRpcError(Exception):
|
||||
"""JSON-RPC error."""
|
||||
|
||||
|
||||
class RpcShutdownError(JsonRpcError):
|
||||
"""Raised in RPC methods if the connection to server is closing."""
|
||||
|
||||
|
||||
class RpcMethod:
|
||||
"""RPC method."""
|
||||
|
||||
def __init__(self, rpc: "Rpc", name: str):
|
||||
def __init__(self, rpc: "BaseRpc", name: str):
|
||||
self.rpc = rpc
|
||||
self.name = name
|
||||
|
||||
@@ -44,6 +51,8 @@ class RpcMethod:
|
||||
def rpc_future():
|
||||
"""Wait for the request to receive a result."""
|
||||
response = queue.get()
|
||||
if response is None:
|
||||
raise RpcShutdownError(f"no response for {request_id}/{self.name} while rpc is shutting down")
|
||||
if "error" in response:
|
||||
raise JsonRpcError(response["error"])
|
||||
return response.get("result", None)
|
||||
@@ -51,28 +60,18 @@ class RpcMethod:
|
||||
return rpc_future
|
||||
|
||||
|
||||
class Rpc:
|
||||
"""RPC client."""
|
||||
class BaseRpc:
|
||||
"""Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods
|
||||
from subclasses to work concretely."""
|
||||
|
||||
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs):
|
||||
"""Initialize RPC client.
|
||||
|
||||
The given arguments will be passed to subprocess.Popen().
|
||||
"""
|
||||
if accounts_dir:
|
||||
kwargs["env"] = {
|
||||
**kwargs.get("env", os.environ),
|
||||
"DC_ACCOUNTS_PATH": str(accounts_dir),
|
||||
}
|
||||
|
||||
self._kwargs = kwargs
|
||||
self.rpc_server_path = rpc_server_path
|
||||
self.process: subprocess.Popen
|
||||
def __init__(self):
|
||||
self.id_iterator: Iterator[int]
|
||||
self.event_queues: dict[int, Queue]
|
||||
# Map from request ID to a Queue which provides a single result
|
||||
self.request_results: dict[int, Queue]
|
||||
self.request_queue: Queue[Any]
|
||||
self.server_stdin: io.Writer[bytes]
|
||||
self.server_stdout: io.Reader[bytes]
|
||||
self.closing: bool
|
||||
self.reader_thread: Thread
|
||||
self.writer_thread: Thread
|
||||
@@ -80,16 +79,7 @@ class Rpc:
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start RPC server subprocess."""
|
||||
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
|
||||
if sys.version_info >= (3, 11):
|
||||
# Prevent subprocess from capturing SIGINT.
|
||||
popen_kwargs["process_group"] = 0
|
||||
else:
|
||||
# `process_group` is not supported before Python 3.11.
|
||||
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
|
||||
|
||||
popen_kwargs.update(self._kwargs)
|
||||
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
|
||||
self.server_stdout, self.server_stdin = self.connect_to_server()
|
||||
self.id_iterator = itertools.count(start=1)
|
||||
self.event_queues = {}
|
||||
self.request_results = {}
|
||||
@@ -105,10 +95,9 @@ class Rpc:
|
||||
def close(self) -> None:
|
||||
"""Terminate RPC server process and wait until the reader loop finishes."""
|
||||
self.closing = True
|
||||
self.stop_io_for_all_accounts()
|
||||
self.events_thread.join()
|
||||
self.process.stdin.close()
|
||||
self.disconnect_from_server()
|
||||
self.reader_thread.join()
|
||||
self.events_thread.join()
|
||||
self.request_queue.put(None)
|
||||
self.writer_thread.join()
|
||||
|
||||
@@ -122,7 +111,7 @@ class Rpc:
|
||||
def reader_loop(self) -> None:
|
||||
"""Process JSON-RPC responses from the RPC server process output."""
|
||||
try:
|
||||
while line := self.process.stdout.readline():
|
||||
while line := self.server_stdout.readline():
|
||||
response = json.loads(line)
|
||||
if "id" in response:
|
||||
response_id = response["id"]
|
||||
@@ -133,13 +122,17 @@ class Rpc:
|
||||
# Log an exception if the reader loop dies.
|
||||
logging.exception("Exception in the reader loop")
|
||||
|
||||
# terminate pending rpc requests because no responses can arrive anymore
|
||||
for queue in self.request_results.values():
|
||||
queue.put(None)
|
||||
|
||||
def writer_loop(self) -> None:
|
||||
"""Writer loop ensuring only a single thread writes requests."""
|
||||
try:
|
||||
while request := self.request_queue.get():
|
||||
data = (json.dumps(request) + "\n").encode()
|
||||
self.process.stdin.write(data)
|
||||
self.process.stdin.flush()
|
||||
self.server_stdin.write(data)
|
||||
self.server_stdin.flush()
|
||||
|
||||
except Exception:
|
||||
# Log an exception if the writer loop dies.
|
||||
@@ -157,7 +150,10 @@ class Rpc:
|
||||
while True:
|
||||
if self.closing:
|
||||
return
|
||||
event = self.get_next_event()
|
||||
try:
|
||||
event = self.get_next_event()
|
||||
except RpcShutdownError:
|
||||
return
|
||||
account_id = event["contextId"]
|
||||
queue = self.get_queue(account_id)
|
||||
event = event["event"]
|
||||
@@ -183,3 +179,58 @@ class Rpc:
|
||||
|
||||
def __getattr__(self, attr: str):
|
||||
return RpcMethod(self, attr)
|
||||
|
||||
|
||||
class RpcSubprocess(BaseRpc):
|
||||
"""RPC client that runs and connects to a deltachat-rpc-server in a subprocess."""
|
||||
|
||||
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"):
|
||||
"""Initialize RPC client.
|
||||
|
||||
The given arguments will be passed to subprocess.Popen().
|
||||
"""
|
||||
super(RpcSubprocess, self).__init__()
|
||||
self._accounts_dir = accounts_dir
|
||||
self.rpc_server_path: str = rpc_server_path
|
||||
|
||||
def connect_to_server(self):
|
||||
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
|
||||
if sys.version_info >= (3, 11):
|
||||
# Prevent subprocess from capturing SIGINT.
|
||||
popen_kwargs["process_group"] = 0
|
||||
else:
|
||||
# `process_group` is not supported before Python 3.11.
|
||||
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
|
||||
|
||||
if self._accounts_dir:
|
||||
popen_kwargs["env"] = os.environ.copy()
|
||||
popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir)
|
||||
|
||||
process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
|
||||
return process.stdout, process.stdin
|
||||
|
||||
def disconnect_from_server(self):
|
||||
self.stop_io_for_all_accounts()
|
||||
self.server_stdin.close()
|
||||
|
||||
|
||||
# backward compatibility
|
||||
Rpc = RpcSubprocess
|
||||
|
||||
|
||||
class RpcFIFO(BaseRpc):
|
||||
"""RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""
|
||||
|
||||
def __init__(self, fn_request_fifo: str, fn_response_fifo: str):
|
||||
super(RpcFIFO, self).__init__()
|
||||
self.fn_request_fifo = fn_request_fifo
|
||||
self.fn_response_fifo = fn_response_fifo
|
||||
|
||||
def connect_to_server(self):
|
||||
server_stdin = open(self.fn_request_fifo, "wb") # noqa
|
||||
server_stdout = open(self.fn_response_fifo, "rb") # noqa
|
||||
return server_stdout, server_stdin
|
||||
|
||||
def disconnect_from_server(self):
|
||||
self.server_stdin.close()
|
||||
self.server_stdout.close()
|
||||
|
||||
22
deltachat-rpc-client/tests/test_rpc_fifo.py
Normal file
22
deltachat-rpc-client/tests/test_rpc_fifo.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import os
|
||||
import platform # noqa
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
|
||||
from deltachat_rpc_client import DeltaChat, RpcFIFO
|
||||
|
||||
|
||||
@pytest.mark.skipif("platform.system() == 'Windows'")
|
||||
def test_rpc_fifo(tmp_path):
|
||||
fn_request_fifo = tmp_path.joinpath("request_fifo")
|
||||
fn_response_fifo = tmp_path.joinpath("response_fifo")
|
||||
os.mkfifo(fn_request_fifo)
|
||||
os.mkfifo(fn_response_fifo)
|
||||
popen = subprocess.Popen(f"deltachat-rpc-server <{fn_request_fifo} >{fn_response_fifo}", shell=True)
|
||||
|
||||
rpc = RpcFIFO(fn_response_fifo=fn_response_fifo, fn_request_fifo=fn_request_fifo)
|
||||
with rpc:
|
||||
dc = DeltaChat(rpc)
|
||||
assert dc.rpc.get_system_info()["deltachat_core_version"] is not None
|
||||
popen.wait()
|
||||
Reference in New Issue
Block a user