//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/select/select_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <limits>
#include <mutex>
#include <unordered_map>

namespace boost::corosio::detail {

struct select_op;

/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It uses a single reactor model
    where one thread runs select() while other threads wait on a condition
    variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - Portability: Works on all POSIX systems

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same work counting semantics
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    ~select_scheduler() override;

    select_scheduler(select_scheduler const&)            = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a file descriptor for monitoring.

        @param fd The file descriptor to register.
        @param op The operation associated with this fd.
        @param events Event mask: 1 = read, 2 = write, 3 = both.
    */
    void register_fd(int fd, select_op* op, int events) const;

    /** Unregister a file descriptor from monitoring.

        @param fd The file descriptor to unregister.
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
    */
    void deregister_fd(int fd, int events) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    // Event flags for register_fd/deregister_fd
    static constexpr int event_read  = 1;
    static constexpr int event_write = 2;

private:
    std::size_t do_one(long timeout_us);
    void run_reactor(std::unique_lock<std::mutex>& lock);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Guards completed_ops_ and the reactor coordination state below.
    mutable std::mutex mutex_;
    // Signaled when a handler is queued; wakes exactly one waiting thread.
    mutable std::condition_variable wakeup_event_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    std::atomic<bool> stopped_;

    // Per-fd state for tracking registered operations
    struct fd_state
    {
        select_op* read_op  = nullptr;
        select_op* write_op = nullptr;
    };
    mutable std::unordered_map<int, fd_state> registered_fds_;
    mutable int max_fd_ = -1;

    // Single reactor thread coordination
    mutable bool reactor_running_     = false;
    mutable bool reactor_interrupted_ = false;
    mutable int idle_thread_count_    = 0;

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing timer starvation.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};
/*
    select Scheduler - Single Reactor Model
    =======================================

    This scheduler mirrors the epoll_scheduler design but uses select() instead
    of epoll for I/O multiplexing. The thread coordination strategy is identical:
    one thread becomes the "reactor" while others wait on a condition variable.

    Thread Model
    ------------
    - ONE thread runs select() at a time (the reactor thread)
    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()

    Key Differences from epoll
    --------------------------
    - Uses self-pipe instead of eventfd for interruption (more portable)
    - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
    - FD_SETSIZE limit (~1024 fds on most systems)
    - Level-triggered only (no edge-triggered mode)

    Self-Pipe Pattern
    -----------------
    To interrupt a blocking select() call (e.g., when work is posted or a timer
    expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
    always in the read_fds set, so select() returns immediately. We drain the
    pipe to clear the readable state.

    fd-to-op Mapping
    ----------------
    We use an unordered_map<int, fd_state> to track which operations are
    registered for each fd. This allows O(1) lookup when select() returns
    ready fds. Each fd can have at most one read op and one write op registered.
*/

namespace select {

/** One frame in a thread-local stack of schedulers active on this thread.

    Each nested run()/poll() call pushes a frame; running_in_this_thread()
    walks the chain via `next`.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    select_scheduler const* key;  // scheduler owning this frame
    scheduler_context* next;      // enclosing (outer) frame, or nullptr
};

// Head of the current thread's scheduler-context stack.
inline thread_local_ptr<scheduler_context> context_stack;

/** RAII guard: marks the given scheduler as running on this thread.

    Pushes a frame onto context_stack on construction and pops it on
    destruction, restoring the previous head.
*/
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(select_scheduler const* ctx) noexcept
        : frame_{ctx, context_stack.get()}
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        context_stack.set(frame_.next);
    }
};

/** RAII guard: calls work_finished() on the scheduler at scope exit. */
struct work_guard
{
    select_scheduler* self;
    ~work_guard()
    {
        self->work_finished();
    }
};

} // namespace select
257  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
257  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
258  
    : pipe_fds_{-1, -1}
258  
    : pipe_fds_{-1, -1}
259  
    , outstanding_work_(0)
259  
    , outstanding_work_(0)
260  
    , stopped_(false)
260  
    , stopped_(false)
261  
    , max_fd_(-1)
261  
    , max_fd_(-1)
262  
    , reactor_running_(false)
262  
    , reactor_running_(false)
263  
    , reactor_interrupted_(false)
263  
    , reactor_interrupted_(false)
264  
    , idle_thread_count_(0)
