mirror of
https://github.com/chatmail/core.git
synced 2026-05-16 21:36:30 +03:00
combine session and interrupt lock
This commit is contained in:
60
src/imap.rs
60
src/imap.rs
@@ -42,9 +42,8 @@ const SELECT_ALL: &str = "1:*";
|
|||||||
pub struct Imap {
|
pub struct Imap {
|
||||||
config: Arc<RwLock<ImapConfig>>,
|
config: Arc<RwLock<ImapConfig>>,
|
||||||
|
|
||||||
session: Arc<Mutex<Option<Session>>>,
|
session: Arc<Mutex<(Option<Session>, Option<stop_token::StopSource>)>>,
|
||||||
connected: Arc<Mutex<bool>>,
|
connected: Arc<Mutex<bool>>,
|
||||||
interrupt: Arc<Mutex<Option<stop_token::StopSource>>>,
|
|
||||||
|
|
||||||
should_reconnect: AtomicBool,
|
should_reconnect: AtomicBool,
|
||||||
}
|
}
|
||||||
@@ -115,9 +114,8 @@ impl Default for ImapConfig {
|
|||||||
impl Imap {
|
impl Imap {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Imap {
|
Imap {
|
||||||
session: Arc::new(Mutex::new(None)),
|
session: Arc::new(Mutex::new((None, None))),
|
||||||
config: Arc::new(RwLock::new(ImapConfig::default())),
|
config: Arc::new(RwLock::new(ImapConfig::default())),
|
||||||
interrupt: Arc::new(Mutex::new(None)),
|
|
||||||
connected: Arc::new(Mutex::new(false)),
|
connected: Arc::new(Mutex::new(false)),
|
||||||
should_reconnect: AtomicBool::new(false),
|
should_reconnect: AtomicBool::new(false),
|
||||||
}
|
}
|
||||||
@@ -226,7 +224,7 @@ impl Imap {
|
|||||||
|
|
||||||
match login_res {
|
match login_res {
|
||||||
Ok(session) => {
|
Ok(session) => {
|
||||||
*self.session.lock().await = Some(session);
|
self.session.lock().await.0 = Some(session);
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
Err((err, _)) => {
|
Err((err, _)) => {
|
||||||
@@ -251,7 +249,10 @@ impl Imap {
|
|||||||
context,
|
context,
|
||||||
"IMAP unsetup_handle step 2 (acquiring session.lock)"
|
"IMAP unsetup_handle step 2 (acquiring session.lock)"
|
||||||
);
|
);
|
||||||
if let Some(mut session) = self.session.lock().await.take() {
|
let mut lock = self.session.lock().await;
|
||||||
|
drop(lock.1.take());
|
||||||
|
|
||||||
|
if let Some(mut session) = lock.0.take() {
|
||||||
if let Err(err) = session.close().await {
|
if let Err(err) = session.close().await {
|
||||||
warn!(context, "failed to close connection: {:?}", err);
|
warn!(context, "failed to close connection: {:?}", err);
|
||||||
}
|
}
|
||||||
@@ -311,7 +312,7 @@ impl Imap {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
let (teardown, can_idle, has_xlist) = match &mut *self.session.lock().await {
|
let (teardown, can_idle, has_xlist) = match &mut self.session.lock().await.0 {
|
||||||
Some(ref mut session) => match session.capabilities().await {
|
Some(ref mut session) => match session.capabilities().await {
|
||||||
Ok(caps) => {
|
Ok(caps) => {
|
||||||
if !context.sql.is_open() {
|
if !context.sql.is_open() {
|
||||||
@@ -397,7 +398,7 @@ impl Imap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn select_folder<S: AsRef<str>>(&self, context: &Context, folder: Option<S>) -> usize {
|
async fn select_folder<S: AsRef<str>>(&self, context: &Context, folder: Option<S>) -> usize {
|
||||||
if self.session.lock().await.is_none() {
|
if self.session.lock().await.0.is_none() {
|
||||||
let mut cfg = self.config.write().await;
|
let mut cfg = self.config.write().await;
|
||||||
cfg.selected_folder = None;
|
cfg.selected_folder = None;
|
||||||
cfg.selected_folder_needs_expunge = false;
|
cfg.selected_folder_needs_expunge = false;
|
||||||
@@ -422,7 +423,7 @@ impl Imap {
|
|||||||
|
|
||||||
// A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see
|
// A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see
|
||||||
// https://tools.ietf.org/html/rfc3501#section-6.4.2
|
// https://tools.ietf.org/html/rfc3501#section-6.4.2
|
||||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
match session.close().await {
|
match session.close().await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!(context, "close/expunge succeeded");
|
info!(context, "close/expunge succeeded");
|
||||||
@@ -441,7 +442,7 @@ impl Imap {
|
|||||||
|
|
||||||
// select new folder
|
// select new folder
|
||||||
if let Some(ref folder) = folder {
|
if let Some(ref folder) = folder {
|
||||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
match session.select(folder).await {
|
match session.select(folder).await {
|
||||||
Ok(mailbox) => {
|
Ok(mailbox) => {
|
||||||
let mut config = self.config.write().await;
|
let mut config = self.config.write().await;
|
||||||
@@ -547,7 +548,7 @@ impl Imap {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
let list = if let Some(ref mut session) = &mut *self.session.lock().await {
|
let list = if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
// `FETCH <message sequence number> (UID)`
|
// `FETCH <message sequence number> (UID)`
|
||||||
let set = format!("{}", mailbox.exists);
|
let set = format!("{}", mailbox.exists);
|
||||||
match session.fetch(set, PREFETCH_FLAGS).await {
|
match session.fetch(set, PREFETCH_FLAGS).await {
|
||||||
@@ -589,7 +590,7 @@ impl Imap {
|
|||||||
let mut read_errors = 0;
|
let mut read_errors = 0;
|
||||||
let mut new_last_seen_uid = 0;
|
let mut new_last_seen_uid = 0;
|
||||||
|
|
||||||
let list = if let Some(ref mut session) = &mut *self.session.lock().await {
|
let list = if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
// fetch messages with larger UID than the last one seen
|
// fetch messages with larger UID than the last one seen
|
||||||
// (`UID FETCH lastseenuid+1:*)`, see RFC 4549
|
// (`UID FETCH lastseenuid+1:*)`, see RFC 4549
|
||||||
let set = format!("{}:*", last_seen_uid + 1);
|
let set = format!("{}:*", last_seen_uid + 1);
|
||||||
@@ -693,7 +694,7 @@ impl Imap {
|
|||||||
|
|
||||||
let set = format!("{}", server_uid);
|
let set = format!("{}", server_uid);
|
||||||
|
|
||||||
let msgs = if let Some(ref mut session) = &mut *self.session.lock().await {
|
let msgs = if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
match session.uid_fetch(set, BODY_FLAGS).await {
|
match session.uid_fetch(set, BODY_FLAGS).await {
|
||||||
Ok(msgs) => msgs,
|
Ok(msgs) => msgs,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -763,9 +764,12 @@ impl Imap {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let session = self.session.lock().await.take();
|
let (session, interrupt) = &mut *self.session.lock().await;
|
||||||
|
// make sure to remove any interrupts
|
||||||
|
drop(interrupt.take());
|
||||||
|
|
||||||
let timeout = Duration::from_secs(23 * 60);
|
let timeout = Duration::from_secs(23 * 60);
|
||||||
if let Some(session) = session {
|
if let Some(session) = session.take() {
|
||||||
match session.idle() {
|
match session.idle() {
|
||||||
IdleHandle::Secure(mut handle) => {
|
IdleHandle::Secure(mut handle) => {
|
||||||
if let Err(err) = handle.init().await {
|
if let Err(err) = handle.init().await {
|
||||||
@@ -773,13 +777,13 @@ impl Imap {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||||
*self.interrupt.lock().await = Some(interrupt);
|
self.session.lock().await.1 = Some(interrupt);
|
||||||
|
|
||||||
let res = idle_wait.await;
|
let res = idle_wait.await;
|
||||||
info!(context, "Idle finished: {:?}", res);
|
info!(context, "Idle finished: {:?}", res);
|
||||||
match handle.done().await {
|
match handle.done().await {
|
||||||
Ok(session) => {
|
Ok(session) => {
|
||||||
*self.session.lock().await = Some(Session::Secure(session));
|
self.session.lock().await.0 = Some(Session::Secure(session));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(context, "Failed to close IMAP IDLE connection: {:?}", err);
|
warn!(context, "Failed to close IMAP IDLE connection: {:?}", err);
|
||||||
@@ -792,13 +796,13 @@ impl Imap {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
|
||||||
*self.interrupt.lock().await = Some(interrupt);
|
self.session.lock().await.1 = Some(interrupt);
|
||||||
|
|
||||||
let res = idle_wait.await;
|
let res = idle_wait.await;
|
||||||
info!(context, "Idle finished: {:?}", res);
|
info!(context, "Idle finished: {:?}", res);
|
||||||
match handle.done().await {
|
match handle.done().await {
|
||||||
Ok(session) => {
|
Ok(session) => {
|
||||||
*self.session.lock().await = Some(Session::Insecure(session));
|
self.session.lock().await.0 = Some(Session::Insecure(session));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(context, "Failed to close IMAP IDLE connection: {:?}", err);
|
warn!(context, "Failed to close IMAP IDLE connection: {:?}", err);
|
||||||
@@ -823,7 +827,7 @@ impl Imap {
|
|||||||
// TODO: More flexible interval
|
// TODO: More flexible interval
|
||||||
let interval = async_std::stream::interval(Duration::from_secs(10));
|
let interval = async_std::stream::interval(Duration::from_secs(10));
|
||||||
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
|
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
|
||||||
*self.interrupt.lock().await = Some(interrupt);
|
self.session.lock().await.1 = Some(interrupt);
|
||||||
|
|
||||||
while let Some(_) = interrupt_interval.next().await {
|
while let Some(_) = interrupt_interval.next().await {
|
||||||
// check if we want to finish fake-idling.
|
// check if we want to finish fake-idling.
|
||||||
@@ -832,7 +836,7 @@ impl Imap {
|
|||||||
// (setup_handle_if_needed might not know about them if we
|
// (setup_handle_if_needed might not know about them if we
|
||||||
// never successfully connected)
|
// never successfully connected)
|
||||||
if dc_connect_to_configured_imap(context, &self) != 0 {
|
if dc_connect_to_configured_imap(context, &self) != 0 {
|
||||||
self.interrupt.lock().await.take();
|
self.session.lock().await.1.take();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// we are connected, let's see if fetching messages results
|
// we are connected, let's see if fetching messages results
|
||||||
@@ -843,7 +847,7 @@ impl Imap {
|
|||||||
let watch_folder = self.config.read().await.watch_folder.clone();
|
let watch_folder = self.config.read().await.watch_folder.clone();
|
||||||
if let Some(watch_folder) = watch_folder {
|
if let Some(watch_folder) = watch_folder {
|
||||||
if 0 != self.fetch_from_single_folder(context, watch_folder).await {
|
if 0 != self.fetch_from_single_folder(context, watch_folder).await {
|
||||||
self.interrupt.lock().await.take();
|
self.session.lock().await.1.take();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -863,7 +867,7 @@ impl Imap {
|
|||||||
|
|
||||||
pub fn interrupt_idle(&self) {
|
pub fn interrupt_idle(&self) {
|
||||||
task::block_on(async move {
|
task::block_on(async move {
|
||||||
let _ = self.interrupt.lock().await.take();
|
let _ = self.session.lock().await.1.take();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -896,7 +900,7 @@ impl Imap {
|
|||||||
|
|
||||||
let set = format!("{}", uid);
|
let set = format!("{}", uid);
|
||||||
let display_folder_id = format!("{}/{}", folder, uid);
|
let display_folder_id = format!("{}/{}", folder, uid);
|
||||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
match session.uid_mv(&set, &dest_folder).await {
|
match session.uid_mv(&set, &dest_folder).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
emit_event!(
|
emit_event!(
|
||||||
@@ -923,7 +927,7 @@ impl Imap {
|
|||||||
unreachable!();
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
match session.uid_copy(&set, &dest_folder).await {
|
match session.uid_copy(&set, &dest_folder).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
if !self.add_flag_finalized(context, uid, "\\Deleted").await {
|
if !self.add_flag_finalized(context, uid, "\\Deleted").await {
|
||||||
@@ -967,7 +971,7 @@ impl Imap {
|
|||||||
if self.should_reconnect() {
|
if self.should_reconnect() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
let query = format!("+FLAGS ({})", flag);
|
let query = format!("+FLAGS ({})", flag);
|
||||||
match session.uid_store(uid_set, &query).await {
|
match session.uid_store(uid_set, &query).await {
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
@@ -1050,7 +1054,7 @@ impl Imap {
|
|||||||
|
|
||||||
// double-check that we are deleting the correct message-id
|
// double-check that we are deleting the correct message-id
|
||||||
// this comes at the expense of another imap query
|
// this comes at the expense of another imap query
|
||||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
match session.uid_fetch(set, PREFETCH_FLAGS).await {
|
match session.uid_fetch(set, PREFETCH_FLAGS).await {
|
||||||
Ok(msgs) => {
|
Ok(msgs) => {
|
||||||
if msgs.is_empty() {
|
if msgs.is_empty() {
|
||||||
@@ -1115,7 +1119,7 @@ impl Imap {
|
|||||||
|
|
||||||
info!(context, "Configuring IMAP-folders.");
|
info!(context, "Configuring IMAP-folders.");
|
||||||
|
|
||||||
if let Some(ref mut session) = &mut *self.session.lock().await {
|
if let Some(ref mut session) = &mut self.session.lock().await.0 {
|
||||||
let folders = self
|
let folders = self
|
||||||
.list_folders(session, context)
|
.list_folders(session, context)
|
||||||
.await
|
.await
|
||||||
|
|||||||
Reference in New Issue
Block a user