api(jsonrpc): add run_until parameter for bots (#7688)

This commit also makes testing hooks easier, as it allows to process
events and run hooks on them, until a certain event occurs.

---------

Co-authored-by: iequidoo <117991069+iequidoo@users.noreply.github.com>
This commit is contained in:
missytake
2026-01-14 19:58:44 +01:00
committed by GitHub
parent f50e3d6ffa
commit 47b49fd02e
2 changed files with 31 additions and 11 deletions

View File

@@ -44,8 +44,13 @@ class AttrDict(dict):
super().__setattr__(attr, val)
def _forever(_event: AttrDict) -> bool:
return False
def run_client_cli(
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
until: Callable[[AttrDict], bool] = _forever,
argv: Optional[list] = None,
**kwargs,
) -> None:
@@ -55,10 +60,11 @@ def run_client_cli(
"""
from .client import Client
_run_cli(Client, hooks, argv, **kwargs)
_run_cli(Client, until, hooks, argv, **kwargs)
def run_bot_cli(
until: Callable[[AttrDict], bool] = _forever,
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
@@ -69,11 +75,12 @@ def run_bot_cli(
"""
from .client import Bot
_run_cli(Bot, hooks, argv, **kwargs)
_run_cli(Bot, until, hooks, argv, **kwargs)
def _run_cli(
client_type: Type["Client"],
until: Callable[[AttrDict], bool] = _forever,
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
@@ -111,7 +118,7 @@ def _run_cli(
kwargs={"email": args.email, "password": args.password},
)
configure_thread.start()
client.run_forever()
client.run_until(until)
def extract_addr(text: str) -> str:

View File

@@ -14,6 +14,7 @@ from typing import (
from ._utils import (
AttrDict,
_forever,
parse_system_add_remove,
parse_system_image_changed,
parse_system_title_changed,
@@ -91,19 +92,28 @@ class Client:
def run_forever(self) -> None:
"""Process events forever."""
self.run_until(lambda _: False)
self.run_until(_forever)
def run_until(self, func: Callable[[AttrDict], bool]) -> AttrDict:
"""Process events until the given callable evaluates to True.
The callable should accept an AttrDict object representing the
last processed event. The event is returned when the callable
evaluates to True.
"""
"""Start the event processing loop."""
self.logger.debug("Listening to incoming events...")
if self.is_configured():
self.account.start_io()
self._process_messages() # Process old messages.
return self._process_events(until_func=func) # Loop over incoming events
def _process_events(
self,
until_func: Callable[[AttrDict], bool],
until_event: EventType = False,
) -> AttrDict:
"""Process events until the given callable evaluates to True,
or until a certain event happens.
The until_func callable should accept an AttrDict object representing
the last processed event. The event is returned when the callable
evaluates to True.
"""
while True:
event = self.account.wait_for_event()
event["kind"] = EventType(event.kind)
@@ -112,10 +122,13 @@ class Client:
if event.kind == EventType.INCOMING_MSG:
self._process_messages()
stop = func(event)
stop = until_func(event)
if stop:
return event
if event.kind == until_event:
return event
def _on_event(self, event: AttrDict, filter_type: Type[EventFilter] = RawEvent) -> None:
for hook, evfilter in self._hooks.get(filter_type, []):
if evfilter.filter(event):