264  
    , idle_thread_count_(0)
265  
{
265  
{
266  
    // Create self-pipe for interrupting select()
266  
    // Create self-pipe for interrupting select()
267  
    if (::pipe(pipe_fds_) < 0)
267  
    if (::pipe(pipe_fds_) < 0)
268  
        detail::throw_system_error(make_err(errno), "pipe");
268  
        detail::throw_system_error(make_err(errno), "pipe");
269  

269  

270  
    // Set both ends to non-blocking and close-on-exec
270  
    // Set both ends to non-blocking and close-on-exec
271  
    for (int i = 0; i < 2; ++i)
271  
    for (int i = 0; i < 2; ++i)
272  
    {
272  
    {
273  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
273  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
274  
        if (flags == -1)
274  
        if (flags == -1)
275  
        {
275  
        {
276  
            int errn = errno;
276  
            int errn = errno;
277  
            ::close(pipe_fds_[0]);
277  
            ::close(pipe_fds_[0]);
278  
            ::close(pipe_fds_[1]);
278  
            ::close(pipe_fds_[1]);
279  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
279  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
280  
        }
280  
        }
281  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
281  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
282  
        {
282  
        {
283  
            int errn = errno;
283  
            int errn = errno;
284  
            ::close(pipe_fds_[0]);
284  
            ::close(pipe_fds_[0]);
285  
            ::close(pipe_fds_[1]);
285  
            ::close(pipe_fds_[1]);
286  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
286  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
287  
        }
287  
        }
288  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
288  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
289  
        {
289  
        {
290  
            int errn = errno;
290  
            int errn = errno;
291  
            ::close(pipe_fds_[0]);
291  
            ::close(pipe_fds_[0]);
292  
            ::close(pipe_fds_[1]);
292  
            ::close(pipe_fds_[1]);
293  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
293  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
294  
        }
294  
        }
295  
    }
295  
    }
296  

296  

297  
    timer_svc_ = &get_timer_service(ctx, *this);
297  
    timer_svc_ = &get_timer_service(ctx, *this);
298  
    timer_svc_->set_on_earliest_changed(
298  
    timer_svc_->set_on_earliest_changed(
299  
        timer_service::callback(this, [](void* p) {
299  
        timer_service::callback(this, [](void* p) {
300  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
300  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
301  
        }));
301  
        }));
302  

302  

303  
    // Initialize resolver service
303  
    // Initialize resolver service
304  
    get_resolver_service(ctx, *this);
304  
    get_resolver_service(ctx, *this);
305  

305  

306  
    // Initialize signal service
306  
    // Initialize signal service
307  
    get_signal_service(ctx, *this);
307  
    get_signal_service(ctx, *this);
308  

308  

309  
    // Push task sentinel to interleave reactor runs with handler execution
309  
    // Push task sentinel to interleave reactor runs with handler execution
310  
    completed_ops_.push(&task_op_);
310  
    completed_ops_.push(&task_op_);
311  
}
311  
}
312  

312  

313  
inline select_scheduler::~select_scheduler()
313  
inline select_scheduler::~select_scheduler()
314  
{
314  
{
315  
    if (pipe_fds_[0] >= 0)
315  
    if (pipe_fds_[0] >= 0)
316  
        ::close(pipe_fds_[0]);
316  
        ::close(pipe_fds_[0]);
317  
    if (pipe_fds_[1] >= 0)
317  
    if (pipe_fds_[1] >= 0)
318  
        ::close(pipe_fds_[1]);
318  
        ::close(pipe_fds_[1]);
319  
}
319  
}
320  

320  

