diff --git a/README.md b/README.md index d3e321c..bc074ac 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,15 @@ To work on the service without Docker, Rust is required: env ROCKET_CONFIG=conf/config.dev.toml cargo run ``` +## Monitoring + +Cryptify exposes Prometheus metrics at `GET /metrics` for scraping from the +monitoring network. See [`docs/grafana/`](docs/grafana/) for the reference +dashboard JSON and the Prometheus scrape configuration. + +The endpoint is unauthenticated — restrict access at the firewall or reverse +proxy. + ## Releasing Releases are automated with [release-plz](https://release-plz.ieni.dev/). Merging to `main` triggers a release, and Docker images are published automatically. diff --git a/api-description.yaml b/api-description.yaml index 281894b..ff4eb84 100644 --- a/api-description.yaml +++ b/api-description.yaml @@ -9,6 +9,8 @@ servers: tags: - name: "Health" description: "Health check" +- name: "Metrics" + description: "Prometheus scrape endpoint" - name: "File upload" description: "Upload files" - name: "File download" @@ -30,6 +32,37 @@ paths: schema: type: "string" example: "OK" + /metrics: + get: + tags: + - "Metrics" + summary: "Prometheus text-format metrics" + description: | + Returns usage counters and gauges suitable for Prometheus scraping + by the Grafana instance on Scaleway. Intended to be reachable only + from the internal monitoring network; place a firewall or reverse-proxy + allow-list in front of Cryptify. + + Exposed metrics: + * `cryptify_uploads_total{channel}` — counter of finalized uploads. + * `cryptify_upload_bytes_total{channel}` — counter of bytes. + * `cryptify_storage_bytes` — gauge, current disk usage. + * `cryptify_active_files` — gauge, current file count. + * `cryptify_expired_files_total` — counter of uploads purged + before finalization. 
+ + The `channel` label is derived from the `X-Cryptify-Source` header, + falling back to `Authorization`/`X-Api-Key` (→ `api`), then the + `Origin` header (`website` / `staging-website`), then `User-Agent` + (`outlook` / `thunderbird`), then `unknown`. + operationId: "metrics" + responses: + "200": + description: "Prometheus text exposition format" + content: + text/plain: + schema: + type: "string" /fileupload/init: post: tags: diff --git a/docs/grafana/README.md b/docs/grafana/README.md new file mode 100644 index 0000000..cee45b3 --- /dev/null +++ b/docs/grafana/README.md @@ -0,0 +1,67 @@ +# Grafana dashboards + +This directory ships dashboard JSON files intended to be imported into the +Scaleway Grafana instance that monitors PostGuard. + +Files: + +- `postguard-usage.json` — covers everything issue + [encryption4all/cryptify#101](https://github.com/encryption4all/cryptify/issues/101) + asks for: messages sent per channel (website / staging / Outlook / + Thunderbird / API) and Cryptify storage usage for staging vs. Procolix + production. + +## Metrics source + +Dashboards query Prometheus metrics exposed by Cryptify at `GET /metrics`. +The scrape target must be configured in Prometheus with two labels that the +dashboards rely on: + +- `instance` — the hostname of the Cryptify instance +- `environment` — `staging` or `production` + +Example Prometheus scrape config: + +```yaml +scrape_configs: + - job_name: cryptify + metrics_path: /metrics + static_configs: + - targets: ['cryptify-staging.postguard.eu:8000'] + labels: + environment: staging + - targets: ['cryptify.postguard.eu:8000'] + labels: + environment: production +``` + +The `/metrics` endpoint is unauthenticated — restrict access to the Prometheus +network segment via firewall or reverse-proxy allow-list. + +## Channel label + +The `channel` label on upload counters is derived inside Cryptify from the +request headers (in order of priority): + +1. 
`X-Cryptify-Source` (explicit) — set by the Outlook and Thunderbird addons + once their follow-up PRs land. Expected values: `outlook`, `thunderbird`, + `api`. +2. `Authorization: Bearer ...` or `X-Api-Key` — labelled `api`. +3. `Origin` — `staging.postguard.*` → `staging-website`, any other + `postguard.*` → `website`. +4. `User-Agent` substrings — `outlook` / `thunderbird`. +5. Fallback — `unknown`. + +## Importing + +1. In Grafana, navigate to **Dashboards → Import**. +2. Upload `postguard-usage.json` or paste its contents. +3. Select the Prometheus datasource that scrapes Cryptify. + +## Follow-up work + +- Outlook addon: send `X-Cryptify-Source: outlook` on every upload request + (via the SDK wrapper it uses to talk to Cryptify). +- Thunderbird addon: same with `thunderbird`. +- Until those land, requests from the addons fall back to the `User-Agent` + rule, which is approximate but functional. diff --git a/docs/grafana/postguard-usage.json b/docs/grafana/postguard-usage.json new file mode 100644 index 0000000..ed912e6 --- /dev/null +++ b/docs/grafana/postguard-usage.json @@ -0,0 +1,130 @@ +{ + "title": "PostGuard Usage", + "uid": "postguard-usage", + "schemaVersion": 39, + "timezone": "browser", + "refresh": "1m", + "time": { "from": "now-7d", "to": "now" }, + "tags": ["postguard", "cryptify"], + "templating": { + "list": [ + { + "name": "environment", + "type": "query", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "query": "label_values(cryptify_uploads_total, environment)", + "includeAll": true, + "multi": true, + "refresh": 2 + } + ] + }, + "panels": [ + { + "id": 1, + "type": "timeseries", + "title": "Messages sent per channel (per hour)", + "description": "Rate of finalized Cryptify uploads, split by source channel.", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 9, "w": 24, "x": 0, "y": 0 }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "options": { "legend": { "displayMode": "table", 
"placement": "right", "calcs": ["sum"] } }, + "targets": [ + { + "refId": "A", + "expr": "sum by (channel) (increase(cryptify_uploads_total{environment=~\"$environment\"}[1h]))", + "legendFormat": "{{channel}}" + } + ] + }, + { + "id": 2, + "type": "stat", + "title": "Total messages (selected window)", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 6, "w": 6, "x": 0, "y": 9 }, + "fieldConfig": { "defaults": { "unit": "short", "decimals": 0 } }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "textMode": "value_and_name", + "colorMode": "value" + }, + "targets": [ + { + "refId": "A", + "expr": "sum by (channel) (increase(cryptify_uploads_total{environment=~\"$environment\"}[$__range]))", + "legendFormat": "{{channel}}" + } + ] + }, + { + "id": 3, + "type": "stat", + "title": "Bytes uploaded (selected window)", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 6, "w": 6, "x": 6, "y": 9 }, + "fieldConfig": { "defaults": { "unit": "decbytes" } }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "textMode": "value_and_name", + "colorMode": "value" + }, + "targets": [ + { + "refId": "A", + "expr": "sum by (channel) (increase(cryptify_upload_bytes_total{environment=~\"$environment\"}[$__range]))", + "legendFormat": "{{channel}}" + } + ] + }, + { + "id": 4, + "type": "timeseries", + "title": "Cryptify storage usage", + "description": "Bytes currently held on disk per environment.", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 9, "w": 12, "x": 0, "y": 15 }, + "fieldConfig": { "defaults": { "unit": "decbytes" } }, + "options": { "legend": { "displayMode": "list", "placement": "bottom" } }, + "targets": [ + { + "refId": "A", + "expr": "cryptify_storage_bytes{environment=~\"$environment\"}", + "legendFormat": "{{environment}} ({{instance}})" + } + ] + }, + { + "id": 5, + "type": "timeseries", + "title": "Cryptify active file count", + 
"datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 9, "w": 12, "x": 12, "y": 15 }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "options": { "legend": { "displayMode": "list", "placement": "bottom" } }, + "targets": [ + { + "refId": "A", + "expr": "cryptify_active_files{environment=~\"$environment\"}", + "legendFormat": "{{environment}} ({{instance}})" + } + ] + }, + { + "id": 6, + "type": "timeseries", + "title": "Expired (un-finalized) uploads", + "description": "Rate of uploads that expired before finalization — should remain low. Spikes can indicate a broken client or slow network.", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 24, "x": 0, "y": 24 }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "targets": [ + { + "refId": "A", + "expr": "sum by (environment) (increase(cryptify_expired_files_total{environment=~\"$environment\"}[1h]))", + "legendFormat": "{{environment}}" + } + ] + } + ] +} diff --git a/src/config.rs b/src/config.rs index c38c5e1..bcaa843 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,6 +12,7 @@ pub struct RawCryptifyConfig { smtp_tls: Option, allowed_origins: String, pkg_url: String, + metrics_scan_interval_secs: Option, chunk_size: Option, } @@ -28,6 +29,7 @@ pub struct CryptifyConfig { smtp_tls: bool, allowed_origins: String, pkg_url: String, + metrics_scan_interval_secs: u64, chunk_size: u64, } @@ -47,6 +49,7 @@ impl From for CryptifyConfig { smtp_tls: config.smtp_tls.unwrap_or(true), allowed_origins: config.allowed_origins, pkg_url: config.pkg_url, + metrics_scan_interval_secs: config.metrics_scan_interval_secs.unwrap_or(60), chunk_size: config.chunk_size.unwrap_or(5_000_000), } } @@ -93,6 +96,10 @@ impl CryptifyConfig { &self.pkg_url } + pub fn metrics_scan_interval_secs(&self) -> u64 { + self.metrics_scan_interval_secs + } + pub fn chunk_size(&self) -> u64 { self.chunk_size } diff --git a/src/main.rs b/src/main.rs index de0f6eb..772c46c 
100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,16 @@ mod config; mod email; mod error; +mod metrics; mod store; +use std::sync::Arc; +use std::time::Duration; + use crate::config::CryptifyConfig; use crate::email::send_email; use crate::error::{Error, PayloadTooLargeBody}; +use crate::metrics::{detect_channel, storage_sampler, Metrics}; use crate::store::{ PER_UPLOAD_LIMIT, ROLLING_LIMIT, API_KEY_PER_UPLOAD_LIMIT, API_KEY_ROLLING_LIMIT, ROLLING_WINDOW_SECS, @@ -71,11 +76,35 @@ struct InitResponder { cryptify_token: CryptifyToken, } +/// Request guard that derives the traffic source channel from the request +/// headers for metrics labelling. +struct ClientHeaders { + channel: String, +} + +#[rocket::async_trait] +impl<'r> FromRequest<'r> for ClientHeaders { + type Error = std::convert::Infallible; + + async fn from_request( + request: &'r rocket::Request<'_>, + ) -> rocket::request::Outcome { + rocket::request::Outcome::Success(ClientHeaders { + channel: detect_channel(request.headers()), + }) + } +} + #[get("/health")] fn health() -> &'static str { "OK" } +#[get("/metrics")] +fn metrics_endpoint(metrics: &State>) -> rocket::response::content::RawText { + rocket::response::content::RawText(metrics.render()) +} + /// Presence of an X-Api-Key header (value not validated here — PKG handles that). 
struct ApiKeyPresent(bool); @@ -94,6 +123,7 @@ async fn upload_init( store: &State, api_key: ApiKeyPresent, request: Json, + client_headers: ClientHeaders, ) -> Result { let current_time = chrono::offset::Utc::now().timestamp(); let uuid = uuid::Uuid::new_v4().hyphenated().to_string(); @@ -122,6 +152,7 @@ async fn upload_init( sender: None, sender_attributes: Vec::new(), confirm: request.confirm, + source_channel: client_headers.channel, is_api_key: api_key.0, }, ); @@ -417,6 +448,7 @@ async fn upload_finalize( config: &State, store: &State, vk: &State>, + metrics: &State>, headers: FinalizeHeaders, uuid: &str, ) -> Result, Error> { @@ -499,6 +531,8 @@ async fn upload_finalize( Error::InternalServerError(Some("could not send email".to_owned())) })?; + metrics.record_upload(&state.source_channel, state.uploaded); + if let Some(sender_email) = sender { store.record_upload(sender_email, state.uploaded, now_secs); } @@ -584,13 +618,31 @@ async fn rocket() -> _ { .to_cors() .expect("unable to configure CORS"); + let metrics = Arc::new(Metrics::new()); + rocket::tokio::spawn(storage_sampler( + metrics.clone(), + std::path::PathBuf::from(config.data_dir()), + Duration::from_secs(config.metrics_scan_interval_secs()), + )); + rocket .attach(cors) - .mount("/", routes![health, upload_init, upload_chunk, upload_finalize, usage]) + .mount( + "/", + routes![ + health, + metrics_endpoint, + upload_init, + upload_chunk, + upload_finalize, + usage, + ], + ) .mount("/filedownload", FileServer::from(config.data_dir())) .attach(AdHoc::config::()) - .manage(Store::new()) + .manage(Store::new(metrics.clone())) .manage(vk) + .manage(metrics) } #[cfg(test)] diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..86d9b2c --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,331 @@ +//! Usage metrics for Grafana scraping. +//! +//! Exposes a Prometheus text-format `/metrics` endpoint covering: +//! - uploads completed, split by traffic source ("channel") +//! 
- bytes uploaded, split by channel +//! - current on-disk storage bytes and active file count (sampled +//! periodically by a background task) +//! +//! See `docs/grafana/` for the reference dashboard JSON. + +use std::collections::BTreeMap; +use std::fmt::Write as _; +use std::path::Path; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::Mutex; +use std::time::Duration; + +use rocket::http::HeaderMap; + +/// Channel label used when no other source information is present. +pub const CHANNEL_UNKNOWN: &str = "unknown"; + +/// Header clients can set to identify themselves (`outlook`, `thunderbird`, +/// `api`, ...). Leading whitespace is trimmed and the value is lowercased +/// and restricted to `[a-z0-9_-]` so it cannot inject Prometheus syntax. +pub const SOURCE_HEADER: &str = "X-Cryptify-Source"; + +#[derive(Default)] +pub struct Metrics { + uploads: Mutex>, + upload_bytes: Mutex>, + storage_bytes: AtomicI64, + active_files: AtomicI64, + expired_files: AtomicU64, +} + +impl Metrics { + pub fn new() -> Self { + Self::default() + } + + /// Record a successfully finalized upload. + pub fn record_upload(&self, channel: &str, bytes: u64) { + let channel = sanitize_label(channel); + let mut uploads = self.uploads.lock().unwrap(); + *uploads.entry(channel.clone()).or_insert(0) += 1; + let mut bytes_map = self.upload_bytes.lock().unwrap(); + *bytes_map.entry(channel).or_insert(0) += bytes; + } + + /// Record an upload that expired / was purged without finalizing. + pub fn record_expired(&self) { + self.expired_files.fetch_add(1, Ordering::Relaxed); + } + + /// Update the current on-disk storage sample. + pub fn set_storage(&self, bytes: i64, active_files: i64) { + self.storage_bytes.store(bytes, Ordering::Relaxed); + self.active_files.store(active_files, Ordering::Relaxed); + } + + /// Render all metrics in Prometheus text-exposition format. 
+ pub fn render(&self) -> String { + let mut out = String::new(); + + let _ = writeln!(out, "# HELP cryptify_uploads_total Total finalized uploads per channel."); + let _ = writeln!(out, "# TYPE cryptify_uploads_total counter"); + let uploads = self.uploads.lock().unwrap(); + if uploads.is_empty() { + let _ = writeln!(out, "cryptify_uploads_total{{channel=\"{}\"}} 0", CHANNEL_UNKNOWN); + } else { + for (channel, count) in uploads.iter() { + let _ = writeln!(out, "cryptify_uploads_total{{channel=\"{}\"}} {}", channel, count); + } + } + drop(uploads); + + let _ = writeln!(out, "# HELP cryptify_upload_bytes_total Total bytes uploaded per channel."); + let _ = writeln!(out, "# TYPE cryptify_upload_bytes_total counter"); + let bytes = self.upload_bytes.lock().unwrap(); + if bytes.is_empty() { + let _ = writeln!(out, "cryptify_upload_bytes_total{{channel=\"{}\"}} 0", CHANNEL_UNKNOWN); + } else { + for (channel, b) in bytes.iter() { + let _ = writeln!(out, "cryptify_upload_bytes_total{{channel=\"{}\"}} {}", channel, b); + } + } + drop(bytes); + + let _ = writeln!(out, "# HELP cryptify_storage_bytes Current bytes of uploads held on disk."); + let _ = writeln!(out, "# TYPE cryptify_storage_bytes gauge"); + let _ = writeln!(out, "cryptify_storage_bytes {}", self.storage_bytes.load(Ordering::Relaxed)); + + let _ = writeln!(out, "# HELP cryptify_active_files Number of upload files currently on disk."); + let _ = writeln!(out, "# TYPE cryptify_active_files gauge"); + let _ = writeln!(out, "cryptify_active_files {}", self.active_files.load(Ordering::Relaxed)); + + let _ = writeln!(out, "# HELP cryptify_expired_files_total Uploads that expired before being finalized."); + let _ = writeln!(out, "# TYPE cryptify_expired_files_total counter"); + let _ = writeln!(out, "cryptify_expired_files_total {}", self.expired_files.load(Ordering::Relaxed)); + + out + } +} + +/// Derive the channel label for a request from its headers. +/// +/// Priority: +/// 1. 
`X-Cryptify-Source` explicit header. +/// 2. API auth (`Authorization: Bearer …` or `X-Api-Key`) → `api`. +/// 3. `Origin` → `staging-website` / `website`. +/// 4. `User-Agent` substring for Outlook / Thunderbird. +/// 5. `unknown`. +pub fn detect_channel(headers: &HeaderMap<'_>) -> String { + if let Some(raw) = headers.get_one(SOURCE_HEADER) { + let cleaned = sanitize_label(raw); + if !cleaned.is_empty() && cleaned != CHANNEL_UNKNOWN { + return cleaned; + } + } + if headers.get_one("X-Api-Key").is_some() + || headers + .get_one("Authorization") + .map(|v| v.trim_start().to_ascii_lowercase().starts_with("bearer ")) + .unwrap_or(false) + { + return "api".to_string(); + } + if let Some(origin) = headers.get_one("Origin") { + let o = origin.to_ascii_lowercase(); + if o.contains("staging.postguard") || o.contains("staging-postguard") { + return "staging-website".to_string(); + } + if o.contains("postguard.") { + return "website".to_string(); + } + } + if let Some(ua) = headers.get_one("User-Agent") { + let ua = ua.to_ascii_lowercase(); + if ua.contains("outlook") { + return "outlook".to_string(); + } + if ua.contains("thunderbird") { + return "thunderbird".to_string(); + } + } + CHANNEL_UNKNOWN.to_string() +} + +/// Reduce an arbitrary string to a safe Prometheus label value: +/// lower-case, `[a-z0-9_-]`, max 32 chars, non-empty (falls back to +/// `unknown`). This prevents clients from injecting label syntax or +/// exploding cardinality with arbitrary inputs. +fn sanitize_label(raw: &str) -> String { + let cleaned: String = raw + .trim() + .to_ascii_lowercase() + .chars() + .map(|c| match c { + 'a'..='z' | '0'..='9' | '-' | '_' => c, + _ => '-', + }) + .take(32) + .collect(); + let trimmed = cleaned.trim_matches('-').to_string(); + if trimmed.is_empty() { + CHANNEL_UNKNOWN.to_string() + } else { + trimmed + } +} + +/// Walk `data_dir` once and return `(total_bytes, file_count)`. 
Symlinks +/// and subdirectories are ignored — the upload directory is a flat +/// directory of files named by UUID. +pub fn sample_storage(data_dir: &Path) -> std::io::Result<(i64, i64)> { + let mut total: i64 = 0; + let mut count: i64 = 0; + match std::fs::read_dir(data_dir) { + Ok(rd) => { + for entry in rd.flatten() { + if let Ok(meta) = entry.metadata() { + if meta.is_file() { + total = total.saturating_add(meta.len() as i64); + count += 1; + } + } + } + Ok((total, count)) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok((0, 0)), + Err(e) => Err(e), + } +} + +/// Periodically sample `data_dir` and push the numbers onto `metrics`. +pub async fn storage_sampler( + metrics: std::sync::Arc, + data_dir: std::path::PathBuf, + interval: Duration, +) { + loop { + match sample_storage(&data_dir) { + Ok((bytes, count)) => metrics.set_storage(bytes, count), + Err(e) => log::warn!("metrics: storage sampling failed for {:?}: {}", data_dir, e), + } + rocket::tokio::time::sleep(interval).await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rocket::http::Header; + + fn headers(pairs: &[(&'static str, &'static str)]) -> rocket::http::HeaderMap<'static> { + let mut h = rocket::http::HeaderMap::new(); + for (k, v) in pairs { + h.add(Header::new(*k, *v)); + } + h + } + + #[test] + fn channel_explicit_header_wins() { + let h = headers(&[ + ("X-Cryptify-Source", "OUTLOOK"), + ("Origin", "https://postguard.eu"), + ]); + assert_eq!(detect_channel(&h), "outlook"); + } + + #[test] + fn channel_bearer_is_api() { + let h = headers(&[("Authorization", "Bearer abc123")]); + assert_eq!(detect_channel(&h), "api"); + } + + #[test] + fn channel_api_key_is_api() { + let h = headers(&[("X-Api-Key", "s3cret")]); + assert_eq!(detect_channel(&h), "api"); + } + + #[test] + fn channel_origin_staging() { + let h = headers(&[("Origin", "https://staging.postguard.eu")]); + assert_eq!(detect_channel(&h), "staging-website"); + } + + #[test] + fn channel_origin_production() { + 
let h = headers(&[("Origin", "https://postguard.eu")]); + assert_eq!(detect_channel(&h), "website"); + } + + #[test] + fn channel_user_agent_outlook() { + let h = headers(&[("User-Agent", "Mozilla Outlook/16.0")]); + assert_eq!(detect_channel(&h), "outlook"); + } + + #[test] + fn channel_user_agent_thunderbird() { + let h = headers(&[("User-Agent", "Thunderbird/115.0")]); + assert_eq!(detect_channel(&h), "thunderbird"); + } + + #[test] + fn channel_defaults_to_unknown() { + let h = headers(&[]); + assert_eq!(detect_channel(&h), "unknown"); + } + + #[test] + fn sanitize_strips_unsafe_chars_and_caps_length() { + assert_eq!(sanitize_label("Outlook\n\"}"), "outlook"); + assert_eq!(sanitize_label(""), "unknown"); + assert_eq!(sanitize_label(" "), "unknown"); + let long = "a".repeat(100); + assert_eq!(sanitize_label(&long).len(), 32); + } + + #[test] + fn render_emits_zero_counters_when_empty() { + let m = Metrics::new(); + let text = m.render(); + assert!(text.contains("cryptify_uploads_total{channel=\"unknown\"} 0")); + assert!(text.contains("cryptify_upload_bytes_total{channel=\"unknown\"} 0")); + assert!(text.contains("cryptify_storage_bytes 0")); + assert!(text.contains("cryptify_active_files 0")); + assert!(text.contains("cryptify_expired_files_total 0")); + } + + #[test] + fn render_aggregates_by_channel() { + let m = Metrics::new(); + m.record_upload("website", 1_000); + m.record_upload("website", 500); + m.record_upload("outlook", 250); + m.record_expired(); + m.set_storage(9_999, 3); + let text = m.render(); + assert!(text.contains("cryptify_uploads_total{channel=\"website\"} 2")); + assert!(text.contains("cryptify_uploads_total{channel=\"outlook\"} 1")); + assert!(text.contains("cryptify_upload_bytes_total{channel=\"website\"} 1500")); + assert!(text.contains("cryptify_upload_bytes_total{channel=\"outlook\"} 250")); + assert!(text.contains("cryptify_storage_bytes 9999")); + assert!(text.contains("cryptify_active_files 3")); + 
assert!(text.contains("cryptify_expired_files_total 1")); + } + + #[test] + fn sample_storage_missing_dir_is_zero() { + let tmp = std::env::temp_dir().join("cryptify-metrics-missing-xyz"); + let (bytes, count) = sample_storage(&tmp).unwrap(); + assert_eq!((bytes, count), (0, 0)); + } + + #[test] + fn sample_storage_counts_files() { + let tmp = std::env::temp_dir().join(format!("cryptify-metrics-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&tmp).unwrap(); + std::fs::write(tmp.join("a"), b"hello").unwrap(); + std::fs::write(tmp.join("b"), b"world!").unwrap(); + let (bytes, count) = sample_storage(&tmp).unwrap(); + assert_eq!(count, 2); + assert_eq!(bytes, 11); + std::fs::remove_dir_all(&tmp).unwrap(); + } +} diff --git a/src/store.rs b/src/store.rs index 9d8acee..edc5192 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,4 +1,5 @@ use crate::email; +use crate::metrics::Metrics; use std::{ collections::{BTreeMap, HashMap, VecDeque}, @@ -24,6 +25,9 @@ pub struct FileState { pub sender: Option, pub sender_attributes: Vec<(String, String)>, pub confirm: bool, + /// Traffic source this upload originated from ("website", "outlook", + /// "thunderbird", "api", ...). Used only for metrics labelling. + pub source_channel: String, pub is_api_key: bool, } @@ -44,6 +48,7 @@ struct StoreState { struct SharedState { state: std::sync::Mutex, notify: Notify, + metrics: Arc, } pub struct Store { @@ -51,7 +56,7 @@ pub struct Store { } impl Store { - pub fn new() -> Self { + pub fn new(metrics: Arc) -> Self { let result = Store { shared: Arc::new(SharedState { state: std::sync::Mutex::new(StoreState { @@ -62,6 +67,7 @@ impl Store { shutdown: false, }), notify: Notify::new(), + metrics, }), }; @@ -164,7 +170,16 @@ impl SharedState { return Some(when); } - state.files.remove(id); + let id = id.clone(); + if let Some(entry) = state.files.remove(&id) { + // An entry that still had no `sender` set was never finalized. 
+ // (`sender` is populated by `upload_finalize` once the file has + // been unsealed.) + let was_unfinalized = entry.try_lock().map(|g| g.sender.is_none()).unwrap_or(false); + if was_unfinalized { + self.metrics.record_expired(); + } + } state.expirations.remove(&(when, removal_id)); } @@ -195,7 +210,7 @@ mod tests { #[rocket::async_test] async fn usage_is_zero_for_unknown_email() { - let store = Store::new(); + let store = Store::new(Arc::new(Metrics::new())); assert_eq!( store.get_usage("unknown@example.com", 1_000_000).used_bytes, 0 @@ -204,7 +219,7 @@ mod tests { #[rocket::async_test] async fn usage_sums_records_in_window() { - let store = Store::new(); + let store = Store::new(Arc::new(Metrics::new())); let now: i64 = 2_000_000; store.record_upload("a@example.com".into(), 1_000_000_000, now - 3600); store.record_upload("a@example.com".into(), 2_000_000_000, now - 60); @@ -218,7 +233,7 @@ mod tests { #[rocket::async_test] async fn usage_excludes_records_outside_window() { - let store = Store::new(); + let store = Store::new(Arc::new(Metrics::new())); let now: i64 = 2_000_000; store.record_upload( "b@example.com".into(), @@ -234,7 +249,7 @@ mod tests { #[rocket::async_test] async fn usage_is_isolated_per_email() { - let store = Store::new(); + let store = Store::new(Arc::new(Metrics::new())); let now: i64 = 2_000_000; store.record_upload("a@example.com".into(), 1_000, now); store.record_upload("b@example.com".into(), 2_000, now); @@ -244,7 +259,7 @@ mod tests { #[rocket::async_test] async fn pruning_removes_only_expired_records() { - let store = Store::new(); + let store = Store::new(Arc::new(Metrics::new())); let now: i64 = 2_000_000; store.record_upload( "c@example.com".into(),