Home | History | Annotate | Line # | Download | only in unit
      1 #include "test/jemalloc_test.h"
      2 
      3 #include "jemalloc/internal/mpsc_queue.h"
      4 
      5 typedef struct elem_s elem_t;
      6 typedef ql_head(elem_t) elem_list_t;
      7 typedef mpsc_queue(elem_t) elem_mpsc_queue_t;
      8 struct elem_s {
      9 	int thread;
     10 	int idx;
     11 	ql_elm(elem_t) link;
     12 };
     13 
     14 /* Include both proto and gen to make sure they match up. */
     15 mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
     16     elem_list_t);
     17 mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
     18     elem_list_t, link);
     19 
     20 static void
     21 init_elems_simple(elem_t *elems, int nelems, int thread) {
     22 	for (int i = 0; i < nelems; i++) {
     23 		elems[i].thread = thread;
     24 		elems[i].idx = i;
     25 		ql_elm_new(&elems[i], link);
     26 	}
     27 }
     28 
     29 static void
     30 check_elems_simple(elem_list_t *list, int nelems, int thread) {
     31 	elem_t *elem;
     32 	int next_idx = 0;
     33 	ql_foreach(elem, list, link) {
     34 		expect_d_lt(next_idx, nelems, "Too many list items");
     35 		expect_d_eq(thread, elem->thread, "");
     36 		expect_d_eq(next_idx, elem->idx, "List out of order");
     37 		next_idx++;
     38 	}
     39 }
     40 
     41 TEST_BEGIN(test_simple) {
     42 	enum {NELEMS = 10};
     43 	elem_t elems[NELEMS];
     44 	elem_list_t list;
     45 	elem_mpsc_queue_t queue;
     46 
     47 	/* Pop empty queue onto empty list -> empty list */
     48 	ql_new(&list);
     49 	elem_mpsc_queue_new(&queue);
     50 	elem_mpsc_queue_pop_batch(&queue, &list);
     51 	expect_true(ql_empty(&list), "");
     52 
     53 	/* Pop empty queue onto nonempty list -> list unchanged */
     54 	ql_new(&list);
     55 	elem_mpsc_queue_new(&queue);
     56 	init_elems_simple(elems, NELEMS, 0);
     57 	for (int i = 0; i < NELEMS; i++) {
     58 		ql_tail_insert(&list, &elems[i], link);
     59 	}
     60 	elem_mpsc_queue_pop_batch(&queue, &list);
     61 	check_elems_simple(&list, NELEMS, 0);
     62 
     63 	/* Pop nonempty queue onto empty list -> list takes queue contents */
     64 	ql_new(&list);
     65 	elem_mpsc_queue_new(&queue);
     66 	init_elems_simple(elems, NELEMS, 0);
     67 	for (int i = 0; i < NELEMS; i++) {
     68 		elem_mpsc_queue_push(&queue, &elems[i]);
     69 	}
     70 	elem_mpsc_queue_pop_batch(&queue, &list);
     71 	check_elems_simple(&list, NELEMS, 0);
     72 
     73 	/* Pop nonempty queue onto nonempty list -> list gains queue contents */
     74 	ql_new(&list);
     75 	elem_mpsc_queue_new(&queue);
     76 	init_elems_simple(elems, NELEMS, 0);
     77 	for (int i = 0; i < NELEMS / 2; i++) {
     78 		ql_tail_insert(&list, &elems[i], link);
     79 	}
     80 	for (int i = NELEMS / 2; i < NELEMS; i++) {
     81 		elem_mpsc_queue_push(&queue, &elems[i]);
     82 	}
     83 	elem_mpsc_queue_pop_batch(&queue, &list);
     84 	check_elems_simple(&list, NELEMS, 0);
     85 
     86 }
     87 TEST_END
     88 
     89 TEST_BEGIN(test_push_single_or_batch) {
     90 	enum {
     91 		BATCH_MAX = 10,
     92 		/*
     93 		 * We'll push i items one-at-a-time, then i items as a batch,
     94 		 * then i items as a batch again, as i ranges from 1 to
     95 		 * BATCH_MAX.  So we need 3 times the sum of the numbers from 1
     96 		 * to BATCH_MAX elements total.
     97 		 */
     98 		NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
     99 	};
    100 	elem_t elems[NELEMS];
    101 	init_elems_simple(elems, NELEMS, 0);
    102 	elem_list_t list;
    103 	ql_new(&list);
    104 	elem_mpsc_queue_t queue;
    105 	elem_mpsc_queue_new(&queue);
    106 	int next_idx = 0;
    107 	for (int i = 1; i < 10; i++) {
    108 		/* Push i items 1 at a time. */
    109 		for (int j = 0; j < i; j++) {
    110 			elem_mpsc_queue_push(&queue, &elems[next_idx]);
    111 			next_idx++;
    112 		}
    113 		/* Push i items in batch. */
    114 		for (int j = 0; j < i; j++) {
    115 			ql_tail_insert(&list, &elems[next_idx], link);
    116 			next_idx++;
    117 		}
    118 		elem_mpsc_queue_push_batch(&queue, &list);
    119 		expect_true(ql_empty(&list), "Batch push should empty source");
    120 		/*
    121 		 * Push i items in batch, again.  This tests two batches
    122 		 * proceeding one after the other.
    123 		 */
    124 		for (int j = 0; j < i; j++) {
    125 			ql_tail_insert(&list, &elems[next_idx], link);
    126 			next_idx++;
    127 		}
    128 		elem_mpsc_queue_push_batch(&queue, &list);
    129 		expect_true(ql_empty(&list), "Batch push should empty source");
    130 	}
    131 	expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");
    132 
    133 	expect_true(ql_empty(&list), "");
    134 	elem_mpsc_queue_pop_batch(&queue, &list);
    135 	check_elems_simple(&list, NELEMS, 0);
    136 }
    137 TEST_END
    138 
    139 TEST_BEGIN(test_multi_op) {
    140 	enum {NELEMS = 20};
    141 	elem_t elems[NELEMS];
    142 	init_elems_simple(elems, NELEMS, 0);
    143 	elem_list_t push_list;
    144 	ql_new(&push_list);
    145 	elem_list_t result_list;
    146 	ql_new(&result_list);
    147 	elem_mpsc_queue_t queue;
    148 	elem_mpsc_queue_new(&queue);
    149 
    150 	int next_idx = 0;
    151 	/* Push first quarter 1-at-a-time. */
    152 	for (int i = 0; i < NELEMS / 4; i++) {
    153 		elem_mpsc_queue_push(&queue, &elems[next_idx]);
    154 		next_idx++;
    155 	}
    156 	/* Push second quarter in batch. */
    157 	for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
    158 		ql_tail_insert(&push_list, &elems[next_idx], link);
    159 		next_idx++;
    160 	}
    161 	elem_mpsc_queue_push_batch(&queue, &push_list);
    162 	/* Batch pop all pushed elements. */
    163 	elem_mpsc_queue_pop_batch(&queue, &result_list);
    164 	/* Push third quarter in batch. */
    165 	for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
    166 		ql_tail_insert(&push_list, &elems[next_idx], link);
    167 		next_idx++;
    168 	}
    169 	elem_mpsc_queue_push_batch(&queue, &push_list);
    170 	/* Push last quarter one-at-a-time. */
    171 	for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
    172 		elem_mpsc_queue_push(&queue, &elems[next_idx]);
    173 		next_idx++;
    174 	}
    175 	/* Pop them again.  Order of existing list should be preserved. */
    176 	elem_mpsc_queue_pop_batch(&queue, &result_list);
    177 
    178 	check_elems_simple(&result_list, NELEMS, 0);
    179 
    180 }
    181 TEST_END
    182 
    183 typedef struct pusher_arg_s pusher_arg_t;
    184 struct pusher_arg_s {
    185 	elem_mpsc_queue_t *queue;
    186 	int thread;
    187 	elem_t *elems;
    188 	int nelems;
    189 };
    190 
    191 typedef struct popper_arg_s popper_arg_t;
    192 struct popper_arg_s {
    193 	elem_mpsc_queue_t *queue;
    194 	int npushers;
    195 	int nelems_per_pusher;
    196 	int *pusher_counts;
    197 };
    198 
    199 static void *
    200 thd_pusher(void *void_arg) {
    201 	pusher_arg_t *arg = (pusher_arg_t *)void_arg;
    202 	int next_idx = 0;
    203 	while (next_idx < arg->nelems) {
    204 		/* Push 10 items in batch. */
    205 		elem_list_t list;
    206 		ql_new(&list);
    207 		int limit = next_idx + 10;
    208 		while (next_idx < arg->nelems && next_idx < limit) {
    209 			ql_tail_insert(&list, &arg->elems[next_idx], link);
    210 			next_idx++;
    211 		}
    212 		elem_mpsc_queue_push_batch(arg->queue, &list);
    213 		/* Push 10 items one-at-a-time. */
    214 		limit = next_idx + 10;
    215 		while (next_idx < arg->nelems && next_idx < limit) {
    216 			elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
    217 			next_idx++;
    218 		}
    219 
    220 	}
    221 	return NULL;
    222 }
    223 
    224 static void *
    225 thd_popper(void *void_arg) {
    226 	popper_arg_t *arg = (popper_arg_t *)void_arg;
    227 	int done_pushers = 0;
    228 	while (done_pushers < arg->npushers) {
    229 		elem_list_t list;
    230 		ql_new(&list);
    231 		elem_mpsc_queue_pop_batch(arg->queue, &list);
    232 		elem_t *elem;
    233 		ql_foreach(elem, &list, link) {
    234 			int thread = elem->thread;
    235 			int idx = elem->idx;
    236 			expect_d_eq(arg->pusher_counts[thread], idx,
    237 			    "Thread's pushes reordered");
    238 			arg->pusher_counts[thread]++;
    239 			if (arg->pusher_counts[thread]
    240 			    == arg->nelems_per_pusher) {
    241 				done_pushers++;
    242 			}
    243 		}
    244 	}
    245 	return NULL;
    246 }
    247 
    248 TEST_BEGIN(test_multiple_threads) {
    249 	enum {
    250 		NPUSHERS = 4,
    251 		NELEMS_PER_PUSHER = 1000*1000,
    252 	};
    253 	thd_t pushers[NPUSHERS];
    254 	pusher_arg_t pusher_arg[NPUSHERS];
    255 
    256 	thd_t popper;
    257 	popper_arg_t popper_arg;
    258 
    259 	elem_mpsc_queue_t queue;
    260 	elem_mpsc_queue_new(&queue);
    261 
    262 	elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
    263 	elem_t *elem_iter = elems;
    264 	for (int i = 0; i < NPUSHERS; i++) {
    265 		pusher_arg[i].queue = &queue;
    266 		pusher_arg[i].thread = i;
    267 		pusher_arg[i].elems = elem_iter;
    268 		pusher_arg[i].nelems = NELEMS_PER_PUSHER;
    269 
    270 		init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
    271 		elem_iter += NELEMS_PER_PUSHER;
    272 	}
    273 	popper_arg.queue = &queue;
    274 	popper_arg.npushers = NPUSHERS;
    275 	popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
    276 	int pusher_counts[NPUSHERS] = {0};
    277 	popper_arg.pusher_counts = pusher_counts;
    278 
    279 	thd_create(&popper, thd_popper, (void *)&popper_arg);
    280 	for (int i = 0; i < NPUSHERS; i++) {
    281 		thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
    282 	}
    283 
    284 	thd_join(popper, NULL);
    285 	for (int i = 0; i < NPUSHERS; i++) {
    286 		thd_join(pushers[i], NULL);
    287 	}
    288 
    289 	for (int i = 0; i < NPUSHERS; i++) {
    290 		expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
    291 	}
    292 
    293 	free(elems);
    294 }
    295 TEST_END
    296 
    297 int
    298 main(void) {
    299 	return test_no_reentrancy(
    300 	    test_simple,
    301 	    test_push_single_or_batch,
    302 	    test_multi_op,
    303 	    test_multiple_threads);
    304 }
    305