add a test echo_and_quit examples

This commit is contained in:
holger krekel
2020-03-03 18:55:19 +01:00
parent 91cdc76414
commit f61b9f7964
8 changed files with 106 additions and 57 deletions

View File

@@ -0,0 +1,2 @@
from deltachat.testplugin import * # noqa

View File

@@ -1,37 +1,57 @@
# instantiate and configure deltachat account
# content of echo_and_quit.py
import sys
import optparse
import deltachat
ac = deltachat.Account("/tmp/db")
# to see low-level events in the console uncomment the following line
# ac.add_account_plugin(deltachat.eventlogger.FFIEventLogger(ac, ""))
if not ac.is_configured():
ac.set_config("addr", "tmpy.94mtm@testrun.org")
ac.set_config("mail_pw", "5CbD6VnjD/li")
ac.set_config("mvbox_watch", "0")
ac.set_config("sentbox_watch", "0")
class MyPlugin:
class SimpleEchoPlugin:
@deltachat.hookspec.account_hookimpl
def process_incoming_message(self, message):
print("process_incoming message", message)
if message.text.strip() == "/quit":
print("shutting down")
ac.shutdown()
message.account.shutdown()
else:
ch = ac.create_chat_by_contact(message.get_sender_contact())
ch.send_text("echoing {}".format(message.text))
ch = message.account.create_chat_by_contact(message.get_sender_contact())
ch.send_text("echoing from {}:\n{}".format(message.get_sender_contact().addr, message.text))
@deltachat.hookspec.account_hookimpl
def process_message_delivered(self, message):
print("process_message_delivered", message)
ac.add_account_plugin(MyPlugin())
# start IO threads and perform configuration
ac.start()
def main(argv):
p = optparse.OptionParser("simple-echo")
p.add_option("-l", action="store_true", help="show low-level events")
p.add_option("--db", type="str", help="database file")
p.add_option("--email", type="str", help="email address")
p.add_option("--password", type="str", help="password")
print("waiting for /quit or other message on {}".format(ac.get_config("addr")))
opts, posargs = p.parse_args(argv)
ac.wait_shutdown()
assert opts.db, "you must specify --db"
ac = deltachat.Account(opts.db)
if opts.l:
ac.add_account_plugin(deltachat.eventlogger.FFIEventLogger(ac, "echo"))
if not ac.is_configured():
assert opts.email and opts.password, "you must specify --email and --password"
ac.set_config("addr", opts.email)
ac.set_config("mail_pw", opts.password)
ac.set_config("mvbox_watch", "0")
ac.set_config("sentbox_watch", "0")
ac.add_account_plugin(SimpleEchoPlugin())
# start IO threads and perform configure if neccessary
ac.start(callback_thread=True)
print("waiting for /quit or message to echo on: {}".format(ac.get_config("addr")))
ac.wait_shutdown()
if __name__ == "__main__":
main(sys.argv)

View File

@@ -0,0 +1,41 @@
import threading
import pytest
import py
from echo_and_quit import main
@pytest.fixture(scope='session')
def datadir():
"""The py.path.local object of the test-data/ directory."""
for path in reversed(py.path.local(__file__).parts()):
datadir = path.join('test-data')
if datadir.isdir():
return datadir
else:
pytest.skip('test-data directory not found')
def test_echo_quit_plugin(acfactory):
bot_ac, bot_cfg = acfactory.get_online_config()
def run_bot():
print("*"*20 + " starting bot")
main([
"-l",
"--email", bot_cfg["addr"],
"--password", bot_cfg["mail_pw"],
"--db", bot_ac.db_path
])
t = threading.Thread(target=run_bot)
t.start()
ac1 = acfactory.get_one_online_account()
bot_contact = ac1.create_contact(bot_cfg["addr"])
ch1 = ac1.create_chat_by_contact(bot_contact)
ch1.send_text("hello")
reply = ac1._evtracker.wait_next_incoming_message()
assert "hello" in reply.text
ch1.send_text("/quit")
t.join()