//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

49  
namespace boost::corosio::detail {
49  
namespace boost::corosio::detail {
50  

50  

51  
struct epoll_op;
51  
struct epoll_op;
52  
struct descriptor_state;
52  
struct descriptor_state;
53  
namespace epoll {
53  
namespace epoll {
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55  
} // namespace epoll
55  
} // namespace epoll
56  

56  

57  
/** Linux scheduler using epoll for I/O multiplexing.
57  
/** Linux scheduler using epoll for I/O multiplexing.
58  

58  

59  
    This scheduler implements the scheduler interface using Linux epoll
59  
    This scheduler implements the scheduler interface using Linux epoll
60  
    for efficient I/O event notification. It uses a single reactor model
60  
    for efficient I/O event notification. It uses a single reactor model
61  
    where one thread runs epoll_wait while other threads
61  
    where one thread runs epoll_wait while other threads
62  
    wait on a condition variable for handler work. This design provides:
62  
    wait on a condition variable for handler work. This design provides:
63  

63  

64  
    - Handler parallelism: N posted handlers can execute on N threads
64  
    - Handler parallelism: N posted handlers can execute on N threads
65  
    - No thundering herd: condition_variable wakes exactly one thread
65  
    - No thundering herd: condition_variable wakes exactly one thread
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
67  

67  

68  
    When threads call run(), they first try to execute queued handlers.
68  
    When threads call run(), they first try to execute queued handlers.
69  
    If the queue is empty and no reactor is running, one thread becomes
69  
    If the queue is empty and no reactor is running, one thread becomes
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
71  
    variable until handlers are available.
71  
    variable until handlers are available.
72  

72  

73  
    @par Thread Safety
73  
    @par Thread Safety
74  
    All public member functions are thread-safe.
74  
    All public member functions are thread-safe.
75  
*/
75  
*/
76  
class BOOST_COROSIO_DECL epoll_scheduler final
76  
class BOOST_COROSIO_DECL epoll_scheduler final
77  
    : public native_scheduler
77  
    : public native_scheduler
78  
    , public capy::execution_context::service
78  
    , public capy::execution_context::service
79  
{
79  
{
80  
public:
80  
public:
81  
    using key_type = scheduler;
81  
    using key_type = scheduler;
82  

82  

83  
    /** Construct the scheduler.
83  
    /** Construct the scheduler.
84  

84  

85  
        Creates an epoll instance, eventfd for reactor interruption,
85  
        Creates an epoll instance, eventfd for reactor interruption,
86  
        and timerfd for kernel-managed timer expiry.
86  
        and timerfd for kernel-managed timer expiry.
87  

87  

88  
        @param ctx Reference to the owning execution_context.
88  
        @param ctx Reference to the owning execution_context.
89  
        @param concurrency_hint Hint for expected thread count (unused).
89  
        @param concurrency_hint Hint for expected thread count (unused).
90  
    */
90  
    */
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92  

92  

93  
    /// Destroy the scheduler.
93  
    /// Destroy the scheduler.
94  
    ~epoll_scheduler() override;
94  
    ~epoll_scheduler() override;
95  

95  

96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98  

98  

99  
    void shutdown() override;
99  
    void shutdown() override;
100  
    void post(std::coroutine_handle<> h) const override;
100  
    void post(std::coroutine_handle<> h) const override;
101  
    void post(scheduler_op* h) const override;
101  
    void post(scheduler_op* h) const override;
102  
    bool running_in_this_thread() const noexcept override;
102  
    bool running_in_this_thread() const noexcept override;
103  
    void stop() override;
103  
    void stop() override;
104  
    bool stopped() const noexcept override;
104  
    bool stopped() const noexcept override;
105  
    void restart() override;
105  
    void restart() override;
106  
    std::size_t run() override;
106  
    std::size_t run() override;
107  
    std::size_t run_one() override;
107  
    std::size_t run_one() override;
108  
    std::size_t wait_one(long usec) override;
108  
    std::size_t wait_one(long usec) override;
109  
    std::size_t poll() override;
109  
    std::size_t poll() override;
110  
    std::size_t poll_one() override;
110  
    std::size_t poll_one() override;
111  

111  

112  
    /** Return the epoll file descriptor.
112  
    /** Return the epoll file descriptor.
113  

113  

114  
        Used by socket services to register file descriptors
114  
        Used by socket services to register file descriptors
115  
        for I/O event notification.
115  
        for I/O event notification.
116  

116  

117  
        @return The epoll file descriptor.
117  
        @return The epoll file descriptor.
118  
    */
118  
    */
119  
    int epoll_fd() const noexcept
119  
    int epoll_fd() const noexcept
120  
    {
120  
    {
121  
        return epoll_fd_;
121  
        return epoll_fd_;
122  
    }
122  
    }
123  

123  

124  
    /** Reset the thread's inline completion budget.
124  
    /** Reset the thread's inline completion budget.
125  

125  

126  
        Called at the start of each posted completion handler to
126  
        Called at the start of each posted completion handler to
127  
        grant a fresh budget for speculative inline completions.
127  
        grant a fresh budget for speculative inline completions.
128  
    */
128  
    */
129  
    void reset_inline_budget() const noexcept;
129  
    void reset_inline_budget() const noexcept;
130  

130  

131  
    /** Consume one unit of inline budget if available.
131  
    /** Consume one unit of inline budget if available.
132  

132  

133  
        @return True if budget was available and consumed.
133  
        @return True if budget was available and consumed.
134  
    */
134  
    */
135  
    bool try_consume_inline_budget() const noexcept;
135  
    bool try_consume_inline_budget() const noexcept;
136  

136  

137  
    /** Register a descriptor for persistent monitoring.
137  
    /** Register a descriptor for persistent monitoring.
138  

138  

139  
        The fd is registered once and stays registered until explicitly
139  
        The fd is registered once and stays registered until explicitly
140  
        deregistered. Events are dispatched via descriptor_state which
140  
        deregistered. Events are dispatched via descriptor_state which
141  
        tracks pending read/write/connect operations.
141  
        tracks pending read/write/connect operations.
142  

142  

143  
        @param fd The file descriptor to register.
143  
        @param fd The file descriptor to register.
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
145  
    */
145  
    */
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
147  

147  

148  
    /** Deregister a persistently registered descriptor.
148  
    /** Deregister a persistently registered descriptor.
149  

149  

150  
        @param fd The file descriptor to deregister.
150  
        @param fd The file descriptor to deregister.
151  
    */
151  
    */
152  
    void deregister_descriptor(int fd) const;
152  
    void deregister_descriptor(int fd) const;
153  

153  

154  
    void work_started() noexcept override;
154  
    void work_started() noexcept override;
155  
    void work_finished() noexcept override;
155  
    void work_finished() noexcept override;
156  

156  

157  
    /** Offset a forthcoming work_finished from work_cleanup.
157  
    /** Offset a forthcoming work_finished from work_cleanup.
158  

158  

159  
        Called by descriptor_state when all I/O returned EAGAIN and no
159  
        Called by descriptor_state when all I/O returned EAGAIN and no
160  
        handler will be executed. Must be called from a scheduler thread.
160  
        handler will be executed. Must be called from a scheduler thread.
161  
    */
161  
    */
162  
    void compensating_work_started() const noexcept;
162  
    void compensating_work_started() const noexcept;
163  

163  

164  
    /** Drain work from thread context's private queue to global queue.
164  
    /** Drain work from thread context's private queue to global queue.
165  

165  

166  
        Called by thread_context_guard destructor when a thread exits run().
166  
        Called by thread_context_guard destructor when a thread exits run().
167  
        Transfers pending work to the global queue under mutex protection.
167  
        Transfers pending work to the global queue under mutex protection.
168  

168  

169  
        @param queue The private queue to drain.
169  
        @param queue The private queue to drain.
170  
        @param count Item count for wakeup decisions (wakes other threads if positive).
170  
        @param count Item count for wakeup decisions (wakes other threads if positive).
171  
    */
171  
    */
172  
    void drain_thread_queue(op_queue& queue, long count) const;
172  
    void drain_thread_queue(op_queue& queue, long count) const;
173  

173  

174  
    /** Post completed operations for deferred invocation.
174  
    /** Post completed operations for deferred invocation.
175  

175  

176  
        If called from a thread running this scheduler, operations go to
176  
        If called from a thread running this scheduler, operations go to
177  
        the thread's private queue (fast path). Otherwise, operations are
177  
        the thread's private queue (fast path). Otherwise, operations are
178  
        added to the global queue under mutex and a waiter is signaled.
178  
        added to the global queue under mutex and a waiter is signaled.
179  

179  

180  
        @par Preconditions
180  
        @par Preconditions
181  
        work_started() must have been called for each operation.
181  
        work_started() must have been called for each operation.
182  

182  

183  
        @param ops Queue of operations to post.
183  
        @param ops Queue of operations to post.
184  
    */
184  
    */
185  
    void post_deferred_completions(op_queue& ops) const;
185  
    void post_deferred_completions(op_queue& ops) const;
186  

186  

187  
private:
187  
private:
188  
    struct work_cleanup
188  
    struct work_cleanup
189  
    {
189  
    {
190  
        epoll_scheduler* scheduler;
190  
        epoll_scheduler* scheduler;
191  
        std::unique_lock<std::mutex>* lock;
191  
        std::unique_lock<std::mutex>* lock;
192  
        epoll::scheduler_context* ctx;
192  
        epoll::scheduler_context* ctx;
193  
        ~work_cleanup();
193  
        ~work_cleanup();
194  
    };
194  
    };
195  

195  

196  
    struct task_cleanup
196  
    struct task_cleanup
197  
    {
197  
    {
198  
        epoll_scheduler const* scheduler;
198  
        epoll_scheduler const* scheduler;
199  
        std::unique_lock<std::mutex>* lock;
199  
        std::unique_lock<std::mutex>* lock;
200  
        epoll::scheduler_context* ctx;
200  
        epoll::scheduler_context* ctx;
201  
        ~task_cleanup();
201  
        ~task_cleanup();
202  
    };
202  
    };
203  

203  

204  
    std::size_t do_one(
204  
    std::size_t do_one(
205  
        std::unique_lock<std::mutex>& lock,
205  
        std::unique_lock<std::mutex>& lock,
206  
        long timeout_us,
206  
        long timeout_us,
207  
        epoll::scheduler_context* ctx);
207  
        epoll::scheduler_context* ctx);
208  
    void
208  
    void
209  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
209  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
210  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
210  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
211  
    void interrupt_reactor() const;
211  
    void interrupt_reactor() const;
212  
    void update_timerfd() const;
212  
    void update_timerfd() const;
213  

213  

214  
    /** Set the signaled state and wake all waiting threads.
214  
    /** Set the signaled state and wake all waiting threads.
215  

215  

216  
        @par Preconditions
216  
        @par Preconditions
217  
        Mutex must be held.
217  
        Mutex must be held.
218  

218  

219  
        @param lock The held mutex lock.
219  
        @param lock The held mutex lock.
220  
    */
220  
    */
221  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
221  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
222  

222  

223  
    /** Set the signaled state and wake one waiter if any exist.
223  
    /** Set the signaled state and wake one waiter if any exist.
224  

224  

225  
        Only unlocks and signals if at least one thread is waiting.
225  
        Only unlocks and signals if at least one thread is waiting.
226  
        Use this when the caller needs to perform a fallback action
226  
        Use this when the caller needs to perform a fallback action
227  
        (such as interrupting the reactor) when no waiters exist.
227  
        (such as interrupting the reactor) when no waiters exist.
228  

228  

229  
        @par Preconditions
229  
        @par Preconditions
230  
        Mutex must be held.
230  
        Mutex must be held.
231  

231  

232  
        @param lock The held mutex lock.
232  
        @param lock The held mutex lock.
233  

233  

234  
        @return `true` if unlocked and signaled, `false` if lock still held.
234  
        @return `true` if unlocked and signaled, `false` if lock still held.
235  
    */
235  
    */
236  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
236  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
237  

237  

238  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
238  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
239  

239  

240  
        Always unlocks the mutex. Use this when the caller will release
240  
        Always unlocks the mutex. Use this when the caller will release
241  
        the lock regardless of whether a waiter exists.
241  
        the lock regardless of whether a waiter exists.
242  

242  

243  
        @par Preconditions
243  
        @par Preconditions
244  
        Mutex must be held.
244  
        Mutex must be held.
245  

245  

246  
        @param lock The held mutex lock.
246  
        @param lock The held mutex lock.
247  

247  

248  
        @return `true` if a waiter was signaled, `false` otherwise.
248  
        @return `true` if a waiter was signaled, `false` otherwise.
249  
    */
249  
    */
250  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
250  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
251  

251  

252  
    /** Clear the signaled state before waiting.
252  
    /** Clear the signaled state before waiting.
253  

253  

254  
        @par Preconditions
254  
        @par Preconditions
255  
        Mutex must be held.
255  
        Mutex must be held.
256  
    */
256  
    */
257  
    void clear_signal() const;
257  
    void clear_signal() const;
258  

258  

259  
    /** Block until the signaled state is set.
259  
    /** Block until the signaled state is set.
260  

260  

261  
        Returns immediately if already signaled (fast-path). Otherwise
261  
        Returns immediately if already signaled (fast-path). Otherwise
262  
        increments the waiter count, waits on the condition variable,
262  
        increments the waiter count, waits on the condition variable,
263  
        and decrements the waiter count upon waking.
263  
        and decrements the waiter count upon waking.
264  

264  

265  
        @par Preconditions
265  
        @par Preconditions
266  
        Mutex must be held.
266  
        Mutex must be held.
267  

267  

268  
        @param lock The held mutex lock.
268  
        @param lock The held mutex lock.
269  
    */
269  
    */
270  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
270  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
271  

271  

272  
    /** Block until signaled or timeout expires.
272  
    /** Block until signaled or timeout expires.
273  

273  

274  
        @par Preconditions
274  
        @par Preconditions
275  
        Mutex must be held.
275  
        Mutex must be held.
276  

276  

277  
        @param lock The held mutex lock.
277  
        @param lock The held mutex lock.
278  
        @param timeout_us Maximum time to wait in microseconds.
278  
        @param timeout_us Maximum time to wait in microseconds.
279  
    */
279  
    */
280  
    void wait_for_signal_for(
280  
    void wait_for_signal_for(
281  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
281  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
282  

282  

283  
    int epoll_fd_;
283  
    int epoll_fd_;
284  
    int event_fd_; // for interrupting reactor
284  
    int event_fd_; // for interrupting reactor
285  
    int timer_fd_; // timerfd for kernel-managed timer expiry
285  
    int timer_fd_; // timerfd for kernel-managed timer expiry
286  
    mutable std::mutex mutex_;
286  
    mutable std::mutex mutex_;
287  
    mutable std::condition_variable cond_;
287  
    mutable std::condition_variable cond_;
288  
    mutable op_queue completed_ops_;
288  
    mutable op_queue completed_ops_;
289  
    mutable std::atomic<long> outstanding_work_;
289  
    mutable std::atomic<long> outstanding_work_;
290  
    bool stopped_;
290  
    bool stopped_;
291  

291  

292  
    // True while a thread is blocked in epoll_wait. Used by
292  
    // True while a thread is blocked in epoll_wait. Used by
293  
    // wake_one_thread_and_unlock and work_finished to know when
293  
    // wake_one_thread_and_unlock and work_finished to know when
294  
    // an eventfd interrupt is needed instead of a condvar signal.
294  
    // an eventfd interrupt is needed instead of a condvar signal.
295  
    mutable std::atomic<bool> task_running_{false};
295  
    mutable std::atomic<bool> task_running_{false};
296  

296  

297  
    // True when the reactor has been told to do a non-blocking poll
297  
    // True when the reactor has been told to do a non-blocking poll
298  
    // (more handlers queued or poll mode). Prevents redundant eventfd
298  
    // (more handlers queued or poll mode). Prevents redundant eventfd
299  
    // writes and controls the epoll_wait timeout.
299  
    // writes and controls the epoll_wait timeout.
300  
    mutable bool task_interrupted_ = false;
300  
    mutable bool task_interrupted_ = false;
301  

301  

302  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
302  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
303  
    mutable std::size_t state_ = 0;
303  
    mutable std::size_t state_ = 0;
304  

304  

305  
    // Edge-triggered eventfd state
305  
    // Edge-triggered eventfd state
306  
    mutable std::atomic<bool> eventfd_armed_{false};
306  
    mutable std::atomic<bool> eventfd_armed_{false};
307  

307  

308  
    // Set when the earliest timer changes; flushed before epoll_wait
308  
    // Set when the earliest timer changes; flushed before epoll_wait
309  
    // blocks. Avoids timerfd_settime syscalls for timers that are
309  
    // blocks. Avoids timerfd_settime syscalls for timers that are
310  
    // scheduled then cancelled without being waited on.
310  
    // scheduled then cancelled without being waited on.
311  
    mutable std::atomic<bool> timerfd_stale_{false};
311  
    mutable std::atomic<bool> timerfd_stale_{false};
312  

312  

313  
    // Sentinel operation for interleaving reactor runs with handler execution.
313  
    // Sentinel operation for interleaving reactor runs with handler execution.
314  
    // Ensures the reactor runs periodically even when handlers are continuously
314  
    // Ensures the reactor runs periodically even when handlers are continuously
315  
    // posted, preventing starvation of I/O events, timers, and signals.
315  
    // posted, preventing starvation of I/O events, timers, and signals.
316  
    struct task_op final : scheduler_op
316  
    struct task_op final : scheduler_op
317  
    {
317  
    {
318  
        void operator()() override {}
318  
        void operator()() override {}
319  
        void destroy() override {}
319  
        void destroy() override {}
320  
    };
320  
    };
321  
    task_op task_op_;
321  
    task_op task_op_;
322  
};
322  
};

//--------------------------------------------------------------------------
//
// Implementation
//
//--------------------------------------------------------------------------

/*
    epoll Scheduler - Single Reactor Model
    ======================================

    This scheduler uses a thread coordination strategy to provide handler
    parallelism and avoid the thundering herd problem.
    Instead of all threads blocking on epoll_wait(), one thread becomes the
    "reactor" while others wait on a condition variable for handler work.

    Thread Model
    ------------
    - ONE thread runs epoll_wait() at a time (the reactor thread)
    - OTHER threads wait on cond_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()
    - This matches Windows IOCP semantics where N posted items wake N threads

    Event Loop Structure (do_one)
    -----------------------------
    1. Lock mutex, try to pop handler from queue
    2. If got handler: execute it (unlocked), return
    3. If queue empty and no reactor running: become reactor
       - Run epoll_wait (unlocked), queue I/O completions, loop back
    4. If queue empty and reactor running: wait on condvar for work

    The task_running_ flag ensures only one thread owns epoll_wait().
    After the reactor queues I/O completions, it loops back to try getting
    a handler, giving priority to handler execution over more I/O polling.

    Signaling State (state_)
    ------------------------
    The state_ variable encodes two pieces of information:
    - Bit 0: signaled flag (1 = signaled, persists until cleared)
    - Upper bits: waiter count (each waiter adds 2 before blocking)

    This allows efficient coordination:
    - Signalers only call notify when waiters exist (state_ > 1)
    - Waiters check if already signaled before blocking (fast-path)

    Wake Coordination (wake_one_thread_and_unlock)
    ----------------------------------------------
    When posting work:
    - If waiters exist (state_ > 1): signal and notify_one()
    - Else if reactor running: interrupt via eventfd write
    - Else: no-op (thread will find work when it checks queue)

    This avoids waking threads unnecessarily. With cascading wakes,
    each handler execution wakes at most one additional thread if
    more work exists in the queue.

    Work Counting
    -------------
    outstanding_work_ tracks pending operations. When it hits zero, run()
    returns. Each operation increments on start, decrements on completion.

    Timer Integration
    -----------------
    Timers are handled by timer_service. The reactor adjusts epoll_wait
    timeout to wake for the nearest timer expiry. When a new timer is
    scheduled earlier than current, timer_service calls interrupt_reactor()
    to re-evaluate the timeout.
*/

392  
namespace epoll {
392  
namespace epoll {
393  

393  

394  
// Per-thread frame associating one scheduler with thread-local state.
// Frames form an intrusive stack (see context_stack) so nested run()
// calls on different schedulers can coexist on one thread.
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    epoll_scheduler const* key;     // scheduler this frame belongs to
    scheduler_context* next;        // next (outer) frame on this thread
    op_queue private_queue;         // thread-private completions (no-lock fast path)
    long private_outstanding_work;  // work counted locally; batched to global counter later
    int inline_budget;              // remaining inline executions this cycle
    int inline_budget_max;          // current cap on inline executions
    bool unassisted;                // true when no other thread absorbed queued work

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
        , inline_budget_max(2)
        , unassisted(false)
    {
    }
};
414  

414  

415  
inline thread_local_ptr<scheduler_context> context_stack;
415  
inline thread_local_ptr<scheduler_context> context_stack;
416  

416  

417  
// RAII guard: pushes a scheduler_context frame onto the thread-local
// stack on construction and pops it on destruction, flushing any work
// left in the private queue back to the scheduler's shared queue.
struct thread_context_guard
{
    scheduler_context frame_;  // this thread's frame for one run()/poll() invocation

    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    // Drain before popping: operations posted to the private queue must
    // not be lost when the thread leaves the scheduler.
    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};
435  

435  

436  
// Walk this thread's frame stack looking for the frame that belongs
// to `self`. Returns nullptr when the calling thread is not currently
// running inside that scheduler.
inline scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    auto* frame = context_stack.get();
    while (frame != nullptr)
    {
        if (frame->key == self)
            return frame;
        frame = frame->next;
    }
    return nullptr;
}
444  

444  

445  
} // namespace epoll
445  
} // namespace epoll
446  

446  

447  
// Recompute this thread's inline-execution budget at the start of a
// cycle, adapting the cap to recent behavior (AIMD-like ramp).
inline void
epoll_scheduler::reset_inline_budget() const noexcept
{
    if (auto* ctx = epoll::find_context(this))
    {
        // Cap when no other thread absorbed queued work. A moderate
        // cap (4) amortizes scheduling for small buffers while avoiding
        // bursty I/O that fills socket buffers and stalls large transfers.
        if (ctx->unassisted)
        {
            ctx->inline_budget_max = 4;
            ctx->inline_budget     = 4;
            return;
        }
        // Ramp up when previous cycle fully consumed budget.
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
        if (ctx->inline_budget == 0)
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
        else if (ctx->inline_budget < ctx->inline_budget_max)
            ctx->inline_budget_max = 2;
        // Note: when the previous budget was untouched (inline_budget ==
        // inline_budget_max), the cap is deliberately kept as-is.
        ctx->inline_budget = ctx->inline_budget_max;
    }
}
470  

470  

471  
inline bool
471  
inline bool
472  
epoll_scheduler::try_consume_inline_budget() const noexcept
472  
epoll_scheduler::try_consume_inline_budget() const noexcept
473  
{
473  
{
474  
    if (auto* ctx = epoll::find_context(this))
474  
    if (auto* ctx = epoll::find_context(this))
475  
    {
475  
    {
476  
        if (ctx->inline_budget > 0)
476  
        if (ctx->inline_budget > 0)
477  
        {
477  
        {
478  
            --ctx->inline_budget;
478  
            --ctx->inline_budget;
479  
            return true;
479  
            return true;
480  
        }
480  
        }
481  
    }
481  
    }
482  
    return false;
482  
    return false;
483  
}
483  
}
484  

484  

485  
// Handler invoked by the scheduler when epoll reported events for this
// descriptor. Dispatches ready events to the pending read/write/connect
// operations, completes at most the first resulting handler inline, and
// defers the rest to the scheduler.
inline void
descriptor_state::operator()()
{
    // Allow the reactor to re-enqueue us for subsequent events.
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    // Claim all accumulated events; acquire pairs with the reactor's
    // release when it stored them.
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        // Spurious wakeup: no events to process, but the scheduler
        // accounted a work item for us — compensate so counts balance.
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    // On EPOLLERR fetch the socket error; fall back to errno, and use
    // EIO when the socket reports no specific error.
    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                // EAGAIN means the readiness was consumed by someone
                // else; clear the error and keep the op pending.
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // No waiter: remember readiness for the next operation.
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                // Connect completes exactly once; always retire it.
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        // An error terminates any operations still pending (including
        // ones that survived the per-event handling above with EAGAIN).
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // No handler became ready: compensate the consumed work item.
        scheduler_->compensating_work_started();
    }
}
607  

607  

608  
// Construct the scheduler: create the epoll instance, the eventfd used
// to interrupt the reactor, and the timerfd driving timer expiry; then
// register both fds with epoll and wire up the auxiliary services.
// On any failure, every fd created so far is closed before throwing.
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        // Save errno before close() can clobber it.
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // Edge-triggered eventfd; data.ptr == nullptr identifies it to the
    // reactor as the interrupt channel.
    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // data.ptr == &timer_fd_ identifies timer expirations to the reactor.
    epoll_event timer_ev{};
    timer_ev.events   = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // When the earliest timer changes, mark the timerfd stale and kick
    // the reactor (if running) so it re-arms the timeout.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            auto* self = static_cast<epoll_scheduler*>(p);
            self->timerfd_stale_.store(true, std::memory_order_release);
            if (self->task_running_.load(std::memory_order_acquire))
                self->interrupt_reactor();
        }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
