Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/corosio/src/detail/kqueue/acceptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ operator()()
{
stop_cb.reset();

static_cast<kqueue_acceptor_impl*>(acceptor_impl_)
->service().scheduler().reset_inline_budget();

bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

if (ec_out)
Expand Down
34 changes: 2 additions & 32 deletions src/corosio/src/detail/kqueue/op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
#include <boost/capy/error.hpp>
#include <system_error>

#include "src/detail/make_err.hpp"
#include "src/detail/dispatch_coro.hpp"
#include "src/detail/scheduler_op.hpp"
#include "src/detail/endpoint_convert.hpp"

#include <unistd.h>
#include <errno.h>
Expand Down Expand Up @@ -213,35 +210,8 @@ struct kqueue_op : scheduler_op
acceptor_impl_ = nullptr;
}

void operator()() override
{
stop_cb.reset();

if (ec_out)
{
if (cancelled.load(std::memory_order_acquire))
*ec_out = capy::error::canceled;
else if (errn != 0)
*ec_out = make_err(errn);
else if (is_read_operation() && bytes_transferred == 0)
*ec_out = capy::error::eof;
else
*ec_out = {};
}

if (bytes_out)
*bytes_out = bytes_transferred;

// Move to stack before resuming coroutine. The coroutine might close
// the socket, releasing the last wrapper ref. If impl_ptr were the
// last ref and we destroyed it while still in operator(), we'd have
// use-after-free. Moving to local ensures destruction happens at
// function exit, after all member accesses are complete.
capy::executor_ref saved_ex( std::move( ex ) );
std::coroutine_handle<> saved_h( std::move( h ) );
auto prevent_premature_destruction = std::move(impl_ptr);
dispatch_coro(saved_ex, saved_h).resume();
}
// Defined in sockets.cpp where kqueue_socket_impl is complete
void operator()() override;

virtual bool is_read_operation() const noexcept { return false; }
virtual void cancel() noexcept = 0;
Expand Down
25 changes: 25 additions & 0 deletions src/corosio/src/detail/kqueue/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ struct scheduler_context
scheduler_context* next;
op_queue private_queue;
std::int64_t private_outstanding_work;
int inline_budget;

scheduler_context(kqueue_scheduler const* k, scheduler_context* n)
: key(k)
, next(n)
, private_outstanding_work(0)
, inline_budget(0)
{
}
};
Expand Down Expand Up @@ -151,6 +153,29 @@ drain_private_queue(

} // namespace

void
kqueue_scheduler::
reset_inline_budget() const noexcept
{
if (auto* ctx = find_context(this))
ctx->inline_budget = max_inline_budget_;
}

bool
kqueue_scheduler::
try_consume_inline_budget() const noexcept
{
if (auto* ctx = find_context(this))
{
if (ctx->inline_budget > 0)
{
--ctx->inline_budget;
return true;
}
}
return false;
}

void
descriptor_state::
operator()()
Expand Down
14 changes: 14 additions & 0 deletions src/corosio/src/detail/kqueue/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ class kqueue_scheduler
*/
int kq_fd() const noexcept { return kq_fd_; }

/** Reset the thread's inline completion budget.

Called at the start of each posted completion handler to
grant a fresh budget for speculative inline completions.
*/
void reset_inline_budget() const noexcept;

/** Consume one unit of inline budget if available.

@return True if budget was available and consumed.
*/
bool try_consume_inline_budget() const noexcept;

/** Register a descriptor for persistent monitoring.

Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
Expand Down Expand Up @@ -266,6 +279,7 @@ class kqueue_scheduler
long timeout_us) const;

int kq_fd_;
int max_inline_budget_ = 2;
mutable std::mutex mutex_;
mutable std::condition_variable cond_;
mutable op_queue completed_ops_;
Expand Down
Loading
Loading