Home | History | Annotate | Line # | Download | only in src
      1 // SPDX-FileCopyrightText: 2010-2011 Mathieu Desnoyers <mathieu.desnoyers (at) efficios.com>
      2 // SPDX-FileCopyrightText: 2011 Lai Jiangshan <laijs (at) cn.fujitsu.com>
      3 //
      4 // SPDX-License-Identifier: LGPL-2.1-or-later
      5 
      6 /*
      7  * Userspace RCU library - Lock-Free Resizable RCU Hash Table
      8  */
      9 
     10 /*
     11  * Based on the following articles:
     12  * - Ori Shalev and Nir Shavit. Split-ordered lists: Lock-free
     13  *   extensible hash tables. J. ACM 53, 3 (May 2006), 379-405.
     14  * - Michael, M. M. High performance dynamic lock-free hash tables
     15  *   and list-based sets. In Proceedings of the fourteenth annual ACM
     16  *   symposium on Parallel algorithms and architectures, ACM Press,
     17  *   (2002), 73-82.
     18  *
     19  * Some specificities of this Lock-Free Resizable RCU Hash Table
     20  * implementation:
     21  *
     22  * - RCU read-side critical section allows readers to perform hash
     23  *   table lookups, as well as traversals, and use the returned objects
     24  *   safely by allowing memory reclaim to take place only after a grace
     25  *   period.
     26  * - Add and remove operations are lock-free, and do not need to
     27  *   allocate memory. They need to be executed within RCU read-side
     28  *   critical section to ensure the objects they read are valid and to
     29  *   deal with the cmpxchg ABA problem.
     30  * - add and add_unique operations are supported. add_unique checks if
     31  *   the node key already exists in the hash table. It ensures not to
     32  *   populate a duplicate key if the node key already exists in the hash
     33  *   table.
     34  * - The resize operation executes concurrently with
     35  *   add/add_unique/add_replace/remove/lookup/traversal.
     36  * - Hash table nodes are contained within a split-ordered list. This
     37  *   list is ordered by incrementing reversed-bits-hash value.
     38  * - An index of bucket nodes is kept. These bucket nodes are the hash
     39  *   table "buckets". These buckets are internal nodes that allow to
     40  *   perform a fast hash lookup, similarly to a skip list. These
     41  *   buckets are chained together in the split-ordered list, which
     42  *   allows recursive expansion by inserting new buckets between the
     43  *   existing buckets. The split-ordered list allows adding new buckets
     44  *   between existing buckets as the table needs to grow.
     45  * - The resize operation for small tables only allows expanding the
     46  *   hash table. It is triggered automatically by detecting long chains
     47  *   in the add operation.
     48  * - The resize operation for larger tables (and available through an
     49  *   API) allows both expanding and shrinking the hash table.
     50  * - Split-counters are used to keep track of the number of
     51  *   nodes within the hash table for automatic resize triggering.
     52  * - Resize operation initiated by long chain detection is executed by a
     53  *   worker thread, which keeps lock-freedom of add and remove.
     54  * - Resize operations are protected by a mutex.
     55  * - The removal operation is split in two parts: first, a "removed"
     56  *   flag is set in the next pointer within the node to remove. Then,
     57  *   a "garbage collection" is performed in the bucket containing the
     58  *   removed node (from the start of the bucket up to the removed node).
     59  *   All encountered nodes with "removed" flag set in their next
     60  *   pointers are removed from the linked-list. If the cmpxchg used for
     61  *   removal fails (due to concurrent garbage-collection or concurrent
     62  *   add), we retry from the beginning of the bucket. This ensures that
     63  *   the node with "removed" flag set is removed from the hash table
     64  *   (not visible to lookups anymore) before the RCU read-side critical
     65  *   section held across removal ends. Furthermore, this ensures that
     66  *   the node with "removed" flag set is removed from the linked-list
     67  *   before its memory is reclaimed. After setting the "removal" flag,
     68  *   only the thread which removal is the first to set the "removal
     69  *   owner" flag (with an xchg) into a node's next pointer is considered
     70  *   to have succeeded its removal (and thus owns the node to reclaim).
     71  *   Because we garbage-collect starting from an invariant node (the
     72  *   start-of-bucket bucket node) up to the "removed" node (or find a
     73  *   reverse-hash that is higher), we are sure that a successful
     74  *   traversal of the chain leads to a chain that is present in the
     75  *   linked-list (the start node is never removed) and that it does not
     76  *   contain the "removed" node anymore, even if concurrent delete/add
     77  *   operations are changing the structure of the list concurrently.
 * - The add operations perform garbage collection of buckets if they
 *   encounter nodes with removed flag set in the bucket where they want
 *   to add their new node. This ensures lock-freedom of add operation by
 *   helping the remover unlink nodes from the list rather than to wait
 *   for it to do so.
     83  * - There are three memory backends for the hash table buckets: the
     84  *   "order table", the "chunks", and the "mmap".
     85  * - These bucket containers contain a compact version of the hash table
     86  *   nodes.
     87  * - The RCU "order table":
     88  *   -  has a first level table indexed by log2(hash index) which is
     89  *      copied and expanded by the resize operation. This order table
     90  *      allows finding the "bucket node" tables.
     91  *   - There is one bucket node table per hash index order. The size of
     92  *     each bucket node table is half the number of hashes contained in
     93  *     this order (except for order 0).
     94  * - The RCU "chunks" is best suited for close interaction with a page
     95  *   allocator. It uses a linear array as index to "chunks" containing
     96  *   each the same number of buckets.
     97  * - The RCU "mmap" memory backend uses a single memory map to hold
     98  *   all buckets.
     99  * - synchronize_rcu is used to garbage-collect the old bucket node table.
    100  *
    101  * Ordering Guarantees:
    102  *
 * To discuss these guarantees, we first define "read" operation as any
 * of the basic cds_lfht_lookup, cds_lfht_next_duplicate,
 * cds_lfht_first, cds_lfht_next operations, as well as
 * cds_lfht_add_unique (failure).
    107  *
    108  * We define "read traversal" operation as any of the following
    109  * group of operations
    110  *  - cds_lfht_lookup followed by iteration with cds_lfht_next_duplicate
    111  *    (and/or cds_lfht_next, although less common).
    112  *  - cds_lfht_add_unique (failure) followed by iteration with
    113  *    cds_lfht_next_duplicate (and/or cds_lfht_next, although less
    114  *    common).
 *  - cds_lfht_first followed by iteration with cds_lfht_next (and/or
 *    cds_lfht_next_duplicate, although less common).
    117  *
    118  * We define "write" operations as any of cds_lfht_add, cds_lfht_replace,
    119  * cds_lfht_add_unique (success), cds_lfht_add_replace, cds_lfht_del.
    120  *
 * When cds_lfht_add_unique succeeds (returns the node passed as
 * parameter), it acts as a "write" operation. When cds_lfht_add_unique
 * fails (returns a node different from the one passed as parameter), it
 * acts as a "read" operation. A cds_lfht_add_unique failure is a
 * cds_lfht_lookup "read" operation, therefore, any ordering guarantee
 * referring to "lookup" implies any of "lookup" or cds_lfht_add_unique
 * (failure).
    128  *
    129  * We define "prior" and "later" node as nodes observable by reads and
    130  * read traversals respectively before and after a write or sequence of
    131  * write operations.
    132  *
    133  * Hash-table operations are often cascaded, for example, the pointer
    134  * returned by a cds_lfht_lookup() might be passed to a cds_lfht_next(),
    135  * whose return value might in turn be passed to another hash-table
    136  * operation. This entire cascaded series of operations must be enclosed
    137  * by a pair of matching rcu_read_lock() and rcu_read_unlock()
    138  * operations.
    139  *
    140  * The following ordering guarantees are offered by this hash table:
    141  *
    142  * A.1) "read" after "write": if there is ordering between a write and a
    143  *      later read, then the read is guaranteed to see the write or some
    144  *      later write.
    145  * A.2) "read traversal" after "write": given that there is dependency
    146  *      ordering between reads in a "read traversal", if there is
    147  *      ordering between a write and the first read of the traversal,
    148  *      then the "read traversal" is guaranteed to see the write or
    149  *      some later write.
    150  * B.1) "write" after "read": if there is ordering between a read and a
    151  *      later write, then the read will never see the write.
    152  * B.2) "write" after "read traversal": given that there is dependency
    153  *      ordering between reads in a "read traversal", if there is
    154  *      ordering between the last read of the traversal and a later
    155  *      write, then the "read traversal" will never see the write.
    156  * C)   "write" while "read traversal": if a write occurs during a "read
    157  *      traversal", the traversal may, or may not, see the write.
    158  * D.1) "write" after "write": if there is ordering between a write and
    159  *      a later write, then the later write is guaranteed to see the
    160  *      effects of the first write.
    161  * D.2) Concurrent "write" pairs: The system will assign an arbitrary
    162  *      order to any pair of concurrent conflicting writes.
    163  *      Non-conflicting writes (for example, to different keys) are
    164  *      unordered.
    165  * E)   If a grace period separates a "del" or "replace" operation
    166  *      and a subsequent operation, then that subsequent operation is
    167  *      guaranteed not to see the removed item.
    168  * F)   Uniqueness guarantee: given a hash table that does not contain
    169  *      duplicate items for a given key, there will only be one item in
    170  *      the hash table after an arbitrary sequence of add_unique and/or
    171  *      add_replace operations. Note, however, that a pair of
    172  *      concurrent read operations might well access two different items
    173  *      with that key.
    174  * G.1) If a pair of lookups for a given key are ordered (e.g. by a
    175  *      memory barrier), then the second lookup will return the same
    176  *      node as the previous lookup, or some later node.
    177  * G.2) A "read traversal" that starts after the end of a prior "read
    178  *      traversal" (ordered by memory barriers) is guaranteed to see the
    179  *      same nodes as the previous traversal, or some later nodes.
    180  * G.3) Concurrent "read" pairs: concurrent reads are unordered. For
    181  *      example, if a pair of reads to the same key run concurrently
    182  *      with an insertion of that same key, the reads remain unordered
    183  *      regardless of their return values. In other words, you cannot
    184  *      rely on the values returned by the reads to deduce ordering.
    185  *
    186  * Progress guarantees:
    187  *
    188  * * Reads are wait-free. These operations always move forward in the
    189  *   hash table linked list, and this list has no loop.
    190  * * Writes are lock-free. Any retry loop performed by a write operation
    191  *   is triggered by progress made within another update operation.
    192  *
    193  * Bucket node tables:
    194  *
    195  * hash table	hash table	the last	all bucket node tables
    196  * order	size		bucket node	0   1   2   3   4   5   6(index)
    197  * 				table size
    198  * 0		1		1		1
    199  * 1		2		1		1   1
    200  * 2		4		2		1   1   2
    201  * 3		8		4		1   1   2   4
    202  * 4		16		8		1   1   2   4   8
    203  * 5		32		16		1   1   2   4   8  16
    204  * 6		64		32		1   1   2   4   8  16  32
    205  *
    206  * When growing/shrinking, we only focus on the last bucket node table
    207  * which size is (!order ? 1 : (1 << (order -1))).
    208  *
    209  * Example for growing/shrinking:
    210  * grow hash table from order 5 to 6: init the index=6 bucket node table
    211  * shrink hash table from order 6 to 5: fini the index=6 bucket node table
    212  *
    213  * A bit of ascii art explanation:
    214  *
    215  * The order index is the off-by-one compared to the actual power of 2
    216  * because we use index 0 to deal with the 0 special-case.
    217  *
    218  * This shows the nodes for a small table ordered by reversed bits:
    219  *
    220  *    bits   reverse
    221  * 0  000        000
    222  * 4  100        001
    223  * 2  010        010
    224  * 6  110        011
    225  * 1  001        100
    226  * 5  101        101
    227  * 3  011        110
    228  * 7  111        111
    229  *
    230  * This shows the nodes in order of non-reversed bits, linked by
    231  * reversed-bit order.
    232  *
    233  * order              bits       reverse
    234  * 0               0  000        000
    235  * 1               |  1  001        100             <-
    236  * 2               |  |  2  010        010    <-     |
    237  *                 |  |  |  3  011        110  | <-  |
    238  * 3               -> |  |  |  4  100        001  |  |
    239  *                    -> |  |     5  101        101  |
    240  *                       -> |        6  110        011
    241  *                          ->          7  111        111
    242  */
    243 
    244 #define _LGPL_SOURCE
    245 #include <stdlib.h>
    246 #include <errno.h>
    247 #include <stdio.h>
    248 #include <stdint.h>
    249 #include <string.h>
    250 #include <sched.h>
    251 #include <unistd.h>
    252 #include <stdlib.h>
    253 
    254 #include "compat-getcpu.h"
    255 #include <urcu/assert.h>
    256 #include <urcu/pointer.h>
    257 #include <urcu/call-rcu.h>
    258 #include <urcu/flavor.h>
    259 #include <urcu/arch.h>
    260 #include <urcu/uatomic.h>
    261 #include <urcu/compiler.h>
    262 #include <urcu/rculfhash.h>
    263 #include <stdio.h>
    264 #include <pthread.h>
    265 #include <signal.h>
    266 #include "rculfhash-internal.h"
    267 #include "workqueue.h"
    268 #include "urcu-die.h"
    269 #include "urcu-utils.h"
    270 #include "compat-smp.h"
    271 