681  

681  

682  
// Release the reactor's descriptors. A value of -1 marks a descriptor
// that was never successfully opened.
inline epoll_scheduler::~epoll_scheduler()
{
    int const fds[] = {timer_fd_, event_fd_, epoll_fd_};
    for (int fd : fds)
    {
        if (fd >= 0)
            ::close(fd);
    }
}
691  

691  

692  
// Service shutdown: destroy (without executing) every queued completion,
// wake all waiting threads, and interrupt the reactor.
inline void
epoll_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);

        while (auto* h = completed_ops_.pop())
        {
            // The task sentinel is not a real handler; skip it.
            if (h == &task_op_)
                continue;
            // destroy() may run arbitrary code — don't hold the lock.
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    // Kick a thread blocked in epoll_wait so it observes shutdown.
    if (event_fd_ >= 0)
        interrupt_reactor();
}
713  

713  

714  
// Schedule resumption of a coroutine. The handle is wrapped in a
// heap-allocated scheduler_op that resumes (or destroys) the coroutine
// and deletes itself.
inline void
epoll_scheduler::post(std::coroutine_handle<> h) const
{
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}

        ~post_handler() override = default;

        // Invoked by the scheduler: free the wrapper first, then
        // resume (resume may destroy the frame that owns nothing here).
        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        // Invoked on shutdown: destroy the coroutine without resuming.
        void destroy() override
        {
            auto h = h_;
            delete this;
            h.destroy();
        }
    };

    // unique_ptr guards against leaks until the op is safely queued.
    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = epoll::find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
