TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/native_scheduler.hpp>
21 : #include <boost/corosio/detail/scheduler_op.hpp>
22 :
23 : #include <boost/corosio/native/detail/select/select_op.hpp>
24 : #include <boost/corosio/detail/timer_service.hpp>
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 :
29 : #include <boost/corosio/detail/except.hpp>
30 : #include <boost/corosio/detail/thread_local_ptr.hpp>
31 :
32 : #include <sys/select.h>
33 : #include <sys/socket.h>
34 : #include <unistd.h>
35 : #include <errno.h>
36 : #include <fcntl.h>
37 :
38 : #include <algorithm>
39 : #include <atomic>
40 : #include <chrono>
41 : #include <condition_variable>
42 : #include <cstddef>
43 : #include <limits>
44 : #include <mutex>
45 : #include <unordered_map>
46 :
47 : namespace boost::corosio::detail {
48 :
struct select_op;

/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It uses a single reactor model
    where one thread runs select() while other threads wait on a condition
    variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - Portability: Works on all POSIX systems

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same work counting semantics
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates a non-blocking, close-on-exec self-pipe for reactor
        interruption, hooks the timer service's "earliest expiry changed"
        callback to interrupt the reactor, and initializes the resolver and
        signal services.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).

        @throws Via detail::throw_system_error if pipe() or fcntl() fails.
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Closes whichever ends of the self-pipe are open.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Destroys (without invoking) every queued handler, then interrupts
    /// the reactor and wakes all waiting threads.
    void shutdown() override;

    /// Queue a coroutine for resumption and wake one thread.
    void post(std::coroutine_handle<> h) const override;

    /// Queue an operation, counting it as outstanding work, and wake one thread.
    void post(scheduler_op* h) const override;

    /// True if the calling thread is currently inside run/run_one/wait_one/
    /// poll/poll_one of this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Set the stopped flag (once) and wake every waiting thread and the reactor.
    void stop() override;

    /// Whether stop() has been called since the last restart().
    bool stopped() const noexcept override;

    /// Clear the stopped flag.
    void restart() override;

    /// Run handlers until stopped or out of work; returns the number run.
    std::size_t run() override;

    /// Run at most one handler, blocking if needed; returns 0 or 1.
    std::size_t run_one() override;

    /// Run at most one handler. `usec` bounds the wait of an idle thread.
    /// NOTE(review): when the calling thread becomes the reactor, the
    /// select() timeout is derived from timers only (run_reactor calls
    /// calculate_timeout(-1)), so `usec` does not bound that path.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking; returns the number run.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking; returns 0 or 1.
    std::size_t poll_one() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a file descriptor for monitoring.

        Replaces any operation previously registered for the same
        direction(s) and interrupts the reactor so it rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param op The operation associated with this fd.
        @param events Event mask: 1 = read, 2 = write, 3 = both.

        @throws Via detail::throw_system_error (EINVAL) if fd is outside
        [0, FD_SETSIZE).
    */
    void register_fd(int fd, select_op* op, int events) const;

    /** Unregister a file descriptor from monitoring.

        A no-op if the fd is not registered. The map entry is removed once
        neither a read nor a write op remains. Does not interrupt the reactor.

        @param fd The file descriptor to unregister.
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
    */
    void deregister_fd(int fd, int events) const;

    /// Increment the outstanding-work count.
    void work_started() noexcept override;

    /// Decrement the outstanding-work count; calls stop() when it reaches zero.
    void work_finished() noexcept override;

    // Event flags for register_fd/deregister_fd
    static constexpr int event_read = 1;
    static constexpr int event_write = 2;

