Merge remote-tracking branch 'upstream/link2xt/async-jsonrpc-client' into adb/async-jsonrpc-client

This commit is contained in:
adbenitez
2022-12-01 20:32:36 -05:00
3 changed files with 55 additions and 46 deletions

View File

@@ -7,46 +7,46 @@ import deltachat_rpc_client as dc
async def main(): async def main():
rpc = await dc.start_rpc_server() async with dc.start_rpc_server() as rpc:
deltachat = dc.Deltachat(rpc) deltachat = dc.Deltachat(rpc)
system_info = await deltachat.get_system_info() system_info = await deltachat.get_system_info()
logging.info("Running deltachat core %s", system_info["deltachat_core_version"]) logging.info("Running deltachat core %s", system_info["deltachat_core_version"])
accounts = await deltachat.get_all_accounts() accounts = await deltachat.get_all_accounts()
account = accounts[0] if accounts else await deltachat.add_account() account = accounts[0] if accounts else await deltachat.add_account()
await account.set_config("bot", "1") await account.set_config("bot", "1")
if not await account.is_configured(): if not await account.is_configured():
logging.info("Account is not configured, configuring") logging.info("Account is not configured, configuring")
await account.set_config("addr", sys.argv[1]) await account.set_config("addr", sys.argv[1])
await account.set_config("mail_pw", sys.argv[2]) await account.set_config("mail_pw", sys.argv[2])
await account.configure() await account.configure()
logging.info("Configured") logging.info("Configured")
else: else:
logging.info("Account is already configured") logging.info("Account is already configured")
await deltachat.start_io() await deltachat.start_io()
async def process_messages(): async def process_messages():
for message in await account.get_fresh_messages_in_arrival_order(): for message in await account.get_fresh_messages_in_arrival_order():
snapshot = await message.get_snapshot() snapshot = await message.get_snapshot()
if not snapshot.is_info: if not snapshot.is_info:
await snapshot.chat.send_text(snapshot.text) await snapshot.chat.send_text(snapshot.text)
await snapshot.message.mark_seen() await snapshot.message.mark_seen()
# Process old messages. # Process old messages.
await process_messages() await process_messages()
while True: while True:
event = await account.wait_for_event() event = await account.wait_for_event()
if event["type"] == "Info": if event["type"] == "Info":
logging.info("%s", event["msg"]) logging.info("%s", event["msg"])
elif event["type"] == "Warning": elif event["type"] == "Warning":
logging.warning("%s", event["msg"]) logging.warning("%s", event["msg"])
elif event["type"] == "Error": elif event["type"] == "Error":
logging.error("%s", event["msg"]) logging.error("%s", event["msg"])
elif event["type"] == "IncomingMsg": elif event["type"] == "IncomingMsg":
logging.info("Got an incoming message") logging.info("Got an incoming message")
await process_messages() await process_messages()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,13 +1,14 @@
import asyncio
import json import json
import os import os
from typing import List from typing import AsyncGenerator, List
import aiohttp import aiohttp
import pytest_asyncio import pytest_asyncio
from .account import Account from .account import Account
from .deltachat import Deltachat from .deltachat import Deltachat
from .rpc import Rpc, start_rpc_server from .rpc import start_rpc_server
async def get_temp_credentials() -> dict: async def get_temp_credentials() -> dict:
@@ -39,12 +40,13 @@ class ACFactory:
@pytest_asyncio.fixture @pytest_asyncio.fixture
async def rpc(tmp_path) -> Rpc: async def rpc(tmp_path) -> AsyncGenerator:
return await start_rpc_server( env = {**os.environ, "DC_ACCOUNTS_PATH": str(tmp_path / "accounts")}
env={**os.environ, "DC_ACCOUNTS_PATH": str(tmp_path / "accounts")} async with start_rpc_server(env=env) as rpc:
) yield rpc
await asyncio.sleep(0.1) # avoid RuntimeError: Event loop is closed
@pytest_asyncio.fixture @pytest_asyncio.fixture
async def acfactory(rpc) -> ACFactory: async def acfactory(rpc) -> AsyncGenerator:
return ACFactory(Deltachat(rpc)) yield ACFactory(Deltachat(rpc))

View File

@@ -1,6 +1,7 @@
import asyncio import asyncio
import json import json
from typing import Any, Dict, Optional from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional
class JsonRpcError(Exception): class JsonRpcError(Exception):
@@ -20,6 +21,8 @@ class Rpc:
async def reader_loop(self) -> None: async def reader_loop(self) -> None:
while True: while True:
line = await self.process.stdout.readline() line = await self.process.stdout.readline()
if not line: # EOF
break
response = json.loads(line) response = json.loads(line)
if "id" in response: if "id" in response:
fut = self.request_events.pop(response["id"]) fut = self.request_events.pop(response["id"])
@@ -67,7 +70,8 @@ class Rpc:
return method return method
async def start_rpc_server(*args, **kwargs) -> Rpc: @asynccontextmanager
async def start_rpc_server(*args, **kwargs) -> AsyncGenerator:
"""The given arguments will be passed to asyncio.create_subprocess_exec()""" """The given arguments will be passed to asyncio.create_subprocess_exec()"""
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(
"deltachat-rpc-server", "deltachat-rpc-server",
@@ -77,4 +81,7 @@ async def start_rpc_server(*args, **kwargs) -> Rpc:
**kwargs **kwargs
) )
rpc = Rpc(proc) rpc = Rpc(proc)
return rpc try:
yield rpc
finally:
proc.terminate()