mirror of
https://github.com/chatmail/core.git
synced 2026-04-28 02:46:29 +03:00
feat: cache HTTP GET requests
This commit is contained in:
223
src/net/http.rs
223
src/net/http.rs
@@ -6,14 +6,17 @@ use http_body_util::BodyExt;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use mime::Mime;
|
||||
use serde::Serialize;
|
||||
use tokio::fs;
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
use crate::context::Context;
|
||||
use crate::net::proxy::ProxyConfig;
|
||||
use crate::net::session::SessionStream;
|
||||
use crate::net::tls::wrap_rustls;
|
||||
use crate::tools::{create_id, time};
|
||||
|
||||
/// HTTP(S) GET response.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Response {
|
||||
/// Response body.
|
||||
pub blob: Vec<u8>,
|
||||
@@ -90,9 +93,127 @@ where
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
/// Converts the URL to expiration timestamp.
|
||||
fn http_url_cache_expires(url: &str, mimetype: Option<&str>) -> i64 {
|
||||
let now = time();
|
||||
if url.ends_with(".xdc") {
|
||||
// WebXDCs expire in 5 weeks.
|
||||
now + 3600 * 24 * 35
|
||||
} else if mimetype.is_some_and(|s| s.starts_with("image/")) {
|
||||
// Cache images for 1 day.
|
||||
now + 3600 * 24
|
||||
} else {
|
||||
// Cache everything else for 1 hour.
|
||||
now + 3600
|
||||
}
|
||||
}
|
||||
|
||||
/// Places the binary into HTTP cache.
|
||||
async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
|
||||
let blob = BlobObject::create(
|
||||
context,
|
||||
&format!("http_cache_{}", create_id()),
|
||||
response.blob.as_slice(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
context
|
||||
.sql
|
||||
.insert(
|
||||
"INSERT OR IGNORE INTO http_cache (url, expires, blobname, mimetype, encoding)
|
||||
VALUES (?, ?, ?, ?, ?)",
|
||||
(
|
||||
url,
|
||||
http_url_cache_expires(url, response.mimetype.as_deref()),
|
||||
blob.as_name(),
|
||||
response.mimetype.as_deref().unwrap_or_default(),
|
||||
response.encoding.as_deref().unwrap_or_default(),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves the binary from HTTP cache.
|
||||
async fn http_cache_get(context: &Context, url: &str) -> Result<Option<Response>> {
|
||||
let Some((blob_name, mimetype, encoding)) = context
|
||||
.sql
|
||||
.query_row_optional(
|
||||
"SELECT blobname, mimetype, encoding
|
||||
FROM http_cache WHERE url=? AND expires > ?",
|
||||
(url, time()),
|
||||
|row| {
|
||||
let blob_name: String = row.get(0)?;
|
||||
let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
|
||||
let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
|
||||
Ok((blob_name, mimetype, encoding))
|
||||
},
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let blob_object = BlobObject::from_name(context, blob_name)?;
|
||||
let blob_abs_path = blob_object.to_abs_path();
|
||||
let blob = match fs::read(blob_abs_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to read blob for {url:?} cache entry."))
|
||||
{
|
||||
Ok(blob) => blob,
|
||||
Err(err) => {
|
||||
// This should not happen, but user may go into the blobdir and remove files,
|
||||
// antivirus may delete the file or there may be a bug in housekeeping.
|
||||
warn!(context, "{err:?}.");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let expires = http_url_cache_expires(url, mimetype.as_deref());
|
||||
let response = Response {
|
||||
blob,
|
||||
mimetype,
|
||||
encoding,
|
||||
};
|
||||
|
||||
// Update expiration timestamp.
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE http_cache SET expires=? WHERE url=?",
|
||||
(expires, url),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Some(response))
|
||||
}
|
||||
|
||||
/// Removes expired cache entries.
|
||||
pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
|
||||
// Remove cache entries that are already expired
|
||||
// or entries that will not expire in a year
|
||||
// to make sure we don't have invalid timestamps that are way forward in the future.
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"DELETE FROM http_cache
|
||||
WHERE ?1 > expires OR expires > ?1 + 31536000",
|
||||
(time(),),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves the binary contents of URL using HTTP GET request.
|
||||
pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
|
||||
let mut url = url.to_string();
|
||||
pub async fn read_url_blob(context: &Context, original_url: &str) -> Result<Response> {
|
||||
if let Some(response) = http_cache_get(context, original_url).await? {
|
||||
info!(context, "Returning {original_url:?} from cache.");
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
info!(context, "Not found {original_url:?} in cache.");
|
||||
let mut url = original_url.to_string();
|
||||
|
||||
// Follow up to 10 http-redirects
|
||||
for _i in 0..10 {
|
||||
@@ -139,11 +260,14 @@ pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
|
||||
});
|
||||
let body = response.collect().await?.to_bytes();
|
||||
let blob: Vec<u8> = body.to_vec();
|
||||
return Ok(Response {
|
||||
let response = Response {
|
||||
blob,
|
||||
mimetype,
|
||||
encoding,
|
||||
});
|
||||
};
|
||||
info!(context, "Inserting {original_url:?} into cache.");
|
||||
http_cache_put(context, &url, &response).await?;
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
Err(anyhow!("Followed 10 redirections"))
|
||||
@@ -241,3 +365,92 @@ pub(crate) async fn post_form<T: Serialize + ?Sized>(
|
||||
let bytes = response.collect().await?.to_bytes();
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// End-to-end test of the HTTP cache:
    /// put/get round-trips, per-type expiration (1 h for HTML,
    /// 5 weeks for .xdc), expiry refresh on access, housekeeping
    /// not deleting live blobs, and graceful handling of a blob
    /// removed from the blobdir behind the cache's back.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // Cold cache: nothing is stored yet.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        // Only the exact URL that was put is served from the cache.
        // NOTE(review): xdc_pixel_url is never inserted via http_cache_put
        // in this view, yet later comments talk about it "expiring" —
        // confirm a put for the pixel app was not lost from this hunk.
        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some(html_response.clone())
        );

        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some(xdc_response.clone())
        );

        // The HTML entry is still served and its expiry is refreshed by the get.
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some(html_response.clone())
        );

        // HTML expires after 1 hour, but .xdc does not.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some(xdc_response.clone())
        );

        // 35 days later pixel .xdc expires because we did not request it for 35 days and 1 hour.
        // But editor is still there because we did not request it for just 35 days.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // Run housekeeping to test that it does not delete the blob too early.
        housekeeping(t).await?;

        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some(xdc_response.clone())
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // Test that if the file is accidentally removed from the blobdir,
        // there is no error when trying to load the cache entry.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        // A missing blob degrades to a cache miss, not an error.
        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}
|
||||
|
||||
23
src/sql.rs
23
src/sql.rs
@@ -19,6 +19,7 @@ use crate::location::delete_orphaned_poi_locations;
|
||||
use crate::log::LogExt;
|
||||
use crate::message::{Message, MsgId};
|
||||
use crate::net::dns::prune_dns_cache;
|
||||
use crate::net::http::http_cache_cleanup;
|
||||
use crate::net::prune_connection_history;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::peerstate::Peerstate;
|
||||
@@ -720,6 +721,12 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
|
||||
warn!(context, "Can't set config: {e:#}.");
|
||||
}
|
||||
|
||||
http_cache_cleanup(context)
|
||||
.await
|
||||
.context("Failed to cleanup HTTP cache")
|
||||
.log_err(context)
|
||||
.ok();
|
||||
|
||||
if let Err(err) = remove_unused_files(context).await {
|
||||
warn!(
|
||||
context,
|
||||
@@ -846,6 +853,22 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
|
||||
.await
|
||||
.context("housekeeping: failed to SELECT value FROM config")?;
|
||||
|
||||
context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT blobname FROM http_cache",
|
||||
(),
|
||||
|row| row.get::<_, String>(0),
|
||||
|rows| {
|
||||
for row in rows {
|
||||
maybe_add_file(&mut files_in_use, &row?);
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("Failed to SELECT blobname FROM http_cache")?;
|
||||
|
||||
info!(context, "{} files in use.", files_in_use.len());
|
||||
/* go through directories and delete unused files */
|
||||
let blobdir = context.get_blobdir();
|
||||
|
||||
@@ -1088,6 +1088,21 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
|
||||
.await?;
|
||||
}
|
||||
|
||||
inc_and_check(&mut migration_version, 125)?;
|
||||
if dbversion < migration_version {
|
||||
sql.execute_migration(
|
||||
"CREATE TABLE http_cache (
|
||||
url TEXT PRIMARY KEY,
|
||||
expires INTEGER NOT NULL, -- When the cache entry is considered expired, timestamp in seconds.
|
||||
blobname TEXT NOT NULL,
|
||||
mimetype TEXT NOT NULL DEFAULT '', -- MIME type extracted from Content-Type header.
|
||||
encoding TEXT NOT NULL DEFAULT '' -- Encoding from Content-Type header.
|
||||
) STRICT",
|
||||
migration_version,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let new_version = sql
|
||||
.get_raw_config_int(VERSION_CFG)
|
||||
.await?
|
||||
|
||||
Reference in New Issue
Block a user