private:
    // Run one handler or one reactor pass; see the definition for details.
    std::size_t do_one(long timeout_us);
    // One select() pass; entered and left with `lock` held.
    void run_reactor(std::unique_lock<std::mutex>& lock);
    // Wake an idle thread or interrupt the reactor; always releases `lock`.
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    // Write a byte to the self-pipe so a blocked select() returns.
    void interrupt_reactor() const;
    // Combine a requested timeout with the nearest timer expiry (microseconds).
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Guards completed_ops_, registered_fds_, max_fd_ and the reactor
    // coordination flags below.
    mutable std::mutex mutex_;
    // Idle threads in do_one() wait here for handlers.
    mutable std::condition_variable wakeup_event_;
    // Ready handlers, plus the task_op_ sentinel.
    mutable op_queue completed_ops_;
    // Reaching zero in work_finished() calls stop().
    mutable std::atomic<long> outstanding_work_;
    std::atomic<bool> stopped_;

    // Per-fd state for tracking registered operations
    struct fd_state
    {
        select_op* read_op = nullptr;
        select_op* write_op = nullptr;
    };
    mutable std::unordered_map<int, fd_state> registered_fds_;
    // NOTE(review): maintained by register_fd/deregister_fd but never read;
    // run_reactor recomputes nfds from registered_fds_ on every pass.
    mutable int max_fd_ = -1;

    // Single reactor thread coordination
    mutable bool reactor_running_ = false;
    mutable bool reactor_interrupted_ = false;
    mutable int idle_thread_count_ = 0;

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing timer starvation.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};
184 :
185 : /*
186 : select Scheduler - Single Reactor Model
187 : =======================================
188 :
189 : This scheduler mirrors the epoll_scheduler design but uses select() instead
190 : of epoll for I/O multiplexing. The thread coordination strategy is identical:
191 : one thread becomes the "reactor" while others wait on a condition variable.
192 :
193 : Thread Model
194 : ------------
195 : - ONE thread runs select() at a time (the reactor thread)
196 : - OTHER threads wait on wakeup_event_ (condition variable) for handlers
197 : - When work is posted, exactly one waiting thread wakes via notify_one()
198 :
199 : Key Differences from epoll
200 : --------------------------
201 : - Uses self-pipe instead of eventfd for interruption (more portable)
202 : - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
203 : - FD_SETSIZE limit (~1024 fds on most systems)
204 : - Level-triggered only (no edge-triggered mode)
205 :
206 : Self-Pipe Pattern
207 : -----------------
208 : To interrupt a blocking select() call (e.g., when work is posted or a timer
209 : expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
210 : always in the read_fds set, so select() returns immediately. We drain the
211 : pipe to clear the readable state.
212 :
213 : fd-to-op Mapping
214 : ----------------
We use an unordered_map<int, fd_state> to track which operations are
registered for each fd. After select() returns, the reactor walks this map
and tests each fd against the returned fd_sets: each lookup is O(1), but
every pass is O(n) in the number of registered fds. Each fd can have at
most one read op and one write op registered.
218 : */
219 :
220 : namespace select {
221 :
222 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
223 : {
224 : select_scheduler const* key;
225 : scheduler_context* next;
226 : };
227 :
228 : inline thread_local_ptr<scheduler_context> context_stack;
229 :
230 : struct thread_context_guard
231 : {
232 : scheduler_context frame_;
233 :
234 HIT 148 : explicit thread_context_guard(select_scheduler const* ctx) noexcept
235 148 : : frame_{ctx, context_stack.get()}
236 : {
237 148 : context_stack.set(&frame_);
238 148 : }
239 :
240 148 : ~thread_context_guard() noexcept
241 : {
242 148 : context_stack.set(frame_.next);
243 148 : }
244 : };
245 :
246 : struct work_guard
247 : {
248 : select_scheduler* self;
249 153997 : ~work_guard()
250 : {
251 153997 : self->work_finished();
252 153997 : }
253 : };
254 :
255 : } // namespace select
256 :
257 168 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
258 168 : : pipe_fds_{-1, -1}
259 168 : , outstanding_work_(0)
260 168 : , stopped_(false)
261 168 : , max_fd_(-1)
262 168 : , reactor_running_(false)
263 168 : , reactor_interrupted_(false)
264 336 : , idle_thread_count_(0)
265 : {
266 : // Create self-pipe for interrupting select()
267 168 : if (::pipe(pipe_fds_) < 0)
268 MIS 0 : detail::throw_system_error(make_err(errno), "pipe");
269 :
270 : // Set both ends to non-blocking and close-on-exec
271 HIT 504 : for (int i = 0; i < 2; ++i)
272 : {
273 336 : int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
274 336 : if (flags == -1)
275 : {
276 MIS 0 : int errn = errno;
277 0 : ::close(pipe_fds_[0]);
278 0 : ::close(pipe_fds_[1]);
279 0 : detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
280 : }
281 HIT 336 : if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
282 : {
283 MIS 0 : int errn = errno;
284 0 : ::close(pipe_fds_[0]);
285 0 : ::close(pipe_fds_[1]);
286 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
287 : }
288 HIT 336 : if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
289 : {
290 MIS 0 : int errn = errno;
291 0 : ::close(pipe_fds_[0]);
292 0 : ::close(pipe_fds_[1]);
293 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
294 : }
295 : }
296 :
297 HIT 168 : timer_svc_ = &get_timer_service(ctx, *this);
298 168 : timer_svc_->set_on_earliest_changed(
299 3917 : timer_service::callback(this, [](void* p) {
300 3749 : static_cast<select_scheduler*>(p)->interrupt_reactor();
301 3749 : }));
302 :
303 : // Initialize resolver service
304 168 : get_resolver_service(ctx, *this);
305 :
306 : // Initialize signal service
307 168 : get_signal_service(ctx, *this);
308 :
309 : // Push task sentinel to interleave reactor runs with handler execution
310 168 : completed_ops_.push(&task_op_);
311 168 : }
312 :
313 336 : inline select_scheduler::~select_scheduler()
314 : {
315 168 : if (pipe_fds_[0] >= 0)
316 168 : ::close(pipe_fds_[0]);
317 168 : if (pipe_fds_[1] >= 0)
318 168 : ::close(pipe_fds_[1]);
319 336 : }
320 :
321 : inline void
322 168 : select_scheduler::shutdown()
323 : {
324 : {
325 168 : std::unique_lock lock(mutex_);
326 :
327 343 : while (auto* h = completed_ops_.pop())
328 : {
329 175 : if (h == &task_op_)
330 168 : continue;
331 7 : lock.unlock();
332 7 : h->destroy();
333 7 : lock.lock();
334 175 : }
335 168 : }
336 :
337 168 : if (pipe_fds_[1] >= 0)
338 168 : interrupt_reactor();
339 :
340 168 : wakeup_event_.notify_all();
341 168 : }
342 :
343 : inline void
344 4139 : select_scheduler::post(std::coroutine_handle<> h) const
345 : {
346 : struct post_handler final : scheduler_op
347 : {
348 : std::coroutine_handle<> h_;
349 :
350 4139 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
351 :
352 8278 : ~post_handler() override = default;
353 :
354 4136 : void operator()() override
355 : {
356 4136 : auto h = h_;
357 4136 : delete this;
358 4136 : h.resume();
359 4136 : }
360 :
361 3 : void destroy() override
362 : {
363 3 : auto h = h_;
364 3 : delete this;
365 3 : h.destroy();
366 3 : }
367 : };
368 :
369 4139 : auto ph = std::make_unique<post_handler>(h);
370 4139 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
371 :
372 4139 : std::unique_lock lock(mutex_);
373 4139 : completed_ops_.push(ph.release());
374 4139 : wake_one_thread_and_unlock(lock);
375 4139 : }
376 :
377 : inline void
378 142701 : select_scheduler::post(scheduler_op* h) const
379 : {
380 142701 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
381 :
382 142701 : std::unique_lock lock(mutex_);
383 142701 : completed_ops_.push(h);
384 142701 : wake_one_thread_and_unlock(lock);
385 142701 : }
386 :
387 : inline bool
388 598 : select_scheduler::running_in_this_thread() const noexcept
389 : {
390 598 : for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
391 368 : if (c->key == this)
392 368 : return true;
393 230 : return false;
394 : }
395 :
396 : inline void
397 126 : select_scheduler::stop()
398 : {
399 126 : bool expected = false;
400 126 : if (stopped_.compare_exchange_strong(
401 : expected, true, std::memory_order_release,
402 : std::memory_order_relaxed))
403 : {
404 : // Wake all threads so they notice stopped_ and exit
405 : {
406 126 : std::lock_guard lock(mutex_);
407 126 : wakeup_event_.notify_all();
408 126 : }
409 126 : interrupt_reactor();
410 : }
411 126 : }
412 :
413 : inline bool
414 3 : select_scheduler::stopped() const noexcept
415 : {
416 3 : return stopped_.load(std::memory_order_acquire);
417 : }
418 :
419 : inline void
420 38 : select_scheduler::restart()
421 : {
422 38 : stopped_.store(false, std::memory_order_release);
423 38 : }
424 :
425 : inline std::size_t
426 122 : select_scheduler::run()
427 : {
428 122 : if (stopped_.load(std::memory_order_acquire))
429 MIS 0 : return 0;
430 :
431 HIT 244 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
432 : {
433 MIS 0 : stop();
434 0 : return 0;
435 : }
436 :
437 HIT 122 : select::thread_context_guard ctx(this);
438 :
439 122 : std::size_t n = 0;
440 154093 : while (do_one(-1))
441 153971 : if (n != (std::numeric_limits<std::size_t>::max)())
442 153971 : ++n;
443 122 : return n;
444 122 : }
445 :
446 : inline std::size_t
447 MIS 0 : select_scheduler::run_one()
448 : {
449 0 : if (stopped_.load(std::memory_order_acquire))
450 0 : return 0;
451 :
452 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
453 : {
454 0 : stop();
455 0 : return 0;
456 : }
457 :
458 0 : select::thread_context_guard ctx(this);
459 0 : return do_one(-1);
460 0 : }
461 :
462 : inline std::size_t
463 HIT 27 : select_scheduler::wait_one(long usec)
464 : {
465 27 : if (stopped_.load(std::memory_order_acquire))
466 3 : return 0;
467 :
468 48 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
469 : {
470 MIS 0 : stop();
471 0 : return 0;
472 : }
473 :
474 HIT 24 : select::thread_context_guard ctx(this);
475 24 : return do_one(usec);
476 24 : }
477 :
478 : inline std::size_t
479 2 : select_scheduler::poll()
480 : {
481 2 : if (stopped_.load(std::memory_order_acquire))
482 MIS 0 : return 0;
483 :
484 HIT 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
485 : {
486 MIS 0 : stop();
487 0 : return 0;
488 : }
489 :
490 HIT 2 : select::thread_context_guard ctx(this);
491 :
492 2 : std::size_t n = 0;
493 4 : while (do_one(0))
494 2 : if (n != (std::numeric_limits<std::size_t>::max)())
495 2 : ++n;
496 2 : return n;
497 2 : }
498 :
499 : inline std::size_t
500 MIS 0 : select_scheduler::poll_one()
501 : {
502 0 : if (stopped_.load(std::memory_order_acquire))
503 0 : return 0;
504 :
505 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
506 : {
507 0 : stop();
508 0 : return 0;
509 : }
510 :
511 0 : select::thread_context_guard ctx(this);
512 0 : return do_one(0);
513 0 : }
514 :
515 : inline void
516 HIT 7326 : select_scheduler::register_fd(int fd, select_op* op, int events) const
517 : {
518 : // Validate fd is within select() limits
519 7326 : if (fd < 0 || fd >= FD_SETSIZE)
520 MIS 0 : detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
521 :
522 : {
523 HIT 7326 : std::lock_guard lock(mutex_);
524 :
525 7326 : auto& state = registered_fds_[fd];
526 7326 : if (events & event_read)
527 3803 : state.read_op = op;
528 7326 : if (events & event_write)
529 3523 : state.write_op = op;
530 :
531 7326 : if (fd > max_fd_)
532 250 : max_fd_ = fd;
533 7326 : }
534 :
535 : // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
536 : // with the newly registered fd.
537 7326 : interrupt_reactor();
538 7326 : }
539 :
540 : inline void
541 7279 : select_scheduler::deregister_fd(int fd, int events) const
542 : {
543 7279 : std::lock_guard lock(mutex_);
544 :
545 7279 : auto it = registered_fds_.find(fd);
546 7279 : if (it == registered_fds_.end())
547 7117 : return;
548 :
549 162 : if (events & event_read)
550 162 : it->second.read_op = nullptr;
551 162 : if (events & event_write)
552 MIS 0 : it->second.write_op = nullptr;
553 :
554 : // Remove entry if both are null
555 HIT 162 : if (!it->second.read_op && !it->second.write_op)
556 : {
557 162 : registered_fds_.erase(it);
558 :
559 : // Recalculate max_fd_ if needed
560 162 : if (fd == max_fd_)
561 : {
562 161 : max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
563 161 : for (auto& [registered_fd, state] : registered_fds_)
564 : {
565 MIS 0 : if (registered_fd > max_fd_)
566 0 : max_fd_ = registered_fd;
567 : }
568 : }
569 : }
570 HIT 7279 : }
571 :
572 : inline void
573 11702 : select_scheduler::work_started() noexcept
574 : {
575 11702 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
576 11702 : }
577 :
578 : inline void
579 158535 : select_scheduler::work_finished() noexcept
580 : {
581 317070 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
582 125 : stop();
583 158535 : }
584 :
585 : inline void
586 15097 : select_scheduler::interrupt_reactor() const
587 : {
588 15097 : char byte = 1;
589 15097 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
590 15097 : }
591 :
592 : inline void
593 146840 : select_scheduler::wake_one_thread_and_unlock(
594 : std::unique_lock<std::mutex>& lock) const
595 : {
596 146840 : if (idle_thread_count_ > 0)
597 : {
598 : // Idle worker exists - wake it via condvar
599 MIS 0 : wakeup_event_.notify_one();
600 0 : lock.unlock();
601 : }
602 HIT 146840 : else if (reactor_running_ && !reactor_interrupted_)
603 : {
604 : // No idle workers but reactor is running - interrupt it
605 3728 : reactor_interrupted_ = true;
606 3728 : lock.unlock();
607 3728 : interrupt_reactor();
608 : }
609 : else
610 : {
611 : // No one to wake
612 143112 : lock.unlock();
613 : }
614 146840 : }
615 :
616 : inline long
617 10775 : select_scheduler::calculate_timeout(long requested_timeout_us) const
618 : {
619 10775 : if (requested_timeout_us == 0)
620 MIS 0 : return 0;
621 :
622 HIT 10775 : auto nearest = timer_svc_->nearest_expiry();
623 10775 : if (nearest == timer_service::time_point::max())
624 46 : return requested_timeout_us;
625 :
626 10729 : auto now = std::chrono::steady_clock::now();
627 10729 : if (nearest <= now)
628 136 : return 0;
629 :
630 : auto timer_timeout_us =
631 10593 : std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
632 10593 : .count();
633 :
634 : // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
635 10593 : constexpr auto long_max =
636 : static_cast<long long>((std::numeric_limits<long>::max)());
637 : auto capped_timer_us =
638 10593 : (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
639 10593 : static_cast<long long>(0)),
640 10593 : long_max);
641 :
642 10593 : if (requested_timeout_us < 0)
643 10593 : return static_cast<long>(capped_timer_us);
644 :
645 : // requested_timeout_us is already long, so min() result fits in long
646 : return static_cast<long>(
647 MIS 0 : (std::min)(static_cast<long long>(requested_timeout_us),
648 0 : capped_timer_us));
649 : }
650 :
651 : inline void
652 HIT 91799 : select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
653 : {
654 : // Calculate timeout considering timers, use 0 if interrupted
655 : long effective_timeout_us =
656 91799 : reactor_interrupted_ ? 0 : calculate_timeout(-1);
657 :
658 : // Build fd_sets from registered_fds_
659 : fd_set read_fds, write_fds, except_fds;
660 1560583 : FD_ZERO(&read_fds);
661 1560583 : FD_ZERO(&write_fds);
662 1560583 : FD_ZERO(&except_fds);
663 :
664 : // Always include the interrupt pipe
665 91799 : FD_SET(pipe_fds_[0], &read_fds);
666 91799 : int nfds = pipe_fds_[0];
667 :
668 : // Add registered fds
669 109317 : for (auto& [fd, state] : registered_fds_)
670 : {
671 17518 : if (state.read_op)
672 13995 : FD_SET(fd, &read_fds);
673 17518 : if (state.write_op)
674 : {
675 3523 : FD_SET(fd, &write_fds);
676 : // Also monitor for errors on connect operations
677 3523 : FD_SET(fd, &except_fds);
678 : }
679 17518 : if (fd > nfds)
680 14000 : nfds = fd;
681 : }
682 :
683 : // Convert timeout to timeval
684 : struct timeval tv;
685 91799 : struct timeval* tv_ptr = nullptr;
686 91799 : if (effective_timeout_us >= 0)
687 : {
688 91753 : tv.tv_sec = effective_timeout_us / 1000000;
689 91753 : tv.tv_usec = effective_timeout_us % 1000000;
690 91753 : tv_ptr = &tv;
691 : }
692 :
693 91799 : lock.unlock();
694 :
695 91799 : int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
696 91799 : int saved_errno = errno;
697 :
698 : // Process timers outside the lock
699 91799 : timer_svc_->process_expired();
700 :
701 91799 : if (ready < 0 && saved_errno != EINTR)
702 MIS 0 : detail::throw_system_error(make_err(saved_errno), "select");
703 :
704 : // Re-acquire lock before modifying completed_ops_
705 HIT 91799 : lock.lock();
706 :
707 : // Drain the interrupt pipe if readable
708 91799 : if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
709 : {
710 : char buf[256];
711 22282 : while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
712 : {
713 : }
714 : }
715 :
716 : // Process I/O completions
717 91799 : int completions_queued = 0;
718 91799 : if (ready > 0)
719 : {
720 : // Iterate over registered fds (copy keys to avoid iterator invalidation)
721 11141 : std::vector<int> fds_to_check;
722 11141 : fds_to_check.reserve(registered_fds_.size());
723 25182 : for (auto& [fd, state] : registered_fds_)
724 14041 : fds_to_check.push_back(fd);
725 :
726 25182 : for (int fd : fds_to_check)
727 : {
728 14041 : auto it = registered_fds_.find(fd);
729 14041 : if (it == registered_fds_.end())
730 MIS 0 : continue;
731 :
732 HIT 14041 : auto& state = it->second;
733 :
734 : // Check for errors (especially for connect operations)
735 14041 : bool has_error = FD_ISSET(fd, &except_fds);
736 :
737 : // Process read readiness
738 14041 : if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
739 : {
740 3641 : auto* op = state.read_op;
741 : // Claim the op by exchanging to unregistered. Both registering and
742 : // registered states mean the op is ours to complete.
743 3641 : auto prev = op->registered.exchange(
744 : select_registration_state::unregistered,
745 : std::memory_order_acq_rel);
746 3641 : if (prev != select_registration_state::unregistered)
747 : {
748 3641 : state.read_op = nullptr;
749 :
750 3641 : if (has_error)
751 : {
752 MIS 0 : int errn = 0;
753 0 : socklen_t len = sizeof(errn);
754 0 : if (::getsockopt(
755 0 : fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
756 0 : errn = errno;
757 0 : if (errn == 0)
758 0 : errn = EIO;
759 0 : op->complete(errn, 0);
760 : }
761 : else
762 : {
763 HIT 3641 : op->perform_io();
764 : }
765 :
766 3641 : completed_ops_.push(op);
767 3641 : ++completions_queued;
768 : }
769 : }
770 :
771 : // Process write readiness
772 14041 : if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
773 : {
774 3523 : auto* op = state.write_op;
775 : // Claim the op by exchanging to unregistered. Both registering and
776 : // registered states mean the op is ours to complete.
777 3523 : auto prev = op->registered.exchange(
778 : select_registration_state::unregistered,
779 : std::memory_order_acq_rel);
780 3523 : if (prev != select_registration_state::unregistered)
781 : {
782 3523 : state.write_op = nullptr;
783 :
784 3523 : if (has_error)
785 : {
786 MIS 0 : int errn = 0;
787 0 : socklen_t len = sizeof(errn);
788 0 : if (::getsockopt(
789 0 : fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
790 0 : errn = errno;
791 0 : if (errn == 0)
792 0 : errn = EIO;
793 0 : op->complete(errn, 0);
794 : }
795 : else
796 : {
797 HIT 3523 : op->perform_io();
798 : }
799 :
800 3523 : completed_ops_.push(op);
801 3523 : ++completions_queued;
802 : }
803 : }
804 :
805 : // Clean up empty entries
806 14041 : if (!state.read_op && !state.write_op)
807 7164 : registered_fds_.erase(it);
808 : }
809 11141 : }
810 :
811 91799 : if (completions_queued > 0)
812 : {
813 3646 : if (completions_queued == 1)
814 128 : wakeup_event_.notify_one();
815 : else
816 3518 : wakeup_event_.notify_all();
817 : }
818 91799 : }
819 :
820 : inline std::size_t
821 154121 : select_scheduler::do_one(long timeout_us)
822 : {
823 154121 : std::unique_lock lock(mutex_);
824 :
825 : for (;;)
826 : {
827 245920 : if (stopped_.load(std::memory_order_acquire))
828 122 : return 0;
829 :
830 245798 : scheduler_op* op = completed_ops_.pop();
831 :
832 245798 : if (op == &task_op_)
833 : {
834 91801 : bool more_handlers = !completed_ops_.empty();
835 :
836 91801 : if (!more_handlers)
837 : {
838 21554 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
839 : {
840 MIS 0 : completed_ops_.push(&task_op_);
841 0 : return 0;
842 : }
843 HIT 10777 : if (timeout_us == 0)
844 : {
845 2 : completed_ops_.push(&task_op_);
846 2 : return 0;
847 : }
848 : }
849 :
850 91799 : reactor_interrupted_ = more_handlers || timeout_us == 0;
851 91799 : reactor_running_ = true;
852 :
853 91799 : if (more_handlers && idle_thread_count_ > 0)
854 MIS 0 : wakeup_event_.notify_one();
855 :
856 HIT 91799 : run_reactor(lock);
857 :
858 91799 : reactor_running_ = false;
859 91799 : completed_ops_.push(&task_op_);
860 91799 : continue;
861 91799 : }
862 :
863 153997 : if (op != nullptr)
864 : {
865 153997 : lock.unlock();
866 153997 : select::work_guard g{this};
867 153997 : (*op)();
868 153997 : return 1;
869 153997 : }
870 :
871 MIS 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
872 0 : return 0;
873 :
874 0 : if (timeout_us == 0)
875 0 : return 0;
876 :
877 0 : ++idle_thread_count_;
878 0 : if (timeout_us < 0)
879 0 : wakeup_event_.wait(lock);
880 : else
881 0 : wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
882 0 : --idle_thread_count_;
883 HIT 91799 : }
884 154121 : }
885 :
886 : } // namespace boost::corosio::detail
887 :
888 : #endif // BOOST_COROSIO_HAS_SELECT
889 :
890 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
|