fix(imap): properly disconnect

This commit is contained in:
dignifiedquire
2019-05-12 23:12:38 +02:00
parent 693474d5be
commit 173d7cd767
3 changed files with 80 additions and 134 deletions

View File

@@ -1,4 +1,5 @@
use std::ffi::{CStr, CString};
use std::net;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, SystemTime};
@@ -30,7 +31,7 @@ pub struct Imap {
precheck_imf: dc_precheck_imf_t,
receive_imf: dc_receive_imf_t,
session: Arc<Mutex<Option<Session>>>,
session: Arc<Mutex<(Option<Session>, Option<net::TcpStream>)>>,
}
#[derive(Debug)]
@@ -42,39 +43,20 @@ pub enum FolderMeaning {
pub enum Client {
Secure(
imap::Client<native_tls::TlsStream<std::net::TcpStream>>,
std::net::TcpStream,
imap::Client<native_tls::TlsStream<net::TcpStream>>,
net::TcpStream,
),
Insecure(imap::Client<std::net::TcpStream>, std::net::TcpStream),
Insecure(imap::Client<net::TcpStream>, net::TcpStream),
}
pub enum Session {
Secure(
imap::Session<native_tls::TlsStream<std::net::TcpStream>>,
std::net::TcpStream,
),
Insecure(imap::Session<std::net::TcpStream>, std::net::TcpStream),
Secure(imap::Session<native_tls::TlsStream<net::TcpStream>>),
Insecure(imap::Session<net::TcpStream>),
}
pub enum IdleHandle<'a> {
Secure(imap::extensions::idle::Handle<'a, native_tls::TlsStream<std::net::TcpStream>>),
Insecure(imap::extensions::idle::Handle<'a, std::net::TcpStream>),
}
impl<'a> From<imap::extensions::idle::Handle<'a, native_tls::TlsStream<std::net::TcpStream>>>
for IdleHandle<'a>
{
fn from(
handle: imap::extensions::idle::Handle<'a, native_tls::TlsStream<std::net::TcpStream>>,
) -> Self {
IdleHandle::Secure(handle)
}
}
impl<'a> From<imap::extensions::idle::Handle<'a, std::net::TcpStream>> for IdleHandle<'a> {
fn from(handle: imap::extensions::idle::Handle<'a, std::net::TcpStream>) -> Self {
IdleHandle::Insecure(handle)
}
Secure(imap::extensions::idle::Handle<'a, native_tls::TlsStream<net::TcpStream>>),
Insecure(imap::extensions::idle::Handle<'a, net::TcpStream>),
}
impl<'a> IdleHandle<'a> {
@@ -94,11 +76,11 @@ impl<'a> IdleHandle<'a> {
}
impl Client {
pub fn connect_secure<A: std::net::ToSocketAddrs, S: AsRef<str>>(
pub fn connect_secure<A: net::ToSocketAddrs, S: AsRef<str>>(
addr: A,
domain: S,
) -> imap::error::Result<Self> {
let stream = std::net::TcpStream::connect(addr)?;
let stream = net::TcpStream::connect(addr)?;
let tls = native_tls::TlsConnector::builder()
.danger_accept_invalid_hostnames(true)
.build()
@@ -113,8 +95,8 @@ impl Client {
Ok(Client::Secure(client, stream))
}
pub fn connect_insecure<A: std::net::ToSocketAddrs>(addr: A) -> imap::error::Result<Self> {
let stream = std::net::TcpStream::connect(addr)?;
pub fn connect_insecure<A: net::ToSocketAddrs>(addr: A) -> imap::error::Result<Self> {
let stream = net::TcpStream::connect(addr)?;
let client = imap::Client::new(stream.try_clone().unwrap());
// TODO: Read greeting
@@ -143,14 +125,14 @@ impl Client {
self,
username: U,
password: P,
) -> Result<Session, (imap::error::Error, Client)> {
) -> Result<(Session, net::TcpStream), (imap::error::Error, Client)> {
match self {
Client::Secure(i, stream) => match i.login(username, password) {
Ok(session) => Ok(Session::Secure(session, stream)),
Ok(session) => Ok((Session::Secure(session), stream)),
Err((err, c)) => Err((err, Client::Secure(c, stream))),
},
Client::Insecure(i, stream) => match i.login(username, password) {
Ok(session) => Ok(Session::Insecure(session, stream)),
Ok(session) => Ok((Session::Insecure(session), stream)),
Err((err, c)) => Err((err, Client::Insecure(c, stream))),
},
}
@@ -162,8 +144,8 @@ impl Session {
&mut self,
) -> imap::error::Result<imap::types::ZeroCopy<imap::types::Capabilities>> {
match self {
Session::Secure(i, _) => i.capabilities(),
Session::Insecure(i, _) => i.capabilities(),
Session::Secure(i) => i.capabilities(),
Session::Insecure(i) => i.capabilities(),
}
}
@@ -173,36 +155,29 @@ impl Session {
mailbox_pattern: Option<&str>,
) -> imap::error::Result<imap::types::ZeroCopy<Vec<imap::types::Name>>> {
match self {
Session::Secure(i, _) => i.list(reference_name, mailbox_pattern),
Session::Insecure(i, _) => i.list(reference_name, mailbox_pattern),
Session::Secure(i) => i.list(reference_name, mailbox_pattern),
Session::Insecure(i) => i.list(reference_name, mailbox_pattern),
}
}
pub fn create<S: AsRef<str>>(&mut self, mailbox_name: S) -> imap::error::Result<()> {
match self {
Session::Secure(i, _) => i.subscribe(mailbox_name),
Session::Insecure(i, _) => i.subscribe(mailbox_name),
Session::Secure(i) => i.subscribe(mailbox_name),
Session::Insecure(i) => i.subscribe(mailbox_name),
}
}
pub fn subscribe<S: AsRef<str>>(&mut self, mailbox: S) -> imap::error::Result<()> {
match self {
Session::Secure(i, _) => i.subscribe(mailbox),
Session::Insecure(i, _) => i.subscribe(mailbox),
Session::Secure(i) => i.subscribe(mailbox),
Session::Insecure(i) => i.subscribe(mailbox),
}
}
pub fn close(&mut self) -> imap::error::Result<()> {
match self {
Session::Secure(i, _) => i.close(),
Session::Insecure(i, _) => i.close(),
}
}
pub fn shutdown(&self) -> std::io::Result<()> {
match self {
Session::Secure(_, stream) => stream.shutdown(std::net::Shutdown::Both),
Session::Insecure(_, stream) => stream.shutdown(std::net::Shutdown::Both),
Session::Secure(i) => i.close(),
Session::Insecure(i) => i.close(),
}
}
@@ -211,8 +186,8 @@ impl Session {
mailbox_name: S,
) -> imap::error::Result<imap::types::Mailbox> {
match self {
Session::Secure(i, _) => i.select(mailbox_name),
Session::Insecure(i, _) => i.select(mailbox_name),
Session::Secure(i) => i.select(mailbox_name),
Session::Insecure(i) => i.select(mailbox_name),
}
}
@@ -226,8 +201,8 @@ impl Session {
S2: AsRef<str>,
{
match self {
Session::Secure(i, _) => i.fetch(sequence_set, query),
Session::Insecure(i, _) => i.fetch(sequence_set, query),
Session::Secure(i) => i.fetch(sequence_set, query),
Session::Insecure(i) => i.fetch(sequence_set, query),
}
}
@@ -241,15 +216,15 @@ impl Session {
S2: AsRef<str>,
{
match self {
Session::Secure(i, _) => i.uid_fetch(uid_set, query),
Session::Insecure(i, _) => i.uid_fetch(uid_set, query),
Session::Secure(i) => i.uid_fetch(uid_set, query),
Session::Insecure(i) => i.uid_fetch(uid_set, query),
}
}
pub fn idle(&mut self) -> imap::error::Result<IdleHandle> {
match self {
Session::Secure(i, _) => i.idle().map(Into::into),
Session::Insecure(i, _) => i.idle().map(Into::into),
Session::Secure(i) => i.idle().map(|h| IdleHandle::Secure(h)),
Session::Insecure(i) => i.idle().map(|h| IdleHandle::Insecure(h)),
}
}
@@ -263,8 +238,8 @@ impl Session {
S2: AsRef<str>,
{
match self {
Session::Secure(i, _) => i.uid_store(uid_set, query),
Session::Insecure(i, _) => i.uid_store(uid_set, query),
Session::Secure(i) => i.uid_store(uid_set, query),
Session::Insecure(i) => i.uid_store(uid_set, query),
}
}
@@ -274,8 +249,8 @@ impl Session {
mailbox_name: S2,
) -> imap::error::Result<()> {
match self {
Session::Secure(i, _) => i.uid_mv(uid_set, mailbox_name),
Session::Insecure(i, _) => i.uid_mv(uid_set, mailbox_name),
Session::Secure(i) => i.uid_mv(uid_set, mailbox_name),
Session::Insecure(i) => i.uid_mv(uid_set, mailbox_name),
}
}
@@ -285,8 +260,8 @@ impl Session {
mailbox_name: S2,
) -> imap::error::Result<()> {
match self {
Session::Secure(i, _) => i.uid_copy(uid_set, mailbox_name),
Session::Insecure(i, _) => i.uid_copy(uid_set, mailbox_name),
Session::Secure(i) => i.uid_copy(uid_set, mailbox_name),
Session::Insecure(i) => i.uid_copy(uid_set, mailbox_name),
}
}
}
@@ -339,8 +314,7 @@ impl Imap {
receive_imf: dc_receive_imf_t,
) -> Self {
Imap {
session: Arc::new(Mutex::new(None)),
// idle: Arc::new(Mutex::new(None)),
session: Arc::new(Mutex::new((None, None))),
config: Arc::new(RwLock::new(ImapConfig::default())),
watch: Arc::new((Mutex::new(false), Condvar::new())),
get_config,
@@ -351,7 +325,7 @@ impl Imap {
}
pub fn is_connected(&self) -> bool {
self.session.lock().unwrap().is_some()
self.session.lock().unwrap().1.is_some()
}
pub fn should_reconnect(&self) -> bool {
@@ -395,7 +369,7 @@ impl Imap {
Ok(client) => {
// TODO: handle oauth2
match client.login(imap_user, imap_pw) {
Ok(mut session) => {
Ok((mut session, stream)) => {
// TODO: error handling
let caps = session.capabilities().unwrap();
let can_idle = caps.has("IDLE");
@@ -420,7 +394,7 @@ impl Imap {
config.imap_pw = Some(imap_pw.into());
config.server_flags = Some(server_flags);
*self.session.lock().unwrap() = Some(session);
*self.session.lock().unwrap() = (Some(session), Some(stream));
1
}
@@ -460,16 +434,18 @@ impl Imap {
}
pub fn disconnect(&self, context: &dc_context_t) {
let session = self.session.lock().unwrap().take();
let session = self.session.lock().unwrap().0.take();
if session.is_some() {
let mut session = session.unwrap();
match session.close() {
match session.unwrap().close() {
Ok(_) => {}
Err(err) => {
eprintln!("failed to close connection: {:?}", err);
}
}
match session.shutdown() {
}
let stream = self.session.lock().unwrap().1.take();
if stream.is_some() {
match stream.unwrap().shutdown(net::Shutdown::Both) {
Ok(_) => {}
Err(err) => {
eprintln!("failed to shutdown connection: {:?}", err);
@@ -545,7 +521,7 @@ impl Imap {
);
// a CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see https://tools.ietf.org/html/rfc3501#section-6.4.2
if let Some(ref mut session) = *self.session.lock().unwrap() {
if let Some(ref mut session) = self.session.lock().unwrap().0 {
session.close().expect("failed to expunge");
} else {
return 0;
@@ -555,7 +531,7 @@ impl Imap {
// select new folder
if let Some(folder) = folder {
if let Some(ref mut session) = *self.session.lock().unwrap() {
if let Some(ref mut session) = self.session.lock().unwrap().0 {
match session.select(folder) {
Ok(mailbox) => {
self.config.write().unwrap().selected_mailbox = Some(mailbox);
@@ -658,7 +634,7 @@ impl Imap {
return 0;
}
let list = if let Some(ref mut session) = *self.session.lock().unwrap() {
let list = if let Some(ref mut session) = self.session.lock().unwrap().0 {
// `FETCH <message sequence number> (UID)`
let set = format!("{}", mailbox.exists);
match session.fetch(set, PREFETCH_FLAGS) {
@@ -702,7 +678,7 @@ impl Imap {
let mut read_errors = 0;
let mut new_last_seen_uid = 0;
let list = if let Some(ref mut session) = *self.session.lock().unwrap() {
let list = if let Some(ref mut session) = self.session.lock().unwrap().0 {
// fetch messages with larger UID than the last one seen
// (`UID FETCH lastseenuid+1:*)`, see RFC 4549
let set = format!("{}:*", last_seen_uid + 1);
@@ -825,7 +801,7 @@ impl Imap {
let mut retry_later = false;
let msgs = if let Some(ref mut session) = *self.session.lock().unwrap() {
let msgs = if let Some(ref mut session) = self.session.lock().unwrap().0 {
let set = format!("{}", server_uid);
match session.uid_fetch(set, BODY_FLAGS) {
Ok(msgs) => msgs,
@@ -920,30 +896,7 @@ impl Imap {
return self.fake_idle(context);
}
// let mut session = self.session.lock().unwrap().take().unwrap();
// match RentSession::try_new(Box::new(session), |session| session.idle()) {
// Ok(idle) => {
// *self.idle.lock().unwrap() = Some(idle);
// }
// Err(err) => {
// eprintln!("imap idle error: {:?}", err.0);
// unsafe {
// dc_log_warning(
// context,
// 0,
// b"IMAP-IDLE: Cannot start.\x00" as *const u8 as *const libc::c_char,
// );
// }
// // put session back
// *self.session.lock().unwrap() = Some(*err.1);
// return self.fake_idle(context);
// }
// }
let mut session = self.session.lock().unwrap().take().unwrap();
let mut session = self.session.lock().unwrap().0.take().unwrap();
let mut idle = match session.idle() {
Ok(idle) => idle,
Err(err) => {
@@ -969,7 +922,7 @@ impl Imap {
}
// put session back
*self.session.lock().unwrap() = Some(session);
self.session.lock().unwrap().0 = Some(session);
}
fn fake_idle(&self, context: &dc_context_t) {
@@ -989,7 +942,6 @@ impl Imap {
};
let &(ref lock, ref cvar) = &*self.watch.clone();
let mut watch = lock.lock().unwrap();
loop {
@@ -1019,8 +971,8 @@ impl Imap {
}
pub fn interrupt_idle(&self) {
if let Some(ref mut session) = *self.session.lock().unwrap() {
match session.shutdown() {
if let Some(ref mut stream) = self.session.lock().unwrap().1 {
match stream.shutdown(net::Shutdown::Both) {
Ok(_) => {}
Err(err) => {
eprintln!("failed to disconnect: {}", err);
@@ -1081,7 +1033,7 @@ impl Imap {
CString::new(folder.as_ref().to_owned()).unwrap().as_ptr(),
);
} else {
let moved = if let Some(ref mut session) = *self.session.lock().unwrap() {
let moved = if let Some(ref mut session) = self.session.lock().unwrap().0 {
match session.uid_mv(&set, &dest_folder) {
Ok(_) => {
res = DC_SUCCESS;
@@ -1108,7 +1060,7 @@ impl Imap {
};
if !moved {
let copied = if let Some(ref mut session) = *self.session.lock().unwrap() {
let copied = if let Some(ref mut session) = self.session.lock().unwrap().0 {
match session.uid_copy(&set, &dest_folder) {
Ok(_) => true,
Err(err) => {
@@ -1150,7 +1102,7 @@ impl Imap {
}
fn add_flag<S: AsRef<str>>(&self, server_uid: u32, flag: S) -> usize {
if let Some(ref mut session) = *self.session.lock().unwrap() {
if let Some(ref mut session) = self.session.lock().unwrap().0 {
let set = format!("{}", server_uid);
let query = format!("+FLAGS ({})", flag.as_ref());
match session.uid_store(&set, &query) {
@@ -1260,7 +1212,7 @@ impl Imap {
.expect("just selected folder");
if can_create_flag {
let fetched_msgs = if let Some(ref mut session) = *self.session.lock().unwrap()
let fetched_msgs = if let Some(ref mut session) = self.session.lock().unwrap().0
{
match session.uid_fetch(set, FETCH_FLAGS) {
Ok(res) => Some(res),
@@ -1354,7 +1306,7 @@ impl Imap {
);
} else {
let set = format!("{}", server_uid);
if let Some(ref mut session) = *self.session.lock().unwrap() {
if let Some(ref mut session) = self.session.lock().unwrap().0 {
match session.uid_fetch(set, PREFETCH_FLAGS) {
Ok(msgs) => {
if msgs.is_empty()
@@ -1441,7 +1393,7 @@ impl Imap {
b"DeltaChat\x00" as *const u8 as *const libc::c_char
);
if let Some(ref mut session) = *self.session.lock().unwrap() {
if let Some(ref mut session) = self.session.lock().unwrap().0 {
match session.create("DeltaChat") {
Ok(_) => {
mvbox_folder = Some("DeltaChat".into());
@@ -1507,7 +1459,7 @@ impl Imap {
&self,
context: &dc_context_t,
) -> Option<imap::types::ZeroCopy<Vec<imap::types::Name>>> {
if let Some(ref mut session) = *self.session.lock().unwrap() {
if let Some(ref mut session) = self.session.lock().unwrap().0 {
// TODO: use xlist when available
match session.list(Some(""), Some("*")) {
Ok(list) => {

View File

@@ -56,8 +56,8 @@ pub unsafe fn dc_perform_imap_jobs(context: &dc_context_t) {
);
let probe_imap_network = *context.probe_imap_network.clone().read().unwrap();
*context.probe_imap_network.clone().write().unwrap() = 0;
*context.perform_inbox_jobs_needed.clone().write().unwrap() = 0;
*context.probe_imap_network.write().unwrap() = 0;
*context.perform_inbox_jobs_needed.write().unwrap() = 0;
dc_job_perform(context, 100, probe_imap_network);
dc_log_info(
@@ -293,10 +293,10 @@ unsafe fn dc_job_update(context: &dc_context_t, job: &dc_job_t) {
sqlite3_finalize(stmt);
}
unsafe fn dc_suspend_smtp_thread(context: &dc_context_t, suspend: libc::c_int) {
context.smtp_state.clone().0.lock().unwrap().suspended = suspend;
context.smtp_state.0.lock().unwrap().suspended = suspend;
if 0 != suspend {
loop {
if context.smtp_state.clone().0.lock().unwrap().doing_jobs == 0 {
if context.smtp_state.0.lock().unwrap().doing_jobs == 0 {
return;
}
usleep((300i32 * 1000i32) as libc::useconds_t);
@@ -311,7 +311,7 @@ unsafe fn dc_job_do_DC_JOB_SEND(context: &dc_context_t, job: &mut dc_job_t) {
let mut recipients: *mut libc::c_char = 0 as *mut libc::c_char;
let mut stmt: *mut sqlite3_stmt = 0 as *mut sqlite3_stmt;
/* connect to SMTP server, if not yet done */
if !context.smtp.clone().lock().unwrap().is_connected() {
if !context.smtp.lock().unwrap().is_connected() {
let loginparam: *mut dc_loginparam_t = dc_loginparam_new();
dc_loginparam_read(
context,
@@ -319,12 +319,7 @@ unsafe fn dc_job_do_DC_JOB_SEND(context: &dc_context_t, job: &mut dc_job_t) {
&context.sql.clone().read().unwrap(),
b"configured_\x00" as *const u8 as *const libc::c_char,
);
let connected = context
.smtp
.clone()
.lock()
.unwrap()
.connect(context, loginparam);
let connected = context.smtp.lock().unwrap().connect(context, loginparam);
dc_loginparam_unref(loginparam);
if 0 == connected {
dc_job_try_again_later(job, 3i32, 0 as *const libc::c_char);
@@ -393,12 +388,12 @@ unsafe fn dc_job_do_DC_JOB_SEND(context: &dc_context_t, job: &mut dc_job_t) {
/* send message */
let body =
std::slice::from_raw_parts(buf as *const u8, buf_bytes).to_vec();
if 0 == context.smtp.clone().lock().unwrap().send(
if 0 == context.smtp.lock().unwrap().send(
context,
recipients_list,
body,
) {
context.smtp.clone().lock().unwrap().disconnect();
context.smtp.lock().unwrap().disconnect();
dc_job_try_again_later(
job,
-1i32,
@@ -1254,7 +1249,7 @@ pub unsafe fn dc_maybe_network(context: &dc_context_t) {
let mut state = lock.lock().unwrap();
state.probe_network = 1;
*context.probe_imap_network.clone().write().unwrap() = 1;
*context.probe_imap_network.write().unwrap() = 1;
}
dc_interrupt_smtp_idle(context);

View File

@@ -59,11 +59,11 @@ pub unsafe fn dc_jobthread_suspend(
);
{
jobthread.state.clone().0.lock().unwrap().suspended = 1;
jobthread.state.0.lock().unwrap().suspended = 1;
}
dc_jobthread_interrupt_idle(context, jobthread);
loop {
let using_handle = jobthread.state.clone().0.lock().unwrap().using_handle;
let using_handle = jobthread.state.0.lock().unwrap().using_handle;
if using_handle == 0 {
return;
}
@@ -88,7 +88,7 @@ pub unsafe fn dc_jobthread_suspend(
pub unsafe fn dc_jobthread_interrupt_idle(context: &dc_context_t, jobthread: &dc_jobthread_t) {
{
jobthread.state.clone().0.lock().unwrap().jobs_needed = 1;
jobthread.state.0.lock().unwrap().jobs_needed = 1;
}
dc_log_info(
@@ -98,7 +98,6 @@ pub unsafe fn dc_jobthread_interrupt_idle(context: &dc_context_t, jobthread: &dc
jobthread.name,
);
println!("jobthread interrupt, waiting for lock");
jobthread.imap.interrupt_idle();
let &(ref lock, ref cvar) = &*jobthread.state.clone();
@@ -157,7 +156,7 @@ pub unsafe fn dc_jobthread_fetch(
}
}
jobthread.state.clone().0.lock().unwrap().using_handle = 0;
jobthread.state.0.lock().unwrap().using_handle = 0;
}
/* ******************************************************************************
@@ -258,5 +257,5 @@ pub unsafe fn dc_jobthread_idle(
jobthread.name,
);
jobthread.state.clone().0.lock().unwrap().using_handle = 0;
jobthread.state.0.lock().unwrap().using_handle = 0;
}