//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
27  

27  

28  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
29  

29  

30  
#include <boost/capy/buffers.hpp>
30  
#include <boost/capy/buffers.hpp>
31  

31  

32  
#include <errno.h>
32  
#include <errno.h>
33  
#include <fcntl.h>
33  
#include <fcntl.h>
34  
#include <netinet/in.h>
34  
#include <netinet/in.h>
35  
#include <netinet/tcp.h>
35  
#include <netinet/tcp.h>
36  
#include <sys/socket.h>
36  
#include <sys/socket.h>
37  
#include <unistd.h>
37  
#include <unistd.h>
38  

38  

39  
#include <memory>
39  
#include <memory>
40  
#include <mutex>
40  
#include <mutex>
41  
#include <unordered_map>
41  
#include <unordered_map>
42  

42  

/*
    select Socket Implementation
    ============================

    This mirrors the epoll_sockets design for behavioral consistency.
    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with the cancelled flag set) so coroutines waiting on them can
    resume. close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    select_socket_service owns all socket impls. destroy() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/
80  

80  

81  
namespace boost::corosio::detail {
81  
namespace boost::corosio::detail {
82  

82  

83  
/** State for select socket service. */
83  
/** State for select socket service. */
84  
class select_socket_state
84  
class select_socket_state
85  
{
85  
{
86  
public:
86  
public:
87  
    explicit select_socket_state(select_scheduler& sched) noexcept
87  
    explicit select_socket_state(select_scheduler& sched) noexcept
88  
        : sched_(sched)
88  
        : sched_(sched)
89  
    {
89  
    {
90  
    }
90  
    }
91  

91  

92  
    select_scheduler& sched_;
92  
    select_scheduler& sched_;
93  
    std::mutex mutex_;
93  
    std::mutex mutex_;
94  
    intrusive_list<select_socket> socket_list_;
94  
    intrusive_list<select_socket> socket_list_;
95  
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
95  
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
96  
        socket_ptrs_;
96  
        socket_ptrs_;
97  
};
97  
};
98  

98  

99  
/** select socket service implementation.
99  
/** select socket service implementation.
100  

100  

101  
    Inherits from socket_service to enable runtime polymorphism.
101  
    Inherits from socket_service to enable runtime polymorphism.
102  
    Uses key_type = socket_service for service lookup.
102  
    Uses key_type = socket_service for service lookup.
103  
*/
103  
*/
104  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
104  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
105  
{
105  
{
106  
public:
106  
public:
107  
    explicit select_socket_service(capy::execution_context& ctx);
107  
    explicit select_socket_service(capy::execution_context& ctx);
108  
    ~select_socket_service() override;
108  
    ~select_socket_service() override;
109  

109  

110  
    select_socket_service(select_socket_service const&)            = delete;
110  
    select_socket_service(select_socket_service const&)            = delete;
111  
    select_socket_service& operator=(select_socket_service const&) = delete;
111  
    select_socket_service& operator=(select_socket_service const&) = delete;
112  

112  

113  
    void shutdown() override;
113  
    void shutdown() override;
114  

114  

115  
    io_object::implementation* construct() override;
115  
    io_object::implementation* construct() override;
116  
    void destroy(io_object::implementation*) override;
116  
    void destroy(io_object::implementation*) override;
117  
    void close(io_object::handle&) override;
117  
    void close(io_object::handle&) override;
118  
    std::error_code
118  
    std::error_code
119  
    open_socket(tcp_socket::implementation& impl,
119  
    open_socket(tcp_socket::implementation& impl,
120  
                int family, int type, int protocol) override;
120  
                int family, int type, int protocol) override;
121  

121  

122  
    select_scheduler& scheduler() const noexcept
122  
    select_scheduler& scheduler() const noexcept
123  
    {
123  
    {
124  
        return state_->sched_;
124  
        return state_->sched_;
125  
    }
125  
    }
126  
    void post(select_op* op);
126  
    void post(select_op* op);
127  
    void work_started() noexcept;
127  
    void work_started() noexcept;
128  
    void work_finished() noexcept;
128  
    void work_finished() noexcept;
129  

129  

130  
private:
130  
private:
131  
    std::unique_ptr<select_socket_state> state_;
131  
    std::unique_ptr<select_socket_state> state_;
132  
};
132  
};
133  

133  

134  
// Backward compatibility alias
134  
// Backward compatibility alias
135  
using select_sockets = select_socket_service;
135  
using select_sockets = select_socket_service;
136  

136  

