Move file upload into SMTP send job.

- This adds params for the upload URL and local file path, so that the
actual upload can happen in the send job.
- This also moves the URL generation to the client side so that we can
  generate a valid URL before the upload (because the MIME rendering of
  the mail message happens earlier and we want to include the URL there)
This commit is contained in:
Franz Heinzmann (Frando)
2020-06-10 12:34:51 +02:00
parent 060492afe8
commit 7d2105dbc9
6 changed files with 133 additions and 50 deletions

View File

@@ -32,7 +32,7 @@ use crate::message::{self, Message, MessageState};
use crate::mimefactory::MimeFactory; use crate::mimefactory::MimeFactory;
use crate::param::*; use crate::param::*;
use crate::smtp::Smtp; use crate::smtp::Smtp;
use crate::upload::upload_file; use crate::upload::{generate_upload_url, upload_file};
use crate::{scheduler::InterruptInfo, sql}; use crate::{scheduler::InterruptInfo, sql};
// results in ~3 weeks for the last backoff timespan // results in ~3 weeks for the last backoff timespan
@@ -332,6 +332,17 @@ impl Job {
} }
pub(crate) async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status { pub(crate) async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status {
// Upload file to HTTP if set in params.
match (
self.param.get_upload_url(),
self.param.get_upload_path(context),
) {
(Some(upload_url), Ok(Some(upload_path))) => {
job_try!(upload_file(context, upload_url.to_string(), upload_path).await);
}
_ => {}
}
// SMTP server, if not yet done // SMTP server, if not yet done
if !smtp.is_connected().await { if !smtp.is_connected().await {
let loginparam = LoginParam::from_database(context, "configured_").await; let loginparam = LoginParam::from_database(context, "configured_").await;
@@ -728,25 +739,23 @@ pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<Job
} }
}; };
// Upload file if DCC_UPLOAD_URL is set.
// See upload-server folder for an example.
// TODO: Move into send_msg_to_smtp job.
let mut did_upload_file = false;
if let Some(file) = msg.get_file(context) {
if let Ok(endpoint) = env::var("DCC_UPLOAD_URL") {
info!(context, "Upload file attachement to {}", endpoint);
let file_url = upload_file(context, endpoint, file).await?;
let text = msg.text.clone().unwrap_or("".into());
let suffix = format!("\n\nFile attachement: {}", file_url);
msg.set_text(Some(format!("{}{}", text, suffix)));
did_upload_file = true;
}
}
let mut mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?; let mut mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?;
if did_upload_file {
mimefactory.set_include_file(false); // Prepare file upload if DCC_UPLOAD_URL env variable is set.
// See upload-server folder for an example server impl.
// Here a new URL is generated, which the mimefactory includes in the message instead of the
// actual attachement. The upload then happens in the smtp send job.
let upload = if let Some(file) = msg.get_file(context) {
if let Ok(endpoint) = env::var("DCC_UPLOAD_URL") {
let upload_url = generate_upload_url(context, endpoint);
mimefactory.set_upload_url(upload_url.clone());
Some((upload_url, file))
} else {
None
} }
} else {
None
};
let mut recipients = mimefactory.recipients(); let mut recipients = mimefactory.recipients();
@@ -838,6 +847,11 @@ pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<Job
param.set(Param::File, blob.as_name()); param.set(Param::File, blob.as_name());
param.set(Param::Recipients, &recipients); param.set(Param::Recipients, &recipients);
if let Some((upload_url, upload_path)) = upload {
param.set_upload_url(upload_url);
param.set_upload_path(upload_path);
}
let job = create(Action::SendMsgToSmtp, msg_id.to_u32() as i32, param, 0)?; let job = create(Action::SendMsgToSmtp, msg_id.to_u32() as i32, param, 0)?;
Ok(Some(job)) Ok(Some(job))

View File

