include/boost/corosio/native/detail/select/select_socket_service.hpp

75.8% Lines (263/347) 93.1% Functions (27/29)
include/boost/corosio/native/detail/select/select_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
/** Shared state for the select socket service.

    Held via unique_ptr by select_socket_service. Groups the scheduler
    reference with the impl ownership structures so they share one
    lifetime and one lock.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Reactor that drives fd readiness and the completion queue.
    select_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_ (see shutdown/construct/destroy).
    std::mutex mutex_;
    // Intrusive list of all live socket impls, for bulk close at shutdown.
    intrusive_list<select_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup and removal.
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    /// Acquires the select_scheduler service from the execution context.
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    // Non-copyable: uniquely owns the impl map and service state.
    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Force-closes every socket impl still alive (service shutdown).
    void shutdown() override;

    /// Allocates a new select_socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Closes the impl's socket and drops the service's owning reference.
    void destroy(io_object::implementation*) override;
    /// Closes the underlying socket without destroying the impl.
    void close(io_object::handle&) override;
    /// Creates a non-blocking, close-on-exec fd and stores it in the impl.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// Reactor used for fd registration and completion dispatch.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    void post(select_op* op);     // queue a completed op on the scheduler
    void work_started() noexcept; // track outstanding async work
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
133
// Backward compatibility alias for code written against the old name.
using select_sockets = select_socket_service;
136
137 inline void
138 98 select_op::canceller::operator()() const noexcept
139 {
140 98 op->cancel();
141 98 }
142
143 inline void
144 select_connect_op::cancel() noexcept
145 {
146 if (socket_impl_)
147 socket_impl_->cancel_single_op(*this);
148 else
149 request_cancel();
150 }
151
152 inline void
153 98 select_read_op::cancel() noexcept
154 {
155 98 if (socket_impl_)
156 98 socket_impl_->cancel_single_op(*this);
157 else
158 request_cancel();
159 98 }
160
161 inline void
162 select_write_op::cancel() noexcept
163 {
164 if (socket_impl_)
165 socket_impl_->cancel_single_op(*this);
166 else
167 request_cancel();
168 }
169
// Completion handler for an async connect: publishes the outcome to the
// awaiting coroutine's out-params and resumes it via its executor.
inline void
select_connect_op::operator()()
{
    // Drop the stop callback first so a late stop request cannot race
    // with the completion work below.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // If getsockname fails, a default-constructed local endpoint is
        // cached instead of reporting an error.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        // target_endpoint was stashed by connect() before the syscall.
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Report the result: cancellation takes precedence over errno.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame: impl_ptr.reset() may
    // release the last reference to the impl that owns this op, so copy
    // the executor and handle out first.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
210
// Binds the impl to its owning service; the socket fd is created later
// by select_socket_service::open_socket().
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
215
// Initiates an async connect.
//
// h     : coroutine awaiting the connect
// ex    : executor used to resume h
// ep    : remote endpoint to connect to
// token : stop_token wired to op.cancel() via select_op::canceller
// ec    : out-param for the final error code (may be null)
//
// Completion is always posted to the scheduler queue, never delivered
// inline, so every path returns std::noop_coroutine().
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);   // arms the stop_token cancellation callback

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    // Non-blocking socket: connect either succeeds immediately, fails,
    // or returns EINPROGRESS.
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // Keep the impl alive until the posted op is processed.
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        // Connect pending: wait for writability on the reactor.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange() claims the op; only one claimant may post it.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate failure: report errno through the completion queue.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
308
// Initiates an async scatter read: tries readv() immediately on the
// non-blocking fd, and only registers with the reactor on EAGAIN.
//
// h         : coroutine awaiting the read
// ex        : executor used to resume h
// param     : buffer sequence to fill (up to max_buffers iovecs)
// token     : stop_token wired to op.cancel()
// ec        : out-param for the final error code (may be null)
// bytes_out : out-param for bytes transferred (may be null)
//
// Completion is always posted to the scheduler queue, so every path
// returns std::noop_coroutine().
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes. The flag lets the
    // completion distinguish this from a 0-byte EOF read.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        // Data was already available; post the synchronous result.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        // readv returning 0 on a stream socket means the peer closed.
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // No data yet: register with the reactor for readability.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange() claims the op; only one claimant may post it.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error from readv: report errno through the completion queue.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
411
// Initiates an async gather write: tries sendmsg() immediately on the
// non-blocking fd, and only registers with the reactor on EAGAIN.
//
// h         : coroutine awaiting the write
// ex        : executor used to resume h
// param     : buffer sequence to send (up to max_buffers iovecs)
// token     : stop_token wired to op.cancel()
// ec        : out-param for the final error code (may be null)
// bytes_out : out-param for bytes transferred (may be null)
//
// Completion is always posted to the scheduler queue, so every path
// returns std::noop_coroutine().
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes written.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // MSG_NOSIGNAL: get EPIPE instead of SIGPIPE on a closed peer.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Some bytes were written synchronously; post the result.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Send buffer full: register with the reactor for writability.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange() claims the op; only one claimant may post it.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error (or n <= 0 without errno set — map that to EIO so the
    // caller never sees a spurious success).
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
509
510 inline std::error_code
511 3 select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
512 {
513 int how;
514 3 switch (what)
515 {
516 1 case tcp_socket::shutdown_receive:
517 1 how = SHUT_RD;
518 1 break;
519 1 case tcp_socket::shutdown_send:
520 1 how = SHUT_WR;
521 1 break;
522 1 case tcp_socket::shutdown_both:
523 1 how = SHUT_RDWR;
524 1 break;
525 default:
526 return make_err(EINVAL);
527 }
528 3 if (::shutdown(fd_, how) != 0)
529 return make_err(errno);
530 3 return {};
531 }
532
533 inline std::error_code
534 28 select_socket::set_option(
535 int level, int optname,
536 void const* data, std::size_t size) noexcept
537 {
538 28 if (::setsockopt(fd_, level, optname, data,
539 28 static_cast<socklen_t>(size)) != 0)
540 return make_err(errno);
541 28 return {};
542 }
543
544 inline std::error_code
545 31 select_socket::get_option(
546 int level, int optname,
547 void* data, std::size_t* size) const noexcept
548 {
549 31 socklen_t len = static_cast<socklen_t>(*size);
550 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
551 return make_err(errno);
552 31 *size = static_cast<std::size_t>(len);
553 31 return {};
554 }
555
// Cancels all three potential pending ops (connect, read, write).
// Pins the impl with a shared_ptr first so posting cancelled ops cannot
// race with impl destruction.
inline void
select_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return; // impl already being destroyed; nothing to keep alive

    auto cancel_op = [this, &self](select_op& op, int events) {
        // exchange() atomically claims the op: only the caller that sees
        // a non-unregistered previous state may deregister and post it.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self; // keep impl alive until the op drains
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
580
// Cancels one specific pending operation (invoked from the op's
// stop_token callback via select_op::canceller).
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return; // impl already being destroyed

    // Called from stop_token callback to cancel a specific pending operation.
    // exchange() atomically claims the op against the reactor.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        op.impl_ptr = self; // keep impl alive until the op drains
        svc_.post(&op);
        svc_.work_finished();
    }
}
609
// Closes the socket: cancels any pending ops first (so their awaiting
// coroutines resume with the cancelled flag), deregisters the fd from
// the reactor, closes it, and clears the cached endpoints.
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            // exchange() atomically claims the op against the reactor.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self; // keep impl alive until the op drains
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Remove any remaining reactor interest before closing the fd.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1; // mark closed; open_socket() assigns a fresh fd
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
646
// Obtains (creating if needed) the select_scheduler service from the
// execution context and wraps it in the heap-allocated service state.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
        std::make_unique<select_socket_state>(
            ctx.use_service<select_scheduler>()))
{
}
654
655 336 inline select_socket_service::~select_socket_service() {}
656
// Service shutdown: force-close every socket impl still alive.
inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    // pop_front drains the intrusive list while we close each impl;
    // close_socket() cancels and posts any in-flight ops.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
670
671 inline io_object::implementation*
672 10587 select_socket_service::construct()
673 {
674 10587 auto impl = std::make_shared<select_socket>(*this);
675 10587 auto* raw = impl.get();
676
677 {
678 10587 std::lock_guard lock(state_->mutex_);
679 10587 state_->socket_list_.push_back(raw);
680 10587 state_->socket_ptrs_.emplace(raw, std::move(impl));
681 10587 }
682
683 10587 return raw;
684 10587 }
685
686 inline void
687 10587 select_socket_service::destroy(io_object::implementation* impl)
688 {
689 10587 auto* select_impl = static_cast<select_socket*>(impl);
690 10587 select_impl->close_socket();
691 10587 std::lock_guard lock(state_->mutex_);
692 10587 state_->socket_list_.remove(select_impl);
693 10587 state_->socket_ptrs_.erase(select_impl);
694 10587 }
695
696 inline std::error_code
697 3538 select_socket_service::open_socket(
698 tcp_socket::implementation& impl,
699 int family, int type, int protocol)
700 {
701 3538 auto* select_impl = static_cast<select_socket*>(&impl);
702 3538 select_impl->close_socket();
703
704 3538 int fd = ::socket(family, type, protocol);
705 3538 if (fd < 0)
706 return make_err(errno);
707
708 3538 if (family == AF_INET6)
709 {
710 5 int one = 1;
711 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
712 }
713
714 // Set non-blocking and close-on-exec
715 3538 int flags = ::fcntl(fd, F_GETFL, 0);
716 3538 if (flags == -1)
717 {
718 int errn = errno;
719 ::close(fd);
720 return make_err(errn);
721 }
722 3538 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
723 {
724 int errn = errno;
725 ::close(fd);
726 return make_err(errn);
727 }
728 3538 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
729 {
730 int errn = errno;
731 ::close(fd);
732 return make_err(errn);
733 }
734
735 // Check fd is within select() limits
736 3538 if (fd >= FD_SETSIZE)
737 {
738 ::close(fd);
739 return make_err(EMFILE); // Too many open files
740 }
741
742 3538 select_impl->fd_ = fd;
743 3538 return {};
744 }
745
746 inline void
747 17645 select_socket_service::close(io_object::handle& h)
748 {
749 17645 static_cast<select_socket*>(h.get())->close_socket();
750 17645 }
751
752 inline void
753 138916 select_socket_service::post(select_op* op)
754 {
755 138916 state_->sched_.post(op);
756 138916 }
757
758 inline void
759 3805 select_socket_service::work_started() noexcept
760 {
761 3805 state_->sched_.work_started();
762 3805 }
763
764 inline void
765 159 select_socket_service::work_finished() noexcept
766 {
767 159 state_->sched_.work_finished();
768 159 }
769
770 } // namespace boost::corosio::detail
771
772 #endif // BOOST_COROSIO_HAS_SELECT
773
774 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
775