758  

758  

759  
inline void
759  
inline void
760  
epoll_scheduler::post(scheduler_op* h) const
760  
epoll_scheduler::post(scheduler_op* h) const
761  
{
761  
{
762  
    // Fast path: same thread posts to private queue
762  
    // Fast path: same thread posts to private queue
763  
    // Only count locally; work_cleanup batches to global counter
763  
    // Only count locally; work_cleanup batches to global counter
764  
    if (auto* ctx = epoll::find_context(this))
764  
    if (auto* ctx = epoll::find_context(this))
765  
    {
765  
    {
766  
        ++ctx->private_outstanding_work;
766  
        ++ctx->private_outstanding_work;
767  
        ctx->private_queue.push(h);
767  
        ctx->private_queue.push(h);
768  
        return;
768  
        return;
769  
    }
769  
    }
770  

770  

771  
    // Slow path: cross-thread post requires mutex
771  
    // Slow path: cross-thread post requires mutex
772  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
772  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
773  

773  

774  
    std::unique_lock lock(mutex_);
774  
    std::unique_lock lock(mutex_);
775  
    completed_ops_.push(h);
775  
    completed_ops_.push(h);
776  
    wake_one_thread_and_unlock(lock);
776  
    wake_one_thread_and_unlock(lock);
777  
}
777  
}
778  

