python: upgrade from .format() to f-strings

They are supported since Python 3.5.
This commit is contained in:
link2xt
2023-02-08 12:05:16 +00:00
parent 315e944b69
commit 817760a6ef
18 changed files with 72 additions and 77 deletions

View File

@@ -55,6 +55,7 @@ def run_cmdline(argv=None, account_plugins=None):
ac.run_account(addr=args.email, password=args.password, account_plugins=account_plugins, show_ffi=args.show_ffi)
print("{}: waiting for message".format(ac.get_config("addr")))
addr = ac.get_config("addr")
print(f"{addr}: waiting for message")
ac.wait_shutdown()

View File

@@ -171,7 +171,7 @@ def extract_defines(flags):
match = defines_re.match(line)
if match:
defines.append(match.group(1))
return "\n".join("#define {} ...".format(d) for d in defines)
return "\n".join(f"#define {d} ..." for d in defines)
def ffibuilder():

View File

@@ -84,7 +84,7 @@ class Account(object):
ptr = lib.dc_context_new_closed(db_path) if closed else lib.dc_context_new(ffi.NULL, db_path, ffi.NULL)
if ptr == ffi.NULL:
raise ValueError("Could not dc_context_new: {} {}".format(os_name, db_path))
raise ValueError(f"Could not dc_context_new: {os_name} {db_path}")
self._dc_context = ffi.gc(
ptr,
lib.dc_context_unref,
@@ -116,7 +116,7 @@ class Account(object):
self._logging = True
def __repr__(self):
return "<Account path={}>".format(self.db_path)
return f"<Account path={self.db_path}>"
# def __del__(self):
# self.shutdown()
@@ -127,7 +127,7 @@ class Account(object):
def _check_config_key(self, name: str) -> None:
if name not in self._configkeys:
raise KeyError("{!r} not a valid config key, existing keys: {!r}".format(name, self._configkeys))
raise KeyError(f"{name!r} not a valid config key, existing keys: {self._configkeys!r}")
def get_info(self) -> Dict[str, str]:
"""return dictionary of built config parameters."""
@@ -141,7 +141,7 @@ class Account(object):
log("=============== " + self.get_config("displayname") + " ===============")
cursor = 0
for name, val in self.get_info().items():
entry = "{}={}".format(name.upper(), val)
entry = f"{name.upper()}={val}"
if cursor + len(entry) > 80:
log("")
cursor = 0
@@ -186,7 +186,7 @@ class Account(object):
self._check_config_key(name)
namebytes = name.encode("utf8")
res = lib.dc_get_config(self._dc_context, namebytes)
assert res != ffi.NULL, "config value not found for: {!r}".format(name)
assert res != ffi.NULL, f"config value not found for: {name!r}"
return from_dc_charpointer(res)
def _preconfigure_keypair(self, addr: str, public: str, secret: str) -> None:
@@ -296,7 +296,7 @@ class Account(object):
addr, displayname = obj.get_config("addr"), obj.get_config("displayname")
elif isinstance(obj, Contact):
if obj.account != self:
raise ValueError("account mismatch {}".format(obj))
raise ValueError(f"account mismatch {obj}")
addr, displayname = obj.addr, obj.name
elif isinstance(obj, str):
displayname, addr = parseaddr(obj)
@@ -428,7 +428,7 @@ class Account(object):
"""
res = lib.dc_get_chat(self._dc_context, chat_id)
if res == ffi.NULL:
raise ValueError("cannot get chat with id={}".format(chat_id))
raise ValueError(f"cannot get chat with id={chat_id}")
lib.dc_chat_unref(res)
return Chat(self, chat_id)
@@ -545,7 +545,7 @@ class Account(object):
res = ffi.gc(lib.dc_check_qr(self._dc_context, as_dc_charpointer(qr)), lib.dc_lot_unref)
lot = DCLot(res)
if lot.state() == const.DC_QR_ERROR:
raise ValueError("invalid or unknown QR code: {}".format(lot.text1()))
raise ValueError(f"invalid or unknown QR code: {lot.text1()}")
return ScannedQRCode(lot)
def qr_setup_contact(self, qr):
@@ -743,7 +743,7 @@ class Account(object):
try:
self._event_thread.wait(timeout=5)
except RuntimeError as e:
self.log("Waiting for event thread failed: {}".format(e))
self.log(f"Waiting for event thread failed: {e}")
if self._event_thread.is_alive():
self.log("WARN: event thread did not terminate yet, ignoring.")

View File

@@ -40,7 +40,7 @@ class Chat(object):
return not self == other
def __repr__(self) -> str:
return "<Chat id={} name={}>".format(self.id, self.get_name())
return f"<Chat id={self.id} name={self.get_name()}>"
@property
def _dc_chat(self):
@@ -444,7 +444,7 @@ class Chat(object):
contact = self.account.create_contact(obj)
ret = lib.dc_add_contact_to_chat(self.account._dc_context, self.id, contact.id)
if ret != 1:
raise ValueError("could not add contact {!r} to chat".format(contact))
raise ValueError(f"could not add contact {contact!r} to chat")
return contact
def remove_contact(self, obj):
@@ -457,7 +457,7 @@ class Chat(object):
contact = self.account.get_contact(obj)
ret = lib.dc_remove_contact_from_chat(self.account._dc_context, self.id, contact.id)
if ret != 1:
raise ValueError("could not remove contact {!r} from chat".format(contact))
raise ValueError(f"could not remove contact {contact!r} from chat")
def get_contacts(self):
"""get all contacts for this chat.
@@ -493,7 +493,7 @@ class Chat(object):
p = as_dc_charpointer(img_path)
res = lib.dc_set_chat_profile_image(self.account._dc_context, self.id, p)
if res != 1:
raise ValueError("Setting Profile Image {!r} failed".format(p))
raise ValueError(f"Setting Profile Image {p!r} failed")
def remove_profile_image(self):
"""remove group profile image.

View File

@@ -31,7 +31,7 @@ class Contact(object):
return not self == other
def __repr__(self):
return "<Contact id={} addr={} dc_context={}>".format(self.id, self.addr, self.account._dc_context)
return f"<Contact id={self.id} addr={self.addr} dc_context={self.account._dc_context}>"
@property
def _dc_contact(self):

View File

@@ -82,7 +82,7 @@ class DirectImap:
configured, otherwise None.
"""
if "_" not in config_name:
config_name = "configured_{}_folder".format(config_name)
config_name = f"configured_{config_name}_folder"
foldername = self.account.get_config(config_name)
if foldername:
return self.select_folder(foldername)
@@ -203,7 +203,7 @@ class IdleManager:
"""(blocking) wait for next idle message from server."""
self.log("imap-direct: calling idle_check")
res = self.direct_imap.conn.idle.poll(timeout=timeout)
self.log("imap-direct: idle_check returned {!r}".format(res))
self.log(f"imap-direct: idle_check returned {res!r}")
return res
def wait_for_new_message(self, timeout=None) -> bytes:

View File

@@ -31,11 +31,11 @@ class FFIEvent:
def __str__(self):
if self.name == "DC_EVENT_INFO":
return "INFO {data2}".format(data2=self.data2)
return f"INFO {self.data2}"
if self.name == "DC_EVENT_WARNING":
return "WARNING {data2}".format(data2=self.data2)
return f"WARNING {self.data2}"
if self.name == "DC_EVENT_ERROR":
return "ERROR {data2}".format(data2=self.data2)
return f"ERROR {self.data2}"
return "{name} data1={data1} data2={data2}".format(**self.__dict__)
@@ -68,7 +68,7 @@ class FFIEventLogger:
locname = tname
if self.logid:
locname += "-" + self.logid
s = "{:2.2f} [{}] {}".format(elapsed, locname, message)
s = f"{elapsed:2.2f} [{locname}] {message}"
if os.name == "posix":
WARN = "\033[93m"
@@ -103,7 +103,7 @@ class FFIEventTracker:
timeout = timeout if timeout is not None else self._timeout
ev = self._event_queue.get(timeout=timeout)
if check_error and ev.name == "DC_EVENT_ERROR":
raise ValueError("unexpected event: {}".format(ev))
raise ValueError(f"unexpected event: {ev}")
return ev
def iter_events(self, timeout=None, check_error=True):
@@ -111,7 +111,7 @@ class FFIEventTracker:
yield self.get(timeout=timeout, check_error=check_error)
def get_matching(self, event_name_regex, check_error=True, timeout=None):
rex = re.compile("^(?:{})$".format(event_name_regex))
rex = re.compile(f"^(?:{event_name_regex})$")
for ev in self.iter_events(timeout=timeout, check_error=check_error):
if rex.match(ev.name):
return ev
@@ -162,20 +162,20 @@ class FFIEventTracker:
def ensure_event_not_queued(self, event_name_regex):
__tracebackhide__ = True
rex = re.compile("(?:{}).*".format(event_name_regex))
rex = re.compile(f"(?:{event_name_regex}).*")
while 1:
try:
ev = self._event_queue.get(False)
except Empty:
break
else:
assert not rex.match(ev.name), "event found {}".format(ev)
assert not rex.match(ev.name), f"event found {ev}"
def wait_securejoin_inviter_progress(self, target):
while 1:
event = self.get_matching("DC_EVENT_SECUREJOIN_INVITER_PROGRESS")
if event.data2 >= target:
print("** SECUREJOINT-INVITER PROGRESS {}".format(target), self.account)
print(f"** SECUREJOINT-INVITER PROGRESS {target}", self.account)
break
def wait_idle_inbox_ready(self):
@@ -270,11 +270,11 @@ class EventThread(threading.Thread):
lib.dc_event_unref(event)
ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
with self.swallow_and_log_exception("ac_process_ffi_event {}".format(ffi_event)):
with self.swallow_and_log_exception(f"ac_process_ffi_event {ffi_event}"):
self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
for name, kwargs in self._map_ffi_event(ffi_event):
hook = getattr(self.account._pm.hook, name)
info = "call {} kwargs={} failed".format(name, kwargs)
info = f"call {name} kwargs={kwargs} failed"
with self.swallow_and_log_exception(info):
hook(**kwargs)
@@ -285,7 +285,7 @@ class EventThread(threading.Thread):
except Exception as ex:
logfile = io.StringIO()
traceback.print_exception(*sys.exc_info(), file=logfile)
self.account.log("{}\nException {}\nTraceback:\n{}".format(info, ex, logfile.getvalue()))
self.account.log(f"{info}\nException {ex}\nTraceback:\n{logfile.getvalue()}")
def _map_ffi_event(self, ffi_event: FFIEvent):
name = ffi_event.name

View File

@@ -36,15 +36,9 @@ class Message(object):
def __repr__(self):
c = self.get_sender_contact()
typ = "outgoing" if self.is_outgoing() else "incoming"
return "<Message {} sys={} {} id={} sender={}/{} chat={}/{}>".format(
typ,
self.is_system_message(),
repr(self.text[:100]),
self.id,
c.id,
c.addr,
self.chat.id,
self.chat.get_name(),
return (
f"<Message {typ} sys={self.is_system_message()} {repr(self.text[:100])} "
f"id={self.id} sender={c.id}/{c.addr} chat={self.chat.id}/{self.chat.get_name()}>"
)
@classmethod
@@ -115,7 +109,7 @@ class Message(object):
"""set file for this message from path and mime_type."""
mtype = ffi.NULL if mime_type is None else as_dc_charpointer(mime_type)
if not os.path.exists(path):
raise ValueError("path does not exist: {!r}".format(path))
raise ValueError(f"path does not exist: {path!r}")
lib.dc_msg_set_file(self._dc_msg, as_dc_charpointer(path), mtype)
@props.with_doc

View File

@@ -18,7 +18,7 @@ class Reactions(object):
self._dc_reactions = dc_reactions
def __repr__(self):
return "<Reactions dc_reactions={}>".format(self._dc_reactions)
return f"<Reactions dc_reactions={self._dc_reactions}>"
@classmethod
def from_msg(cls, msg):

View File

@@ -131,9 +131,9 @@ def pytest_report_header(config, startdir):
if cfg:
if "?" in cfg:
url, token = cfg.split("?", 1)
summary.append("Liveconfig provider: {}?<token ommitted>".format(url))
summary.append(f"Liveconfig provider: {url}?<token ommitted>")
else:
summary.append("Liveconfig file: {}".format(cfg))
summary.append(f"Liveconfig file: {cfg}")
return summary
@@ -178,13 +178,13 @@ class TestProcess:
except IndexError:
res = requests.post(liveconfig_opt, timeout=60)
if res.status_code != 200:
pytest.fail("newtmpuser count={} code={}: '{}'".format(index, res.status_code, res.text))
pytest.fail(f"newtmpuser count={index} code={res.status_code}: '{res.text}'")
d = res.json()
config = {"addr": d["email"], "mail_pw": d["password"]}
print("newtmpuser {}: addr={}".format(index, config["addr"]))
self._configlist.append(config)
yield config
pytest.fail("more than {} live accounts requested.".format(MAX_LIVE_CREATED_ACCOUNTS))
pytest.fail(f"more than {MAX_LIVE_CREATED_ACCOUNTS} live accounts requested.")
def cache_maybe_retrieve_configured_db_files(self, cache_addr, db_target_path):
db_target_path = pathlib.Path(db_target_path)
@@ -252,7 +252,7 @@ def data(request):
fn = os.path.join(path, *bn.split("/"))
if os.path.exists(fn):
return fn
print("WARNING: path does not exist: {!r}".format(fn))
print(f"WARNING: path does not exist: {fn!r}")
return None
def read_path(self, bn, mode="r"):
@@ -285,7 +285,7 @@ class ACSetup:
self.init_time = init_time
def log(self, *args):
print("[acsetup]", "{:.3f}".format(time.time() - self.init_time), *args)
print("[acsetup]", f"{time.time() - self.init_time:.3f}", *args)
def add_configured(self, account):
"""add an already configured account."""
@@ -339,7 +339,7 @@ class ACSetup:
def _pop_config_success(self):
acc, success, comment = self._configured_events.get()
if not success:
pytest.fail("configuring online account {} failed: {}".format(acc, comment))
pytest.fail(f"configuring online account {acc} failed: {comment}")
self._account2state[acc] = self.CONFIGURED
return acc
@@ -373,7 +373,7 @@ class ACSetup:
imap.delete("1:*", expunge=True)
else:
imap.conn.folder.delete(folder)
acc.log("imap cleaned for addr {}".format(addr))
acc.log(f"imap cleaned for addr {addr}")
self._imap_cleaned.add(addr)
@@ -397,7 +397,7 @@ class ACFactory:
request.addfinalizer(self.finalize)
def log(self, *args):
print("[acfactory]", "{:.3f}".format(time.time() - self.init_time), *args)
print("[acfactory]", f"{time.time() - self.init_time:.3f}", *args)
def finalize(self):
while self._finalizers:
@@ -439,7 +439,7 @@ class ACFactory:
return self._getaccount(closed=closed)
def _getaccount(self, try_cache_addr=None, closed=False):
logid = "ac{}".format(len(self._accounts) + 1)
logid = f"ac{len(self._accounts) + 1}"
# we need to use fixed database basename for maybe_cache_* functions to work
path = self.tmpdir.mkdir(logid).join("dc.db")
if try_cache_addr:
@@ -465,12 +465,12 @@ class ACFactory:
except IndexError:
pass
else:
fname_pub = self.data.read_path("key/{name}-public.asc".format(name=keyname))
fname_sec = self.data.read_path("key/{name}-secret.asc".format(name=keyname))
fname_pub = self.data.read_path(f"key/{keyname}-public.asc")
fname_sec = self.data.read_path(f"key/{keyname}-secret.asc")
if fname_pub and fname_sec:
account._preconfigure_keypair(addr, fname_pub, fname_sec)
return True
print("WARN: could not use preconfigured keys for {!r}".format(addr))
print(f"WARN: could not use preconfigured keys for {addr!r}")
def get_pseudo_configured_account(self, passphrase: Optional[str] = None) -> Account:
# do a pseudo-configured account
@@ -478,7 +478,7 @@ class ACFactory:
if passphrase:
ac.open(passphrase)
acname = ac._logid
addr = "{}@offline.org".format(acname)
addr = f"{acname}@offline.org"
ac.update_config(
{
"addr": addr,

View File

@@ -38,7 +38,7 @@ class ImexTracker:
if isinstance(ev, str):
files_written.append(ev)
elif ev == 0:
raise ImexFailed("export failed, exp-files: {}".format(files_written))
raise ImexFailed(f"export failed, exp-files: {files_written}")
elif ev == 1000:
return files_written