/*
 * Split-counters fold their contribution into the global counter
 * lazily, once every 1024 (1 << COUNT_COMMIT_ORDER) additions or
 * removals, which also keeps track of whether a resize is required.
 * For small tables, and on machines lacking per-cpu data support, the
 * bucket chain length is used instead as the indicator that the table
 * needs to expand.
 */
#define COUNT_COMMIT_ORDER		10
#define DEFAULT_SPLIT_COUNT_MASK	0xFUL
#define CHAIN_LEN_TARGET		1
#define CHAIN_LEN_RESIZE_THRESHOLD	3
    282 
/*
 * Smallest table size, expressed as an order (log2) and as a number of
 * buckets.
 */
#define MIN_TABLE_ORDER			0
#define MIN_TABLE_SIZE			(1UL << MIN_TABLE_ORDER)
    288 
/*
 * Lower bound on the number of bucket nodes each thread must touch for
 * a grow/shrink to be parallelized, as an order and as a count.
 */
#define MIN_PARTITION_PER_THREAD_ORDER	12
#define MIN_PARTITION_PER_THREAD	(1UL << MIN_PARTITION_PER_THREAD_ORDER)
    294 
/*
 * Flags stored in the low bits of a node's next pointer.
 *
 * - REMOVED_FLAG must be updated atomically with the pointer. It tells
 *   that no node may attach to the node scheduled for removal, and that
 *   node garbage collection must be performed.
 * - BUCKET_FLAG does not need to be updated atomically with the
 *   pointer; it lives in a pointer low bit only to save space.
 * - REMOVAL_OWNER_FLAG decides which of the "del" operations that set
 *   the "removed" flag gets to return the removed node to its caller.
 *   The replace operation does not need to interact with this flag,
 *   because it validates that the "removed" flag is not set before
 *   performing its cmpxchg.
 */
#define REMOVED_FLAG		(1UL << 0)
#define BUCKET_FLAG		(1UL << 1)
#define REMOVAL_OWNER_FLAG	(1UL << 2)
/* Every flag bit above, combined. */
#define FLAGS_MASK		(REMOVED_FLAG | BUCKET_FLAG | REMOVAL_OWNER_FLAG)
    311 
/* End-of-list pointer value. It should not interact with the flag bits. */
#define END_VALUE		NULL
    314 
/*
 * ht_items_count: one slot of the split-counters counting node
 * additions and removals in the table. Only used when the
 * CDS_LFHT_ACCOUNTING flag is set at hash table creation.
 *
 * Both counters are free-running and never reset to zero. Every
 * (1 << COUNT_COMMIT_ORDER) add or remove operations they trigger an
 * update of the global counter. The trigger is a power of 2 so that it
 * copes with 32-bit or 64-bit overflow of the counter.
 */
struct ht_items_count {
	unsigned long add;	/* number of additions counted */
	unsigned long del;	/* number of removals counted */
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
    328 
    329 /*
    330  * resize_work: Contains arguments passed to worker thread
    331  * responsible for performing lazy resize.
    332  */
    333 struct resize_work {
    334 	struct urcu_work work;
    335 	struct cds_lfht *ht;
    336 };
    337 
/*
 * partition_resize_work: arguments handed to the worker threads that
 * execute a hash table resize, each one on the partition of the table
 * assigned to its processor.
 */
struct partition_resize_work {
	pthread_t thread_id;
	struct cds_lfht *ht;
	unsigned long i, start, len;
	/* Callback invoked with the ht, i, start and len arguments. */
	void (*fct)(struct cds_lfht *ht, unsigned long i,
		    unsigned long start, unsigned long len);
};
    350 
/*
 * Negative sentinel values stored in nr_cpus_mask while it does not
 * hold a usable CPU mask.
 */
enum nr_cpus_mask_state {
	NR_CPUS_MASK_INIT_FAILED = -2,		/* initialization was attempted and failed */
	NR_CPUS_MASK_UNINITIALIZED = -1,	/* initialization not attempted yet */
};
    355 
    356 static struct urcu_workqueue *cds_lfht_workqueue;
    357 
    358 /*
    359  * Mutex ensuring mutual exclusion between workqueue initialization and
    360  * fork handlers. cds_lfht_fork_mutex nests inside call_rcu_mutex.
    361  */
    362 static pthread_mutex_t cds_lfht_fork_mutex = PTHREAD_MUTEX_INITIALIZER;
    363 
    364 static struct urcu_atfork cds_lfht_atfork;
    365 
    366 /*
    367  * atfork handler nesting counters. Handle being registered to many urcu
    368  * flavors, thus being possibly invoked more than once in the
    369  * pthread_atfork list of callbacks.
    370  */
    371 static int cds_lfht_workqueue_atfork_nesting;
    372 
    373 static void __attribute__((destructor)) cds_lfht_exit(void);
    374 static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
    375 
#ifndef CONFIG_CDS_LFHT_ITER_DEBUG

/* Iterator debugging disabled: nothing is recorded in the iterator. */
static
void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht __attribute__((unused)),
		struct cds_lfht_iter *iter __attribute__((unused)))
{
}

/* Iterator debugging disabled: the assertion expands to nothing. */
#define cds_lfht_iter_debug_assert(...)

#else /* #ifndef CONFIG_CDS_LFHT_ITER_DEBUG */

/* Store in the iterator the hash table it is used with. */
static
void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
{
	iter->lfht = ht;
}

/* Iterator debugging enabled: forward to urcu_posix_assert(). */
#define cds_lfht_iter_debug_assert(...)		urcu_posix_assert(__VA_ARGS__)

#endif /* #else #ifndef CONFIG_CDS_LFHT_ITER_DEBUG */
    397 
/*
 * Bit reversal of a word through a byte lookup table, extended to
 * 64-bit words.
 * Source:
 * http://graphics.stanford.edu/~seander/bithacks.html#BitReverseTable
 * Originally from Public Domain.
 *
 * BitReverseTable256[b] holds byte b with its 8 bits in reverse order.
 * The REV* helper macros expand to the 256 entries and are undefined
 * right after the table.
 */
#define REV2(n) (n),    (n) + 2*64,      (n) + 1*64,      (n) + 3*64
#define REV4(n) REV2(n), REV2((n) + 2*16), REV2((n) + 1*16), REV2((n) + 3*16)
#define REV6(n) REV4(n), REV4((n) + 2*4 ), REV4((n) + 1*4 ), REV4((n) + 3*4 )

static const uint8_t BitReverseTable256[256] =
{
	REV6(0), REV6(2), REV6(1), REV6(3)
};

#undef REV2
#undef REV4
#undef REV6
    416 
    417 static
    418 uint8_t bit_reverse_u8(uint8_t v)
    419 {
    420 	return BitReverseTable256[v];
    421 }
    422 
#if (CAA_BITS_PER_LONG == 32)
/*
 * Reverse the bits of a 32-bit word: each byte is bit-reversed and
 * moved to the mirrored byte position.
 */
static
uint32_t bit_reverse_u32(uint32_t v)
{
	uint32_t reversed = 0;
	unsigned int byte;

	for (byte = 0; byte < 4; byte++) {
		const unsigned int src_shift = 8 * byte;
		const unsigned int dst_shift = 24 - src_shift;

		reversed |= (uint32_t) bit_reverse_u8(v >> src_shift) << dst_shift;
	}
	return reversed;
}
#else
/*
 * Reverse the bits of a 64-bit word: each byte is bit-reversed and
 * moved to the mirrored byte position.
 */
static
uint64_t bit_reverse_u64(uint64_t v)
{
	uint64_t reversed = 0;
	unsigned int byte;

	for (byte = 0; byte < 8; byte++) {
		const unsigned int src_shift = 8 * byte;
		const unsigned int dst_shift = 56 - src_shift;

		reversed |= (uint64_t) bit_reverse_u8(v >> src_shift) << dst_shift;
	}
	return reversed;
}
#endif
    446 
/*
 * Reverse the bits of an unsigned long, using the 32-bit or 64-bit
 * helper selected by CAA_BITS_PER_LONG.
 */
static
unsigned long bit_reverse_ulong(unsigned long v)
{
	unsigned long reversed;

#if (CAA_BITS_PER_LONG == 32)
	reversed = bit_reverse_u32(v);
#else
	reversed = bit_reverse_u64(v);
#endif
	return reversed;
}
    456 
    457 /*
    458  * fls: returns the position of the most significant bit.
    459  * Returns 0 if no bit is set, else returns the position of the most
    460  * significant bit (from 1 to 32 on 32-bit, from 1 to 64 on 64-bit).
    461  */
#if defined(URCU_ARCH_X86)
/*
 * fls_u32 built on the "bsrl" instruction. When x is zero the result
 * register is forced to -1, so the function returns 0; otherwise it
 * returns the bit index found by bsrl plus one.
 */
static inline
unsigned int fls_u32(uint32_t x)
{
	int msb_index;

	__asm__ ("bsrl %1,%0\n\t"
	    "jnz 1f\n\t"
	    "movl $-1,%0\n\t"
	    "1:\n\t"
	    : "=r" (msb_index) : "rm" (x));
	return msb_index + 1;
}
#define HAS_FLS_U32
#endif
    477 
#if defined(URCU_ARCH_AMD64)
/*
 * fls_u64 built on the "bsrq" instruction. When x is zero the result
 * register is forced to -1, so the function returns 0; otherwise it
 * returns the bit index found by bsrq plus one.
 */
static inline
unsigned int fls_u64(uint64_t x)
{
	long msb_index;

	__asm__ ("bsrq %1,%0\n\t"
	    "jnz 1f\n\t"
	    "movq $-1,%0\n\t"
	    "1:\n\t"
	    : "=r" (msb_index) : "rm" (x));
	return msb_index + 1;
}
#define HAS_FLS_U64
#endif
    493 
#ifndef HAS_FLS_U64
/*
 * Generic fls_u64: position (1 to 64) of the most significant bit set
 * in x, or 0 when x is zero.
 *
 * Binary search: for each width 32, 16, 8, 4, 2, 1, when the top
 * "width" bits of x are all clear, x is shifted left by that width and
 * the candidate position is lowered by the same amount.
 */
static __attribute__((unused))
unsigned int fls_u64(uint64_t x)
{
	unsigned int pos = 64;
	unsigned int width;

	if (x == 0)
		return 0;

	for (width = 32; width != 0; width >>= 1) {
		if ((x >> (64 - width)) == 0) {
			x <<= width;
			pos -= width;
		}
	}
	return pos;
}
#endif
    530 
#ifndef HAS_FLS_U32
/*
 * Generic fls_u32: position (1 to 32) of the most significant bit set
 * in x, or 0 when x is zero.
 *
 * Binary search: for each width 16, 8, 4, 2, 1, when the top "width"
 * bits of x are all clear, x is shifted left by that width and the
 * candidate position is lowered by the same amount.
 */
static __attribute__((unused))
unsigned int fls_u32(uint32_t x)
{
	unsigned int pos = 32;
	unsigned int width;

	if (x == 0)
		return 0;

	for (width = 16; width != 0; width >>= 1) {
		if ((x >> (32 - width)) == 0) {
			x <<= width;
			pos -= width;
		}
	}
	return pos;
}
#endif
    562 
/*
 * fls for an unsigned long: dispatches to fls_u32 or fls_u64 depending
 * on CAA_BITS_PER_LONG.
 */
unsigned int cds_lfht_fls_ulong(unsigned long x)
{
	unsigned int pos;

#if (CAA_BITS_PER_LONG == 32)
	pos = fls_u32(x);
#else
	pos = fls_u64(x);
#endif
	return pos;
}
    571 
/* Default allocator hook: forwards to malloc(); state is ignored. */
static void *cds_lfht_malloc(void *state __attribute__((unused)),
		size_t size)
{
	void *mem = malloc(size);

	return mem;
}
    577 
/* Default allocator hook: forwards to calloc(); state is ignored. */
static void *cds_lfht_calloc(void *state __attribute__((unused)),
		size_t nmemb, size_t size)
{
	void *mem = calloc(nmemb, size);

	return mem;
}
    583 
/* Default allocator hook: forwards to realloc(); state is ignored. */
static void *cds_lfht_realloc(void *state __attribute__((unused)),
		void *ptr, size_t size)
{
	void *mem = realloc(ptr, size);

	return mem;
}
    589 
/*
 * Default allocator hook: allocates with posix_memalign(); state is
 * ignored. Returns NULL when posix_memalign() reports an error.
 */
static void *cds_lfht_aligned_alloc(void *state __attribute__((unused)),
		size_t alignment, size_t size)
{
	void *mem;
	const int err = posix_memalign(&mem, alignment, size);

	return err ? NULL : mem;
}
    599 
/* Default allocator hook: forwards to free(); state is ignored. */
static void cds_lfht_free(void *state __attribute__((unused)), void *ptr)
{
	free(ptr);
}
    604 
    605 
    606 /* Default memory allocator */
    607 static struct cds_lfht_alloc cds_lfht_default_alloc = {
    608 	.malloc = cds_lfht_malloc,
    609 	.calloc = cds_lfht_calloc,
    610 	.realloc = cds_lfht_realloc,
    611 	.aligned_alloc = cds_lfht_aligned_alloc,
    612 	.free = cds_lfht_free,
    613 	.state = NULL,
    614 };
    615 
/*
 * Smallest order such that x <= (1UL << order), for a 32-bit x.
 * Returns -1 when x is 0.
 */
static
int cds_lfht_get_count_order_u32(uint32_t x)
{
	return x ? (int) fls_u32(x - 1) : -1;
}
    628 
/*
 * Smallest order such that x <= (1UL << order), for an unsigned long x.
 * Returns -1 when x is 0.
 */
int cds_lfht_get_count_order_ulong(unsigned long x)
{
	int order = -1;

	if (x != 0)
		order = cds_lfht_fls_ulong(x - 1);
	return order;
}
    640 
/* Forward declarations of the lazy resize entry points used below. */
static void cds_lfht_resize_lazy_grow(struct cds_lfht *ht,
		unsigned long size, int growth);
static void cds_lfht_resize_lazy_count(struct cds_lfht *ht,
		unsigned long size, unsigned long count);
    647 
/*
 * Acquire mutex, dying through urcu_die() on unexpected errors.
 *
 * When DISTRUST_SIGNALS_EXTREME is defined, the mutex is taken with a
 * trylock loop instead: between attempts, a pending need_mb request of
 * the calling thread's rcu_reader is cleared, then the thread waits in
 * poll() with a timeout of 10 before retrying.
 */
static void mutex_lock(pthread_mutex_t *mutex)
{
	int err;

#ifdef DISTRUST_SIGNALS_EXTREME
	for (;;) {
		err = pthread_mutex_trylock(mutex);
		if (!err)
			break;
		if (err != EBUSY && err != EINTR)
			urcu_die(err);
		if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) {
			uatomic_store(&URCU_TLS(rcu_reader).need_mb, 0, CMM_SEQ_CST);
		}
		(void) poll(NULL, 0, 10);
	}
#else /* #ifdef DISTRUST_SIGNALS_EXTREME */
	err = pthread_mutex_lock(mutex);
	if (err)
		urcu_die(err);
#endif /* #else #ifdef DISTRUST_SIGNALS_EXTREME */
}
    667 
/* Release mutex, dying through urcu_die() if the unlock fails. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
	const int err = pthread_mutex_unlock(mutex);

	if (err)
		urcu_die(err);
}
    676 
    677 static long nr_cpus_mask = NR_CPUS_MASK_UNINITIALIZED;
    678 static long split_count_mask = -1;
    679 static int split_count_order = -1;
    680 
    681 static void ht_init_nr_cpus_mask(void)
    682 {
    683 	long maxcpus;
    684 
    685 	maxcpus = get_possible_cpus_array_len();
    686 	if (maxcpus <= 0) {
    687 		nr_cpus_mask = NR_CPUS_MASK_INIT_FAILED;
    688 		return;
    689 	}
    690 	/*
    691 	 * round up number of CPUs to next power of two, so we
    692 	 * can use & for modulo.
    693 	 */
    694 	maxcpus = 1UL << cds_lfht_get_count_order_ulong(maxcpus);
    695 	nr_cpus_mask = maxcpus - 1;
    696 }
    697 
    698 static
    699 void alloc_split_items_count(struct cds_lfht *ht)
    700 {
    701 	if (nr_cpus_mask == NR_CPUS_MASK_UNINITIALIZED)	{
    702 		ht_init_nr_cpus_mask();
    703 		if (nr_cpus_mask < 0)
    704 			split_count_mask = DEFAULT_SPLIT_COUNT_MASK;
    705 		else
    706 			split_count_mask = nr_cpus_mask;
    707 		split_count_order =
    708 			cds_lfht_get_count_order_ulong(split_count_mask + 1);
    709 	}
    710 
    711 	urcu_posix_assert(split_count_mask >= 0);
    712 
    713 	if (ht->flags & CDS_LFHT_ACCOUNTING) {
    714 		ht->split_count = ht->alloc->calloc(ht->alloc->state, split_count_mask + 1,
    715 					sizeof(struct ht_items_count));
    716 		urcu_posix_assert(ht->split_count);
    717 	} else {
    718 		ht->split_count = NULL;
    719 	}
    720 }
    721 
    722 static
    723 void free_split_items_count(struct cds_lfht *ht)
    724 {
    725 	poison_free(ht->alloc, ht->split_count);
    726 }
    727 
    728 static
    729 int ht_get_split_count_index(unsigned long hash)
    730 {
    731 	int cpu;
    732 
    733 	urcu_posix_assert(split_count_mask >= 0);
    734 	cpu = urcu_sched_getcpu();
    735 	if (caa_unlikely(cpu < 0))
    736 		return hash & split_count_mask;
    737 	else
    738 		return cpu & split_count_mask;
    739 }
    740 
    741 static
    742 void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
    743 {
    744 	unsigned long split_count, count;
    745 	int index;
    746 
    747 	if (caa_unlikely(!ht->split_count))
    748 		return;
    749 	index = ht_get_split_count_index(hash);
    750 	split_count = uatomic_add_return(&ht->split_count[index].add, 1);
    751 	if (caa_likely(split_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))
    752 		return;
    753 	/* Only if number of add multiple of 1UL << COUNT_COMMIT_ORDER */
    754 
    755 	dbg_printf("add split count %lu\n", split_count);
    756 	count = uatomic_add_return(&ht->count,
    757 				   1UL << COUNT_COMMIT_ORDER);
    758 	if (caa_likely(count & (count - 1)))
    759 		return;
    760 	/* Only if global count is power of 2 */
    761 
    762 	if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
    763 		return;
    764 	dbg_printf("add set global %lu\n", count);
    765 	cds_lfht_resize_lazy_count(ht, size,
    766 		count >> (CHAIN_LEN_TARGET - 1));
    767 }
    768 
    769 static
    770 void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash)
    771 {
    772 	unsigned long split_count, count;
    773 	int index;
    774 
    775 	if (caa_unlikely(!ht->split_count))
    776 		return;
    777 	index = ht_get_split_count_index(hash);
    778 	split_count = uatomic_add_return(&ht->split_count[index].del, 1);
    779 	if (caa_likely(split_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))
    780 		return;
    781 	/* Only if number of deletes multiple of 1UL << COUNT_COMMIT_ORDER */
    782 
    783 	dbg_printf("del split count %lu\n", split_count);
    784 	count = uatomic_add_return(&ht->count,
    785 				   -(1UL << COUNT_COMMIT_ORDER));
    786 	if (caa_likely(count & (count - 1)))
    787 		return;
    788 	/* Only if global count is power of 2 */
    789 
    790 	if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
    791 		return;
    792 	dbg_printf("del set global %lu\n", count);
    793 	/*
    794 	 * Don't shrink table if the number of nodes is below a
    795 	 * certain threshold.
    796 	 */
    797 	if (count < (1UL << COUNT_COMMIT_ORDER) * (split_count_mask + 1))
    798 		return;
    799 	cds_lfht_resize_lazy_count(ht, size,
    800 		count >> (CHAIN_LEN_TARGET - 1));
    801 }
    802 
    803 static
    804 void check_resize(struct cds_lfht *ht, unsigned long size, uint32_t chain_len)
    805 {
    806 	unsigned long count;
    807 
    808 	if (!(ht->flags & CDS_LFHT_AUTO_RESIZE))
    809 		return;
    810 	count = uatomic_read(&ht->count);
    811 	/*
    812 	 * Use bucket-local length for small table expand and for
    813 	 * environments lacking per-cpu data support.
    814 	 */
    815 	if (count >= (1UL << (COUNT_COMMIT_ORDER + split_count_order)))
    816 		return;
    817 	if (chain_len > 100)
    818 		dbg_printf("WARNING: large chain length: %u.\n",
    819 			   chain_len);
    820 	if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD) {
    821 		int growth;
    822 
    823 		/*
    824 		 * Ideal growth calculated based on chain length.
    825 		 */
    826 		growth = cds_lfht_get_count_order_u32(chain_len
    827 				- (CHAIN_LEN_TARGET - 1));
    828 		if ((ht->flags & CDS_LFHT_ACCOUNTING)
    829 				&& (size << growth)
    830 					>= (1UL << (COUNT_COMMIT_ORDER
    831 						+ split_count_order))) {
    832 			/*
    833 			 * If ideal growth expands the hash table size
    834 			 * beyond the "small hash table" sizes, use the
    835 			 * maximum small hash table size to attempt
    836 			 * expanding the hash table. This only applies
    837 			 * when node accounting is available, otherwise
    838 			 * the chain length is used to expand the hash
    839 			 * table in every case.
    840 			 */
    841 			growth = COUNT_COMMIT_ORDER + split_count_order
    842 				- cds_lfht_get_count_order_ulong(size);
    843 			if (growth <= 0)
    844 				return;
    845 		}
    846 		cds_lfht_resize_lazy_grow(ht, size, growth);
    847 	}
    848 }
    849 
    850 static
    851 struct cds_lfht_node *clear_flag(struct cds_lfht_node *node)
    852 {
    853 	return (struct cds_lfht_node *) (((unsigned long) node) & ~FLAGS_MASK);
    854 }
    855 
    856 static
    857 int is_removed(const struct cds_lfht_node *node)
    858 {
    859 	return ((unsigned long) node) & REMOVED_FLAG;
    860 }
    861 
    862 static
    863 int is_bucket(struct cds_lfht_node *node)
    864 {
    865 	return ((unsigned long) node) & BUCKET_FLAG;
    866 }
    867 
    868 static
    869 struct cds_lfht_node *flag_bucket(struct cds_lfht_node *node)
    870 {
    871 	return (struct cds_lfht_node *) (((unsigned long) node) | BUCKET_FLAG);
    872 }
    873 
    874 static
    875 int is_removal_owner(struct cds_lfht_node *node)
    876 {
    877 	return ((unsigned long) node) & REMOVAL_OWNER_FLAG;
    878 }
    879 
    880 static
    881 struct cds_lfht_node *flag_removed(struct cds_lfht_node *node)
    882 {
    883 	return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG);
    884 }
    885 
    886 static
    887 struct cds_lfht_node *flag_removal_owner(struct cds_lfht_node *node)
    888 {
    889 	return (struct cds_lfht_node *) (((unsigned long) node) | REMOVAL_OWNER_FLAG);
    890 }
    891 
    892 static
    893 struct cds_lfht_node *flag_removed_or_removal_owner(struct cds_lfht_node *node)
    894 {
    895 	return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG | REMOVAL_OWNER_FLAG);
    896 }
    897 
    898 static
    899 struct cds_lfht_node *get_end(void)
    900 {
    901 	return (struct cds_lfht_node *) END_VALUE;
    902 }
    903 
    904 static
    905 int is_end(struct cds_lfht_node *node)
    906 {
    907 	return clear_flag(node) == (struct cds_lfht_node *) END_VALUE;
    908 }
    909 