778  

779  
inline bool
779  
inline bool
780  
epoll_scheduler::running_in_this_thread() const noexcept
780  
epoll_scheduler::running_in_this_thread() const noexcept
781  
{
781  
{
782  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
782  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
783  
        if (c->key == this)
783  
        if (c->key == this)
784  
            return true;
784  
            return true;
785  
    return false;
785  
    return false;
786  
}
786  
}
787  

787  

788  
// Request all run() threads to return. Idempotent: work is only done
// on the first call after construction or restart().
inline void
epoll_scheduler::stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        // Wake every blocked thread so it observes stopped_, and kick
        // the reactor out of epoll_wait.
        signal_all(lock);
        interrupt_reactor();
    }
}
799  

799  

800  
inline bool
800  
inline bool
801  
epoll_scheduler::stopped() const noexcept
801  
epoll_scheduler::stopped() const noexcept
802  
{
802  
{
803  
    std::unique_lock lock(mutex_);
803  
    std::unique_lock lock(mutex_);
804  
    return stopped_;
804  
    return stopped_;
805  
}
805  
}
806  

806  

807  
inline void
807  
inline void
808  
epoll_scheduler::restart()
808  
epoll_scheduler::restart()
809  
{
809  
{
810  
    std::unique_lock lock(mutex_);
810  
    std::unique_lock lock(mutex_);
811  
    stopped_ = false;
811  
    stopped_ = false;
812  
}
812  
}
813  

