diff --git a/BUILD.bazel b/BUILD.bazel
index 138e416b10..e1a853d2db 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -54,6 +54,9 @@ COPTS = [
 }) + select({
     "//bazel/config:brpc_with_asan": ["-fsanitize=address"],
     "//conditions:default": [""],
+}) + select({
+    "//bazel/config:brpc_with_gdr": ["-DBRPC_WITH_GDR=1"],
+    "//conditions:default": [""],
 }) + select({
     "//bazel/config:brpc_with_no_pthread_mutex_hook": ["-DNO_PTHREAD_MUTEX_HOOK"],
     "//conditions:default": [""],
@@ -232,6 +235,7 @@ BUTIL_SRCS = [
     "src/butil/iobuf.cpp",
     "src/butil/single_iobuf.cpp",
     "src/butil/iobuf_profiler.cpp",
+    "src/butil/gpu/gpu_block_pool.cpp",
     "src/butil/binary_printer.cpp",
     "src/butil/recordio.cc",
     "src/butil/popen.cpp",
@@ -337,6 +341,9 @@ cc_library(
             "-DUNIT_TEST",
         ],
         "//conditions:default": [],
+    }) + select({
+        "//bazel/config:brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"],
+        "//conditions:default": [],
     }),
     includes = [
         "src/",
@@ -356,6 +363,9 @@ cc_library(
     }) + select({
         "//bazel/config:brpc_with_boringssl": ["@boringssl//:ssl", "@boringssl//:crypto"],
         "//conditions:default": ["@openssl//:ssl", "@openssl//:crypto"],
+    }) + select({
+        "//bazel/config:brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"],
+        "//conditions:default": [],
     }),
 )
@@ -573,6 +583,9 @@ cc_library(
             "@org_apache_thrift//:thrift",
         ],
         "//conditions:default": [],
+    }) + select({
+        "//bazel/config:brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"],
+        "//conditions:default": [],
     }),
 )
diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel
index d08ea2ec23..06376cf85c 100644
--- a/bazel/config/BUILD.bazel
+++ b/bazel/config/BUILD.bazel
@@ -104,6 +104,12 @@ config_setting(
     visibility = ["//visibility:public"],
 )
 
+config_setting(
+    name = "brpc_with_gdr",
+    define_values = {"BRPC_WITH_GDR": "true"},
+    visibility = ["//visibility:public"],
+)
+
 config_setting(
     name = "brpc_with_boringssl",
     define_values = {"BRPC_WITH_BORINGSSL": "true"},
@@ -148,4 +154,4 @@ config_setting(
     name = "with_babylon_counter",
     define_values = {"with_babylon_counter": "true"},
     visibility = ["//visibility:public"],
-)
\ No newline at end of file
+)
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp
index 5adf77b2c5..7a2c079c24 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -23,6 +23,7 @@
 #include
 #include "butil/logging.h"                         // LOG()
 #include "butil/iobuf.h"                           // butil::IOBuf
+#include "butil/gpu/gpu_block_pool.h"
 #include "butil/raw_pack.h"                        // RawPacker RawUnpacker
 #include "butil/memory/scope_guard.h"
 #include "json2pb/json_to_pb.h"
@@ -69,6 +70,10 @@ DECLARE_bool(pb_enum_as_number);
 // 5. Not supported: chunk_info
 
 // Pack header into `buf'
+
+// Size of the 12-byte header: "PRPC" + body_size + meta_size.
+const int header_size = 12;
+// Bytes prefetched from device to host so header and meta can be parsed on CPU.
+const int prefetch_d2h_size = 512;
+
 inline void PackRpcHeader(char* rpc_header, uint32_t meta_size, int payload_size) {
     uint32_t* dummy = (uint32_t*)rpc_header;  // suppress strict-alias warning
     *dummy = *(uint32_t*)"PRPC";
@@ -101,44 +106,103 @@ static void SerializeRpcHeaderAndMeta(
 ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket,
                             bool /*read_eof*/, const void*) {
+
     char header_buf[12];
-    const size_t n = source->copy_to(header_buf, sizeof(header_buf));
-    if (n >= 4) {
-        void* dummy = header_buf;
-        if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") {
-            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
-        }
-    } else {
-        if (memcmp(header_buf, "PRPC", n) != 0) {
-            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
-        }
-    }
-    if (n < sizeof(header_buf)) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
-    }
+    size_t n = 0;
     uint32_t body_size;
     uint32_t meta_size;
-    butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size);
-    if (body_size > FLAGS_max_body_size) {
-        // We need this log to report the body_size to give users some clues
-        // which is not printed in InputMessenger.
-        LOG(ERROR) << "body_size=" << body_size << " from "
-                   << socket->remote_side() << " is too large";
-        return MakeParseError(PARSE_ERROR_TOO_BIG_DATA);
-    } else if (source->length() < sizeof(header_buf) + body_size) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
-    }
-    if (meta_size > body_size) {
-        LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size="
-                   << body_size;
-        // Pop the message
-        source->pop_front(sizeof(header_buf) + body_size);
-        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
-    }
+    ParseError pe = PARSE_OK;
+
+#if BRPC_WITH_GDR
+    void* prefetch_d2h_data = NULL;
+    uint32_t data_meta = source->get_first_data_meta_high32();
+    bool is_gpu_memory = (data_meta == static_cast<uint32_t>(butil::IOBuf::GPU_MEMORY));
+    butil::gdr::BlockPoolAllocator* host_allocator =
+        butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator();
+    if (is_gpu_memory) {
+        prefetch_d2h_data = host_allocator->AllocateRaw(prefetch_d2h_size);
+        if (prefetch_d2h_data == NULL) {
+            LOG(FATAL) << "Fail to allocate host memory for D2H prefetch";
+        }
+        n = source->copy_from_gpu(prefetch_d2h_data, prefetch_d2h_size);
+        const size_t copy_size = n > sizeof(header_buf) ? sizeof(header_buf) : n;
+        memcpy(header_buf, prefetch_d2h_data, copy_size);
+    } else {
+        n = source->copy_to(header_buf, sizeof(header_buf));
+    }
+#else
+    n = source->copy_to(header_buf, sizeof(header_buf));
+#endif // BRPC_WITH_GDR
+
+    do {
+        if (n >= 4) {
+            void* dummy = header_buf;
+            if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") {
+                pe = PARSE_ERROR_TRY_OTHERS;
+                break;
+            }
+        } else {
+            if (memcmp(header_buf, "PRPC", n) != 0) {
+                pe = PARSE_ERROR_TRY_OTHERS;
+                break;
+            }
+        }
+        if (n < sizeof(header_buf)) {
+            pe = PARSE_ERROR_NOT_ENOUGH_DATA;
+            break;
+        }
+        butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size);
+        if (body_size > FLAGS_max_body_size) {
+            // We need this log to report the body_size to give users some clues
+            // which is not printed in InputMessenger.
+            LOG(ERROR) << "body_size=" << body_size << " from "
+                       << socket->remote_side() << " is too large";
+            pe = PARSE_ERROR_TOO_BIG_DATA;
+            break;
+        } else if (source->length() < sizeof(header_buf) + body_size) {
+            pe = PARSE_ERROR_NOT_ENOUGH_DATA;
+            break;
+        }
+        if (meta_size > body_size) {
+            LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size="
+                       << body_size;
+            // Pop the message
+            source->pop_front(sizeof(header_buf) + body_size);
+            pe = PARSE_ERROR_TRY_OTHERS;
+            break;
+        }
+    } while (0);
+
+    if (pe != PARSE_OK) {
+#if BRPC_WITH_GDR
+        if (is_gpu_memory) {
+            host_allocator->DeallocateRaw(prefetch_d2h_data);
+        }
+#endif // BRPC_WITH_GDR
+        return MakeParseError(pe);
     }
+
     source->pop_front(sizeof(header_buf));
     MostCommonMessage* msg = MostCommonMessage::Get();
+
+#if BRPC_WITH_GDR
+    if (is_gpu_memory) {
+        if (header_size + meta_size <= n) {
+            // The meta is fully covered by the prefetched host buffer; hand it
+            // to the IOBuf directly and record the prefetched size as its meta.
+            auto deleter = [host_allocator, prefetch_d2h_data](void*) {
+                host_allocator->DeallocateRaw(prefetch_d2h_data);
+            };
+            msg->meta.append_user_data_with_meta(
+                (char*)prefetch_d2h_data + header_size, meta_size, deleter, n);
+            source->pop_front(meta_size);
+        } else {
+            host_allocator->DeallocateRaw(prefetch_d2h_data);
+            source->cutn_from_gpu(&msg->meta, meta_size);
+        }
+        source->cutn(&msg->payload, body_size - meta_size);
+    } else {
+        source->cutn(&msg->meta, meta_size);
+        source->cutn(&msg->payload, body_size - meta_size);
+    }
+#else
     source->cutn(&msg->meta, meta_size);
     source->cutn(&msg->payload, body_size - meta_size);
+#endif // BRPC_WITH_GDR
     return MakeMessage(msg);
 }
@@ -793,7 +857,29 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
         butil::IOBuf req_buf;
         int body_without_attachment_size = req_size - meta.attachment_size();
+#if BRPC_WITH_GDR
+        int meta_size = msg->meta.size();
+        uint32_t data_meta = msg->payload.get_first_data_meta_high32();
+        bool is_gpu_memory = (data_meta == static_cast<uint32_t>(butil::IOBuf::GPU_MEMORY));
+        if (is_gpu_memory) {
+            int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta();
+            if (header_size + meta_size + body_without_attachment_size <=
+                real_prefetch_d2h_size) {
+                // The body is already in the prefetched host buffer backing
+                // msg->meta; reuse it instead of another D2H copy.
+                void* data = msg->meta.get_first_data_ptr();
+                if (data == nullptr) {
+                    LOG(FATAL) << "Fail to get the prefetched host data";
+                }
+                req_buf.append((char*)data + meta_size, body_without_attachment_size);
+                msg->payload.pop_front(body_without_attachment_size);
+            } else {
+                msg->payload.cutn_from_gpu(&req_buf, body_without_attachment_size);
+            }
+        } else {
+            msg->payload.cutn(&req_buf, body_without_attachment_size);
+        }
+#else
         msg->payload.cutn(&req_buf, body_without_attachment_size);
+#endif // BRPC_WITH_GDR
         if (meta.attachment_size() > 0) {
             cntl->request_attachment().swap(msg->payload);
         }
@@ -963,8 +1049,14 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
     }
     // Parse response message iff error code from meta is 0
     butil::IOBuf res_buf;
+    int meta_size = msg->meta.size();
     const int res_size = msg->payload.length();
     butil::IOBuf* res_buf_ptr = &msg->payload;
+
+#if BRPC_WITH_GDR
+    uint32_t data_meta = msg->payload.get_first_data_meta_high32();
+    bool is_gpu_memory = (data_meta == static_cast<uint32_t>(butil::IOBuf::GPU_MEMORY));
+#endif // BRPC_WITH_GDR
     if (meta.has_attachment_size()) {
         if (meta.attachment_size() > res_size) {
             cntl->SetFailed(
@@ -973,9 +1065,44 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
             break;
         }
         int body_without_attachment_size = res_size - meta.attachment_size();
+
+#if BRPC_WITH_GDR
+        if (is_gpu_memory) {
+            int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta();
+            if (header_size + meta_size + body_without_attachment_size
+                    <= real_prefetch_d2h_size) {
+                void* data = msg->meta.get_first_data_ptr();
+                if (data == nullptr) {
+                    LOG(FATAL) << "Fail to get the prefetched host data";
+                }
+                res_buf.append((char*)data + meta_size, body_without_attachment_size);
+                msg->payload.pop_front(body_without_attachment_size);
+            } else {
+                msg->payload.cutn_from_gpu(&res_buf, body_without_attachment_size);
+            }
+        } else {
+            msg->payload.cutn(&res_buf, body_without_attachment_size);
+        }
+#else
         msg->payload.cutn(&res_buf, body_without_attachment_size);
+#endif // BRPC_WITH_GDR
         res_buf_ptr = &res_buf;
         cntl->response_attachment().swap(msg->payload);
+#if BRPC_WITH_GDR
+    } else if (is_gpu_memory) {
+        int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta();
+        if (header_size + meta_size + res_size <= real_prefetch_d2h_size) {
+            void* data = msg->meta.get_first_data_ptr();
+            if (data == nullptr) {
+                LOG(FATAL) << "Fail to get the prefetched host data";
+            }
+            res_buf.append((char*)data + meta_size, res_size);
+            msg->payload.pop_front(res_size);
+        } else {
+            msg->payload.cutn_from_gpu(&res_buf, res_size);
+        }
+        res_buf_ptr = &res_buf;
+#endif // BRPC_WITH_GDR
     }
 
     ContentType content_type = meta.content_type();
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98f7..73bf974330 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -20,6 +20,7 @@
 #include
 #include "butil/fd_utility.h"
 #include "butil/logging.h"                   // CHECK, LOG
+#include "butil/gpu/gpu_block_pool.h"
 #include "butil/sys_byteorder.h"             // HostToNet,NetToHost
 #include "bthread/bthread.h"
 #include "brpc/errno.pb.h"
@@ -56,7 +57,7 @@ DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
               "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
 DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
 DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
-DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 256, "Initial count of prepared QP.");
 DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
 BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
 DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA.");
@@ -944,9 +945,13 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
     case IBV_WC_RECV: {  // recv completion
         // Please note that only the first wc.byte_len bytes is valid
         if (wc.byte_len > 0) {
+#if BRPC_WITH_GDR
+            // GPU blocks cannot be memcpy-ed on CPU, always hand them over zero-copy.
+            zerocopy = true;
+#else
             if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) {
                 zerocopy = false;
             }
+#endif // BRPC_WITH_GDR
             CHECK(_state != FALLBACK_TCP);
             if (zerocopy) {
                 butil::IOBuf tmp;
@@ -1017,11 +1022,44 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) {
     return 0;
 }
 
+int RdmaEndpoint::DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey) {
+    ibv_recv_wr wr;
+    memset(&wr, 0, sizeof(wr));
+    ibv_sge sge;
+    sge.addr = (uint64_t)block;
+    sge.length = block_size;
+    sge.lkey = lkey;
+    wr.num_sge = 1;
+    wr.sg_list = &sge;
+    ibv_recv_wr* bad = NULL;
+    int err = ibv_post_recv(_resource->qp, &wr, &bad);
+    if (err != 0) {
+        LOG(WARNING) << "Fail to ibv_post_recv: " << berror(err);
+        return -1;
+    }
+    return 0;
+}
+
 int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) {
     // We do the post repeatedly from the _rbuf[_rq_received].
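+    // In GDR mode the recv blocks live in GPU memory carved from the GDR
+    // block pool; each block is posted with the lkey of the MR covering it.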
     while (num > 0) {
+        uint32_t lkey = 0;
         if (zerocopy) {
             _rbuf[_rq_received].clear();
+
+#if BRPC_WITH_GDR
+            butil::gdr::BlockPoolAllocator* device_allocator =
+                butil::gdr::BlockPoolAllocators::singleton()->get_gpu_allocator();
+            void* device_ptr = device_allocator->AllocateRaw(g_rdma_recv_block_size);
+            auto deleter = [device_allocator](void* data) {
+                device_allocator->DeallocateRaw(data);
+            };
+            lkey = device_allocator->get_lkey(device_ptr);
+            // Tag the block as GPU memory in the high 32 bits and keep the
+            // lkey in the low 32 bits of the data meta.
+            uint64_t data_meta =
+                (static_cast<uint64_t>(butil::IOBuf::GPU_MEMORY) << 32) | lkey;
+            _rbuf[_rq_received].append_user_data_with_meta(
+                device_ptr, g_rdma_recv_block_size, deleter, data_meta);
+            _rbuf_data[_rq_received] = device_ptr;
+#else
             butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received],
                 g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN);
             int size = 0;
@@ -1032,11 +1070,20 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) {
             } else {
                 CHECK(static_cast<size_t>(size) == g_rdma_recv_block_size) << size;
             }
+#endif // BRPC_WITH_GDR
         }
+#if BRPC_WITH_GDR
+        if (DoPostRecvGDR(_rbuf_data[_rq_received], g_rdma_recv_block_size, lkey) < 0) {
+            _rbuf[_rq_received].clear();
+            return -1;
+        }
+#else
         if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) {
             _rbuf[_rq_received].clear();
            return -1;
         }
+#endif // BRPC_WITH_GDR
+
         --num;
         ++_rq_received;
         if (_rq_received == _rq_size) {
@@ -1504,6 +1551,10 @@ void RdmaEndpoint::DebugInfo(std::ostream& os) const {
 }
 
 int RdmaEndpoint::GlobalInitialize() {
+#if BRPC_WITH_GDR
+    LOG(INFO) << "gdr_block_size_kb: " << butil::gdr::gdr_block_size_kb;
+    g_rdma_recv_block_size = butil::gdr::gdr_block_size_kb * 1024 - IOBUF_BLOCK_HEADER_LEN;
+#else
     if (FLAGS_rdma_recv_block_type == "default") {
         g_rdma_recv_block_size = GetBlockSize(0) - IOBUF_BLOCK_HEADER_LEN;
     } else if (FLAGS_rdma_recv_block_type == "large") {
@@ -1514,6 +1565,15 @@ int RdmaEndpoint::GlobalInitialize() {
         errno = EINVAL;
         return -1;
     }
+#endif // BRPC_WITH_GDR
+
+    LOG(INFO) << "rdma_use_polling: " << FLAGS_rdma_use_polling
+              << ", rdma_poller_num: " << FLAGS_rdma_poller_num
+              << ", rdma_poller_yield: " << FLAGS_rdma_poller_yield
+              << ", rdma_sq_size: " << FLAGS_rdma_sq_size
+              << ", rdma_rq_size: " << FLAGS_rdma_rq_size
+              << ", rdma_zerocopy_min_size: " << FLAGS_rdma_zerocopy_min_size
+              << ", g_rdma_recv_block_size: " << g_rdma_recv_block_size;
 
     g_rdma_resource_mutex = new butil::Mutex;
     for (int i = 0; i < FLAGS_rdma_prepared_qp_cnt; ++i) {
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6d8..4705e362ea 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,6 @@
 #include "butil/containers/mpsc_queue.h"
 #include "brpc/socket.h"
 
-
 namespace brpc {
 class Socket;
 namespace rdma {
@@ -173,6 +172,8 @@ friend class brpc::Socket;
     // -1: failed, errno set
     int DoPostRecv(void* block, size_t block_size);
 
+    // Same as DoPostRecv, but posts a GPU block with the given lkey
+    int DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey);
+
     // Read at most len bytes from fd in _socket to data
     // wait for _read_butex if encounter EAGAIN
     // return -1 if encounter other errno (including EOF)
diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp
index 9bad33750c..3b45b2621c 100644
--- a/src/brpc/rdma/rdma_helper.cpp
+++ b/src/brpc/rdma/rdma_helper.cpp
@@ -25,6 +25,7 @@
 #include "butil/containers/flat_map.h"       // butil::FlatMap
 #include "butil/fd_guard.h"
 #include "butil/fd_utility.h"                // butil::make_non_blocking
+#include "butil/gpu/gpu_block_pool.h"
 #include "butil/logging.h"
 #include "brpc/socket.h"
 #include "brpc/rdma/block_pool.h"
@@ -84,6 +85,8 @@ static uint16_t g_lid;
 static int g_max_sge = 0;
 static uint8_t g_port_num = 1;
 
+static int g_gpu_index = 0;
+
 static int g_comp_vector_index = 0;
 
 butil::atomic<bool> g_rdma_available(false);
@@ -93,7 +96,7 @@ DEFINE_string(rdma_device, "", "The name of the HCA device used "
               "(Empty means using the first active device)");
 DEFINE_int32(rdma_port, 1, "The port number to use. For RoCE, it is always 1.");
 DEFINE_int32(rdma_gid_index, -1, "The GID index to use. -1 means using the last one.");
-
+DEFINE_int32(gpu_index, 0, "The GPU device index to use. For GDR, we suggest using a GPU connected to the same PCIe switch as the RDMA device.");
 // static const size_t SYSFS_SIZE = 4096;
 static ibv_device** g_devices = NULL;
 static ibv_context* g_context = NULL;
@@ -477,6 +480,7 @@ static void GlobalRdmaInitializeOrDieImpl() {
         ExitWithError();
     }
 
+    g_gpu_index = FLAGS_gpu_index;
     // Find the first active port
     g_port_num = FLAGS_rdma_port;
     int available_devices;
@@ -552,6 +556,13 @@ static void GlobalRdmaInitializeOrDieImpl() {
         ExitWithError();
     }
 
+#if BRPC_WITH_GDR
+    if (!butil::gdr::InitGPUBlockPool(g_gpu_index, GetRdmaPd())) {
+        PLOG(ERROR) << "Fail to initialize RDMA GPU memory pool";
+        ExitWithError();
+    }
+#endif // BRPC_WITH_GDR
+
     if (RdmaEndpoint::GlobalInitialize() < 0) {
         LOG(ERROR) << "rdma_recv_block_type incorrect "
                    << "(valid value: default/large/huge)";
@@ -679,6 +690,11 @@ uint8_t GetRdmaPortNum() {
     return g_port_num;
 }
 
+int GetGPUIndex() {
+    return g_gpu_index;
+}
+
 bool IsRdmaAvailable() {
     return g_rdma_available.load(butil::memory_order_acquire);
 }
diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h
index 052763325b..25a93476e7 100644
--- a/src/brpc/rdma/rdma_helper.h
+++ b/src/brpc/rdma/rdma_helper.h
@@ -74,6 +74,9 @@ int GetRdmaCompVector();
 // Return current port number used
 uint8_t GetRdmaPortNum();
 
+// Get the GPU index used
+int GetGPUIndex();
+
 // Get max_sge supported by the device
 int GetRdmaMaxSge();
diff --git a/src/butil/gpu/gpu_block_pool.cpp b/src/butil/gpu/gpu_block_pool.cpp
new file mode 100644
index 0000000000..b768e408e8
--- /dev/null
+++ b/src/butil/gpu/gpu_block_pool.cpp
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
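+//
+// gpu_block_pool.cpp: block pools of CUDA device/pinned-host memory
+// registered as RDMA memory regions (MR), plus a pool of CUDA streams for
+// batched device-to-device and device-to-host copies used by the GDR paths.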
+
+#if BRPC_WITH_GDR
+
+#include <algorithm>
+#include <chrono>
+#include <cstring>
+#include "butil/fast_rand.h"
+#include "gpu_block_pool.h"
+
+namespace butil {
+namespace gdr {
+
+#define CHECK_CUDA(call)                                         \
+do {                                                             \
+    auto _sts = (call);                                          \
+    if (_sts != cudaSuccess) {                                   \
+        LOG(FATAL) << "cuda error: "                             \
+                   << cudaGetErrorString(_sts) << " at "         \
+                   << __FILE__ << ": " << __LINE__;              \
+    }                                                            \
+} while (0)
+
+bool verify_same_context() {
+    static int original_device = -1;
+    static bool first_call = true;
+
+    int current_device;
+    cudaGetDevice(&current_device);
+
+    if (first_call) {
+        original_device = current_device;
+        first_call = false;
+        return true;
+    }
+
+    return (current_device == original_device);
+}
+
+void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size) {
+    CHECK_CUDA(cudaSetDevice(gpu_id));
+    void* d_data;
+
+    LOG(INFO) << "try to alloc " << gpu_mem_size << " bytes from gpu " << gpu_id;
+
+    CHECK_CUDA(cudaMalloc(&d_data, gpu_mem_size));
+    cudaDeviceSynchronize();
+    return d_data;
+}
+
+void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size) {
+    CHECK_CUDA(cudaSetDevice(gpu_id));
+
+    LOG(INFO) << "try to alloc " << cpu_mem_size << " bytes on host for gpu "
+              << gpu_id;
+
+    void* mem = NULL;
+
+    CHECK_CUDA(cudaMallocHost(&mem, cpu_mem_size));
+
+    cudaDeviceSynchronize();
+
+    return mem;
+}
+
+BlockPoolAllocators* BlockPoolAllocators::instance_ = nullptr;
+
+BlockPoolAllocators* BlockPoolAllocators::singleton() {
+    static std::mutex mutex;
+    if (instance_ == nullptr) {
+        std::lock_guard<std::mutex> l(mutex);
+        if (instance_ == nullptr) {
+            instance_ = new BlockPoolAllocators();
+            std::atomic_thread_fence(std::memory_order_release);
+        }
+    }
+    std::atomic_thread_fence(std::memory_order_acquire);
+    return instance_;
+}
+
+bool InitGPUBlockPool(int gpu_id, ibv_pd* pd) {
+    BlockPoolAllocators::singleton()->init(gpu_id, pd);
+    return true;
+}
+
+// A freelist of BlockHeader objects to avoid frequent new/delete.
+class BlockHeaderList {
+ public:
+    BlockHeaderList() {
+        objects_.reserve(kMaxObjects);
+    }
+    virtual ~BlockHeaderList() {
+        for (size_t i = 0; i < objects_.size(); i++) {
+            delete objects_[i];
+        }
+    }
+
+    BlockHeader* New() {
+        {
+            std::lock_guard<std::mutex> lock(mu_);
+            if (!objects_.empty()) {
+                BlockHeader* result = objects_.back();
+                objects_.pop_back();
+                return result;
+            }
+        }
+        return new BlockHeader;
+    }
+    void Release(BlockHeader* obj) {
+        obj->Reset();
+        {
+            std::lock_guard<std::mutex> lock(mu_);
+            if (objects_.size() < kMaxObjects) {
+                objects_.push_back(obj);
+                return;
+            }
+        }
+        delete obj;
+    }
+
+ private:
+    static const size_t kMaxObjects = 100000;
+
+    std::mutex mu_;
+    std::vector<BlockHeader*> objects_;
+};
+
+static BlockHeaderList* get_bh_list() {
+    static BlockHeaderList* bh_list = new BlockHeaderList();
+    return bh_list;
+}
+
+BlockPoolAllocator::BlockPoolAllocator(int gpuId, bool onGpu, ibv_pd* brpc_pd,
+                                       size_t blockSize, size_t regionSize) :
+    gpu_id(gpuId)
+    , on_gpu(onGpu)
+    , pd(brpc_pd)
+    , BLOCK_SIZE(std::max(blockSize, sizeof(BlockHeader)))
+    , REGION_SIZE((regionSize / blockSize) * blockSize)  // round down to a multiple of the block size
+    , freeList(nullptr)
+    , g_region_num(0)
+    , totalAllocated(0)
+    , totalDeallocated(0)
+    , peakUsage(0) {
+    LOG(INFO) << "Memory Pool initialized: block_size=" << BLOCK_SIZE
+              << ", region_size=" << REGION_SIZE
+              << ", gpu_id=" << gpu_id << ", on_gpu=" << on_gpu << ", pd=" << pd;
+
+    extendRegion();
+}
+
+BlockPoolAllocator::~BlockPoolAllocator() {
+#ifdef DEBUG
+    printStatistics();
+#endif
+
+    for (size_t i = 0; i < max_regions; i++) {
+        Region* r = &g_regions[i];
+        if (!r->mr) {
+            return;
+        }
+
+        LOG(INFO) << "try to free " << r->size << " bytes from gpu " << gpu_id
+                  << ", on_gpu " << on_gpu;
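+        // Deregister the MR before freeing the CUDA allocation it covers.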
+        ibv_dereg_mr(r->mr);
+        if (on_gpu) {
+            CHECK_CUDA(cudaFree(reinterpret_cast<void*>(r->start)));
+        } else {
+            CHECK_CUDA(cudaFreeHost(reinterpret_cast<void*>(r->start)));
+        }
+    }
+}
+
+Region* BlockPoolAllocator::GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (size_t i = 0; i < max_regions; ++i) {
+        if (g_regions[i].aligned_start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].aligned_start &&
+            addr < g_regions[i].aligned_start + g_regions[i].aligned_size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t BlockPoolAllocator::get_lkey(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        LOG(ERROR) << "cannot get a region for buf " << buf;
+        return 0;
+    }
+    return r->lkey;
+}
+
+void* BlockPoolAllocator::AllocateRaw(size_t num_bytes) {
+    if (num_bytes == 0) {
+        return nullptr;
+    }
+    if (num_bytes > BLOCK_SIZE) {
+        LOG(FATAL) << "try to alloc " << num_bytes
+                   << " bytes, which is bigger than block_size " << BLOCK_SIZE;
+    }
+
+    auto startTime = std::chrono::high_resolution_clock::now();
+
+    std::lock_guard<std::mutex> lock(poolMutex);
+
+    if (!freeList) {
+        extendRegion();
+    }
+
+    BlockHeader* block = freeList;
+    freeList = freeList->next;
+
+    void* addr = block->addr;
+    get_bh_list()->Release(block);
+
+    totalAllocated++;
+    peakUsage = std::max(peakUsage, totalAllocated - totalDeallocated);
+
+    auto endTime = std::chrono::high_resolution_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime);
+
+#ifdef DEBUG
+    if (duration.count() > 1000) {  // allocation took more than 1 microsecond
+        LOG(INFO) << "Slow allocation: " << duration.count() << " ns";
+    }
+#endif
+
+    return addr;
+}
+
+void BlockPoolAllocator::DeallocateRaw(void* ptr) {
+    if (!ptr) return;
+
+    std::lock_guard<std::mutex> lock(poolMutex);
+
+    BlockHeader* block = get_bh_list()->New();
+    block->addr = ptr;
+    block->next = freeList;
+    freeList = block;
+
+    totalDeallocated++;
+}
+
+// Print statistics of the pool.
+void BlockPoolAllocator::printStatistics() const {
+    LOG(INFO) << "=== Memory Pool Statistics ===";
+    LOG(INFO) << "Total regions: " << g_region_num
+              << ", Total blocks allocated: " << totalAllocated
+              << ", Total blocks deallocated: " << totalDeallocated
+              << ", Current usage: " << (totalAllocated - totalDeallocated) << " blocks"
+              << ", Peak usage: " << peakUsage << " blocks"
+              << ", Memory efficiency: "
+              << (static_cast<double>(totalAllocated - totalDeallocated) /
+                  (g_region_num * (REGION_SIZE / BLOCK_SIZE)) * 100)
+              << "%";
+}
+
+void BlockPoolAllocator::extendRegion() {
+    if (g_region_num == (int)max_regions) {
+        LOG(FATAL) << "GDR memory pool reaches max regions";
+        return;
+    }
+
+    auto startTime = std::chrono::high_resolution_clock::now();
+    void* ptr = nullptr;
+    void* aligned_ptr = nullptr;
+    const uintptr_t alignment = 4096;
+
+    if (on_gpu) {
+        ptr = get_gpu_mem(gpu_id, REGION_SIZE);
+    } else {
+        ptr = get_cpu_mem(gpu_id, REGION_SIZE);
+    }
+
+    aligned_ptr = (void*)(((uintptr_t)ptr + alignment - 1) & ~(alignment - 1));
+
+    int64_t aligned_bytes = REGION_SIZE;
+    if (ptr != aligned_ptr) {
+        uintptr_t region_end = (uintptr_t)ptr + REGION_SIZE;
+        uintptr_t aligned_end = region_end & ~(alignment - 1);
+        aligned_bytes = aligned_end - (uintptr_t)aligned_ptr;
+        LOG(WARNING) << "addr is not aligned with 4096: " << ptr
+                     << ", aligned_bytes: " << aligned_bytes
+                     << ", region_size: " << REGION_SIZE;
+    }
+
+    LOG(INFO) << "reg_mr for ptr: " << aligned_ptr << ", size: " << aligned_bytes;
+    auto mr = ibv_reg_mr(pd, aligned_ptr, aligned_bytes,
+                         IBV_ACCESS_LOCAL_WRITE |
+                         IBV_ACCESS_REMOTE_READ |
+                         IBV_ACCESS_REMOTE_WRITE);
+                         // IBV_ACCESS_RELAXED_ORDERING
+
+    if (!mr) {
+        LOG(FATAL) << "Fail to register MR: " << strerror(errno)
+                   << ", pd " << pd << ", aligned_ptr: " << aligned_ptr;
+    } else {
+        LOG(INFO) << "Successfully registered MR, pd " << pd
+                  << ", aligned_ptr: " << aligned_ptr;
+    }
+
+    LOG(INFO) << "try to init region, g_region_num: " << g_region_num;
+    size_t blockCount = aligned_bytes / BLOCK_SIZE;
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)ptr;
+    region->aligned_start = (uintptr_t)aligned_ptr;
+    region->mr = mr;
+    region->size = REGION_SIZE;
+    region->aligned_size = aligned_bytes;
+    region->lkey = mr->lkey;
+    region->blockCount = blockCount;
+
+    LOG(INFO) << "try to insert list, freeList: " << freeList
+              << ", blockCount: " << blockCount;
+    BlockHeader* lastBlock = nullptr;
+    for (size_t i = 0; i < blockCount; ++i) {
+        BlockHeader* block = get_bh_list()->New();
+        block->addr = static_cast<char*>(aligned_ptr) + i * BLOCK_SIZE;
+        if (lastBlock != nullptr) {
+            lastBlock->next = block;
+        } else {
+            freeList = block;
+        }
+        lastBlock = block;
+    }
+
+    if (lastBlock) {
+        lastBlock->next = nullptr;
+    }
+
+    auto endTime = std::chrono::high_resolution_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime);
+
+    LOG(INFO) << "Extended region #" << g_region_num << ": " << blockCount
+              << " blocks (" << (REGION_SIZE / (1024 * 1024)) << " MB)"
+              << ", on_gpu " << on_gpu << ", cost " << duration.count() << " ns";
+}
+
+GPUStreamPool::GPUStreamPool(int gpu_id) :
+    gpu_id_(gpu_id) {
+    CHECK_CUDA(cudaSetDevice(gpu_id));
+    d2d_streams_.resize(kMaxConcurrent);
+    d2h_streams_.resize(kMaxConcurrent);
+    for (int i = 0; i < kMaxConcurrent; i++) {
+        CHECK_CUDA(cudaStreamCreate(&d2d_streams_[i]));
+        CHECK_CUDA(cudaStreamCreate(&d2h_streams_[i]));
+    }
+    CHECK_CUDA(cudaDeviceSynchronize());
+}
+
+GPUStreamPool::~GPUStreamPool() {
+    CHECK_CUDA(cudaDeviceSynchronize());
+    for (int i = 0; i < kMaxConcurrent; i++) {
+        CHECK_CUDA(cudaStreamDestroy(d2d_streams_[i]));
+        CHECK_CUDA(cudaStreamDestroy(d2h_streams_[i]));
+    }
+}
+
+void GPUStreamPool::fast_d2d(std::vector<void*>& src_list,
+                             std::vector<int64_t>& length_list,
+                             void* dst) {
+#ifdef DEBUG
+    if (!verify_same_context()) {
+        LOG(FATAL) << "Context mismatch!";
+        return;
+    }
+#endif
+    int64_t offset = 0;
+    size_t segs = src_list.size();
+    if (segs == 0) return;
+    if (segs != length_list.size()) {
+        LOG(FATAL) << "src_list size does not match length_list size";
+    }
+
+    int stream_idx = 0;
+    {
+        std::lock_guard<std::mutex> stream_lb_lock(d2d_lb_lock_);
+        stream_idx = d2d_cnt_.fetch_add(1) % kMaxConcurrent;
+    }
+    std::lock_guard<std::mutex> stream_lock(d2d_locks_[stream_idx]);
+    CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx]));
+    for (size_t i = 0; i < segs; i++) {
+        if (length_list[i] == 0) {
+            continue;
+        }
+        CHECK_CUDA(cudaMemcpyAsync(static_cast<char*>(dst) + offset,
+                                   src_list[i], length_list[i],
+                                   cudaMemcpyDeviceToDevice,
+                                   d2d_streams_[stream_idx]));
+        offset += length_list[i];
+    }
+    CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx]));
+}
+
+void GPUStreamPool::fast_d2h(std::vector<void*>& src_list,
+                             std::vector<int64_t>& length_list,
+                             void* dst) {
+    if (!verify_same_context()) {
+        LOG(FATAL) << "Context mismatch!";
+        return;
+    }
+    int64_t offset = 0;
+    size_t segs = src_list.size();
+    if (segs == 0) return;
+    if (segs != length_list.size()) {
+        LOG(FATAL) << "src_list size does not match length_list size";
+    }
+
+    int stream_idx = 0;
+    {
+        std::lock_guard<std::mutex> stream_lb_lock(d2h_lb_lock_);
+        stream_idx = d2h_cnt_.fetch_add(1) % kMaxConcurrent;
+    }
+    std::lock_guard<std::mutex> stream_lock(d2h_locks_[stream_idx]);
+    CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx]));
+    for (size_t i = 0; i < segs; i++) {
+        if (length_list[i] == 0) {
+            continue;
+        }
+        CHECK_CUDA(cudaMemcpyAsync(static_cast<char*>(dst) + offset,
+                                   src_list[i], length_list[i],
+                                   cudaMemcpyDeviceToHost,
+                                   d2h_streams_[stream_idx]));
+        offset += length_list[i];
+    }
+    CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx]));
+}
+
+}  // namespace gdr
+}  // namespace butil
+
+#endif // BRPC_WITH_GDR
diff --git a/src/butil/gpu/gpu_block_pool.h b/src/butil/gpu/gpu_block_pool.h
new file mode 100644
index 0000000000..6106952c76
--- /dev/null
+++ b/src/butil/gpu/gpu_block_pool.h
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef BUTIL_GPU_GPU_BLOCK_POOL_H
+#define BUTIL_GPU_GPU_BLOCK_POOL_H
+
+#if BRPC_WITH_GDR
+
+#include <atomic>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+#include <infiniband/verbs.h>
+#include "butil/containers/hash_tables.h"
+#include "butil/logging.h"
+#include <cuda_runtime.h>
+#include "cuda.h"
+
+// #include "gdrapi.h"
+namespace butil {
+namespace gdr {
+
+// Block size of the GPU pool in KB, overridable via the GDR_BLOCK_SIZE_KB
+// environment variable.
+static int gdr_block_size_kb = []() {
+    int ret = 64;
+    const char* env_var_val = getenv("GDR_BLOCK_SIZE_KB");
+    if (env_var_val == nullptr) {
+        return ret;
+    }
+    ret = std::stoi(env_var_val);
+    return ret;
+}();
+
+void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size);
+void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size);
+
+bool InitGPUBlockPool(int gpu_id, ibv_pd* pd);
+
+struct Region {
+    Region() { start = 0; aligned_start = 0; }
+    uintptr_t start;
+    uintptr_t aligned_start;
+
+    size_t size;
+    size_t aligned_size;
+    size_t blockCount;
+    struct ibv_mr* mr {nullptr};
+    uint32_t lkey;
+};
+
+struct BlockHeader {
+    BlockHeader() { addr = nullptr; next = nullptr; }
+    void Reset() { addr = nullptr; next = nullptr; }
+    void* addr;
+    BlockHeader* next;
+};
+
+class BlockPoolAllocator {
+ private:
+    int gpu_id;
+    bool on_gpu;
+    ibv_pd* pd {nullptr};
+
+    const size_t BLOCK_SIZE;
+    const size_t REGION_SIZE;
+
+    BlockHeader* freeList;
+    static constexpr size_t max_regions = 16;
+    int g_region_num {0};
+    Region g_regions[max_regions];
+    std::mutex poolMutex;
+
+    // statistics
+    size_t totalAllocated;
+    size_t totalDeallocated;
+    size_t peakUsage;
+
+ public:
+    explicit BlockPoolAllocator(int gpu_id,
+                                bool on_gpu, ibv_pd* pd,
+                                size_t blockSize, size_t regionSize);
+
+    ~BlockPoolAllocator();
+
+    void* AllocateRaw(size_t num_bytes);
+
+    void DeallocateRaw(void* ptr);
+
+    // Print statistics of the pool.
+    void printStatistics() const;
+
+    int64_t getCurrentUsage() const {
+        return totalAllocated - totalDeallocated;
+    }
+
+    int64_t getTotalMemory() const {
+        return g_region_num * REGION_SIZE;
+    }
+
+    int64_t get_block_size() const {
+        return BLOCK_SIZE;
+    }
+
+    uint32_t get_lkey(const void* buf);
+
+ private:
+    Region* GetRegion(const void* buf);
+    void extendRegion();
+};
+
+class GPUStreamPool {
+public:
+    explicit GPUStreamPool(int gpu_id);
+
+    ~GPUStreamPool();
+
+    GPUStreamPool(const GPUStreamPool&) = delete;
+    GPUStreamPool& operator=(const GPUStreamPool&) = delete;
+
+    void fast_d2h(std::vector<void*>& src_list,
+                  std::vector<int64_t>& length_list, void* dst);
+
+    void fast_d2d(std::vector<void*>& src_list,
+                  std::vector<int64_t>& length_list, void* dst);
+
+    static constexpr int kMaxConcurrent = 32;
+private:
+    int gpu_id_ {-1};
+    std::atomic<uint64_t> d2h_cnt_ {0};
+    std::atomic<uint64_t> d2d_cnt_ {0};
+    std::mutex d2h_locks_[kMaxConcurrent];
+    std::mutex d2d_locks_[kMaxConcurrent];
+    std::mutex d2h_lb_lock_;
+    std::mutex d2d_lb_lock_;
+    std::vector<cudaStream_t> d2h_streams_;
+    std::vector<cudaStream_t> d2d_streams_;
+};
+
+class BlockPoolAllocators {
+public:
+    static BlockPoolAllocators* singleton();
+    BlockPoolAllocators() {}
+    virtual ~BlockPoolAllocators() {
+        CHECK_EQ(this, instance_);
+        instance_ = nullptr;
+    }
+
+    void init(int gpu_id, ibv_pd* pd) {
+        LOG(INFO) << "set GPU BlockPoolAllocator for gpu " << gpu_id;
+        size_t region_size = 512LL * 1024 * 1024;
+        size_t block_size = gdr_block_size_kb * 1024;
+        gpu_mem_alloc = new BlockPoolAllocator(gpu_id, true, pd, block_size, region_size);
+
+        region_size = 32LL * 1024 * 1024;
+        block_size = 512;
+        cpu_mem_alloc = new BlockPoolAllocator(gpu_id, false, pd, block_size, region_size);
+
+        gpu_stream_pool = new GPUStreamPool(gpu_id);
+    }
+
+    BlockPoolAllocator* get_gpu_allocator() {
+        return gpu_mem_alloc;
+    }
+
+    BlockPoolAllocator* get_cpu_allocator() {
+        return cpu_mem_alloc;
+    }
+
+    GPUStreamPool* get_gpu_stream_pool() {
+        return gpu_stream_pool;
+    }
+
+public:
+    static BlockPoolAllocators* instance_;
+
+private:
+    BlockPoolAllocator* gpu_mem_alloc {nullptr};
+    BlockPoolAllocator* cpu_mem_alloc {nullptr};
+    GPUStreamPool* gpu_stream_pool {nullptr};
+};
+}  // namespace gdr
+}  // namespace butil
+
+#endif // BRPC_WITH_GDR
+
+#endif  // BUTIL_GPU_GPU_BLOCK_POOL_H
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index 26046e3745..ce3c0cc0bb 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -40,6 +40,7 @@
 #include "butil/fd_guard.h"                      // butil::fd_guard
 #include "butil/iobuf.h"
 #include "butil/iobuf_profiler.h"
+#include "butil/gpu/gpu_block_pool.h"
 
 namespace butil {
 namespace iobuf {
@@ -722,6 +723,46 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) {
     return saved_n;
 }
 
+#if BRPC_WITH_GDR
+size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) {
+    if (n == 0) {
+        return 0;
+    }
+
+    butil::gdr::BlockPoolAllocator* host_allocator =
+        butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator();
+    // Small cuts are served from the pinned host pool, large ones from malloc.
+    bool alloc_from_host_alloc = (n <= (size_t)host_allocator->get_block_size());
+    void* mem = NULL;
+    if (alloc_from_host_alloc) {
+        mem = host_allocator->AllocateRaw(n);
+    } else {
+        mem = malloc(n);
+    }
+
+    if (mem == NULL) {
+        return 0;
+    }
+    size_t saved_n = copy_from_gpu(mem, n, 0, false);
+    if (saved_n > 0) {
+        if (alloc_from_host_alloc) {
+            auto deleter = [host_allocator](void* data) {
+                host_allocator->DeallocateRaw(data);
+            };
+            out->append_user_data(mem, saved_n, deleter);
+        } else {
+            auto deleter = [](void* data) { free(data); };
+            out->append_user_data(mem, saved_n, deleter);
+        }
+        pop_front(saved_n);
+    } else {
+        if (alloc_from_host_alloc) {
+            host_allocator->DeallocateRaw(mem);
+        } else {
+            free(mem);
+        }
+    }
+
+    return saved_n;
+}
+#endif // BRPC_WITH_GDR
+
 size_t IOBuf::cutn(void* out, size_t n) {
     const size_t len = length();
     if (n > len) {
@@ -1152,9 +1193,32 @@ uint64_t IOBuf::get_first_data_meta() {
     if (!(r.block->flags & IOBUF_BLOCK_FLAGS_USER_DATA)) {
         return 0;
     }
-    return r.block->u.data_meta;
+    // The high 32 bits are reserved for the memory type (see
+    // get_first_data_meta_high32); only the low 32 bits are returned here.
+    return (r.block->u.data_meta & 0x00000000FFFFFFFF);
+}
+
+// The lkey is stored in data_meta only when append_user_data_with_meta() is
+// used. This accessor is added for GDR to tell whether the first block lives
+// in host memory or GPU memory: since lkey is a uint32_t, the high 32 bits of
+// data_meta are free to store the memory type.
+uint32_t IOBuf::get_first_data_meta_high32() {
+    if (_ref_num() == 0) {
+        return 0;
+    }
+    IOBuf::BlockRef const& r = _ref_at(0);
+    if (!(r.block->flags & IOBUF_BLOCK_FLAGS_USER_DATA)) {
+        return 0;
+    }
+    return (uint32_t)(r.block->u.data_meta >> 32);
+}
+
+void* IOBuf::get_first_data_ptr() {
+    if (_ref_num() == 0) {
+        return NULL;
+    }
+    IOBuf::BlockRef const& r = _ref_at(0);
+    return r.block->data;
+}
 
 int IOBuf::resize(size_t n, char c) {
     const size_t saved_len = length();
     if (n < saved_len) {
@@ -1317,6 +1381,46 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const {
     return n - m;
 }
 
+#if BRPC_WITH_GDR
+size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const {
+    if (n == 0) {
+        return 0;
+    }
+    const size_t nref = _ref_num();
+    // Skip `pos' bytes. `offset' is the starting position in starting BlockRef.
+    size_t offset = pos;
+    size_t i = 0;
+    for (; offset != 0 && i < nref; ++i) {
+        IOBuf::BlockRef const& r = _ref_at(i);
+        if (offset < (size_t)r.length) {
+            break;
+        }
+        offset -= r.length;
+    }
+
+    butil::gdr::GPUStreamPool* gpu_stream_pool =
+        butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool();
+    size_t m = n;
+    // Gather the scattered GPU segments, then copy them in one batch.
+    std::vector<void*> src_list;
+    std::vector<int64_t> length_list;
+    for (; m != 0 && i < nref; ++i) {
+        IOBuf::BlockRef const& r = _ref_at(i);
+        const size_t nc = std::min(m, (size_t)r.length - offset);
+        void* gpu_src = r.block->data + r.offset + offset;
+        src_list.push_back(gpu_src);
+        length_list.push_back(nc);
+        offset = 0;
+        m -= nc;
+    }
+    if (to_gpu) {
+        gpu_stream_pool->fast_d2d(src_list, length_list, d);
+    } else {
+        gpu_stream_pool->fast_d2h(src_list, length_list, d);
+    }
+    // If nref == 0, here returns 0 correctly
+    return n - m;
+}
+#endif // BRPC_WITH_GDR
+
 size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const {
     const size_t len = length();
     if (len <= pos) {
@@ -2102,4 +2206,4 @@ bool IOBufBytesIterator::forward_one_block(const void** data, size_t* size) {
 
 void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n) {
     return butil::iobuf::cp(dest, src, n);
-}  // namespace butil
\ No newline at end of file
+}  // namespace butil
diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h
index 239e82d950..14077f0c29 100644
--- a/src/butil/iobuf.h
+++ b/src/butil/iobuf.h
@@ -70,6 +70,11 @@ friend class SingleIOBuf;
     static const size_t DEFAULT_BLOCK_SIZE = 8192;
     static const size_t INITIAL_CAP = 32;  // must be power of 2
 
+    // Memory type stored in the high 32 bits of a user-data block's meta.
+    enum MemoryMeta {
+        HOST_MEMORY = 0,
+        GPU_MEMORY = 1
+    };
+
     struct Block;
 
     // can't directly use `struct iovec' here because we also need to access the
@@ -141,6 +146,12 @@ friend class SingleIOBuf;
     size_t cutn(IOBuf* out, size_t n);
     size_t cutn(void* out, size_t n);
     size_t cutn(std::string* out, size_t n);
+
+#if BRPC_WITH_GDR
+    // Cut n bytes residing in GPU memory into `out' (host memory).
+    size_t cutn_from_gpu(IOBuf* out, size_t n);
+    // Copy n bytes starting at `pos' from GPU blocks to `d'; `to_gpu'
+    // selects device-to-device instead of device-to-host.
+    size_t copy_from_gpu(void* d, size_t n, size_t pos = 0, bool to_gpu = false) const;
+#endif // BRPC_WITH_GDR
+
     // Cut off 1 byte from the front side and set to *c
     // Return true on cut, false otherwise.
     bool cut1(void* c);
@@ -260,6 +271,12 @@ friend class SingleIOBuf;
     // 0 means the meta is invalid.
     uint64_t get_first_data_meta();
 
+    // Get the high 32 bits of the data meta of the first byte in this IOBuf.
+    // The meta is specified with append_user_data_with_meta() beforehand.
+    // 0 specifies host memory and 1 specifies GPU memory (see MemoryMeta).
+    uint32_t get_first_data_meta_high32();
+    // Get the user-data pointer of the first block in this IOBuf.
+    void* get_first_data_ptr();
+
     // Resizes the buf to a length of n characters.
     // If n is smaller than the current length, all bytes after n will be
     // truncated.
@@ -775,4 +792,4 @@ inline void swap(butil::IOBuf& a, butil::IOBuf& b) {
 
 #include "butil/iobuf_inl.h"
 
-#endif  // BUTIL_IOBUF_H
\ No newline at end of file
+#endif  // BUTIL_IOBUF_H
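---
Usage sketch (not part of the patch; assumes the workspace provides a
local_config_cuda repository that exports @local_config_cuda//cuda:cuda_headers):
the config_setting above matches on the BRPC_WITH_GDR define, so a GDR build
would be enabled with

    bazel build --define=BRPC_WITH_GDR=true //:brpc

At runtime the GPU is selected with --gpu_index, and the GPU pool block size
is tuned with the GDR_BLOCK_SIZE_KB environment variable (default 64 KB).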