/** Service shutdown: destroy all queued handlers without running them.

    Pops every pending operation and calls destroy() on it; the task
    sentinel (a member, not heap-allocated) is skipped. The mutex is
    released around each destroy() because handler destruction may run
    arbitrary user code. Finally the reactor is interrupted and all
    waiting threads are woken so they can observe shutdown.
*/
inline void
select_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);

        while (auto* h = completed_ops_.pop())
        {
            // Never destroy the sentinel — it lives inside this object.
            if (h == &task_op_)
                continue;
            lock.unlock();  // destroy() may execute user code; don't hold the lock
            h->destroy();
            lock.lock();
        }
    }

    // Kick the reactor out of a blocking select(), if the pipe still exists.
    if (pipe_fds_[1] >= 0)
        interrupt_reactor();

    wakeup_event_.notify_all();
}
343  
inline void
343  
inline void
344  
select_scheduler::post(std::coroutine_handle<> h) const
344  
select_scheduler::post(std::coroutine_handle<> h) const
345  
{
345  
{
346  
    struct post_handler final : scheduler_op
346  
    struct post_handler final : scheduler_op
347  
    {
347  
    {
348  
        std::coroutine_handle<> h_;
348  
        std::coroutine_handle<> h_;
349  

349  

350  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
350  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
351  

351  

352  
        ~post_handler() override = default;
352  
        ~post_handler() override = default;
353  

353  

354  
        void operator()() override
354  
        void operator()() override
355  
        {
355  
        {
356  
            auto h = h_;
356  
            auto h = h_;
357  
            delete this;
357  
            delete this;
358  
            h.resume();
358  
            h.resume();
359  
        }
359  
        }
360  

360  

361  
        void destroy() override
361  
        void destroy() override
362  
        {
362  
        {
363  
            auto h = h_;
363  
            auto h = h_;
364  
            delete this;
364  
            delete this;
365  
            h.destroy();
365  
            h.destroy();
366  
        }
366  
        }
367  
    };
367  
    };
368  

368  

369  
    auto ph = std::make_unique<post_handler>(h);
369  
    auto ph = std::make_unique<post_handler>(h);
370  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
370  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
371  

371  

372  
    std::unique_lock lock(mutex_);
372  
    std::unique_lock lock(mutex_);
373  
    completed_ops_.push(ph.release());
373  
    completed_ops_.push(ph.release());
374  
    wake_one_thread_and_unlock(lock);
374  
    wake_one_thread_and_unlock(lock);
375  
}
375  
}
376  

376  

377  
inline void
377  
inline void
378  
select_scheduler::post(scheduler_op* h) const
378  
select_scheduler::post(scheduler_op* h) const
379  
{
379  
{
380  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
380  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
381  

381  

382  
    std::unique_lock lock(mutex_);
382  
    std::unique_lock lock(mutex_);
383  
    completed_ops_.push(h);
383  
    completed_ops_.push(h);
384  
    wake_one_thread_and_unlock(lock);
384  
    wake_one_thread_and_unlock(lock);
385  
}
385  
}
386  

386  

387  
inline bool
387  
inline bool
388  
select_scheduler::running_in_this_thread() const noexcept
388  
select_scheduler::running_in_this_thread() const noexcept
389  
{
389  
{
390  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
390  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
391  
        if (c->key == this)
391  
        if (c->key == this)
392  
            return true;
392  
            return true;
393  
    return false;
393  
    return false;
394  
}
394  
}
395  

395  

/** Stop the scheduler.

    Atomically transitions stopped_ from false to true; only the thread
    that wins the CAS performs the wakeups, so repeated stop() calls are
    cheap no-ops. The release order publishes the flag to threads that
    load it with acquire in stopped()/run().
*/
inline void
select_scheduler::stop()
{
    bool expected = false;
    if (stopped_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        // Wake all threads so they notice stopped_ and exit
        {
            // Holding the mutex ensures waiters are either outside wait()
            // or already blocked, so no notification is lost.
            std::lock_guard lock(mutex_);
            wakeup_event_.notify_all();
        }
        interrupt_reactor();
    }
}
413  
inline bool
413  
inline bool
414  
select_scheduler::stopped() const noexcept
414  
select_scheduler::stopped() const noexcept
415  
{
415  
{
416  
    return stopped_.load(std::memory_order_acquire);
416  
    return stopped_.load(std::memory_order_acquire);
417  
}
417  
}
418  

418  

