Peek reipients, fetch existing messages

Read all of an e-mail accounts messages and extract all To/CC addresses
if the From was from our own account.
Then, fetch existing messages from the server and show them.

Also, I fixed two other things:
- just by chance my test failed because of an completely unrelated bug.
The bug: bcc_self messages were not marked as read if mvbox_move was set
to true.
- add some color to the test output (minor change)
This commit is contained in:
Hocuri
2020-10-10 13:27:59 +02:00
committed by link2xt
parent cf5342c367
commit be88b946b6
14 changed files with 332 additions and 44 deletions

View File

@@ -121,9 +121,9 @@ pub enum Config {
#[strum(serialize = "sys.config_keys")]
SysConfigKeys,
#[strum(props(default = "0"))]
/// Whether we send a warning if the password is wrong (set to false when we send a warning
/// because we do not want to send a second warning)
#[strum(props(default = "0"))]
NotifyAboutWrongPw,
/// address to webrtc instance to use for videochats

View File

@@ -9,11 +9,10 @@ use anyhow::{bail, ensure, Context as _, Result};
use async_std::prelude::*;
use async_std::task;
use itertools::Itertools;
use job::Action;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use crate::config::Config;
use crate::constants::*;
use crate::context::Context;
use crate::dc_tools::*;
use crate::imap::Imap;
use crate::login_param::{LoginParam, ServerLoginParam};
@@ -23,6 +22,8 @@ use crate::provider::{Protocol, Socket, UsernamePattern};
use crate::smtp::Smtp;
use crate::stock::StockMessage;
use crate::{chat, e2ee, provider};
use crate::{constants::*, job};
use crate::{context::Context, param::Params};
use auto_mozilla::moz_autoconfigure;
use auto_outlook::outlk_autodiscover;
@@ -349,6 +350,12 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
e2ee::ensure_secret_key_exists(ctx).await?;
info!(ctx, "key generation completed");
job::add(
ctx,
job::Job::new(Action::FetchExistingMsgs, 0, Params::new(), 0),
)
.await;
progress!(ctx, 940);
Ok(())

View File

@@ -195,6 +195,9 @@ pub const DC_LP_AUTH_NORMAL: i32 = 0x4;
/// if none of these flags are set, the default is chosen
pub const DC_LP_AUTH_FLAGS: i32 = DC_LP_AUTH_OAUTH2 | DC_LP_AUTH_NORMAL;
/// How many existing messages shall be fetched after configuration.
pub const DC_FETCH_EXISTING_MSGS_COUNT: i64 = 100;
// max. width/height of an avatar
pub const AVATAR_SIZE: u32 = 192;

View File

@@ -43,6 +43,17 @@ pub async fn dc_receive_imf(
server_folder: impl AsRef<str>,
server_uid: u32,
seen: bool,
) -> Result<()> {
dc_receive_imf_inner(context, imf_raw, server_folder, server_uid, seen, false).await
}
pub(crate) async fn dc_receive_imf_inner(
context: &Context,
imf_raw: &[u8],
server_folder: impl AsRef<str>,
server_uid: u32,
seen: bool,
fetching_existing_messages: bool,
) -> Result<()> {
info!(
context,
@@ -169,6 +180,7 @@ pub async fn dc_receive_imf(
&mut insert_msg_id,
&mut created_db_entries,
&mut create_event_to_send,
fetching_existing_messages,
)
.await
{
@@ -335,6 +347,7 @@ async fn add_parts(
insert_msg_id: &mut MsgId,
created_db_entries: &mut Vec<(ChatId, MsgId)>,
create_event_to_send: &mut Option<CreateEvent>,
fetching_existing_messages: bool,
) -> Result<()> {
let mut state: MessageState;
let mut chat_id_blocked = Blocked::Not;
@@ -389,7 +402,7 @@ async fn add_parts(
let to_id: u32;
if incoming {
state = if seen {
state = if seen || fetching_existing_messages {
MessageState::InSeen
} else {
MessageState::InFresh
@@ -532,6 +545,10 @@ async fn add_parts(
&& show_emails != ShowEmails::All
{
state = MessageState::InNoticed;
} else if fetching_existing_messages && Blocked::Deaddrop == chat_id_blocked {
// The fetched existing message should be shown in the chatlist-contact-request because
// a new user won't find the contact request in the menu
state = MessageState::InFresh;
}
} else {
// Outgoing
@@ -628,6 +645,12 @@ async fn add_parts(
}
}
if fetching_existing_messages && mime_parser.decrypting_failed {
*chat_id = ChatId::new(DC_CHAT_ID_TRASH);
// We are only gathering old messages on first start. We do not want to add loads of non-decryptable messages to the chats.
info!(context, "Dropping existing non-decipherable message.");
}
// Extract ephemeral timer from the message.
let mut ephemeral_timer = if let Some(value) = mime_parser.get(HeaderDef::EphemeralTimer) {
match value.parse::<EphemeralTimer>() {

View File

@@ -160,7 +160,7 @@ impl Imap {
// will not find any new.
if let Some(ref watch_folder) = watch_folder {
match self.fetch_new_messages(context, watch_folder).await {
match self.fetch_new_messages(context, watch_folder, false).await {
Ok(res) => {
info!(context, "fetch_new_messages returned {:?}", res);
if res {

View File

@@ -3,8 +3,9 @@
//! uses [async-email/async-imap](https://github.com/async-email/async-imap)
//! to implement connect, fetch, delete functionality with standard IMAP servers.
use std::collections::BTreeMap;
use std::{cmp, collections::BTreeMap};
use anyhow::Context as _;
use async_imap::{
error::Result as ImapResult,
types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute},
@@ -13,12 +14,9 @@ use async_std::prelude::*;
use async_std::sync::Receiver;
use num_traits::FromPrimitive;
use crate::config::*;
use crate::constants::*;
use crate::context::Context;
use crate::dc_receive_imf::{
dc_receive_imf, from_field_to_contact_id, is_msgrmsg_rfc724_mid_in_list,
};
use crate::dc_receive_imf::{from_field_to_contact_id, is_msgrmsg_rfc724_mid_in_list};
use crate::error::{bail, format_err, Result};
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
@@ -32,6 +30,7 @@ use crate::provider::{get_provider_info, Socket};
use crate::{
chat, dc_tools::dc_extract_grpid_from_rfc724_mid, scheduler::InterruptInfo, stock::StockMessage,
};
use crate::{config::*, dc_receive_imf::dc_receive_imf_inner};
mod client;
mod idle;
@@ -40,6 +39,7 @@ mod session;
use chat::get_chat_id_by_grpid;
use client::Client;
use mailparse::SingleInfo;
use message::Message;
use session::Session;
@@ -448,7 +448,10 @@ impl Imap {
}
self.setup_handle(context).await?;
while self.fetch_new_messages(context, &watch_folder).await? {
while self
.fetch_new_messages(context, &watch_folder, false)
.await?
{
// We fetch until no more new messages are there.
}
Ok(())
@@ -643,10 +646,11 @@ impl Imap {
Ok((new_uid_validity, new_last_seen_uid))
}
async fn fetch_new_messages<S: AsRef<str>>(
pub(crate) async fn fetch_new_messages<S: AsRef<str>>(
&mut self,
context: &Context,
folder: S,
fetch_existing_msgs: bool,
) -> Result<bool> {
let show_emails = ShowEmails::from_i32(context.get_config_int(Config::ShowEmails).await)
.unwrap_or_default();
@@ -655,7 +659,11 @@ impl Imap {
.select_with_uidvalidity(context, folder.as_ref())
.await?;
let msgs = self.fetch_after(context, last_seen_uid).await?;
let msgs = if fetch_existing_msgs {
self.fetch_existing_msgs_prefetch().await?
} else {
self.fetch_after(context, last_seen_uid).await?
};
let read_cnt = msgs.len();
let folder: &str = folder.as_ref();
@@ -695,8 +703,9 @@ impl Imap {
}
// check passed, go fetch the emails
let (new_last_seen_uid_processed, error_cnt) =
self.fetch_many_msgs(context, &folder, &uids).await;
let (new_last_seen_uid_processed, error_cnt) = self
.fetch_many_msgs(context, &folder, &uids, fetch_existing_msgs)
.await;
read_errors += error_cnt;
// determine which last_seen_uid to use to update to
@@ -721,17 +730,66 @@ impl Imap {
Ok(read_cnt > 0)
}
/// Gets the from, to and bcc addresses from all existing outgoing emails.
pub async fn get_all_recipients(&mut self, context: &Context) -> Result<Vec<SingleInfo>> {
if self.session.is_none() {
bail!("IMAP No Connection established");
}
let session = self.session.as_mut().unwrap();
let self_addr = context
.get_config(Config::ConfiguredAddr)
.await
.ok_or_else(|| format_err!("Not configured"))?;
let search_command = format!("FROM \"{}\"", self_addr);
let uids = session.uid_search(search_command).await?;
let uid_strings: Vec<String> = uids.into_iter().map(|s| s.to_string()).collect();
let mut result = Vec::new();
// We fetch the emails in chunks of 100 because according to https://tools.ietf.org/html/rfc2683#section-3.2.1.5
// command lines should not be much more than 1000 chars and UIDs can get up to 9- or 10-digit
// (servers should allow at least 8000 chars)
for uid_chunk in uid_strings.chunks(100) {
let uid_set = uid_chunk.join(",");
let mut list = session
.uid_fetch(uid_set, "(UID BODY.PEEK[HEADER.FIELDS (FROM TO CC BCC)])")
.await
.map_err(|err| {
format_err!("IMAP Could not fetch (get_all_recipients()): {}", err)
})?;
while let Some(fetch) = list.next().await {
let msg = fetch?;
match get_fetch_headers(&msg) {
Ok(headers) => {
let (from_id, _, _) =
from_field_to_contact_id(context, &mimeparser::get_from(&headers))
.await?;
if from_id == DC_CONTACT_ID_SELF {
result.extend(mimeparser::get_recipients(&headers));
}
}
Err(err) => {
warn!(context, "{}", err);
continue;
}
};
}
}
Ok(result)
}
/// Fetch all uids larger than the passed in. Returns a sorted list of fetch results.
async fn fetch_after(
&mut self,
context: &Context,
uid: u32,
) -> Result<BTreeMap<u32, async_imap::types::Fetch>> {
if self.session.is_none() {
bail!("IMAP No Connection established");
}
let session = self.session.as_mut().unwrap();
let session = self.session.as_mut();
let session = session.context("fetch_after(): IMAP No Connection established")?;
// fetch messages with larger UID than the last one seen
// `(UID FETCH lastseenuid+1:*)`, see RFC 4549
@@ -769,6 +827,40 @@ impl Imap {
Ok(new_msgs)
}
/// Like fetch_after(), but not for new messages but existing ones (the DC_FETCH_EXISTING_MSGS_COUNT newest messages)
async fn fetch_existing_msgs_prefetch(
&mut self,
) -> Result<BTreeMap<u32, async_imap::types::Fetch>> {
let exists: i64 = {
let mailbox = self.config.selected_mailbox.as_ref();
let mailbox = mailbox.context("fetch_existing_msgs_prefetch(): no mailbox selected")?;
mailbox.exists.into()
};
let session = self.session.as_mut();
let session =
session.context("fetch_existing_msgs_prefetch(): IMAP No Connection established")?;
// Fetch last DC_FETCH_EXISTING_MSGS_COUNT (100) messages.
// Sequence numbers are sequential. If there are 1000 messages in the inbox,
// we can fetch the sequence numbers 900-1000 and get the last 100 messages.
let first = cmp::max(1, exists - DC_FETCH_EXISTING_MSGS_COUNT);
let set = format!("{}:*", first);
let mut list = session
.fetch(&set, PREFETCH_FLAGS)
.await
.map_err(|err| format_err!("IMAP Could not fetch: {}", err))?;
let mut msgs = BTreeMap::new();
while let Some(fetch) = list.next().await {
let msg = fetch?;
if let Some(msg_uid) = msg.uid {
msgs.insert(msg_uid, msg);
}
}
Ok(msgs)
}
async fn set_config_last_seen_uid<S: AsRef<str>>(
&self,
context: &Context,
@@ -795,6 +887,7 @@ impl Imap {
context: &Context,
folder: S,
server_uids: &[u32],
fetching_existing_messages: bool,
) -> (Option<u32>, usize) {
let set = match server_uids {
[] => return (None, 0),
@@ -868,7 +961,16 @@ impl Imap {
let body = msg.body().unwrap();
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
match dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await {
match dc_receive_imf_inner(
&context,
&body,
&folder,
server_uid,
is_seen,
fetching_existing_messages,
)
.await
{
Ok(_) => last_uid = Some(server_uid),
Err(err) => {
warn!(context, "dc_receive_imf error: {}", err);
@@ -1457,17 +1559,16 @@ async fn precheck_imf(
if old_server_folder != server_folder || old_server_uid != server_uid {
update_server_uid(context, rfc724_mid, server_folder, server_uid).await;
if let Ok(MessageState::InSeen) = msg_id.get_state(context).await {
job::add(
context,
job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0),
)
.await;
};
context
.interrupt_inbox(InterruptInfo::new(false, Some(msg_id)))
.await;
info!(context, "Updating server_uid and interrupting")
if let Ok(message_state) = msg_id.get_state(context).await {
if message_state == MessageState::InSeen || message_state.is_outgoing() {
job::add(
context,
job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0),
)
.await;
}
}
info!(context, "Updating server_uid and adding markseen job");
}
Ok(true)
} else {

View File

@@ -14,7 +14,6 @@ use async_smtp::smtp::response::Category;
use async_smtp::smtp::response::Code;
use async_smtp::smtp::response::Detail;
use crate::blob::BlobObject;
use crate::chat::{self, ChatId};
use crate::config::Config;
use crate::contact::Contact;
@@ -30,6 +29,7 @@ use crate::message::{self, Message, MessageState};
use crate::mimefactory::MimeFactory;
use crate::param::*;
use crate::smtp::Smtp;
use crate::{blob::BlobObject, contact::normalize_name, contact::Modifier, contact::Origin};
use crate::{scheduler::InterruptInfo, sql};
// results in ~3 weeks for the last backoff timespan
@@ -92,6 +92,7 @@ pub enum Action {
// Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999
Housekeeping = 105, // low priority ...
FetchExistingMsgs = 110,
MarkseenMsgOnImap = 130,
// Moving message is prioritized lower than deletion so we don't
@@ -124,6 +125,7 @@ impl From<Action> for Thread {
Unknown => Thread::Unknown,
Housekeeping => Thread::Imap,
FetchExistingMsgs => Thread::Imap,
DeleteMsgOnImap => Thread::Imap,
ResyncFolders => Thread::Imap,
MarkseenMsgOnImap => Thread::Imap,
@@ -619,6 +621,38 @@ impl Job {
}
}
/// Read the recipients from old emails sent by the user and add them as contacts.
/// This way, we can already offer them some email addresses they can write to.
///
/// Then, Fetch the last messages DC_FETCH_EXISTING_MSGS_COUNT emails from the server
/// and show them in the chat list.
async fn fetch_existing_msgs(&mut self, context: &Context, imap: &mut Imap) -> Status {
if let Err(err) = imap.connect_configured(context).await {
warn!(context, "could not connect: {:?}", err);
return Status::RetryLater;
}
add_all_recipients_as_contacts(context, imap, Config::ConfiguredSentboxFolder).await;
add_all_recipients_as_contacts(context, imap, Config::ConfiguredMvboxFolder).await;
add_all_recipients_as_contacts(context, imap, Config::ConfiguredInboxFolder).await;
for config in &[
Config::ConfiguredMvboxFolder,
Config::ConfiguredInboxFolder,
Config::ConfiguredSentboxFolder,
] {
if let Some(folder) = context.get_config(*config).await {
if let Err(e) = imap.fetch_new_messages(context, folder, true).await {
// We are using Anyhow's .context() and to show the inner error, too, we need the {:#}:
warn!(context, "Could not fetch messages, retrying: {:#}", e);
return Status::RetryLater;
};
}
}
info!(context, "Done fetching existing messages.");
Status::Finished(Ok(()))
}
/// Synchronizes UIDs for sentbox, inbox and mvbox, in this order.
///
/// If a copy of the message is present in multiple folders, mvbox
@@ -759,6 +793,50 @@ async fn set_delivered(context: &Context, msg_id: MsgId) {
context.emit_event(EventType::MsgDelivered { chat_id, msg_id });
}
async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, folder: Config) {
let mailbox = if let Some(m) = context.get_config(folder).await {
m
} else {
return;
};
if let Err(e) = imap.select_with_uidvalidity(context, &mailbox).await {
warn!(context, "Could not select {}: {}", mailbox, e);
return;
}
match imap.get_all_recipients(context).await {
Ok(contacts) => {
let mut any_modified = false;
for contact in contacts {
let display_name_normalized = contact
.display_name
.as_ref()
.map(normalize_name)
.unwrap_or_default();
match Contact::add_or_lookup(
context,
display_name_normalized,
contact.addr,
Origin::OutgoingTo,
)
.await
{
Ok((_, modified)) => {
if modified != Modifier::None {
any_modified = true;
}
}
Err(e) => warn!(context, "Could not add recipient: {}", e),
}
}
if any_modified {
context.emit_event(EventType::ContactsChanged(None));
}
}
Err(e) => warn!(context, "Could not add recipients: {}", e),
};
}
/// Constructs a job for sending a message.
///
/// Returns `None` if no messages need to be sent out.
@@ -1007,6 +1085,7 @@ async fn perform_job_action(
Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await,
Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context, connection.inbox()).await,
Action::MoveMsg => job.move_msg(context, connection.inbox()).await,
Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await,
Action::Housekeeping => {
sql::housekeeping(context).await;
Status::Finished(Ok(()))
@@ -1072,6 +1151,7 @@ pub async fn add(context: &Context, job: Job) {
| Action::DeleteMsgOnImap
| Action::ResyncFolders
| Action::MarkseenMsgOnImap
| Action::FetchExistingMsgs
| Action::MoveMsg => {
info!(context, "interrupt: imap");
context

View File

@@ -920,12 +920,17 @@ impl From<MessageState> for LotState {
impl MessageState {
pub fn can_fail(self) -> bool {
use MessageState::*;
matches!(
self,
MessageState::OutPreparing
| MessageState::OutPending
| MessageState::OutDelivered
| MessageState::OutMdnRcvd // OutMdnRcvd can still fail because it could be a group message and only some recipients failed.
OutPreparing | OutPending | OutDelivered | OutMdnRcvd // OutMdnRcvd can still fail because it could be a group message and only some recipients failed.
)
}
pub fn is_outgoing(self) -> bool {
use MessageState::*;
matches!(
self,
OutPreparing | OutDraft | OutPending | OutFailed | OutDelivered | OutMdnRcvd
)
}
}

View File

@@ -1286,9 +1286,9 @@ fn get_attachment_filename(mail: &mailparse::ParsedMail) -> Result<Option<String
}
/// Returned addresses are normalized and lowercased.
fn get_recipients(headers: &[MailHeader]) -> Vec<SingleInfo> {
pub(crate) fn get_recipients(headers: &[MailHeader]) -> Vec<SingleInfo> {
get_all_addresses_from_header(headers, |header_key| {
header_key == "to" || header_key == "cc"
header_key == "to" || header_key == "cc" || header_key == "bcc"
})
}