mirror of
https://github.com/chatmail/core.git
synced 2026-05-08 09:26:29 +03:00
fix: do not block transfer on db import
This commit is contained in:
@@ -39,9 +39,9 @@ use iroh::provider::{DataSource, Event, Provider, Ticket};
|
|||||||
use iroh::Hash;
|
use iroh::Hash;
|
||||||
use tokio::fs::{self, File};
|
use tokio::fs::{self, File};
|
||||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||||
use tokio::sync::broadcast;
|
|
||||||
use tokio::sync::broadcast::error::RecvError;
|
use tokio::sync::broadcast::error::RecvError;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::sync::{broadcast, Mutex};
|
||||||
|
use tokio::task::{JoinHandle, JoinSet};
|
||||||
use tokio_stream::wrappers::ReadDirStream;
|
use tokio_stream::wrappers::ReadDirStream;
|
||||||
|
|
||||||
use crate::blob::BlobDirContents;
|
use crate::blob::BlobDirContents;
|
||||||
@@ -50,7 +50,7 @@ use crate::context::Context;
|
|||||||
use crate::qr::Qr;
|
use crate::qr::Qr;
|
||||||
use crate::{e2ee, EventType};
|
use crate::{e2ee, EventType};
|
||||||
|
|
||||||
use super::{export_database, DeleteOnDrop, DBFILE_BACKUP_NAME};
|
use super::{export_database, DBFILE_BACKUP_NAME};
|
||||||
|
|
||||||
/// Provide or send a backup of this device.
|
/// Provide or send a backup of this device.
|
||||||
///
|
///
|
||||||
@@ -423,7 +423,9 @@ async fn transfer_from_provider(
|
|||||||
connected = true;
|
connected = true;
|
||||||
async { Ok(()) }
|
async { Ok(()) }
|
||||||
};
|
};
|
||||||
let on_blob = |hash, reader, name| on_blob(context, &progress, ticket, hash, reader, name);
|
let jobs = Mutex::new(JoinSet::default());
|
||||||
|
let on_blob =
|
||||||
|
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
|
||||||
let res = iroh::get::run(
|
let res = iroh::get::run(
|
||||||
ticket.hash,
|
ticket.hash,
|
||||||
ticket.token,
|
ticket.token,
|
||||||
@@ -437,6 +439,12 @@ async fn transfer_from_provider(
|
|||||||
on_blob,
|
on_blob,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
let mut jobs = jobs.lock().await;
|
||||||
|
while let Some(job) = jobs.join_next().await {
|
||||||
|
job.context("job failed").map_err(TransferError::Other)?;
|
||||||
|
}
|
||||||
|
|
||||||
drop(progress);
|
drop(progress);
|
||||||
res.map(|_| ()).map_err(|err| match connected {
|
res.map(|_| ()).map_err(|err| match connected {
|
||||||
true => TransferError::Other(err),
|
true => TransferError::Other(err),
|
||||||
@@ -451,6 +459,7 @@ async fn transfer_from_provider(
|
|||||||
async fn on_blob(
|
async fn on_blob(
|
||||||
context: &Context,
|
context: &Context,
|
||||||
progress: &ProgressEmitter,
|
progress: &ProgressEmitter,
|
||||||
|
jobs: &Mutex<JoinSet<()>>,
|
||||||
ticket: &Ticket,
|
ticket: &Ticket,
|
||||||
_hash: Hash,
|
_hash: Hash,
|
||||||
mut reader: DataStream,
|
mut reader: DataStream,
|
||||||
@@ -473,11 +482,6 @@ async fn on_blob(
|
|||||||
let blobname = name.rsplit('/').next().context("malformatted blob name")?;
|
let blobname = name.rsplit('/').next().context("malformatted blob name")?;
|
||||||
context.get_blobdir().join(blobname)
|
context.get_blobdir().join(blobname)
|
||||||
};
|
};
|
||||||
let _guard = if name.starts_with("db/") {
|
|
||||||
Some(DeleteOnDrop(path.clone()))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut wrapped_reader = progress.wrap_async_read(&mut reader);
|
let mut wrapped_reader = progress.wrap_async_read(&mut reader);
|
||||||
let file = File::create(&path).await?;
|
let file = File::create(&path).await?;
|
||||||
@@ -486,14 +490,21 @@ async fn on_blob(
|
|||||||
file.flush().await?;
|
file.flush().await?;
|
||||||
|
|
||||||
if name.starts_with("db/") {
|
if name.starts_with("db/") {
|
||||||
context
|
let context = context.clone();
|
||||||
.sql
|
let token = ticket.token.to_string();
|
||||||
.import(&path, ticket.token.to_string())
|
jobs.lock().await.spawn(async move {
|
||||||
.await
|
if let Err(err) = context.sql.import(&path, token).await {
|
||||||
.context("cannot import database")?;
|
error!(context, "cannot import database: {:#?}", err);
|
||||||
fs::remove_file(&path)
|
}
|
||||||
.await
|
if let Err(err) = fs::remove_file(&path).await {
|
||||||
.with_context(|| format!("database import file: {}", path.display()))?;
|
error!(
|
||||||
|
context,
|
||||||
|
"failed to delete database import file '{}': {:#?}",
|
||||||
|
path.display(),
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
Ok(reader)
|
Ok(reader)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user