diff --git a/perf/bench/corosio/accept_churn_bench.cpp b/perf/bench/corosio/accept_churn_bench.cpp index 4e7bdc6f..ce000ccd 100644 --- a/perf/bench/corosio/accept_churn_bench.cpp +++ b/perf/bench/corosio/accept_churn_bench.cpp @@ -93,6 +93,7 @@ bench::benchmark_result bench_sequential_churn( if( rec ) co_return; + server.set_linger( true, 0 ); client.close(); server.close(); @@ -202,6 +203,7 @@ bench::benchmark_result bench_concurrent_churn( if( rec ) co_return; + server.set_linger( true, 0 ); client.close(); server.close(); @@ -325,7 +327,10 @@ bench::benchmark_result bench_burst_churn( for( auto& c : clients ) c.close(); for( auto& s : servers ) + { + s.set_linger( true, 0 ); s.close(); + } burst_stats.add( sw.elapsed_us() ); } diff --git a/src/corosio/src/detail/dispatch_coro.hpp b/src/corosio/src/detail/dispatch_coro.hpp index efa99bb8..b8bd5b34 100644 --- a/src/corosio/src/detail/dispatch_coro.hpp +++ b/src/corosio/src/detail/dispatch_coro.hpp @@ -38,7 +38,7 @@ dispatch_coro( capy::executor_ref ex, std::coroutine_handle<> h) { - if ( ex.target< basic_io_context::executor_type >() ) + if ( ex.target< basic_io_context::executor_type >() != nullptr ) return h; return ex.dispatch(h); } diff --git a/src/corosio/src/detail/intrusive.hpp b/src/corosio/src/detail/intrusive.hpp index 02c39bb5..b06ddde6 100644 --- a/src/corosio/src/detail/intrusive.hpp +++ b/src/corosio/src/detail/intrusive.hpp @@ -109,6 +109,10 @@ class intrusive_list head_->prev_ = nullptr; else tail_ = nullptr; + // Defensive: clear stale linkage so remove() on a + // popped node cannot corrupt the list. + w->next_ = nullptr; + w->prev_ = nullptr; return w; } @@ -217,6 +221,8 @@ class intrusive_queue head_ = head_->next_; if(!head_) tail_ = nullptr; + // Defensive: clear stale linkage on popped node. + w->next_ = nullptr; return w; } }; diff --git a/src/corosio/src/detail/iocp/scheduler.cpp b/src/corosio/src/detail/iocp/scheduler.cpp index 84c6d675..7ae9733a 100644 --- a/src/corosio/src/detail/iocp/scheduler.cpp +++ b/src/corosio/src/detail/iocp/scheduler.cpp @@ -44,7 +44,8 @@ namespace boost::corosio::detail { namespace { -// Max timeout for GQCS to allow periodic re-checking of conditions +// Max timeout for GQCS to allow periodic re-checking of conditions. +// Matches Asio's default_gqcs_timeout for pre-Vista compatibility. constexpr unsigned long max_gqcs_timeout = 500; struct scheduler_context @@ -294,6 +295,7 @@ win_scheduler:: restart() { ::InterlockedExchange(&stopped_, 0); + ::InterlockedExchange(&stop_event_posted_, 0); } std::size_t @@ -315,7 +317,6 @@ run() break; if (n != (std::numeric_limits::max)()) ++n; - // Check if we should exit after processing work if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { stop(); @@ -398,11 +399,12 @@ post_deferred_completions(op_queue& ops) iocp_, 0, key_posted, reinterpret_cast(h))) continue; - // Out of resources, put stuff back + // Out of resources, put the failed op and remaining ops back + ops.push(h); std::lock_guard lock(dispatch_mutex_); - completed_ops_.push(h); completed_ops_.splice(ops); ::InterlockedExchange(&dispatch_required_, 1); + return; } } @@ -415,20 +417,17 @@ do_one(unsigned long timeout_ms) // Check if we need to process timers or deferred ops if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1) { - std::lock_guard lock(dispatch_mutex_); - post_deferred_completions(completed_ops_); + op_queue local_ops; + { + std::lock_guard lock(dispatch_mutex_); + local_ops.splice(completed_ops_); + } + post_deferred_completions(local_ops); if (timer_svc_) timer_svc_->process_expired(); update_timeout(); - - // After processing, check if all work is done - if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) - { - stop(); - return 0; - } } DWORD bytes = 0; diff --git a/src/corosio/src/detail/iocp/sockets.cpp b/src/corosio/src/detail/iocp/sockets.cpp index 4ade08b3..72f85482 100644 --- a/src/corosio/src/detail/iocp/sockets.cpp +++ b/src/corosio/src/detail/iocp/sockets.cpp @@ -119,10 +119,25 @@ accept_op::do_complete( { auto* op = static_cast(base); - // Destroy path + // Destroy path (shutdown). Release resources owned by this + // op before destroying the coroutine frame, whose tcp_socket + // destructors will handle their own cleanup independently. if (!owner) { + if (op->accepted_socket != INVALID_SOCKET) + { + ::closesocket(op->accepted_socket); + op->accepted_socket = INVALID_SOCKET; + } + + if (op->peer_wrapper) + { + op->peer_wrapper->release(); + op->peer_wrapper = nullptr; + } + op->cleanup_only(); + op->acceptor_ptr.reset(); return; } @@ -605,6 +620,16 @@ win_sockets( win_sockets:: ~win_sockets() { + // Delete wrappers that survived shutdown. This runs after + // win_scheduler is destroyed (reverse creation order), so + // all coroutine frames and their tcp_socket members are gone. + for (auto* w = socket_wrapper_list_.pop_front(); w != nullptr; + w = socket_wrapper_list_.pop_front()) + delete w; + + for (auto* w = acceptor_wrapper_list_.pop_front(); w != nullptr; + w = acceptor_wrapper_list_.pop_front()) + delete w; } void @@ -613,13 +638,15 @@ shutdown() { std::lock_guard lock(mutex_); - // Just close sockets and remove from list - // The shared_ptrs held by socket objects and operations will handle destruction + // Close all sockets to force pending I/O to complete via IOCP. + // Wrappers are NOT deleted here - coroutine frames destroyed + // during scheduler shutdown may still hold tcp_socket objects + // that reference them. Wrapper deletion is deferred to ~win_sockets + // after the scheduler has drained all outstanding operations. for (auto* impl = socket_list_.pop_front(); impl != nullptr; impl = socket_list_.pop_front()) { impl->close_socket(); - // Note: impl may still be alive if operations hold shared_ptr } for (auto* impl = acceptor_list_.pop_front(); impl != nullptr; @@ -627,19 +654,6 @@ shutdown() { impl->close_socket(); } - - // Cleanup wrappers - for (auto* w = socket_wrapper_list_.pop_front(); w != nullptr; - w = socket_wrapper_list_.pop_front()) - { - delete w; - } - - for (auto* w = acceptor_wrapper_list_.pop_front(); w != nullptr; - w = acceptor_wrapper_list_.pop_front()) - { - delete w; - } } win_socket_impl& diff --git a/src/openssl/src/openssl_stream.cpp b/src/openssl/src/openssl_stream.cpp index ccf6eadf..499099b7 100644 --- a/src/openssl/src/openssl_stream.cpp +++ b/src/openssl/src/openssl_stream.cpp @@ -386,7 +386,9 @@ struct openssl_stream::impl break; { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + co_return lec; auto [ec, n] = co_await capy::write(s_, capy::const_buffer(out_buf_.data(), got)); if(ec) @@ -399,7 +401,9 @@ struct openssl_stream::impl capy::task read_input() { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + co_return lec; auto [ec, n] = co_await s_.read_some( capy::mutable_buffer(in_buf_.data(), in_buf_.size())); if(ec) diff --git a/src/wolfssl/src/wolfssl_stream.cpp b/src/wolfssl/src/wolfssl_stream.cpp index 75329e87..a7efa523 100644 --- a/src/wolfssl/src/wolfssl_stream.cpp +++ b/src/wolfssl/src/wolfssl_stream.cpp @@ -504,7 +504,12 @@ struct wolfssl_stream::impl read_in_buf_.data() + read_in_len_, read_in_buf_.size() - read_in_len_); - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + current_op_ = nullptr; + co_return {lec, total_read}; + } auto [rec, rn] = co_await s_.read_some(rbuf); if(rec) { @@ -530,7 +535,12 @@ struct wolfssl_stream::impl // Renegotiation if(read_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + current_op_ = nullptr; + co_return {lec, total_read}; + } auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(read_out_buf_.data(), read_out_len_)); read_out_len_ = 0; @@ -600,7 +610,12 @@ struct wolfssl_stream::impl // Flush any pending output if(write_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + current_op_ = nullptr; + co_return {lec, total_written}; + } auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(write_out_buf_.data(), write_out_len_)); write_out_len_ = 0; @@ -622,7 +637,12 @@ struct wolfssl_stream::impl { if(write_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + current_op_ = nullptr; + co_return {lec, total_written}; + } auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(write_out_buf_.data(), write_out_len_)); write_out_len_ = 0; @@ -644,7 +664,12 @@ struct wolfssl_stream::impl capy::mutable_buffer rbuf( write_in_buf_.data() + write_in_len_, write_in_buf_.size() - write_in_len_); - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + current_op_ = nullptr; + co_return {lec, total_written}; + } auto [rec, rn] = co_await s_.read_some(rbuf); if(rec) { @@ -707,7 +732,12 @@ struct wolfssl_stream::impl // Flush any remaining output if(read_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + ec = lec; + break; + } auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(read_out_buf_.data(), read_out_len_)); read_out_len_ = 0; @@ -725,7 +755,12 @@ struct wolfssl_stream::impl // Must flush (e.g. ClientHello) before reading ServerHello if(read_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + ec = lec; + break; + } auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(read_out_buf_.data(), read_out_len_)); read_out_len_ = 0; @@ -744,7 +779,12 @@ struct wolfssl_stream::impl capy::mutable_buffer rbuf( read_in_buf_.data() + read_in_len_, read_in_buf_.size() - read_in_len_); - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + ec = lec; + break; + } auto [rec, rn] = co_await s_.read_some(rbuf); if(rec) { @@ -757,7 +797,12 @@ struct wolfssl_stream::impl { if(read_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + ec = lec; + break; + } auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(read_out_buf_.data(), read_out_len_)); read_out_len_ = 0; @@ -807,7 +852,12 @@ struct wolfssl_stream::impl // Bidirectional shutdown complete - flush any remaining output if(read_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + { + ec = lec; + break; + } auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(read_out_buf_.data(), read_out_len_)); read_out_len_ = 0; @@ -821,7 +871,9 @@ struct wolfssl_stream::impl // First, flush any pending output (sends our close_notify) if(read_out_len_ > 0) { - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + break; auto [wec, wn] = co_await capy::write(s_, capy::const_buffer(read_out_buf_.data(), read_out_len_)); read_out_len_ = 0; @@ -840,7 +892,9 @@ struct wolfssl_stream::impl capy::mutable_buffer rbuf( read_in_buf_.data() + read_in_len_, read_in_buf_.size() - read_in_len_); - auto guard = co_await io_cm_.scoped_lock(); + auto [lec, guard] = co_await io_cm_.scoped_lock(); + if(lec) + break; auto [rec, rn] = co_await s_.read_some(rbuf); if(rec) break; // EOF or socket error during shutdown read - acceptable