/**
 * The condition module provides a primitive for synchronized condition
 * checking.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2009.
 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
 * Authors:   Sean Kelly
 * Source:    $(DRUNTIMESRC core/sync/_condition.d)
 */

/*          Copyright Sean Kelly 2005 - 2009.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module core.sync.condition;


public import core.sync.exception;
public import core.sync.mutex;
public import core.time;

version (Windows)
{
    import core.sync.semaphore;
    import core.sys.windows.basetsd /+: HANDLE+/;
    import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
        DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
        LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
    import core.sys.windows.windef /+: BOOL, DWORD+/;
    import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
}
else version (Posix)
{
    import core.sync.config;
    import core.stdc.errno;
    import core.sys.posix.pthread;
    import core.sys.posix.time;
}
else
{
    static assert(false, "Platform not supported");
}


////////////////////////////////////////////////////////////////////////////////
// Condition
//
// void wait();
// void notify();
// void notifyAll();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a condition variable as conceived by C.A.R. Hoare.  As
 * per Mesa type monitors however, "signal" has been replaced with "notify" to
 * indicate that control is not transferred to the waiter when a notification
 * is sent.
 */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  SyncError on error.
     */
    this( Mutex m ) nothrow @safe
    {
        this(m, true);
    }

    /// ditto
    this( shared Mutex m ) shared nothrow @safe
    {
        this(m, true);
    }

    // Common implementation behind both public constructors.  The `_unused_`
    // flag is never read; it only gives this overload a distinct signature so
    // the public constructors can forward to it.
    private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted
        if ((is(Q == Condition) && is(M == Mutex)) ||
            (is(Q == shared Condition) && is(M == shared Mutex)))
    {
        version (Windows)
        {
            static if (is(Q == Condition))
            {
                alias HANDLE_TYPE = void*;
            }
            else
            {
                alias HANDLE_TYPE = shared(void*);
            }
            m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null );
            if ( m_blockLock == m_blockLock.init )
                throw new SyncError( "Unable to initialize condition" );
            // release the first semaphore if a later step fails
            scope(failure) CloseHandle( cast(void*) m_blockLock );

            m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null );
            if ( m_blockQueue == m_blockQueue.init )
                throw new SyncError( "Unable to initialize condition" );
            scope(failure) CloseHandle( cast(void*) m_blockQueue );

            InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
            m_assocMutex = m;
        }
        else version (Posix)
        {
            m_assocMutex = m;
            // Use the monotonic clock for timed waits when the platform
            // exposes pthread_condattr_setclock.
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                () @trusted
                {
                    pthread_condattr_t attr = void;
                    int rc  = pthread_condattr_init( &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_condattr_destroy( &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                } ();
            }
            else
            {
                int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null );
                if ( rc )
                    throw new SyncError( "Unable to initialize condition" );
            }
        }
    }


    ~this()
    {
        version (Windows)
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version (Posix)
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the mutex associated with this condition.
     *
     * Returns:
     *  The mutex associated with this condition.
     */
    @property Mutex mutex()
    {
        return m_assocMutex;
    }

    /// ditto
    @property shared(Mutex) mutex() shared
    {
        return m_assocMutex;
    }

    // undocumented function for internal use
    final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    {
        return m_assocMutex;
    }

    // ditto
    final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
    {
        return m_assocMutex;
    }

    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        wait!(typeof(this))(true);
    }

    /// ditto
    void wait() shared
    {
        wait!(typeof(this))(true);
    }

    /// ditto
    void wait(this Q)( bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        version (Windows)
        {
            timedWait( INFINITE );
        }
        else version (Posix)
        {
            int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() );
            if ( rc )
                throw new SyncError( "Unable to wait for condition" );
        }
    }

    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.
     *
     * Params:
     *  val = The time to wait.
     *
     * In:
     *  val must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration val )
    {
        return wait!(typeof(this))(val, true);
    }

    /// ditto
    bool wait( Duration val ) shared
    {
        return wait!(typeof(this))(val, true);
    }

    /// ditto
    bool wait(this Q)( Duration val, bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    in
    {
        assert( !val.isNegative );
    }
    do
    {
        version (Windows)
        {
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            // The timeout passed to timedWait is a 32-bit millisecond count,
            // so longer durations are consumed in maxWaitMillis-sized slices.
            while ( val > maxWaitMillis )
            {
                if ( timedWait( cast(uint)
                                maxWaitMillis.total!"msecs" ) )
                    return true;
                val -= maxWaitMillis;
            }
            return timedWait( cast(uint) val.total!"msecs" );
        }
        else version (Posix)
        {
            timespec t = void;
            mktspec( t, val );

            int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl,
                                             (cast(Mutex) m_assocMutex).handleAddr(),
                                             &t );
            if ( !rc )
                return true;
            if ( rc == ETIMEDOUT )
                return false;
            throw new SyncError( "Unable to wait for condition" );
        }
    }

    /**
     * Notifies one waiter.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        notify!(typeof(this))(true);
    }

    /// ditto
    void notify() shared
    {
        notify!(typeof(this))(true);
    }

    /// ditto
    void notify(this Q)( bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        version (Windows)
        {
            notify_( false );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw new SyncError( "Unable to notify condition" );
        }
    }

    /**
     * Notifies all waiters.
     *
     * Throws:
     *  SyncError on error.
     */
    void notifyAll()
    {
        notifyAll!(typeof(this))(true);
    }

    /// ditto
    void notifyAll() shared
    {
        notifyAll!(typeof(this))(true);
    }

    /// ditto
    void notifyAll(this Q)( bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        version (Windows)
        {
            notify_( true );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw new SyncError( "Unable to notify condition" );
        }
    }

