mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 23:22:11 +03:00
Compare commits
25 Commits
v1.132.1
...
ignore_slo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
186512b356 | ||
|
|
be573767f4 | ||
|
|
d40754273d | ||
|
|
6e05039a10 | ||
|
|
76f4b9baa4 | ||
|
|
1c936260a0 | ||
|
|
676ff127c1 | ||
|
|
a704947546 | ||
|
|
a862484927 | ||
|
|
d66b6acde4 | ||
|
|
5677bf3fe5 | ||
|
|
85a4637b9f | ||
|
|
232f1d4f86 | ||
|
|
3cc5aef016 | ||
|
|
79dac9513f | ||
|
|
a8d53301e3 | ||
|
|
a4def9c39c | ||
|
|
bd59f4914c | ||
|
|
d5e6b6231d | ||
|
|
89341c1ceb | ||
|
|
c0003c6726 | ||
|
|
1290c191b3 | ||
|
|
69a28df74d | ||
|
|
303417f095 | ||
|
|
8483b3e1d3 |
@@ -28,7 +28,7 @@ class Account(object):
|
||||
"""
|
||||
MissingCredentials = MissingCredentials
|
||||
|
||||
def __init__(self, db_path, os_name=None, logging=True):
|
||||
def __init__(self, db_path, os_name=None, logging=True, logid=None):
|
||||
""" initialize account object.
|
||||
|
||||
:param db_path: a path to the account database. The database
|
||||
@@ -38,6 +38,7 @@ class Account(object):
|
||||
# initialize per-account plugin system
|
||||
self._pm = hookspec.PerAccount._make_plugin_manager()
|
||||
self._logging = logging
|
||||
self.logid = logid
|
||||
|
||||
self.add_account_plugin(self)
|
||||
|
||||
|
||||
141
python/src/deltachat/direct_imap.py
Normal file
141
python/src/deltachat/direct_imap.py
Normal file
@@ -0,0 +1,141 @@
|
||||
import imaplib
|
||||
import pathlib
|
||||
from . import Account
|
||||
|
||||
INBOX = "Inbox"
|
||||
SENT = "Sent"
|
||||
MVBOX = "DeltaChat"
|
||||
MVBOX_FALLBBACK = "INBOX/DeltaChat"
|
||||
DC_CONSTANT_MSG_MOVESTATE_PENDING = 1
|
||||
DC_CONSTANT_MSG_MOVESTATE_STAY = 2
|
||||
DC_CONSTANT_MSG_MOVESTATE_MOVING = 3
|
||||
|
||||
|
||||
def db_folder_attr(name):
|
||||
def fget(s):
|
||||
return s.db_folder.get(name, 1)
|
||||
|
||||
def fset(s, val):
|
||||
s.db_folder[name] = val
|
||||
return property(fget, fset, None, None)
|
||||
|
||||
|
||||
class ImapConn():
|
||||
def __init__(self, foldername, conn_info):
|
||||
self.foldername = foldername
|
||||
host, user, pw = conn_info
|
||||
|
||||
self.connection = imaplib.IMAP4_SSL(host)
|
||||
self.connection.login(user, pw)
|
||||
messages = self.reselect_folder()
|
||||
try:
|
||||
self.original_msg_count = int(messages[0])
|
||||
except IndexError:
|
||||
self.original_msg_count = 0
|
||||
|
||||
def mark_all_read(self):
|
||||
self.reselect_folder()
|
||||
# result, data = self.connection.uid('search', None, "(UNSEEN)")
|
||||
result, data = self.connection.search(None, 'UnSeen')
|
||||
try:
|
||||
mails_uid = data[0].split()
|
||||
print("New mails")
|
||||
|
||||
# self.connection.store(data[0].replace(' ',','),'+FLAGS','\Seen')
|
||||
for e_id in mails_uid:
|
||||
self.connection.store(e_id, '+FLAGS', '\\Seen')
|
||||
print("marked:", e_id)
|
||||
|
||||
return True
|
||||
except IndexError:
|
||||
print("No unread")
|
||||
return False
|
||||
|
||||
def get_unread_cnt(self):
|
||||
self.reselect_folder()
|
||||
# result, data = self.connection.uid('search', None, "(UNSEEN)")
|
||||
result, data = self.connection.search(None, 'UnSeen')
|
||||
try:
|
||||
mails_uid = data[0].split()
|
||||
|
||||
return len(mails_uid)
|
||||
except IndexError:
|
||||
return 0
|
||||
|
||||
def get_new_email_cnt(self):
|
||||
messages = self.reselect_folder()
|
||||
try:
|
||||
return int(messages[0]) - self.original_msg_count
|
||||
except IndexError:
|
||||
return 0
|
||||
|
||||
def reselect_folder(self):
|
||||
status, messages = self.connection.select(self.foldername)
|
||||
if status != "OK":
|
||||
print("Incorrect mail box " + status + str(messages))
|
||||
raise ConnectionError
|
||||
# print("(Re-)Selected mailbox: " + status + " " + str(messages))
|
||||
return messages
|
||||
|
||||
def __del__(self):
|
||||
try:
|
||||
self.connection.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self.connection.logout()
|
||||
except Exception:
|
||||
print("Could not logout direct_imap conn")
|
||||
|
||||
|
||||
def make_direct_imap(account, folder):
|
||||
conn_info = (account.get_config("configured_mail_server"),
|
||||
account.get_config("addr"), account.get_config("mail_pw"))
|
||||
# try:
|
||||
# return ImapConn(folder, conn_info=conn_info)
|
||||
# except ConnectionError as e:
|
||||
# if folder == MVBOX:
|
||||
# account.log("Selecting " + MVBOX_FALLBBACK + " not " + MVBOX + " because connecting to the latter failed")
|
||||
# return ImapConn(MVBOX_FALLBBACK, conn_info=conn_info)
|
||||
# else:
|
||||
# raise e
|
||||
if folder == MVBOX:
|
||||
new_folder = account.get_config("configured_mvbox_folder")
|
||||
else:
|
||||
new_folder = folder
|
||||
if new_folder != folder:
|
||||
account.log("Making connection with " + new_folder + " not " + folder)
|
||||
return ImapConn(new_folder, conn_info=conn_info)
|
||||
|
||||
|
||||
def print_imap_structure(database, dir="."):
|
||||
print_imap_structure_ac(Account(database), dir)
|
||||
|
||||
|
||||
def print_imap_structure_ac(ac, dir="."):
|
||||
acinfo = ac.logid + "-" + ac.get_config("addr")
|
||||
print("================= ACCOUNT", acinfo, "=================")
|
||||
print("----------------- CONFIG: -----------------")
|
||||
print(ac.get_info())
|
||||
|
||||
for imapfolder in [INBOX, MVBOX, SENT, MVBOX_FALLBBACK]:
|
||||
try:
|
||||
imap = make_direct_imap(ac, imapfolder)
|
||||
c = imap.connection
|
||||
typ, data = c.search(None, 'ALL')
|
||||
c._get_tagged_response
|
||||
print("-----------------", imapfolder, "-----------------")
|
||||
for num in data[0].split():
|
||||
typ, data = c.fetch(num, '(RFC822)')
|
||||
body = data[0][1]
|
||||
|
||||
typ, data = c.fetch(num, '(UID FLAGS)')
|
||||
info = data[0]
|
||||
|
||||
path = pathlib.Path(dir).joinpath("IMAP-MESSAGES", acinfo, imapfolder)
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
file = path.joinpath(str(info).replace("b'", "").replace("'", "").replace("\\", ""))
|
||||
file.write_bytes(body)
|
||||
print("Message", info, "saved as", file)
|
||||
except Exception:
|
||||
pass
|
||||
@@ -12,7 +12,7 @@ import tempfile
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from . import Account, const
|
||||
from . import Account, const, direct_imap
|
||||
from .capi import lib
|
||||
from .events import FFIEventLogger, FFIEventTracker
|
||||
from _pytest._code import Source
|
||||
@@ -33,9 +33,6 @@ def pytest_addoption(parser):
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
config.addinivalue_line(
|
||||
"markers", "ignored: Mark test as bing slow, skipped unless --ignored is used."
|
||||
)
|
||||
cfg = config.getoption('--liveconfig')
|
||||
if not cfg:
|
||||
cfg = os.getenv('DCC_NEW_TMP_EMAIL')
|
||||
@@ -228,7 +225,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
|
||||
acc.disable_logging()
|
||||
|
||||
def make_account(self, path, logid, quiet=False):
|
||||
ac = Account(path, logging=self._logging)
|
||||
ac = Account(path, logging=self._logging, logid=logid)
|
||||
ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
|
||||
ac.addr = ac.get_self_contact().addr
|
||||
if not quiet:
|
||||
@@ -377,7 +374,10 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
|
||||
|
||||
am = AccountMaker()
|
||||
request.addfinalizer(am.finalize)
|
||||
return am
|
||||
yield am
|
||||
if request.node.rep_call.failed:
|
||||
for ac in am._accounts:
|
||||
direct_imap.print_imap_structure_ac(ac, tmpdir)
|
||||
|
||||
|
||||
class BotProcess:
|
||||
@@ -446,3 +446,15 @@ def lp():
|
||||
def step(self, msg):
|
||||
print("-" * 5, "step " + msg, "-" * 5)
|
||||
return Printer()
|
||||
|
||||
|
||||
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
|
||||
def pytest_runtest_makereport(item, call):
|
||||
# execute all other hooks to obtain the report object
|
||||
outcome = yield
|
||||
rep = outcome.get_result()
|
||||
|
||||
# set a report attribute for each phase of a call, which can
|
||||
# be "setup", "call", "teardown"
|
||||
|
||||
setattr(item, "rep_" + rep.when, rep)
|
||||
|
||||
@@ -7,6 +7,8 @@ from deltachat import const, Account
|
||||
from deltachat.message import Message
|
||||
from deltachat.hookspec import account_hookimpl
|
||||
from datetime import datetime, timedelta
|
||||
from deltachat import direct_imap
|
||||
from deltachat.direct_imap import make_direct_imap
|
||||
|
||||
|
||||
@pytest.mark.parametrize("msgtext,res", [
|
||||
@@ -635,6 +637,85 @@ class TestOnlineAccount:
|
||||
ev_msg = ac1_clone._evtracker.wait_next_messages_changed()
|
||||
assert ev_msg.text == msg_out.text
|
||||
|
||||
@pytest.mark.ignored
|
||||
@pytest.mark.parametrize('i', range(30))
|
||||
def test_mark_read_on_server(self, acfactory, lp, i):
|
||||
ac1 = acfactory.get_online_configuring_account()
|
||||
ac2 = acfactory.get_online_configuring_account(mvbox=True, move=True)
|
||||
|
||||
ac1.wait_configure_finish()
|
||||
ac1.start_io()
|
||||
ac2.wait_configure_finish()
|
||||
ac2.start_io()
|
||||
|
||||
imap2 = make_direct_imap(ac2, direct_imap.MVBOX)
|
||||
# imap2.mark_all_read()
|
||||
assert imap2.get_unread_cnt() == 0
|
||||
|
||||
chat = self.get_chat(ac1, ac2)
|
||||
chat_on_ac2 = self.get_chat(ac2, ac1)
|
||||
|
||||
chat.send_text("Text message")
|
||||
|
||||
incoming_on_ac2 = ac2._evtracker.wait_next_incoming_message()
|
||||
lp.sec("Incoming: "+incoming_on_ac2.text)
|
||||
|
||||
assert list(ac2.get_fresh_messages())
|
||||
|
||||
for i in range(0, 20):
|
||||
if imap2.get_unread_cnt() == 1:
|
||||
break
|
||||
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
|
||||
assert imap2.get_unread_cnt() == 1
|
||||
|
||||
chat_on_ac2.mark_noticed()
|
||||
incoming_on_ac2.mark_seen()
|
||||
ac2._evtracker.wait_next_messages_changed()
|
||||
|
||||
assert not list(ac2.get_fresh_messages())
|
||||
|
||||
# The new messages should be seen now.
|
||||
for i in range(0, 20):
|
||||
if imap2.get_unread_cnt() == 0:
|
||||
break
|
||||
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
|
||||
assert imap2.get_unread_cnt() == 0
|
||||
|
||||
@pytest.mark.ignored
|
||||
@pytest.mark.parametrize('i', range(30))
|
||||
def test_mark_bcc_read_on_server(self, acfactory, lp, i):
|
||||
ac1 = acfactory.get_online_configuring_account(mvbox=True, move=True)
|
||||
ac2 = acfactory.get_online_configuring_account()
|
||||
|
||||
ac1.wait_configure_finish()
|
||||
ac1.start_io()
|
||||
ac2.wait_configure_finish()
|
||||
ac2.start_io()
|
||||
|
||||
imap1 = make_direct_imap(ac1, direct_imap.MVBOX)
|
||||
imap1.mark_all_read()
|
||||
assert imap1.get_unread_cnt() == 0
|
||||
|
||||
chat = self.get_chat(ac1, ac2)
|
||||
|
||||
ac1.set_config("bcc_self", "1")
|
||||
chat.send_text("Text message")
|
||||
|
||||
ac1._evtracker.get_matching("DC_EVENT_SMTP_MESSAGE_SENT")
|
||||
|
||||
for i in range(0, 20):
|
||||
if imap1.get_new_email_cnt() == 1:
|
||||
break
|
||||
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
|
||||
assert imap1.get_new_email_cnt() == 1
|
||||
|
||||
for i in range(0, 20):
|
||||
if imap1.get_unread_cnt() == 0:
|
||||
break
|
||||
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
|
||||
|
||||
assert imap1.get_unread_cnt() == 0
|
||||
|
||||
def test_send_file_twice_unicode_filename_mangling(self, tmpdir, acfactory, lp):
|
||||
ac1, ac2 = acfactory.get_two_online_accounts()
|
||||
chat = self.get_chat(ac1, ac2)
|
||||
|
||||
@@ -71,6 +71,8 @@ norecursedirs = .tox
|
||||
xfail_strict=true
|
||||
timeout = 90
|
||||
timeout_method = thread
|
||||
markers =
|
||||
ignored: ignore this test in default test runs, use --ignored to run.
|
||||
|
||||
[flake8]
|
||||
max-line-length = 120
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::dc_tools::*;
|
||||
use crate::events::Event;
|
||||
use crate::message::MsgId;
|
||||
use crate::mimefactory::RECOMMENDED_FILE_SIZE;
|
||||
use crate::stock::StockMessage;
|
||||
use crate::{scheduler::InterruptInfo, stock::StockMessage};
|
||||
|
||||
/// The available configuration keys.
|
||||
#[derive(
|
||||
@@ -104,6 +104,9 @@ pub enum Config {
|
||||
ConfiguredServerFlags,
|
||||
ConfiguredSendSecurity,
|
||||
ConfiguredE2EEEnabled,
|
||||
ConfiguredInboxFolder,
|
||||
ConfiguredMvboxFolder,
|
||||
ConfiguredSentboxFolder,
|
||||
Configured,
|
||||
|
||||
#[strum(serialize = "sys.version")]
|
||||
@@ -137,6 +140,7 @@ impl Context {
|
||||
// Default values
|
||||
match key {
|
||||
Config::Selfstatus => Some(self.stock_str(StockMessage::StatusLine).await.into_owned()),
|
||||
Config::ConfiguredInboxFolder => Some("INBOX".to_owned()),
|
||||
_ => key.get_str("default").map(|s| s.to_string()),
|
||||
}
|
||||
}
|
||||
@@ -199,17 +203,18 @@ impl Context {
|
||||
}
|
||||
Config::InboxWatch => {
|
||||
let ret = self.sql.set_raw_config(self, key, value).await;
|
||||
self.interrupt_inbox(false).await;
|
||||
self.interrupt_inbox(InterruptInfo::new(false, None)).await;
|
||||
ret
|
||||
}
|
||||
Config::SentboxWatch => {
|
||||
let ret = self.sql.set_raw_config(self, key, value).await;
|
||||
self.interrupt_sentbox(false).await;
|
||||
self.interrupt_sentbox(InterruptInfo::new(false, None))
|
||||
.await;
|
||||
ret
|
||||
}
|
||||
Config::MvboxWatch => {
|
||||
let ret = self.sql.set_raw_config(self, key, value).await;
|
||||
self.interrupt_mvbox(false).await;
|
||||
self.interrupt_mvbox(InterruptInfo::new(false, None)).await;
|
||||
ret
|
||||
}
|
||||
Config::Selfstatus => {
|
||||
|
||||
@@ -272,9 +272,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
|
||||
let create_mvbox = ctx.get_config_bool(Config::MvboxWatch).await
|
||||
|| ctx.get_config_bool(Config::MvboxMove).await;
|
||||
|
||||
imap.configure_folders(ctx, create_mvbox)
|
||||
.await
|
||||
.context("configuring folders failed")?;
|
||||
imap.configure_folders(ctx, create_mvbox).await?;
|
||||
|
||||
imap.select_with_uidvalidity(ctx, "INBOX")
|
||||
.await
|
||||
|
||||
@@ -300,13 +300,11 @@ impl Context {
|
||||
.unwrap_or_default();
|
||||
|
||||
let configured_sentbox_folder = self
|
||||
.sql
|
||||
.get_raw_config(self, "configured_sentbox_folder")
|
||||
.get_config(Config::ConfiguredSentboxFolder)
|
||||
.await
|
||||
.unwrap_or_else(|| "<unset>".to_string());
|
||||
let configured_mvbox_folder = self
|
||||
.sql
|
||||
.get_raw_config(self, "configured_mvbox_folder")
|
||||
.get_config(Config::ConfiguredMvboxFolder)
|
||||
.await
|
||||
.unwrap_or_else(|| "<unset>".to_string());
|
||||
|
||||
@@ -442,33 +440,19 @@ impl Context {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn is_inbox(&self, folder_name: impl AsRef<str>) -> bool {
|
||||
folder_name.as_ref() == "INBOX"
|
||||
pub async fn is_inbox(&self, folder_name: impl AsRef<str>) -> bool {
|
||||
self.get_config(Config::ConfiguredInboxFolder).await
|
||||
== Some(folder_name.as_ref().to_string())
|
||||
}
|
||||
|
||||
pub async fn is_sentbox(&self, folder_name: impl AsRef<str>) -> bool {
|
||||
let sentbox_name = self
|
||||
.sql
|
||||
.get_raw_config(self, "configured_sentbox_folder")
|
||||
.await;
|
||||
if let Some(name) = sentbox_name {
|
||||
name == folder_name.as_ref()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
self.get_config(Config::ConfiguredSentboxFolder).await
|
||||
== Some(folder_name.as_ref().to_string())
|
||||
}
|
||||
|
||||
pub async fn is_mvbox(&self, folder_name: impl AsRef<str>) -> bool {
|
||||
let mvbox_name = self
|
||||
.sql
|
||||
.get_raw_config(self, "configured_mvbox_folder")
|
||||
.await;
|
||||
|
||||
if let Some(name) = mvbox_name {
|
||||
name == folder_name.as_ref()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
self.get_config(Config::ConfiguredMvboxFolder).await
|
||||
== Some(folder_name.as_ref().to_string())
|
||||
}
|
||||
|
||||
pub async fn do_heuristics_moves(&self, folder: &str, msg_id: MsgId) {
|
||||
|
||||
@@ -4,7 +4,7 @@ use async_imap::extensions::idle::IdleResponse;
|
||||
use async_std::prelude::*;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use crate::context::Context;
|
||||
use crate::{context::Context, scheduler::InterruptInfo};
|
||||
|
||||
use super::select_folder;
|
||||
use super::session::Session;
|
||||
@@ -34,7 +34,11 @@ impl Imap {
|
||||
self.config.can_idle
|
||||
}
|
||||
|
||||
pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<bool> {
|
||||
pub async fn idle(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
watch_folder: Option<String>,
|
||||
) -> Result<InterruptInfo> {
|
||||
use futures::future::FutureExt;
|
||||
|
||||
if !self.can_idle() {
|
||||
@@ -46,7 +50,7 @@ impl Imap {
|
||||
|
||||
let session = self.session.take();
|
||||
let timeout = Duration::from_secs(23 * 60);
|
||||
let mut probe_network = false;
|
||||
let mut info = Default::default();
|
||||
|
||||
if let Some(session) = session {
|
||||
let mut handle = session.idle();
|
||||
@@ -58,7 +62,7 @@ impl Imap {
|
||||
|
||||
enum Event {
|
||||
IdleResponse(IdleResponse),
|
||||
Interrupt(bool),
|
||||
Interrupt(InterruptInfo),
|
||||
}
|
||||
|
||||
if self.skip_next_idle_wait {
|
||||
@@ -90,8 +94,8 @@ impl Imap {
|
||||
Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => {
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
Ok(Event::Interrupt(probe)) => {
|
||||
probe_network = probe;
|
||||
Ok(Event::Interrupt(i)) => {
|
||||
info = i;
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -125,14 +129,14 @@ impl Imap {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(probe_network)
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
pub(crate) async fn fake_idle(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
watch_folder: Option<String>,
|
||||
) -> bool {
|
||||
) -> InterruptInfo {
|
||||
// Idle using polling. This is also needed if we're not yet configured -
|
||||
// in this case, we're waiting for a configure job (and an interrupt).
|
||||
|
||||
@@ -144,7 +148,7 @@ impl Imap {
|
||||
return self.idle_interrupt.recv().await.unwrap_or_default();
|
||||
}
|
||||
|
||||
let mut probe_network = false;
|
||||
let mut info: InterruptInfo = Default::default();
|
||||
if self.skip_next_idle_wait {
|
||||
// interrupt_idle has happened before we
|
||||
// provided self.interrupt
|
||||
@@ -157,10 +161,10 @@ impl Imap {
|
||||
|
||||
enum Event {
|
||||
Tick,
|
||||
Interrupt(bool),
|
||||
Interrupt(InterruptInfo),
|
||||
}
|
||||
// loop until we are interrupted or if we fetched something
|
||||
probe_network =
|
||||
info =
|
||||
loop {
|
||||
use futures::future::FutureExt;
|
||||
match interval
|
||||
@@ -181,7 +185,7 @@ impl Imap {
|
||||
}
|
||||
if self.config.can_idle {
|
||||
// we only fake-idled because network was gone during IDLE, probably
|
||||
break false;
|
||||
break InterruptInfo::new(false, None);
|
||||
}
|
||||
info!(context, "fake_idle is connected");
|
||||
// we are connected, let's see if fetching messages results
|
||||
@@ -194,7 +198,7 @@ impl Imap {
|
||||
Ok(res) => {
|
||||
info!(context, "fetch_new_messages returned {:?}", res);
|
||||
if res {
|
||||
break false;
|
||||
break InterruptInfo::new(false, None);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -204,9 +208,9 @@ impl Imap {
|
||||
}
|
||||
}
|
||||
}
|
||||
Event::Interrupt(probe_network) => {
|
||||
Event::Interrupt(info) => {
|
||||
// Interrupt
|
||||
break probe_network;
|
||||
break info;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -222,6 +226,6 @@ impl Imap {
|
||||
/ 1000.,
|
||||
);
|
||||
|
||||
probe_network
|
||||
info
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::message::{self, update_server_uid};
|
||||
use crate::mimeparser;
|
||||
use crate::oauth2::dc_get_oauth2_access_token;
|
||||
use crate::param::Params;
|
||||
use crate::stock::StockMessage;
|
||||
use crate::{scheduler::InterruptInfo, stock::StockMessage};
|
||||
|
||||
mod client;
|
||||
mod idle;
|
||||
@@ -109,7 +109,7 @@ const SELECT_ALL: &str = "1:*";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Imap {
|
||||
idle_interrupt: Receiver<bool>,
|
||||
idle_interrupt: Receiver<InterruptInfo>,
|
||||
config: ImapConfig,
|
||||
session: Option<Session>,
|
||||
connected: bool,
|
||||
@@ -181,7 +181,7 @@ impl Default for ImapConfig {
|
||||
}
|
||||
|
||||
impl Imap {
|
||||
pub fn new(idle_interrupt: Receiver<bool>) -> Self {
|
||||
pub fn new(idle_interrupt: Receiver<InterruptInfo>) -> Self {
|
||||
Imap {
|
||||
idle_interrupt,
|
||||
config: Default::default(),
|
||||
@@ -974,7 +974,7 @@ impl Imap {
|
||||
uid: u32,
|
||||
) -> Option<ImapActionResult> {
|
||||
if uid == 0 {
|
||||
return Some(ImapActionResult::Failed);
|
||||
return Some(ImapActionResult::RetryLater);
|
||||
}
|
||||
if !self.is_connected() {
|
||||
// currently jobs are only performed on the INBOX thread
|
||||
@@ -1223,19 +1223,16 @@ impl Imap {
|
||||
}
|
||||
}
|
||||
context
|
||||
.sql
|
||||
.set_raw_config(context, "configured_inbox_folder", Some("INBOX"))
|
||||
.set_config(Config::ConfiguredInboxFolder, Some("INBOX"))
|
||||
.await?;
|
||||
if let Some(ref mvbox_folder) = mvbox_folder {
|
||||
context
|
||||
.sql
|
||||
.set_raw_config(context, "configured_mvbox_folder", Some(mvbox_folder))
|
||||
.set_config(Config::ConfiguredMvboxFolder, Some(mvbox_folder))
|
||||
.await?;
|
||||
}
|
||||
if let Some(ref sentbox_folder) = sentbox_folder {
|
||||
context
|
||||
.sql
|
||||
.set_raw_config(context, "configured_sentbox_folder", Some(sentbox_folder))
|
||||
.set_config(Config::ConfiguredSentboxFolder, Some(sentbox_folder))
|
||||
.await?;
|
||||
}
|
||||
context
|
||||
@@ -1393,7 +1390,11 @@ async fn precheck_imf(
|
||||
}
|
||||
|
||||
if old_server_folder != server_folder || old_server_uid != server_uid {
|
||||
update_server_uid(context, &rfc724_mid, server_folder, server_uid).await;
|
||||
update_server_uid(context, rfc724_mid, server_folder, server_uid).await;
|
||||
context
|
||||
.interrupt_inbox(InterruptInfo::new(false, Some(msg_id)))
|
||||
.await;
|
||||
info!(context, "Updating server_uid and interrupting")
|
||||
}
|
||||
Ok(true)
|
||||
} else {
|
||||
|
||||
83
src/job.rs
83
src/job.rs
@@ -31,7 +31,7 @@ use crate::message::{self, Message, MessageState};
|
||||
use crate::mimefactory::MimeFactory;
|
||||
use crate::param::*;
|
||||
use crate::smtp::Smtp;
|
||||
use crate::sql;
|
||||
use crate::{scheduler::InterruptInfo, sql};
|
||||
|
||||
// results in ~3 weeks for the last backoff timespan
|
||||
const JOB_RETRIES: u32 = 17;
|
||||
@@ -504,10 +504,7 @@ impl Job {
|
||||
warn!(context, "could not configure folders: {:?}", err);
|
||||
return Status::RetryLater;
|
||||
}
|
||||
let dest_folder = context
|
||||
.sql
|
||||
.get_raw_config(context, "configured_mvbox_folder")
|
||||
.await;
|
||||
let dest_folder = context.get_config(Config::ConfiguredMvboxFolder).await;
|
||||
|
||||
if let Some(dest_folder) = dest_folder {
|
||||
let server_folder = msg.server_folder.as_ref().unwrap();
|
||||
@@ -518,7 +515,7 @@ impl Job {
|
||||
{
|
||||
ImapActionResult::RetryLater => Status::RetryLater,
|
||||
ImapActionResult::Success => {
|
||||
// XXX Rust-Imap provides no target uid on mv, so just set it to 0
|
||||
// Rust-Imap provides no target uid on mv, so just set it to 0, update again when precheck_imf() is called for the moved message
|
||||
message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, 0).await;
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
@@ -612,11 +609,7 @@ impl Job {
|
||||
|
||||
async fn empty_server(&mut self, context: &Context, imap: &mut Imap) -> Status {
|
||||
if self.foreign_id & DC_EMPTY_MVBOX > 0 {
|
||||
if let Some(mvbox_folder) = context
|
||||
.sql
|
||||
.get_raw_config(context, "configured_mvbox_folder")
|
||||
.await
|
||||
{
|
||||
if let Some(mvbox_folder) = &context.get_config(Config::ConfiguredMvboxFolder).await {
|
||||
imap.empty_folder(context, &mvbox_folder).await;
|
||||
}
|
||||
}
|
||||
@@ -1029,14 +1022,18 @@ pub async fn add(context: &Context, job: Job) {
|
||||
| Action::MarkseenMsgOnImap
|
||||
| Action::MoveMsg => {
|
||||
info!(context, "interrupt: imap");
|
||||
context.interrupt_inbox(false).await;
|
||||
context
|
||||
.interrupt_inbox(InterruptInfo::new(false, None))
|
||||
.await;
|
||||
}
|
||||
Action::MaybeSendLocations
|
||||
| Action::MaybeSendLocationsEnded
|
||||
| Action::SendMdn
|
||||
| Action::SendMsgToSmtp => {
|
||||
info!(context, "interrupt: smtp");
|
||||
context.interrupt_smtp(false).await;
|
||||
context
|
||||
.interrupt_smtp(InterruptInfo::new(false, None))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1051,38 +1048,49 @@ pub async fn add(context: &Context, job: Job) {
|
||||
pub(crate) async fn load_next(
|
||||
context: &Context,
|
||||
thread: Thread,
|
||||
probe_network: bool,
|
||||
info: &InterruptInfo,
|
||||
) -> Option<Job> {
|
||||
info!(context, "loading job for {}-thread", thread);
|
||||
let query = if !probe_network {
|
||||
|
||||
let query;
|
||||
let params;
|
||||
let t = time();
|
||||
let m;
|
||||
let thread_i = thread as i64;
|
||||
|
||||
if let Some(msg_id) = info.msg_id {
|
||||
query = r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND foreign_id=?
|
||||
ORDER BY action DESC, added_timestamp
|
||||
LIMIT 1;
|
||||
"#;
|
||||
m = msg_id;
|
||||
params = paramsv![thread_i, m];
|
||||
} else if !info.probe_network {
|
||||
// processing for first-try and after backoff-timeouts:
|
||||
// process jobs in the order they were added.
|
||||
r#"
|
||||
query = r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND desired_timestamp<=?
|
||||
ORDER BY action DESC, added_timestamp
|
||||
LIMIT 1;
|
||||
"#
|
||||
"#;
|
||||
params = paramsv![thread_i, t];
|
||||
} else {
|
||||
// processing after call to dc_maybe_network():
|
||||
// process _all_ pending jobs that failed before
|
||||
// in the order of their backoff-times.
|
||||
r#"
|
||||
query = r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND tries>0
|
||||
ORDER BY desired_timestamp, action DESC
|
||||
LIMIT 1;
|
||||
"#
|
||||
};
|
||||
|
||||
let thread_i = thread as i64;
|
||||
let t = time();
|
||||
let params = if !probe_network {
|
||||
paramsv![thread_i, t]
|
||||
} else {
|
||||
paramsv![thread_i]
|
||||
"#;
|
||||
params = paramsv![thread_i];
|
||||
};
|
||||
|
||||
let job = loop {
|
||||
@@ -1189,11 +1197,21 @@ mod tests {
|
||||
// all jobs.
|
||||
let t = dummy_context().await;
|
||||
insert_job(&t.ctx, -1).await; // This can not be loaded into Job struct.
|
||||
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
|
||||
let jobs = load_next(
|
||||
&t.ctx,
|
||||
Thread::from(Action::MoveMsg),
|
||||
&InterruptInfo::new(false, None),
|
||||
)
|
||||
.await;
|
||||
assert!(jobs.is_none());
|
||||
|
||||
insert_job(&t.ctx, 1).await;
|
||||
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
|
||||
let jobs = load_next(
|
||||
&t.ctx,
|
||||
Thread::from(Action::MoveMsg),
|
||||
&InterruptInfo::new(false, None),
|
||||
)
|
||||
.await;
|
||||
assert!(jobs.is_some());
|
||||
}
|
||||
|
||||
@@ -1203,7 +1221,12 @@ mod tests {
|
||||
|
||||
insert_job(&t.ctx, 1).await;
|
||||
|
||||
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
|
||||
let jobs = load_next(
|
||||
&t.ctx,
|
||||
Thread::from(Action::MoveMsg),
|
||||
&InterruptInfo::new(false, None),
|
||||
)
|
||||
.await;
|
||||
assert!(jobs.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
134
src/scheduler.rs
134
src/scheduler.rs
@@ -5,7 +5,7 @@ use async_std::task;
|
||||
use crate::context::Context;
|
||||
use crate::imap::Imap;
|
||||
use crate::job::{self, Thread};
|
||||
use crate::smtp::Smtp;
|
||||
use crate::{config::Config, message::MsgId, smtp::Smtp};
|
||||
|
||||
pub(crate) struct StopToken;
|
||||
|
||||
@@ -32,36 +32,20 @@ impl Context {
|
||||
self.scheduler.read().await.maybe_network().await;
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_inbox(&self, probe_network: bool) {
|
||||
self.scheduler
|
||||
.read()
|
||||
.await
|
||||
.interrupt_inbox(probe_network)
|
||||
.await;
|
||||
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
|
||||
self.scheduler.read().await.interrupt_inbox(info).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) {
|
||||
self.scheduler
|
||||
.read()
|
||||
.await
|
||||
.interrupt_sentbox(probe_network)
|
||||
.await;
|
||||
pub(crate) async fn interrupt_sentbox(&self, info: InterruptInfo) {
|
||||
self.scheduler.read().await.interrupt_sentbox(info).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) {
|
||||
self.scheduler
|
||||
.read()
|
||||
.await
|
||||
.interrupt_mvbox(probe_network)
|
||||
.await;
|
||||
pub(crate) async fn interrupt_mvbox(&self, info: InterruptInfo) {
|
||||
self.scheduler.read().await.interrupt_mvbox(info).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_smtp(&self, probe_network: bool) {
|
||||
self.scheduler
|
||||
.read()
|
||||
.await
|
||||
.interrupt_smtp(probe_network)
|
||||
.await;
|
||||
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
|
||||
self.scheduler.read().await.interrupt_smtp(info).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,14 +70,14 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
|
||||
started.send(()).await;
|
||||
|
||||
// track number of continously executed jobs
|
||||
let mut jobs_loaded = 0;
|
||||
let mut probe_network = false;
|
||||
let mut jobs_loaded: i32 = 0;
|
||||
let mut info: InterruptInfo = Default::default();
|
||||
loop {
|
||||
match job::load_next(&ctx, Thread::Imap, probe_network).await {
|
||||
match job::load_next(&ctx, Thread::Imap, &info).await {
|
||||
Some(job) if jobs_loaded <= 20 => {
|
||||
jobs_loaded += 1;
|
||||
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
|
||||
probe_network = false;
|
||||
info = Default::default();
|
||||
}
|
||||
Some(job) => {
|
||||
// Let the fetch run, but return back to the job afterwards.
|
||||
@@ -103,8 +87,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
|
||||
}
|
||||
None => {
|
||||
jobs_loaded = 0;
|
||||
probe_network =
|
||||
fetch_idle(&ctx, &mut connection, "configured_inbox_folder").await;
|
||||
info = fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -121,7 +104,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
|
||||
}
|
||||
|
||||
async fn fetch(ctx: &Context, connection: &mut Imap) {
|
||||
match get_watch_folder(&ctx, "configured_inbox_folder").await {
|
||||
match ctx.get_config(Config::ConfiguredInboxFolder).await {
|
||||
Some(watch_folder) => {
|
||||
// fetch
|
||||
if let Err(err) = connection.fetch(&ctx, &watch_folder).await {
|
||||
@@ -136,8 +119,8 @@ async fn fetch(ctx: &Context, connection: &mut Imap) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool {
|
||||
match get_watch_folder(&ctx, folder).await {
|
||||
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo {
|
||||
match ctx.get_config(folder).await {
|
||||
Some(watch_folder) => {
|
||||
// fetch
|
||||
if let Err(err) = connection.fetch(&ctx, &watch_folder).await {
|
||||
@@ -153,7 +136,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool
|
||||
.unwrap_or_else(|err| {
|
||||
connection.trigger_reconnect();
|
||||
error!(ctx, "{}", err);
|
||||
false
|
||||
InterruptInfo::new(false, None)
|
||||
})
|
||||
} else {
|
||||
connection.fake_idle(&ctx, Some(watch_folder)).await
|
||||
@@ -170,7 +153,7 @@ async fn simple_imap_loop(
|
||||
ctx: Context,
|
||||
started: Sender<()>,
|
||||
inbox_handlers: ImapConnectionHandlers,
|
||||
folder: impl AsRef<str>,
|
||||
folder: Config,
|
||||
) {
|
||||
use futures::future::FutureExt;
|
||||
|
||||
@@ -193,7 +176,7 @@ async fn simple_imap_loop(
|
||||
started.send(()).await;
|
||||
|
||||
loop {
|
||||
fetch_idle(&ctx, &mut connection, folder.as_ref()).await;
|
||||
fetch_idle(&ctx, &mut connection, folder).await;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -223,18 +206,18 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
|
||||
started.send(()).await;
|
||||
let ctx = ctx1;
|
||||
|
||||
let mut probe_network = false;
|
||||
let mut interrupt_info = Default::default();
|
||||
loop {
|
||||
match job::load_next(&ctx, Thread::Smtp, probe_network).await {
|
||||
match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
|
||||
Some(job) => {
|
||||
info!(ctx, "executing smtp job");
|
||||
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
|
||||
probe_network = false;
|
||||
interrupt_info = Default::default();
|
||||
}
|
||||
None => {
|
||||
// Fake Idle
|
||||
info!(ctx, "smtp fake idle - started");
|
||||
probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default();
|
||||
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
|
||||
info!(ctx, "smtp fake idle - interrupted")
|
||||
}
|
||||
}
|
||||
@@ -286,7 +269,7 @@ impl Scheduler {
|
||||
ctx1,
|
||||
mvbox_start_send,
|
||||
mvbox_handlers,
|
||||
"configured_mvbox_folder",
|
||||
Config::ConfiguredMvboxFolder,
|
||||
)
|
||||
.await
|
||||
}));
|
||||
@@ -300,7 +283,7 @@ impl Scheduler {
|
||||
ctx1,
|
||||
sentbox_start_send,
|
||||
sentbox_handlers,
|
||||
"configured_sentbox_folder",
|
||||
Config::ConfiguredSentboxFolder,
|
||||
)
|
||||
.await
|
||||
}));
|
||||
@@ -333,34 +316,34 @@ impl Scheduler {
|
||||
return;
|
||||
}
|
||||
|
||||
self.interrupt_inbox(true)
|
||||
.join(self.interrupt_mvbox(true))
|
||||
.join(self.interrupt_sentbox(true))
|
||||
.join(self.interrupt_smtp(true))
|
||||
self.interrupt_inbox(InterruptInfo::new(true, None))
|
||||
.join(self.interrupt_mvbox(InterruptInfo::new(true, None)))
|
||||
.join(self.interrupt_sentbox(InterruptInfo::new(true, None)))
|
||||
.join(self.interrupt_smtp(InterruptInfo::new(true, None)))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn interrupt_inbox(&self, probe_network: bool) {
|
||||
async fn interrupt_inbox(&self, info: InterruptInfo) {
|
||||
if let Scheduler::Running { ref inbox, .. } = self {
|
||||
inbox.interrupt(probe_network).await;
|
||||
inbox.interrupt(info).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn interrupt_mvbox(&self, probe_network: bool) {
|
||||
async fn interrupt_mvbox(&self, info: InterruptInfo) {
|
||||
if let Scheduler::Running { ref mvbox, .. } = self {
|
||||
mvbox.interrupt(probe_network).await;
|
||||
mvbox.interrupt(info).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn interrupt_sentbox(&self, probe_network: bool) {
|
||||
async fn interrupt_sentbox(&self, info: InterruptInfo) {
|
||||
if let Scheduler::Running { ref sentbox, .. } = self {
|
||||
sentbox.interrupt(probe_network).await;
|
||||
sentbox.interrupt(info).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn interrupt_smtp(&self, probe_network: bool) {
|
||||
async fn interrupt_smtp(&self, info: InterruptInfo) {
|
||||
if let Scheduler::Running { ref smtp, .. } = self {
|
||||
smtp.interrupt(probe_network).await;
|
||||
smtp.interrupt(info).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -429,7 +412,7 @@ struct ConnectionState {
|
||||
/// Channel to interrupt the whole connection.
|
||||
stop_sender: Sender<()>,
|
||||
/// Channel to interrupt idle.
|
||||
idle_interrupt_sender: Sender<bool>,
|
||||
idle_interrupt_sender: Sender<InterruptInfo>,
|
||||
}
|
||||
|
||||
impl ConnectionState {
|
||||
@@ -441,9 +424,9 @@ impl ConnectionState {
|
||||
self.shutdown_receiver.recv().await.ok();
|
||||
}
|
||||
|
||||
async fn interrupt(&self, probe_network: bool) {
|
||||
async fn interrupt(&self, info: InterruptInfo) {
|
||||
// Use try_send to avoid blocking on interrupts.
|
||||
self.idle_interrupt_sender.try_send(probe_network).ok();
|
||||
self.idle_interrupt_sender.try_send(info).ok();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,8 +460,8 @@ impl SmtpConnectionState {
|
||||
}
|
||||
|
||||
/// Interrupt any form of idle.
|
||||
async fn interrupt(&self, probe_network: bool) {
|
||||
self.state.interrupt(probe_network).await;
|
||||
async fn interrupt(&self, info: InterruptInfo) {
|
||||
self.state.interrupt(info).await;
|
||||
}
|
||||
|
||||
/// Shutdown this connection completely.
|
||||
@@ -492,7 +475,7 @@ struct SmtpConnectionHandlers {
|
||||
connection: Smtp,
|
||||
stop_receiver: Receiver<()>,
|
||||
shutdown_sender: Sender<()>,
|
||||
idle_interrupt_receiver: Receiver<bool>,
|
||||
idle_interrupt_receiver: Receiver<InterruptInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -525,8 +508,8 @@ impl ImapConnectionState {
|
||||
}
|
||||
|
||||
/// Interrupt any form of idle.
|
||||
async fn interrupt(&self, probe_network: bool) {
|
||||
self.state.interrupt(probe_network).await;
|
||||
async fn interrupt(&self, info: InterruptInfo) {
|
||||
self.state.interrupt(info).await;
|
||||
}
|
||||
|
||||
/// Shutdown this connection completely.
|
||||
@@ -542,20 +525,17 @@ struct ImapConnectionHandlers {
|
||||
shutdown_sender: Sender<()>,
|
||||
}
|
||||
|
||||
async fn get_watch_folder(context: &Context, config_name: impl AsRef<str>) -> Option<String> {
|
||||
match context
|
||||
.sql
|
||||
.get_raw_config(context, config_name.as_ref())
|
||||
.await
|
||||
{
|
||||
Some(name) => Some(name),
|
||||
None => {
|
||||
if config_name.as_ref() == "configured_inbox_folder" {
|
||||
// initialized with old version, so has not set configured_inbox_folder
|
||||
Some("INBOX".to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
#[derive(Default, Debug)]
|
||||
pub struct InterruptInfo {
|
||||
pub probe_network: bool,
|
||||
pub msg_id: Option<MsgId>,
|
||||
}
|
||||
|
||||
impl InterruptInfo {
|
||||
pub fn new(probe_network: bool, msg_id: Option<MsgId>) -> Self {
|
||||
Self {
|
||||
probe_network,
|
||||
msg_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user