mirror of
https://github.com/chatmail/core.git
synced 2026-04-23 00:16:34 +03:00
use ProgressEmitter from sendme
This commit is contained in:
@@ -23,20 +23,17 @@
|
||||
//! download to an impersonated getter.
|
||||
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
use async_channel::Receiver;
|
||||
use futures_lite::StreamExt;
|
||||
use sendme::get::{DataStream, Options};
|
||||
use sendme::progress::ProgressEmitter;
|
||||
use sendme::protocol::AuthToken;
|
||||
use sendme::provider::{DataSource, Event, Provider, Ticket};
|
||||
use sendme::Hash;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{self, AsyncRead, AsyncWriteExt, BufWriter};
|
||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -454,132 +451,6 @@ impl From<ReceiveProgress> for EventType {
|
||||
}
|
||||
}
|
||||
|
||||
/// A generic progress event emitter.
|
||||
///
|
||||
/// It is created with a total value to reach and at which increments progress should be
|
||||
/// emitted. E.g. when downloading a file of any size but you want percentage increments
|
||||
/// you would create `ProgressEmitter::new(file_size_in_bytes, 100)` and
|
||||
/// [`ProgressEmitter::subscribe`] will yield numbers `1..100` only.
|
||||
///
|
||||
/// Progress is made by calling [`ProgressEmitter::inc`], which can be implicitly done by
|
||||
/// [`ProgressEmitter::wrap_async_read`].
|
||||
#[derive(Debug, Clone)]
|
||||
struct ProgressEmitter {
|
||||
inner: Arc<InnerProgressEmitter>,
|
||||
}
|
||||
|
||||
impl ProgressEmitter {
|
||||
/// Creates a new emitter.
|
||||
///
|
||||
/// The emitter expects to see *total* being added via [`ProgressEmitter::inc`] and will
|
||||
/// emit *steps* updates.
|
||||
fn new(total: u64, steps: u16) -> Self {
|
||||
let (tx, _rx) = broadcast::channel(16);
|
||||
Self {
|
||||
inner: Arc::new(InnerProgressEmitter {
|
||||
total: AtomicU64::new(total),
|
||||
count: AtomicU64::new(0),
|
||||
steps,
|
||||
last_step: AtomicU16::new(0u16),
|
||||
tx,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets a new total in case you did not now the total up front.
|
||||
fn set_total(&self, value: u64) {
|
||||
self.inner.set_total(value)
|
||||
}
|
||||
|
||||
/// Returns a receiver that gets incremental values.
|
||||
///
|
||||
/// The values yielded depend on *steps* passed to [`ProgressEmitter::new`]: it will go
|
||||
/// from `1..steps`.
|
||||
fn subscribe(&self) -> broadcast::Receiver<u16> {
|
||||
self.inner.subscribe()
|
||||
}
|
||||
|
||||
/// Increments the progress by *amount*.
|
||||
fn inc(&self, amount: u64) {
|
||||
self.inner.inc(amount);
|
||||
}
|
||||
|
||||
/// Wraps an [`AsyncRead`] which implicitly calls [`ProgressEmitter::inc`].
|
||||
fn wrap_async_read<R: AsyncRead + Unpin>(&self, read: R) -> ProgressAsyncReader<R> {
|
||||
ProgressAsyncReader {
|
||||
emitter: self.clone(),
|
||||
inner: read,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The actual implementation.
|
||||
///
|
||||
/// This exists so it can be Arc'd into [`ProgressEmitter`] and we can easily have multiple
|
||||
/// `Send + Sync` copies of it. This is used by the
|
||||
/// [`ProgressEmitter::ProgressAsyncReader`] to update the progress without intertwining
|
||||
/// lifetimes.
|
||||
#[derive(Debug)]
|
||||
struct InnerProgressEmitter {
|
||||
total: AtomicU64,
|
||||
count: AtomicU64,
|
||||
steps: u16,
|
||||
last_step: AtomicU16,
|
||||
tx: broadcast::Sender<u16>,
|
||||
}
|
||||
|
||||
impl InnerProgressEmitter {
|
||||
fn inc(&self, amount: u64) {
|
||||
let prev_count = self.count.fetch_add(amount, Ordering::Relaxed);
|
||||
let count = prev_count + amount;
|
||||
let total = self.total.load(Ordering::Relaxed);
|
||||
let step = (std::cmp::min(count, total) * u64::from(self.steps) / total) as u16;
|
||||
let last_step = self.last_step.swap(step, Ordering::Relaxed);
|
||||
if step > last_step {
|
||||
self.tx.send(step).ok();
|
||||
}
|
||||
}
|
||||
|
||||
fn set_total(&self, value: u64) {
|
||||
self.total.store(value, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn subscribe(&self) -> broadcast::Receiver<u16> {
|
||||
self.tx.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper around [`AsyncRead`] which increments a [`ProgressEmitter`].
|
||||
///
|
||||
/// This can be used just like the underlying [`AsyncRead`] but increments progress for each
|
||||
/// byte read. Create this using [`ProgressEmitter::wrap_async_read`].
|
||||
#[derive(Debug)]
|
||||
struct ProgressAsyncReader<R: AsyncRead + Unpin> {
|
||||
emitter: ProgressEmitter,
|
||||
inner: R,
|
||||
}
|
||||
|
||||
impl<R> AsyncRead for ProgressAsyncReader<R>
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut io::ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
let prev_len = buf.filled().len() as u64;
|
||||
match Pin::new(&mut self.inner).poll_read(cx, buf) {
|
||||
Poll::Ready(val) => {
|
||||
let new_len = buf.filled().len() as u64;
|
||||
self.emitter.inc(new_len - prev_len);
|
||||
Poll::Ready(val)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
Reference in New Issue
Block a user