137  
inline void
137  
inline void
138  
select_op::canceller::operator()() const noexcept
138  
select_op::canceller::operator()() const noexcept
139  
{
139  
{
140  
    op->cancel();
140  
    op->cancel();
141  
}
141  
}
142  

142  

143  
inline void
143  
inline void
144  
select_connect_op::cancel() noexcept
144  
select_connect_op::cancel() noexcept
145  
{
145  
{
146  
    if (socket_impl_)
146  
    if (socket_impl_)
147  
        socket_impl_->cancel_single_op(*this);
147  
        socket_impl_->cancel_single_op(*this);
148  
    else
148  
    else
149  
        request_cancel();
149  
        request_cancel();
150  
}
150  
}
151  

151  

152  
inline void
152  
inline void
153  
select_read_op::cancel() noexcept
153  
select_read_op::cancel() noexcept
154  
{
154  
{
155  
    if (socket_impl_)
155  
    if (socket_impl_)
156  
        socket_impl_->cancel_single_op(*this);
156  
        socket_impl_->cancel_single_op(*this);
157  
    else
157  
    else
158  
        request_cancel();
158  
        request_cancel();
159  
}
159  
}
160  

160  

161  
inline void
161  
inline void
162  
select_write_op::cancel() noexcept
162  
select_write_op::cancel() noexcept
163  
{
163  
{
164  
    if (socket_impl_)
164  
    if (socket_impl_)
165  
        socket_impl_->cancel_single_op(*this);
165  
        socket_impl_->cancel_single_op(*this);
166  
    else
166  
    else
167  
        request_cancel();
167  
        request_cancel();
168  
}
168  
}
169  

169  

170  
inline void
170  
inline void
171  
select_connect_op::operator()()
171  
select_connect_op::operator()()
172  
{
172  
{
173  
    stop_cb.reset();
173  
    stop_cb.reset();
174  

174  

175  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
175  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
176  

176  

177  
    // Cache endpoints on successful connect
177  
    // Cache endpoints on successful connect
178  
    if (success && socket_impl_)
178  
    if (success && socket_impl_)
179  
    {
179  
    {
180  
        endpoint local_ep;
180  
        endpoint local_ep;
181  
        sockaddr_storage local_storage{};
181  
        sockaddr_storage local_storage{};
182  
        socklen_t local_len = sizeof(local_storage);
182  
        socklen_t local_len = sizeof(local_storage);
183  
        if (::getsockname(
183  
        if (::getsockname(
184  
                fd, reinterpret_cast<sockaddr*>(&local_storage),
184  
                fd, reinterpret_cast<sockaddr*>(&local_storage),
185  
                &local_len) == 0)
185  
                &local_len) == 0)
186  
            local_ep = from_sockaddr(local_storage);
186  
            local_ep = from_sockaddr(local_storage);
187  
        static_cast<select_socket*>(socket_impl_)
187  
        static_cast<select_socket*>(socket_impl_)
188  
            ->set_endpoints(local_ep, target_endpoint);
188  
            ->set_endpoints(local_ep, target_endpoint);
189  
    }
189  
    }
190  

190  

191  
    if (ec_out)
191  
    if (ec_out)
192  
    {
192  
    {
193  
        if (cancelled.load(std::memory_order_acquire))
193  
        if (cancelled.load(std::memory_order_acquire))
194  
            *ec_out = capy::error::canceled;
194  
            *ec_out = capy::error::canceled;
195  
        else if (errn != 0)
195  
        else if (errn != 0)
196  
            *ec_out = make_err(errn);
196  
            *ec_out = make_err(errn);
197  
        else
197  
        else
198  
            *ec_out = {};
198  
            *ec_out = {};
199  
    }
199  
    }
200  

200  

201  
    if (bytes_out)
201  
    if (bytes_out)
202  
        *bytes_out = bytes_transferred;
202  
        *bytes_out = bytes_transferred;
203  

203  

204  
    // Move to stack before destroying the frame
204  
    // Move to stack before destroying the frame
205  
    capy::executor_ref saved_ex(ex);
205  
    capy::executor_ref saved_ex(ex);
206  
    std::coroutine_handle<> saved_h(h);
206  
    std::coroutine_handle<> saved_h(h);
207  
    impl_ptr.reset();
207  
    impl_ptr.reset();
208  
    dispatch_coro(saved_ex, saved_h).resume();
208  
    dispatch_coro(saved_ex, saved_h).resume();
209  
}
209  
}
210  

210  

211  
inline select_socket::select_socket(select_socket_service& svc) noexcept
211  
inline select_socket::select_socket(select_socket_service& svc) noexcept
212  
    : svc_(svc)
