python: add more type annotations

This commit is contained in:
link2xt
2023-02-16 18:24:19 +00:00
parent 7bdf79dee5
commit d1d43e889a
12 changed files with 54 additions and 37 deletions

View File

@@ -121,7 +121,7 @@ class Account:
"""re-enable logging."""
self._logging = True
def __repr__(self):
def __repr__(self) -> str:
return f"<Account path={self.db_path}>"
# def __del__(self):
@@ -363,12 +363,12 @@ class Account:
:returns: list of :class:`deltachat.contact.Contact` objects.
"""
flags = 0
query = as_dc_charpointer(query)
query_c = as_dc_charpointer(query)
if only_verified:
flags |= const.DC_GCL_VERIFIED_ONLY
if with_self:
flags |= const.DC_GCL_ADD_SELF
dc_array = ffi.gc(lib.dc_get_contacts(self._dc_context, flags, query), lib.dc_array_unref)
dc_array = ffi.gc(lib.dc_get_contacts(self._dc_context, flags, query_c), lib.dc_array_unref)
return list(iter_array(dc_array, lambda x: Contact(self, x)))
def get_fresh_messages(self) -> Generator[Message, None, None]:
@@ -767,7 +767,7 @@ class Account:
class ScannedQRCode:
def __init__(self, dc_lot):
def __init__(self, dc_lot) -> None:
self._dc_lot = dc_lot
def is_ask_verifycontact(self):

View File

@@ -24,7 +24,7 @@ class Chat:
You obtain instances of it through :class:`deltachat.account.Account`.
"""
def __init__(self, account, id) -> None:
def __init__(self, account, id: int) -> None:
from .account import Account
assert isinstance(account, Account), repr(account)
@@ -532,13 +532,13 @@ class Chat:
# ------ location streaming API ------------------------------
def is_sending_locations(self):
def is_sending_locations(self) -> bool:
"""return True if this chat has location-sending enabled currently.
:returns: True if location sending is enabled.
"""
return lib.dc_is_sending_locations_to_chat(self.account._dc_context, self.id)
return bool(lib.dc_is_sending_locations_to_chat(self.account._dc_context, self.id))
def enable_sending_locations(self, seconds):
def enable_sending_locations(self, seconds) -> None:
"""enable sending locations for this chat.
all subsequent messages will carry a location with them.
@@ -572,7 +572,7 @@ class Chat:
class Location:
def __init__(self, latitude, longitude, accuracy, timestamp, marker):
def __init__(self, latitude, longitude, accuracy, timestamp, marker) -> None:
assert isinstance(timestamp, datetime)
self.latitude = latitude
self.longitude = longitude
@@ -580,5 +580,5 @@ class Location:
self.timestamp = timestamp
self.marker = marker
def __eq__(self, other):
def __eq__(self, other) -> bool:
return self.__dict__ == other.__dict__

View File

@@ -15,7 +15,7 @@ class Contact:
You obtain instances of it through :class:`deltachat.account.Account`.
"""
def __init__(self, account, id):
def __init__(self, account, id) -> None:
from .account import Account
assert isinstance(account, Account), repr(account)
@@ -27,10 +27,10 @@ class Contact:
return False
return self.account._dc_context == other.account._dc_context and self.id == other.id
def __ne__(self, other):
def __ne__(self, other) -> bool:
return not self == other
def __repr__(self):
def __repr__(self) -> str:
return f"<Contact id={self.id} addr={self.addr} dc_context={self.account._dc_context}>"
@property

View File

@@ -191,7 +191,7 @@ class DirectImap:
class IdleManager:
def __init__(self, direct_imap):
def __init__(self, direct_imap) -> None:
self.direct_imap = direct_imap
self.log = direct_imap.account.log
# fetch latest messages before starting idle so that it only

View File

@@ -25,12 +25,12 @@ def get_dc_event_name(integer, _DC_EVENTNAME_MAP={}):
class FFIEvent:
def __init__(self, name: str, data1, data2):
def __init__(self, name: str, data1, data2) -> None:
self.name = name
self.data1 = data1
self.data2 = data2
def __str__(self):
def __str__(self) -> str:
if self.name == "DC_EVENT_INFO":
return f"INFO {self.data2}"
if self.name == "DC_EVENT_WARNING":
@@ -84,7 +84,10 @@ class FFIEventLogger:
class FFIEventTracker:
def __init__(self, account, timeout=None):
account: Account
_event_queue: Queue
def __init__(self, account: Account, timeout=None) -> None:
self.account = account
self._timeout = timeout
self._event_queue = Queue()

