improve typing hints

This commit is contained in:
adbenitez
2022-12-01 00:37:45 -05:00
parent 18426561e3
commit 46594ec707
6 changed files with 65 additions and 46 deletions

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import List, Optional
from .chat import Chat
from .contact import Contact
@@ -6,14 +6,14 @@ from .message import Message
class Account:
def __init__(self, rpc, account_id):
def __init__(self, rpc, account_id) -> None:
self.rpc = rpc
self.account_id = account_id
def __repr__(self):
def __repr__(self) -> str:
return f"<Account id={self.account_id}>"
async def wait_for_event(self):
async def wait_for_event(self) -> dict:
"""Wait until the next event and return it."""
return await self.rpc.wait_for_event(self.account_id)
@@ -29,17 +29,17 @@ class Account:
"""Stop the account I/O."""
await self.rpc.stop_io(self.account_id)
async def get_info(self):
async def get_info(self) -> dict:
return await self.rpc.get_info(self.account_id)
async def get_file_size(self):
async def get_file_size(self) -> int:
return await self.rpc.get_account_file_size(self.account_id)
async def is_configured(self) -> bool:
"""Return True for configured accounts."""
return await self.rpc.is_configured(self.account_id)
async def set_config(self, key: str, value: Optional[str] = None):
async def set_config(self, key: str, value: Optional[str] = None) -> None:
"""Set the configuration value key pair."""
await self.rpc.set_config(self.account_id, key, value)
@@ -47,7 +47,7 @@ class Account:
"""Get the configuration value."""
return await self.rpc.get_config(self.account_id, key)
async def configure(self):
async def configure(self) -> None:
"""Configure an account."""
await self.rpc.configure(self.account_id)
@@ -63,7 +63,7 @@ class Account:
chat_id = await self.rpc.secure_join(self.account_id, qrdata)
return Chat(self.rpc, self.account_id, chat_id)
async def get_fresh_messages(self):
async def get_fresh_messages(self) -> List[Message]:
"""Return the list of fresh messages, newest messages first.
This call is intended for displaying notifications.
@@ -73,7 +73,7 @@ class Account:
fresh_msg_ids = await self.rpc.get_fresh_msgs(self.account_id)
return [Message(self.rpc, self.account_id, msg_id) for msg_id in fresh_msg_ids]
async def get_fresh_messages_in_arrival_order(self):
async def get_fresh_messages_in_arrival_order(self) -> List[Message]:
"""Return fresh messages list sorted in the order of their arrival, with ascending IDs."""
fresh_msg_ids = sorted(await self.rpc.get_fresh_msgs(self.account_id))
return [Message(self.rpc, self.account_id, msg_id) for msg_id in fresh_msg_ids]

View File

@@ -1,24 +1,32 @@
from typing import TYPE_CHECKING
from .rpc import Rpc
if TYPE_CHECKING:
from .message import Message
class Chat:
def __init__(self, rpc, account_id, chat_id):
def __init__(self, rpc: Rpc, account_id: int, chat_id: int) -> None:
self.rpc = rpc
self.account_id = account_id
self.chat_id = chat_id
async def block(self):
async def block(self) -> None:
"""Block the chat."""
await self.rpc.block_chat(self.account_id, self.chat_id)
async def accept(self):
async def accept(self) -> None:
"""Accept the contact request."""
await self.rpc.accept_chat(self.account_id, self.chat_id)
async def delete(self):
async def delete(self) -> None:
await self.rpc.delete_chat(self.account_id, self.chat_id)
async def get_encryption_info(self):
async def get_encryption_info(self) -> str:
return await self.rpc.get_chat_encryption_info(self.account_id, self.chat_id)
async def send_text(self, text: str):
async def send_text(self, text: str) -> "Message":
from .message import Message
msg_id = await self.rpc.misc_send_text_message(
@@ -26,7 +34,7 @@ class Chat:
)
return Message(self.rpc, self.account_id, msg_id)
async def leave(self):
async def leave(self) -> None:
await self.rpc.leave_group(self.account_id, self.chat_id)
async def get_fresh_message_count(self) -> int:

View File

@@ -1,3 +1,11 @@
from typing import TYPE_CHECKING
from .rpc import Rpc
if TYPE_CHECKING:
from .chat import Chat
class Contact:
"""
Contact API.
@@ -5,24 +13,24 @@ class Contact:
Essentially a wrapper for RPC, account ID and a contact ID.
"""
def __init__(self, rpc, account_id, contact_id):
def __init__(self, rpc: Rpc, account_id: int, contact_id: int) -> None:
self.rpc = rpc
self.account_id = account_id
self.contact_id = contact_id
async def block(self):
async def block(self) -> None:
"""Block contact."""
await self.rpc.block_contact(self.account_id, self.contact_id)
async def unblock(self):
async def unblock(self) -> None:
"""Unblock contact."""
await self.rpc.unblock_contact(self.account_id, self.contact_id)
async def delete(self):
async def delete(self) -> None:
"""Delete contact."""
await self.rpc.delete_contact(self.account_id, self.contact_id)
async def change_name(self, name: str):
async def change_name(self, name: str) -> None:
await self.rpc.change_contact_name(self.account_id, self.contact_id, name)
async def get_encryption_info(self) -> str:
@@ -30,11 +38,11 @@ class Contact:
self.account_id, self.contact_id
)
async def get_dictionary(self):
"""Returns a dictionary with a snapshot of all contact properties."""
async def get_dictionary(self) -> dict:
"""Return a dictionary with a snapshot of all contact properties."""
return await self.rpc.get_contact(self.account_id, self.contact_id)
async def create_chat(self):
async def create_chat(self) -> "Chat":
from .chat import Chat
return Chat(

View File

@@ -1,4 +1,7 @@
from typing import List
from .account import Account
from .rpc import Rpc
class Deltachat:
@@ -7,14 +10,14 @@ class Deltachat:
This is the root of the object oriented API.
"""
def __init__(self, rpc):
def __init__(self, rpc: Rpc) -> None:
self.rpc = rpc
async def add_account(self):
async def add_account(self) -> Account:
account_id = await self.rpc.add_account()
return Account(self.rpc, account_id)
async def get_all_accounts(self):
async def get_all_accounts(self) -> List[Account]:
account_ids = await self.rpc.get_all_account_ids()
return [Account(self.rpc, account_id) for account_id in account_ids]
@@ -27,5 +30,5 @@ class Deltachat:
async def maybe_network(self) -> None:
await self.rpc.maybe_network()
async def get_system_info(self):
async def get_system_info(self) -> dict:
return await self.rpc.get_system_info()

