Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions perf/bench/corosio/accept_churn_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ bench::benchmark_result bench_sequential_churn(
if( rec )
co_return;

server.set_linger( true, 0 );
client.close();
server.close();

Expand Down Expand Up @@ -202,6 +203,7 @@ bench::benchmark_result bench_concurrent_churn(
if( rec )
co_return;

server.set_linger( true, 0 );
client.close();
server.close();

Expand Down Expand Up @@ -325,7 +327,10 @@ bench::benchmark_result bench_burst_churn(
for( auto& c : clients )
c.close();
for( auto& s : servers )
{
s.set_linger( true, 0 );
s.close();
}

burst_stats.add( sw.elapsed_us() );
}
Expand Down
2 changes: 1 addition & 1 deletion src/corosio/src/detail/dispatch_coro.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dispatch_coro(
capy::executor_ref ex,
std::coroutine_handle<> h)
{
if ( ex.target< basic_io_context::executor_type >() )
if ( ex.target< basic_io_context::executor_type >() != nullptr )
return h;
return ex.dispatch(h);
}
Expand Down
6 changes: 6 additions & 0 deletions src/corosio/src/detail/intrusive.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class intrusive_list
head_->prev_ = nullptr;
else
tail_ = nullptr;
// Defensive: clear stale linkage so remove() on a
// popped node cannot corrupt the list.
w->next_ = nullptr;
w->prev_ = nullptr;
return w;
}

Expand Down Expand Up @@ -217,6 +221,8 @@ class intrusive_queue
head_ = head_->next_;
if(!head_)
tail_ = nullptr;
// Defensive: clear stale linkage on popped node.
w->next_ = nullptr;
return w;
}
};
Expand Down
25 changes: 12 additions & 13 deletions src/corosio/src/detail/iocp/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ namespace boost::corosio::detail {

namespace {

// Max timeout for GQCS to allow periodic re-checking of conditions
// Max timeout for GQCS to allow periodic re-checking of conditions.
// Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
constexpr unsigned long max_gqcs_timeout = 500;

struct scheduler_context
Expand Down Expand Up @@ -294,6 +295,7 @@ win_scheduler::
restart()
{
::InterlockedExchange(&stopped_, 0);
::InterlockedExchange(&stop_event_posted_, 0);
}

std::size_t
Expand All @@ -315,7 +317,6 @@ run()
break;
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
// Check if we should exit after processing work
if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
{
stop();
Expand Down Expand Up @@ -398,11 +399,12 @@ post_deferred_completions(op_queue& ops)
iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
continue;

// Out of resources, put stuff back
// Out of resources, put the failed op and remaining ops back
ops.push(h);
std::lock_guard<win_mutex> lock(dispatch_mutex_);
completed_ops_.push(h);
completed_ops_.splice(ops);
::InterlockedExchange(&dispatch_required_, 1);
return;
}
}

Expand All @@ -415,20 +417,17 @@ do_one(unsigned long timeout_ms)
// Check if we need to process timers or deferred ops
if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
{
std::lock_guard<win_mutex> lock(dispatch_mutex_);
post_deferred_completions(completed_ops_);
op_queue local_ops;
{
std::lock_guard<win_mutex> lock(dispatch_mutex_);
local_ops.splice(completed_ops_);
}
post_deferred_completions(local_ops);

if (timer_svc_)
timer_svc_->process_expired();

update_timeout();

// After processing, check if all work is done
if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
{
stop();
return 0;
}
}

DWORD bytes = 0;
Expand Down
48 changes: 31 additions & 17 deletions src/corosio/src/detail/iocp/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,25 @@ accept_op::do_complete(
{
auto* op = static_cast<accept_op*>(base);

// Destroy path
// Destroy path (shutdown). Release resources owned by this
// op before destroying the coroutine frame, whose tcp_socket
// destructors will handle their own cleanup independently.
if (!owner)
{
if (op->accepted_socket != INVALID_SOCKET)
{
::closesocket(op->accepted_socket);
op->accepted_socket = INVALID_SOCKET;
}

if (op->peer_wrapper)
{
op->peer_wrapper->release();
op->peer_wrapper = nullptr;
}

op->cleanup_only();
op->acceptor_ptr.reset();
return;
}

Expand Down Expand Up @@ -605,6 +620,16 @@ win_sockets(
win_sockets::
~win_sockets()
{
// Delete wrappers that survived shutdown. This runs after
// win_scheduler is destroyed (reverse creation order), so
// all coroutine frames and their tcp_socket members are gone.
for (auto* w = socket_wrapper_list_.pop_front(); w != nullptr;
w = socket_wrapper_list_.pop_front())
delete w;

for (auto* w = acceptor_wrapper_list_.pop_front(); w != nullptr;
w = acceptor_wrapper_list_.pop_front())
delete w;
}

void
Expand All @@ -613,33 +638,22 @@ shutdown()
{
std::lock_guard<win_mutex> lock(mutex_);

// Just close sockets and remove from list
// The shared_ptrs held by socket objects and operations will handle destruction
// Close all sockets to force pending I/O to complete via IOCP.
// Wrappers are NOT deleted here - coroutine frames destroyed
// during scheduler shutdown may still hold tcp_socket objects
// that reference them. Wrapper deletion is deferred to ~win_sockets
// after the scheduler has drained all outstanding operations.
for (auto* impl = socket_list_.pop_front(); impl != nullptr;
impl = socket_list_.pop_front())
{
impl->close_socket();
// Note: impl may still be alive if operations hold shared_ptr
}

for (auto* impl = acceptor_list_.pop_front(); impl != nullptr;
impl = acceptor_list_.pop_front())
{
impl->close_socket();
}

// Cleanup wrappers
for (auto* w = socket_wrapper_list_.pop_front(); w != nullptr;
w = socket_wrapper_list_.pop_front())
{
delete w;
}

for (auto* w = acceptor_wrapper_list_.pop_front(); w != nullptr;
w = acceptor_wrapper_list_.pop_front())
{
delete w;
}
}

win_socket_impl&
Expand Down
8 changes: 6 additions & 2 deletions src/openssl/src/openssl_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ struct openssl_stream::impl
break;

{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
co_return lec;
auto [ec, n] = co_await capy::write(s_,
capy::const_buffer(out_buf_.data(), got));
if(ec)
Expand All @@ -399,7 +401,9 @@ struct openssl_stream::impl
capy::task<std::error_code>
read_input()
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
co_return lec;
auto [ec, n] = co_await s_.read_some(
capy::mutable_buffer(in_buf_.data(), in_buf_.size()));
if(ec)
Expand Down
78 changes: 66 additions & 12 deletions src/wolfssl/src/wolfssl_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,12 @@ struct wolfssl_stream::impl
read_in_buf_.data() + read_in_len_,
read_in_buf_.size() - read_in_len_);

auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
current_op_ = nullptr;
co_return {lec, total_read};
}
auto [rec, rn] = co_await s_.read_some(rbuf);
if(rec)
{
Expand All @@ -530,7 +535,12 @@ struct wolfssl_stream::impl
// Renegotiation
if(read_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
current_op_ = nullptr;
co_return {lec, total_read};
}
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(read_out_buf_.data(), read_out_len_));
read_out_len_ = 0;
Expand Down Expand Up @@ -600,7 +610,12 @@ struct wolfssl_stream::impl
// Flush any pending output
if(write_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
current_op_ = nullptr;
co_return {lec, total_written};
}
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(write_out_buf_.data(), write_out_len_));
write_out_len_ = 0;
Expand All @@ -622,7 +637,12 @@ struct wolfssl_stream::impl
{
if(write_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
current_op_ = nullptr;
co_return {lec, total_written};
}
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(write_out_buf_.data(), write_out_len_));
write_out_len_ = 0;
Expand All @@ -644,7 +664,12 @@ struct wolfssl_stream::impl
capy::mutable_buffer rbuf(
write_in_buf_.data() + write_in_len_,
write_in_buf_.size() - write_in_len_);
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
current_op_ = nullptr;
co_return {lec, total_written};
}
auto [rec, rn] = co_await s_.read_some(rbuf);
if(rec)
{
Expand Down Expand Up @@ -707,7 +732,12 @@ struct wolfssl_stream::impl
// Flush any remaining output
if(read_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
ec = lec;
break;
}
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(read_out_buf_.data(), read_out_len_));
read_out_len_ = 0;
Expand All @@ -725,7 +755,12 @@ struct wolfssl_stream::impl
// Must flush (e.g. ClientHello) before reading ServerHello
if(read_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
ec = lec;
break;
}
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(read_out_buf_.data(), read_out_len_));
read_out_len_ = 0;
Expand All @@ -744,7 +779,12 @@ struct wolfssl_stream::impl
capy::mutable_buffer rbuf(
read_in_buf_.data() + read_in_len_,
read_in_buf_.size() - read_in_len_);
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
ec = lec;
break;
}
auto [rec, rn] = co_await s_.read_some(rbuf);
if(rec)
{
Expand All @@ -757,7 +797,12 @@ struct wolfssl_stream::impl
{
if(read_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
ec = lec;
break;
}
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(read_out_buf_.data(), read_out_len_));
read_out_len_ = 0;
Expand Down Expand Up @@ -807,7 +852,12 @@ struct wolfssl_stream::impl
// Bidirectional shutdown complete - flush any remaining output
if(read_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
{
ec = lec;
break;
}
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(read_out_buf_.data(), read_out_len_));
read_out_len_ = 0;
Expand All @@ -821,7 +871,9 @@ struct wolfssl_stream::impl
// First, flush any pending output (sends our close_notify)
if(read_out_len_ > 0)
{
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
break;
auto [wec, wn] = co_await capy::write(s_,
capy::const_buffer(read_out_buf_.data(), read_out_len_));
read_out_len_ = 0;
Expand All @@ -840,7 +892,9 @@ struct wolfssl_stream::impl
capy::mutable_buffer rbuf(
read_in_buf_.data() + read_in_len_,
read_in_buf_.size() - read_in_len_);
auto guard = co_await io_cm_.scoped_lock();
auto [lec, guard] = co_await io_cm_.scoped_lock();
if(lec)
break;
auto [rec, rn] = co_await s_.read_some(rbuf);
if(rec)
break; // EOF or socket error during shutdown read - acceptable
Expand Down
Loading