/*
 * Raise *ptr to v unless it already holds a value >= v.
 *
 * Returns the last value observed in *ptr: either one that was already
 * >= v (in which case nothing is written and a full memory barrier is
 * issued before returning), or the value that the successful cmpxchg
 * replaced with v.
 */
static
unsigned long _uatomic_xchg_monotonic_increase(unsigned long *ptr,
		unsigned long v)
{
	unsigned long expected = uatomic_read(ptr);

	for (;;) {
		unsigned long observed;

		if (expected >= v) {
			cmm_smp_mb();
			return expected;
		}
		observed = uatomic_cmpxchg(ptr, expected, v);
		if (observed == expected)
			return expected;
		/* Lost a race: retry against the value another thread stored. */
		expected = observed;
	}
}
    926 
    927 static
    928 void cds_lfht_alloc_bucket_table(struct cds_lfht *ht, unsigned long order)
    929 {
    930 	return ht->mm->alloc_bucket_table(ht, order);
    931 }
    932 
    933 /*
    934  * cds_lfht_free_bucket_table() should be called with decreasing order.
    935  * When cds_lfht_free_bucket_table(0) is called, it means the whole
    936  * lfht is destroyed.
    937  */
    938 static
    939 void cds_lfht_free_bucket_table(struct cds_lfht *ht, unsigned long order)
    940 {
    941 	return ht->mm->free_bucket_table(ht, order);
    942 }
    943 
    944 static inline
    945 struct cds_lfht_node *bucket_at(struct cds_lfht *ht, unsigned long index)
    946 {
    947 	return ht->bucket_at(ht, index);
    948 }
    949 
/*
 * Return the bucket node for hash in a table of the given size: the
 * bucket index is hash masked with (size - 1). size must be non-zero.
 */
static inline
struct cds_lfht_node *lookup_bucket(struct cds_lfht *ht, unsigned long size,
		unsigned long hash)
{
	unsigned long bucket_index;

	urcu_posix_assert(size > 0);
	bucket_index = hash & (size - 1);
	return bucket_at(ht, bucket_index);
}
    957 
/*
 * Remove all logically deleted nodes from a bucket up to a certain node key.
 *
 * Starting at bucket, the chain is scanned for a node whose own next pointer
 * carries the REMOVED flag. When one is found, a cmpxchg attempts to swing
 * the predecessor's next pointer past it, and the scan restarts from bucket
 * whether or not the cmpxchg succeeded. The function returns once the scan
 * reaches the end of the list or a node whose reverse_hash is greater than
 * node->reverse_hash.
 *
 * bucket and node must be passed as plain pointers (no BUCKET, REMOVED or
 * REMOVAL_OWNER flag bits set), and must be distinct nodes.
 */
static
void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *node)
{
	struct cds_lfht_node *iter_prev, *iter, *next, *new_next;

	urcu_posix_assert(!is_bucket(bucket));
	urcu_posix_assert(!is_removed(bucket));
	urcu_posix_assert(!is_removal_owner(bucket));
	urcu_posix_assert(!is_bucket(node));
	urcu_posix_assert(!is_removed(node));
	urcu_posix_assert(!is_removal_owner(node));
	/* Each pass of this loop unlinks at most one logically removed node. */
	for (;;) {
		iter_prev = bucket;
		/* We can always skip the bucket node initially */
		iter = rcu_dereference(iter_prev->next);
		urcu_posix_assert(!is_removed(iter));
		urcu_posix_assert(!is_removal_owner(iter));
		urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash);
		/*
		 * We should never be called with bucket (start of chain)
		 * and logically removed node (end of path compression
		 * marker) being the actual same node. This would be a
		 * bug in the algorithm implementation.
		 */
		urcu_posix_assert(bucket != node);
		/* Advance until a node flagged as removed is found. */
		for (;;) {
			if (caa_unlikely(is_end(iter)))
				return;
			if (caa_likely(clear_flag(iter)->reverse_hash > node->reverse_hash))
				return;
			next = rcu_dereference(clear_flag(iter)->next);
			if (caa_likely(is_removed(next)))
				break;
			iter_prev = clear_flag(iter);
			iter = next;
		}
		urcu_posix_assert(!is_removed(iter));
		urcu_posix_assert(!is_removal_owner(iter));
		/*
		 * The replacement value for iter_prev->next is the removed
		 * node's successor with its flags cleared; the bucket flag
		 * found on the current value (iter) is carried over.
		 */
		if (is_bucket(iter))
			new_next = flag_bucket(clear_flag(next));
		else
			new_next = clear_flag(next);
		/* Result deliberately ignored: the scan restarts from bucket. */
		(void) uatomic_cmpxchg(&iter_prev->next, iter, new_next);
	}
}
   1006 
   1007 static
   1008 int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
   1009 		struct cds_lfht_node *old_node,
   1010 		struct cds_lfht_node *old_next,
   1011 		struct cds_lfht_node *new_node)
   1012 {
   1013 	struct cds_lfht_node *bucket, *ret_next;
   1014 
   1015 	if (!old_node)	/* Return -ENOENT if asked to replace NULL node */
   1016 		return -ENOENT;
   1017 
   1018 	urcu_posix_assert(!is_removed(old_node));
   1019 	urcu_posix_assert(!is_removal_owner(old_node));
   1020 	urcu_posix_assert(!is_bucket(old_node));
   1021 	urcu_posix_assert(!is_removed(new_node));
   1022 	urcu_posix_assert(!is_removal_owner(new_node));
   1023 	urcu_posix_assert(!is_bucket(new_node));
   1024 	urcu_posix_assert(new_node != old_node);
   1025 	for (;;) {
   1026 		/* Insert after node to be replaced */
   1027 		if (is_removed(old_next)) {
   1028 			/*
   1029 			 * Too late, the old node has been removed under us
   1030 			 * between lookup and replace. Fail.
   1031 			 */
   1032 			return -ENOENT;
   1033 		}
   1034 		urcu_posix_assert(old_next == clear_flag(old_next));
   1035 		urcu_posix_assert(new_node != old_next);
   1036 		/*
   1037 		 * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED
   1038 		 * flag. It is either set atomically at the same time
   1039 		 * (replace) or after (del).
   1040 		 */
   1041 		urcu_posix_assert(!is_removal_owner(old_next));
   1042 		new_node->next = old_next;
   1043 		/*
   1044 		 * Here is the whole trick for lock-free replace: we add
   1045 		 * the replacement node _after_ the node we want to
   1046 		 * replace by atomically setting its next pointer at the
   1047 		 * same time we set its removal flag. Given that
   1048 		 * the lookups/get next use an iterator aware of the
   1049 		 * next pointer, they will either skip the old node due
   1050 		 * to the removal flag and see the new node, or use
   1051 		 * the old node, but will not see the new one.
   1052 		 * This is a replacement of a node with another node
   1053 		 * that has the same value: we are therefore not
   1054 		 * removing a value from the hash table. We set both the
   1055 		 * REMOVED and REMOVAL_OWNER flags atomically so we own
   1056 		 * the node after successful cmpxchg.
   1057 		 */
   1058 		ret_next = uatomic_cmpxchg(&old_node->next,
   1059 			old_next, flag_removed_or_removal_owner(new_node));
   1060 		if (ret_next == old_next)
   1061 			break;		/* We performed the replacement. */
   1062 		old_next = ret_next;
   1063 	}
   1064 
   1065 	/*
   1066 	 * Ensure that the old node is not visible to readers anymore:
   1067 	 * lookup for the node, and remove it (along with any other
   1068 	 * logically removed node) if found.
   1069 	 */
   1070 	bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash));
   1071 	_cds_lfht_gc_bucket(bucket, new_node);
   1072 
   1073 	urcu_posix_assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
   1074 	return 0;
   1075 }
   1076 
/*
 * Link node into the chain of the bucket selected by hash for a table of
 * the given size.
 *
 * A non-NULL unique_ret pointer uses the "add unique" (or uniquify) add
 * mode. A NULL unique_ret allows creation of duplicate keys.
 *
 * In add-unique mode, when a non-bucket node with the same reverse_hash is
 * met, cds_lfht_next_duplicate() is invoked with match and key; if it
 * leaves a node in the iterator, that iterator is copied to *unique_ret and
 * nothing is inserted. Otherwise, after a successful insertion,
 * unique_ret->node is set to node.
 *
 * A non-zero bucket_flag stores node->next with the bucket flag set and
 * places node ahead of any node having the same reverse_hash.
 *
 * The whole scan restarts from the bucket whenever the insertion cmpxchg
 * fails or after an attempt to unlink a logically removed node.
 */
static
void _cds_lfht_add(struct cds_lfht *ht,
		unsigned long hash,
		cds_lfht_match_fct match,
		const void *key,
		unsigned long size,
		struct cds_lfht_node *node,
		struct cds_lfht_iter *unique_ret,
		int bucket_flag)
{
	struct cds_lfht_node *iter_prev, *iter, *next, *new_node, *new_next,
			*return_node;
	struct cds_lfht_node *bucket;

	urcu_posix_assert(!is_bucket(node));
	urcu_posix_assert(!is_removed(node));
	urcu_posix_assert(!is_removal_owner(node));
	bucket = lookup_bucket(ht, size, hash);
	for (;;) {
		/* Chain length counter, reset on every retry. */
		uint32_t chain_len = 0;

		/*
		 * iter_prev points to the non-removed node prior to the
		 * insert location.
		 */
		iter_prev = bucket;
		/* We can always skip the bucket node initially */
		iter = rcu_dereference(iter_prev->next);
		urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash);
		for (;;) {
			if (caa_unlikely(is_end(iter)))
				goto insert;
			if (caa_likely(clear_flag(iter)->reverse_hash > node->reverse_hash))
				goto insert;

			/* bucket node is the first node of the identical-hash-value chain */
			if (bucket_flag && clear_flag(iter)->reverse_hash == node->reverse_hash)
				goto insert;

			next = rcu_dereference(clear_flag(iter)->next);
			/* iter is logically removed: unlink it, then retry. */
			if (caa_unlikely(is_removed(next)))
				goto gc_node;

			/* uniquely add */
			if (unique_ret
			    && !is_bucket(next)
			    && clear_flag(iter)->reverse_hash == node->reverse_hash) {
				struct cds_lfht_iter d_iter = {
					.node = node,
					.next = iter,
#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
					.lfht = ht,
#endif
				};

				/*
				 * uniquely adding inserts the node as the first
				 * node of the identical-hash-value node chain.
				 *
				 * This semantic ensures no duplicated keys
				 * should ever be observable in the table
				 * (including traversing the table node by
				 * node by forward iterations)
				 */
				cds_lfht_next_duplicate(ht, match, key, &d_iter);
				if (!d_iter.node)
					goto insert;

				/* Report the iterator result; node is not inserted. */
				*unique_ret = d_iter;
				return;
			}

			/* Only account for identical reverse hash once */
			if (iter_prev->reverse_hash != clear_flag(iter)->reverse_hash
			    && !is_bucket(next))
				check_resize(ht, size, ++chain_len);
			iter_prev = clear_flag(iter);
			iter = next;
		}

	insert:
		urcu_posix_assert(node != clear_flag(iter));
		urcu_posix_assert(!is_removed(iter_prev));
		urcu_posix_assert(!is_removal_owner(iter_prev));
		urcu_posix_assert(!is_removed(iter));
		urcu_posix_assert(!is_removal_owner(iter));
		urcu_posix_assert(iter_prev != node);
		/* node->next carries the bucket flag only for bucket_flag adds. */
		if (!bucket_flag)
			node->next = clear_flag(iter);
		else
			node->next = flag_bucket(clear_flag(iter));
		/*
		 * Carry over to the new value of iter_prev->next the bucket
		 * flag found on its current value (iter).
		 */
		if (is_bucket(iter))
			new_node = flag_bucket(node);
		else
			new_node = node;
		if (uatomic_cmpxchg(&iter_prev->next, iter,
				    new_node) != iter) {
			continue;	/* retry */
		} else {
			return_node = node;
			goto end;
		}

	gc_node:
		urcu_posix_assert(!is_removed(iter));
		urcu_posix_assert(!is_removal_owner(iter));
		/*
		 * Try to swing iter_prev->next past the removed node,
		 * carrying over the bucket flag found on iter.
		 */
		if (is_bucket(iter))
			new_next = flag_bucket(clear_flag(next));
		else
			new_next = clear_flag(next);
		(void) uatomic_cmpxchg(&iter_prev->next, iter, new_next);
		/* retry */
	}
end:
	if (unique_ret) {
		unique_ret->node = return_node;
		/* unique_ret->next left unset, never used. */
	}
}
   1200 
   1201 static
   1202 int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
   1203 		struct cds_lfht_node *node)
   1204 {
   1205 	struct cds_lfht_node *bucket, *next;
   1206 	uintptr_t *node_next;
   1207 
   1208 	if (!node)	/* Return -ENOENT if asked to delete NULL node */
   1209 		return -ENOENT;
   1210 
   1211 	/* logically delete the node */
   1212 	urcu_posix_assert(!is_bucket(node));
   1213 	urcu_posix_assert(!is_removed(node));
   1214 	urcu_posix_assert(!is_removal_owner(node));
   1215 
   1216 	/*
   1217 	 * We are first checking if the node had previously been
   1218 	 * logically removed (this check is not atomic with setting the
   1219 	 * logical removal flag). Return -ENOENT if the node had
   1220 	 * previously been removed.
   1221 	 */
   1222 	next = CMM_LOAD_SHARED(node->next);	/* next is not dereferenced */
   1223 	if (caa_unlikely(is_removed(next)))
   1224 		return -ENOENT;
   1225 	urcu_posix_assert(!is_bucket(next));
   1226 	/*
   1227 	 * The del operation semantic guarantees a full memory barrier
   1228 	 * before the uatomic_or atomic commit of the deletion flag.
   1229 	 *
   1230 	 * We set the REMOVED_FLAG unconditionally. Note that there may
   1231 	 * be more than one concurrent thread setting this flag.
   1232 	 * Knowing which wins the race will be known after the garbage
   1233 	 * collection phase, stay tuned!
   1234 	 *
   1235 	 * NOTE: The node_next variable is present to avoid breaking
   1236 	 * strict-aliasing rules.
   1237 	 */
   1238 	node_next = (uintptr_t*)&node->next;
   1239 	uatomic_or_mo(node_next, REMOVED_FLAG, CMM_RELEASE);
   1240 
   1241 	/* We performed the (logical) deletion. */
   1242 
   1243 	/*
   1244 	 * Ensure that the node is not visible to readers anymore: lookup for
   1245 	 * the node, and remove it (along with any other logically removed node)
   1246 	 * if found.
   1247 	 */
   1248 	bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash));
   1249 	_cds_lfht_gc_bucket(bucket, node);
   1250 
   1251 	urcu_posix_assert(is_removed(CMM_LOAD_SHARED(node->next)));
   1252 	/*
   1253 	 * Last phase: atomically exchange node->next with a version
   1254 	 * having "REMOVAL_OWNER_FLAG" set. If the returned node->next
   1255 	 * pointer did _not_ have "REMOVAL_OWNER_FLAG" set, we now own
   1256 	 * the node and win the removal race.
   1257 	 * It is interesting to note that all "add" paths are forbidden
   1258 	 * to change the next pointer starting from the point where the
   1259 	 * REMOVED_FLAG is set, so here using a read, followed by a
   1260 	 * xchg() suffice to guarantee that the xchg() will ever only
   1261 	 * set the "REMOVAL_OWNER_FLAG" (or change nothing if the flag
   1262 	 * was already set).
   1263 	 */
   1264 	if (!is_removal_owner(uatomic_xchg(&node->next,
   1265 			flag_removal_owner(uatomic_load(&node->next, CMM_RELAXED)))))
   1266 		return 0;
   1267 	else
   1268 		return -ENOENT;
   1269 }
   1270 
   1271 static
   1272 void *partition_resize_thread(void *arg)
   1273 {
   1274 	struct partition_resize_work *work = arg;
   1275 
   1276 	work->ht->flavor->register_thread();
   1277 	work->fct(work->ht, work->i, work->start, work->len);
   1278 	work->ht->flavor->unregister_thread();
   1279 	return NULL;
   1280 }
   1281 
   1282 static
   1283 void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
   1284 		unsigned long len,
   1285 		void (*fct)(struct cds_lfht *ht, unsigned long i,
   1286 			unsigned long start, unsigned long len))
   1287 {
   1288 	unsigned long partition_len, start = 0;
   1289 	struct partition_resize_work *work;
   1290 	int ret;
   1291 	unsigned long thread, nr_threads;
   1292 	sigset_t newmask, oldmask;
   1293 
   1294 	urcu_posix_assert(nr_cpus_mask != NR_CPUS_MASK_UNINITIALIZED);
   1295 	if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
   1296 		goto fallback;
   1297 
   1298 	/*
   1299 	 * Note: nr_cpus_mask + 1 is always power of 2.
   1300 	 * We spawn just the number of threads we need to satisfy the minimum
   1301 	 * partition size, up to the number of CPUs in the system.
   1302 	 */
   1303 	if (nr_cpus_mask > 0) {
   1304 		nr_threads = min_t(unsigned long, nr_cpus_mask + 1,
   1305 				 len >> MIN_PARTITION_PER_THREAD_ORDER);
   1306 	} else {
   1307 		nr_threads = 1;
   1308 	}
   1309 	partition_len = len >> cds_lfht_get_count_order_ulong(nr_threads);
   1310 	work = ht->alloc->calloc(ht->alloc->state, nr_threads, sizeof(*work));
   1311 	if (!work) {
   1312 		dbg_printf("error allocating for resize, single-threading\n");
   1313 		goto fallback;
   1314 	}
   1315 
   1316 	ret = sigfillset(&newmask);
   1317 	urcu_posix_assert(!ret);
   1318 	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
   1319 	urcu_posix_assert(!ret);
   1320 
   1321 	for (thread = 0; thread < nr_threads; thread++) {
   1322 		work[thread].ht = ht;
   1323 		work[thread].i = i;
   1324 		work[thread].len = partition_len;
   1325 		work[thread].start = thread * partition_len;
   1326 		work[thread].fct = fct;
   1327 		ret = pthread_create(&(work[thread].thread_id),
   1328 			ht->caller_resize_attr ? &ht->resize_attr : NULL,
   1329 			partition_resize_thread, &work[thread]);
   1330 		if (ret == EAGAIN) {
   1331 			/*
   1332 			 * Out of resources: wait and join the threads
   1333 			 * we've created, then handle leftovers.
   1334 			 */
   1335 			dbg_printf("error spawning for resize, single-threading\n");
   1336 			start = work[thread].start;
   1337 			len -= start;
   1338 			nr_threads = thread;
   1339 			break;
   1340 		}
   1341 		urcu_posix_assert(!ret);
   1342 	}
   1343 
   1344 	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
   1345 	urcu_posix_assert(!ret);
   1346 
   1347 	for (thread = 0; thread < nr_threads; thread++) {
   1348 		ret = pthread_join(work[thread].thread_id, NULL);
   1349 		urcu_posix_assert(!ret);
   1350 	}
   1351 	ht->alloc->free(ht->alloc->state, work);
   1352 
   1353 	/*
   1354 	 * A pthread_create failure above will either lead in us having
   1355 	 * no threads to join or starting at a non-zero offset,
   1356 	 * fallback to single thread processing of leftovers.
   1357 	 */
   1358 	if (start == 0 && nr_threads > 0)
   1359 		return;
   1360 fallback:
   1361 	fct(ht, i, start, len);
   1362 }
   1363 
   1364 /*
   1365  * Holding RCU read lock to protect _cds_lfht_add against memory
   1366  * reclaim that could be performed by other worker threads (ABA
   1367  * problem).
   1368  *
   1369  * When we reach a certain length, we can split this population phase over
   1370  * many worker threads, based on the number of CPUs available in the system.
   1371  * This should therefore take care of not having the expand lagging behind too
   1372  * many concurrent insertion threads by using the scheduler's ability to
   1373  * schedule bucket node population fairly with insertions.
   1374  */
   1375 static
   1376 void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
   1377 				   unsigned long start, unsigned long len)
   1378 {
   1379 	unsigned long j, size = 1UL << (i - 1);
   1380 
   1381 	urcu_posix_assert(i > MIN_TABLE_ORDER);
   1382 	ht->flavor->read_lock();
   1383 	for (j = size + start; j < size + start + len; j++) {
   1384 		struct cds_lfht_node *new_node = bucket_at(ht, j);
   1385 
   1386 		urcu_posix_assert(j >= size && j < (size << 1));
   1387 		dbg_printf("init populate: order %lu index %lu hash %lu\n",
   1388 			   i, j, j);
   1389 		new_node->reverse_hash = bit_reverse_ulong(j);
   1390 		_cds_lfht_add(ht, j, NULL, NULL, size, new_node, NULL, 1);
   1391 	}
   1392 	ht->flavor->read_unlock();
   1393 }
   1394 
   1395 static
   1396 void init_table_populate(struct cds_lfht *ht, unsigned long i,
   1397 			 unsigned long len)
   1398 {
   1399 	partition_resize_helper(ht, i, len, init_table_populate_partition);
   1400 }
   1401 
   1402 static
   1403 void init_table(struct cds_lfht *ht,
   1404 		unsigned long first_order, unsigned long last_order)
   1405 {
   1406 	unsigned long i;
   1407 
   1408 	dbg_printf("init table: first_order %lu last_order %lu\n",
   1409 		   first_order, last_order);
   1410 	urcu_posix_assert(first_order > MIN_TABLE_ORDER);
   1411 	for (i = first_order; i <= last_order; i++) {
   1412 		unsigned long len;
   1413 
   1414 		len = 1UL << (i - 1);
   1415 		dbg_printf("init order %lu len: %lu\n", i, len);
   1416 
   1417 		/* Stop expand if the resize target changes under us */
   1418 		if (CMM_LOAD_SHARED(ht->resize_target) < (1UL << i))
   1419 			break;
   1420 
   1421 		cds_lfht_alloc_bucket_table(ht, i);
   1422 
   1423 		/*
   1424 		 * Set all bucket nodes reverse hash values for a level and
   1425 		 * link all bucket nodes into the table.
   1426 		 */
   1427 		init_table_populate(ht, i, len);
   1428 
   1429 		/*
   1430 		 * Update table size.
   1431 		 *
   1432 		 * Populate data before RCU size.
   1433 		 */
   1434 		uatomic_store(&ht->size, 1UL << i, CMM_RELEASE);
   1435 
   1436 		dbg_printf("init new size: %lu\n", 1UL << i);
   1437 		if (CMM_LOAD_SHARED(ht->in_progress_destroy))
   1438 			break;
   1439 	}
   1440 }
   1441 
   1442 /*
   1443  * Holding RCU read lock to protect _cds_lfht_remove against memory
   1444  * reclaim that could be performed by other worker threads (ABA
   1445  * problem).
   1446  * For a single level, we logically remove and garbage collect each node.
   1447  *
   1448  * As a design choice, we perform logical removal and garbage collection on a
   1449  * node-per-node basis to simplify this algorithm. We also assume keeping good
   1450  * cache locality of the operation would overweight possible performance gain
   1451  * that could be achieved by batching garbage collection for multiple levels.
   1452  * However, this would have to be justified by benchmarks.
   1453  *
   1454  * Concurrent removal and add operations are helping us perform garbage
   1455  * collection of logically removed nodes. We guarantee that all logically
   1456  * removed nodes have been garbage-collected (unlinked) before work
   1457  * enqueue is invoked to free a hole level of bucket nodes (after a
   1458  * grace period).
   1459  *
   1460  * Logical removal and garbage collection can therefore be done in batch
   1461  * or on a node-per-node basis, as long as the guarantee above holds.
   1462  *
   1463  * When we reach a certain length, we can split this removal over many worker
   1464  * threads, based on the number of CPUs available in the system. This should
   1465  * take care of not letting resize process lag behind too many concurrent
   1466  * updater threads actively inserting into the hash table.
   1467  */
   1468 static
   1469 void remove_table_partition(struct cds_lfht *ht, unsigned long i,
   1470 			    unsigned long start, unsigned long len)
   1471 {
   1472 	unsigned long j, size = 1UL << (i - 1);
   1473 
   1474 	urcu_posix_assert(i > MIN_TABLE_ORDER);
   1475 	ht->flavor->read_lock();
   1476 	for (j = size + start; j < size + start + len; j++) {
   1477 		struct cds_lfht_node *fini_bucket = bucket_at(ht, j);
   1478 		struct cds_lfht_node *parent_bucket = bucket_at(ht, j - size);
   1479 		uintptr_t *fini_bucket_next;
   1480 
   1481 		urcu_posix_assert(j >= size && j < (size << 1));
   1482 		dbg_printf("remove entry: order %lu index %lu hash %lu\n",
   1483 			   i, j, j);
   1484 		/* Set the REMOVED_FLAG to freeze the ->next for gc.
   1485 		 *
   1486 		 * NOTE: The fini_bucket_next variable is present to
   1487 		 * avoid breaking strict-aliasing rules.
   1488 		 */
   1489 		fini_bucket_next = (uintptr_t*)&fini_bucket->next;
   1490 		uatomic_or(fini_bucket_next, REMOVED_FLAG);
   1491 		_cds_lfht_gc_bucket(parent_bucket, fini_bucket);
   1492 	}
   1493 	ht->flavor->read_unlock();
   1494 }
   1495 
   1496 static
   1497 void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
   1498 {
   1499 	partition_resize_helper(ht, i, len, remove_table_partition);
   1500 }
   1501 
   1502 /*
   1503  * fini_table() is never called for first_order == 0, which is why
   1504  * free_by_rcu_order == 0 can be used as criterion to know if free must
   1505  * be called.
   1506  */
   1507 static
   1508 void fini_table(struct cds_lfht *ht,
   1509 		unsigned long first_order, unsigned long last_order)
   1510 {
   1511 	unsigned long free_by_rcu_order = 0, i;
   1512 
   1513 	dbg_printf("fini table: first_order %lu last_order %lu\n",
   1514 		   first_order, last_order);
   1515 	urcu_posix_assert(first_order > MIN_TABLE_ORDER);
   1516 	for (i = last_order; i >= first_order; i--) {
   1517 		unsigned long len;
   1518 
   1519 		len = 1UL << (i - 1);
   1520 		dbg_printf("fini order %ld len: %lu\n", i, len);
   1521 
   1522 		/* Stop shrink if the resize target changes under us */
   1523 		if (CMM_LOAD_SHARED(ht->resize_target) > (1UL << (i - 1)))
   1524 			break;
   1525 
   1526 		cmm_smp_wmb();	/* populate data before RCU size */
   1527 		CMM_STORE_SHARED(ht->size, 1UL << (i - 1));
   1528 
   1529 		/*
   1530 		 * We need to wait for all add operations to reach Q.S. (and
   1531 		 * thus use the new table for lookups) before we can start
   1532 		 * releasing the old bucket nodes. Otherwise their lookup will
   1533 		 * return a logically removed node as insert position.
   1534 		 */
   1535 		ht->flavor->update_synchronize_rcu();
   1536 		if (free_by_rcu_order)
   1537 			cds_lfht_free_bucket_table(ht, free_by_rcu_order);
   1538 
   1539 		/*
   1540 		 * Set "removed" flag in bucket nodes about to be removed.
   1541 		 * Unlink all now-logically-removed bucket node pointers.
   1542 		 * Concurrent add/remove operation are helping us doing
   1543 		 * the gc.
   1544 		 */
   1545 		remove_table(ht, i, len);
   1546 
   1547 		free_by_rcu_order = i;
   1548 
   1549 		dbg_printf("fini new size: %lu\n", 1UL << i);
   1550 		if (CMM_LOAD_SHARED(ht->in_progress_destroy))
   1551 			break;
   1552 	}
   1553 
   1554 	if (free_by_rcu_order) {
   1555 		ht->flavor->update_synchronize_rcu();
   1556 		cds_lfht_free_bucket_table(ht, free_by_rcu_order);
   1557 	}
   1558 }
   1559 
   1560 /*
   1561  * Never called with size < 1.
   1562  */
   1563 static
   1564 void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
   1565 {
   1566 	struct cds_lfht_node *prev, *node;
   1567 	unsigned long order, len, i;
   1568 	int bucket_order;
   1569 
   1570 	cds_lfht_alloc_bucket_table(ht, 0);
   1571 
   1572 	dbg_printf("create bucket: order 0 index 0 hash 0\n");
   1573 	node = bucket_at(ht, 0);
   1574 	node->next = flag_bucket(get_end());
   1575 	node->reverse_hash = 0;
   1576 
   1577 	bucket_order = cds_lfht_get_count_order_ulong(size);
   1578 	urcu_posix_assert(bucket_order >= 0);
   1579 
   1580 	for (order = 1; order < (unsigned long) bucket_order + 1; order++) {
   1581 		len = 1UL << (order - 1);
   1582 		cds_lfht_alloc_bucket_table(ht, order);
   1583 
   1584 		for (i = 0; i < len; i++) {
   1585 			/*
   1586 			 * Now, we are trying to init the node with the
   1587 			 * hash=(len+i) (which is also a bucket with the
   1588 			 * index=(len+i)) and insert it into the hash table,
   1589 			 * so this node has to be inserted after the bucket
   1590 			 * with the index=(len+i)&(len-1)=i. And because there
   1591 			 * is no other non-bucket node nor bucket node with
   1592 			 * larger index/hash inserted, so the bucket node
   1593 			 * being inserted should be inserted directly linked
   1594 			 * after the bucket node with index=i.
   1595 			 */
   1596 			prev = bucket_at(ht, i);
   1597 			node = bucket_at(ht, len + i);
   1598 
   1599 			dbg_printf("create bucket: order %lu index %lu hash %lu\n",
   1600 				   order, len + i, len + i);
   1601 			node->reverse_hash = bit_reverse_ulong(len + i);
   1602 
   1603 			/* insert after prev */
   1604 			urcu_posix_assert(is_bucket(prev->next));
   1605 			node->next = prev->next;
   1606 			prev->next = flag_bucket(node);
   1607 		}
   1608 	}
   1609 }
   1610 
   1611 #if (CAA_BITS_PER_LONG > 32)
   1612 /*
   1613  * For 64-bit architectures, with max number of buckets small enough not to
   1614  * use the entire 64-bit memory mapping space (and allowing a fair number of
   1615  * hash table instances), use the mmap allocator, which is faster. Otherwise,
   1616  * fallback to the order allocator.
   1617  */
   1618 static
   1619 const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
   1620 {
   1621 	if (max_nr_buckets && max_nr_buckets <= (1ULL << 32))
   1622 		return &cds_lfht_mm_mmap;
   1623 	else
   1624 		return &cds_lfht_mm_order;
   1625 }
   1626 #else
   1627 /*
   1628  * For 32-bit architectures, use the order allocator.
   1629  */
   1630 static
   1631 const struct cds_lfht_mm_type *get_mm_type(
   1632 		unsigned long max_nr_buckets __attribute__((unused)))
   1633 {
   1634 	return &cds_lfht_mm_order;
   1635 }
   1636 #endif
   1637 
   1638 void cds_lfht_node_init_deleted(struct cds_lfht_node *node)
   1639 {
   1640 	cds_lfht_node_init(node);
   1641 	node->next = flag_removed(NULL);
   1642 }
   1643 
   1644 struct cds_lfht *_cds_lfht_new_with_alloc(unsigned long init_size,
   1645 			unsigned long min_nr_alloc_buckets,
   1646 			unsigned long max_nr_buckets,
   1647 			int flags,
   1648 			const struct cds_lfht_mm_type *mm,
   1649 			const struct rcu_flavor_struct *flavor,
   1650 			const struct cds_lfht_alloc *alloc,
   1651 			pthread_attr_t *attr)
   1652 {
   1653 	struct cds_lfht *ht;
   1654 	unsigned long order;
   1655 
   1656 	/* min_nr_alloc_buckets must be power of two */
   1657 	if (!min_nr_alloc_buckets || (min_nr_alloc_buckets & (min_nr_alloc_buckets - 1)))
   1658 		return NULL;
   1659 
   1660 	/* init_size must be power of two */
   1661 	if (!init_size || (init_size & (init_size - 1)))
   1662 		return NULL;
   1663 
   1664 	/*
   1665 	 * Memory management plugin default.
   1666 	 */
   1667 	if (!mm)
   1668 		mm = get_mm_type(max_nr_buckets);
   1669 
   1670 	/* max_nr_buckets == 0 for order based mm means infinite */
   1671 	if (mm == &cds_lfht_mm_order && !max_nr_buckets)
   1672 		max_nr_buckets = 1UL << (MAX_TABLE_ORDER - 1);
   1673 
   1674 	/* max_nr_buckets must be power of two */
   1675 	if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
   1676 		return NULL;
   1677 
   1678 	if (flags & CDS_LFHT_AUTO_RESIZE)
   1679 		cds_lfht_init_worker(flavor);
   1680 
   1681 	min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
   1682 	init_size = max(init_size, MIN_TABLE_SIZE);
   1683 	max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
   1684 	init_size = min(init_size, max_nr_buckets);
   1685 
   1686 	ht = mm->alloc_cds_lfht(min_nr_alloc_buckets, max_nr_buckets, alloc ? : &cds_lfht_default_alloc);
   1687 
   1688 	urcu_posix_assert(ht);
   1689 	urcu_posix_assert(ht->mm == mm);
   1690 	urcu_posix_assert(ht->bucket_at == mm->bucket_at);
   1691 
   1692 	ht->flags = flags;
   1693 	ht->flavor = flavor;
   1694 	ht->caller_resize_attr = attr;
   1695 	if (attr)
   1696 		ht->resize_attr = *attr;
   1697 	alloc_split_items_count(ht);
   1698 	/* this mutex should not nest in read-side C.S. */
   1699 	pthread_mutex_init(&ht->resize_mutex, NULL);
   1700 	order = cds_lfht_get_count_order_ulong(init_size);
   1701 	ht->resize_target = 1UL << order;
   1702 	cds_lfht_create_bucket(ht, 1UL << order);
   1703 	ht->size = 1UL << order;
   1704 	return ht;
   1705 }
   1706 
