Home | History | Annotate | Line # | Download | only in sync
      1 /**
      2  * The condition module provides a primitive for synchronized condition
      3  * checking.
      4  *
      5  * Copyright: Copyright Sean Kelly 2005 - 2009.
      6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
      7  * Authors:   Sean Kelly
      8  * Source:    $(DRUNTIMESRC core/sync/_condition.d)
      9  */
     10 
     11 /*          Copyright Sean Kelly 2005 - 2009.
     12  * Distributed under the Boost Software License, Version 1.0.
     13  *    (See accompanying file LICENSE or copy at
     14  *          http://www.boost.org/LICENSE_1_0.txt)
     15  */
     16 module core.sync.condition;
     17 
     18 
     19 public import core.sync.exception;
     20 public import core.sync.mutex;
     21 public import core.time;
     22 
     23 version (Windows)
     24 {
     25     import core.sync.semaphore;
     26     import core.sys.windows.basetsd /+: HANDLE+/;
     27     import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
     28         DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
     29         LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
     30     import core.sys.windows.windef /+: BOOL, DWORD+/;
     31     import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
     32 }
     33 else version (Posix)
     34 {
     35     import core.sync.config;
     36     import core.stdc.errno;
     37     import core.sys.posix.pthread;
     38     import core.sys.posix.time;
     39 }
     40 else
     41 {
     42     static assert(false, "Platform not supported");
     43 }
     44 
     45 
     46 ////////////////////////////////////////////////////////////////////////////////
     47 // Condition
     48 //
     49 // void wait();
     50 // void notify();
     51 // void notifyAll();
     52 ////////////////////////////////////////////////////////////////////////////////
     53 
     54 
     55 /**
     56  * This class represents a condition variable as conceived by C.A.R. Hoare.  As
     57  * per Mesa type monitors however, "signal" has been replaced with "notify" to
     58  * indicate that control is not transferred to the waiter when a notification
     59  * is sent.
     60  */
     61 class Condition
     62 {
     63     ////////////////////////////////////////////////////////////////////////////
     64     // Initialization
     65     ////////////////////////////////////////////////////////////////////////////
     66 
     67     /**
     68      * Initializes a condition object which is associated with the supplied
     69      * mutex object.
     70      *
     71      * Params:
     72      *  m = The mutex with which this condition will be associated.
     73      *
     74      * Throws:
     75      *  SyncError on error.
     76      */
     77     this( Mutex m ) nothrow @safe
     78     {
     79         this(m, true);
     80     }
     81 
     82     /// ditto
     83     this( shared Mutex m ) shared nothrow @safe
     84     {
     85         this(m, true);
     86     }
     87 
     88     //
     89     private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted
     90         if ((is(Q == Condition) && is(M == Mutex)) ||
     91             (is(Q == shared Condition) && is(M == shared Mutex)))
     92     {
     93         version (Windows)
     94         {
     95             static if (is(Q == Condition))
     96             {
     97                 alias HANDLE_TYPE = void*;
     98             }
     99             else
    100             {
    101                 alias HANDLE_TYPE = shared(void*);
    102             }
    103             m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null );
    104             if ( m_blockLock == m_blockLock.init )
    105                 throw new SyncError( "Unable to initialize condition" );
    106             scope(failure) CloseHandle( cast(void*) m_blockLock );
    107 
    108             m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null );
    109             if ( m_blockQueue == m_blockQueue.init )
    110                 throw new SyncError( "Unable to initialize condition" );
    111             scope(failure) CloseHandle( cast(void*) m_blockQueue );
    112 
    113             InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
    114             m_assocMutex = m;
    115         }
    116         else version (Posix)
    117         {
    118             m_assocMutex = m;
    119             static if ( is( typeof( pthread_condattr_setclock ) ) )
    120             {
    121                 () @trusted
    122                 {
    123                     pthread_condattr_t attr = void;
    124                     int rc  = pthread_condattr_init( &attr );
    125                     if ( rc )
    126                         throw new SyncError( "Unable to initialize condition" );
    127                     rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC );
    128                     if ( rc )
    129                         throw new SyncError( "Unable to initialize condition" );
    130                     rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr );
    131                     if ( rc )
    132                         throw new SyncError( "Unable to initialize condition" );
    133                     rc = pthread_condattr_destroy( &attr );
    134                     if ( rc )
    135                         throw new SyncError( "Unable to initialize condition" );
    136                 } ();
    137             }
    138             else
    139             {
    140                 int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null );
    141                 if ( rc )
    142                     throw new SyncError( "Unable to initialize condition" );
    143             }
    144         }
    145     }
    146 
    147 
    148     ~this()
    149     {
    150         version (Windows)
    151         {
    152             BOOL rc = CloseHandle( m_blockLock );
    153             assert( rc, "Unable to destroy condition" );
    154             rc = CloseHandle( m_blockQueue );
    155             assert( rc, "Unable to destroy condition" );
    156             DeleteCriticalSection( &m_unblockLock );
    157         }
    158         else version (Posix)
    159         {
    160             int rc = pthread_cond_destroy( &m_hndl );
    161             assert( !rc, "Unable to destroy condition" );
    162         }
    163     }
    164 
    165 
    166     ////////////////////////////////////////////////////////////////////////////
    167     // General Properties
    168     ////////////////////////////////////////////////////////////////////////////
    169 
    170 
    171     /**
    172      * Gets the mutex associated with this condition.
    173      *
    174      * Returns:
    175      *  The mutex associated with this condition.
    176      */
    177     @property Mutex mutex()
    178     {
    179         return m_assocMutex;
    180     }
    181 
    182     /// ditto
    183     @property shared(Mutex) mutex() shared
    184     {
    185         return m_assocMutex;
    186     }
    187 
    188     // undocumented function for internal use
    189     final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    190     {
    191         return m_assocMutex;
    192     }
    193 
    194     // ditto
    195     final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
    196     {
    197         return m_assocMutex;
    198     }
    199 
    200     ////////////////////////////////////////////////////////////////////////////
    201     // General Actions
    202     ////////////////////////////////////////////////////////////////////////////
    203 
    204 
    205     /**
    206      * Wait until notified.
    207      *
    208      * Throws:
    209      *  SyncError on error.
    210      */
    211     void wait()
    212     {
    213         wait!(typeof(this))(true);
    214     }
    215 
    216     /// ditto
    217     void wait() shared
    218     {
    219         wait!(typeof(this))(true);
    220     }
    221 
    222     /// ditto
    223     void wait(this Q)( bool _unused_ )
    224         if (is(Q == Condition) || is(Q == shared Condition))
    225     {
    226         version (Windows)
    227         {
    228             timedWait( INFINITE );
    229         }
    230         else version (Posix)
    231         {
    232             int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() );
    233             if ( rc )
    234                 throw new SyncError( "Unable to wait for condition" );
    235         }
    236     }
    237 
    238     /**
    239      * Suspends the calling thread until a notification occurs or until the
    240      * supplied time period has elapsed.
    241      *
    242      * Params:
    243      *  val = The time to wait.
    244      *
    245      * In:
    246      *  val must be non-negative.
    247      *
    248      * Throws:
    249      *  SyncError on error.
    250      *
    251      * Returns:
    252      *  true if notified before the timeout and false if not.
    253      */
    254     bool wait( Duration val )
    255     {
    256         return wait!(typeof(this))(val, true);
    257     }
    258 
    259     /// ditto
    260     bool wait( Duration val ) shared
    261     {
    262         return wait!(typeof(this))(val, true);
    263     }
    264 
    265     /// ditto
    266     bool wait(this Q)( Duration val, bool _unused_ )
    267         if (is(Q == Condition) || is(Q == shared Condition))
    268     in
    269     {
    270         assert( !val.isNegative );
    271     }
    272     do
    273     {
    274         version (Windows)
    275         {
    276             auto maxWaitMillis = dur!("msecs")( uint.max - 1 );
    277 
    278             while ( val > maxWaitMillis )
    279             {
    280                 if ( timedWait( cast(uint)
    281                                maxWaitMillis.total!"msecs" ) )
    282                     return true;
    283                 val -= maxWaitMillis;
    284             }
    285             return timedWait( cast(uint) val.total!"msecs" );
    286         }
    287         else version (Posix)
    288         {
    289             timespec t = void;
    290             mktspec( t, val );
    291 
    292             int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl,
    293                                              (cast(Mutex) m_assocMutex).handleAddr(),
    294                                              &t );
    295             if ( !rc )
    296                 return true;
    297             if ( rc == ETIMEDOUT )
    298                 return false;
    299             throw new SyncError( "Unable to wait for condition" );
    300         }
    301     }
    302 
    303     /**
    304      * Notifies one waiter.
    305      *
    306      * Throws:
    307      *  SyncError on error.
    308      */
    309     void notify()
    310     {
    311         notify!(typeof(this))(true);
    312     }
    313 
    314     /// ditto
    315     void notify() shared
    316     {
    317         notify!(typeof(this))(true);
    318     }
    319 
    320     /// ditto
    321     void notify(this Q)( bool _unused_ )
    322         if (is(Q == Condition) || is(Q == shared Condition))
    323     {
    324         version (Windows)
    325         {
    326             notify_( false );
    327         }
    328         else version (Posix)
    329         {
    330             // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
    331             // so need to retrying while it returns EAGAIN.
    332             //
    333             // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
    334             // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
    335             // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
    336             // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
    337             // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
    338             // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
    339             // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
    340             // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c
    341 
    342             int rc;
    343             do {
    344                 rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl );
    345             } while ( rc == EAGAIN );
    346             if ( rc )
    347                 throw new SyncError( "Unable to notify condition" );
    348         }
    349     }
    350 
    351     /**
    352      * Notifies all waiters.
    353      *
    354      * Throws:
    355      *  SyncError on error.
    356      */
    357     void notifyAll()
    358     {
    359         notifyAll!(typeof(this))(true);
    360     }
    361 
    362     /// ditto
    363     void notifyAll() shared
    364     {
    365         notifyAll!(typeof(this))(true);
    366     }
    367 
    368     /// ditto
    369     void notifyAll(this Q)( bool _unused_ )
    370         if (is(Q == Condition) || is(Q == shared Condition))
    371     {
    372         version (Windows)
    373         {
    374             notify_( true );
    375         }
    376         else version (Posix)
    377         {
    378             // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
    379             // so need to retrying while it returns EAGAIN.
    380             //
    381             // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
    382             // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
    383             // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
    384             // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
    385             // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
    386             // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
    387             // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
    388             // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c
    389 
    390             int rc;
    391             do {
    392                 rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl );
    393             } while ( rc == EAGAIN );
    394             if ( rc )
    395                 throw new SyncError( "Unable to notify condition" );
    396         }
    397     }
    398 
    399 private:
    400     version (Windows)
    401     {
    402         bool timedWait(this Q)( DWORD timeout )
    403             if (is(Q == Condition) || is(Q == shared Condition))
    404         {
    405             static if (is(Q == Condition))
    406             {
    407                 auto op(string o, T, V1)(ref T val, V1 mod)
    408                 {
    409                     return mixin("val " ~ o ~ "mod");
    410                 }
    411             }
    412             else
    413             {
    414                 auto op(string o, T, V1)(ref shared T val, V1 mod)
    415                 {
    416                     import core.atomic: atomicOp;
    417                     return atomicOp!o(val, mod);
    418                 }
    419             }
    420 
    421             int   numSignalsLeft;
    422             int   numWaitersGone;
    423             DWORD rc;
    424 
    425             rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
    426             assert( rc == WAIT_OBJECT_0 );
    427 
    428             op!"+="(m_numWaitersBlocked, 1);
    429 
    430             rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
    431             assert( rc );
    432 
    433             m_assocMutex.unlock();
    434             scope(failure) m_assocMutex.lock();
    435 
    436             rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
    437             assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
    438             bool timedOut = (rc == WAIT_TIMEOUT);
    439 
    440             EnterCriticalSection( &m_unblockLock );
    441             scope(failure) LeaveCriticalSection( &m_unblockLock );
    442 
    443             if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
    444             {
    445                 if ( timedOut )
    446                 {
    447                     // timeout (or canceled)
    448                     if ( m_numWaitersBlocked != 0 )
    449                     {
    450                         op!"-="(m_numWaitersBlocked, 1);
    451                         // do not unblock next waiter below (already unblocked)
    452                         numSignalsLeft = 0;
    453                     }
    454                     else
    455                     {
    456                         // spurious wakeup pending!!
    457                         m_numWaitersGone = 1;
    458                     }
    459                 }
    460                 if ( op!"-="(m_numWaitersToUnblock, 1) == 0 )
    461                 {
    462                     if ( m_numWaitersBlocked != 0 )
    463                     {
    464                         // open the gate
    465                         rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
    466                         assert( rc );
    467                         // do not open the gate below again
    468                         numSignalsLeft = 0;
    469                     }
    470                     else if ( (numWaitersGone = m_numWaitersGone) != 0 )
    471                     {
    472                         m_numWaitersGone = 0;
    473                     }
    474                 }
    475             }
    476             else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 )
    477             {
    478                 // timeout/canceled or spurious event :-)
    479                 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
    480                 assert( rc == WAIT_OBJECT_0 );
    481                 // something is going on here - test of timeouts?
    482                 op!"-="(m_numWaitersBlocked, m_numWaitersGone);
    483                 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
    484                 assert( rc == WAIT_OBJECT_0 );
    485                 m_numWaitersGone = 0;
    486             }
    487 
    488             LeaveCriticalSection( &m_unblockLock );
    489 
    490             if ( numSignalsLeft == 1 )
    491             {
    492                 // better now than spurious later (same as ResetEvent)
    493                 for ( ; numWaitersGone > 0; --numWaitersGone )
    494                 {
    495                     rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
    496                     assert( rc == WAIT_OBJECT_0 );
    497                 }
    498                 // open the gate
    499                 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
    500                 assert( rc );
    501             }
    502             else if ( numSignalsLeft != 0 )
    503             {
    504                 // unblock next waiter
    505                 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
    506                 assert( rc );
    507             }
    508             m_assocMutex.lock();
    509             return !timedOut;
    510         }
    511 
    512 
    513         void notify_(this Q)( bool all )
    514             if (is(Q == Condition) || is(Q == shared Condition))
    515         {
    516             static if (is(Q == Condition))
    517             {
    518                 auto op(string o, T, V1)(ref T val, V1 mod)
    519                 {
    520                     return mixin("val " ~ o ~ "mod");
    521                 }
    522             }
    523             else
    524             {
    525                 auto op(string o, T, V1)(ref shared T val, V1 mod)
    526                 {
    527                     import core.atomic: atomicOp;
    528                     return atomicOp!o(val, mod);
    529                 }
    530             }
    531 
    532             DWORD rc;
    533 
    534             EnterCriticalSection( &m_unblockLock );
    535             scope(failure) LeaveCriticalSection( &m_unblockLock );
    536 
    537             if ( m_numWaitersToUnblock != 0 )
    538             {
    539                 if ( m_numWaitersBlocked == 0 )
    540                 {
    541                     LeaveCriticalSection( &m_unblockLock );
    542                     return;
    543                 }
    544                 if ( all )
    545                 {
    546                     op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked);
    547                     m_numWaitersBlocked = 0;
    548                 }
    549                 else
    550                 {
    551                     op!"+="(m_numWaitersToUnblock, 1);
    552                     op!"-="(m_numWaitersBlocked, 1);
    553                 }
    554                 LeaveCriticalSection( &m_unblockLock );
    555             }
    556             else if ( m_numWaitersBlocked > m_numWaitersGone )
    557             {
    558                 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
    559                 assert( rc == WAIT_OBJECT_0 );
    560                 if ( 0 != m_numWaitersGone )
    561                 {
    562                     op!"-="(m_numWaitersBlocked, m_numWaitersGone);
    563                     m_numWaitersGone = 0;
    564                 }
    565                 if ( all )
    566                 {
    567                     m_numWaitersToUnblock = m_numWaitersBlocked;
    568                     m_numWaitersBlocked = 0;
    569                 }
    570                 else
    571                 {
    572                     m_numWaitersToUnblock = 1;
    573                     op!"-="(m_numWaitersBlocked, 1);
    574                 }
    575                 LeaveCriticalSection( &m_unblockLock );
    576                 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
    577                 assert( rc );
    578             }
    579             else
    580             {
    581                 LeaveCriticalSection( &m_unblockLock );
    582             }
    583         }
    584 
    585 
    586         // NOTE: This implementation uses Algorithm 8c as described here:
    587         //       http://groups.google.com/group/comp.programming.threads/
    588         //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
    589         HANDLE              m_blockLock;    // auto-reset event (now semaphore)
    590         HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
    591         Mutex               m_assocMutex;   // external mutex/CS
    592         CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
    593         int                 m_numWaitersGone        = 0;
    594         int                 m_numWaitersBlocked     = 0;
    595         int                 m_numWaitersToUnblock   = 0;
    596     }
    597     else version (Posix)
    598     {
    599         Mutex               m_assocMutex;
    600         pthread_cond_t      m_hndl;
    601     }
    602 }
    603 
    604 
    605 ////////////////////////////////////////////////////////////////////////////////
    606 // Unit Tests
    607 ////////////////////////////////////////////////////////////////////////////////
    608 
    609 unittest
    610 {
    611     import core.thread;
    612     import core.sync.mutex;
    613     import core.sync.semaphore;
    614 
    615 
    616     void testNotify()
    617     {
    618         auto mutex      = new Mutex;
    619         auto condReady  = new Condition( mutex );
    620         auto semDone    = new Semaphore;
    621         auto synLoop    = new Object;
    622         int  numWaiters = 10;
    623         int  numTries   = 10;
    624         int  numReady   = 0;
    625         int  numTotal   = 0;
    626         int  numDone    = 0;
    627         int  numPost    = 0;
    628 
    629         void waiter()
    630         {
    631             for ( int i = 0; i < numTries; ++i )
    632             {
    633                 synchronized( mutex )
    634                 {
    635                     while ( numReady < 1 )
    636                     {
    637                         condReady.wait();
    638                     }
    639                     --numReady;
    640                     ++numTotal;
    641                 }
    642 
    643                 synchronized( synLoop )
    644                 {
    645                     ++numDone;
    646                 }
    647                 semDone.wait();
    648             }
    649         }
    650 
    651         auto group = new ThreadGroup;
    652 
    653         for ( int i = 0; i < numWaiters; ++i )
    654             group.create( &waiter );
    655 
    656         for ( int i = 0; i < numTries; ++i )
    657         {
    658             for ( int j = 0; j < numWaiters; ++j )
    659             {
    660                 synchronized( mutex )
    661                 {
    662                     ++numReady;
    663                     condReady.notify();
    664                 }
    665             }
    666             while ( true )
    667             {
    668                 synchronized( synLoop )
    669                 {
    670                     if ( numDone >= numWaiters )
    671                         break;
    672                 }
    673                 Thread.yield();
    674             }
    675             for ( int j = 0; j < numWaiters; ++j )
    676             {
    677                 semDone.notify();
    678             }
    679         }
    680 
    681         group.joinAll();
    682         assert( numTotal == numWaiters * numTries );
    683     }
    684 
    685 
    686     void testNotifyAll()
    687     {
    688         auto mutex      = new Mutex;
    689         auto condReady  = new Condition( mutex );
    690         int  numWaiters = 10;
    691         int  numReady   = 0;
    692         int  numDone    = 0;
    693         bool alert      = false;
    694 
    695         void waiter()
    696         {
    697             synchronized( mutex )
    698             {
    699                 ++numReady;
    700                 while ( !alert )
    701                     condReady.wait();
    702                 ++numDone;
    703             }
    704         }
    705 
    706         auto group = new ThreadGroup;
    707 
    708         for ( int i = 0; i < numWaiters; ++i )
    709             group.create( &waiter );
    710 
    711         while ( true )
    712         {
    713             synchronized( mutex )
    714             {
    715                 if ( numReady >= numWaiters )
    716                 {
    717                     alert = true;
    718                     condReady.notifyAll();
    719                     break;
    720                 }
    721             }
    722             Thread.yield();
    723         }
    724         group.joinAll();
    725         assert( numReady == numWaiters && numDone == numWaiters );
    726     }
    727 
    728 
    729     void testWaitTimeout()
    730     {
    731         auto mutex      = new Mutex;
    732         auto condReady  = new Condition( mutex );
    733         bool waiting    = false;
    734         bool alertedOne = true;
    735         bool alertedTwo = true;
    736 
    737         void waiter()
    738         {
    739             synchronized( mutex )
    740             {
    741                 waiting    = true;
    742                 // we never want to miss the notification (30s)
    743                 alertedOne = condReady.wait( dur!"seconds"(30) );
    744                 // but we don't want to wait long for the timeout (10ms)
    745                 alertedTwo = condReady.wait( dur!"msecs"(10) );
    746             }
    747         }
    748 
    749         auto thread = new Thread( &waiter );
    750         thread.start();
    751 
    752         while ( true )
    753         {
    754             synchronized( mutex )
    755             {
    756                 if ( waiting )
    757                 {
    758                     condReady.notify();
    759                     break;
    760                 }
    761             }
    762             Thread.yield();
    763         }
    764         thread.join();
    765         assert( waiting );
    766         assert( alertedOne );
    767         assert( !alertedTwo );
    768     }
    769 
    770     testNotify();
    771     testNotifyAll();
    772     testWaitTimeout();
    773 }
    774 
    775 unittest
    776 {
    777     import core.thread;
    778     import core.sync.mutex;
    779     import core.sync.semaphore;
    780 
    781 
    782     void testNotify()
    783     {
    784         auto mutex      = new shared Mutex;
    785         auto condReady  = new shared Condition( mutex );
    786         auto semDone    = new Semaphore;
    787         auto synLoop    = new Object;
    788         int  numWaiters = 10;
    789         int  numTries   = 10;
    790         int  numReady   = 0;
    791         int  numTotal   = 0;
    792         int  numDone    = 0;
    793         int  numPost    = 0;
    794 
    795         void waiter()
    796         {
    797             for ( int i = 0; i < numTries; ++i )
    798             {
    799                 synchronized( mutex )
    800                 {
    801                     while ( numReady < 1 )
    802                     {
    803                         condReady.wait();
    804                     }
    805                     --numReady;
    806                     ++numTotal;
    807                 }
    808 
    809                 synchronized( synLoop )
    810                 {
    811                     ++numDone;
    812                 }
    813                 semDone.wait();
    814             }
    815         }
    816 
    817         auto group = new ThreadGroup;
    818 
    819         for ( int i = 0; i < numWaiters; ++i )
    820             group.create( &waiter );
    821 
    822         for ( int i = 0; i < numTries; ++i )
    823         {
    824             for ( int j = 0; j < numWaiters; ++j )
    825             {
    826                 synchronized( mutex )
    827                 {
    828                     ++numReady;
    829                     condReady.notify();
    830                 }
    831             }
    832             while ( true )
    833             {
    834                 synchronized( synLoop )
    835                 {
    836                     if ( numDone >= numWaiters )
    837                         break;
    838                 }
    839                 Thread.yield();
    840             }
    841             for ( int j = 0; j < numWaiters; ++j )
    842             {
    843                 semDone.notify();
    844             }
    845         }
    846 
    847         group.joinAll();
    848         assert( numTotal == numWaiters * numTries );
    849     }
    850 
    851 
    852     void testNotifyAll()
    853     {
    854         auto mutex      = new shared Mutex;
    855         auto condReady  = new shared Condition( mutex );
    856         int  numWaiters = 10;
    857         int  numReady   = 0;
    858         int  numDone    = 0;
    859         bool alert      = false;
    860 
    861         void waiter()
    862         {
    863             synchronized( mutex )
    864             {
    865                 ++numReady;
    866                 while ( !alert )
    867                     condReady.wait();
    868                 ++numDone;
    869             }
    870         }
    871 
    872         auto group = new ThreadGroup;
    873 
    874         for ( int i = 0; i < numWaiters; ++i )
    875             group.create( &waiter );
    876 
    877         while ( true )
    878         {
    879             synchronized( mutex )
    880             {
    881                 if ( numReady >= numWaiters )
    882                 {
    883                     alert = true;
    884                     condReady.notifyAll();
    885                     break;
    886                 }
    887             }
    888             Thread.yield();
    889         }
    890         group.joinAll();
    891         assert( numReady == numWaiters && numDone == numWaiters );
    892     }
    893 
    894 
    895     void testWaitTimeout()
    896     {
    897         auto mutex      = new shared Mutex;
    898         auto condReady  = new shared Condition( mutex );
    899         bool waiting    = false;
    900         bool alertedOne = true;
    901         bool alertedTwo = true;
    902 
    903         void waiter()
    904         {
    905             synchronized( mutex )
    906             {
    907                 waiting    = true;
    908                 // we never want to miss the notification (30s)
    909                 alertedOne = condReady.wait( dur!"seconds"(30) );
    910                 // but we don't want to wait long for the timeout (10ms)
    911                 alertedTwo = condReady.wait( dur!"msecs"(10) );
    912             }
    913         }
    914 
    915         auto thread = new Thread( &waiter );
    916         thread.start();
    917 
    918         while ( true )
    919         {
    920             synchronized( mutex )
    921             {
    922                 if ( waiting )
    923                 {
    924                     condReady.notify();
    925                     break;
    926                 }
    927             }
    928             Thread.yield();
    929         }
    930         thread.join();
    931         assert( waiting );
    932         assert( alertedOne );
    933         assert( !alertedTwo );
    934     }
    935 
    936     testNotify();
    937     testNotifyAll();
    938     testWaitTimeout();
    939 }
    940