813  

814  
inline std::size_t
814  
inline std::size_t
815  
epoll_scheduler::run()
815  
epoll_scheduler::run()
816  
{
816  
{
817  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
817  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
818  
    {
818  
    {
819  
        stop();
819  
        stop();
820  
        return 0;
820  
        return 0;
821  
    }
821  
    }
822  

822  

823  
    epoll::thread_context_guard ctx(this);
823  
    epoll::thread_context_guard ctx(this);
824  
    std::unique_lock lock(mutex_);
824  
    std::unique_lock lock(mutex_);
825  

825  

826  
    std::size_t n = 0;
826  
    std::size_t n = 0;
827  
    for (;;)
827  
    for (;;)
828  
    {
828  
    {
829  
        if (!do_one(lock, -1, &ctx.frame_))
829  
        if (!do_one(lock, -1, &ctx.frame_))
830  
            break;
830  
            break;
831  
        if (n != (std::numeric_limits<std::size_t>::max)())
831  
        if (n != (std::numeric_limits<std::size_t>::max)())
832  
            ++n;
832  
            ++n;
833  
        if (!lock.owns_lock())
833  
        if (!lock.owns_lock())
834  
            lock.lock();
834  
            lock.lock();
835  
    }
835  
    }
836  
    return n;
836  
    return n;
837  
}
837  
}
838  

