diff --git a/sei-db/tools/rpc_bench/main.go b/sei-db/tools/rpc_bench/main.go index 870208821a..df51c91cc7 100644 --- a/sei-db/tools/rpc_bench/main.go +++ b/sei-db/tools/rpc_bench/main.go @@ -70,6 +70,80 @@ func (s *LatencyStats) Report() { s.Method, s.Total, s.Errors, rps, p(0.50), p(0.95), p(0.99)) } +func configureOutput(outputFile string) (func() error, error) { + if outputFile == "" { + return func() error { return nil }, nil + } + + cleanPath := filepath.Clean(outputFile) + if err := os.MkdirAll(filepath.Dir(cleanPath), 0o750); err != nil { + return nil, err + } + + file, err := os.OpenFile(cleanPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + return nil, err + } + + originalStdout := os.Stdout + originalStderr := os.Stderr + + stdoutReader, stdoutPipe, err := os.Pipe() + if err != nil { + _ = file.Close() + return nil, err + } + stderrReader, stderrPipe, err := os.Pipe() + if err != nil { + _ = stdoutReader.Close() + _ = stdoutPipe.Close() + _ = file.Close() + return nil, err + } + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + _, _ = io.Copy(io.MultiWriter(originalStdout, file), stdoutReader) + }() + go func() { + defer wg.Done() + _, _ = io.Copy(io.MultiWriter(originalStderr, file), stderrReader) + }() + + os.Stdout = stdoutPipe + os.Stderr = stderrPipe + + return func() error { + var closeErr error + + if err := stdoutPipe.Close(); err != nil && closeErr == nil { + closeErr = err + } + if err := stderrPipe.Close(); err != nil && closeErr == nil { + closeErr = err + } + + os.Stdout = originalStdout + os.Stderr = originalStderr + + wg.Wait() + + if err := stdoutReader.Close(); err != nil && closeErr == nil { + closeErr = err + } + if err := stderrReader.Close(); err != nil && closeErr == nil { + closeErr = err + } + if err := file.Close(); err != nil && closeErr == nil { + closeErr = err + } + + return closeErr + }, nil +} + var httpClient = &http.Client{ Timeout: 120 * time.Second, Transport: &http.Transport{ @@ -325,6 +399,42 @@ func printStats(title string, stats map[string]*LatencyStats) { "TOTAL", totalReqs, totalErrs, rps, totalDuration.Round(time.Millisecond)) } +func buildBlockNumbers(latestBlock int64, blockCount int, startBlock, endBlock int64) ([]int64, error) { + if startBlock > 0 || endBlock > 0 { + if startBlock <= 0 { + return nil, fmt.Errorf("start-block is required when selecting an explicit block range") + } + if endBlock == 0 { + endBlock = startBlock + } + if startBlock > endBlock { + return nil, fmt.Errorf("start-block (%d) cannot be greater than end-block (%d)", startBlock, endBlock) + } + if startBlock < 1 { + return nil, fmt.Errorf("start-block must be >= 1") + } + if endBlock > latestBlock { + return nil, fmt.Errorf("end-block (%d) cannot exceed latest block (%d)", endBlock, latestBlock) + } + + blockNums := make([]int64, 0, endBlock-startBlock+1) + for blockNum := startBlock; blockNum <= endBlock; blockNum++ { + blockNums = append(blockNums, blockNum) + } + return blockNums, nil + } + + blockNums := make([]int64, 0, blockCount) + for i := 0; i < blockCount; i++ { + blockNum := latestBlock - int64(i) + if blockNum < 1 { + break + } + blockNums = append(blockNums, blockNum) + } + return blockNums, nil +} + func writeLabel(img *image.RGBA, x, y int, text string, col color.Color) { d := &font.Drawer{ Dst: img, @@ -551,24 +661,40 @@ func main() { endpoint string concurrency int blockCount int + startBlock int64 + endBlock int64 requestsPer int methodsFlag string traceDiscover int plotDir string + outputFile string ) flag.StringVar(&endpoint, "endpoint", "", "RPC endpoint URL (required)") flag.IntVar(&concurrency, "concurrency", 16, "number of concurrent workers") flag.IntVar(&blockCount, "blocks", 20, "number of recent blocks to sample") + flag.Int64Var(&startBlock, "start-block", 0, "explicit starting block number to benchmark (inclusive)") + flag.Int64Var(&endBlock, "end-block", 0, "explicit ending block number to benchmark (inclusive); defaults to start-block when omitted") flag.IntVar(&requestsPer, "requests", 100, "requests per method per phase") flag.StringVar(&methodsFlag, "methods", "", "comma-separated methods to run (default: all)") flag.IntVar(&traceDiscover, "trace-discover", 5, "txs to trace for storage slot discovery (0 to disable)") flag.StringVar(&plotDir, "plot-dir", "", "directory to write per-block trace PNG charts (empty disables plots)") + flag.StringVar(&outputFile, "output-file", "", "file to write benchmark output to in addition to stdout") flag.Parse() if endpoint == "" { - fmt.Fprintf(os.Stderr, "Usage: go run main.go -endpoint [-concurrency 16] [-blocks 20] [-requests 100] [-methods debug_traceBlockByNumber,eth_getLogs]\n") + fmt.Fprintf(os.Stderr, "Usage: go run main.go -endpoint [-concurrency 16] [-blocks 20] [-start-block 100 -end-block 200] [-requests 100] [-methods debug_traceBlockByNumber,eth_getLogs] [-output-file bench.txt]\n") os.Exit(1) } + closeOutput, err := configureOutput(outputFile) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to configure output file: %v\n", err) + os.Exit(1) + } + defer func() { + if err := closeOutput(); err != nil { + fmt.Fprintf(os.Stderr, "Failed to close output file: %v\n", err) + } + }() // ========================================================================= // Discover recent blocks, transactions, and addresses @@ -576,10 +702,21 @@ func main() { fmt.Printf("RPC Read Benchmark\n") fmt.Printf(" endpoint: %s\n", endpoint) fmt.Printf(" concurrency: %d\n", concurrency) - fmt.Printf(" blocks: %d\n", blockCount) + if startBlock > 0 || endBlock > 0 { + effectiveEndBlock := endBlock + if effectiveEndBlock == 0 { + effectiveEndBlock = startBlock + } + fmt.Printf(" range: %d-%d\n", startBlock, effectiveEndBlock) + } else { + fmt.Printf(" blocks: %d recent blocks\n", blockCount) + } fmt.Printf(" requests: %d per method per phase\n", requestsPer) + if outputFile != "" { + fmt.Printf(" output file: %s\n", filepath.Clean(outputFile)) + } - fmt.Printf("\n--- Discovering recent blocks ---\n") + fmt.Printf("\n--- Discovering blocks ---\n") latestBlock, err := getLatestBlockNumber(endpoint) if err != nil { fmt.Fprintf(os.Stderr, "Failed to get latest block: %v\n", err) @@ -587,16 +724,18 @@ func main() { } fmt.Printf("Latest block: %d\n", latestBlock) + blockNums, err := buildBlockNumbers(latestBlock, blockCount, startBlock, endBlock) + if err != nil { + fmt.Fprintf(os.Stderr, "Invalid block selection: %v\n", err) + os.Exit(1) + } + var blocks []*BlockInfo var allTxHashes []string var allAddresses []string addrSeen := make(map[string]bool) - for i := 0; i < blockCount; i++ { - blockNum := latestBlock - int64(i) - if blockNum < 1 { - break - } + for _, blockNum := range blockNums { info, err := getBlockInfo(endpoint, blockNum) if err != nil { fmt.Printf(" block %d: error %v\n", blockNum, err) @@ -623,7 +762,7 @@ func main() { os.Exit(1) } if len(allAddresses) == 0 { - fmt.Fprintf(os.Stderr, "No addresses found in recent blocks\n") + fmt.Fprintf(os.Stderr, "No addresses found in selected blocks\n") os.Exit(1) } fmt.Printf("Discovered %d blocks, %d transactions, %d unique addresses\n", @@ -638,7 +777,11 @@ func main() { defer rngMu.Unlock() return rng.Intn(n) } - latestHex := fmt.Sprintf("0x%x", latestBlock) + referenceBlock := latestBlock + if startBlock > 0 || endBlock > 0 { + referenceBlock = blocks[len(blocks)-1].Number + } + referenceHex := fmt.Sprintf("0x%x", referenceBlock) randBlock := func() *BlockInfo { return blocks[randomIntn(len(blocks))] } randAddr := func() string { return allAddresses[randomIntn(len(allAddresses))] } randTxHash := func() string { @@ -650,16 +793,15 @@ func main() { randStorageParams := func() []interface{} { if len(allStorageSlots) > 0 { s := allStorageSlots[randomIntn(len(allStorageSlots))] - return []interface{}{s.Address, s.Slot, latestHex} + return []interface{}{s.Address, s.Slot, referenceHex} } - return []interface{}{randAddr(), fmt.Sprintf("0x%064x", randomIntn(10)), latestHex} + return []interface{}{randAddr(), fmt.Sprintf("0x%064x", randomIntn(10)), referenceHex} } randLogsParams := func() []interface{} { - start := randomIntn(len(blocks)) - maxWindow := min(5, len(blocks)-start) - end := start + randomIntn(maxWindow) - fromBlock := blocks[end].Number - toBlock := blocks[start].Number + first := randBlock().Number + second := randBlock().Number + fromBlock := min(first, second) + toBlock := max(first, second) return []interface{}{map[string]interface{}{ "fromBlock": fmt.Sprintf("0x%x", fromBlock), "toBlock": fmt.Sprintf("0x%x", toBlock), @@ -673,9 +815,9 @@ func main() { {"debug_traceBlockByNumber", func() []interface{} { return []interface{}{fmt.Sprintf("0x%x", randBlock().Number)} }, 10, true}, {"debug_traceTransaction", func() []interface{} { return []interface{}{randTxHash()} }, 10, true}, {"eth_getLogs", func() []interface{} { return randLogsParams() }, 20, true}, - {"eth_getBalance", func() []interface{} { return []interface{}{randAddr(), latestHex} }, 25, false}, - {"eth_getTransactionCount", func() []interface{} { return []interface{}{randAddr(), latestHex} }, 15, false}, - {"eth_getCode", func() []interface{} { return []interface{}{randAddr(), latestHex} }, 15, false}, + {"eth_getBalance", func() []interface{} { return []interface{}{randAddr(), referenceHex} }, 25, false}, + {"eth_getTransactionCount", func() []interface{} { return []interface{}{randAddr(), referenceHex} }, 15, false}, + {"eth_getCode", func() []interface{} { return []interface{}{randAddr(), referenceHex} }, 15, false}, {"eth_getStorageAt", func() []interface{} { return randStorageParams() }, 25, false}, } @@ -724,6 +866,7 @@ func main() { fmt.Printf("Discovered %d unique storage slots\n", len(allStorageSlots)) } + fmt.Printf(" reference: block %d\n", referenceBlock) fmt.Printf(" methods: ") for i, m := range allMethods { if i > 0 {