/** Clear the stopped state so run()/poll() can make progress again.

    Must be called before reusing the scheduler after stop() or after a
    run() that returned because work ran out.
*/
inline void
select_scheduler::restart()
{
    stopped_.store(false, std::memory_order_release);
}
425  
inline std::size_t
425  
inline std::size_t
426  
select_scheduler::run()
426  
select_scheduler::run()
427  
{
427  
{
428  
    if (stopped_.load(std::memory_order_acquire))
428  
    if (stopped_.load(std::memory_order_acquire))
429  
        return 0;
429  
        return 0;
430  

430  

431  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
431  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
432  
    {
432  
    {
433  
        stop();
433  
        stop();
434  
        return 0;
434  
        return 0;
435  
    }
435  
    }
436  

436  

437  
    select::thread_context_guard ctx(this);
437  
    select::thread_context_guard ctx(this);
438  

438  

439  
    std::size_t n = 0;
439  
    std::size_t n = 0;
440  
    while (do_one(-1))
440  
    while (do_one(-1))
441  
        if (n != (std::numeric_limits<std::size_t>::max)())
441  
        if (n != (std::numeric_limits<std::size_t>::max)())
442  
            ++n;
442  
            ++n;
443  
    return n;
443  
    return n;
444  
}
444  
}
445  

445  

446  
inline std::size_t
446  
inline std::size_t
447  
select_scheduler::run_one()
447  
select_scheduler::run_one()
448  
{
448  
{
449  
    if (stopped_.load(std::memory_order_acquire))
449  
    if (stopped_.load(std::memory_order_acquire))
450  
        return 0;
450  
        return 0;
451  

451  

452  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
452  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
453  
    {
453  
    {
454  
        stop();
454  
        stop();
455  
        return 0;
455  
        return 0;
456  
    }
456  
    }
457  

457  

458  
    select::thread_context_guard ctx(this);
458  
    select::thread_context_guard ctx(this);
459  
    return do_one(-1);
459  
    return do_one(-1);
460  
}
460  
}
461  

461  

462  
inline std::size_t
462  
inline std::size_t
463  
select_scheduler::wait_one(long usec)
463  
select_scheduler::wait_one(long usec)
464  
{
464  
{
465  
    if (stopped_.load(std::memory_order_acquire))
465  
    if (stopped_.load(std::memory_order_acquire))
466  
        return 0;
466  
        return 0;
467  

467  

468  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
468  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
469  
    {
469  
    {
470  
        stop();
470  
        stop();
471  
        return 0;
471  
        return 0;
472  
    }
472  
    }
473  

473  

474  
    select::thread_context_guard ctx(this);
474  
    select::thread_context_guard ctx(this);
475  
    return do_one(usec);
475  
    return do_one(usec);
476  
}
476  
}
477  

477  

478  
inline std::size_t
478  
inline std::size_t
479  
select_scheduler::poll()
479  
select_scheduler::poll()
480  
{
480  
{
481  
    if (stopped_.load(std::memory_order_acquire))
481  
    if (stopped_.load(std::memory_order_acquire))
482  
        return 0;
482  
        return 0;
483  

483  

484  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
484  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
485  
    {
485  
    {
486  
        stop();
486  
        stop();
487  
        return 0;
487  
        return 0;
488  
    }
488  
    }
489  

489  

490  
    select::thread_context_guard ctx(this);
490  
    select::thread_context_guard ctx(this);
491  

491  

492  
    std::size_t n = 0;
492  
    std::size_t n = 0;
493  
    while (do_one(0))
493  
    while (do_one(0))
494  
        if (n != (std::numeric_limits<std::size_t>::max)())
494  
        if (n != (std::numeric_limits<std::size_t>::max)())
495  
            ++n;
495  
            ++n;
496  
    return n;
496  
    return n;
497  
}
497  
}
498  

498  

499  
inline std::size_t
499  
inline std::size_t
500  
select_scheduler::poll_one()
500  
select_scheduler::poll_one()
501  
{
501  
{
502  
    if (stopped_.load(std::memory_order_acquire))
502  
    if (stopped_.load(std::memory_order_acquire))
503  
        return 0;
503  
        return 0;
504  

504  

505  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
505  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
506  
    {
506  
    {
507  
        stop();
507  
        stop();
508  
        return 0;
508  
        return 0;
509  
    }
509  
    }
510  

510  

511  
    select::thread_context_guard ctx(this);
511  
    select::thread_context_guard ctx(this);
512  
    return do_one(0);
512  
    return do_one(0);
513  
}
513  
}
514  

514  

