Remove start_rpc_server() and make Rpc a context manager

Rpc now has a start() method.
This way it is possible to use Rpc from IPython without calling __aenter__()
This commit is contained in:
link2xt
2022-12-04 00:02:32 +00:00
parent 5a3065344e
commit 2ccf39800d
5 changed files with 38 additions and 36 deletions

View File

@@ -30,11 +30,12 @@ from the REPL.
$ pip install ipython $ pip install ipython
$ PATH="../target/debug:$PATH" ipython $ PATH="../target/debug:$PATH" ipython
... ...
In [1]: from deltachat_rpc_client import * In [1]: from deltachat_rpc_client import *
In [2]: rpc_generator = start_rpc_server() In [2]: rpc = Rpc()
In [3]: rpc = await rpc_generator.__aenter__() In [3]: await rpc.start()
In [4]: dc = Deltachat(rpc) In [4]: dc = Deltachat(rpc)
In [5]: system_info = await dc.get_system_info() In [5]: system_info = await dc.get_system_info()
In [6]: system_info["level"] In [6]: system_info["level"]
Out [6]: 'awesome' Out[6]: 'awesome'
In [7]: await rpc.close()
``` ```

View File

@@ -7,7 +7,7 @@ import deltachat_rpc_client as dc
async def main(): async def main():
async with dc.start_rpc_server() as rpc: async with dc.Rpc() as rpc:
deltachat = dc.Deltachat(rpc) deltachat = dc.Deltachat(rpc)
system_info = await deltachat.get_system_info() system_info = await deltachat.get_system_info()
logging.info("Running deltachat core %s", system_info["deltachat_core_version"]) logging.info("Running deltachat core %s", system_info["deltachat_core_version"])

View File

@@ -2,4 +2,4 @@ from .account import Account
from .contact import Contact from .contact import Contact
from .deltachat import Deltachat from .deltachat import Deltachat
from .message import Message from .message import Message
from .rpc import Rpc, start_rpc_server from .rpc import Rpc

View File

@@ -8,7 +8,7 @@ import pytest_asyncio
from .account import Account from .account import Account
from .deltachat import Deltachat from .deltachat import Deltachat
from .rpc import start_rpc_server from .rpc import Rpc
async def get_temp_credentials() -> dict: async def get_temp_credentials() -> dict:
@@ -42,7 +42,8 @@ class ACFactory:
@pytest_asyncio.fixture @pytest_asyncio.fixture
async def rpc(tmp_path) -> AsyncGenerator: async def rpc(tmp_path) -> AsyncGenerator:
env = {**os.environ, "DC_ACCOUNTS_PATH": str(tmp_path / "accounts")} env = {**os.environ, "DC_ACCOUNTS_PATH": str(tmp_path / "accounts")}
async with start_rpc_server(env=env) as rpc: rpc_server = Rpc(env=env)
async with rpc_server as rpc:
yield rpc yield rpc

View File

@@ -1,6 +1,5 @@
import asyncio import asyncio
import json import json
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional from typing import Any, AsyncGenerator, Dict, Optional
@@ -9,8 +8,19 @@ class JsonRpcError(Exception):
class Rpc: class Rpc:
def __init__(self, process: asyncio.subprocess.Process) -> None: def __init__(self, *args, **kwargs):
self.process = process """The given arguments will be passed to asyncio.create_subprocess_exec()"""
self.args = args
self.kwargs = kwargs
async def start(self) -> None:
self.process = await asyncio.create_subprocess_exec(
"deltachat-rpc-server",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
*self.args,
**self.kwargs
)
self.event_queues: Dict[int, asyncio.Queue] = {} self.event_queues: Dict[int, asyncio.Queue] = {}
self.id = 0 self.id = 0
self.reader_task = asyncio.create_task(self.reader_loop()) self.reader_task = asyncio.create_task(self.reader_loop())
@@ -18,6 +28,18 @@ class Rpc:
# Map from request ID to `asyncio.Future` returning the response. # Map from request ID to `asyncio.Future` returning the response.
self.request_events: Dict[int, asyncio.Future] = {} self.request_events: Dict[int, asyncio.Future] = {}
async def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.process.terminate()
await self.reader_task
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
async def reader_loop(self) -> None: async def reader_loop(self) -> None:
while True: while True:
line = await self.process.stdout.readline() line = await self.process.stdout.readline()
@@ -37,11 +59,6 @@ class Rpc:
else: else:
print(response) print(response)
async def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.process.terminate()
await self.reader_task
async def wait_for_event(self, account_id: int) -> Optional[dict]: async def wait_for_event(self, account_id: int) -> Optional[dict]:
"""Waits for the next event from the given account and returns it.""" """Waits for the next event from the given account and returns it."""
if account_id in self.event_queues: if account_id in self.event_queues:
@@ -73,20 +90,3 @@ class Rpc:
return response["result"] return response["result"]
return method return method
@asynccontextmanager
async def start_rpc_server(*args, **kwargs) -> AsyncGenerator:
"""The given arguments will be passed to asyncio.create_subprocess_exec()"""
proc = await asyncio.create_subprocess_exec(
"deltachat-rpc-server",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
*args,
**kwargs
)
rpc = Rpc(proc)
try:
yield rpc
finally:
await rpc.close()