Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ COPTS = [
}) + select({
"//bazel/config:brpc_with_asan": ["-fsanitize=address"],
"//conditions:default": [""],
}) + select({
":brpc_with_gdr": ["-DBRPC_WITH_GDR=1"],
"//conditions:default": [""],
}) + select({
"//bazel/config:brpc_with_no_pthread_mutex_hook": ["-DNO_PTHREAD_MUTEX_HOOK"],
"//conditions:default": [""],
Expand Down Expand Up @@ -232,6 +235,7 @@ BUTIL_SRCS = [
"src/butil/iobuf.cpp",
"src/butil/single_iobuf.cpp",
"src/butil/iobuf_profiler.cpp",
"src/butil/gpu/gpu_block_pool.cpp",
"src/butil/binary_printer.cpp",
"src/butil/recordio.cc",
"src/butil/popen.cpp",
Expand Down Expand Up @@ -337,6 +341,9 @@ cc_library(
"-DUNIT_TEST",
],
"//conditions:default": [],
}) + select({
":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"],
"//conditions:default": [],
}),
includes = [
"src/",
Expand All @@ -356,6 +363,9 @@ cc_library(
}) + select({
"//bazel/config:brpc_with_boringssl": ["@boringssl//:ssl", "@boringssl//:crypto"],
"//conditions:default": ["@openssl//:ssl", "@openssl//:crypto"],
}) + select({
":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"],
"//conditions:default": [],
}),
)

Expand Down Expand Up @@ -573,6 +583,9 @@ cc_library(
"@org_apache_thrift//:thrift",
],
"//conditions:default": [],
}) + select({
":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"],
"//conditions:default": [],
}),
)

Expand Down
8 changes: 7 additions & 1 deletion bazel/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ config_setting(
visibility = ["//visibility:public"],
)

config_setting(
name = "brpc_with_gdr",
define_values = {"BRPC_WITH_GDR": "true"},
visibility = ["//visibility:public"],
)

config_setting(
name = "brpc_with_boringssl",
define_values = {"BRPC_WITH_BORINGSSL": "true"},
Expand Down Expand Up @@ -148,4 +154,4 @@ config_setting(
name = "with_babylon_counter",
define_values = {"with_babylon_counter": "true"},
visibility = ["//visibility:public"],
)
)
185 changes: 156 additions & 29 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <google/protobuf/text_format.h>
#include "butil/logging.h" // LOG()
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/gpu/gpu_block_pool.h"
#include "butil/raw_pack.h" // RawPacker RawUnpacker
#include "butil/memory/scope_guard.h"
#include "json2pb/json_to_pb.h"
Expand Down Expand Up @@ -69,6 +70,10 @@ DECLARE_bool(pb_enum_as_number);
// 5. Not supported: chunk_info

// Pack header into `buf'

const int header_size = 12;
const int prefetch_d2h_size = 512;