/*
 * Create a hash table without specifying an allocator.
 *
 * Forwards every argument to _cds_lfht_new_with_alloc() and passes NULL
 * for the allocator.
 */
struct cds_lfht *_cds_lfht_new(unsigned long init_size,
			unsigned long min_nr_alloc_buckets,
			unsigned long max_nr_buckets,
			int flags,
			const struct cds_lfht_mm_type *mm,
			const struct rcu_flavor_struct *flavor,
			pthread_attr_t *attr)
{
	const struct cds_lfht_alloc *no_alloc = NULL;

	return _cds_lfht_new_with_alloc(init_size, min_nr_alloc_buckets,
			max_nr_buckets, flags, mm, flavor, no_alloc, attr);
}
   1719 
   1720 void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
   1721 		cds_lfht_match_fct match, const void *key,
   1722 		struct cds_lfht_iter *iter)
   1723 {
   1724 	struct cds_lfht_node *node, *next, *bucket;
   1725 	unsigned long reverse_hash, size;
   1726 
   1727 	cds_lfht_iter_debug_set_ht(ht, iter);
   1728 
   1729 	reverse_hash = bit_reverse_ulong(hash);
   1730 
   1731 	/*
   1732 	 * Use load acquire instead of rcu_dereference because there is no
   1733 	 * dependency between the table size and the dereference of the bucket
   1734 	 * content.
   1735 	 *
   1736 	 * This acquire is paired with the store release in init_table().
   1737 	 */
   1738 	size = uatomic_load(&ht->size, CMM_ACQUIRE);
   1739 	bucket = lookup_bucket(ht, size, hash);
   1740 	/* We can always skip the bucket node initially */
   1741 	node = rcu_dereference(bucket->next);
   1742 	node = clear_flag(node);
   1743 	for (;;) {
   1744 		if (caa_unlikely(is_end(node))) {
   1745 			node = next = NULL;
   1746 			break;
   1747 		}
   1748 		if (caa_unlikely(node->reverse_hash > reverse_hash)) {
   1749 			node = next = NULL;
   1750 			break;
   1751 		}
   1752 		next = rcu_dereference(node->next);
   1753 		urcu_posix_assert(node == clear_flag(node));
   1754 		if (caa_likely(!is_removed(next))
   1755 		    && !is_bucket(next)
   1756 		    && node->reverse_hash == reverse_hash
   1757 		    && caa_likely(match(node, key))) {
   1758 				break;
   1759 		}
   1760 		node = clear_flag(next);
   1761 	}
   1762 	urcu_posix_assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
   1763 	iter->node = node;
   1764 	iter->next = next;
   1765 }
   1766 
   1767 void cds_lfht_next_duplicate(struct cds_lfht *ht __attribute__((unused)),
   1768 		cds_lfht_match_fct match,
   1769 		const void *key, struct cds_lfht_iter *iter)
   1770 {
   1771 	struct cds_lfht_node *node, *next;
   1772 	unsigned long reverse_hash;
   1773 
   1774 	cds_lfht_iter_debug_assert(ht == iter->lfht);
   1775 	node = iter->node;
   1776 	reverse_hash = node->reverse_hash;
   1777 	next = iter->next;
   1778 	node = clear_flag(next);
   1779 
   1780 	for (;;) {
   1781 		if (caa_unlikely(is_end(node))) {
   1782 			node = next = NULL;
   1783 			break;
   1784 		}
   1785 		if (caa_unlikely(node->reverse_hash > reverse_hash)) {
   1786 			node = next = NULL;
   1787 			break;
   1788 		}
   1789 		next = rcu_dereference(node->next);
   1790 		if (caa_likely(!is_removed(next))
   1791 		    && !is_bucket(next)
   1792 		    && caa_likely(match(node, key))) {
   1793 				break;
   1794 		}
   1795 		node = clear_flag(next);
   1796 	}
   1797 	urcu_posix_assert(!node || !is_bucket(uatomic_load(&node->next, CMM_RELAXED)));
   1798 	iter->node = node;
   1799 	iter->next = next;
   1800 }
   1801 
   1802 void cds_lfht_next(struct cds_lfht *ht __attribute__((unused)),
   1803 		struct cds_lfht_iter *iter)
   1804 {
   1805 	struct cds_lfht_node *node, *next;
   1806 
   1807 	cds_lfht_iter_debug_assert(ht == iter->lfht);
   1808 	node = clear_flag(iter->next);
   1809 	for (;;) {
   1810 		if (caa_unlikely(is_end(node))) {
   1811 			node = next = NULL;
   1812 			break;
   1813 		}
   1814 		next = rcu_dereference(node->next);
   1815 		if (caa_likely(!is_removed(next))
   1816 		    && !is_bucket(next)) {
   1817 				break;
   1818 		}
   1819 		node = clear_flag(next);
   1820 	}
   1821 	urcu_posix_assert(!node || !is_bucket(uatomic_load(&node->next, CMM_RELAXED)));
   1822 	iter->node = node;
   1823 	iter->next = next;
   1824 }
   1825 
   1826 void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter)
   1827 {
   1828 	cds_lfht_iter_debug_set_ht(ht, iter);
   1829 	/*
   1830 	 * Get next after first bucket node. The first bucket node is the
   1831 	 * first node of the linked list.
   1832 	 */
   1833 	iter->next = uatomic_load(&bucket_at(ht, 0)->next, CMM_CONSUME);
   1834 	cds_lfht_next(ht, iter);
   1835 }
   1836 
   1837 void cds_lfht_add(struct cds_lfht *ht, unsigned long hash,
   1838 		struct cds_lfht_node *node)
   1839 {
   1840 	unsigned long size;
   1841 
   1842 	node->reverse_hash = bit_reverse_ulong(hash);
   1843 	size = uatomic_load(&ht->size, CMM_ACQUIRE);
   1844 	_cds_lfht_add(ht, hash, NULL, NULL, size, node, NULL, 0);
   1845 	ht_count_add(ht, size, hash);
   1846 }
   1847 
   1848 struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht,
   1849 				unsigned long hash,
   1850 				cds_lfht_match_fct match,
   1851 				const void *key,
   1852 				struct cds_lfht_node *node)
   1853 {
   1854 	unsigned long size;
   1855 	struct cds_lfht_iter iter;
   1856 
   1857 	node->reverse_hash = bit_reverse_ulong(hash);
   1858 	size = uatomic_load(&ht->size, CMM_ACQUIRE);
   1859 	_cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
   1860 	if (iter.node == node)
   1861 		ht_count_add(ht, size, hash);
   1862 	return iter.node;
   1863 }
   1864 
   1865 struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
   1866 				unsigned long hash,
   1867 				cds_lfht_match_fct match,
   1868 				const void *key,
   1869 				struct cds_lfht_node *node)
   1870 {
   1871 	unsigned long size;
   1872 	struct cds_lfht_iter iter;
   1873 
   1874 	node->reverse_hash = bit_reverse_ulong(hash);
   1875 	size = uatomic_load(&ht->size, CMM_ACQUIRE);
   1876 	for (;;) {
   1877 		_cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
   1878 		if (iter.node == node) {
   1879 			ht_count_add(ht, size, hash);
   1880 			return NULL;
   1881 		}
   1882 
   1883 		if (!_cds_lfht_replace(ht, size, iter.node, iter.next, node))
   1884 			return iter.node;
   1885 	}
   1886 }
   1887 
   1888 int cds_lfht_replace(struct cds_lfht *ht,
   1889 		struct cds_lfht_iter *old_iter,
   1890 		unsigned long hash,
   1891 		cds_lfht_match_fct match,
   1892 		const void *key,
   1893 		struct cds_lfht_node *new_node)
   1894 {
   1895 	unsigned long size;
   1896 
   1897 	new_node->reverse_hash = bit_reverse_ulong(hash);
   1898 	if (!old_iter->node)
   1899 		return -ENOENT;
   1900 	if (caa_unlikely(old_iter->node->reverse_hash != new_node->reverse_hash))
   1901 		return -EINVAL;
   1902 	if (caa_unlikely(!match(old_iter->node, key)))
   1903 		return -EINVAL;
   1904 	size = uatomic_load(&ht->size, CMM_ACQUIRE);
   1905 	return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next,
   1906 			new_node);
   1907 }
   1908 
   1909 int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
   1910 {
   1911 	unsigned long size;
   1912 	int ret;
   1913 
   1914 	size = uatomic_load(&ht->size, CMM_ACQUIRE);
   1915 	ret = _cds_lfht_del(ht, size, node);
   1916 	if (!ret) {
   1917 		unsigned long hash;
   1918 
   1919 		hash = bit_reverse_ulong(node->reverse_hash);
   1920 		ht_count_del(ht, size, hash);
   1921 	}
   1922 	return ret;
   1923 }
   1924 
   1925 int cds_lfht_is_node_deleted(const struct cds_lfht_node *node)
   1926 {
   1927 	return is_removed(CMM_LOAD_SHARED(node->next));
   1928 }
   1929 
   1930 static
   1931 bool cds_lfht_is_empty(struct cds_lfht *ht)
   1932 {
   1933 	struct cds_lfht_node *node, *next;
   1934 	bool empty = true;
   1935 	bool was_online;
   1936 
   1937 	was_online = ht->flavor->read_ongoing();
   1938 	if (!was_online) {
   1939 		ht->flavor->thread_online();
   1940 		ht->flavor->read_lock();
   1941 	}
   1942 	/* Check that the table is empty */
   1943 	node = bucket_at(ht, 0);
   1944 	do {
   1945 		next = rcu_dereference(node->next);
   1946 		if (!is_bucket(next)) {
   1947 			empty = false;
   1948 			break;
   1949 		}
   1950 		node = clear_flag(next);
   1951 	} while (!is_end(node));
   1952 	if (!was_online) {
   1953 		ht->flavor->read_unlock();
   1954 		ht->flavor->thread_offline();
   1955 	}
   1956 	return empty;
   1957 }
   1958 
   1959 static
   1960 int cds_lfht_delete_bucket(struct cds_lfht *ht)
   1961 {
   1962 	struct cds_lfht_node *node;
   1963 	unsigned long order, i, size;
   1964 
   1965 	/* Check that the table is empty */
   1966 	node = bucket_at(ht, 0);
   1967 	do {
   1968 		node = clear_flag(node)->next;
   1969 		if (!is_bucket(node))
   1970 			return -EPERM;
   1971 		urcu_posix_assert(!is_removed(node));
   1972 		urcu_posix_assert(!is_removal_owner(node));
   1973 	} while (!is_end(node));
   1974 	/*
   1975 	 * size accessed without rcu_dereference because hash table is
   1976 	 * being destroyed.
   1977 	 */
   1978 	size = ht->size;
   1979 	/* Internal sanity check: all nodes left should be buckets */
   1980 	for (i = 0; i < size; i++) {
   1981 		node = bucket_at(ht, i);
   1982 		dbg_printf("delete bucket: index %lu expected hash %lu hash %lu\n",
   1983 			i, i, bit_reverse_ulong(node->reverse_hash));
   1984 		urcu_posix_assert(is_bucket(node->next));
   1985 	}
   1986 
   1987 	for (order = cds_lfht_get_count_order_ulong(size); (long)order >= 0; order--)
   1988 		cds_lfht_free_bucket_table(ht, order);
   1989 
   1990 	return 0;
   1991 }
   1992 
   1993 static
   1994 void do_auto_resize_destroy_cb(struct urcu_work *work)
   1995 {
   1996 	struct cds_lfht *ht = caa_container_of(work, struct cds_lfht, destroy_work);
   1997 	int ret;
   1998 
   1999 	ht->flavor->register_thread();
   2000 	ret = cds_lfht_delete_bucket(ht);
   2001 	if (ret)
   2002 		urcu_die(-ret);
   2003 	free_split_items_count(ht);
   2004 	ret = pthread_mutex_destroy(&ht->resize_mutex);
   2005 	if (ret)
   2006 		urcu_die(ret);
   2007 	ht->flavor->unregister_thread();
   2008 	poison_free(ht->alloc, ht);
   2009 }
   2010 
   2011 /*
   2012  * Should only be called when no more concurrent readers nor writers can
   2013  * possibly access the table.
   2014  */
   2015 int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
   2016 {
   2017 	int ret;
   2018 
   2019 	if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
   2020 		/*
   2021 		 * Perform error-checking for emptiness before queuing
   2022 		 * work, so we can return error to the caller. This runs
   2023 		 * concurrently with ongoing resize.
   2024 		 */
   2025 		if (!cds_lfht_is_empty(ht))
   2026 			return -EPERM;
   2027 		/* Cancel ongoing resize operations. */
   2028 		uatomic_store(&ht->in_progress_destroy, 1, CMM_RELAXED);
   2029 		if (attr) {
   2030 			*attr = ht->caller_resize_attr;
   2031 			ht->caller_resize_attr = NULL;
   2032 		}
   2033 		/*
   2034 		 * Queue destroy work after prior queued resize
   2035 		 * operations. Given there are no concurrent writers
   2036 		 * accessing the hash table at this point, no resize
   2037 		 * operations can be queued after this destroy work.
   2038 		 */
   2039 		urcu_workqueue_queue_work(cds_lfht_workqueue,
   2040 			&ht->destroy_work, do_auto_resize_destroy_cb);
   2041 		return 0;
   2042 	}
   2043 	ret = cds_lfht_delete_bucket(ht);
   2044 	if (ret)
   2045 		return ret;
   2046 	free_split_items_count(ht);
   2047 	if (attr)
   2048 		*attr = ht->caller_resize_attr;
   2049 	ret = pthread_mutex_destroy(&ht->resize_mutex);
   2050 	if (ret)
   2051 		ret = -EBUSY;
   2052 	poison_free(ht->alloc, ht);
   2053 	return ret;
   2054 }
   2055 
   2056 void cds_lfht_count_nodes(struct cds_lfht *ht,
   2057 		long *approx_before,
   2058 		unsigned long *count,
   2059 		long *approx_after)
   2060 {
   2061 	struct cds_lfht_node *node, *next;
   2062 	unsigned long nr_bucket = 0, nr_removed = 0;
   2063 
   2064 	*approx_before = 0;
   2065 	if (ht->split_count) {
   2066 		int i;
   2067 
   2068 		for (i = 0; i < split_count_mask + 1; i++) {
   2069 			*approx_before += uatomic_read(&ht->split_count[i].add);
   2070 			*approx_before -= uatomic_read(&ht->split_count[i].del);
   2071 		}
   2072 	}
   2073 
   2074 	*count = 0;
   2075 
   2076 	/* Count non-bucket nodes in the table */
   2077 	node = bucket_at(ht, 0);
   2078 	do {
   2079 		next = rcu_dereference(node->next);
   2080 		if (is_removed(next)) {
   2081 			if (!is_bucket(next))
   2082 				(nr_removed)++;
   2083 			else
   2084 				(nr_bucket)++;
   2085 		} else if (!is_bucket(next))
   2086 			(*count)++;
   2087 		else
   2088 			(nr_bucket)++;
   2089 		node = clear_flag(next);
   2090 	} while (!is_end(node));
   2091 	dbg_printf("number of logically removed nodes: %lu\n", nr_removed);
   2092 	dbg_printf("number of bucket nodes: %lu\n", nr_bucket);
   2093 	*approx_after = 0;
   2094 	if (ht->split_count) {
   2095 		int i;
   2096 
   2097 		for (i = 0; i < split_count_mask + 1; i++) {
   2098 			*approx_after += uatomic_read(&ht->split_count[i].add);
   2099 			*approx_after -= uatomic_read(&ht->split_count[i].del);
   2100 		}
   2101 	}
   2102 }
   2103 