View File

@@ -3,19 +3,20 @@ from typing import Optional
from .chat import Chat
from .contact import Contact
from .rpc import Rpc
class Message:
def __init__(self, rpc, account_id, msg_id):
def __init__(self, rpc: Rpc, account_id: int, msg_id: int) -> None:
self.rpc = rpc
self.account_id = account_id
self.msg_id = msg_id
async def send_reaction(self, reactions):
async def send_reaction(self, reactions: str) -> "Message":
msg_id = await self.rpc.send_reaction(self.account_id, self.msg_id, reactions)
return Message(self.rpc, self.account_id, msg_id)
async def get_snapshot(self):
async def get_snapshot(self) -> "MessageSnapshot":
message_object = await self.rpc.get_message(self.account_id, self.msg_id)
return MessageSnapshot(
message=self,

View File

@@ -1,6 +1,7 @@
import asyncio
import json
import os
from typing import Any, Dict, Optional
import aiohttp
@@ -10,16 +11,16 @@ class JsonRpcError(Exception):
class Rpc:
def __init__(self, process):
def __init__(self, process: asyncio.Process) -> None:
self.process = process
self.event_queues = {}
self.event_queues: Dict[int, asyncio.Queue] = {}
self.id = 0
self.reader_task = asyncio.create_task(self.reader_loop())
# Map from request ID to `asyncio.Future` returning the response.
self.request_events = {}
self.request_events: Dict[int, asyncio.Future] = {}
async def reader_loop(self):
async def reader_loop(self) -> None:
while True:
line = await self.process.stdout.readline()
response = json.loads(line)
@@ -36,25 +37,23 @@ class Rpc:
else:
print(response)
async def wait_for_event(self, account_id):
async def wait_for_event(self, account_id: int) -> Optional[dict]:
"""Waits for the next event from the given account and returns it."""
if account_id in self.event_queues:
return await self.event_queues[account_id].get()
return None
def __getattr__(self, attr):
async def method(*args, **kwargs):
def __getattr__(self, attr: str):
async def method(*args, **kwargs) -> Any:
self.id += 1
request_id = self.id
params = args
if kwargs:
assert not args
params = kwargs
assert not (args and kwargs), "Mixing positional and keyword arguments"
request = {
"jsonrpc": "2.0",
"method": attr,
"params": params,
"params": args or kwargs,
"id": self.id,
}
data = (json.dumps(request) + "\n").encode()
@@ -71,7 +70,7 @@ class Rpc:
return method
async def start_rpc_server(*args, **kwargs):
async def start_rpc_server(*args, **kwargs) -> Rpc:
proc = await asyncio.create_subprocess_exec(
"deltachat-rpc-server",
stdin=asyncio.subprocess.PIPE,
@@ -83,7 +82,7 @@ async def start_rpc_server(*args, **kwargs):
return rpc
async def new_online_account():
async def new_online_account() -> dict:
url = os.getenv("DCC_NEW_TMP_EMAIL")
async with aiohttp.ClientSession() as session:
async with session.post(url) as response: