Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,076 changes: 662 additions & 414 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/rproxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ uuid = { version = "1.18.1", features = ["v7" ]}
valuable = { version = "0.1.1", features = ["derive"] }
x509-parser = "0.18.0"
zstd = "0.13.3"

[dev-dependencies]
actix-rt = "2.11.0"
jsonrpsee = { version = "0.26.0", features = ["server"] }
mime = "0.3.17"
54 changes: 53 additions & 1 deletion crates/rproxy/src/jrpc/jrpc_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const JRPC_METHOD_FCUV3_WITH_PAYLOAD: Cow<'static, str> =

const EMPTY_PARAMS: &Vec<serde_json::Value> = &Vec::new();

#[derive(Debug)]
pub(crate) struct JrpcRequestMeta {
id: Id,

Expand Down Expand Up @@ -54,6 +55,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta {
struct JrpcRequestMetaWire {
id: Id,
method: Cow<'static, str>,
#[serde(default)]
params: serde_json::Value,
}

Expand Down Expand Up @@ -93,7 +95,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta {

const JRPC_METHOD_BATCH: Cow<'static, str> = Cow::Borrowed("batch");

#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub(crate) enum JrpcRequestMetaMaybeBatch {
Single(JrpcRequestMeta),
Expand All @@ -108,3 +110,53 @@ impl JrpcRequestMetaMaybeBatch {
}
}
}

// tests ---------------------------------------------------------------

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_jrpc_request_meta_maybe_batch_deserialize() {
let json = r#"[
{
"jsonrpc": "2.0",
"id": 1108954,
"method": "net_version"
},
{
"jsonrpc": "2.0",
"id": "1108955",
"method": "eth_getBlockByNumber",
"params": [
"0x73f151",
true
]
}
]"#;

let result: Result<JrpcRequestMetaMaybeBatch, _> = serde_json::from_str(json);
assert!(result.is_ok(), "{result:?}");

let batch = result.unwrap();
match batch {
JrpcRequestMetaMaybeBatch::Batch(requests) => {
assert_eq!(requests.len(), 2);

// First request
assert_eq!(*requests[0].id(), Id::Number(1108954));
assert_eq!(requests[0].method(), Cow::Borrowed("net_version"));
assert!(requests[0].params().is_empty());

// Second request
assert_eq!(*requests[1].id(), Id::Number(1108955));
assert_eq!(requests[1].method(), Cow::Borrowed("eth_getBlockByNumber"));
assert_eq!(requests[1].params().len(), 2);
}
JrpcRequestMetaMaybeBatch::Single(_) => {
panic!("Expected Batch variant, got Single");
}
}
}
}
189 changes: 184 additions & 5 deletions crates/rproxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use crate::{
utils::tls_certificate_validity_timestamps,
};

const MAX_OPEN_FILES: u64 = 10240;

// Proxy ---------------------------------------------------------------

pub struct Server {}
Expand All @@ -39,6 +37,14 @@ impl Server {
let canceller = Server::wait_for_shutdown_signal();
let resetter = Server::wait_for_reset_signal(canceller.clone());

Self::_run(config, canceller, resetter).await
}

async fn _run(
config: Config,
canceller: CancellationToken,
resetter: broadcast::Sender<()>,
) -> Result<(), Box<dyn std::error::Error + Send>> {
// try to set system limits
match rlimit::getrlimit(rlimit::Resource::NOFILE) {
Ok((_, hard)) => {
Expand Down Expand Up @@ -195,7 +201,11 @@ impl Server {
}));
}

futures::future::join_all(services).await;
for res in futures::future::join_all(services).await.iter() {
if let Err(err) = res {
warn!(error = ?err, "One of the services had failed")
}
}
}

Ok(())
Expand Down Expand Up @@ -237,7 +247,7 @@ impl Server {
}

fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> {
let (resetter, _) = broadcast::channel::<()>(2);
let (resetter, _) = broadcast::channel::<()>(1);

{
let resetter = resetter.clone();
Expand All @@ -252,7 +262,8 @@ impl Server {
info!("Hangup signal received, resetting...");

if let Err(err) = resetter.send(()) {
error!(from = "sighup", error = ?err, "Failed to broadcast reset signal");
error!(from = "sighup", error = ?err, "Failed to broadcast reset signal, shutting down whole proxy...");
canceller.cancel();
}
}

Expand All @@ -267,3 +278,171 @@ impl Server {
resetter
}
}

// tests ===============================================================

#[cfg(test)]
mod tests {
use std::{net::SocketAddr, time::Duration};

use awc::{Client, http::header};
use clap::Parser;
use jsonrpsee::{
RpcModule,
server::{ServerBuilder, ServerHandle},
};
use tracing::{debug, info};

use super::*;
use crate::config::Config;

async fn spawn_rpc_backend() -> (SocketAddr, ServerHandle) {
let server = ServerBuilder::default().build("127.0.0.1:0").await.unwrap();

let addr: SocketAddr = server.local_addr().unwrap();

let mut module = RpcModule::new(());

module
.register_async_method("eth_chainId", |_params, _ctx, _ext| async move {
Ok::<_, jsonrpsee::types::ErrorObjectOwned>("0x1")
})
.unwrap();

let handle = server.start(module);

(addr, handle)
}

#[actix_web::test]
async fn test_circuit_breaker() {
let (backend, _handle) = spawn_rpc_backend().await;

let cfg = {
let mut cfg = Config::parse_from(["rproxy"]);

cfg.authrpc.enabled = true;
cfg.authrpc.backend_url = format!("http://{backend}");
cfg.authrpc.listen_address = "127.0.0.1:18645".into();
cfg.authrpc.shutdown_timeout_sec = 1;

cfg.rpc.enabled = true;
cfg.rpc.backend_url = format!("http://{backend}");
cfg.rpc.listen_address = "127.0.0.1:18651".into();
cfg.rpc.shutdown_timeout_sec = 1;

cfg.logging.level = "warn,rproxy::server::tests=info".into();
cfg.logging.setup_logging();

cfg
};

let proxy_addr_authrpc = cfg.clone().authrpc.listen_address;
let proxy_addr_rpc = cfg.clone().rpc.listen_address;

let canceller = tokio_util::sync::CancellationToken::new();
let resetter = Server::wait_for_reset_signal(canceller.clone());

let server = {
let canceller = canceller.clone();
let resetter = resetter.clone();

actix_rt::spawn(async move { Server::_run(cfg, canceller, resetter).await })
};
actix_rt::time::sleep(std::time::Duration::from_millis(100)).await;

{
let canceller = canceller.clone();
let client = Client::builder().timeout(Duration::from_millis(10)).finish();

actix_rt::spawn(async move {
loop {
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;

let req = client
.post(format!("http://{proxy_addr_authrpc}"))
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
.send_body(
r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#,
);

tokio::select! {
res = req => {
match res {
Ok(mut res) => {
let _ = res.body().await;
}

Err(err) => {
debug!(error = ?err, "Failed to send a request");
}
}
}

_ = canceller.cancelled() => {
break
}
}
}
});
}

{
let canceller = canceller.clone();
let client = Client::builder().timeout(Duration::from_millis(10)).finish();

actix_rt::spawn(async move {
loop {
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;

let req = client
.post(format!("http://{proxy_addr_rpc}"))
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
.send_body(
r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#,
);

tokio::select! {
res = req => {
match res {
Ok(mut res) => {
let _ = res.body().await;
}

Err(err) => {
debug!(error = ?err, "Failed to send a request");
}
}
}

_ = canceller.cancelled() => {
break
}
}
}
});
}

for i in 0..10 {
actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await;

match resetter.send(()) {
Err(err) => {
debug!(iteration = i, error = ?err, "Failed to send a reset");
}

Ok(proxies_count) => {
info!(iteration = i, proxies_count = proxies_count, "Sent a reset");
assert_eq!(
proxies_count, 2,
"sent reset wrong count of proxies: {proxies_count} != 2"
);
}
}
}

canceller.cancel();

tokio::time::timeout(tokio::time::Duration::from_secs(5), server).await.ok();
}
}
16 changes: 16 additions & 0 deletions crates/rproxy/src/server/proxy/config/authrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ pub(crate) struct ConfigAuthrpc {
name("authrpc_remove_backend_from_mirroring_peers")
)]
pub(crate) remove_backend_from_mirroring_peers: bool,

/// timeout for graceful shutdown of authrpc workers
#[arg(
default_value = "5",
env = "RPROXY_AUTHRPC_SHUTDOWN_TIMEOUT_SEC",
help_heading = "authrpc",
long("authrpc-shutdown-timeout-sec"),
name("authrpc_shutdown_timeout_sec"),
value_name = "seconds"
)]
pub(crate) shutdown_timeout_sec: u64,
}

impl ConfigAuthrpc {
Expand Down Expand Up @@ -449,6 +460,11 @@ impl ConfigProxyHttp for ConfigAuthrpc {
fn prealloacated_response_buffer_size(&self) -> usize {
1024 * self.prealloacated_response_buffer_size_kb
}

#[inline]
fn shutdown_timeout_sec(&self) -> u64 {
self.shutdown_timeout_sec
}
}

// ConfigAuthrpcError --------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions crates/rproxy/src/server/proxy/config/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ pub(crate) struct ConfigFlashblocks {
)]
pub(crate) log_sanitise: bool,

/// timeout for graceful shutdown of flashblocks workers
#[arg(
default_value = "5",
env = "RPROXY_FLASHBLOCKS_SHUTDOWN_TIMEOUT_SEC",
help_heading = "flashblocks",
long("flashblocks-shutdown-timeout-sec"),
name("flashblocks_shutdown_timeout_sec"),
value_name = "seconds"
)]
pub(crate) shutdown_timeout_sec: u64,

/// the chance (between 0.0 and 1.0) that pings received from
/// flashblocks backend would be ignored (no pong sent)
#[arg(
Expand Down Expand Up @@ -268,6 +279,11 @@ impl ConfigProxyWs for ConfigFlashblocks {
self.log_sanitise
}

#[inline]
fn shutdown_timeout_sec(&self) -> u64 {
self.shutdown_timeout_sec
}

#[inline]
#[cfg(feature = "chaos")]
fn chaos_probability_backend_ping_ignored(&self) -> f64 {
Expand Down
Loading