mirror of
https://github.com/chatmail/core.git
synced 2026-05-22 16:26:31 +03:00
integrate imex and imap
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
extern crate deltachat;
|
extern crate deltachat;
|
||||||
|
|
||||||
use async_std::sync::{Arc, RwLock};
|
use async_std::task;
|
||||||
|
|
||||||
use std::time;
|
use std::time;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
@@ -11,7 +12,7 @@ use deltachat::contact::*;
|
|||||||
use deltachat::context::*;
|
use deltachat::context::*;
|
||||||
use deltachat::Event;
|
use deltachat::Event;
|
||||||
|
|
||||||
fn cb(_ctx: &Context, event: Event) {
|
fn cb(event: Event) {
|
||||||
print!("[{:?}]", event);
|
print!("[{:?}]", event);
|
||||||
|
|
||||||
match event {
|
match event {
|
||||||
@@ -39,7 +40,16 @@ async fn main() {
|
|||||||
let duration = time::Duration::from_millis(4000);
|
let duration = time::Duration::from_millis(4000);
|
||||||
println!("info: {:#?}", info);
|
println!("info: {:#?}", info);
|
||||||
|
|
||||||
ctx.run().await;
|
let ctx1 = ctx.clone();
|
||||||
|
task::spawn(async move {
|
||||||
|
loop {
|
||||||
|
if let Ok(event) = ctx1.get_next_event() {
|
||||||
|
cb(event);
|
||||||
|
} else {
|
||||||
|
task::sleep(time::Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
println!("configuring");
|
println!("configuring");
|
||||||
let args = std::env::args().collect::<Vec<String>>();
|
let args = std::env::args().collect::<Vec<String>>();
|
||||||
@@ -51,9 +61,11 @@ async fn main() {
|
|||||||
ctx.set_config(config::Config::MailPw, Some(&pw))
|
ctx.set_config(config::Config::MailPw, Some(&pw))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
ctx.configure().await;
|
ctx.configure().await;
|
||||||
|
|
||||||
async_std::task::sleep(duration).await;
|
println!("------ RUN ------");
|
||||||
|
ctx.run().await;
|
||||||
|
|
||||||
println!("sending a message");
|
println!("sending a message");
|
||||||
let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com")
|
let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com")
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ impl Context {
|
|||||||
self.sql.get_raw_config_bool(self, "configured").await
|
self.sql.get_raw_config_bool(self, "configured").await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Starts a configuration job.
|
/// Configures this account with the currently set parameters.
|
||||||
pub async fn configure(&self) {
|
pub async fn configure(&self) {
|
||||||
if self.has_ongoing().await {
|
if self.has_ongoing().await {
|
||||||
warn!(self, "There is already another ongoing process running.",);
|
warn!(self, "There is already another ongoing process running.",);
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ pub fn get_info() -> HashMap<&'static str, String> {
|
|||||||
impl Context {
|
impl Context {
|
||||||
/// Creates new context.
|
/// Creates new context.
|
||||||
pub async fn new(os_name: String, dbfile: PathBuf) -> Result<Context> {
|
pub async fn new(os_name: String, dbfile: PathBuf) -> Result<Context> {
|
||||||
pretty_env_logger::try_init_timed().ok();
|
// pretty_env_logger::try_init_timed().ok();
|
||||||
|
|
||||||
let mut blob_fname = OsString::new();
|
let mut blob_fname = OsString::new();
|
||||||
blob_fname.push(dbfile.file_name().unwrap_or_default());
|
blob_fname.push(dbfile.file_name().unwrap_or_default());
|
||||||
|
|||||||
140
src/imap/idle.rs
140
src/imap/idle.rs
@@ -64,6 +64,8 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<()> {
|
pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<()> {
|
||||||
|
use futures::future::FutureExt;
|
||||||
|
|
||||||
if !self.can_idle() {
|
if !self.can_idle() {
|
||||||
return Err(Error::IdleAbilityMissing);
|
return Err(Error::IdleAbilityMissing);
|
||||||
}
|
}
|
||||||
@@ -76,6 +78,7 @@ impl Imap {
|
|||||||
|
|
||||||
let session = self.session.take();
|
let session = self.session.take();
|
||||||
let timeout = Duration::from_secs(23 * 60);
|
let timeout = Duration::from_secs(23 * 60);
|
||||||
|
|
||||||
if let Some(session) = session {
|
if let Some(session) = session {
|
||||||
match session.idle() {
|
match session.idle() {
|
||||||
// BEWARE: If you change the Secure branch you
|
// BEWARE: If you change the Secure branch you
|
||||||
@@ -86,17 +89,24 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||||
self.interrupt = Some(interrupt);
|
|
||||||
|
|
||||||
if self.skip_next_idle_wait {
|
if self.skip_next_idle_wait {
|
||||||
// interrupt_idle has happened before we
|
// interrupt_idle has happened before we
|
||||||
// provided self.interrupt
|
// provided self.interrupt
|
||||||
self.skip_next_idle_wait = false;
|
self.skip_next_idle_wait = false;
|
||||||
std::mem::drop(idle_wait);
|
drop(idle_wait);
|
||||||
|
drop(interrupt);
|
||||||
|
|
||||||
info!(context, "Idle wait was skipped");
|
info!(context, "Idle wait was skipped");
|
||||||
} else {
|
} else {
|
||||||
info!(context, "Idle entering wait-on-remote state");
|
info!(context, "Idle entering wait-on-remote state");
|
||||||
match idle_wait.await {
|
let fut = idle_wait.race(
|
||||||
|
self.idle_interrupt
|
||||||
|
.recv()
|
||||||
|
.map(|_| Ok(IdleResponse::ManualInterrupt)),
|
||||||
|
);
|
||||||
|
|
||||||
|
match fut.await {
|
||||||
Ok(IdleResponse::NewData(_)) => {
|
Ok(IdleResponse::NewData(_)) => {
|
||||||
info!(context, "Idle has NewData");
|
info!(context, "Idle has NewData");
|
||||||
}
|
}
|
||||||
@@ -114,9 +124,12 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we can't properly terminate the idle
|
// if we can't properly terminate the idle
|
||||||
// protocol let's break the connection.
|
// protocol let's break the connection.
|
||||||
let res = async_std::future::timeout(Duration::from_secs(15), handle.done())
|
let res = handle
|
||||||
|
.done()
|
||||||
|
.timeout(Duration::from_secs(15))
|
||||||
.await
|
.await
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
self.trigger_reconnect();
|
self.trigger_reconnect();
|
||||||
@@ -142,17 +155,23 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||||
self.interrupt = Some(interrupt);
|
|
||||||
|
|
||||||
if self.skip_next_idle_wait {
|
if self.skip_next_idle_wait {
|
||||||
// interrupt_idle has happened before we
|
// interrupt_idle has happened before we
|
||||||
// provided self.interrupt
|
// provided self.interrupt
|
||||||
self.skip_next_idle_wait = false;
|
self.skip_next_idle_wait = false;
|
||||||
std::mem::drop(idle_wait);
|
drop(idle_wait);
|
||||||
|
drop(interrupt);
|
||||||
info!(context, "Idle wait was skipped");
|
info!(context, "Idle wait was skipped");
|
||||||
} else {
|
} else {
|
||||||
info!(context, "Idle entering wait-on-remote state");
|
info!(context, "Idle entering wait-on-remote state");
|
||||||
match idle_wait.await {
|
let fut = idle_wait.race(
|
||||||
|
self.idle_interrupt
|
||||||
|
.recv()
|
||||||
|
.map(|_| Ok(IdleResponse::ManualInterrupt)),
|
||||||
|
);
|
||||||
|
|
||||||
|
match fut.await {
|
||||||
Ok(IdleResponse::NewData(_)) => {
|
Ok(IdleResponse::NewData(_)) => {
|
||||||
info!(context, "Idle has NewData");
|
info!(context, "Idle has NewData");
|
||||||
}
|
}
|
||||||
@@ -172,7 +191,9 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
// if we can't properly terminate the idle
|
// if we can't properly terminate the idle
|
||||||
// protocol let's break the connection.
|
// protocol let's break the connection.
|
||||||
let res = async_std::future::timeout(Duration::from_secs(15), handle.done())
|
let res = handle
|
||||||
|
.done()
|
||||||
|
.timeout(Duration::from_secs(15))
|
||||||
.await
|
.await
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
self.trigger_reconnect();
|
self.trigger_reconnect();
|
||||||
@@ -203,58 +224,66 @@ impl Imap {
|
|||||||
// in this case, we're waiting for a configure job (and an interrupt).
|
// in this case, we're waiting for a configure job (and an interrupt).
|
||||||
|
|
||||||
let fake_idle_start_time = SystemTime::now();
|
let fake_idle_start_time = SystemTime::now();
|
||||||
|
|
||||||
info!(context, "IMAP-fake-IDLEing...");
|
info!(context, "IMAP-fake-IDLEing...");
|
||||||
|
|
||||||
let interrupt = stop_token::StopSource::new();
|
|
||||||
|
|
||||||
// check every minute if there are new messages
|
|
||||||
// TODO: grow sleep durations / make them more flexible
|
|
||||||
let interval = async_std::stream::interval(Duration::from_secs(60));
|
|
||||||
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
|
|
||||||
self.interrupt = Some(interrupt);
|
|
||||||
if self.skip_next_idle_wait {
|
if self.skip_next_idle_wait {
|
||||||
// interrupt_idle has happened before we
|
// interrupt_idle has happened before we
|
||||||
// provided self.interrupt
|
// provided self.interrupt
|
||||||
self.skip_next_idle_wait = false;
|
self.skip_next_idle_wait = false;
|
||||||
info!(context, "fake-idle wait was skipped");
|
info!(context, "fake-idle wait was skipped");
|
||||||
} else {
|
} else {
|
||||||
// loop until we are interrupted or if we fetched something
|
// check every minute if there are new messages
|
||||||
while let Some(_) = interrupt_interval.next().await {
|
// TODO: grow sleep durations / make them more flexible
|
||||||
// try to connect with proper login params
|
let mut interval = async_std::stream::interval(Duration::from_secs(60));
|
||||||
// (setup_handle_if_needed might not know about them if we
|
|
||||||
// never successfully connected)
|
|
||||||
if let Err(err) = self.connect_configured(context).await {
|
|
||||||
warn!(context, "fake_idle: could not connect: {}", err);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if self.config.can_idle {
|
|
||||||
// we only fake-idled because network was gone during IDLE, probably
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
info!(context, "fake_idle is connected");
|
|
||||||
// we are connected, let's see if fetching messages results
|
|
||||||
// in anything. If so, we behave as if IDLE had data but
|
|
||||||
// will have already fetched the messages so perform_*_fetch
|
|
||||||
// will not find any new.
|
|
||||||
|
|
||||||
if let Some(ref watch_folder) = watch_folder {
|
// loop until we are interrupted or if we fetched something
|
||||||
match self.fetch_new_messages(context, watch_folder).await {
|
loop {
|
||||||
Ok(res) => {
|
use futures::future::FutureExt;
|
||||||
info!(context, "fetch_new_messages returned {:?}", res);
|
match interval
|
||||||
if res {
|
.next()
|
||||||
break;
|
.race(self.idle_interrupt.recv().map(|_| None))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Some(_) => {
|
||||||
|
// try to connect with proper login params
|
||||||
|
// (setup_handle_if_needed might not know about them if we
|
||||||
|
// never successfully connected)
|
||||||
|
if let Err(err) = self.connect_configured(context).await {
|
||||||
|
warn!(context, "fake_idle: could not connect: {}", err);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if self.config.can_idle {
|
||||||
|
// we only fake-idled because network was gone during IDLE, probably
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
info!(context, "fake_idle is connected");
|
||||||
|
// we are connected, let's see if fetching messages results
|
||||||
|
// in anything. If so, we behave as if IDLE had data but
|
||||||
|
// will have already fetched the messages so perform_*_fetch
|
||||||
|
// will not find any new.
|
||||||
|
|
||||||
|
if let Some(ref watch_folder) = watch_folder {
|
||||||
|
match self.fetch_new_messages(context, watch_folder).await {
|
||||||
|
Ok(res) => {
|
||||||
|
info!(context, "fetch_new_messages returned {:?}", res);
|
||||||
|
if res {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!(context, "could not fetch from folder: {}", err);
|
||||||
|
self.trigger_reconnect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
}
|
||||||
error!(context, "could not fetch from folder: {}", err);
|
None => {
|
||||||
self.trigger_reconnect()
|
// Interrupt
|
||||||
}
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.interrupt.take();
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
context,
|
context,
|
||||||
@@ -266,25 +295,4 @@ impl Imap {
|
|||||||
/ 1000.,
|
/ 1000.,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn interrupt_idle(&mut self, context: &Context) {
|
|
||||||
let mut interrupt: Option<stop_token::StopSource> = self.interrupt.take();
|
|
||||||
if interrupt.is_none() {
|
|
||||||
// idle wait is not running, signal it needs to skip
|
|
||||||
self.skip_next_idle_wait = false;
|
|
||||||
|
|
||||||
// meanwhile idle-wait may have produced the StopSource
|
|
||||||
interrupt = self.interrupt.take();
|
|
||||||
}
|
|
||||||
// let's manually drop the StopSource
|
|
||||||
if interrupt.is_some() {
|
|
||||||
// the imap thread provided us a stop token but might
|
|
||||||
// not have entered idle_wait yet, give it some time
|
|
||||||
// for that to happen. XXX handle this without extra wait
|
|
||||||
// https://github.com/deltachat/deltachat-core-rust/issues/925
|
|
||||||
async_std::task::sleep(Duration::from_millis(200)).await;
|
|
||||||
info!(context, "low-level: dropping stop-source to interrupt idle");
|
|
||||||
std::mem::drop(interrupt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -391,8 +391,7 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// tries connecting to imap account using the specific login
|
/// Tries connecting to imap account using the specific login parameters.
|
||||||
/// parameters
|
|
||||||
pub async fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool {
|
pub async fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool {
|
||||||
if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() {
|
if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() {
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
46
src/imex.rs
46
src/imex.rs
@@ -4,7 +4,6 @@ use std::cmp::{max, min};
|
|||||||
|
|
||||||
use async_std::path::{Path, PathBuf};
|
use async_std::path::{Path, PathBuf};
|
||||||
use async_std::prelude::*;
|
use async_std::prelude::*;
|
||||||
use num_traits::FromPrimitive;
|
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
|
|
||||||
use crate::blob::BlobObject;
|
use crate::blob::BlobObject;
|
||||||
@@ -17,7 +16,6 @@ use crate::dc_tools::*;
|
|||||||
use crate::e2ee;
|
use crate::e2ee;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::events::Event;
|
use crate::events::Event;
|
||||||
use crate::job::Job;
|
|
||||||
use crate::key::{self, Key};
|
use crate::key::{self, Key};
|
||||||
use crate::message::{Message, MsgId};
|
use crate::message::{Message, MsgId};
|
||||||
use crate::mimeparser::SystemMessage;
|
use crate::mimeparser::SystemMessage;
|
||||||
@@ -70,15 +68,14 @@ pub enum ImexMode {
|
|||||||
///
|
///
|
||||||
/// Only one import-/export-progress can run at the same time.
|
/// Only one import-/export-progress can run at the same time.
|
||||||
/// To cancel an import-/export-progress, use dc_stop_ongoing_process().
|
/// To cancel an import-/export-progress, use dc_stop_ongoing_process().
|
||||||
pub async fn imex(context: &Context, what: ImexMode, param1: Option<impl AsRef<Path>>) {
|
pub async fn imex(
|
||||||
let mut param = Params::new();
|
context: &Context,
|
||||||
param.set_int(Param::Cmd, what as i32);
|
what: ImexMode,
|
||||||
if let Some(param1) = param1 {
|
param1: Option<impl AsRef<Path>>,
|
||||||
param.set(Param::Arg, param1.as_ref().to_string_lossy());
|
) -> Result<()> {
|
||||||
}
|
job_imex_imap(context, what, param1).await?;
|
||||||
|
|
||||||
// job::kill_action(context, Action::ImexImap).await;
|
Ok(())
|
||||||
// job::add(context, Action::ImexImap, 0, param, 0).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the filename of the backup found (otherwise an error)
|
/// Returns the filename of the backup found (otherwise an error)
|
||||||
@@ -369,34 +366,35 @@ pub fn normalize_setup_code(s: &str) -> String {
|
|||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn job_imex_imap(context: &Context, job: &Job) -> Result<()> {
|
pub async fn job_imex_imap(
|
||||||
|
context: &Context,
|
||||||
|
what: ImexMode,
|
||||||
|
param: Option<impl AsRef<Path>>,
|
||||||
|
) -> Result<()> {
|
||||||
ensure!(context.alloc_ongoing().await, "could not allocate ongoing");
|
ensure!(context.alloc_ongoing().await, "could not allocate ongoing");
|
||||||
let what: Option<ImexMode> = job.param.get_int(Param::Cmd).and_then(ImexMode::from_i32);
|
ensure!(!param.is_some(), "No Import/export dir/file given.");
|
||||||
let param = job.param.get(Param::Arg).unwrap_or_default();
|
|
||||||
|
|
||||||
ensure!(!param.is_empty(), "No Import/export dir/file given.");
|
|
||||||
info!(context, "Import/export process started.");
|
info!(context, "Import/export process started.");
|
||||||
context.call_cb(Event::ImexProgress(10));
|
context.call_cb(Event::ImexProgress(10));
|
||||||
|
|
||||||
ensure!(context.sql.is_open().await, "Database not opened.");
|
ensure!(context.sql.is_open().await, "Database not opened.");
|
||||||
if what == Some(ImexMode::ExportBackup) || what == Some(ImexMode::ExportSelfKeys) {
|
|
||||||
|
let path = param.unwrap();
|
||||||
|
if what == ImexMode::ExportBackup || what == ImexMode::ExportSelfKeys {
|
||||||
// before we export anything, make sure the private key exists
|
// before we export anything, make sure the private key exists
|
||||||
if e2ee::ensure_secret_key_exists(context).await.is_err() {
|
if e2ee::ensure_secret_key_exists(context).await.is_err() {
|
||||||
context.free_ongoing().await;
|
context.free_ongoing().await;
|
||||||
bail!("Cannot create private key or private key not available.");
|
bail!("Cannot create private key or private key not available.");
|
||||||
} else {
|
} else {
|
||||||
dc_create_folder(context, ¶m).await?;
|
dc_create_folder(context, &path).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let path = Path::new(param);
|
|
||||||
let success = match what {
|
let success = match what {
|
||||||
Some(ImexMode::ExportSelfKeys) => export_self_keys(context, path).await,
|
ImexMode::ExportSelfKeys => export_self_keys(context, path).await,
|
||||||
Some(ImexMode::ImportSelfKeys) => import_self_keys(context, path).await,
|
ImexMode::ImportSelfKeys => import_self_keys(context, path).await,
|
||||||
Some(ImexMode::ExportBackup) => export_backup(context, path).await,
|
ImexMode::ExportBackup => export_backup(context, path).await,
|
||||||
Some(ImexMode::ImportBackup) => import_backup(context, path).await,
|
ImexMode::ImportBackup => import_backup(context, path).await,
|
||||||
None => {
|
|
||||||
bail!("unknown IMEX type");
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
context.free_ongoing().await;
|
context.free_ongoing().await;
|
||||||
match success {
|
match success {
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use async_std::task;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
|
use crate::error::Error;
|
||||||
use crate::imap::Imap;
|
use crate::imap::Imap;
|
||||||
use crate::job::{self, Thread};
|
use crate::job::{self, Thread};
|
||||||
use crate::smtp::Smtp;
|
use crate::smtp::Smtp;
|
||||||
@@ -64,6 +65,7 @@ impl Scheduler {
|
|||||||
|
|
||||||
let ctx1 = ctx.clone();
|
let ctx1 = ctx.clone();
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
|
info!(ctx1, "starting inbox loop");
|
||||||
let ImapConnectionHandlers {
|
let ImapConnectionHandlers {
|
||||||
mut connection,
|
mut connection,
|
||||||
stop_receiver,
|
stop_receiver,
|
||||||
@@ -71,6 +73,8 @@ impl Scheduler {
|
|||||||
} = inbox_handlers;
|
} = inbox_handlers;
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
|
connection.connect_configured(&ctx1).await.unwrap();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// TODO: correct value
|
// TODO: correct value
|
||||||
let probe_network = false;
|
let probe_network = false;
|
||||||
@@ -87,11 +91,32 @@ impl Scheduler {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
|
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
|
||||||
|
let watch_folder =
|
||||||
|
get_watch_folder(&ctx1, "configured_inbox_folder")
|
||||||
|
.await
|
||||||
|
.ok_or_else(|| {
|
||||||
|
Error::WatchFolderNotFound("not-set".to_string())
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// fetch
|
// fetch
|
||||||
connection.fetch(&ctx1, "TODO").await;
|
connection.fetch(&ctx1, &watch_folder).await.unwrap_or_else(
|
||||||
|
|err| {
|
||||||
|
error!(ctx1, "{}", err);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
// idle
|
// idle
|
||||||
connection.idle(&ctx1, Some("TODO".into())).await;
|
if connection.can_idle() {
|
||||||
|
connection
|
||||||
|
.idle(&ctx1, Some(watch_folder))
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
error!(ctx1, "{}", err);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
connection.fake_idle(&ctx1, Some(watch_folder)).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -107,6 +132,7 @@ impl Scheduler {
|
|||||||
|
|
||||||
let ctx1 = ctx.clone();
|
let ctx1 = ctx.clone();
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
|
info!(ctx1, "starting smtp loop");
|
||||||
let SmtpConnectionHandlers {
|
let SmtpConnectionHandlers {
|
||||||
mut connection,
|
mut connection,
|
||||||
stop_receiver,
|
stop_receiver,
|
||||||
@@ -145,6 +171,8 @@ impl Scheduler {
|
|||||||
fut.race(stop_receiver.recv()).await;
|
fut.race(stop_receiver.recv()).await;
|
||||||
shutdown_sender.send(()).await;
|
shutdown_sender.send(()).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
info!(ctx, "scheduler is running");
|
||||||
}
|
}
|
||||||
Scheduler::Running { .. } => {
|
Scheduler::Running { .. } => {
|
||||||
// TODO: return an error
|
// TODO: return an error
|
||||||
@@ -153,38 +181,31 @@ impl Scheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inbox(&self) -> Option<&ImapConnectionState> {
|
|
||||||
match self {
|
|
||||||
Scheduler::Running { ref inbox, .. } => Some(inbox),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn interrupt_inbox(&self) {
|
async fn interrupt_inbox(&self) {
|
||||||
match self {
|
match self {
|
||||||
Scheduler::Running { ref inbox, .. } => inbox.interrupt().await,
|
Scheduler::Running { ref inbox, .. } => inbox.interrupt().await,
|
||||||
_ => panic!("interrupt_imap must be called in running mode"),
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt_mvbox(&self) {
|
async fn interrupt_mvbox(&self) {
|
||||||
match self {
|
match self {
|
||||||
Scheduler::Running { ref mvbox, .. } => mvbox.interrupt().await,
|
Scheduler::Running { ref mvbox, .. } => mvbox.interrupt().await,
|
||||||
_ => panic!("interrupt_mvbox must be called in running mode"),
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt_sentbox(&self) {
|
async fn interrupt_sentbox(&self) {
|
||||||
match self {
|
match self {
|
||||||
Scheduler::Running { ref sentbox, .. } => sentbox.interrupt().await,
|
Scheduler::Running { ref sentbox, .. } => sentbox.interrupt().await,
|
||||||
_ => panic!("interrupt_sentbox must be called in running mode"),
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interrupt_smtp(&self) {
|
async fn interrupt_smtp(&self) {
|
||||||
match self {
|
match self {
|
||||||
Scheduler::Running { ref smtp, .. } => smtp.interrupt().await,
|
Scheduler::Running { ref smtp, .. } => smtp.interrupt().await,
|
||||||
_ => panic!("interrupt_smtp must be called in running mode"),
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -338,3 +359,21 @@ struct ImapConnectionHandlers {
|
|||||||
stop_receiver: Receiver<()>,
|
stop_receiver: Receiver<()>,
|
||||||
shutdown_sender: Sender<()>,
|
shutdown_sender: Sender<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_watch_folder(context: &Context, config_name: impl AsRef<str>) -> Option<String> {
|
||||||
|
match context
|
||||||
|
.sql
|
||||||
|
.get_raw_config(context, config_name.as_ref())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Some(name) => Some(name),
|
||||||
|
None => {
|
||||||
|
if config_name.as_ref() == "configured_inbox_folder" {
|
||||||
|
// initialized with old version, so has not set configured_inbox_folder
|
||||||
|
Some("INBOX".to_string())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user