Merge v1.112.2 into master

This commit is contained in:
link2xt
2023-03-30 15:42:27 +00:00
13 changed files with 271 additions and 420 deletions

View File

@@ -2727,12 +2727,14 @@ pub async fn get_chat_media(
"SELECT id
FROM msgs
WHERE (1=? OR chat_id=?)
AND chat_id != ?
AND (type=? OR type=? OR type=?)
AND hidden=0
ORDER BY timestamp, id;",
paramsv![
chat_id.is_none(),
chat_id.unwrap_or_else(|| ChatId::new(0)),
DC_CHAT_ID_TRASH,
msg_type,
if msg_type2 != Viewtype::Unknown {
msg_type2
@@ -3795,6 +3797,7 @@ mod tests {
use crate::chatlist::{get_archived_cnt, Chatlist};
use crate::constants::{DC_GCL_ARCHIVED_ONLY, DC_GCL_NO_SPECIALS};
use crate::contact::{Contact, ContactAddress};
use crate::message::delete_msgs;
use crate::receive_imf::receive_imf;
use crate::test_utils::TestContext;
@@ -5977,7 +5980,7 @@ mod tests {
include_bytes!("../test-data/image/avatar64x64.png"),
)
.await?;
send_media(
let second_image_msg_id = send_media(
&t,
chat_id2,
Viewtype::Image,
@@ -6079,6 +6082,21 @@ mod tests {
4
);
// Delete an image.
delete_msgs(&t, &[second_image_msg_id]).await?;
assert_eq!(
get_chat_media(
&t,
None,
Viewtype::Image,
Viewtype::Sticker,
Viewtype::Webxdc,
)
.await?
.len(),
3
);
Ok(())
}
}

View File

@@ -22,7 +22,6 @@
//! getter can not connect to an impersonated provider and the provider does not offer the
//! download to an impersonated getter.
use std::cmp::Ordering;
use std::future::Future;
use std::net::Ipv4Addr;
use std::ops::Deref;
@@ -33,7 +32,8 @@ use std::task::Poll;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use async_channel::Receiver;
use futures_lite::StreamExt;
use iroh::get::{DataStream, Options};
use iroh::blobs::Collection;
use iroh::get::DataStream;
use iroh::progress::ProgressEmitter;
use iroh::protocol::AuthToken;
use iroh::provider::{DataSource, Event, Provider, Ticket};
@@ -44,6 +44,7 @@ use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, Mutex};
use tokio::task::{JoinHandle, JoinSet};
use tokio_stream::wrappers::ReadDirStream;
use tokio_util::sync::CancellationToken;
use crate::blob::BlobDirContents;
use crate::chat::delete_and_reset_all_device_msgs;
@@ -53,6 +54,8 @@ use crate::{e2ee, EventType};
use super::{export_database, DBFILE_BACKUP_NAME};
const MAX_CONCURRENT_DIALS: u8 = 16;
/// Provide or send a backup of this device.
///
/// This creates a backup of the current device and starts a service which offers another
@@ -72,6 +75,8 @@ pub struct BackupProvider {
handle: JoinHandle<Result<()>>,
/// The ticket to retrieve the backup collection.
ticket: Ticket,
/// Guard to cancel the provider on drop.
_drop_guard: tokio_util::sync::DropGuard,
}
impl BackupProvider {
@@ -123,10 +128,12 @@ impl BackupProvider {
return Err(err);
}
};
let drop_token = CancellationToken::new();
let handle = {
let context = context.clone();
let drop_token = drop_token.clone();
tokio::spawn(async move {
let res = Self::watch_provider(&context, provider, cancel_token).await;
let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await;
context.free_ongoing().await;
// Explicit drop to move the guards into this future
@@ -135,7 +142,11 @@ impl BackupProvider {
res
})
};
Ok(Self { handle, ticket })
Ok(Self {
handle,
ticket,
_drop_guard: drop_token.drop_guard(),
})
}
/// Creates the provider task.
@@ -171,7 +182,7 @@ impl BackupProvider {
.spawn()?;
context.emit_event(SendProgress::ProviderListening.into());
info!(context, "Waiting for remote to connect");
let ticket = provider.ticket(hash);
let ticket = provider.ticket(hash)?;
Ok((provider, ticket))
}
@@ -189,8 +200,8 @@ impl BackupProvider {
context: &Context,
mut provider: Provider,
cancel_token: Receiver<()>,
drop_token: CancellationToken,
) -> Result<()> {
// _dbfile exists so we can clean up the file once it is no longer needed
let mut events = provider.subscribe();
let mut total_size = 0;
let mut current_size = 0;
@@ -250,8 +261,12 @@ impl BackupProvider {
},
_ = cancel_token.recv() => {
provider.shutdown();
break Err(anyhow!("BackupSender cancelled"));
break Err(anyhow!("BackupProvider cancelled"));
},
_ = drop_token.cancelled() => {
provider.shutdown();
break Err(anyhow!("BackupProvider dropped"));
}
}
};
match &res {
@@ -387,122 +402,71 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
}
async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
let mut ticket = match qr {
let ticket = match qr {
Qr::Backup { ticket } => ticket,
_ => bail!("QR code for backup must be of type DCBACKUP"),
};
if ticket.addrs.is_empty() {
bail!("ticket is missing addresses to dial");
}
// Crude sorting, most local wifi's are in the 192.168.0.0/24 range so this will try
// them first.
ticket.addrs.sort_by(|a, b| {
let a = a.to_string();
let b = b.to_string();
if a.starts_with("192.168.") && !b.starts_with("192.168.") {
Ordering::Less
} else if b.starts_with("192.168.") && !a.starts_with("192.168.") {
Ordering::Greater
} else {
Ordering::Equal
match transfer_from_provider(context, &ticket).await {
Ok(()) => {
delete_and_reset_all_device_msgs(context).await?;
context.emit_event(ReceiveProgress::Completed.into());
Ok(())
}
});
for addr in &ticket.addrs {
let opts = Options {
addr: *addr,
peer_id: Some(ticket.peer),
keylog: false,
};
info!(context, "attempting to contact {}", addr);
match transfer_from_provider(context, &ticket, opts).await {
Ok(_) => {
delete_and_reset_all_device_msgs(context).await?;
context.emit_event(ReceiveProgress::Completed.into());
return Ok(());
}
Err(TransferError::ConnectionError(err)) => {
warn!(context, "Connection error: {err:#}.");
continue;
}
Err(TransferError::Other(err)) => {
// Clean up any blobs we already wrote.
let readdir = fs::read_dir(context.get_blobdir()).await?;
let mut readdir = ReadDirStream::new(readdir);
while let Some(dirent) = readdir.next().await {
if let Ok(dirent) = dirent {
fs::remove_file(dirent.path()).await.ok();
}
Err(err) => {
// Clean up any blobs we already wrote.
let readdir = fs::read_dir(context.get_blobdir()).await?;
let mut readdir = ReadDirStream::new(readdir);
while let Some(dirent) = readdir.next().await {
if let Ok(dirent) = dirent {
fs::remove_file(dirent.path()).await.ok();
}
context.emit_event(ReceiveProgress::Failed.into());
return Err(err);
}
context.emit_event(ReceiveProgress::Failed.into());
Err(err)
}
}
Err(anyhow!("failed to contact provider"))
}
/// Error during a single transfer attempt.
///
/// Mostly exists to distinguish between `ConnectionError` and any other errors.
#[derive(Debug, thiserror::Error)]
enum TransferError {
#[error("connection error")]
ConnectionError(#[source] anyhow::Error),
#[error("other")]
Other(#[source] anyhow::Error),
}
async fn transfer_from_provider(
context: &Context,
ticket: &Ticket,
opts: Options,
) -> Result<(), TransferError> {
async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> {
let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress());
spawn_progress_proxy(context.clone(), progress.subscribe());
let mut connected = false;
let on_connected = || {
context.emit_event(ReceiveProgress::Connected.into());
connected = true;
async { Ok(()) }
};
let on_collection = |collection: &Collection| {
context.emit_event(ReceiveProgress::CollectionReceived.into());
progress.set_total(collection.total_blobs_size());
async { Ok(()) }
};
let jobs = Mutex::new(JoinSet::default());
let on_blob =
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
let res = iroh::get::run(
ticket.hash,
ticket.token,
opts,
// Perform the transfer.
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
let stats = iroh::get::run_ticket(
ticket,
keylog,
MAX_CONCURRENT_DIALS,
on_connected,
|collection| {
context.emit_event(ReceiveProgress::CollectionReceived.into());
progress.set_total(collection.total_blobs_size());
async { Ok(()) }
},
on_collection,
on_blob,
)
.await;
.await?;
let mut jobs = jobs.lock().await;
while let Some(job) = jobs.join_next().await {
job.context("job failed").map_err(TransferError::Other)?;
job.context("job failed")?;
}
drop(progress);
match res {
Ok(stats) => {
info!(
context,
"Backup transfer finished, transfer rate is {} Mbps.",
stats.mbits()
);
Ok(())
}
Err(err) => match connected {
true => Err(TransferError::Other(err)),
false => Err(TransferError::ConnectionError(err)),
},
}
info!(
context,
"Backup transfer finished, transfer rate was {} Mbps.",
stats.mbits()
);
Ok(())
}
/// Get callback when a blob is received from the provider.
@@ -544,7 +508,7 @@ async fn on_blob(
if name.starts_with("db/") {
let context = context.clone();
let token = ticket.token.to_string();
let token = ticket.token().to_string();
jobs.lock().await.spawn(async move {
if let Err(err) = context.sql.import(&path, token).await {
error!(context, "cannot import database: {:#?}", err);
@@ -706,4 +670,16 @@ mod tests {
assert_eq!(out, EventType::ImexProgress(progress));
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_drop_provider() {
let mut tcm = TestContextManager::new();
let ctx = tcm.alice().await;
let provider = BackupProvider::prepare(&ctx).await.unwrap();
drop(provider);
ctx.evtracker
.get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
.await;
}
}