private:
    version (Windows)
    {
        // Windows wait primitive used by every public wait overload.
        //
        // Registers the caller as a blocked waiter, unlocks the associated
        // mutex, blocks on m_blockQueue for at most `timeout` milliseconds,
        // updates the waiter bookkeeping under m_unblockLock, and re-locks the
        // associated mutex before returning.
        //
        // Returns: true if the wait on m_blockQueue did not time out.
        bool timedWait(this Q)( DWORD timeout )
            if (is(Q == Condition) || is(Q == shared Condition))
        {
            // `op` applies the compound assignment `o` to a counter: a plain
            // operation for an unshared Condition, atomicOp for a shared one.
            static if (is(Q == Condition))
            {
                auto op(string o, T, V1)(ref T val, V1 mod)
                {
                    return mixin("val " ~ o ~ "mod");
                }
            }
            else
            {
                auto op(string o, T, V1)(ref shared T val, V1 mod)
                {
                    import core.atomic: atomicOp;
                    return atomicOp!o(val, mod);
                }
            }

            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            op!"+="(m_numWaitersBlocked, 1);

            rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            // the caller always gets the mutex back, even on failure
            scope(failure) m_assocMutex.lock();

            rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if ( m_numWaitersBlocked != 0 )
                    {
                        op!"-="(m_numWaitersBlocked, 1);
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if ( op!"-="(m_numWaitersToUnblock, 1) == 0 )
                {
                    if ( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                op!"-="(m_numWaitersBlocked, m_numWaitersGone);
                rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                // ReleaseSemaphore reports success with a nonzero BOOL, so it
                // must not be compared against WAIT_OBJECT_0 (which is 0).
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if ( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for ( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                assert( rc );
            }
            else if ( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
                assert( rc );
            }
            m_assocMutex.lock();
            return !timedOut;
        }


        // Windows notification primitive behind notify() and notifyAll().
        //
        // Params:
        //  all = true moves every currently blocked waiter to the "to unblock"
        //        count, false moves exactly one.
        void notify_(this Q)( bool all )
            if (is(Q == Condition) || is(Q == shared Condition))
        {
            // `op` applies the compound assignment `o` to a counter: a plain
            // operation for an unshared Condition, atomicOp for a shared one.
            static if (is(Q == Condition))
            {
                auto op(string o, T, V1)(ref T val, V1 mod)
                {
                    return mixin("val " ~ o ~ "mod");
                }
            }
            else
            {
                auto op(string o, T, V1)(ref shared T val, V1 mod)
                {
                    import core.atomic: atomicOp;
                    return atomicOp!o(val, mod);
                }
            }

            DWORD rc;

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( m_numWaitersToUnblock != 0 )
            {
                // an unblock pass is already in progress: just add to it
                if ( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if ( all )
                {
                    op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked);
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    op!"+="(m_numWaitersToUnblock, 1);
                    op!"-="(m_numWaitersBlocked, 1);
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if ( m_numWaitersBlocked > m_numWaitersGone )
            {
                rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if ( 0 != m_numWaitersGone )
                {
                    op!"-="(m_numWaitersBlocked, m_numWaitersGone);
                    m_numWaitersGone = 0;
                }
                if ( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    op!"-="(m_numWaitersBlocked, 1);
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version (Posix)
    {
        Mutex               m_assocMutex;
        pthread_cond_t      m_hndl;
    }
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////

unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            for ( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex )
                {
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        for ( int i = 0; i < numTries; ++i )
        {
            for ( int j = 0; j < numWaiters; ++j )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            for ( int j = 0; j < numWaiters; ++j )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}

unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    void testNotify()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            for ( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex )
                {
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        for ( int i = 0; i < numTries; ++i )
        {
            for ( int j = 0; j < numWaiters; ++j )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            for ( int j = 0; j < numWaiters; ++j )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    void testNotifyAll()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    void testWaitTimeout()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}