diff --git a/src/corosio/src/detail/kqueue/acceptors.cpp b/src/corosio/src/detail/kqueue/acceptors.cpp index 2516dae9..2a4b3697 100644 --- a/src/corosio/src/detail/kqueue/acceptors.cpp +++ b/src/corosio/src/detail/kqueue/acceptors.cpp @@ -86,6 +86,9 @@ operator()() { stop_cb.reset(); + static_cast(acceptor_impl_) + ->service().scheduler().reset_inline_budget(); + bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire)); if (ec_out) diff --git a/src/corosio/src/detail/kqueue/op.hpp b/src/corosio/src/detail/kqueue/op.hpp index 6d77d238..b9b34902 100644 --- a/src/corosio/src/detail/kqueue/op.hpp +++ b/src/corosio/src/detail/kqueue/op.hpp @@ -22,10 +22,7 @@ #include #include -#include "src/detail/make_err.hpp" -#include "src/detail/dispatch_coro.hpp" #include "src/detail/scheduler_op.hpp" -#include "src/detail/endpoint_convert.hpp" #include #include @@ -213,35 +210,8 @@ struct kqueue_op : scheduler_op acceptor_impl_ = nullptr; } - void operator()() override - { - stop_cb.reset(); - - if (ec_out) - { - if (cancelled.load(std::memory_order_acquire)) - *ec_out = capy::error::canceled; - else if (errn != 0) - *ec_out = make_err(errn); - else if (is_read_operation() && bytes_transferred == 0) - *ec_out = capy::error::eof; - else - *ec_out = {}; - } - - if (bytes_out) - *bytes_out = bytes_transferred; - - // Move to stack before resuming coroutine. The coroutine might close - // the socket, releasing the last wrapper ref. If impl_ptr were the - // last ref and we destroyed it while still in operator(), we'd have - // use-after-free. Moving to local ensures destruction happens at - // function exit, after all member accesses are complete. - capy::executor_ref saved_ex( std::move( ex ) ); - std::coroutine_handle<> saved_h( std::move( h ) ); - auto prevent_premature_destruction = std::move(impl_ptr); - dispatch_coro(saved_ex, saved_h).resume(); - } + // Defined in sockets.cpp where kqueue_socket_impl is complete + void operator()() override; virtual bool is_read_operation() const noexcept { return false; } virtual void cancel() noexcept = 0; diff --git a/src/corosio/src/detail/kqueue/scheduler.cpp b/src/corosio/src/detail/kqueue/scheduler.cpp index 915aee9d..61631a52 100644 --- a/src/corosio/src/detail/kqueue/scheduler.cpp +++ b/src/corosio/src/detail/kqueue/scheduler.cpp @@ -77,11 +77,13 @@ struct scheduler_context scheduler_context* next; op_queue private_queue; std::int64_t private_outstanding_work; + int inline_budget; scheduler_context(kqueue_scheduler const* k, scheduler_context* n) : key(k) , next(n) , private_outstanding_work(0) + , inline_budget(0) { } }; @@ -151,6 +153,29 @@ drain_private_queue( } // namespace +void +kqueue_scheduler:: +reset_inline_budget() const noexcept +{ + if (auto* ctx = find_context(this)) + ctx->inline_budget = max_inline_budget_; +} + +bool +kqueue_scheduler:: +try_consume_inline_budget() const noexcept +{ + if (auto* ctx = find_context(this)) + { + if (ctx->inline_budget > 0) + { + --ctx->inline_budget; + return true; + } + } + return false; +} + void descriptor_state:: operator()() diff --git a/src/corosio/src/detail/kqueue/scheduler.hpp b/src/corosio/src/detail/kqueue/scheduler.hpp index b2a7b29a..6f477709 100644 --- a/src/corosio/src/detail/kqueue/scheduler.hpp +++ b/src/corosio/src/detail/kqueue/scheduler.hpp @@ -117,6 +117,19 @@ class kqueue_scheduler */ int kq_fd() const noexcept { return kq_fd_; } + /** Reset the thread's inline completion budget. + + Called at the start of each posted completion handler to + grant a fresh budget for speculative inline completions. + */ + void reset_inline_budget() const noexcept; + + /** Consume one unit of inline budget if available. + + @return True if budget was available and consumed. + */ + bool try_consume_inline_budget() const noexcept; + /** Register a descriptor for persistent monitoring. Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd @@ -266,6 +279,7 @@ class kqueue_scheduler long timeout_us) const; int kq_fd_; + int max_inline_budget_ = 2; mutable std::mutex mutex_; mutable std::condition_variable cond_; mutable op_queue completed_ops_; diff --git a/src/corosio/src/detail/kqueue/sockets.cpp b/src/corosio/src/detail/kqueue/sockets.cpp index 5f7a9536..8088eca0 100644 --- a/src/corosio/src/detail/kqueue/sockets.cpp +++ b/src/corosio/src/detail/kqueue/sockets.cpp @@ -31,32 +31,28 @@ slots (read_op, write_op, connect_op) and two ready flags (read_ready, write_ready) under a per-descriptor mutex. + Speculative I/O and the pump + ---------------------------- + read_some() and write_some() attempt the syscall (readv/writev) + speculatively before suspending the caller. If data is available the + result is returned via symmetric transfer — no scheduler queue, no + mutex, no reactor round-trip. An inline budget limits consecutive + inline completions to prevent starvation of other connections. + + When the speculative attempt returns EAGAIN, register_op() parks the + operation in its descriptor_state slot under the per-descriptor mutex. + If a cached ready flag fires before parking, register_op() retries + the I/O once under the mutex. This eliminates the cached_initiator + coroutine frame that previously trampolined into do_read_io() / + do_write_io() after the caller suspended. + Ready-flag protocol ------------------- When a kqueue event fires and no operation is pending for that direction, the reactor sets the corresponding ready flag instead of - dropping the event. When a new operation starts and finds the ready - flag set, it performs I/O immediately rather than parking in the - descriptor_state slot. This prevents lost wakeups under edge-triggered - notification. - - Edge-triggered retry - -------------------- - Because EV_CLEAR delivers each transition exactly once, a single - event may correspond to more data than one I/O call can consume. The - retry loops in connect(), do_read_io(), and do_write_io() repeat - perform_io() while EAGAIN/EWOULDBLOCK is returned and the ready flag - has been re-set. When the flag is clear the operation parks in its - descriptor_state slot and waits for the next kqueue event. - - Symmetric transfer and the cached_initiator - -------------------------------------------- - read_some() and write_some() return a coroutine_handle<> for symmetric - transfer so the caller is fully suspended before any I/O is attempted. - The cached_initiator manages a reusable coroutine frame that calls - do_read_io / do_write_io after the caller suspends. This avoids a - heap allocation per operation and guarantees the caller's state is - consistent if a cancellation races with completion. + dropping the event. When register_op() finds the ready flag set, it + performs I/O immediately rather than parking. This prevents lost + wakeups under edge-triggered notification. */ #include @@ -105,12 +101,48 @@ cancel() noexcept request_cancel(); } +void +kqueue_op:: +operator()() +{ + stop_cb.reset(); + + socket_impl_->desc_state_.scheduler_->reset_inline_budget(); + + if (ec_out) + { + if (cancelled.load(std::memory_order_acquire)) + *ec_out = capy::error::canceled; + else if (errn != 0) + *ec_out = make_err(errn); + else if (is_read_operation() && bytes_transferred == 0) + *ec_out = capy::error::eof; + else + *ec_out = {}; + } + + if (bytes_out) + *bytes_out = bytes_transferred; + + // Move to stack before resuming coroutine. The coroutine might close + // the socket, releasing the last wrapper ref. If impl_ptr were the + // last ref and we destroyed it while still in operator(), we'd have + // use-after-free. Moving to local ensures destruction happens at + // function exit, after all member accesses are complete. + capy::executor_ref saved_ex( std::move( ex ) ); + std::coroutine_handle<> saved_h( std::move( h ) ); + auto prevent_premature_destruction = std::move(impl_ptr); + dispatch_coro(saved_ex, saved_h).resume(); +} + void kqueue_connect_op:: operator()() { stop_cb.reset(); + socket_impl_->desc_state_.scheduler_->reset_inline_budget(); + bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire)); // Cache endpoints on successful connect @@ -173,301 +205,97 @@ connect( std::error_code* ec) { auto& op = conn_; - op.reset(); - op.h = h; - op.ex = ex; - op.ec_out = ec; - op.fd = fd_; - op.target_endpoint = ep; // Store target for endpoint caching - op.start(token, this); sockaddr_in addr = detail::to_sockaddr_in(ep); int result = ::connect(fd_, reinterpret_cast(&addr), sizeof(addr)); + // Cache endpoints on sync success if (result == 0) { - // Sync success - cache endpoints immediately sockaddr_in local_addr{}; socklen_t local_len = sizeof(local_addr); if (::getsockname(fd_, reinterpret_cast(&local_addr), &local_len) == 0) local_endpoint_ = detail::from_sockaddr_in(local_addr); remote_endpoint_ = ep; - - op.complete(0, 0); - op.impl_ptr = shared_from_this(); - svc_.post(&op); - return std::noop_coroutine(); } - if (errno == EINPROGRESS) + if (result == 0 || errno != EINPROGRESS) { - svc_.work_started(); - op.impl_ptr = shared_from_this(); - - bool perform_now = false; - { - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.write_ready) - { - desc_state_.write_ready = false; - perform_now = true; - } - else - { - desc_state_.connect_op = &op; - if (desc_state_.connect_cancel_pending) - { - desc_state_.connect_cancel_pending = false; - op.cancelled.store(true, std::memory_order_relaxed); - } - } - } + int err = (result < 0) ? errno : 0; - if (perform_now) + if (svc_.scheduler().try_consume_inline_budget()) { - for (;;) - { - op.perform_io(); - if (op.errn != EAGAIN && op.errn != EWOULDBLOCK) - { - svc_.post(&op); - svc_.work_finished(); - break; - } - op.errn = 0; - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.write_ready) - { - desc_state_.write_ready = false; - continue; - } - desc_state_.connect_op = &op; - if (desc_state_.connect_cancel_pending) - { - desc_state_.connect_cancel_pending = false; - op.cancelled.store(true, std::memory_order_relaxed); - } - break; - } - return std::noop_coroutine(); + *ec = err ? make_err(err) : std::error_code{}; + return dispatch_coro(ex, h); } - if (op.cancelled.load(std::memory_order_acquire)) - { - kqueue_op* claimed = nullptr; - { - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.connect_op == &op) - claimed = std::exchange(desc_state_.connect_op, nullptr); - } - if (claimed) - { - svc_.post(claimed); - svc_.work_finished(); - } - } + // Budget exhausted — post through queue + op.reset(); + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = fd_; + op.target_endpoint = ep; + op.start(token, this); + op.impl_ptr = shared_from_this(); + op.complete(err, 0); + svc_.post(&op); return std::noop_coroutine(); } - op.complete(errno, 0); + // EINPROGRESS — async path + op.reset(); + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = fd_; + op.target_endpoint = ep; + op.start(token, this); op.impl_ptr = shared_from_this(); - svc_.post(&op); + + register_op(op, desc_state_.connect_op, desc_state_.write_ready, + desc_state_.connect_cancel_pending); return std::noop_coroutine(); } +// Register an op with the reactor, handling cached edge events. +// Called under the EAGAIN path when speculative I/O failed. void kqueue_socket_impl:: -do_read_io() +register_op( + kqueue_op& op, + kqueue_op*& desc_slot, + bool& ready_flag, + bool& cancel_flag) noexcept { - auto& op = rd_; - - ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count); - - if (n > 0) - { - { - std::lock_guard lock(desc_state_.mutex); - desc_state_.read_ready = false; - } - op.complete(0, static_cast(n)); - svc_.post(&op); - return; - } + svc_.work_started(); - if (n == 0) + std::lock_guard lock(desc_state_.mutex); + bool io_done = false; + if (ready_flag) { - { - std::lock_guard lock(desc_state_.mutex); - desc_state_.read_ready = false; - } - op.complete(0, 0); - svc_.post(&op); - return; + ready_flag = false; + op.perform_io(); + io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK); + if (!io_done) + op.errn = 0; } - if (errno == EAGAIN || errno == EWOULDBLOCK) + if (cancel_flag) { - svc_.work_started(); - - bool perform_now = false; - { - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.read_ready) - { - desc_state_.read_ready = false; - perform_now = true; - } - else - { - desc_state_.read_op = &op; - if (desc_state_.read_cancel_pending) - { - desc_state_.read_cancel_pending = false; - op.cancelled.store(true, std::memory_order_relaxed); - } - } - } - - if (perform_now) - { - for (;;) - { - op.perform_io(); - if (op.errn != EAGAIN && op.errn != EWOULDBLOCK) - { - svc_.post(&op); - svc_.work_finished(); - return; - } - op.errn = 0; - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.read_ready) - { - desc_state_.read_ready = false; - continue; - } - desc_state_.read_op = &op; - if (desc_state_.read_cancel_pending) - { - desc_state_.read_cancel_pending = false; - op.cancelled.store(true, std::memory_order_relaxed); - } - break; - } - } - - if (op.cancelled.load(std::memory_order_acquire)) - { - kqueue_op* claimed = nullptr; - { - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.read_op == &op) - claimed = std::exchange(desc_state_.read_op, nullptr); - } - if (claimed) - { - svc_.post(claimed); - svc_.work_finished(); - } - } - return; + cancel_flag = false; + op.cancelled.store(true, std::memory_order_relaxed); } - op.complete(errno, 0); - svc_.post(&op); -} - -void -kqueue_socket_impl:: -do_write_io() -{ - auto& op = wr_; - - // SO_NOSIGPIPE is set on the socket at creation time, so writev() is safe. - // FreeBSD: Supports MSG_NOSIGNAL on sendmsg() - ssize_t n = ::writev(fd_, op.iovecs, op.iovec_count); - - if (n >= 0) + if (io_done || op.cancelled.load(std::memory_order_acquire)) { - { - std::lock_guard lock(desc_state_.mutex); - desc_state_.write_ready = false; - } - op.complete(0, static_cast(n)); svc_.post(&op); - return; + svc_.work_finished(); } - - if (errno == EAGAIN || errno == EWOULDBLOCK) + else { - svc_.work_started(); - - bool perform_now = false; - { - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.write_ready) - { - desc_state_.write_ready = false; - perform_now = true; - } - else - { - desc_state_.write_op = &op; - if (desc_state_.write_cancel_pending) - { - desc_state_.write_cancel_pending = false; - op.cancelled.store(true, std::memory_order_relaxed); - } - } - } - - if (perform_now) - { - for (;;) - { - op.perform_io(); - if (op.errn != EAGAIN && op.errn != EWOULDBLOCK) - { - svc_.post(&op); - svc_.work_finished(); - return; - } - op.errn = 0; - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.write_ready) - { - desc_state_.write_ready = false; - continue; - } - desc_state_.write_op = &op; - if (desc_state_.write_cancel_pending) - { - desc_state_.write_cancel_pending = false; - op.cancelled.store(true, std::memory_order_relaxed); - } - break; - } - } - - if (op.cancelled.load(std::memory_order_acquire)) - { - kqueue_op* claimed = nullptr; - { - std::lock_guard lock(desc_state_.mutex); - if (desc_state_.write_op == &op) - claimed = std::exchange(desc_state_.write_op, nullptr); - } - if (claimed) - { - svc_.post(claimed); - svc_.work_finished(); - } - } - return; + desc_slot = &op; } - - op.complete(errno, 0); - svc_.post(&op); } std::coroutine_handle<> @@ -482,21 +310,19 @@ read_some( { auto& op = rd_; op.reset(); - op.h = h; - op.ex = ex; - op.ec_out = ec; - op.bytes_out = bytes_out; - op.fd = fd_; - op.start(token, this); - op.impl_ptr = shared_from_this(); - // Must prepare buffers before initiator runs capy::mutable_buffer bufs[kqueue_read_op::max_buffers]; op.iovec_count = static_cast(param.copy_to(bufs, kqueue_read_op::max_buffers)); if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0)) { op.empty_buffer_read = true; + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.start(token, this); + op.impl_ptr = shared_from_this(); op.complete(0, 0); svc_.post(&op); return std::noop_coroutine(); @@ -508,8 +334,57 @@ read_some( op.iovecs[i].iov_len = bufs[i].size(); } - // Symmetric transfer ensures caller is suspended before I/O starts - return read_initiator_.start<&kqueue_socket_impl::do_read_io>(this); + // Speculative read: try I/O before suspending. On success, return via + // symmetric transfer without touching the scheduler queue — this creates + // a tight pump loop for back-to-back reads on a hot socket. + // Budget limits consecutive inline completions to prevent starvation + // of other connections competing for scheduler time. + ssize_t n; + do { + n = ::readv(fd_, op.iovecs, op.iovec_count); + } while (n < 0 && errno == EINTR); + + if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + { + int err = (n < 0) ? errno : 0; + auto bytes = (n > 0) ? static_cast(n) : std::size_t(0); + + if (svc_.scheduler().try_consume_inline_budget()) + { + if (err) + *ec = make_err(err); + else if (n == 0) + *ec = capy::error::eof; + else + *ec = {}; + *bytes_out = bytes; + return dispatch_coro(ex, h); + } + + // Budget exhausted — fall through to queue + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.start(token, this); + op.impl_ptr = shared_from_this(); + op.complete(err, bytes); + svc_.post(&op); + return std::noop_coroutine(); + } + + // EAGAIN — register with reactor + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.fd = fd_; + op.start(token, this); + op.impl_ptr = shared_from_this(); + + register_op(op, desc_state_.read_op, desc_state_.read_ready, + desc_state_.read_cancel_pending); + return std::noop_coroutine(); } std::coroutine_handle<> @@ -524,20 +399,18 @@ write_some( { auto& op = wr_; op.reset(); - op.h = h; - op.ex = ex; - op.ec_out = ec; - op.bytes_out = bytes_out; - op.fd = fd_; - op.start(token, this); - op.impl_ptr = shared_from_this(); - // Must prepare buffers before initiator runs capy::mutable_buffer bufs[kqueue_write_op::max_buffers]; op.iovec_count = static_cast(param.copy_to(bufs, kqueue_write_op::max_buffers)); if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0)) { + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.start(token, this); + op.impl_ptr = shared_from_this(); op.complete(0, 0); svc_.post(&op); return std::noop_coroutine(); @@ -549,8 +422,51 @@ write_some( op.iovecs[i].iov_len = bufs[i].size(); } - // Symmetric transfer ensures caller is suspended before I/O starts - return write_initiator_.start<&kqueue_socket_impl::do_write_io>(this); + // Speculative write: try I/O before suspending. On success, return via + // symmetric transfer without touching the scheduler queue — this creates + // a tight pump loop for back-to-back writes on a hot socket. + // Budget limits consecutive inline completions to prevent starvation. + ssize_t n; + do { + n = ::writev(fd_, op.iovecs, op.iovec_count); + } while (n < 0 && errno == EINTR); + + if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + { + int err = (n < 0) ? errno : 0; + auto bytes = (n > 0) ? static_cast(n) : std::size_t(0); + + if (svc_.scheduler().try_consume_inline_budget()) + { + *ec = err ? make_err(err) : std::error_code{}; + *bytes_out = bytes; + return dispatch_coro(ex, h); + } + + // Budget exhausted — fall through to queue + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.start(token, this); + op.impl_ptr = shared_from_this(); + op.complete(err, bytes); + svc_.post(&op); + return std::noop_coroutine(); + } + + // EAGAIN — register with reactor + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.fd = fd_; + op.start(token, this); + op.impl_ptr = shared_from_this(); + + register_op(op, desc_state_.write_op, desc_state_.write_ready, + desc_state_.write_cancel_pending); + return std::noop_coroutine(); } std::error_code diff --git a/src/corosio/src/detail/kqueue/sockets.hpp b/src/corosio/src/detail/kqueue/sockets.hpp index 9e01122b..a8db8b21 100644 --- a/src/corosio/src/detail/kqueue/sockets.hpp +++ b/src/corosio/src/detail/kqueue/sockets.hpp @@ -21,7 +21,6 @@ #include "src/detail/intrusive.hpp" #include "src/detail/socket_service.hpp" -#include "src/detail/cached_initiator.hpp" #include "src/detail/kqueue/op.hpp" #include "src/detail/kqueue/scheduler.hpp" @@ -35,13 +34,14 @@ ============================ Each I/O operation follows the same pattern: - 1. Try the syscall immediately (non-blocking socket) - 2. If it succeeds or fails with a real error, post to completion queue - 3. If EAGAIN/EWOULDBLOCK, register with kqueue and wait + 1. Try the syscall speculatively (readv/writev) before suspending + 2. On success, return via symmetric transfer (the "pump" fast path) + 3. On budget exhaustion, post to the scheduler queue for fairness + 4. On EAGAIN, register_op() parks the op in the descriptor_state - This "try first" approach avoids unnecessary kqueue round-trips for - operations that can complete immediately (common for small reads/writes - on fast local connections). + The speculative path avoids scheduler queue, mutex, and reactor + round-trips entirely. An inline budget limits consecutive inline + completions to prevent starvation of other connections. Cancellation ------------ @@ -155,11 +155,12 @@ class kqueue_socket_impl kqueue_read_op rd_; kqueue_write_op wr_; descriptor_state desc_state_; - cached_initiator read_initiator_; - cached_initiator write_initiator_; - void do_read_io(); - void do_write_io(); + void register_op( + kqueue_op& op, + kqueue_op*& desc_slot, + bool& ready_flag, + bool& cancel_flag) noexcept; private: kqueue_socket_service& svc_;