838  

839  
// Block until exactly one handler runs (or the scheduler stops).
// Returns the number of handlers executed (0 or 1).
inline std::size_t
epoll_scheduler::run_one()
{
    // No pending work: transition to stopped and return immediately.
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    // Frame enables the private-queue fast path for posts made by the
    // handler we are about to execute.
    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    // -1 timeout: wait indefinitely for one unit of work.
    return do_one(lock, -1, &ctx.frame_);
}
852  

852  

853  
// Run at most one handler, waiting up to `usec` microseconds for work.
// Returns the number of handlers executed (0 or 1).
inline std::size_t
epoll_scheduler::wait_one(long usec)
{
    // No pending work: transition to stopped and return immediately.
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    // Bounded wait: do_one() honors the microsecond timeout.
    return do_one(lock, usec, &ctx.frame_);
}
866  

866  

867  
inline std::size_t
867  
inline std::size_t
868  
epoll_scheduler::poll()
868  
epoll_scheduler::poll()
869  
{
869  
{
870  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
870  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
871  
    {
871  
    {
872  
        stop();
872  
        stop();
873  
        return 0;
873  
        return 0;
874  
    }
874  
    }
875  

875  

876  
    epoll::thread_context_guard ctx(this);
876  
    epoll::thread_context_guard ctx(this);
877  
    std::unique_lock lock(mutex_);
877  
    std::unique_lock lock(mutex_);
878  

878  

879  
    std::size_t n = 0;
879  
    std::size_t n = 0;
880  
    for (;;)
880  
    for (;;)
881  
    {
881  
    {
882  
        if (!do_one(lock, 0, &ctx.frame_))
882  
        if (!do_one(lock, 0, &ctx.frame_))
883  
            break;
883  
            break;
884  
        if (n != (std::numeric_limits<std::size_t>::max)())
884  
        if (n != (std::numeric_limits<std::size_t>::max)())
885  
            ++n;
885  
            ++n;
886  
        if (!lock.owns_lock())
886  
        if (!lock.owns_lock())
887  
            lock.lock();
887  
            lock.lock();
888  
    }
888  
    }
889  
    return n;
889  
    return n;
890  
}
890  
}
891  

891  

892  
// Execute at most one ready handler without blocking.
// Returns the number of handlers executed (0 or 1).
inline std::size_t
epoll_scheduler::poll_one()
{
    // No pending work: transition to stopped and return immediately.
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    // Timeout 0: return immediately if nothing is ready.
    return do_one(lock, 0, &ctx.frame_);
}
905  

905  

906  
// Register a socket with epoll in edge-triggered mode for both read
// and write interest, and initialize the descriptor's bookkeeping.
// Throws on epoll_ctl failure.
inline void
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
{
    // Single persistent edge-triggered registration covering all
    // directions; readiness is tracked in descriptor_state instead of
    // re-arming epoll per operation.
    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd                = fd;
    desc->scheduler_        = this;

    // Reset cached readiness under the descriptor's lock; with ET the
    // kernel will report initial readiness via the first edge.
    std::lock_guard lock(desc->mutex);
    desc->read_ready  = false;
    desc->write_ready = false;
}
924  

924  

925  
inline void
925  
inline void
926  
epoll_scheduler::deregister_descriptor(int fd) const
926  
epoll_scheduler::deregister_descriptor(int fd) const
927  
{
927  
{
928  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
928  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
929  
}
929  
}
930  

930  