212  
    : svc_(svc)
213  
{
213  
{
214  
}
214  
}
215  

215  

216  
inline std::coroutine_handle<>
216  
inline std::coroutine_handle<>
217  
select_socket::connect(
217  
select_socket::connect(
218  
    std::coroutine_handle<> h,
218  
    std::coroutine_handle<> h,
219  
    capy::executor_ref ex,
219  
    capy::executor_ref ex,
220  
    endpoint ep,
220  
    endpoint ep,
221  
    std::stop_token token,
221  
    std::stop_token token,
222  
    std::error_code* ec)
222  
    std::error_code* ec)
223  
{
223  
{
224  
    auto& op = conn_;
224  
    auto& op = conn_;
225  
    op.reset();
225  
    op.reset();
226  
    op.h               = h;
226  
    op.h               = h;
227  
    op.ex              = ex;
227  
    op.ex              = ex;
228  
    op.ec_out          = ec;
228  
    op.ec_out          = ec;
229  
    op.fd              = fd_;
229  
    op.fd              = fd_;
230  
    op.target_endpoint = ep; // Store target for endpoint caching
230  
    op.target_endpoint = ep; // Store target for endpoint caching
231  
    op.start(token, this);
231  
    op.start(token, this);
232  

232  

233  
    sockaddr_storage storage{};
233  
    sockaddr_storage storage{};
234  
    socklen_t addrlen =
234  
    socklen_t addrlen =
235  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
235  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
236  
    int result =
236  
    int result =
237  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
237  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
238  

238  

239  
    if (result == 0)
239  
    if (result == 0)
240  
    {
240  
    {
241  
        // Sync success — cache endpoints immediately
241  
        // Sync success — cache endpoints immediately
242  
        sockaddr_storage local_storage{};
242  
        sockaddr_storage local_storage{};
243  
        socklen_t local_len = sizeof(local_storage);
243  
        socklen_t local_len = sizeof(local_storage);
244  
        if (::getsockname(
244  
        if (::getsockname(
245  
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
245  
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
246  
                &local_len) == 0)
246  
                &local_len) == 0)
247  
            local_endpoint_ = detail::from_sockaddr(local_storage);
247  
            local_endpoint_ = detail::from_sockaddr(local_storage);
248  
        remote_endpoint_ = ep;
248  
        remote_endpoint_ = ep;
249  

249  

250  
        op.complete(0, 0);
250  
        op.complete(0, 0);
251  
        op.impl_ptr = shared_from_this();
251  
        op.impl_ptr = shared_from_this();
252  
        svc_.post(&op);
252  
        svc_.post(&op);
253  
        // completion is always posted to scheduler queue, never inline.
253  
        // completion is always posted to scheduler queue, never inline.
254  
        return std::noop_coroutine();
254  
        return std::noop_coroutine();
255  
    }
255  
    }
256  

256  

257  
    if (errno == EINPROGRESS)
257  
    if (errno == EINPROGRESS)
258  
    {
258  
    {
259  
        svc_.work_started();
259  
        svc_.work_started();
260  
        op.impl_ptr = shared_from_this();
260  
        op.impl_ptr = shared_from_this();
261  

261  

262  
        // Set registering BEFORE register_fd to close the race window where
262  
        // Set registering BEFORE register_fd to close the race window where
263  
        // reactor sees an event before we set registered. The reactor treats
263  
        // reactor sees an event before we set registered. The reactor treats
264  
        // registering the same as registered when claiming the op.
264  
        // registering the same as registered when claiming the op.
265  
        op.registered.store(
265  
        op.registered.store(
266  
            select_registration_state::registering, std::memory_order_release);
266  
            select_registration_state::registering, std::memory_order_release);
267  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
267  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
268  

268  

269  
        // Transition to registered. If this fails, reactor or cancel already
269  
        // Transition to registered. If this fails, reactor or cancel already
270  
        // claimed the op (state is now unregistered), so we're done. However,
270  
        // claimed the op (state is now unregistered), so we're done. However,
271  
        // we must still deregister the fd because cancel's deregister_fd may
271  
        // we must still deregister the fd because cancel's deregister_fd may
272  
        // have run before our register_fd, leaving the fd orphaned.
272  
        // have run before our register_fd, leaving the fd orphaned.
273  
        auto expected = select_registration_state::registering;
273  
        auto expected = select_registration_state::registering;
274  
        if (!op.registered.compare_exchange_strong(
274  
        if (!op.registered.compare_exchange_strong(
275  
                expected, select_registration_state::registered,
275  
                expected, select_registration_state::registered,
276  
                std::memory_order_acq_rel))
276  
                std::memory_order_acq_rel))
277  
        {
277  
        {
278  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
278  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
279  
            // completion is always posted to scheduler queue, never inline.
279  
            // completion is always posted to scheduler queue, never inline.
280  
            return std::noop_coroutine();
280  
            return std::noop_coroutine();
281  
        }
281  
        }
282  

282  

283  
        // If cancelled was set before we registered, handle it now.
283  
        // If cancelled was set before we registered, handle it now.
284  
        if (op.cancelled.load(std::memory_order_acquire))
284  
        if (op.cancelled.load(std::memory_order_acquire))
285  
        {
285  
        {
286  
            auto prev = op.registered.exchange(
286  
            auto prev = op.registered.exchange(
287  
                select_registration_state::unregistered,
287  
                select_registration_state::unregistered,
288  
                std::memory_order_acq_rel);
288  
                std::memory_order_acq_rel);
289  
            if (prev != select_registration_state::unregistered)
289  
            if (prev != select_registration_state::unregistered)
290  
            {
290  
            {
291  
                svc_.scheduler().deregister_fd(
291  
                svc_.scheduler().deregister_fd(
292  
                    fd_, select_scheduler::event_write);
292  
                    fd_, select_scheduler::event_write);
293  
                op.impl_ptr = shared_from_this();
293  
                op.impl_ptr = shared_from_this();
294  
                svc_.post(&op);
294  
                svc_.post(&op);
295  
                svc_.work_finished();
295  
                svc_.work_finished();
296  
            }
296  
            }
297  
        }
297  
        }
298  
        // completion is always posted to scheduler queue, never inline.
298  
        // completion is always posted to scheduler queue, never inline.
299  
        return std::noop_coroutine();
299  
        return std::noop_coroutine();
300  
    }
300  
    }
301  

301  

302  
    op.complete(errno, 0);
302  
    op.complete(errno, 0);
303  
    op.impl_ptr = shared_from_this();
303  
    op.impl_ptr = shared_from_this();
304  
    svc_.post(&op);
304  
    svc_.post(&op);
305  
    // completion is always posted to scheduler queue, never inline.
305  
    // completion is always posted to scheduler queue, never inline.
306  
    return std::noop_coroutine();
306  
    return std::noop_coroutine();
307  
}
307  
}
308  

