Skip to content
Draft
119 changes: 119 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ readme = "README.md"
assert2 = "0.4.0"
assert_matches = "1.5.0"
async-trait = "0.1.89"
axum = "0.8"
axum-reverse-proxy = { version = "1.1", default-features = false }
base64 = "0.22.1"
bincode = "1.3.3"
bs58 = "0.5.1"
candid = "0.10.20"
candid_parser = "0.3.0"
canhttp = "0.5.2"
canlog = "0.2.0"
derive_more = { version = "2.1.1", features = ["from"] }
futures = "0.3.31"
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ license.workspace = true

[dependencies]
async-trait = { workspace = true }
axum = { workspace = true }
axum-reverse-proxy = { workspace = true }
candid = { workspace = true }
canhttp = { workspace = true }
canlog = { workspace = true }
cksol-types = { path = "../libs/types" }
cksol-types-internal = { path = "../libs/types-internal", features = ["log", "event"] }
Expand Down
83 changes: 83 additions & 0 deletions integration_tests/src/json_rpc_reverse_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! A JSON-RPC reverse proxy with a configurable request blocklist.

use axum::{
Extension, Router, body::to_bytes, extract::Request, middleware, response::IntoResponse,
};
use axum_reverse_proxy::ReverseProxy;
use canhttp::http::json::JsonRpcRequest;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone, Debug)]
pub struct JsonRpcRequestMatcher {
method: String,
}

impl JsonRpcRequestMatcher {
pub fn with_method(method: impl Into<String>) -> Self {
Self {
method: method.into(),
}
}

fn matches(&self, body: &[u8]) -> bool {
serde_json::from_slice::<JsonRpcRequest<Value>>(body)
.is_ok_and(|req| req.method() == self.method)
}
}

type Blocklist = Arc<RwLock<Vec<JsonRpcRequestMatcher>>>;

pub struct JsonRpcReverseProxy {
blocklist: Blocklist,
port: u16,
}

impl JsonRpcReverseProxy {
pub async fn start(target_url: &str, port: u16) -> Self {
let blocklist: Blocklist = Default::default();
let proxy: Router<()> = ReverseProxy::new("/", target_url).into();
let app = proxy
.layer(middleware::from_fn(block_middleware))
.layer(Extension(blocklist.clone()));
let listener = tokio::net::TcpListener::bind(("127.0.0.1", port))
.await
.unwrap();
tokio::spawn(async move { axum::serve(listener, app).await.ok() });
Self { blocklist, port }
}

pub fn url(&self) -> String {
format!("http://127.0.0.1:{}", self.port)
}

pub async fn block(&self, filter: JsonRpcRequestMatcher) {
self.blocklist.write().await.push(filter);
}

pub async fn clear_blocklist(&self) {
self.blocklist.write().await.clear();
}
}

async fn block_middleware(
Extension(blocklist): Extension<Blocklist>,
req: Request,
next: middleware::Next,
) -> axum::response::Response {
let blocklist = blocklist.read().await;
if blocklist.is_empty() {
return next.run(req).await;
}

let (parts, body) = req.into_parts();
let bytes = to_bytes(body, 64 * 1024).await.unwrap();

if blocklist.iter().any(|f| f.matches(&bytes)) {
return axum::http::StatusCode::SERVICE_UNAVAILABLE.into_response();
}

next.run(Request::from_parts(parts, axum::body::Body::from(bytes)))
.await
}
48 changes: 45 additions & 3 deletions integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::{default::Default, env::var, fs, ops::Deref, path::PathBuf, time::Durat

pub mod events;
pub mod fixtures;
pub mod json_rpc_reverse_proxy;
pub mod ledger_init_args;

#[derive(Default)]
Expand All @@ -43,6 +44,7 @@ pub struct SetupBuilder {
sol_rpc_install_args: Option<sol_rpc_types::InstallArgs>,
initial_ledger_balances: Option<Vec<(Account, Nat)>>,
proxy_canister: bool,
json_rpc_proxy: Option<(String, u16)>,
}

impl SetupBuilder {
Expand Down Expand Up @@ -73,14 +75,46 @@ impl SetupBuilder {
self
}

/// Route all SOL RPC traffic through a JSON-RPC proxy that can block
/// requests by method name. Use [`Setup::proxy`] to toggle the blocklist.
pub fn with_json_rpc_proxy(mut self, target_url: &str, port: u16) -> Self {
self.json_rpc_proxy = Some((target_url.to_string(), port));
self
}

pub async fn build(self) -> Setup {
Setup::new(
use sol_rpc_types::{OverrideProvider, RegexSubstitution};

let proxy = match &self.json_rpc_proxy {
Some((target_url, port)) => {
Some(json_rpc_reverse_proxy::JsonRpcReverseProxy::start(target_url, *port).await)
}
None => None,
};

let sol_rpc_install_args = match &proxy {
Some(p) => {
let mut args = self.sol_rpc_install_args.unwrap_or_default();
args.override_provider = Some(OverrideProvider {
override_url: Some(RegexSubstitution {
pattern: ".*".into(),
replacement: p.url(),
}),
});
args
}
None => self.sol_rpc_install_args.unwrap_or_default(),
};

let mut setup = Setup::new(
self.make_live.unwrap_or_default(),
self.sol_rpc_install_args.unwrap_or_default(),
sol_rpc_install_args,
self.initial_ledger_balances,
self.proxy_canister,
)
.await
.await;
setup.json_rpc_proxy = proxy;
setup
}
}

Expand All @@ -90,6 +124,7 @@ pub struct Setup {
ledger_canister_id: CanisterId,
sol_rpc_canister_id: CanisterId,
proxy_canister_id: Option<CanisterId>,
json_rpc_proxy: Option<json_rpc_reverse_proxy::JsonRpcReverseProxy>,
}

impl Setup {
Expand All @@ -103,6 +138,12 @@ impl Setup {
pub const DEFAULT_MINIMUM_DEPOSIT_AMOUNT: Lamport = 10_000_000; // 0.01 SOL
pub const DEFAULT_UPDATE_BALANCE_REQUIRED_CYCLES: u128 = 1_000_000_000_000;

pub fn json_rpc_proxy(&self) -> &json_rpc_reverse_proxy::JsonRpcReverseProxy {
self.json_rpc_proxy
.as_ref()
.expect("Setup was not built with a JSON-RPC proxy")
}

pub async fn new(
make_live: PocketIcMode,
sol_rpc_install_args: sol_rpc_types::InstallArgs,
Expand Down Expand Up @@ -212,6 +253,7 @@ impl Setup {
ledger_canister_id,
sol_rpc_canister_id,
proxy_canister_id,
json_rpc_proxy: None,
}
}

Expand Down
Loading
Loading