931  
inline void
epoll_scheduler::work_started() noexcept
{
    // Count one more outstanding operation. Relaxed ordering suffices
    // for the increment; the acquire loads that gate shutdown pair with
    // the acq_rel decrement in work_finished().
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
936  

936  

937  
inline void
937  
inline void
938  
epoll_scheduler::work_finished() noexcept
938  
epoll_scheduler::work_finished() noexcept
939  
{
939  
{
940  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
940  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
941  
        stop();
941  
        stop();
942  
}
942  
}
943  

943  

944  
inline void
944  
inline void
945  
epoll_scheduler::compensating_work_started() const noexcept
945  
epoll_scheduler::compensating_work_started() const noexcept
946  
{
946  
{
947  
    auto* ctx = epoll::find_context(this);
947  
    auto* ctx = epoll::find_context(this);
948  
    if (ctx)
948  
    if (ctx)
949  
        ++ctx->private_outstanding_work;
949  
        ++ctx->private_outstanding_work;
950  
}
950  
}
951  

951  

952  
inline void
952  
inline void
953  
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
953  
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
954  
{
954  
{
955  
    // Note: outstanding_work_ was already incremented when posting
955  
    // Note: outstanding_work_ was already incremented when posting
956  
    std::unique_lock lock(mutex_);
956  
    std::unique_lock lock(mutex_);
957  
    completed_ops_.splice(queue);
957  
    completed_ops_.splice(queue);
958  
    if (count > 0)
958  
    if (count > 0)
959  
        maybe_unlock_and_signal_one(lock);
959  
        maybe_unlock_and_signal_one(lock);
960  
}
960  
}
961  

961  

962  
inline void
962  
inline void
963  
epoll_scheduler::post_deferred_completions(op_queue& ops) const
963  
epoll_scheduler::post_deferred_completions(op_queue& ops) const
964  
{
964  
{
965  
    if (ops.empty())
965  
    if (ops.empty())
966  
        return;
966  
        return;
967  

967  

968  
    // Fast path: if on scheduler thread, use private queue
968  
    // Fast path: if on scheduler thread, use private queue
969  
    if (auto* ctx = epoll::find_context(this))
969  
    if (auto* ctx = epoll::find_context(this))
970  
    {
970  
    {
971  
        ctx->private_queue.splice(ops);
971  
        ctx->private_queue.splice(ops);
972  
        return;
972  
        return;
973  
    }
973  
    }
974  

974  

975  
    // Slow path: add to global queue and wake a thread
975  
    // Slow path: add to global queue and wake a thread
976  
    std::unique_lock lock(mutex_);
976  
    std::unique_lock lock(mutex_);
977  
    completed_ops_.splice(ops);
977  
    completed_ops_.splice(ops);
978  
    wake_one_thread_and_unlock(lock);
978  
    wake_one_thread_and_unlock(lock);
979  
}
979  
}
980  

980  

981  
inline void
981  
inline void
982  
epoll_scheduler::interrupt_reactor() const
982  
epoll_scheduler::interrupt_reactor() const
983  
{
983  
{
984  
    // Only write if not already armed to avoid redundant writes
984  
    // Only write if not already armed to avoid redundant writes
985  
    bool expected = false;
985  
    bool expected = false;
986  
    if (eventfd_armed_.compare_exchange_strong(
986  
    if (eventfd_armed_.compare_exchange_strong(
987  
            expected, true, std::memory_order_release,
987  
            expected, true, std::memory_order_release,
988  
            std::memory_order_relaxed))
988  
            std::memory_order_relaxed))
989  
    {
989  
    {
990  
        std::uint64_t val       = 1;
990  
        std::uint64_t val       = 1;
991  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
991  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
992  
    }
992  
    }
993  
}
993  
}
994  

994  

995  
inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    // Set the "signalled" flag (bit 0 of state_) and wake every thread
    // blocked in wait_for_signal()/wait_for_signal_for(). The lock
    // parameter documents that the caller holds mutex_; it is unused.
    state_ |= 1;
    cond_.notify_all();
}
1001  

1001  

1002  
inline bool
1002  
inline bool
1003  
epoll_scheduler::maybe_unlock_and_signal_one(
1003  
epoll_scheduler::maybe_unlock_and_signal_one(
1004  
    std::unique_lock<std::mutex>& lock) const
1004  
    std::unique_lock<std::mutex>& lock) const
1005  
{
1005  
{
1006  
    state_ |= 1;
1006  
    state_ |= 1;
1007  
    if (state_ > 1)
1007  
    if (state_ > 1)
1008  
    {
1008  
    {
1009  
        lock.unlock();
1009  
        lock.unlock();
1010  
        cond_.notify_one();
1010  
        cond_.notify_one();
1011  
        return true;
1011  
        return true;
1012  
    }
1012  
    }
1013  
    return false;
1013  
    return false;
1014  
}
1014  
}
1015  

1015  

1016  
inline bool
1016  
inline bool
1017  
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
1017  
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
1018  
{
1018  
{
1019  
    state_ |= 1;
1019  
    state_ |= 1;
1020  
    bool have_waiters = state_ > 1;
1020  
    bool have_waiters = state_ > 1;
1021  
    lock.unlock();
1021  
    lock.unlock();
1022  
    if (have_waiters)
1022  
    if (have_waiters)
1023  
        cond_.notify_one();
1023  
        cond_.notify_one();
1024  
    return have_waiters;
1024  
    return have_waiters;
1025  
}
1025  
}
1026  

1026  

1027  
inline void
epoll_scheduler::clear_signal() const
{
    // Drop the "signalled" flag (bit 0 of state_); the waiter count in
    // the upper bits is untouched. Callers (do_one) hold mutex_.
    state_ &= ~std::size_t(1);
}
1032  

1032  

1033  
inline void
1033  
inline void
1034  
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
1034  
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
1035  
{
1035  
{
1036  
    while ((state_ & 1) == 0)
1036  
    while ((state_ & 1) == 0)
1037  
    {
1037  
    {
1038  
        state_ += 2;
1038  
        state_ += 2;
1039  
        cond_.wait(lock);
1039  
        cond_.wait(lock);
1040  
        state_ -= 2;
1040  
        state_ -= 2;
1041  
    }
1041  
    }
1042  
}
1042  
}
1043  

1043  

1044  
inline void
1044  
inline void
1045  
epoll_scheduler::wait_for_signal_for(
1045  
epoll_scheduler::wait_for_signal_for(
1046  
    std::unique_lock<std::mutex>& lock, long timeout_us) const
1046  
    std::unique_lock<std::mutex>& lock, long timeout_us) const
1047  
{
1047  
{
1048  
    if ((state_ & 1) == 0)
1048  
    if ((state_ & 1) == 0)
1049  
    {
1049  
    {
1050  
        state_ += 2;
1050  
        state_ += 2;
1051  
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
1051  
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
1052  
        state_ -= 2;
1052  
        state_ -= 2;
1053  
    }
1053  
    }
1054  
}
1054  
}
1055  

1055  