inline void PackRpcHeader(char* rpc_header, uint32_t meta_size, int payload_size) {
uint32_t* dummy = (uint32_t*)rpc_header; // suppress strict-alias warning
*dummy = *(uint32_t*)"PRPC";
Expand Down Expand Up @@ -101,44 +106,103 @@ static void SerializeRpcHeaderAndMeta(

ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket,
bool /*read_eof*/, const void*) {

char header_buf[12];
const size_t n = source->copy_to(header_buf, sizeof(header_buf));
if (n >= 4) {
void* dummy = header_buf;
if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") {
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
} else {
if (memcmp(header_buf, "PRPC", n) != 0) {
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
}
if (n < sizeof(header_buf)) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
size_t n = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to put the baidu_rpc_protocol.cpp file containing GPU functionality into a separate file called baidu_rpc_with_gpu_protocol.cpp, or to define a new protocol. Currently, the macro definitions generate too many if-else statements.

Copy link
Author

@randomkang randomkang Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put GPU functionality into a separate file or define a new protocol is two heavy, we reorg the code to decrease the macro definitions. Is it ok? @yanglimingcn

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to separate new protocol file for GDR feature, Besides, I have a idea we can also define a new protocol meta that includes both request meta and response meta carrying the GDR attribute value., the corresponding meta should also contain the GPU address information that both sides need to register with IB. Doing the GDR operations inside processRequest and ProcessRpcResponse method step will probably be more reasonable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the server or client wants to know the GPU memory region of the other side, we can send this information with a separate rpc transport. But i do not see the the benefits of this approach. Can you explain more?

uint32_t body_size;
uint32_t meta_size;
butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size);
if (body_size > FLAGS_max_body_size) {
// We need this log to report the body_size to give users some clues
// which is not printed in InputMessenger.
LOG(ERROR) << "body_size=" << body_size << " from "
<< socket->remote_side() << " is too large";
return MakeParseError(PARSE_ERROR_TOO_BIG_DATA);
} else if (source->length() < sizeof(header_buf) + body_size) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
ParseError pe = PARSE_OK;

#if BRPC_WITH_GDR
void* prefetch_d2h_data = NULL;
uint32_t data_meta = source->get_first_data_meta_high32();
bool is_gpu_memory = (data_meta == static_cast<uint32_t>(butil::IOBuf::GPU_MEMORY));
butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator();
if (is_gpu_memory) {
prefetch_d2h_data = host_allocator->AllocateRaw(prefetch_d2h_size);
if (prefetch_d2h_data == NULL) {
LOG(FATAL) << "alloc host data failed!!!";
}
n = source->copy_from_gpu(prefetch_d2h_data, prefetch_d2h_size);
size_t copy_size = n > 12 ? 12 : n;
memcpy(header_buf, prefetch_d2h_data, copy_size);
} else {
n = source->copy_to(header_buf, sizeof(header_buf));
}
if (meta_size > body_size) {
LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size="
<< body_size;
// Pop the message
source->pop_front(sizeof(header_buf) + body_size);
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
#else
n = source->copy_to(header_buf, sizeof(header_buf));
#endif // BRPC_WITH_GDR

do {
if (n >= 4) {
void* dummy = header_buf;
if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") {
pe = PARSE_ERROR_TRY_OTHERS;
break;
}
} else {
if (memcmp(header_buf, "PRPC", n) != 0) {
pe = PARSE_ERROR_TRY_OTHERS;
break;
}
}
if (n < sizeof(header_buf)) {
pe = PARSE_ERROR_NOT_ENOUGH_DATA;
break;
}
butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size);
if (body_size > FLAGS_max_body_size) {
// We need this log to report the body_size to give users some clues
// which is not printed in InputMessenger.
LOG(ERROR) << "body_size=" << body_size << " from "
<< socket->remote_side() << " is too large";
pe = PARSE_ERROR_TOO_BIG_DATA;
break;
} else if (source->length() < sizeof(header_buf) + body_size) {
pe = PARSE_ERROR_NOT_ENOUGH_DATA;
break;
}
if (meta_size > body_size) {
LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size="
<< body_size;
// Pop the message
source->pop_front(sizeof(header_buf) + body_size);
pe = PARSE_ERROR_TRY_OTHERS;
break;
}
} while (0);

if (pe != PARSE_OK) {
#if BRPC_WITH_GDR
if (is_gpu_memory) {
host_allocator->DeallocateRaw(prefetch_d2h_data);
}
#endif // BRPC_WITH_GDR
return MakeParseError(pe);
}

source->pop_front(sizeof(header_buf));
MostCommonMessage* msg = MostCommonMessage::Get();

#if BRPC_WITH_GDR
if (is_gpu_memory) {
if (header_size + meta_size <= n) {
auto deleter = [host_allocator, prefetch_d2h_data](void* data) { host_allocator->DeallocateRaw(prefetch_d2h_data); };
msg->meta.append_user_data_with_meta((char*)prefetch_d2h_data + header_size, meta_size, deleter, n);
source->pop_front(meta_size);
} else {
host_allocator->DeallocateRaw(prefetch_d2h_data);
source->cutn_from_gpu(&msg->meta, meta_size);
}
source->cutn(&msg->payload, body_size - meta_size);
} else {
source->cutn(&msg->meta, meta_size);
source->cutn(&msg->payload, body_size - meta_size);
}
#else
source->cutn(&msg->meta, meta_size);
source->cutn(&msg->payload, body_size - meta_size);
#endif // BRPC_WITH_GDR
return MakeMessage(msg);
}

Expand Down Expand Up @@ -793,7 +857,29 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {

butil::IOBuf req_buf;
int body_without_attachment_size = req_size - meta.attachment_size();
#if BRPC_WITH_GDR
int meta_size = msg->meta.size();
uint32_t data_meta = msg->payload.get_first_data_meta_high32();
bool is_gpu_memory = (data_meta == static_cast<uint32_t>(butil::IOBuf::GPU_MEMORY));
if(is_gpu_memory) {
int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta();
if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) {
void* data = msg->meta.get_first_data_ptr();
if (data == nullptr) {
LOG(FATAL) << "illegal data!!!";
}
req_buf.append((char*)data + meta_size, body_without_attachment_size);
msg->payload.pop_front(body_without_attachment_size);
} else {
msg->payload.cutn_from_gpu(&req_buf, body_without_attachment_size);
}
}
else {
msg->payload.cutn(&req_buf, body_without_attachment_size);
}
#else
msg->payload.cutn(&req_buf, body_without_attachment_size);
#endif // BRPC_WITH_GDR
if (meta.attachment_size() > 0) {
cntl->request_attachment().swap(msg->payload);
}
Expand Down Expand Up @@ -963,8 +1049,14 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
}
// Parse response message iff error code from meta is 0
butil::IOBuf res_buf;
int meta_size = msg->meta.size();
const int res_size = msg->payload.length();
butil::IOBuf* res_buf_ptr = &msg->payload;

#if BRPC_WITH_GDR
uint32_t data_meta = msg->payload.get_first_data_meta_high32();
bool is_gpu_memory = (data_meta == static_cast<uint32_t>(butil::IOBuf::GPU_MEMORY));
#endif // BRPC_WITH_GDR
if (meta.has_attachment_size()) {
if (meta.attachment_size() > res_size) {
cntl->SetFailed(
Expand All @@ -973,9 +1065,44 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
break;
}
int body_without_attachment_size = res_size - meta.attachment_size();

#if BRPC_WITH_GDR
if(is_gpu_memory) {
int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta();
if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) {
void* data = msg->meta.get_first_data_ptr();
if (data == nullptr) {
LOG(FATAL) << "illegal data!!!";
}
res_buf.append((char*)data + meta_size, body_without_attachment_size);
msg->payload.pop_front(body_without_attachment_size);
} else {
msg->payload.cutn_from_gpu(&res_buf, body_without_attachment_size);
}
}
else {
msg->payload.cutn(&res_buf, body_without_attachment_size);
}
#else
msg->payload.cutn(&res_buf, body_without_attachment_size);
#endif // BRPC_WITH_GDR
res_buf_ptr = &res_buf;
cntl->response_attachment().swap(msg->payload);
#if BRPC_WITH_GDR
} else if(is_gpu_memory) {
int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta();
if (header_size + meta_size + res_size <= real_prefetch_d2h_size) {
void* data = msg->meta.get_first_data_ptr();
if (data == nullptr) {
LOG(FATAL) << "illegal data!!!";
}
res_buf.append((char*)data + meta_size, res_size);
msg->payload.pop_front(res_size);
} else {
msg->payload.cutn_from_gpu(&res_buf, res_size);
}
res_buf_ptr = &res_buf;
#endif // BRPC_WITH_GDR
}

ContentType content_type = meta.content_type();
Expand Down
Loading
Loading