Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 163 additions & 20 deletions sei-db/tools/rpc_bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,80 @@ func (s *LatencyStats) Report() {
s.Method, s.Total, s.Errors, rps, p(0.50), p(0.95), p(0.99))
}

func configureOutput(outputFile string) (func() error, error) {
if outputFile == "" {
return func() error { return nil }, nil
}

cleanPath := filepath.Clean(outputFile)
if err := os.MkdirAll(filepath.Dir(cleanPath), 0o750); err != nil {
return nil, err
}

file, err := os.OpenFile(cleanPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
if err != nil {
return nil, err
}

originalStdout := os.Stdout
originalStderr := os.Stderr

stdoutReader, stdoutPipe, err := os.Pipe()
if err != nil {
_ = file.Close()
return nil, err
}
stderrReader, stderrPipe, err := os.Pipe()
if err != nil {
_ = stdoutReader.Close()
_ = stdoutPipe.Close()
_ = file.Close()
return nil, err
}

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
_, _ = io.Copy(io.MultiWriter(originalStdout, file), stdoutReader)
}()
go func() {
defer wg.Done()
_, _ = io.Copy(io.MultiWriter(originalStderr, file), stderrReader)
}()

os.Stdout = stdoutPipe
os.Stderr = stderrPipe

return func() error {
var closeErr error

if err := stdoutPipe.Close(); err != nil && closeErr == nil {
closeErr = err
}
if err := stderrPipe.Close(); err != nil && closeErr == nil {
closeErr = err
}

os.Stdout = originalStdout
os.Stderr = originalStderr

wg.Wait()

if err := stdoutReader.Close(); err != nil && closeErr == nil {
closeErr = err
}
if err := stderrReader.Close(); err != nil && closeErr == nil {
closeErr = err
}
if err := file.Close(); err != nil && closeErr == nil {
closeErr = err
}

return closeErr
}, nil
}