308  

309  
inline std::coroutine_handle<>
309  
inline std::coroutine_handle<>
310  
select_socket::read_some(
310  
select_socket::read_some(
311  
    std::coroutine_handle<> h,
311  
    std::coroutine_handle<> h,
312  
    capy::executor_ref ex,
312  
    capy::executor_ref ex,
313  
    io_buffer_param param,
313  
    io_buffer_param param,
314  
    std::stop_token token,
314  
    std::stop_token token,
315  
    std::error_code* ec,
315  
    std::error_code* ec,
316  
    std::size_t* bytes_out)
316  
    std::size_t* bytes_out)
317  
{
317  
{
318  
    auto& op = rd_;
318  
    auto& op = rd_;
319  
    op.reset();
319  
    op.reset();
320  
    op.h         = h;
320  
    op.h         = h;
321  
    op.ex        = ex;
321  
    op.ex        = ex;
322  
    op.ec_out    = ec;
322  
    op.ec_out    = ec;
323  
    op.bytes_out = bytes_out;
323  
    op.bytes_out = bytes_out;
324  
    op.fd        = fd_;
324  
    op.fd        = fd_;
325  
    op.start(token, this);
325  
    op.start(token, this);
326  

326  

327  
    capy::mutable_buffer bufs[select_read_op::max_buffers];
327  
    capy::mutable_buffer bufs[select_read_op::max_buffers];
328  
    op.iovec_count =
328  
    op.iovec_count =
329  
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
329  
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
330  

330  

331  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
331  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
332  
    {
332  
    {
333  
        op.empty_buffer_read = true;
333  
        op.empty_buffer_read = true;
334  
        op.complete(0, 0);
334  
        op.complete(0, 0);
335  
        op.impl_ptr = shared_from_this();
335  
        op.impl_ptr = shared_from_this();
336  
        svc_.post(&op);
336  
        svc_.post(&op);
337  
        return std::noop_coroutine();
337  
        return std::noop_coroutine();
338  
    }
338  
    }
339  

339  

340  
    for (int i = 0; i < op.iovec_count; ++i)
340  
    for (int i = 0; i < op.iovec_count; ++i)
341  
    {
341  
    {
342  
        op.iovecs[i].iov_base = bufs[i].data();
342  
        op.iovecs[i].iov_base = bufs[i].data();
343  
        op.iovecs[i].iov_len  = bufs[i].size();
343  
        op.iovecs[i].iov_len  = bufs[i].size();
344  
    }
344  
    }
345  

345  

346  
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
346  
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
347  

347  

348  
    if (n > 0)
348  
    if (n > 0)
349  
    {
349  
    {
350  
        op.complete(0, static_cast<std::size_t>(n));
350  
        op.complete(0, static_cast<std::size_t>(n));
351  
        op.impl_ptr = shared_from_this();
351  
        op.impl_ptr = shared_from_this();
352  
        svc_.post(&op);
352  
        svc_.post(&op);
353  
        return std::noop_coroutine();
353  
        return std::noop_coroutine();
354  
    }
354  
    }
355  

355  

356  
    if (n == 0)
356  
    if (n == 0)
357  
    {
357  
    {
358  
        op.complete(0, 0);
358  
        op.complete(0, 0);
359  
        op.impl_ptr = shared_from_this();
359  
        op.impl_ptr = shared_from_this();
360  
        svc_.post(&op);
360  
        svc_.post(&op);
361  
        return std::noop_coroutine();
361  
        return std::noop_coroutine();
362  
    }
362  
    }
363  

363  

364  
    if (errno == EAGAIN || errno == EWOULDBLOCK)
364  
    if (errno == EAGAIN || errno == EWOULDBLOCK)
365  
    {
365  
    {
366  
        svc_.work_started();
366  
        svc_.work_started();
367  
        op.impl_ptr = shared_from_this();
367  
        op.impl_ptr = shared_from_this();
368  

368  

369  
        // Set registering BEFORE register_fd to close the race window where
369  
        // Set registering BEFORE register_fd to close the race window where
370  
        // reactor sees an event before we set registered.
370  
        // reactor sees an event before we set registered.
371  
        op.registered.store(
371  
        op.registered.store(
372  
            select_registration_state::registering, std::memory_order_release);
372  
            select_registration_state::registering, std::memory_order_release);
373  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
373  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
374  

374  

375  
        // Transition to registered. If this fails, reactor or cancel already
375  
        // Transition to registered. If this fails, reactor or cancel already
376  
        // claimed the op (state is now unregistered), so we're done. However,
376  
        // claimed the op (state is now unregistered), so we're done. However,
377  
        // we must still deregister the fd because cancel's deregister_fd may
377  
        // we must still deregister the fd because cancel's deregister_fd may
378  
        // have run before our register_fd, leaving the fd orphaned.
378  
        // have run before our register_fd, leaving the fd orphaned.
379  
        auto expected = select_registration_state::registering;
379  
        auto expected = select_registration_state::registering;
380  
        if (!op.registered.compare_exchange_strong(
380  
        if (!op.registered.compare_exchange_strong(
381  
                expected, select_registration_state::registered,
381  
                expected, select_registration_state::registered,
382  
                std::memory_order_acq_rel))
382  
                std::memory_order_acq_rel))
383  
        {
383  
        {
384  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
384  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
385  
            return std::noop_coroutine();
385  
            return std::noop_coroutine();
386  
        }
386  
        }
387  

387  

388  
        // If cancelled was set before we registered, handle it now.
388  
        // If cancelled was set before we registered, handle it now.
389  
        if (op.cancelled.load(std::memory_order_acquire))
389  
        if (op.cancelled.load(std::memory_order_acquire))
390  
        {
390  
        {
391  
            auto prev = op.registered.exchange(
391  
            auto prev = op.registered.exchange(
392  
                select_registration_state::unregistered,
392  
                select_registration_state::unregistered,
393  
                std::memory_order_acq_rel);
393  
                std::memory_order_acq_rel);
394  
            if (prev != select_registration_state::unregistered)
394  
            if (prev != select_registration_state::unregistered)
395  
            {
395  
            {
396  
                svc_.scheduler().deregister_fd(
396  
                svc_.scheduler().deregister_fd(
397  
                    fd_, select_scheduler::event_read);
397  
                    fd_, select_scheduler::event_read);
398  
                op.impl_ptr = shared_from_this();
398  
                op.impl_ptr = shared_from_this();
399  
                svc_.post(&op);
399  
                svc_.post(&op);
400  
                svc_.work_finished();
400  
                svc_.work_finished();
401  
            }
401  
            }
402  
        }
402  
        }
403  
        return std::noop_coroutine();
403  
        return std::noop_coroutine();
404  
    }
404  
    }
405  

405  

406  
    op.complete(errno, 0);
406  
    op.complete(errno, 0);
407  
    op.impl_ptr = shared_from_this();
407  
    op.impl_ptr = shared_from_this();
408  
    svc_.post(&op);
408  
    svc_.post(&op);
409  
    return std::noop_coroutine();
409  
    return std::noop_coroutine();
410  
}
410  
}
411  

411  

412  
// Initiate a scatter/gather write on the socket.
//
// Strategy: try a non-blocking ::sendmsg() immediately. If it makes
// progress (or fails hard), the op is completed and posted right away.
// If the kernel reports EAGAIN/EWOULDBLOCK, the op is registered with
// the select reactor for write readiness instead. In every path the
// completion is delivered through svc_.post(); the awaiting coroutine
// is never resumed inline, so this function always returns
// std::noop_coroutine().
//
// h         : coroutine to resume on completion.
// ex        : executor the completion must be dispatched on.
// param     : caller's buffer sequence to write from.
// token     : stop_token used for cancellation of the pending op.
// ec        : out — completion error code.
// bytes_out : out — number of bytes written.
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    // There is a single outstanding write op per socket; re-arm it.
    auto& op = wr_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    // Gather the caller's buffers into a bounded local array.
    // NOTE(review): mutable_buffer on a write path — presumably
    // io_buffer_param::copy_to requires it; confirm against capy.
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length write: complete immediately with success and 0 bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // Mirror the buffers into the op's iovec array for sendmsg().
    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // MSG_NOSIGNAL: a peer reset yields EPIPE instead of raising SIGPIPE.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    // Immediate (partial or full) success: post the completion.
    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // Kernel send buffer is full: hand the op to the reactor and wait
    // for write readiness.
    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Account for the op now pending outside the run queue; balanced
        // by work_finished() when it is later claimed and posted.
        svc_.work_started();
        // Keep this impl alive until the op is drained by the scheduler.
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Claim the op back atomically; only the claimant may post it.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard failure. Guard against a zero errno (e.g. sendmsg returned 0)
    // by falling back to EIO.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
509  

509  

510  
inline std::error_code
510  
inline std::error_code
511  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
511  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
512  
{
512  
{
513  
    int how;
513  
    int how;
514  
    switch (what)
514  
    switch (what)
515  
    {
515  
    {
516  
    case tcp_socket::shutdown_receive:
516  
    case tcp_socket::shutdown_receive:
517  
        how = SHUT_RD;
517  
        how = SHUT_RD;
518  
        break;
518  
        break;
519  
    case tcp_socket::shutdown_send:
519  
    case tcp_socket::shutdown_send:
520  
        how = SHUT_WR;
520  
        how = SHUT_WR;
521  
        break;
521  
        break;
522  
    case tcp_socket::shutdown_both:
522  
    case tcp_socket::shutdown_both:
523  
        how = SHUT_RDWR;
523  
        how = SHUT_RDWR;
524  
        break;
524  
        break;
525  
    default:
525  
    default:
526  
        return make_err(EINVAL);
526  
        return make_err(EINVAL);
527  
    }
527  
    }
528  
    if (::shutdown(fd_, how) != 0)
528  
    if (::shutdown(fd_, how) != 0)
529  
        return make_err(errno);
529  
        return make_err(errno);
530  
    return {};
530  
    return {};
531  
}
531  
}
532  

532  

533  
inline std::error_code
533  
inline std::error_code
534  
select_socket::set_option(
534  
select_socket::set_option(
535  
    int level, int optname,
535  
    int level, int optname,
536  
    void const* data, std::size_t size) noexcept
536  
    void const* data, std::size_t size) noexcept
537  
{
537  
{
538  
    if (::setsockopt(fd_, level, optname, data,
538  
    if (::setsockopt(fd_, level, optname, data,
539  
            static_cast<socklen_t>(size)) != 0)
539  
            static_cast<socklen_t>(size)) != 0)
540  
        return make_err(errno);
540  
        return make_err(errno);
541  
    return {};
541  
    return {};
542  
}
542  
}
543  

543  

544  
inline std::error_code
544  
inline std::error_code
545  
select_socket::get_option(
545  
select_socket::get_option(
546  
    int level, int optname,
546  
    int level, int optname,
547  
    void* data, std::size_t* size) const noexcept
547  
    void* data, std::size_t* size) const noexcept