@@ -50,7 +50,7 @@ pub struct MimeFactory<'a, 'b> {
context: &'a Context, context: &'a Context,
last_added_location_id: u32, last_added_location_id: u32,
attach_selfavatar: bool, attach_selfavatar: bool,
include_file: bool, upload_url: Option<String>,
} }
/// Result of rendering a message, ready to be submitted to a send job. /// Result of rendering a message, ready to be submitted to a send job.
@@ -160,7 +160,7 @@ impl<'a, 'b> MimeFactory<'a, 'b> {
last_added_location_id: 0, last_added_location_id: 0,
attach_selfavatar, attach_selfavatar,
context, context,
include_file: true, upload_url: None,
}; };
Ok(factory) Ok(factory)
} }
@@ -208,7 +208,7 @@ impl<'a, 'b> MimeFactory<'a, 'b> {
req_mdn: false, req_mdn: false,
last_added_location_id: 0, last_added_location_id: 0,
attach_selfavatar: false, attach_selfavatar: false,
include_file: true, upload_url: None,
}; };
Ok(res) Ok(res)
@@ -412,8 +412,8 @@ impl<'a, 'b> MimeFactory<'a, 'b> {
.collect() .collect()
} }
pub fn set_include_file(&mut self, include_file: bool) { pub fn set_upload_url(&mut self, upload_url: String) {
self.include_file = include_file self.upload_url = Some(upload_url)
} }
pub async fn render(mut self) -> Result<RenderedEmail, Error> { pub async fn render(mut self) -> Result<RenderedEmail, Error> {
@@ -886,11 +886,21 @@ impl<'a, 'b> MimeFactory<'a, 'b> {
} }
}; };
// if upload url is present: add as header and to message text
// TODO: make text part translatable (or remove)
let upload_url_text = if let Some(ref upload_url) = self.upload_url {
protected_headers.push(Header::new("Chat-Upload-Url".into(), upload_url.clone()));
Some(format!("\n\nFile attachement: {}", upload_url.clone()))
} else {
None
};
let footer = &self.selfstatus; let footer = &self.selfstatus;
let message_text = format!( let message_text = format!(
"{}{}{}{}{}", "{}{}{}{}{}{}",
fwdhint.unwrap_or_default(), fwdhint.unwrap_or_default(),
escape_message_footer_marks(final_text), escape_message_footer_marks(final_text),
upload_url_text.unwrap_or_default(),
if !final_text.is_empty() && !footer.is_empty() { if !final_text.is_empty() && !footer.is_empty() {
"\r\n\r\n" "\r\n\r\n"
} else { } else {
@@ -906,8 +916,8 @@ impl<'a, 'b> MimeFactory<'a, 'b> {
.body(message_text); .body(message_text);
let mut parts = Vec::new(); let mut parts = Vec::new();
// add attachment part // add attachment part, skip if upload url was provided
if chat::msgtype_has_file(self.msg.viewtype) && self.include_file { if chat::msgtype_has_file(self.msg.viewtype) && self.upload_url.is_none() {
if !is_file_size_okay(context, &self.msg).await { if !is_file_size_okay(context, &self.msg).await {
bail!( bail!(
"Message exceeds the recommended {} MB.", "Message exceeds the recommended {} MB.",

View File

@@ -120,6 +120,12 @@ pub enum Param {
/// For MDN-sending job /// For MDN-sending job
MsgId = b'I', MsgId = b'I',
/// For messages that have a HTTP file upload instead of attachement
UploadUrl = b'y',
/// For messages that have a HTTP file upload instead of attachement: Path to local file
UploadPath = b'Y',
} }
/// Possible values for `Param::ForcePlaintext`. /// Possible values for `Param::ForcePlaintext`.
@@ -317,6 +323,23 @@ impl Params {
Ok(Some(path)) Ok(Some(path))
} }
pub fn get_upload_url(&self) -> Option<&str> {
self.get(Param::UploadUrl)
}
pub fn get_upload_path(&self, context: &Context) -> Result<Option<PathBuf>, BlobError> {
self.get_path(Param::UploadPath, context)
}
pub fn set_upload_path(&mut self, path: PathBuf) {
// TODO: Remove unwrap? May panic for invalid UTF8 in path.
self.set(Param::UploadPath, path.to_str().unwrap());
}
pub fn set_upload_url(&mut self, url: impl AsRef<str>) {
self.set(Param::UploadUrl, url);
}
pub fn get_msg_id(&self) -> Option<MsgId> { pub fn get_msg_id(&self) -> Option<MsgId> {
self.get(Param::MsgId) self.get(Param::MsgId)
.and_then(|x| x.parse::<u32>().ok()) .and_then(|x| x.parse::<u32>().ok())

View File

@@ -1,11 +1,12 @@
use crate::context::Context; use crate::context::Context;
use crate::error::{bail, Result}; use crate::error::{bail, Result};
use async_std::path::PathBuf; use async_std::path::PathBuf;
use rand::Rng;
/// Upload file to a HTTP upload endpoint. /// Upload file to a HTTP upload endpoint.
pub async fn upload_file(_context: &Context, endpoint: String, file: PathBuf) -> Result<String> { pub async fn upload_file(_context: &Context, url: String, filepath: PathBuf) -> Result<String> {
// TODO: Use tokens for upload, encrypt file with PGP. // TODO: Use tokens for upload, encrypt file with PGP.
let response = surf::post(endpoint).body_file(file)?.await; let response = surf::put(url).body_file(filepath)?.await;
if let Err(err) = response { if let Err(err) = response {
bail!("Upload failed: {}", err); bail!("Upload failed: {}", err);
} }
@@ -15,3 +16,17 @@ pub async fn upload_file(_context: &Context, endpoint: String, file: PathBuf) ->
Err(err) => bail!("Invalid response from upload: {}", err), Err(err) => bail!("Invalid response from upload: {}", err),
} }
} }
/// Generate a random URL based on the provided endpoint.
pub fn generate_upload_url(_context: &Context, endpoint: String) -> String {
const CROCKFORD_ALPHABET: &[u8] = b"0123456789abcdefghjkmnpqrstvwxyz";
const FILENAME_LEN: usize = 27;
let mut rng = rand::thread_rng();
let filename: String = (0..FILENAME_LEN)
.map(|_| {
let idx = rng.gen_range(0, CROCKFORD_ALPHABET.len());
CROCKFORD_ALPHABET[idx] as char
})
.collect();
format!("{}{}", endpoint, filename)
}

View File

@@ -7,6 +7,7 @@
"start": "node server.js" "start": "node server.js"
}, },
"dependencies": { "dependencies": {
"base32": "^0.0.6",
"express": "^4.17.1" "express": "^4.17.1"
} }
} }

View File

@@ -1,5 +1,4 @@
const p = require('path') const p = require('path')
const crypto = require('crypto')
const express = require('express') const express = require('express')
const fs = require('fs') const fs = require('fs')
const { pipeline } = require('stream') const { pipeline } = require('stream')
@@ -10,44 +9,65 @@ const config = {
path: process.env.UPLOAD_PATH || p.resolve('./uploads'), path: process.env.UPLOAD_PATH || p.resolve('./uploads'),
port: process.env.PORT || 8080, port: process.env.PORT || 8080,
hostname: process.env.HOSTNAME || '0.0.0.0', hostname: process.env.HOSTNAME || '0.0.0.0',
baseurl: process.env.BASE_URL || null baseurl: process.env.BASE_URL
} }
if (!config.baseurl) config.baseurl = `http://${config.hostname}:${config.port}/`
if (!config.baseurl.endsWith('/')) config.baseurl = config.baseurl + '/' if (!config.baseurl.endsWith('/')) config.baseurl = config.baseurl + '/'
if (!fs.existsSync(config.path)) { if (!fs.existsSync(config.path)) {
fs.mkdirSync(config.path, { recursive: true }) fs.mkdirSync(config.path, { recursive: true })
} }
const baseUrl = config.baseurl || `http://${config.hostname}:${config.port}/` app.use('/:filename', checkFilenameMiddleware)
app.put('/:filename', (req, res) => {
const uploadpath = req.uploadpath
const filename = req.params.filename
fs.stat(uploadpath, (err, stat) => {
if (err && err.code !== 'ENOENT') {
console.error('error', err.message)
return res.code(500).send('internal server error')
}
if (stat) return res.status(500).send('filename in use')
app.post('*', (req, res) => { const ws = fs.createWriteStream(uploadpath)
const filename = crypto.randomBytes(12).toString('hex')
const ws = fs.createWriteStream(p.join(config.path, filename))
pipeline(req, ws, err => { pipeline(req, ws, err => {
if (err) console.error(err) if (err) {
if (err) res.status(500).send(err.message) console.error('error', err.message)
const url = baseUrl + filename return res.status(500).send('internal server error')
console.log('file uploaded: ' + filename) }
res.send(url) console.log('file uploaded: ' + uploadpath)
const url = config.baseurl + filename
res.end(url)
})
}) })
}) })
app.get('/:filename', (req, res) => { app.get('/:filename', (req, res) => {
const filepath = p.normalize(p.join(config.path, req.params.filename)) const uploadpath = req.uploadpath
if (!filepath.startsWith(config.path)) { const rs = fs.createReadStream(uploadpath)
return res.code(500).send('bad request')
}
const rs = fs.createReadStream(filepath)
res.setHeader('content-type', 'application/octet-stream') res.setHeader('content-type', 'application/octet-stream')
pipeline(rs, res, err => { pipeline(rs, res, err => {
if (err) console.error(err) if (err) console.error('error', err.message)
if (err) res.status(500).send(err.message) if (err) return res.status(500).send(err.message)
res.end()
}) })
}) })
function checkFilenameMiddleware (req, res, next) {
const filename = req.params.filename
if (!filename) return res.status(500).send('missing filename')
if (!filename.match(/^[a-zA-Z0-9]{26,32}$/)) {
return res.status(500).send('illegal filename')
}
const uploadpath = p.normalize(p.join(config.path, req.params.filename))
if (!uploadpath.startsWith(config.path)) {
return res.code(500).send('bad request')
}
req.uploadpath = uploadpath
next()
}
app.listen(config.port, err => { app.listen(config.port, err => {
if (err) console.error(err) if (err) console.error(err)
else console.log('Listening on ' + baseUrl) else console.log(`Listening on ${config.baseurl}`)
}) })