1056  
inline void
1056  
inline void
1057  
epoll_scheduler::wake_one_thread_and_unlock(
1057  
epoll_scheduler::wake_one_thread_and_unlock(
1058  
    std::unique_lock<std::mutex>& lock) const
1058  
    std::unique_lock<std::mutex>& lock) const
1059  
{
1059  
{
1060  
    if (maybe_unlock_and_signal_one(lock))
1060  
    if (maybe_unlock_and_signal_one(lock))
1061  
        return;
1061  
        return;
1062  

1062  

1063  
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
1063  
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
1064  
    {
1064  
    {
1065  
        task_interrupted_ = true;
1065  
        task_interrupted_ = true;
1066  
        lock.unlock();
1066  
        lock.unlock();
1067  
        interrupt_reactor();
1067  
        interrupt_reactor();
1068  
    }
1068  
    }
1069  
    else
1069  
    else
1070  
    {
1070  
    {
1071  
        lock.unlock();
1071  
        lock.unlock();
1072  
    }
1072  
    }
1073  
}
1073  
}
1074  

1074  

1075  
inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    // Runs after a handler executes in do_one(). Settles the work
    // accounting: the handler consumed one unit of outstanding work,
    // and may have produced more through the thread-private counter.
    if (ctx)
    {
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            // Net gain: publish everything beyond the consumed unit.
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            // Net loss: account for the consumed unit (may stop()).
            scheduler->work_finished();
        // produced == 1 cancels out exactly; nothing to publish.
        ctx->private_outstanding_work = 0;

        // Hand privately queued ops to the shared queue. do_one()
        // released the mutex before invoking the handler, so it is
        // re-acquired unconditionally here.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just consume the handler's unit of work.
        scheduler->work_finished();
    }
}
1098  

1098  

1099  
inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    // Runs when run_task() leaves the reactor pass (normally or by
    // exception): publishes work produced on this thread while the
    // mutex was released.
    if (!ctx)
        return;

    // Fold the privately-counted work into the shared counter.
    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    // Move privately-queued ops to the shared queue (needs mutex_;
    // run_task() may or may not have re-acquired it by this point).
    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}
1118  

1118  

1119  
inline void
1119  
inline void
1120  
epoll_scheduler::update_timerfd() const
1120  
epoll_scheduler::update_timerfd() const
1121  
{
1121  
{
1122  
    auto nearest = timer_svc_->nearest_expiry();
1122  
    auto nearest = timer_svc_->nearest_expiry();
1123  

1123  

1124  
    itimerspec ts{};
1124  
    itimerspec ts{};
1125  
    int flags = 0;
1125  
    int flags = 0;
1126  

1126  

1127  
    if (nearest == timer_service::time_point::max())
1127  
    if (nearest == timer_service::time_point::max())
1128  
    {
1128  
    {
1129  
        // No timers - disarm by setting to 0 (relative)
1129  
        // No timers - disarm by setting to 0 (relative)
1130  
    }
1130  
    }
1131  
    else
1131  
    else
1132  
    {
1132  
    {
1133  
        auto now = std::chrono::steady_clock::now();
1133  
        auto now = std::chrono::steady_clock::now();
1134  
        if (nearest <= now)
1134  
        if (nearest <= now)
1135  
        {
1135  
        {
1136  
            // Use 1ns instead of 0 - zero disarms the timerfd
1136  
            // Use 1ns instead of 0 - zero disarms the timerfd
1137  
            ts.it_value.tv_nsec = 1;
1137  
            ts.it_value.tv_nsec = 1;
1138  
        }
1138  
        }
1139  
        else
1139  
        else
1140  
        {
1140  
        {
1141  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1141  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1142  
                            nearest - now)
1142  
                            nearest - now)
1143  
                            .count();
1143  
                            .count();
1144  
            ts.it_value.tv_sec  = nsec / 1000000000;
1144  
            ts.it_value.tv_sec  = nsec / 1000000000;
1145  
            ts.it_value.tv_nsec = nsec % 1000000000;
1145  
            ts.it_value.tv_nsec = nsec % 1000000000;
1146  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1146  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1147  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1147  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1148  
                ts.it_value.tv_nsec = 1;
1148  
                ts.it_value.tv_nsec = 1;
1149  
        }
1149  
        }
1150  
    }
1150  
    }
1151  

1151  

1152  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1152  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1153  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1153  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1154  
}
1154  
}
1155  

1155  

1156  
inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    // One reactor pass: block in epoll_wait, translate kernel events
    // into ready descriptors, then merge results into completed_ops_
    // under mutex_. Entered with the lock held (or not); returns with
    // it held again.
    //
    // A pending interrupt means other work is already queued, so poll
    // (timeout 0) instead of blocking indefinitely.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Publishes private work counts/queues back to the scheduler on
    // scope exit, even if epoll_wait throws.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR just means a signal woke us; treat as an empty batch.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // A null data.ptr marks the wakeup eventfd: drain it and
        // re-enable arming so the next interrupt writes again.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // The timerfd registers itself with &timer_fd_ as its cookie.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    // Publish the batch of ready descriptors for worker threads.
    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
1231  

1231  

1232  
inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    // Run at most one handler. Returns 1 if a handler executed, 0 on
    // stop, timeout, or when no work remains. Entered with mutex_ held
    // via `lock`. timeout_us < 0 blocks indefinitely, 0 polls once,
    // > 0 bounds the condvar wait.
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                // Put the sentinel back for the next caller.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Run epoll non-blocking when other handlers are queued
            // (or a poll was requested) so they are not delayed.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            if (more)
                // Try to wake another thread for the remaining ops and
                // record whether anyone could assist.
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Settles work accounting (and may re-lock) on scope exit,
            // even if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};

            // Invoke the handler without holding mutex_.
            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Park until new work is signalled (or the timeout elapses).
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1304  

1304  

1305  
} // namespace boost::corosio::detail
1305  
} // namespace boost::corosio::detail
1306  

1306  

1307  
#endif // BOOST_COROSIO_HAS_EPOLL
1307  
#endif // BOOST_COROSIO_HAS_EPOLL
1308  

1308  

1309  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
1309  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP