Home | History | Annotate | Line # | Download | only in experimental
      1 // <experimental/io_service> -*- C++ -*-
      2 
      3 // Copyright (C) 2015-2022 Free Software Foundation, Inc.
      4 //
      5 // This file is part of the GNU ISO C++ Library.  This library is free
      6 // software; you can redistribute it and/or modify it under the
      7 // terms of the GNU General Public License as published by the
      8 // Free Software Foundation; either version 3, or (at your option)
      9 // any later version.
     10 
     11 // This library is distributed in the hope that it will be useful,
     12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     14 // GNU General Public License for more details.
     15 
     16 // Under Section 7 of GPL version 3, you are granted additional
     17 // permissions described in the GCC Runtime Library Exception, version
     18 // 3.1, as published by the Free Software Foundation.
     19 
     20 // You should have received a copy of the GNU General Public License and
     21 // a copy of the GCC Runtime Library Exception along with this program;
     22 // see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
     23 // <http://www.gnu.org/licenses/>.
     24 
     25 /** @file experimental/io_context
     26  *  This is a TS C++ Library header.
     27  *  @ingroup networking-ts
     28  */
     29 
     30 #ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE
     31 #define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1
     32 
     33 #pragma GCC system_header
     34 
     35 #if __cplusplus >= 201402L
     36 
     37 #include <atomic>
     38 #include <forward_list>
     39 #include <functional>
     40 #include <system_error>
     41 #include <thread>
     42 #include <vector>
     43 #include <experimental/netfwd>
     44 #include <experimental/executor>
     45 #include <bits/chrono.h>
     46 #if _GLIBCXX_HAVE_UNISTD_H
     47 # include <unistd.h>
     48 #endif
     49 #ifdef _GLIBCXX_HAVE_POLL_H
     50 # include <poll.h>
     51 #endif
     52 #ifdef _GLIBCXX_HAVE_FCNTL_H
     53 # include <fcntl.h>
     54 #endif
     55 
     56 namespace std _GLIBCXX_VISIBILITY(default)
     57 {
     58 _GLIBCXX_BEGIN_NAMESPACE_VERSION
     59 namespace experimental
     60 {
     61 namespace net
     62 {
     63 inline namespace v1
     64 {
     65 
     66   /** @addtogroup networking-ts
     67    *  @{
     68    */
     69 
     70   class __socket_impl;
     71 
     72   /// An ExecutionContext for I/O operations.
     73   class io_context : public execution_context
     74   {
     75   public:
     76     // types:
     77 
     78     /// An executor for an io_context.
     79     class executor_type
     80     {
     81     public:
     82       // construct / copy / destroy:
     83 
     84       executor_type(const executor_type& __other) noexcept = default;
     85       executor_type(executor_type&& __other) noexcept = default;
     86 
     87       executor_type& operator=(const executor_type& __other) noexcept = default;
     88       executor_type& operator=(executor_type&& __other) noexcept = default;
     89 
     90       // executor operations:
     91 
     92       bool running_in_this_thread() const noexcept
     93       {
     94 #ifdef _GLIBCXX_HAS_GTHREADS
     95 	lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
     96 	auto __end = _M_ctx->_M_call_stack.end();
     97 	return std::find(_M_ctx->_M_call_stack.begin(), __end,
     98 			 this_thread::get_id()) != __end;
     99 #else
    100 	return _M_ctx->_M_run_count != 0;
    101 #endif
    102       }
    103 
    104       io_context& context() const noexcept { return *_M_ctx; }
    105 
    106       void on_work_started() const noexcept { ++_M_ctx->_M_work_count; }
    107       void on_work_finished() const noexcept { --_M_ctx->_M_work_count; }
    108 
    109       template<typename _Func, typename _ProtoAllocator>
    110 	void
    111 	dispatch(_Func&& __f, const _ProtoAllocator& __a) const
    112 	{
    113 	  if (running_in_this_thread())
    114 	    decay_t<_Func>{std::forward<_Func>(__f)}();
    115 	  else
    116 	    post(std::forward<_Func>(__f), __a);
    117 	}
    118 
    119       template<typename _Func, typename _ProtoAllocator>
    120 	void
    121 	post(_Func&& __f, const _ProtoAllocator& __a) const
    122 	{
    123 	  lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
    124 	  // TODO (re-use functionality in system_context)
    125 	  _M_ctx->_M_reactor._M_notify();
    126 	}
    127 
    128       template<typename _Func, typename _ProtoAllocator>
    129 	void
    130 	defer(_Func&& __f, const _ProtoAllocator& __a) const
    131 	{ post(std::forward<_Func>(__f), __a); }
    132 
    133     private:
    134       friend io_context;
    135 
    136       explicit
    137       executor_type(io_context& __ctx) : _M_ctx(std::addressof(__ctx)) { }
    138 
    139       io_context* _M_ctx;
    140     };
    141 
    142     using count_type =  size_t;
    143 
    144     // construct / copy / destroy:
    145 
    146     io_context() : _M_work_count(0) { }
    147 
    148     explicit
    149     io_context(int __concurrency_hint) : _M_work_count(0) { }
    150 
    151     io_context(const io_context&) = delete;
    152     io_context& operator=(const io_context&) = delete;
    153 
    154     // io_context operations:
    155 
    156     executor_type get_executor() noexcept { return executor_type(*this); }
    157 
    158     count_type
    159     run()
    160     {
    161       count_type __n = 0;
    162       while (run_one())
    163 	if (__n != numeric_limits<count_type>::max())
    164 	  ++__n;
    165       return __n;
    166     }
    167 
    168     template<typename _Rep, typename _Period>
    169       count_type
    170       run_for(const chrono::duration<_Rep, _Period>& __rel_time)
    171       { return run_until(chrono::steady_clock::now() + __rel_time); }
    172 
    173     template<typename _Clock, typename _Duration>
    174       count_type
    175       run_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
    176       {
    177 	count_type __n = 0;
    178 	while (run_one_until(__abs_time))
    179 	  if (__n != numeric_limits<count_type>::max())
    180 	    ++__n;
    181 	return __n;
    182       }
    183 
    184     count_type
    185     run_one()
    186     { return _M_do_one(chrono::milliseconds{-1}); }
    187 
    188     template<typename _Rep, typename _Period>
    189       count_type
    190       run_one_for(const chrono::duration<_Rep, _Period>& __rel_time)
    191       { return run_one_until(chrono::steady_clock::now() + __rel_time); }
    192 
    193     template<typename _Clock, typename _Duration>
    194       count_type
    195       run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
    196       {
    197 	auto __now = _Clock::now();
    198 	while (__now < __abs_time)
    199 	  {
    200 	    using namespace std::chrono;
    201 	    auto __ms = duration_cast<milliseconds>(__abs_time - __now);
    202 	    if (_M_do_one(__ms))
    203 	      return 1;
    204 	    __now = _Clock::now();
    205 	  }
    206 	return 0;
    207       }
    208 
    209     count_type
    210     poll()
    211     {
    212       count_type __n = 0;
    213       while (poll_one())
    214 	if (__n != numeric_limits<count_type>::max())
    215 	  ++__n;
    216       return __n;
    217     }
    218 
    219     count_type
    220     poll_one()
    221     { return _M_do_one(chrono::milliseconds{0}); }
    222 
    223     void stop()
    224     {
    225       lock_guard<execution_context::mutex_type> __lock(_M_mtx);
    226       _M_stopped = true;
    227       _M_reactor._M_notify();
    228     }
    229 
    230     bool stopped() const noexcept
    231     {
    232       lock_guard<execution_context::mutex_type> __lock(_M_mtx);
    233       return _M_stopped;
    234     }
    235 
    236     void restart()
    237     {
    238       _M_stopped = false;
    239     }
    240 
    241   private:
    242 
    243     template<typename _Clock, typename _WaitTraits>
    244       friend class basic_waitable_timer;
    245 
    246     friend __socket_impl;
    247 
    248     template<typename _Protocol>
    249       friend class __basic_socket_impl;
    250 
    251     template<typename _Protocol>
    252       friend class basic_socket;
    253 
    254     template<typename _Protocol>
    255       friend class basic_datagram_socket;
    256 
    257     template<typename _Protocol>
    258       friend class basic_stream_socket;
    259 
    260     template<typename _Protocol>
    261       friend class basic_socket_acceptor;
    262 
    263     count_type
    264     _M_outstanding_work() const
    265     { return _M_work_count + !_M_ops.empty(); }
    266 
    267     struct __timer_queue_base : execution_context::service
    268     {
    269       // return milliseconds until next timer expires, or milliseconds::max()
    270       virtual chrono::milliseconds _M_next() const = 0;
    271       virtual bool run_one() = 0;
    272 
    273     protected:
    274       explicit
    275       __timer_queue_base(execution_context& __ctx) : service(__ctx)
    276       {
    277 	auto& __ioc = static_cast<io_context&>(__ctx);
    278 	lock_guard<execution_context::mutex_type> __lock(__ioc._M_mtx);
    279 	__ioc._M_timers.push_back(this);
    280       }
    281 
    282       mutable execution_context::mutex_type _M_qmtx;
    283     };
    284 
    285     template<typename _Timer, typename _Key = typename _Timer::_Key>
    286       struct __timer_queue : __timer_queue_base
    287       {
    288 	using key_type = __timer_queue;
    289 
    290 	explicit
    291 	__timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx)
    292 	{ }
    293 
    294 	void shutdown() noexcept { }
    295 
    296 	io_context& context() noexcept
    297 	{ return static_cast<io_context&>(service::context()); }
    298 
    299 	// Start an asynchronous wait.
    300 	void
    301 	push(const _Timer& __t, function<void(error_code)> __h)
    302 	{
    303 	  context().get_executor().on_work_started();
    304 	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
    305 	  _M_queue.emplace(__t, _M_next_id++, std::move(__h));
    306 	  // no need to notify reactor unless this timer went to the front?
    307 	}
    308 
    309 	// Cancel all outstanding waits for __t
    310 	size_t
    311 	cancel(const _Timer& __t)
    312 	{
    313 	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
    314 	  size_t __count = 0;
    315 	  auto __last = _M_queue.end();
    316 	  for (auto __it = _M_queue.begin(), __end = __last; __it != __end;
    317 	      ++__it)
    318 	    {
    319 	      if (__it->_M_key == __t._M_key.get())
    320 		{
    321 		  __it->cancel();
    322 		  __last = __it;
    323 		  ++__count;
    324 		}
    325 	    }
    326 	  if (__count)
    327 	    _M_queue._M_sort_to(__last);
    328 	  return __count;
    329 	}
    330 
    331 	// Cancel oldest outstanding wait for __t
    332 	bool
    333 	cancel_one(const _Timer& __t)
    334 	{
    335 	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
    336 	  const auto __end = _M_queue.end();
    337 	  auto __oldest = __end;
    338 	  for (auto __it = _M_queue.begin(); __it != __end; ++__it)
    339 	    if (__it->_M_key == __t._M_key.get())
    340 	      if (__oldest == __end || __it->_M_id < __oldest->_M_id)
    341 		__oldest = __it;
    342 	  if (__oldest == __end)
    343 	    return false;
    344 	  __oldest->cancel();
    345 	  _M_queue._M_sort_to(__oldest);
    346 	  return true;
    347 	}
    348 
    349 	chrono::milliseconds
    350 	_M_next() const override
    351 	{
    352 	  typename _Timer::time_point __exp;
    353 	  {
    354 	    lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
    355 	    if (_M_queue.empty())
    356 	      return chrono::milliseconds::max();  // no pending timers
    357 	    if (_M_queue.top()._M_key == nullptr)
    358 	      return chrono::milliseconds::zero(); // cancelled, run now
    359 	    __exp = _M_queue.top()._M_expiry;
    360 	  }
    361 	  auto __dur = _Timer::traits_type::to_wait_duration(__exp);
    362 	  if (__dur < __dur.zero())
    363 	    __dur = __dur.zero();
    364 	  return chrono::duration_cast<chrono::milliseconds>(__dur);
    365 	}
    366 
    367       private:
    368 
    369 	bool run_one() override
    370 	{
    371 	  auto __now = _Timer::clock_type::now();
    372 	  function<void(error_code)> __h;
    373 	  error_code __ec;
    374 	  {
    375 	    lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
    376 
    377 	    if (_M_queue.top()._M_key == nullptr) // cancelled
    378 	      {
    379 		__h = std::move(_M_queue.top()._M_h);
    380 		__ec = std::make_error_code(errc::operation_canceled);
    381 		_M_queue.pop();
    382 	      }
    383 	    else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now())
    384 	      {
    385 		__h = std::move(_M_queue.top()._M_h);
    386 		_M_queue.pop();
    387 	      }
    388 	  }
    389 	  if (__h)
    390 	    {
    391 	      __h(__ec);
    392 	      context().get_executor().on_work_finished();
    393 	      return true;
    394 	    }
    395 	  return false;
    396 	}
    397 
    398 	using __timer_id_type = uint64_t;
    399 
    400 	struct __pending_timer
    401 	{
    402 	  __pending_timer(const _Timer& __t, uint64_t __id,
    403 			  function<void(error_code)> __h)
    404 	  : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id),
    405 	    _M_h(std::move(__h))
    406 	  { }
    407 
    408 	  typename _Timer::time_point _M_expiry;
    409 	  _Key* _M_key;
    410 	  __timer_id_type _M_id;
    411 	  function<void(error_code)> _M_h;
    412 
    413 	  void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; }
    414 
    415 	  bool
    416 	  operator<(const __pending_timer& __rhs) const
    417 	  { return _M_expiry < __rhs._M_expiry; }
    418 	};
    419 
    420 	struct __queue : priority_queue<__pending_timer>
    421 	{
    422 	  using iterator =
    423 	    typename priority_queue<__pending_timer>::container_type::iterator;
    424 
    425 	  // expose begin/end/erase for direct access to underlying container
    426 	  iterator begin() { return this->c.begin(); }
    427 	  iterator end() { return this->c.end(); }
    428 	  iterator erase(iterator __it) { return this->c.erase(__it); }
    429 
    430 	  void
    431 	  _M_sort_to(iterator __it)
    432 	  { std::stable_sort(this->c.begin(), ++__it); }
    433 	};
    434 
    435 	__queue	_M_queue;
    436 	__timer_id_type _M_next_id = 0;
    437       };
    438 
    439     template<typename _Timer, typename _CompletionHandler>
    440       void
    441       async_wait(const _Timer& __timer, _CompletionHandler&& __h)
    442       {
    443 	auto& __queue = use_service<__timer_queue<_Timer>>(*this);
    444 	__queue.push(__timer, std::move(__h));
    445 	_M_reactor._M_notify();
    446       }
    447 
    448     // Cancel all wait operations initiated by __timer.
    449     template<typename _Timer>
    450       size_t
    451       cancel(const _Timer& __timer)
    452       {
    453 	if (!has_service<__timer_queue<_Timer>>(*this))
    454 	  return 0;
    455 
    456 	auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer);
    457 	if (__c != 0)
    458 	  _M_reactor._M_notify();
    459 	return __c;
    460       }
    461 
    462     // Cancel the oldest wait operation initiated by __timer.
    463     template<typename _Timer>
    464       size_t
    465       cancel_one(const _Timer& __timer)
    466       {
    467 	if (!has_service<__timer_queue<_Timer>>(*this))
    468 	  return 0;
    469 
    470 	if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer))
    471 	  {
    472 	    _M_reactor._M_notify();
    473 	    return 1;
    474 	  }
    475 	return 0;
    476       }
    477 
    478     // The caller must know what the wait-type __w will be interpreted.
    479     // In the current implementation the reactor is based on <poll.h>
    480     // so the parameter must be one of POLLIN, POLLOUT or POLLERR.
    481     template<typename _Op>
    482       void
    483       async_wait(int __fd, int __w, _Op&& __op)
    484       {
    485 	lock_guard<execution_context::mutex_type> __lock(_M_mtx);
    486 	// TODO need push_back, use std::list not std::forward_list
    487 	auto __tail = _M_ops.before_begin(), __it = _M_ops.begin();
    488 	while (__it != _M_ops.end())
    489 	  {
    490 	    ++__it;
    491 	    ++__tail;
    492 	  }
    493 	using __type = __async_operation_impl<_Op>;
    494 	_M_ops.emplace_after(__tail,
    495 			     make_unique<__type>(std::move(__op), __fd, __w));
    496 	_M_reactor._M_fd_interest(__fd, __w);
    497       }
    498 
    499     void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); }
    500     void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); }
    501 
    502     void cancel(int __fd, error_code&)
    503     {
    504       lock_guard<execution_context::mutex_type> __lock(_M_mtx);
    505       const auto __end = _M_ops.end();
    506       auto __it = _M_ops.begin();
    507       auto __prev = _M_ops.before_begin();
    508       while (__it != __end && (*__it)->_M_is_cancelled())
    509 	{
    510 	  ++__it;
    511 	  ++__prev;
    512 	}
    513       auto __cancelled = __prev;
    514       while (__it != __end)
    515 	{
    516 	  if ((*__it)->_M_fd == __fd)
    517 	    {
    518 	      (*__it)->cancel();
    519 	      ++__it;
    520 	      _M_ops.splice_after(__cancelled, _M_ops, __prev);
    521 	      ++__cancelled;
    522 	    }
    523 	  else
    524 	    {
    525 	      ++__it;
    526 	      ++__prev;
    527 	    }
    528 	}
    529       _M_reactor._M_not_interested(__fd);
    530     }
    531 
    532     struct __async_operation
    533     {
    534       __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { }
    535 
    536       virtual ~__async_operation() = default;
    537 
    538       int _M_fd;
    539       short _M_ev;
    540 
    541       void cancel() { _M_fd = -1; }
    542       bool _M_is_cancelled() const { return _M_fd == -1; }
    543       virtual void run(io_context&) = 0;
    544     };
    545 
    546     template<typename _Op>
    547       struct __async_operation_impl : __async_operation
    548       {
    549 	__async_operation_impl(_Op&& __op, int __fd, int __ev)
    550 	: __async_operation{__fd, __ev}, _M_op(std::move(__op)) { }
    551 
    552 	_Op _M_op;
    553 
    554 	void run(io_context& __ctx)
    555 	{
    556 	  if (_M_is_cancelled())
    557 	    _M_op(std::make_error_code(errc::operation_canceled));
    558 	  else
    559 	    _M_op(error_code{});
    560 	}
    561       };
    562 
    563     atomic<count_type>		_M_work_count;
    564     mutable execution_context::mutex_type		_M_mtx;
    565     queue<function<void()>>	_M_op;
    566     bool			_M_stopped = false;
    567 
    568     struct __monitor
    569     {
    570       __monitor(io_context& __c) : _M_ctx(__c)
    571       {
    572 #ifdef _GLIBCXX_HAS_GTHREADS
    573 	lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
    574 	_M_ctx._M_call_stack.push_back(this_thread::get_id());
    575 #else
    576 	_M_ctx._M_run_count++;
    577 #endif
    578       }
    579 
    580       ~__monitor()
    581       {
    582 #ifdef _GLIBCXX_HAS_GTHREADS
    583 	lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
    584 	_M_ctx._M_call_stack.pop_back();
    585 #else
    586 	_M_ctx._M_run_count--;
    587 #endif
    588 	if (_M_ctx._M_outstanding_work() == 0)
    589 	  {
    590 	    _M_ctx._M_stopped = true;
    591 	    _M_ctx._M_reactor._M_notify();
    592 	  }
    593       }
    594 
    595       __monitor(__monitor&&) = delete;
    596 
    597       io_context& _M_ctx;
    598     };
    599 
    600     bool
    601     _M_do_one(chrono::milliseconds __timeout)
    602     {
    603       const bool __block = __timeout != chrono::milliseconds::zero();
    604 
    605       __reactor::__fdvec __fds;
    606 
    607       __monitor __mon{*this};
    608 
    609       __timer_queue_base* __timerq = nullptr;
    610       unique_ptr<__async_operation> __async_op;
    611 
    612       while (true)
    613 	{
    614 	  if (__timerq)
    615 	    {
    616 	      if (__timerq->run_one())
    617 		return true;
    618 	      else
    619 		__timerq = nullptr;
    620 	    }
    621 
    622 	  if (__async_op)
    623 	    {
    624 	      __async_op->run(*this);
    625 	      // TODO need to unregister __async_op
    626 	      return true;
    627 	    }
    628 
    629 	  chrono::milliseconds __ms{0};
    630 
    631 	  {
    632 	    lock_guard<execution_context::mutex_type> __lock(_M_mtx);
    633 
    634 	    if (_M_stopped)
    635 	      return false;
    636 
    637 	    // find first timer with something to do
    638 	    for (auto __q : _M_timers)
    639 	      {
    640 		auto __next = __q->_M_next();
    641 		if (__next == __next.zero())  // ready to run immediately
    642 		  {
    643 		    __timerq = __q;
    644 		    __ms = __next;
    645 		    break;
    646 		  }
    647 		else if (__next != __next.max() && __block
    648 		    && (__next < __ms || __timerq == nullptr))
    649 		  {
    650 		    __timerq = __q;
    651 		    __ms = __next;
    652 		  }
    653 	      }
    654 
    655 	    if (__timerq && __ms == __ms.zero())
    656 	      continue;  // restart loop to run a timer immediately
    657 
    658 	    if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled())
    659 	      {
    660 		_M_ops.front().swap(__async_op);
    661 		_M_ops.pop_front();
    662 		continue;
    663 	      }
    664 
    665 	    // TODO run any posted items
    666 
    667 	    if (__block)
    668 	      {
    669 		if (__timerq == nullptr)
    670 		  __ms = __timeout;
    671 		else if (__ms.zero() <= __timeout && __timeout < __ms)
    672 		  __ms = __timeout;
    673 		else if (__ms.count() > numeric_limits<int>::max())
    674 		  __ms = chrono::milliseconds{numeric_limits<int>::max()};
    675 	      }
    676 	    // else __ms == 0 and poll() will return immediately
    677 
    678 	  }
    679 
    680 	  auto __res = _M_reactor.wait(__fds, __ms);
    681 
    682 	  if (__res == __reactor::_S_retry)
    683 	    continue;
    684 
    685 	  if (__res == __reactor::_S_timeout)
    686 	    {
    687 	      if (__timerq == nullptr)
    688 		return false;
    689 	      else
    690 		continue;  // timed out, so restart loop and process the timer
    691 	    }
    692 
    693 	  __timerq = nullptr;
    694 
    695 	  if (__fds.empty()) // nothing to do
    696 	    return false;
    697 
    698 	  lock_guard<execution_context::mutex_type> __lock(_M_mtx);
    699 	  for (auto __it = _M_ops.begin(), __end = _M_ops.end(),
    700 	      __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev)
    701 	    {
    702 	      auto& __op = **__it;
    703 	      auto __pos = std::lower_bound(__fds.begin(), __fds.end(),
    704 		  __op._M_fd,
    705 		  [](const auto& __p, int __fd) { return __p.fd < __fd; });
    706 	      if (__pos != __fds.end() && __pos->fd == __op._M_fd
    707 		  && __pos->revents & __op._M_ev)
    708 		{
    709 		  __it->swap(__async_op);
    710 		  _M_ops.erase_after(__prev);
    711 		  break;  // restart loop and run op
    712 		}
    713 	    }
    714 	}
    715     }
    716 
    717     struct __reactor
    718     {
    719 #ifdef _GLIBCXX_HAVE_POLL_H
    720       __reactor() : _M_fds(1)
    721       {
    722 	int __pipe[2];
    723 	if (::pipe(__pipe) == -1)
    724 	  __throw_system_error(errno);
    725 	if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1
    726 	    || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1)
    727 	  {
    728 	    int __e = errno;
    729 	    ::close(__pipe[0]);
    730 	    ::close(__pipe[1]);
    731 	    __throw_system_error(__e);
    732 	  }
    733 	_M_fds.back().events	= POLLIN;
    734 	_M_fds.back().fd	= __pipe[0];
    735 	_M_notify_wr		= __pipe[1];
    736       }
    737 
    738       ~__reactor()
    739       {
    740 	::close(_M_fds.back().fd);
    741 	::close(_M_notify_wr);
    742       }
    743 #endif
    744 
    745       // write a notification byte to the pipe (ignoring errors)
    746       void _M_notify()
    747       {
    748 	int __n;
    749 	do {
    750 	  __n = ::write(_M_notify_wr, "", 1);
    751 	} while (__n == -1 && errno == EINTR);
    752       }
    753 
    754       // read all notification bytes from the pipe
    755       void _M_on_notify()
    756       {
    757 	// Drain the pipe.
    758 	char __buf[64];
    759 	ssize_t __n;
    760 	do {
    761 	  __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf));
    762 	} while (__n != -1 || errno == EINTR);
    763       }
    764 
    765       void
    766       _M_add_fd(int __fd)
    767       {
    768 	auto __pos = _M_lower_bound(__fd);
    769 	if (__pos->fd == __fd)
    770 	  __throw_system_error((int)errc::invalid_argument);
    771 	_M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd;
    772 	_M_notify();
    773       }
    774 
    775       void
    776       _M_remove_fd(int __fd)
    777       {
    778 	auto __pos = _M_lower_bound(__fd);
    779 	if (__pos->fd == __fd)
    780 	  _M_fds.erase(__pos);
    781 	// else bug!
    782 	_M_notify();
    783       }
    784 
    785       void
    786       _M_fd_interest(int __fd, int __w)
    787       {
    788 	auto __pos = _M_lower_bound(__fd);
    789 	if (__pos->fd == __fd)
    790 	  __pos->events |= __w;
    791 	// else bug!
    792 	_M_notify();
    793       }
    794 
    795       void
    796       _M_not_interested(int __fd)
    797       {
    798 	auto __pos = _M_lower_bound(__fd);
    799 	if (__pos->fd == __fd)
    800 	  __pos->events = 0;
    801 	_M_notify();
    802       }
    803 
    804 #ifdef _GLIBCXX_HAVE_POLL_H
    805       using __fdvec = vector<::pollfd>;
    806 #else
    807       struct dummy_pollfd { int fd = -1; short events = 0, revents = 0; };
    808       using __fdvec = vector<dummy_pollfd>;
    809 #endif
    810 
    811       // Find first element p such that !(p.fd < __fd)
    812       // N.B. always returns a dereferencable iterator.
    813       __fdvec::iterator
    814       _M_lower_bound(int __fd)
    815       {
    816 	return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1,
    817 	    __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; });
    818       }
    819 
    820       enum __status { _S_retry, _S_timeout, _S_ok, _S_error };
    821 
    822       __status
    823       wait(__fdvec& __fds, chrono::milliseconds __timeout)
    824       {
    825 #ifdef _GLIBCXX_HAVE_POLL_H
    826 	// XXX not thread-safe!
    827 	__fds = _M_fds;  // take snapshot to pass to poll()
    828 
    829 	int __res = ::poll(__fds.data(), __fds.size(), __timeout.count());
    830 
    831 	if (__res == -1)
    832 	  {
    833 	    __fds.clear();
    834 	    if (errno == EINTR)
    835 	      return _S_retry;
    836 	    return _S_error; // XXX ???
    837 	  }
    838 	else if (__res == 0)
    839 	  {
    840 	    __fds.clear();
    841 	    return _S_timeout;
    842 	  }
    843 	else if (__fds.back().revents != 0) // something changed, restart
    844 	  {
    845 	    __fds.clear();
    846 	    _M_on_notify();
    847 	    return _S_retry;
    848 	  }
    849 
    850 	auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1,
    851 	      [](const __fdvec::value_type& __p) { return __p.revents != 0; });
    852 	__fds.erase(__part, __fds.end());
    853 
    854 	return _S_ok;
    855 #else
    856 	(void) __timeout;
    857 	__fds.clear();
    858 	return _S_error;
    859 #endif
    860       }
    861 
    862       __fdvec _M_fds;	// _M_fds.back() is the read end of the self-pipe
    863       int _M_notify_wr;	// write end of the self-pipe
    864     };
    865 
    866     __reactor _M_reactor;
    867 
    868     vector<__timer_queue_base*>			_M_timers;
    869     forward_list<unique_ptr<__async_operation>>	_M_ops;
    870 
    871 #ifdef _GLIBCXX_HAS_GTHREADS
    872     vector<thread::id>	_M_call_stack;
    873 #else
    874     int _M_run_count = 0;
    875 #endif
    876   };
    877 
    878   inline bool
    879   operator==(const io_context::executor_type& __a,
    880 	     const io_context::executor_type& __b) noexcept
    881   {
    882     // https://github.com/chriskohlhoff/asio-tr2/issues/201
    883     using executor_type = io_context::executor_type;
    884     return std::addressof(executor_type(__a).context())
    885       == std::addressof(executor_type(__b).context());
    886   }
    887 
    888   inline bool
    889   operator!=(const io_context::executor_type& __a,
    890 	     const io_context::executor_type& __b) noexcept
    891   { return !(__a == __b); }
    892 
    893   template<> struct is_executor<io_context::executor_type> : true_type {};
    894 
    895   /// @}
    896 
    897 } // namespace v1
    898 } // namespace net
    899 } // namespace experimental
    900 _GLIBCXX_END_NAMESPACE_VERSION
    901 } // namespace std
    902 
    903 #endif // C++14
    904 
    905 #endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE
    906