mirror of
https://github.com/chatmail/core.git
synced 2026-05-02 04:46:29 +03:00
first pass at async job
This commit is contained in:
371
src/imap/idle.rs
371
src/imap/idle.rs
@@ -65,235 +65,228 @@ impl Imap {
|
||||
task::block_on(async move { self.config.read().await.can_idle })
|
||||
}
|
||||
|
||||
pub fn idle(&self, context: &Context, watch_folder: Option<String>) -> Result<()> {
|
||||
task::block_on(async move {
|
||||
if !self.can_idle() {
|
||||
return Err(Error::IdleAbilityMissing);
|
||||
}
|
||||
pub async fn idle(&self, context: &Context, watch_folder: Option<String>) -> Result<()> {
|
||||
if !self.can_idle() {
|
||||
return Err(Error::IdleAbilityMissing);
|
||||
}
|
||||
|
||||
self.setup_handle_if_needed(context)
|
||||
.await
|
||||
.map_err(Error::SetupHandleError)?;
|
||||
self.setup_handle_if_needed(context)
|
||||
.await
|
||||
.map_err(Error::SetupHandleError)?;
|
||||
|
||||
self.select_folder(context, watch_folder.clone()).await?;
|
||||
self.select_folder(context, watch_folder.clone()).await?;
|
||||
|
||||
let session = self.session.lock().await.take();
|
||||
let timeout = Duration::from_secs(23 * 60);
|
||||
if let Some(session) = session {
|
||||
match session.idle() {
|
||||
// BEWARE: If you change the Secure branch you
|
||||
// typically also need to change the Insecure branch.
|
||||
IdleHandle::Secure(mut handle) => {
|
||||
if let Err(err) = handle.init().await {
|
||||
return Err(Error::IdleProtocolFailed(err));
|
||||
}
|
||||
let session = self.session.lock().await.take();
|
||||
let timeout = Duration::from_secs(23 * 60);
|
||||
if let Some(session) = session {
|
||||
match session.idle() {
|
||||
// BEWARE: If you change the Secure branch you
|
||||
// typically also need to change the Insecure branch.
|
||||
IdleHandle::Secure(mut handle) => {
|
||||
if let Err(err) = handle.init().await {
|
||||
return Err(Error::IdleProtocolFailed(err));
|
||||
}
|
||||
|
||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||
*self.interrupt.lock().await = Some(interrupt);
|
||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||
*self.interrupt.lock().await = Some(interrupt);
|
||||
|
||||
if self.skip_next_idle_wait.load(Ordering::SeqCst) {
|
||||
// interrupt_idle has happened before we
|
||||
// provided self.interrupt
|
||||
self.skip_next_idle_wait.store(false, Ordering::SeqCst);
|
||||
std::mem::drop(idle_wait);
|
||||
info!(context, "Idle wait was skipped");
|
||||
} else {
|
||||
info!(context, "Idle entering wait-on-remote state");
|
||||
match idle_wait.await {
|
||||
Ok(IdleResponse::NewData(_)) => {
|
||||
info!(context, "Idle has NewData");
|
||||
}
|
||||
// TODO: idle_wait does not distinguish manual interrupts
|
||||
// from Timeouts if we would know it's a Timeout we could bail
|
||||
// directly and reconnect .
|
||||
Ok(IdleResponse::Timeout) => {
|
||||
info!(context, "Idle-wait timeout or interruption");
|
||||
}
|
||||
Ok(IdleResponse::ManualInterrupt) => {
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(context, "Idle wait errored: {:?}", err);
|
||||
}
|
||||
if self.skip_next_idle_wait.load(Ordering::SeqCst) {
|
||||
// interrupt_idle has happened before we
|
||||
// provided self.interrupt
|
||||
self.skip_next_idle_wait.store(false, Ordering::SeqCst);
|
||||
std::mem::drop(idle_wait);
|
||||
info!(context, "Idle wait was skipped");
|
||||
} else {
|
||||
info!(context, "Idle entering wait-on-remote state");
|
||||
match idle_wait.await {
|
||||
Ok(IdleResponse::NewData(_)) => {
|
||||
info!(context, "Idle has NewData");
|
||||
}
|
||||
}
|
||||
// if we can't properly terminate the idle
|
||||
// protocol let's break the connection.
|
||||
let res =
|
||||
async_std::future::timeout(Duration::from_secs(15), handle.done())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
self.trigger_reconnect();
|
||||
Error::IdleTimeout(err)
|
||||
})?;
|
||||
|
||||
match res {
|
||||
Ok(session) => {
|
||||
*self.session.lock().await = Some(Session::Secure(session));
|
||||
// TODO: idle_wait does not distinguish manual interrupts
|
||||
// from Timeouts if we would know it's a Timeout we could bail
|
||||
// directly and reconnect .
|
||||
Ok(IdleResponse::Timeout) => {
|
||||
info!(context, "Idle-wait timeout or interruption");
|
||||
}
|
||||
Ok(IdleResponse::ManualInterrupt) => {
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
Err(err) => {
|
||||
// if we cannot terminate IDLE it probably
|
||||
// means that we waited long (with idle_wait)
|
||||
// but the network went away/changed
|
||||
self.trigger_reconnect();
|
||||
return Err(Error::IdleProtocolFailed(err));
|
||||
warn!(context, "Idle wait errored: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
IdleHandle::Insecure(mut handle) => {
|
||||
if let Err(err) = handle.init().await {
|
||||
// if we can't properly terminate the idle
|
||||
// protocol let's break the connection.
|
||||
let res = async_std::future::timeout(Duration::from_secs(15), handle.done())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
self.trigger_reconnect();
|
||||
Error::IdleTimeout(err)
|
||||
})?;
|
||||
|
||||
match res {
|
||||
Ok(session) => {
|
||||
*self.session.lock().await = Some(Session::Secure(session));
|
||||
}
|
||||
Err(err) => {
|
||||
// if we cannot terminate IDLE it probably
|
||||
// means that we waited long (with idle_wait)
|
||||
// but the network went away/changed
|
||||
self.trigger_reconnect();
|
||||
return Err(Error::IdleProtocolFailed(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
IdleHandle::Insecure(mut handle) => {
|
||||
if let Err(err) = handle.init().await {
|
||||
return Err(Error::IdleProtocolFailed(err));
|
||||
}
|
||||
|
||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||
*self.interrupt.lock().await = Some(interrupt);
|
||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||
*self.interrupt.lock().await = Some(interrupt);
|
||||
|
||||
if self.skip_next_idle_wait.load(Ordering::SeqCst) {
|
||||
// interrupt_idle has happened before we
|
||||
// provided self.interrupt
|
||||
self.skip_next_idle_wait.store(false, Ordering::SeqCst);
|
||||
std::mem::drop(idle_wait);
|
||||
info!(context, "Idle wait was skipped");
|
||||
} else {
|
||||
info!(context, "Idle entering wait-on-remote state");
|
||||
match idle_wait.await {
|
||||
Ok(IdleResponse::NewData(_)) => {
|
||||
info!(context, "Idle has NewData");
|
||||
}
|
||||
// TODO: idle_wait does not distinguish manual interrupts
|
||||
// from Timeouts if we would know it's a Timeout we could bail
|
||||
// directly and reconnect .
|
||||
Ok(IdleResponse::Timeout) => {
|
||||
info!(context, "Idle-wait timeout or interruption");
|
||||
}
|
||||
Ok(IdleResponse::ManualInterrupt) => {
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(context, "Idle wait errored: {:?}", err);
|
||||
}
|
||||
if self.skip_next_idle_wait.load(Ordering::SeqCst) {
|
||||
// interrupt_idle has happened before we
|
||||
// provided self.interrupt
|
||||
self.skip_next_idle_wait.store(false, Ordering::SeqCst);
|
||||
std::mem::drop(idle_wait);
|
||||
info!(context, "Idle wait was skipped");
|
||||
} else {
|
||||
info!(context, "Idle entering wait-on-remote state");
|
||||
match idle_wait.await {
|
||||
Ok(IdleResponse::NewData(_)) => {
|
||||
info!(context, "Idle has NewData");
|
||||
}
|
||||
}
|
||||
// if we can't properly terminate the idle
|
||||
// protocol let's break the connection.
|
||||
let res =
|
||||
async_std::future::timeout(Duration::from_secs(15), handle.done())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
self.trigger_reconnect();
|
||||
Error::IdleTimeout(err)
|
||||
})?;
|
||||
|
||||
match res {
|
||||
Ok(session) => {
|
||||
*self.session.lock().await = Some(Session::Insecure(session));
|
||||
// TODO: idle_wait does not distinguish manual interrupts
|
||||
// from Timeouts if we would know it's a Timeout we could bail
|
||||
// directly and reconnect .
|
||||
Ok(IdleResponse::Timeout) => {
|
||||
info!(context, "Idle-wait timeout or interruption");
|
||||
}
|
||||
Ok(IdleResponse::ManualInterrupt) => {
|
||||
info!(context, "Idle wait was interrupted");
|
||||
}
|
||||
Err(err) => {
|
||||
// if we cannot terminate IDLE it probably
|
||||
// means that we waited long (with idle_wait)
|
||||
// but the network went away/changed
|
||||
self.trigger_reconnect();
|
||||
return Err(Error::IdleProtocolFailed(err));
|
||||
warn!(context, "Idle wait errored: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
// if we can't properly terminate the idle
|
||||
// protocol let's break the connection.
|
||||
let res = async_std::future::timeout(Duration::from_secs(15), handle.done())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
self.trigger_reconnect();
|
||||
Error::IdleTimeout(err)
|
||||
})?;
|
||||
|
||||
match res {
|
||||
Ok(session) => {
|
||||
*self.session.lock().await = Some(Session::Insecure(session));
|
||||
}
|
||||
Err(err) => {
|
||||
// if we cannot terminate IDLE it probably
|
||||
// means that we waited long (with idle_wait)
|
||||
// but the network went away/changed
|
||||
self.trigger_reconnect();
|
||||
return Err(Error::IdleProtocolFailed(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn fake_idle(&self, context: &Context, watch_folder: Option<String>) {
|
||||
pub(crate) async fn fake_idle(&self, context: &Context, watch_folder: Option<String>) {
|
||||
// Idle using polling. This is also needed if we're not yet configured -
|
||||
// in this case, we're waiting for a configure job (and an interrupt).
|
||||
task::block_on(async move {
|
||||
let fake_idle_start_time = SystemTime::now();
|
||||
|
||||
info!(context, "IMAP-fake-IDLEing...");
|
||||
let fake_idle_start_time = SystemTime::now();
|
||||
|
||||
let interrupt = stop_token::StopSource::new();
|
||||
info!(context, "IMAP-fake-IDLEing...");
|
||||
|
||||
// check every minute if there are new messages
|
||||
// TODO: grow sleep durations / make them more flexible
|
||||
let interval = async_std::stream::interval(Duration::from_secs(60));
|
||||
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
|
||||
*self.interrupt.lock().await = Some(interrupt);
|
||||
if self.skip_next_idle_wait.load(Ordering::SeqCst) {
|
||||
// interrupt_idle has happened before we
|
||||
// provided self.interrupt
|
||||
self.skip_next_idle_wait.store(false, Ordering::SeqCst);
|
||||
info!(context, "fake-idle wait was skipped");
|
||||
} else {
|
||||
// loop until we are interrupted or if we fetched something
|
||||
while let Some(_) = interrupt_interval.next().await {
|
||||
// try to connect with proper login params
|
||||
// (setup_handle_if_needed might not know about them if we
|
||||
// never successfully connected)
|
||||
if let Err(err) = self.connect_configured(context) {
|
||||
warn!(context, "fake_idle: could not connect: {}", err);
|
||||
continue;
|
||||
}
|
||||
if self.config.read().await.can_idle {
|
||||
// we only fake-idled because network was gone during IDLE, probably
|
||||
break;
|
||||
}
|
||||
info!(context, "fake_idle is connected");
|
||||
// we are connected, let's see if fetching messages results
|
||||
// in anything. If so, we behave as if IDLE had data but
|
||||
// will have already fetched the messages so perform_*_fetch
|
||||
// will not find any new.
|
||||
let interrupt = stop_token::StopSource::new();
|
||||
|
||||
if let Some(ref watch_folder) = watch_folder {
|
||||
match self.fetch_new_messages(context, watch_folder).await {
|
||||
Ok(res) => {
|
||||
info!(context, "fetch_new_messages returned {:?}", res);
|
||||
if res {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(context, "could not fetch from folder: {}", err);
|
||||
self.trigger_reconnect()
|
||||
// check every minute if there are new messages
|
||||
// TODO: grow sleep durations / make them more flexible
|
||||
let interval = async_std::stream::interval(Duration::from_secs(60));
|
||||
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
|
||||
*self.interrupt.lock().await = Some(interrupt);
|
||||
if self.skip_next_idle_wait.load(Ordering::SeqCst) {
|
||||
// interrupt_idle has happened before we
|
||||
// provided self.interrupt
|
||||
self.skip_next_idle_wait.store(false, Ordering::SeqCst);
|
||||
info!(context, "fake-idle wait was skipped");
|
||||
} else {
|
||||
// loop until we are interrupted or if we fetched something
|
||||
while let Some(_) = interrupt_interval.next().await {
|
||||
// try to connect with proper login params
|
||||
// (setup_handle_if_needed might not know about them if we
|
||||
// never successfully connected)
|
||||
if let Err(err) = self.connect_configured(context).await {
|
||||
warn!(context, "fake_idle: could not connect: {}", err);
|
||||
continue;
|
||||
}
|
||||
if self.config.read().await.can_idle {
|
||||
// we only fake-idled because network was gone during IDLE, probably
|
||||
break;
|
||||
}
|
||||
info!(context, "fake_idle is connected");
|
||||
// we are connected, let's see if fetching messages results
|
||||
// in anything. If so, we behave as if IDLE had data but
|
||||
// will have already fetched the messages so perform_*_fetch
|
||||
// will not find any new.
|
||||
|
||||
if let Some(ref watch_folder) = watch_folder {
|
||||
match self.fetch_new_messages(context, watch_folder).await {
|
||||
Ok(res) => {
|
||||
info!(context, "fetch_new_messages returned {:?}", res);
|
||||
if res {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(context, "could not fetch from folder: {}", err);
|
||||
self.trigger_reconnect()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.interrupt.lock().await.take();
|
||||
}
|
||||
self.interrupt.lock().await.take();
|
||||
|
||||
info!(
|
||||
context,
|
||||
"IMAP-fake-IDLE done after {:.4}s",
|
||||
SystemTime::now()
|
||||
.duration_since(fake_idle_start_time)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as f64
|
||||
/ 1000.,
|
||||
);
|
||||
})
|
||||
info!(
|
||||
context,
|
||||
"IMAP-fake-IDLE done after {:.4}s",
|
||||
SystemTime::now()
|
||||
.duration_since(fake_idle_start_time)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as f64
|
||||
/ 1000.,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn interrupt_idle(&self, context: &Context) {
|
||||
task::block_on(async move {
|
||||
let mut interrupt: Option<stop_token::StopSource> = self.interrupt.lock().await.take();
|
||||
if interrupt.is_none() {
|
||||
// idle wait is not running, signal it needs to skip
|
||||
self.skip_next_idle_wait.store(true, Ordering::SeqCst);
|
||||
pub async fn interrupt_idle(&self, context: &Context) {
|
||||
let mut interrupt: Option<stop_token::StopSource> = self.interrupt.lock().await.take();
|
||||
if interrupt.is_none() {
|
||||
// idle wait is not running, signal it needs to skip
|
||||
self.skip_next_idle_wait.store(true, Ordering::SeqCst);
|
||||
|
||||
// meanwhile idle-wait may have produced the StopSource
|
||||
interrupt = self.interrupt.lock().await.take();
|
||||
}
|
||||
// let's manually drop the StopSource
|
||||
if interrupt.is_some() {
|
||||
// the imap thread provided us a stop token but might
|
||||
// not have entered idle_wait yet, give it some time
|
||||
// for that to happen. XXX handle this without extra wait
|
||||
// https://github.com/deltachat/deltachat-core-rust/issues/925
|
||||
std::thread::sleep(Duration::from_millis(200));
|
||||
info!(context, "low-level: dropping stop-source to interrupt idle");
|
||||
std::mem::drop(interrupt)
|
||||
}
|
||||
});
|
||||
// meanwhile idle-wait may have produced the StopSource
|
||||
interrupt = self.interrupt.lock().await.take();
|
||||
}
|
||||
// let's manually drop the StopSource
|
||||
if interrupt.is_some() {
|
||||
// the imap thread provided us a stop token but might
|
||||
// not have entered idle_wait yet, give it some time
|
||||
// for that to happen. XXX handle this without extra wait
|
||||
// https://github.com/deltachat/deltachat-core-rust/issues/925
|
||||
std::thread::sleep(Duration::from_millis(200));
|
||||
info!(context, "low-level: dropping stop-source to interrupt idle");
|
||||
std::mem::drop(interrupt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
407
src/imap/mod.rs
407
src/imap/mod.rs
@@ -22,7 +22,7 @@ use crate::dc_receive_imf::{
|
||||
};
|
||||
use crate::events::Event;
|
||||
use crate::headerdef::{HeaderDef, HeaderDefMap};
|
||||
use crate::job::{job_add, Action};
|
||||
use crate::job::{self, Action};
|
||||
use crate::login_param::{CertificateChecks, LoginParam};
|
||||
use crate::message::{self, update_server_uid};
|
||||
use crate::oauth2::dc_get_oauth2_access_token;
|
||||
@@ -363,8 +363,8 @@ impl Imap {
|
||||
}
|
||||
|
||||
/// Connects to imap account using already-configured parameters.
|
||||
pub fn connect_configured(&self, context: &Context) -> Result<()> {
|
||||
if async_std::task::block_on(self.is_connected()) && !self.should_reconnect() {
|
||||
pub async fn connect_configured(&self, context: &Context) -> Result<()> {
|
||||
if self.is_connected().await && !self.should_reconnect() {
|
||||
return Ok(());
|
||||
}
|
||||
if !context.sql.get_raw_config_bool(context, "configured") {
|
||||
@@ -374,7 +374,7 @@ impl Imap {
|
||||
let param = LoginParam::from_database(context, "configured_");
|
||||
// the trailing underscore is correct
|
||||
|
||||
if task::block_on(self.connect(context, ¶m)) {
|
||||
if self.connect(context, ¶m).await {
|
||||
self.ensure_configured_folders(context, true)
|
||||
} else {
|
||||
Err(Error::ConnectionFailed(format!("{}", param)))
|
||||
@@ -451,7 +451,7 @@ impl Imap {
|
||||
};
|
||||
|
||||
if teardown {
|
||||
self.disconnect(context);
|
||||
self.disconnect(context).await;
|
||||
|
||||
false
|
||||
} else {
|
||||
@@ -459,11 +459,9 @@ impl Imap {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disconnect(&self, context: &Context) {
|
||||
task::block_on(async move {
|
||||
self.unsetup_handle(context).await;
|
||||
self.free_connect_params().await;
|
||||
});
|
||||
pub async fn disconnect(&self, context: &Context) {
|
||||
self.unsetup_handle(context).await;
|
||||
self.free_connect_params().await;
|
||||
}
|
||||
|
||||
pub async fn fetch(&self, context: &Context, watch_folder: &str) -> Result<()> {
|
||||
@@ -502,85 +500,83 @@ impl Imap {
|
||||
}
|
||||
|
||||
/// return Result with (uid_validity, last_seen_uid) tuple.
|
||||
pub(crate) fn select_with_uidvalidity(
|
||||
pub(crate) async fn select_with_uidvalidity(
|
||||
&self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
) -> Result<(u32, u32)> {
|
||||
task::block_on(async move {
|
||||
self.select_folder(context, Some(folder)).await?;
|
||||
self.select_folder(context, Some(folder)).await?;
|
||||
|
||||
// compare last seen UIDVALIDITY against the current one
|
||||
let (uid_validity, last_seen_uid) = self.get_config_last_seen_uid(context, &folder);
|
||||
// compare last seen UIDVALIDITY against the current one
|
||||
let (uid_validity, last_seen_uid) = self.get_config_last_seen_uid(context, &folder);
|
||||
|
||||
let config = self.config.read().await;
|
||||
let mailbox = config
|
||||
.selected_mailbox
|
||||
.as_ref()
|
||||
.ok_or_else(|| Error::NoMailbox(folder.to_string()))?;
|
||||
let config = self.config.read().await;
|
||||
let mailbox = config
|
||||
.selected_mailbox
|
||||
.as_ref()
|
||||
.ok_or_else(|| Error::NoMailbox(folder.to_string()))?;
|
||||
|
||||
let new_uid_validity = match mailbox.uid_validity {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
let s = format!("No UIDVALIDITY for folder {:?}", folder);
|
||||
return Err(Error::Other(s));
|
||||
}
|
||||
};
|
||||
|
||||
if new_uid_validity == uid_validity {
|
||||
return Ok((uid_validity, last_seen_uid));
|
||||
let new_uid_validity = match mailbox.uid_validity {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
let s = format!("No UIDVALIDITY for folder {:?}", folder);
|
||||
return Err(Error::Other(s));
|
||||
}
|
||||
};
|
||||
|
||||
if mailbox.exists == 0 {
|
||||
info!(context, "Folder \"{}\" is empty.", folder);
|
||||
if new_uid_validity == uid_validity {
|
||||
return Ok((uid_validity, last_seen_uid));
|
||||
}
|
||||
|
||||
// set lastseenuid=0 for empty folders.
|
||||
// id we do not do this here, we'll miss the first message
|
||||
// as we will get in here again and fetch from lastseenuid+1 then
|
||||
if mailbox.exists == 0 {
|
||||
info!(context, "Folder \"{}\" is empty.", folder);
|
||||
|
||||
self.set_config_last_seen_uid(context, &folder, new_uid_validity, 0);
|
||||
return Ok((new_uid_validity, 0));
|
||||
// set lastseenuid=0 for empty folders.
|
||||
// id we do not do this here, we'll miss the first message
|
||||
// as we will get in here again and fetch from lastseenuid+1 then
|
||||
|
||||
self.set_config_last_seen_uid(context, &folder, new_uid_validity, 0);
|
||||
return Ok((new_uid_validity, 0));
|
||||
}
|
||||
|
||||
// uid_validity has changed or is being set the first time.
|
||||
// find the last seen uid within the new uid_validity scope.
|
||||
let new_last_seen_uid = match mailbox.uid_next {
|
||||
Some(uid_next) => {
|
||||
uid_next - 1 // XXX could uid_next be 0?
|
||||
}
|
||||
|
||||
// uid_validity has changed or is being set the first time.
|
||||
// find the last seen uid within the new uid_validity scope.
|
||||
let new_last_seen_uid = match mailbox.uid_next {
|
||||
Some(uid_next) => {
|
||||
uid_next - 1 // XXX could uid_next be 0?
|
||||
}
|
||||
None => {
|
||||
warn!(
|
||||
context,
|
||||
"IMAP folder has no uid_next, fall back to fetching"
|
||||
);
|
||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
||||
// note that we use fetch by sequence number
|
||||
// and thus we only need to get exactly the
|
||||
// last-index message.
|
||||
let set = format!("{}", mailbox.exists);
|
||||
match session.fetch(set, JUST_UID).await {
|
||||
Ok(list) => list[0].uid.unwrap_or_default(),
|
||||
Err(err) => {
|
||||
return Err(Error::FetchFailed(err));
|
||||
}
|
||||
None => {
|
||||
warn!(
|
||||
context,
|
||||
"IMAP folder has no uid_next, fall back to fetching"
|
||||
);
|
||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
||||
// note that we use fetch by sequence number
|
||||
// and thus we only need to get exactly the
|
||||
// last-index message.
|
||||
let set = format!("{}", mailbox.exists);
|
||||
match session.fetch(set, JUST_UID).await {
|
||||
Ok(list) => list[0].uid.unwrap_or_default(),
|
||||
Err(err) => {
|
||||
return Err(Error::FetchFailed(err));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::NoConnection);
|
||||
}
|
||||
} else {
|
||||
return Err(Error::NoConnection);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
self.set_config_last_seen_uid(context, &folder, new_uid_validity, new_last_seen_uid);
|
||||
info!(
|
||||
context,
|
||||
"uid/validity change: new {}/{} current {}/{}",
|
||||
new_last_seen_uid,
|
||||
new_uid_validity,
|
||||
uid_validity,
|
||||
last_seen_uid
|
||||
);
|
||||
Ok((new_uid_validity, new_last_seen_uid))
|
||||
})
|
||||
self.set_config_last_seen_uid(context, &folder, new_uid_validity, new_last_seen_uid);
|
||||
info!(
|
||||
context,
|
||||
"uid/validity change: new {}/{} current {}/{}",
|
||||
new_last_seen_uid,
|
||||
new_uid_validity,
|
||||
uid_validity,
|
||||
last_seen_uid
|
||||
);
|
||||
Ok((new_uid_validity, new_last_seen_uid))
|
||||
}
|
||||
|
||||
async fn fetch_new_messages<S: AsRef<str>>(
|
||||
@@ -591,10 +587,11 @@ impl Imap {
|
||||
let show_emails =
|
||||
ShowEmails::from_i32(context.get_config_int(Config::ShowEmails)).unwrap_or_default();
|
||||
|
||||
let (uid_validity, last_seen_uid) =
|
||||
self.select_with_uidvalidity(context, folder.as_ref())?;
|
||||
let (uid_validity, last_seen_uid) = self
|
||||
.select_with_uidvalidity(context, folder.as_ref())
|
||||
.await?;
|
||||
|
||||
let mut read_cnt = 0;
|
||||
let mut read_cnt: usize = 0;
|
||||
|
||||
let mut list = if let Some(ref mut session) = &mut *self.session.lock().await {
|
||||
// fetch messages with larger UID than the last one seen
|
||||
@@ -636,7 +633,7 @@ impl Imap {
|
||||
|
||||
let headers = get_fetch_headers(fetch)?;
|
||||
let message_id = prefetch_get_message_id(&headers).unwrap_or_default();
|
||||
if precheck_imf(context, &message_id, folder.as_ref(), cur_uid) {
|
||||
if precheck_imf(context, &message_id, folder.as_ref(), cur_uid).await {
|
||||
// we know the message-id already or don't want the message otherwise.
|
||||
info!(
|
||||
context,
|
||||
@@ -763,7 +760,7 @@ impl Imap {
|
||||
if !is_deleted && msg.body().is_some() {
|
||||
let body = msg.body().unwrap_or_default();
|
||||
if let Err(err) =
|
||||
dc_receive_imf(context, &body, folder.as_ref(), server_uid, is_seen)
|
||||
dc_receive_imf(context, &body, folder.as_ref(), server_uid, is_seen).await
|
||||
{
|
||||
warn!(
|
||||
context,
|
||||
@@ -786,11 +783,11 @@ impl Imap {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn can_move(&self) -> bool {
|
||||
task::block_on(async move { self.config.read().await.can_move })
|
||||
pub async fn can_move(&self) -> bool {
|
||||
self.config.read().await.can_move
|
||||
}
|
||||
|
||||
pub fn mv(
|
||||
pub async fn mv(
|
||||
&self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
@@ -798,96 +795,94 @@ impl Imap {
|
||||
dest_folder: &str,
|
||||
dest_uid: &mut u32,
|
||||
) -> ImapActionResult {
|
||||
task::block_on(async move {
|
||||
if folder == dest_folder {
|
||||
info!(
|
||||
context,
|
||||
"Skip moving message; message {}/{} is already in {}...",
|
||||
folder,
|
||||
uid,
|
||||
dest_folder,
|
||||
);
|
||||
return ImapActionResult::AlreadyDone;
|
||||
}
|
||||
if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, uid) {
|
||||
return imapresult;
|
||||
}
|
||||
// we are connected, and the folder is selected
|
||||
if folder == dest_folder {
|
||||
info!(
|
||||
context,
|
||||
"Skip moving message; message {}/{} is already in {}...", folder, uid, dest_folder,
|
||||
);
|
||||
return ImapActionResult::AlreadyDone;
|
||||
}
|
||||
if let Some(imapresult) = self
|
||||
.prepare_imap_operation_on_msg(context, folder, uid)
|
||||
.await
|
||||
{
|
||||
return imapresult;
|
||||
}
|
||||
// we are connected, and the folder is selected
|
||||
|
||||
// XXX Rust-Imap provides no target uid on mv, so just set it to 0
|
||||
*dest_uid = 0;
|
||||
// XXX Rust-Imap provides no target uid on mv, so just set it to 0
|
||||
*dest_uid = 0;
|
||||
|
||||
let set = format!("{}", uid);
|
||||
let display_folder_id = format!("{}/{}", folder, uid);
|
||||
|
||||
if self.can_move() {
|
||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
||||
match session.uid_mv(&set, &dest_folder).await {
|
||||
Ok(_) => {
|
||||
emit_event!(
|
||||
context,
|
||||
Event::ImapMessageMoved(format!(
|
||||
"IMAP Message {} moved to {}",
|
||||
display_folder_id, dest_folder
|
||||
))
|
||||
);
|
||||
return ImapActionResult::Success;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
context,
|
||||
"Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}",
|
||||
folder,
|
||||
uid,
|
||||
dest_folder,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
unreachable!();
|
||||
};
|
||||
} else {
|
||||
info!(
|
||||
context,
|
||||
"Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}",
|
||||
folder,
|
||||
uid,
|
||||
dest_folder
|
||||
);
|
||||
}
|
||||
let set = format!("{}", uid);
|
||||
let display_folder_id = format!("{}/{}", folder, uid);
|
||||
|
||||
if self.can_move().await {
|
||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
||||
if let Err(err) = session.uid_copy(&set, &dest_folder).await {
|
||||
warn!(context, "Could not copy message: {}", err);
|
||||
return ImapActionResult::Failed;
|
||||
match session.uid_mv(&set, &dest_folder).await {
|
||||
Ok(_) => {
|
||||
emit_event!(
|
||||
context,
|
||||
Event::ImapMessageMoved(format!(
|
||||
"IMAP Message {} moved to {}",
|
||||
display_folder_id, dest_folder
|
||||
))
|
||||
);
|
||||
return ImapActionResult::Success;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
context,
|
||||
"Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}",
|
||||
folder,
|
||||
uid,
|
||||
dest_folder,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
unreachable!();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
info!(
|
||||
context,
|
||||
"Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}",
|
||||
folder,
|
||||
uid,
|
||||
dest_folder
|
||||
);
|
||||
}
|
||||
|
||||
if !self.add_flag_finalized(context, uid, "\\Deleted").await {
|
||||
warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid);
|
||||
emit_event!(
|
||||
context,
|
||||
Event::ImapMessageMoved(format!(
|
||||
"IMAP Message {} copied to {} (delete FAILED)",
|
||||
display_folder_id, dest_folder
|
||||
))
|
||||
);
|
||||
ImapActionResult::Failed
|
||||
} else {
|
||||
self.config.write().await.selected_folder_needs_expunge = true;
|
||||
emit_event!(
|
||||
context,
|
||||
Event::ImapMessageMoved(format!(
|
||||
"IMAP Message {} copied to {} (delete successfull)",
|
||||
display_folder_id, dest_folder
|
||||
))
|
||||
);
|
||||
ImapActionResult::Success
|
||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
||||
if let Err(err) = session.uid_copy(&set, &dest_folder).await {
|
||||
warn!(context, "Could not copy message: {}", err);
|
||||
return ImapActionResult::Failed;
|
||||
}
|
||||
})
|
||||
} else {
|
||||
unreachable!();
|
||||
}
|
||||
|
||||
if !self.add_flag_finalized(context, uid, "\\Deleted").await {
|
||||
warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid);
|
||||
emit_event!(
|
||||
context,
|
||||
Event::ImapMessageMoved(format!(
|
||||
"IMAP Message {} copied to {} (delete FAILED)",
|
||||
display_folder_id, dest_folder
|
||||
))
|
||||
);
|
||||
ImapActionResult::Failed
|
||||
} else {
|
||||
self.config.write().await.selected_folder_needs_expunge = true;
|
||||
emit_event!(
|
||||
context,
|
||||
Event::ImapMessageMoved(format!(
|
||||
"IMAP Message {} copied to {} (delete successfull)",
|
||||
display_folder_id, dest_folder
|
||||
))
|
||||
);
|
||||
ImapActionResult::Success
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_flag_finalized(&self, context: &Context, server_uid: u32, flag: &str) -> bool {
|
||||
@@ -929,51 +924,52 @@ impl Imap {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prepare_imap_operation_on_msg(
|
||||
pub async fn prepare_imap_operation_on_msg(
|
||||
&self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
uid: u32,
|
||||
) -> Option<ImapActionResult> {
|
||||
task::block_on(async move {
|
||||
if uid == 0 {
|
||||
return Some(ImapActionResult::Failed);
|
||||
if uid == 0 {
|
||||
return Some(ImapActionResult::Failed);
|
||||
}
|
||||
if !self.is_connected().await {
|
||||
// currently jobs are only performed on the INBOX thread
|
||||
// TODO: make INBOX/SENT/MVBOX perform the jobs on their
|
||||
// respective folders to avoid select_folder network traffic
|
||||
// and the involved error states
|
||||
if let Err(err) = self.connect_configured(context).await {
|
||||
warn!(context, "prepare_imap_op failed: {}", err);
|
||||
return Some(ImapActionResult::RetryLater);
|
||||
}
|
||||
if !self.is_connected().await {
|
||||
// currently jobs are only performed on the INBOX thread
|
||||
// TODO: make INBOX/SENT/MVBOX perform the jobs on their
|
||||
// respective folders to avoid select_folder network traffic
|
||||
// and the involved error states
|
||||
if let Err(err) = self.connect_configured(context) {
|
||||
warn!(context, "prepare_imap_op failed: {}", err);
|
||||
return Some(ImapActionResult::RetryLater);
|
||||
}
|
||||
}
|
||||
match self.select_folder(context, Some(&folder)).await {
|
||||
Ok(()) => None,
|
||||
Err(select_folder::Error::ConnectionLost) => {
|
||||
warn!(context, "Lost imap connection");
|
||||
Some(ImapActionResult::RetryLater)
|
||||
}
|
||||
match self.select_folder(context, Some(&folder)).await {
|
||||
Ok(()) => None,
|
||||
Err(select_folder::Error::ConnectionLost) => {
|
||||
warn!(context, "Lost imap connection");
|
||||
Some(ImapActionResult::RetryLater)
|
||||
}
|
||||
Err(select_folder::Error::NoSession) => {
|
||||
warn!(context, "no imap session");
|
||||
Some(ImapActionResult::Failed)
|
||||
}
|
||||
Err(select_folder::Error::BadFolderName(folder_name)) => {
|
||||
warn!(context, "invalid folder name: {:?}", folder_name);
|
||||
Some(ImapActionResult::Failed)
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(context, "failed to select folder: {:?}: {:?}", folder, err);
|
||||
Some(ImapActionResult::RetryLater)
|
||||
}
|
||||
Err(select_folder::Error::NoSession) => {
|
||||
warn!(context, "no imap session");
|
||||
Some(ImapActionResult::Failed)
|
||||
}
|
||||
})
|
||||
Err(select_folder::Error::BadFolderName(folder_name)) => {
|
||||
warn!(context, "invalid folder name: {:?}", folder_name);
|
||||
Some(ImapActionResult::Failed)
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(context, "failed to select folder: {:?}: {:?}", folder, err);
|
||||
Some(ImapActionResult::RetryLater)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult {
|
||||
task::block_on(async move {
|
||||
if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, uid) {
|
||||
if let Some(imapresult) = self
|
||||
.prepare_imap_operation_on_msg(context, folder, uid)
|
||||
.await
|
||||
{
|
||||
return imapresult;
|
||||
}
|
||||
// we are connected, and the folder is selected
|
||||
@@ -999,7 +995,10 @@ impl Imap {
|
||||
uid: &mut u32,
|
||||
) -> ImapActionResult {
|
||||
task::block_on(async move {
|
||||
if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, *uid) {
|
||||
if let Some(imapresult) = self
|
||||
.prepare_imap_operation_on_msg(context, folder, *uid)
|
||||
.await
|
||||
{
|
||||
return imapresult;
|
||||
}
|
||||
// we are connected, and the folder is selected
|
||||
@@ -1291,20 +1290,28 @@ fn get_folder_meaning(folder_name: &Name) -> FolderMeaning {
|
||||
}
|
||||
}
|
||||
|
||||
fn precheck_imf(context: &Context, rfc724_mid: &str, server_folder: &str, server_uid: u32) -> bool {
|
||||
async fn precheck_imf(
|
||||
context: &Context,
|
||||
rfc724_mid: &str,
|
||||
server_folder: &str,
|
||||
server_uid: u32,
|
||||
) -> bool {
|
||||
if let Ok((old_server_folder, old_server_uid, msg_id)) =
|
||||
message::rfc724_mid_exists(context, &rfc724_mid)
|
||||
{
|
||||
if old_server_folder.is_empty() && old_server_uid == 0 {
|
||||
info!(context, "[move] detected bcc-self {}", rfc724_mid,);
|
||||
context.do_heuristics_moves(server_folder.as_ref(), msg_id);
|
||||
job_add(
|
||||
context
|
||||
.do_heuristics_moves(server_folder.as_ref(), msg_id)
|
||||
.await;
|
||||
job::add(
|
||||
context,
|
||||
Action::MarkseenMsgOnImap,
|
||||
msg_id.to_u32() as i32,
|
||||
Params::new(),
|
||||
0,
|
||||
);
|
||||
)
|
||||
.await;
|
||||
} else if old_server_folder != server_folder {
|
||||
info!(context, "[move] detected moved message {}", rfc724_mid,);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user