View File

@@ -19,7 +19,7 @@ class Message:
:class:`deltachat.chat.Chat`.
"""
def __init__(self, account, dc_msg):
def __init__(self, account, dc_msg) -> None:
self.account = account
assert isinstance(self.account._dc_context, ffi.CData)
assert isinstance(dc_msg, ffi.CData)
@@ -33,7 +33,7 @@ class Message:
return False
return self.account == other.account and self.id == other.id
def __repr__(self):
def __repr__(self) -> str:
c = self.get_sender_contact()
typ = "outgoing" if self.is_outgoing() else "incoming"
return (

View File

@@ -10,14 +10,14 @@ class Reactions:
You obtain instances of it through :class:`deltachat.message.Message`.
"""
def __init__(self, account, dc_reactions):
def __init__(self, account, dc_reactions) -> None:
assert isinstance(account._dc_context, ffi.CData)
assert isinstance(dc_reactions, ffi.CData)
assert dc_reactions != ffi.NULL
self.account = account
self._dc_reactions = dc_reactions
def __repr__(self):
def __repr__(self) -> str:
return f"<Reactions dc_reactions={self._dc_reactions}>"
@classmethod

View File

@@ -9,7 +9,7 @@ import threading
import time
import weakref
from queue import Queue
from typing import Callable, List, Optional
from typing import Callable, List, Optional, Dict, Set
import pytest
import requests
@@ -65,8 +65,8 @@ def pytest_configure(config):
# Additionally make the acfactory use a logging/no-logging default.
class LoggingAspect:
def __init__(self):
self._accounts = weakref.WeakSet()
def __init__(self) -> None:
self._accounts: weakref.WeakSet = weakref.WeakSet()
@deltachat.global_hookimpl
def dc_account_init(self, account):
@@ -143,10 +143,12 @@ def testprocess(request):
class TestProcess:
"""A pytest session-scoped instance to help with managing "live" account configurations."""
def __init__(self, pytestconfig):
_addr2files: Dict[str, Dict[pathlib.Path, bytes]]
def __init__(self, pytestconfig) -> None:
self.pytestconfig = pytestconfig
self._addr2files = {}
self._configlist = []
self._configlist: List[Dict[str, str]] = []
def get_liveconfig_producer(self):
"""provide live account configs, cached on a per-test-process scope
@@ -277,10 +279,10 @@ class ACSetup:
_configured_events: Queue
def __init__(self, testprocess, init_time):
def __init__(self, testprocess, init_time) -> None:
self._configured_events = Queue()
self._account2state = {}
self._imap_cleaned = set()
self._account2state: Dict[Account, str] = {}
self._imap_cleaned: Set[str] = set()
self.testprocess = testprocess
self.init_time = init_time

View File

@@ -1,19 +1,25 @@
from queue import Queue
from threading import Event
from typing import List, TYPE_CHECKING
from .hookspec import Global, account_hookimpl
if TYPE_CHECKING:
from .events import FFIEvent
class ImexFailed(RuntimeError):
"""Exception for signalling that import/export operations failed."""
class ImexTracker:
def __init__(self):
_imex_events: Queue
def __init__(self) -> None:
self._imex_events = Queue()
@account_hookimpl
def ac_process_ffi_event(self, ffi_event):
def ac_process_ffi_event(self, ffi_event: "FFIEvent") -> None:
if ffi_event.name == "DC_EVENT_IMEX_PROGRESS":
self._imex_events.put(ffi_event.data1)
elif ffi_event.name == "DC_EVENT_IMEX_FILE_WRITTEN":
@@ -50,7 +56,13 @@ class ConfigureFailed(RuntimeError):
class ConfigureTracker:
ConfigureFailed = ConfigureFailed
def __init__(self, account):
_configure_events: Queue
_smtp_finished: Event
_imap_finished: Event
_ffi_events: List["FFIEvent"]
_progress: Queue
def __init__(self, account) -> None:
self.account = account
self._configure_events = Queue()
self._smtp_finished = Event()
@@ -60,7 +72,7 @@ class ConfigureTracker:
self._gm = Global._get_plugin_manager()
@account_hookimpl
def ac_process_ffi_event(self, ffi_event):
def ac_process_ffi_event(self, ffi_event: "FFIEvent") -> None:
self._ffi_events.append(ffi_event)
if ffi_event.name == "DC_EVENT_SMTP_CONNECTED":
self._smtp_finished.set()