515  
inline void
515  
inline void
516  
select_scheduler::register_fd(int fd, select_op* op, int events) const
516  
select_scheduler::register_fd(int fd, select_op* op, int events) const
517  
{
517  
{
518  
    // Validate fd is within select() limits
518  
    // Validate fd is within select() limits
519  
    if (fd < 0 || fd >= FD_SETSIZE)
519  
    if (fd < 0 || fd >= FD_SETSIZE)
520  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
520  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
521  

521  

522  
    {
522  
    {
523  
        std::lock_guard lock(mutex_);
523  
        std::lock_guard lock(mutex_);
524  

524  

525  
        auto& state = registered_fds_[fd];
525  
        auto& state = registered_fds_[fd];
526  
        if (events & event_read)
526  
        if (events & event_read)
527  
            state.read_op = op;
527  
            state.read_op = op;
528  
        if (events & event_write)
528  
        if (events & event_write)
529  
            state.write_op = op;
529  
            state.write_op = op;
530  

530  

531  
        if (fd > max_fd_)
531  
        if (fd > max_fd_)
532  
            max_fd_ = fd;
532  
            max_fd_ = fd;
533  
    }
533  
    }
534  

534  

535  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
535  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
536  
    // with the newly registered fd.
536  
    // with the newly registered fd.
537  
    interrupt_reactor();
537  
    interrupt_reactor();
538  
}
538  
}
539  

539  

540  
inline void
540  
inline void
541  
select_scheduler::deregister_fd(int fd, int events) const
541  
select_scheduler::deregister_fd(int fd, int events) const
542  
{
542  
{
543  
    std::lock_guard lock(mutex_);
543  
    std::lock_guard lock(mutex_);
544  

544  

545  
    auto it = registered_fds_.find(fd);
545  
    auto it = registered_fds_.find(fd);
546  
    if (it == registered_fds_.end())
546  
    if (it == registered_fds_.end())
547  
        return;
547  
        return;
548  

548  

549  
    if (events & event_read)
549  
    if (events & event_read)
550  
        it->second.read_op = nullptr;
550  
        it->second.read_op = nullptr;
551  
    if (events & event_write)
551  
    if (events & event_write)
552  
        it->second.write_op = nullptr;
552  
        it->second.write_op = nullptr;
553  

553  

554  
    // Remove entry if both are null
554  
    // Remove entry if both are null
555  
    if (!it->second.read_op && !it->second.write_op)
555  
    if (!it->second.read_op && !it->second.write_op)
556  
    {
556  
    {
557  
        registered_fds_.erase(it);
557  
        registered_fds_.erase(it);
558  

558  

559  
        // Recalculate max_fd_ if needed
559  
        // Recalculate max_fd_ if needed
560  
        if (fd == max_fd_)
560  
        if (fd == max_fd_)
561  
        {
561  
        {
562  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
562  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
563  
            for (auto& [registered_fd, state] : registered_fds_)
563  
            for (auto& [registered_fd, state] : registered_fds_)
564  
            {
564  
            {
565  
                if (registered_fd > max_fd_)
565  
                if (registered_fd > max_fd_)
566  
                    max_fd_ = registered_fd;
566  
                    max_fd_ = registered_fd;
567  
            }
567  
            }
568  
        }
568  
        }
569  
    }
569  
    }
570  
}
570  
}
571  

571  