var httpClient = &http.Client{
Timeout: 120 * time.Second,
Transport: &http.Transport{
Expand Down Expand Up @@ -325,6 +399,42 @@ func printStats(title string, stats map[string]*LatencyStats) {
"TOTAL", totalReqs, totalErrs, rps, totalDuration.Round(time.Millisecond))
}

func buildBlockNumbers(latestBlock int64, blockCount int, startBlock, endBlock int64) ([]int64, error) {
if startBlock > 0 || endBlock > 0 {
if startBlock <= 0 {
return nil, fmt.Errorf("start-block is required when selecting an explicit block range")
}
if endBlock == 0 {
endBlock = startBlock
}
if startBlock > endBlock {
return nil, fmt.Errorf("start-block (%d) cannot be greater than end-block (%d)", startBlock, endBlock)
}
if startBlock < 1 {
return nil, fmt.Errorf("start-block must be >= 1")
}
if endBlock > latestBlock {
return nil, fmt.Errorf("end-block (%d) cannot exceed latest block (%d)", endBlock, latestBlock)
}

blockNums := make([]int64, 0, endBlock-startBlock+1)
for blockNum := startBlock; blockNum <= endBlock; blockNum++ {
blockNums = append(blockNums, blockNum)
}
return blockNums, nil
}

blockNums := make([]int64, 0, blockCount)
for i := 0; i < blockCount; i++ {
blockNum := latestBlock - int64(i)
if blockNum < 1 {
break
}
blockNums = append(blockNums, blockNum)
}
return blockNums, nil
}

func writeLabel(img *image.RGBA, x, y int, text string, col color.Color) {
d := &font.Drawer{
Dst: img,
Expand Down Expand Up @@ -551,52 +661,81 @@ func main() {
endpoint string
concurrency int
blockCount int
startBlock int64
endBlock int64
requestsPer int
methodsFlag string
traceDiscover int
plotDir string
outputFile string
)
flag.StringVar(&endpoint, "endpoint", "", "RPC endpoint URL (required)")
flag.IntVar(&concurrency, "concurrency", 16, "number of concurrent workers")
flag.IntVar(&blockCount, "blocks", 20, "number of recent blocks to sample")
flag.Int64Var(&startBlock, "start-block", 0, "explicit starting block number to benchmark (inclusive)")
flag.Int64Var(&endBlock, "end-block", 0, "explicit ending block number to benchmark (inclusive); defaults to start-block when omitted")
flag.IntVar(&requestsPer, "requests", 100, "requests per method per phase")
flag.StringVar(&methodsFlag, "methods", "", "comma-separated methods to run (default: all)")
flag.IntVar(&traceDiscover, "trace-discover", 5, "txs to trace for storage slot discovery (0 to disable)")
flag.StringVar(&plotDir, "plot-dir", "", "directory to write per-block trace PNG charts (empty disables plots)")
flag.StringVar(&outputFile, "output-file", "", "file to write benchmark output to in addition to stdout")
flag.Parse()

if endpoint == "" {
fmt.Fprintf(os.Stderr, "Usage: go run main.go -endpoint <rpc-url> [-concurrency 16] [-blocks 20] [-requests 100] [-methods debug_traceBlockByNumber,eth_getLogs]\n")
fmt.Fprintf(os.Stderr, "Usage: go run main.go -endpoint <rpc-url> [-concurrency 16] [-blocks 20] [-start-block 100 -end-block 200] [-requests 100] [-methods debug_traceBlockByNumber,eth_getLogs] [-output-file bench.txt]\n")
os.Exit(1)
}
closeOutput, err := configureOutput(outputFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to configure output file: %v\n", err)
os.Exit(1)
}
defer func() {
if err := closeOutput(); err != nil {
fmt.Fprintf(os.Stderr, "Failed to close output file: %v\n", err)
}
}()

// =========================================================================
// Discover recent blocks, transactions, and addresses
// =========================================================================
fmt.Printf("RPC Read Benchmark\n")
fmt.Printf(" endpoint: %s\n", endpoint)
fmt.Printf(" concurrency: %d\n", concurrency)
fmt.Printf(" blocks: %d\n", blockCount)
if startBlock > 0 || endBlock > 0 {
effectiveEndBlock := endBlock
if effectiveEndBlock == 0 {
effectiveEndBlock = startBlock
}
fmt.Printf(" range: %d-%d\n", startBlock, effectiveEndBlock)
} else {
fmt.Printf(" blocks: %d recent blocks\n", blockCount)
}
fmt.Printf(" requests: %d per method per phase\n", requestsPer)
if outputFile != "" {
fmt.Printf(" output file: %s\n", filepath.Clean(outputFile))
}

fmt.Printf("\n--- Discovering recent blocks ---\n")
fmt.Printf("\n--- Discovering blocks ---\n")
latestBlock, err := getLatestBlockNumber(endpoint)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to get latest block: %v\n", err)
os.Exit(1)
}
fmt.Printf("Latest block: %d\n", latestBlock)

blockNums, err := buildBlockNumbers(latestBlock, blockCount, startBlock, endBlock)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid block selection: %v\n", err)
os.Exit(1)
}

var blocks []*BlockInfo
var allTxHashes []string
var allAddresses []string
addrSeen := make(map[string]bool)

for i := 0; i < blockCount; i++ {
blockNum := latestBlock - int64(i)
if blockNum < 1 {
break
}
for _, blockNum := range blockNums {
info, err := getBlockInfo(endpoint, blockNum)
if err != nil {
fmt.Printf(" block %d: error %v\n", blockNum, err)
Expand All @@ -623,7 +762,7 @@ func main() {
os.Exit(1)
}
if len(allAddresses) == 0 {
fmt.Fprintf(os.Stderr, "No addresses found in recent blocks\n")
fmt.Fprintf(os.Stderr, "No addresses found in selected blocks\n")
os.Exit(1)
}
fmt.Printf("Discovered %d blocks, %d transactions, %d unique addresses\n",
Expand All @@ -638,7 +777,11 @@ func main() {
defer rngMu.Unlock()
return rng.Intn(n)
}
latestHex := fmt.Sprintf("0x%x", latestBlock)
referenceBlock := latestBlock
if startBlock > 0 || endBlock > 0 {
referenceBlock = blocks[len(blocks)-1].Number
}
referenceHex := fmt.Sprintf("0x%x", referenceBlock)
randBlock := func() *BlockInfo { return blocks[randomIntn(len(blocks))] }
randAddr := func() string { return allAddresses[randomIntn(len(allAddresses))] }
randTxHash := func() string {
Expand All @@ -650,16 +793,15 @@ func main() {
randStorageParams := func() []interface{} {
if len(allStorageSlots) > 0 {
s := allStorageSlots[randomIntn(len(allStorageSlots))]
return []interface{}{s.Address, s.Slot, latestHex}
return []interface{}{s.Address, s.Slot, referenceHex}
}
return []interface{}{randAddr(), fmt.Sprintf("0x%064x", randomIntn(10)), latestHex}
return []interface{}{randAddr(), fmt.Sprintf("0x%064x", randomIntn(10)), referenceHex}
}
randLogsParams := func() []interface{} {
start := randomIntn(len(blocks))
maxWindow := min(5, len(blocks)-start)
end := start + randomIntn(maxWindow)
fromBlock := blocks[end].Number
toBlock := blocks[start].Number
first := randBlock().Number
second := randBlock().Number
fromBlock := min(first, second)
toBlock := max(first, second)
return []interface{}{map[string]interface{}{
"fromBlock": fmt.Sprintf("0x%x", fromBlock),
"toBlock": fmt.Sprintf("0x%x", toBlock),
Expand All @@ -673,9 +815,9 @@ func main() {
{"debug_traceBlockByNumber", func() []interface{} { return []interface{}{fmt.Sprintf("0x%x", randBlock().Number)} }, 10, true},
{"debug_traceTransaction", func() []interface{} { return []interface{}{randTxHash()} }, 10, true},
{"eth_getLogs", func() []interface{} { return randLogsParams() }, 20, true},
{"eth_getBalance", func() []interface{} { return []interface{}{randAddr(), latestHex} }, 25, false},
{"eth_getTransactionCount", func() []interface{} { return []interface{}{randAddr(), latestHex} }, 15, false},
{"eth_getCode", func() []interface{} { return []interface{}{randAddr(), latestHex} }, 15, false},
{"eth_getBalance", func() []interface{} { return []interface{}{randAddr(), referenceHex} }, 25, false},
{"eth_getTransactionCount", func() []interface{} { return []interface{}{randAddr(), referenceHex} }, 15, false},
{"eth_getCode", func() []interface{} { return []interface{}{randAddr(), referenceHex} }, 15, false},
{"eth_getStorageAt", func() []interface{} { return randStorageParams() }, 25, false},
}

Expand Down Expand Up @@ -724,6 +866,7 @@ func main() {
fmt.Printf("Discovered %d unique storage slots\n", len(allStorageSlots))
}

fmt.Printf(" reference: block %d\n", referenceBlock)
fmt.Printf(" methods: ")
for i, m := range allMethods {
if i > 0 {
Expand Down
Loading