/*
 * Grow the table from @old_size to @new_size buckets by initializing
 * every order above the current one, up to the order of @new_size.
 *
 * Called with the resize mutex held. @new_size must exceed @old_size.
 */
static
void _do_cds_lfht_grow(struct cds_lfht *ht,
		unsigned long old_size, unsigned long new_size)
{
	unsigned long from_order, to_order;

	from_order = cds_lfht_get_count_order_ulong(old_size);
	to_order = cds_lfht_get_count_order_ulong(new_size);
	dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
		   old_size, from_order, new_size, to_order);
	urcu_posix_assert(new_size > old_size);
	init_table(ht, from_order + 1, to_order);
}
   2118 
   2119 /* called with resize mutex held */
   2120 static
   2121 void _do_cds_lfht_shrink(struct cds_lfht *ht,
   2122 		unsigned long old_size, unsigned long new_size)
   2123 {
   2124 	unsigned long old_order, new_order;
   2125 
   2126 	new_size = max(new_size, MIN_TABLE_SIZE);
   2127 	old_order = cds_lfht_get_count_order_ulong(old_size);
   2128 	new_order = cds_lfht_get_count_order_ulong(new_size);
   2129 	dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
   2130 		   old_size, old_order, new_size, new_order);
   2131 	urcu_posix_assert(new_size < old_size);
   2132 
   2133 	/* Remove and unlink all bucket nodes to remove. */
   2134 	fini_table(ht, new_order + 1, old_order);
   2135 }
   2136 
   2137 
   2138 /* called with resize mutex held */
   2139 static
   2140 void _do_cds_lfht_resize(struct cds_lfht *ht)
   2141 {
   2142 	unsigned long new_size, old_size;
   2143 
   2144 	/*
   2145 	 * Resize table, re-do if the target size has changed under us.
   2146 	 */
   2147 	do {
   2148 		if (uatomic_load(&ht->in_progress_destroy, CMM_RELAXED))
   2149 			break;
   2150 
   2151 		uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
   2152 
   2153 		old_size = ht->size;
   2154 		new_size = uatomic_load(&ht->resize_target, CMM_RELAXED);
   2155 		if (old_size < new_size)
   2156 			_do_cds_lfht_grow(ht, old_size, new_size);
   2157 		else if (old_size > new_size)
   2158 			_do_cds_lfht_shrink(ht, old_size, new_size);
   2159 
   2160 		uatomic_store(&ht->resize_initiated, 0, CMM_RELAXED);
   2161 		/* write resize_initiated before read resize_target */
   2162 		cmm_smp_mb();
   2163 	} while (ht->size != uatomic_load(&ht->resize_target, CMM_RELAXED));
   2164 }
   2165 
   2166 static
   2167 unsigned long resize_target_grow(struct cds_lfht *ht, unsigned long new_size)
   2168 {
   2169 	return _uatomic_xchg_monotonic_increase(&ht->resize_target, new_size);
   2170 }
   2171 
   2172 static
   2173 void resize_target_update_count(struct cds_lfht *ht,
   2174 				unsigned long count)
   2175 {
   2176 	count = max(count, MIN_TABLE_SIZE);
   2177 	count = min(count, ht->max_nr_buckets);
   2178 	uatomic_set(&ht->resize_target, count);
   2179 }
   2180 
   2181 void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
   2182 {
   2183 	resize_target_update_count(ht, new_size);
   2184 
   2185 	/*
   2186 	 * Set flags has early as possible even in contention case.
   2187 	 */
   2188 	uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
   2189 
   2190 	mutex_lock(&ht->resize_mutex);
   2191 	_do_cds_lfht_resize(ht);
   2192 	mutex_unlock(&ht->resize_mutex);
   2193 }
   2194 
   2195 static
   2196 void do_resize_cb(struct urcu_work *work)
   2197 {
   2198 	struct resize_work *resize_work =
   2199 		caa_container_of(work, struct resize_work, work);
   2200 	struct cds_lfht *ht = resize_work->ht;
   2201 
   2202 	ht->flavor->register_thread();
   2203 	mutex_lock(&ht->resize_mutex);
   2204 	_do_cds_lfht_resize(ht);
   2205 	mutex_unlock(&ht->resize_mutex);
   2206 	ht->flavor->unregister_thread();
   2207 	poison_free(ht->alloc, work);
   2208 }
   2209 
   2210 static
   2211 void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
   2212 {
   2213 	struct resize_work *work;
   2214 
   2215 	/*
   2216 	 * Store to resize_target is before read resize_initiated as guaranteed
   2217 	 * by either cmpxchg or _uatomic_xchg_monotonic_increase.
   2218 	 */
   2219 	if (!uatomic_load(&ht->resize_initiated, CMM_RELAXED)) {
   2220 		if (uatomic_load(&ht->in_progress_destroy, CMM_RELAXED)) {
   2221 			return;
   2222 		}
   2223 		work = ht->alloc->malloc(ht->alloc->state, sizeof(*work));
   2224 		if (work == NULL) {
   2225 			dbg_printf("error allocating resize work, bailing out\n");
   2226 			return;
   2227 		}
   2228 		work->ht = ht;
   2229 		urcu_workqueue_queue_work(cds_lfht_workqueue,
   2230 			&work->work, do_resize_cb);
   2231 		uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
   2232 	}
   2233 }
   2234 
   2235 static
   2236 void cds_lfht_resize_lazy_grow(struct cds_lfht *ht, unsigned long size, int growth)
   2237 {
   2238 	unsigned long target_size = size << growth;
   2239 
   2240 	target_size = min(target_size, ht->max_nr_buckets);
   2241 	if (resize_target_grow(ht, target_size) >= target_size)
   2242 		return;
   2243 
   2244 	__cds_lfht_resize_lazy_launch(ht);
   2245 }
   2246 
   2247 /*
   2248  * We favor grow operations over shrink. A shrink operation never occurs
   2249  * if a grow operation is queued for lazy execution. A grow operation
   2250  * cancels any pending shrink lazy execution.
   2251  */
   2252 static
   2253 void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
   2254 				unsigned long count)
   2255 {
   2256 	if (!(ht->flags & CDS_LFHT_AUTO_RESIZE))
   2257 		return;
   2258 	count = max(count, MIN_TABLE_SIZE);
   2259 	count = min(count, ht->max_nr_buckets);
   2260 	if (count == size)
   2261 		return;		/* Already the right size, no resize needed */
   2262 	if (count > size) {	/* lazy grow */
   2263 		if (resize_target_grow(ht, count) >= count)
   2264 			return;
   2265 	} else {		/* lazy shrink */
   2266 		for (;;) {
   2267 			unsigned long s;
   2268 
   2269 			s = uatomic_cmpxchg(&ht->resize_target, size, count);
   2270 			if (s == size)
   2271 				break;	/* no resize needed */
   2272 			if (s > size)
   2273 				return;	/* growing is/(was just) in progress */
   2274 			if (s <= count)
   2275 				return;	/* some other thread do shrink */
   2276 			size = s;
   2277 		}
   2278 	}
   2279 	__cds_lfht_resize_lazy_launch(ht);
   2280 }
   2281 
   2282 static void cds_lfht_before_fork(void *priv __attribute__((unused)))
   2283 {
   2284 	if (cds_lfht_workqueue_atfork_nesting++)
   2285 		return;
   2286 	mutex_lock(&cds_lfht_fork_mutex);
   2287 	if (!cds_lfht_workqueue)
   2288 		return;
   2289 	urcu_workqueue_pause_worker(cds_lfht_workqueue);
   2290 }
   2291 
   2292 static void cds_lfht_after_fork_parent(void *priv __attribute__((unused)))
   2293 {
   2294 	if (--cds_lfht_workqueue_atfork_nesting)
   2295 		return;
   2296 	if (!cds_lfht_workqueue)
   2297 		goto end;
   2298 	urcu_workqueue_resume_worker(cds_lfht_workqueue);
   2299 end:
   2300 	mutex_unlock(&cds_lfht_fork_mutex);
   2301 }
   2302 
   2303 static void cds_lfht_after_fork_child(void *priv __attribute__((unused)))
   2304 {
   2305 	if (--cds_lfht_workqueue_atfork_nesting)
   2306 		return;
   2307 	if (!cds_lfht_workqueue)
   2308 		goto end;
   2309 	urcu_workqueue_create_worker(cds_lfht_workqueue);
   2310 end:
   2311 	mutex_unlock(&cds_lfht_fork_mutex);
   2312 }
   2313 
   2314 static struct urcu_atfork cds_lfht_atfork = {
   2315 	.before_fork = cds_lfht_before_fork,
   2316 	.after_fork_parent = cds_lfht_after_fork_parent,
   2317 	.after_fork_child = cds_lfht_after_fork_child,
   2318 };
   2319 
   2320 static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
   2321 {
   2322 	flavor->register_rculfhash_atfork(&cds_lfht_atfork);
   2323 
   2324 	mutex_lock(&cds_lfht_fork_mutex);
   2325 	if (!cds_lfht_workqueue)
   2326 		cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
   2327 			NULL, NULL, NULL, NULL, NULL, NULL, NULL);
   2328 	mutex_unlock(&cds_lfht_fork_mutex);
   2329 }
   2330 
   2331 static void cds_lfht_exit(void)
   2332 {
   2333 	mutex_lock(&cds_lfht_fork_mutex);
   2334 	if (cds_lfht_workqueue) {
   2335 		urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
   2336 		urcu_workqueue_destroy(cds_lfht_workqueue);
   2337 		cds_lfht_workqueue = NULL;
   2338 	}
   2339 	mutex_unlock(&cds_lfht_fork_mutex);
   2340 }
   2341