548  
{
548  
{
549  
    socklen_t len = static_cast<socklen_t>(*size);
549  
    socklen_t len = static_cast<socklen_t>(*size);
550  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
550  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
551  
        return make_err(errno);
551  
        return make_err(errno);
552  
    *size = static_cast<std::size_t>(len);
552  
    *size = static_cast<std::size_t>(len);
553  
    return {};
553  
    return {};
554  
}
554  
}
555  

555  

556  
// Cancel every pending operation (connect, read, write) on this socket.
// Safe against a concurrently running reactor: ownership of each op is
// claimed atomically via its `registered` state, so exactly one party
// posts its completion.
inline void
select_socket::cancel() noexcept
{
    // Take a strong reference so the impl cannot be destroyed while we
    // post cancellations; if none is obtainable the impl is already
    // going away and there is nothing left to cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Atomically claim the op: whoever moves the state to
        // unregistered owns its completion.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            // We won the race: drop the fd interest, keep the impl alive
            // through impl_ptr, post the cancelled op, and balance the
            // work_started() performed when the op was registered.
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
580  

580  

581  
// Cancel one specific pending operation. Mirrors cancel(), but acts on
// a single op and infers which reactor event set it was armed with.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // Hold a strong reference for the duration; bail out if the impl is
    // already being destroyed.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    // Atomically claim the op; only the claimant may post its completion.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // impl_ptr keeps this socket alive until the op is drained;
        // work_finished() balances the work_started() taken when the op
        // was handed to the reactor.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
609  

609  

610  
// Tear down the socket: cancel any in-flight operations, remove the fd
// from the reactor, close it, and clear the cached endpoints. Calling
// it again on an already-closed socket (fd_ == -1) skips the fd work.
inline void
select_socket::close_socket() noexcept
{
    // A strong reference is only obtainable while the impl is still
    // shared; without one there can be no posted cancellations, but the
    // fd must still be released below.
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            // Atomically claim the op so exactly one party posts it.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                // impl_ptr keeps this socket alive until the op drains;
                // work_finished() balances the registration-time
                // work_started().
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Drop any remaining read/write interest before closing the fd.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
646  

646  

647  
// Construct the service, acquiring (or lazily creating) the context's
// select_scheduler and bundling all shared state into one
// heap-allocated select_socket_state.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
654  

654  

655  
// Out-of-line destructor (kept here so select_socket_state is a
// complete type at the point of destruction); defaulted for clarity.
inline select_socket_service::~select_socket_service() = default;
656  

656  

657  
inline void
657  
inline void
658  
select_socket_service::shutdown()
658  
select_socket_service::shutdown()
659  
{
659  
{
660  
    std::lock_guard lock(state_->mutex_);
660  
    std::lock_guard lock(state_->mutex_);
661  

661  

662  
    while (auto* impl = state_->socket_list_.pop_front())
662  
    while (auto* impl = state_->socket_list_.pop_front())
663  
        impl->close_socket();
663  
        impl->close_socket();
664  

664  

665  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
665  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
666  
    // drains completed_ops_, calling destroy() on each queued op. Letting
666  
    // drains completed_ops_, calling destroy() on each queued op. Letting
667  
    // ~state_ release the ptrs (during service destruction, after scheduler
667  
    // ~state_ release the ptrs (during service destruction, after scheduler
668  
    // shutdown) keeps every impl alive until all ops have been drained.
668  
    // shutdown) keeps every impl alive until all ops have been drained.
669  
}
669  
}
670  

670  

671  
// Create a new socket implementation, register it with the service's
// bookkeeping (intrusive list + owning map), and return a non-owning
// pointer to it.
inline io_object::implementation*
select_socket_service::construct()
{
    auto sock = std::make_shared<select_socket>(*this);
    select_socket* result = sock.get();

    std::lock_guard lock(state_->mutex_);
    state_->socket_list_.push_back(result);
    // The map holds the owning reference; `result` stays valid until
    // destroy() erases it.
    state_->socket_ptrs_.emplace(result, std::move(sock));
    return result;
}
685  

685  

686  
inline void
686  
inline void
687  
select_socket_service::destroy(io_object::implementation* impl)
687  
select_socket_service::destroy(io_object::implementation* impl)
688  
{
688  
{
689  
    auto* select_impl = static_cast<select_socket*>(impl);
689  
    auto* select_impl = static_cast<select_socket*>(impl);
690  
    select_impl->close_socket();
690  
    select_impl->close_socket();
691  
    std::lock_guard lock(state_->mutex_);
691  
    std::lock_guard lock(state_->mutex_);
692  
    state_->socket_list_.remove(select_impl);
692  
    state_->socket_list_.remove(select_impl);
693  
    state_->socket_ptrs_.erase(select_impl);
693  
    state_->socket_ptrs_.erase(select_impl);
694  
}
694  
}
695  

695  

696  
inline std::error_code
696  
inline std::error_code
697  
select_socket_service::open_socket(
697  
select_socket_service::open_socket(
698  
    tcp_socket::implementation& impl,
698  
    tcp_socket::implementation& impl,
699  
    int family, int type, int protocol)
699  
    int family, int type, int protocol)
700  
{
700  
{
701  
    auto* select_impl = static_cast<select_socket*>(&impl);
701  
    auto* select_impl = static_cast<select_socket*>(&impl);
702  
    select_impl->close_socket();
702  
    select_impl->close_socket();
703  

703  

704  
    int fd = ::socket(family, type, protocol);
704  
    int fd = ::socket(family, type, protocol);
705  
    if (fd < 0)
705  
    if (fd < 0)
706  
        return make_err(errno);
706  
        return make_err(errno);
707  

707  

708  
    if (family == AF_INET6)
708  
    if (family == AF_INET6)
709  
    {
709  
    {
710  
        int one = 1;
710  
        int one = 1;
711  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
711  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
712  
    }
712  
    }
713  

713  

714  
    // Set non-blocking and close-on-exec
714  
    // Set non-blocking and close-on-exec
715  
    int flags = ::fcntl(fd, F_GETFL, 0);
715  
    int flags = ::fcntl(fd, F_GETFL, 0);
716  
    if (flags == -1)
716  
    if (flags == -1)
717  
    {
717  
    {
718  
        int errn = errno;
718  
        int errn = errno;
719  
        ::close(fd);
719  
        ::close(fd);
720  
        return make_err(errn);
720  
        return make_err(errn);
721  
    }
721  
    }
722  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
722  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
723  
    {
723  
    {
724  
        int errn = errno;
724  
        int errn = errno;
725  
        ::close(fd);
725  
        ::close(fd);
726  
        return make_err(errn);
726  
        return make_err(errn);
727  
    }
727  
    }
728  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
728  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
729  
    {
729  
    {
730  
        int errn = errno;
730  
        int errn = errno;
731  
        ::close(fd);
731  
        ::close(fd);
732  
        return make_err(errn);
732  
        return make_err(errn);
733  
    }
733  
    }
734  

734  

735  
    // Check fd is within select() limits
735  
    // Check fd is within select() limits
736  
    if (fd >= FD_SETSIZE)
736  
    if (fd >= FD_SETSIZE)
737  
    {
737  
    {
738  
        ::close(fd);
738  
        ::close(fd);
739  
        return make_err(EMFILE); // Too many open files
739  
        return make_err(EMFILE); // Too many open files
740  
    }
740  
    }
741  

741  

742  
    select_impl->fd_ = fd;
742  
    select_impl->fd_ = fd;
743  
    return {};
743  
    return {};
744  
}
744  
}
745  

745  

746  
inline void
746  
inline void
747  
select_socket_service::close(io_object::handle& h)
747  
select_socket_service::close(io_object::handle& h)
748  
{
748  
{
749  
    static_cast<select_socket*>(h.get())->close_socket();
749  
    static_cast<select_socket*>(h.get())->close_socket();
750  
}
750  
}
751  

751  

752  
inline void
752  
inline void
753  
select_socket_service::post(select_op* op)
753  
select_socket_service::post(select_op* op)
754  
{
754  
{
755  
    state_->sched_.post(op);
755  
    state_->sched_.post(op);
756  
}
756  
}
757  

757  

758  
inline void
758  
inline void
759  
select_socket_service::work_started() noexcept
759  
select_socket_service::work_started() noexcept
760  
{
760  
{
761  
    state_->sched_.work_started();
761  
    state_->sched_.work_started();
762  
}
762  
}
763  

763  

764  
inline void
764  
inline void
765  
select_socket_service::work_finished() noexcept
765  
select_socket_service::work_finished() noexcept
766  
{
766  
{
767  
    state_->sched_.work_finished();
767  
    state_->sched_.work_finished();
768  
}
768  
}
769  

769  

770  
} // namespace boost::corosio::detail
770  
} // namespace boost::corosio::detail
771  

771  

772  
#endif // BOOST_COROSIO_HAS_SELECT
772  
#endif // BOOST_COROSIO_HAS_SELECT
773  

773  

774  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
774  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP