include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

81.0% Lines (337/416) 93.3% Functions (28/30)
include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 One-Shot Registration
56 ---------------------
57 We use one-shot epoll registration: each operation registers, waits for
58 one event, then unregisters. This simplifies the state machine since we
59 don't need to track whether an fd is currently registered or handle
60 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 simplicity is worth it.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy_impl() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 in-flight ops will complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
/** State for epoll socket service.

    Holds the scheduler reference plus the containers that track all
    socket impls owned by the service. Kept behind a unique_ptr in
    epoll_socket_service so its destruction order is explicit.
*/
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    // Scheduler (reactor) all sockets of this service register with.
    epoll_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_.
    std::mutex mutex_;
    // Intrusive list of live impls, used to close all sockets at shutdown.
    intrusive_list<epoll_socket> socket_list_;
    // Owning references, keyed by raw pointer for O(1) lookup/removal.
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;
};
107
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Close every socket still owned by the service (context shutdown).
    void shutdown() override;

    /// Create a new socket impl; the service retains the owning shared_ptr.
    io_object::implementation* construct() override;
    /// Close the impl and drop the service's owning reference to it.
    void destroy(io_object::implementation*) override;
    /// Close the socket referenced by the handle (fd + pending ops).
    void close(io_object::handle&) override;
    /// Open a non-blocking socket fd and register it with the reactor.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// Scheduler (reactor) this service posts completions to.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed op on the scheduler's completion queue.
    void post(epoll_op* op);
    /// Forward outstanding-work accounting to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
142
143 //--------------------------------------------------------------------------
144 //
145 // Implementation
146 //
147 //--------------------------------------------------------------------------
148
149 // Register an op with the reactor, handling cached edge events.
150 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Account for the pending op before publishing it. Balanced by the
    // work_finished() below on immediate completion, or later when the
    // reactor/cancel path claims the op.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge event arrived before we registered: consume the cached
        // readiness and retry the I/O once, under the descriptor lock.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0; // still not ready — clear and fall through to park
    }

    if (cancel_flag)
    {
        // A cancel raced ahead of this registration; latch it onto the op
        // so the check below completes it immediately.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Complete now: no reactor wait will happen, so undo the
        // work_started() above.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op; the reactor (or cancel) will claim and post it.
        desc_slot = &op;
    }
}
187
// Stop-token callback: forwards to the op's virtual cancel hook.
inline void
epoll_op::canceller::operator()() const noexcept
{
    op->cancel();
}
193
194 inline void
195 epoll_connect_op::cancel() noexcept
196 {
197 if (socket_impl_)
198 socket_impl_->cancel_single_op(*this);
199 else
200 request_cancel();
201 }
202
203 inline void
204 98 epoll_read_op::cancel() noexcept
205 {
206 98 if (socket_impl_)
207 98 socket_impl_->cancel_single_op(*this);
208 else
209 request_cancel();
210 98 }
211
212 inline void
213 epoll_write_op::cancel() noexcept
214 {
215 if (socket_impl_)
216 socket_impl_->cancel_single_op(*this);
217 else
218 request_cancel();
219 }
220
// Completion handler: translate op state into (ec, bytes) and resume the
// waiting coroutine. Statement order here is lifetime-critical; see the
// comment before the final resume.
inline void
epoll_op::operator()()
{
    // Disarm the stop-token callback first so a late stop request cannot
    // re-enter cancel() while we complete.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    // Priority: cancellation > syscall error > EOF (zero-byte read) > ok.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
249
// Connect completion handler: cache endpoints on success, translate op
// state into ec, and resume the waiting coroutine.
inline void
epoll_connect_op::operator()()
{
    // Disarm the stop-token callback before touching any other state.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // Best effort: a failed getsockname leaves local_ep default.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Priority: cancellation > syscall error > ok.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
286
// An impl is bound to its owning service for its whole lifetime.
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

inline epoll_socket::~epoll_socket() = default;
293
/** Start an asynchronous connect.

    Tries ::connect() immediately on the non-blocking fd. On immediate
    success or a hard error the result is delivered inline (budget
    permitting) or posted; on EINPROGRESS the op is registered with the
    reactor to wait for writability.
*/
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    // Immediate success: cache local/remote endpoints now.
    if (result == 0)
    {
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    // Completed (success or hard error) without needing the reactor.
    if (result == 0 || errno != EINPROGRESS)
    {
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the awaiting coroutine inline.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: package the result and post it instead.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Connect completion is signaled by writability; it shares the
    // write_ready flag but has its own descriptor slot and cancel flag.
    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
357
/** Start an asynchronous scatter read.

    Copies the caller's buffers into the op's iovec array, attempts a
    speculative ::readv(), and either delivers the result (inline or
    posted) or registers with the reactor on EAGAIN/EWOULDBLOCK.
*/
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    // Empty buffer set: complete with zero bytes without touching the fd.
    // empty_buffer_read marks this so completion isn't mistaken for EOF.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read, retrying on signal interruption.
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    // Completed (data, EOF, or hard error) without needing the reactor.
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume inline: error > EOF (n == 0) > success.
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: package the result and post it instead.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
443
/** Start an asynchronous gather write.

    Copies the caller's buffers into the op's iovec array, attempts a
    speculative ::sendmsg() (MSG_NOSIGNAL suppresses SIGPIPE), and either
    delivers the result (inline or posted) or registers with the reactor
    on EAGAIN/EWOULDBLOCK.
*/
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    // Empty buffer set: complete with zero bytes without touching the fd.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write, retrying on signal interruption.
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    // Completed (bytes written or hard error) without needing the reactor.
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the awaiting coroutine inline.
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: package the result and post it instead.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
527
528 inline std::error_code
529 3 epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530 {
531 int how;
532 3 switch (what)
533 {
534 1 case tcp_socket::shutdown_receive:
535 1 how = SHUT_RD;
536 1 break;
537 1 case tcp_socket::shutdown_send:
538 1 how = SHUT_WR;
539 1 break;
540 1 case tcp_socket::shutdown_both:
541 1 how = SHUT_RDWR;
542 1 break;
543 default:
544 return make_err(EINVAL);
545 }
546 3 if (::shutdown(fd_, how) != 0)
547 return make_err(errno);
548 3 return {};
549 }
550
551 inline std::error_code
552 32 epoll_socket::set_option(
553 int level, int optname,
554 void const* data, std::size_t size) noexcept
555 {
556 32 if (::setsockopt(fd_, level, optname, data,
557 32 static_cast<socklen_t>(size)) != 0)
558 return make_err(errno);
559 32 return {};
560 }
561
562 inline std::error_code
563 31 epoll_socket::get_option(
564 int level, int optname,
565 void* data, std::size_t* size) const noexcept
566 {
567 31 socklen_t len = static_cast<socklen_t>(*size);
568 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
569 return make_err(errno);
570 31 *size = static_cast<std::size_t>(len);
571 31 return {};
572 }
573
/** Cancel all pending operations on this socket.

    Any op still parked in its descriptor slot is claimed under the lock
    and posted with the cancelled flag set so its coroutine resumes; ops
    not currently parked get a pending-cancel flag that register_op will
    observe.
*/
inline void
epoll_socket::cancel() noexcept
{
    // If the impl is already being destroyed, there is nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Latch the cancel flag on every op first so any concurrent
    // completion path observes it.
    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        // Claim each parked op; otherwise leave a pending-cancel flag for
        // the registration path to pick up.
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post each claimed op outside the lock. impl_ptr keeps this impl
    // alive until the op runs; work_finished() balances the
    // work_started() made when the op registered.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
623
/** Cancel one specific pending operation (stop-token path).

    Same claim-under-lock protocol as cancel(), restricted to the single
    op whose stop callback fired.
*/
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // If the impl is already being destroyed, there is nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Latch the cancel flag first so a concurrent completion observes it.
    op.request_cancel();

    // Map the op to its descriptor slot (connect / read / write).
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Claim the op if parked; otherwise leave a pending-cancel
            // flag for the registration path to pick up.
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // impl_ptr keeps the impl alive until the posted op runs;
            // work_finished() balances register_op's work_started().
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
663
/** Close the socket: cancel all pending ops, deregister from epoll,
    close the fd, and reset cached state.

    Safe to call repeatedly; a second call finds fd_ == -1 and the
    descriptor slots already empty.
*/
inline void
epoll_socket::close_socket() noexcept
{
    // weak lock fails while the impl is being destroyed; in that case
    // only the fd/state teardown below is performed.
    auto self = weak_from_this().lock();
    if (self)
    {
        // Latch cancel flags first so concurrent completions observe them.
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Unconditionally claim every slot and wipe all cached
            // readiness / pending-cancel flags: the fd is going away.
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops so their coroutines resume as cancelled.
        // impl_ptr keeps this impl alive until each op runs;
        // work_finished() balances register_op's work_started().
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued in the scheduler, pin
        // the impl until that queue entry drains.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    // Cached endpoints are stale once the connection is gone.
    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
726
// Construct the service, creating (or reusing) the context's epoll
// scheduler and binding the shared state to it.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
733
734 478 inline epoll_socket_service::~epoll_socket_service() {}
735
736 inline void
737 239 epoll_socket_service::shutdown()
738 {
739 239 std::lock_guard lock(state_->mutex_);
740
741 239 while (auto* impl = state_->socket_list_.pop_front())
742 impl->close_socket();
743
744 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
745 // drains completed_ops_, calling destroy() on each queued op. If we
746 // released our shared_ptrs now, an epoll_op::destroy() could free the
747 // last ref to an impl whose embedded descriptor_state is still linked
748 // in the queue — use-after-free on the next pop(). Letting ~state_
749 // release the ptrs (during service destruction, after scheduler
750 // shutdown) keeps every impl alive until all ops have been drained.
751 239 }
752
753 inline io_object::implementation*
754 14649 epoll_socket_service::construct()
755 {
756 14649 auto impl = std::make_shared<epoll_socket>(*this);
757 14649 auto* raw = impl.get();
758
759 {
760 14649 std::lock_guard lock(state_->mutex_);
761 14649 state_->socket_list_.push_back(raw);
762 14649 state_->socket_ptrs_.emplace(raw, std::move(impl));
763 14649 }
764
765 14649 return raw;
766 14649 }
767
768 inline void
769 14649 epoll_socket_service::destroy(io_object::implementation* impl)
770 {
771 14649 auto* epoll_impl = static_cast<epoll_socket*>(impl);
772 14649 epoll_impl->close_socket();
773 14649 std::lock_guard lock(state_->mutex_);
774 14649 state_->socket_list_.remove(epoll_impl);
775 14649 state_->socket_ptrs_.erase(epoll_impl);
776 14649 }
777
778 inline std::error_code
779 4879 epoll_socket_service::open_socket(
780 tcp_socket::implementation& impl,
781 int family, int type, int protocol)
782 {
783 4879 auto* epoll_impl = static_cast<epoll_socket*>(&impl);
784 4879 epoll_impl->close_socket();
785
786 4879 int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
787 4879 if (fd < 0)
788 return make_err(errno);
789
790 4879 if (family == AF_INET6)
791 {
792 5 int one = 1;
793 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
794 }
795
796 4879 epoll_impl->fd_ = fd;
797
798 // Register fd with epoll (edge-triggered mode)
799 4879 epoll_impl->desc_state_.fd = fd;
800 {
801 4879 std::lock_guard lock(epoll_impl->desc_state_.mutex);
802 4879 epoll_impl->desc_state_.read_op = nullptr;
803 4879 epoll_impl->desc_state_.write_op = nullptr;
804 4879 epoll_impl->desc_state_.connect_op = nullptr;
805 4879 }
806 4879 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
807
808 4879 return {};
809 }
810
// Close the socket referenced by the handle (fd + pending ops).
inline void
epoll_socket_service::close(io_object::handle& h)
{
    static_cast<epoll_socket*>(h.get())->close_socket();
}
816
// Queue a completed op on the scheduler's completion queue.
inline void
epoll_socket_service::post(epoll_op* op)
{
    state_->sched_.post(op);
}
822
// Forward outstanding-work increment to the scheduler.
inline void
epoll_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
828
// Forward outstanding-work decrement to the scheduler.
inline void
epoll_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_HAS_EPOLL
838
839 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
840