// Record that one more operation is outstanding, keeping the scheduler's
// run loop alive until a matching work_finished() call.
inline void
select_scheduler::work_started() noexcept
{
    // Relaxed ordering is sufficient here: the counter only gates
    // shutdown, and work_finished() supplies the acq_rel ordering on
    // the decrement side.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
577  

577  

// Record completion of one outstanding operation; stops the scheduler
// when the count drops to zero.
inline void
select_scheduler::work_finished() noexcept
{
    // fetch_sub returns the previous value, so a result of 1 means this
    // call retired the final outstanding operation. acq_rel publishes
    // this thread's writes to whoever observes the stop and acquires
    // prior decrements from other threads.
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
584  

584  

// Wake a thread blocked in ::select() by writing one byte to the
// self-pipe; the reactor always includes the pipe's read end in its
// fd_sets, so the write makes select() return immediately.
inline void
select_scheduler::interrupt_reactor() const
{
    // The write result is deliberately ignored: if the pipe is full, a
    // wakeup is already pending, which is all this function must ensure.
    char byte               = 1;
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
}
591  

591  

// Wake exactly one thread able to process newly queued work, releasing
// the caller's lock in the process.
//
// Preference order: an idle worker parked on the condition variable is
// cheapest to wake; failing that, a thread blocked inside ::select() is
// interrupted via the self-pipe; otherwise there is nothing to do beyond
// unlocking. The lock is released before interrupt_reactor() so the
// awakened thread is not immediately blocked re-acquiring the mutex.
inline void
select_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (idle_thread_count_ > 0)
    {
        // Idle worker exists - wake it via condvar
        wakeup_event_.notify_one();
        lock.unlock();
    }
    else if (reactor_running_ && !reactor_interrupted_)
    {
        // No idle workers but reactor is running - interrupt it.
        // Setting the flag under the lock prevents redundant pipe writes
        // from concurrent callers.
        reactor_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        // No one to wake
        lock.unlock();
    }
}
615  

615  

616  
inline long
616  
inline long
617  
select_scheduler::calculate_timeout(long requested_timeout_us) const
617  
select_scheduler::calculate_timeout(long requested_timeout_us) const
618  
{
618  
{
619  
    if (requested_timeout_us == 0)
619  
    if (requested_timeout_us == 0)
620  
        return 0;
620  
        return 0;
621  

621  

622  
    auto nearest = timer_svc_->nearest_expiry();
622  
    auto nearest = timer_svc_->nearest_expiry();
623  
    if (nearest == timer_service::time_point::max())
623  
    if (nearest == timer_service::time_point::max())
624  
        return requested_timeout_us;
624  
        return requested_timeout_us;
625  

625  

626  
    auto now = std::chrono::steady_clock::now();
626  
    auto now = std::chrono::steady_clock::now();
627  
    if (nearest <= now)
627  
    if (nearest <= now)
628  
        return 0;
628  
        return 0;
629  

629  

630  
    auto timer_timeout_us =
630  
    auto timer_timeout_us =
631  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
631  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
632  
            .count();
632  
            .count();
633  

633  

634  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
634  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
635  
    constexpr auto long_max =
635  
    constexpr auto long_max =
636  
        static_cast<long long>((std::numeric_limits<long>::max)());
636  
        static_cast<long long>((std::numeric_limits<long>::max)());
637  
    auto capped_timer_us =
637  
    auto capped_timer_us =
638  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
638  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
639  
                              static_cast<long long>(0)),
639  
                              static_cast<long long>(0)),
640  
                   long_max);
640  
                   long_max);
641  

641  

642  
    if (requested_timeout_us < 0)
642  
    if (requested_timeout_us < 0)
643  
        return static_cast<long>(capped_timer_us);
643  
        return static_cast<long>(capped_timer_us);
644  

644  

645  
    // requested_timeout_us is already long, so min() result fits in long
645  
    // requested_timeout_us is already long, so min() result fits in long
646  
    return static_cast<long>(
646  
    return static_cast<long>(
647  
        (std::min)(static_cast<long long>(requested_timeout_us),
647  
        (std::min)(static_cast<long long>(requested_timeout_us),
648  
                   capped_timer_us));
648  
                   capped_timer_us));
649  
}
649  
}
650  

650  

// Perform one pass of the select() reactor: build fd_sets from the
// registered descriptors, block in ::select() (with the mutex released),
// then claim and complete any ready operations, queueing them for
// execution by worker threads.
//
// @param lock Held on entry; released around the ::select() call and
//             re-acquired before completed_ops_ is touched. Held again
//             on normal return.
//
// NOTE(review): if ::select() fails (other than EINTR) this throws while
// the lock is released, and the caller's reactor_running_ flag is not
// reset on that path — confirm whether that is intended.
inline void
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
{
    // Calculate timeout considering timers, use 0 if interrupted
    // (an interrupt request means new work is queued, so don't block).
    long effective_timeout_us =
        reactor_interrupted_ ? 0 : calculate_timeout(-1);

    // Build fd_sets from registered_fds_
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // Always include the interrupt pipe
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    // Add registered fds
    for (auto& [fd, state] : registered_fds_)
    {
        if (state.read_op)
            FD_SET(fd, &read_fds);
        if (state.write_op)
        {
            FD_SET(fd, &write_fds);
            // Also monitor for errors on connect operations
            FD_SET(fd, &except_fds);
        }
        if (fd > nfds)
            nfds = fd;
    }

    // Convert timeout to timeval; a null pointer means block forever.
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec  = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr     = &tv;
    }

    // Release the mutex while blocked in select() so other threads can
    // register fds, queue work, and request interrupts.
    lock.unlock();

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
    // Capture errno immediately: later calls may overwrite it.
    int saved_errno = errno;

    // Process timers outside the lock
    timer_svc_->process_expired();

    // EINTR is benign (signal delivery); any other failure is fatal.
    if (ready < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "select");

    // Re-acquire lock before modifying completed_ops_
    lock.lock();

    // Drain the interrupt pipe if readable, so stale wakeup bytes do not
    // make future select() calls return spuriously.
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
    {
        char buf[256];
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
        {
        }
    }

    // Process I/O completions
    int completions_queued = 0;
    if (ready > 0)
    {
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
        std::vector<int> fds_to_check;
        fds_to_check.reserve(registered_fds_.size());
        for (auto& [fd, state] : registered_fds_)
            fds_to_check.push_back(fd);

        for (int fd : fds_to_check)
        {
            // Re-find each fd: an earlier iteration may have erased it.
            auto it = registered_fds_.find(fd);
            if (it == registered_fds_.end())
                continue;

            auto& state = it->second;

            // Check for errors (especially for connect operations)
            bool has_error = FD_ISSET(fd, &except_fds);

            // Process read readiness
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
            {
                auto* op = state.read_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.read_op = nullptr;

                    if (has_error)
                    {
                        // Retrieve the socket's pending error; fall back
                        // to errno from getsockopt itself, then to EIO so
                        // the op never completes with a zero error code.
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Process write readiness
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
            {
                auto* op = state.write_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.write_op = nullptr;

                    if (has_error)
                    {
                        // Same error-retrieval protocol as the read path.
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Clean up empty entries
            if (!state.read_op && !state.write_op)
                registered_fds_.erase(it);
        }
    }

    // Wake exactly as many workers as there are queued completions
    // (notify_all only when more than one op needs a thread).
    if (completions_queued > 0)
    {
        if (completions_queued == 1)
            wakeup_event_.notify_one();
        else
            wakeup_event_.notify_all();
    }
}
819  

819  

// Core dispatch loop: execute at most one completed operation, running
// the reactor or parking on the condition variable as needed.
//
// The sentinel task_op_ circulates through completed_ops_ as a token
// granting the right to run the reactor: whichever thread pops it becomes
// the reactor thread for one pass, then pushes it back for the next.
//
// @param timeout_us 0 = never block, negative = block indefinitely,
//                   positive = bound each wait by this many microseconds.
// @return 1 if a handler was executed, 0 otherwise.
inline std::size_t
select_scheduler::do_one(long timeout_us)
{
    std::unique_lock lock(mutex_);

    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Popping the sentinel makes this thread the reactor thread.
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            if (!more_handlers)
            {
                // Return the token before bailing out so another thread
                // can still become the reactor later.
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
                // Non-blocking call with nothing ready: do not enter the
                // reactor, just hand the token back.
                if (timeout_us == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
            }

            // If real handlers are queued (or the caller is polling) the
            // reactor must not block; it will do a zero-timeout pass.
            reactor_interrupted_ = more_handlers || timeout_us == 0;
            reactor_running_     = true;

            // Let an idle worker pick up the queued handlers while this
            // thread is busy inside the reactor.
            if (more_handlers && idle_thread_count_ > 0)
                wakeup_event_.notify_one();

            // Releases and re-acquires the lock internally.
            run_reactor(lock);

            reactor_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        if (op != nullptr)
        {
            // Run the handler outside the lock; work_guard balances the
            // outstanding-work count around the invocation.
            lock.unlock();
            select::work_guard g{this};
            (*op)();
            return 1;
        }

        // Queue empty: nothing can ever arrive if no work is outstanding.
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
            return 0;

        if (timeout_us == 0)
            return 0;

        // Park until new work is queued (or the timeout elapses), then
        // loop back and retry.
        // NOTE(review): a positive timeout re-arms in full on spurious
        // wakeups/retries, so the total wait can exceed timeout_us —
        // confirm whether callers rely on a strict bound.
        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
        else
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
        --idle_thread_count_;
    }
}
885  

885  

886  
} // namespace boost::corosio::detail
886  
} // namespace boost::corosio::detail
887  

887  

888  
#endif // BOOST_COROSIO_HAS_SELECT
888  
#endif // BOOST_COROSIO_HAS_SELECT
889  

889  

890  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
890  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP