1 // SPDX-FileCopyrightText: 2009 Mathieu Desnoyers <mathieu.desnoyers (at) efficios.com> 2 // 3 // SPDX-License-Identifier: GPL-2.0-or-later 4 5 /* 6 * Userspace RCU library - test program (with baatch reclamation) 7 */ 8 9 #include <stdio.h> 10 #include <pthread.h> 11 #include <stdlib.h> 12 #include <string.h> 13 #include <sys/types.h> 14 #include <sys/wait.h> 15 #include <unistd.h> 16 #include <stdio.h> 17 #include <errno.h> 18 19 #include <urcu/arch.h> 20 #include <urcu/assert.h> 21 #include <urcu/tls-compat.h> 22 #include <urcu/uatomic.h> 23 #include "thread-id.h" 24 #include "../common/debug-yield.h" 25 26 /* hardcoded number of CPUs */ 27 #define NR_CPUS 16384 28 29 #define _LGPL_SOURCE 30 #include <urcu-qsbr.h> 31 32 struct test_array { 33 int a; 34 }; 35 36 static unsigned long wdelay; 37 38 static struct test_array *test_rcu_pointer; 39 40 static unsigned long duration; 41 42 /* read-side C.S. duration, in loops */ 43 static unsigned long rduration; 44 static long reclaim_batch = 1; 45 46 struct reclaim_queue { 47 void **queue; /* Beginning of queue */ 48 void **head; /* Insert position */ 49 }; 50 51 static struct reclaim_queue *pending_reclaims; 52 53 54 /* write-side C.S. duration, in loops */ 55 static unsigned long wduration; 56 57 static inline void loop_sleep(unsigned long loops) 58 { 59 while (loops-- != 0) 60 caa_cpu_relax(); 61 } 62 63 static int verbose_mode; 64 65 #define printf_verbose(fmt, args...) \ 66 do { \ 67 if (verbose_mode) \ 68 printf(fmt, args); \ 69 } while (0) 70 71 static unsigned int cpu_affinities[NR_CPUS]; 72 static unsigned int next_aff = 0; 73 static int use_affinity = 0; 74 75 pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER; 76 77 static void set_affinity(void) 78 { 79 #ifdef HAVE_SCHED_SETAFFINITY 80 cpu_set_t mask; 81 int cpu, ret; 82 #endif /* HAVE_SCHED_SETAFFINITY */ 83 84 if (!use_affinity) 85 return; 86 87 #ifdef HAVE_SCHED_SETAFFINITY 88 ret = pthread_mutex_lock(&affinity_mutex); 89 if (ret) { 90 perror("Error in pthread mutex lock"); 91 exit(-1); 92 } 93 cpu = cpu_affinities[next_aff++]; 94 ret = pthread_mutex_unlock(&affinity_mutex); 95 if (ret) { 96 perror("Error in pthread mutex unlock"); 97 exit(-1); 98 } 99 100 CPU_ZERO(&mask); 101 CPU_SET(cpu, &mask); 102 sched_setaffinity(0, sizeof(mask), &mask); 103 #endif /* HAVE_SCHED_SETAFFINITY */ 104 } 105 106 static DEFINE_URCU_TLS(unsigned long long, nr_writes); 107 static DEFINE_URCU_TLS(unsigned long long, nr_reads); 108 109 static unsigned int nr_readers; 110 static unsigned int nr_writers; 111 112 pthread_mutex_t rcu_copy_mutex = PTHREAD_MUTEX_INITIALIZER; 113 static 114 unsigned long long __attribute__((aligned(CAA_CACHE_LINE_SIZE))) *tot_nr_writes; 115 116 static 117 void *thr_reader(void *_count) 118 { 119 unsigned long long *count = _count; 120 struct test_array *local_ptr; 121 122 printf_verbose("thread_begin %s, tid %lu\n", 123 "reader", urcu_get_thread_id()); 124 125 set_affinity(); 126 127 rcu_register_thread(); 128 129 wait_until_go(); 130 131 for (;;) { 132 _rcu_read_lock(); 133 local_ptr = _rcu_dereference(test_rcu_pointer); 134 rcu_debug_yield_read(); 135 if (local_ptr) 136 urcu_posix_assert(local_ptr->a == 8); 137 if (caa_unlikely(rduration)) 138 loop_sleep(rduration); 139 _rcu_read_unlock(); 140 URCU_TLS(nr_reads)++; 141 /* QS each 1024 reads */ 142 if (caa_unlikely((URCU_TLS(nr_reads) & ((1 << 10) - 1)) == 0)) 143 _rcu_quiescent_state(); 144 if (caa_unlikely(!test_duration_read())) 145 break; 146 } 147 148 rcu_unregister_thread(); 149 150 *count = URCU_TLS(nr_reads); 151 printf_verbose("thread_end %s, tid %lu\n", 152 "reader", urcu_get_thread_id()); 153 return ((void*)1); 154 155 } 156 157 static void rcu_gc_clear_queue(unsigned long wtidx) 158 { 159 void **p; 160 161 /* Wait for Q.S and empty queue */ 162 synchronize_rcu(); 163 164 for (p = pending_reclaims[wtidx].queue; 165 p < pending_reclaims[wtidx].head; p++) { 166 /* poison */ 167 if (*p) 168 ((struct test_array *)*p)->a = 0; 169 free(*p); 170 } 171 pending_reclaims[wtidx].head = pending_reclaims[wtidx].queue; 172 } 173 174 /* Using per-thread queue */ 175 static void rcu_gc_reclaim(unsigned long wtidx, void *old) 176 { 177 /* Queue pointer */ 178 *pending_reclaims[wtidx].head = old; 179 pending_reclaims[wtidx].head++; 180 181 if (caa_likely(pending_reclaims[wtidx].head - pending_reclaims[wtidx].queue 182 < reclaim_batch)) 183 return; 184 185 rcu_gc_clear_queue(wtidx); 186 } 187 188 static 189 void *thr_writer(void *data) 190 { 191 unsigned long wtidx = (unsigned long)data; 192 #ifdef TEST_LOCAL_GC 193 struct test_array *old = NULL; 194 #else 195 struct test_array *new, *old; 196 #endif 197 198 printf_verbose("thread_begin %s, tid %lu\n", 199 "writer", urcu_get_thread_id()); 200 201 set_affinity(); 202 203 wait_until_go(); 204 205 for (;;) { 206 #ifndef TEST_LOCAL_GC 207 new = malloc(sizeof(*new)); 208 new->a = 8; 209 old = _rcu_xchg_pointer(&test_rcu_pointer, new); 210 #endif 211 if (caa_unlikely(wduration)) 212 loop_sleep(wduration); 213 rcu_gc_reclaim(wtidx, old); 214 URCU_TLS(nr_writes)++; 215 if (caa_unlikely(!test_duration_write())) 216 break; 217 if (caa_unlikely(wdelay)) 218 loop_sleep(wdelay); 219 } 220 221 printf_verbose("thread_end %s, tid %lu\n", 222 "writer", urcu_get_thread_id()); 223 tot_nr_writes[wtidx] = URCU_TLS(nr_writes); 224 return ((void*)2); 225 } 226 227 static 228 void show_usage(char **argv) 229 { 230 printf("Usage : %s nr_readers nr_writers duration (s) <OPTIONS>\n", 231 argv[0]); 232 printf("OPTIONS:\n"); 233 printf(" [-r] [-w] (yield reader and/or writer)\n"); 234 printf(" [-b batch] (batch reclaim)\n"); 235 printf(" [-d delay] (writer period (us))\n"); 236 printf(" [-c duration] (reader C.S. duration (in loops))\n"); 237 printf(" [-e duration] (writer C.S. duration (in loops))\n"); 238 printf(" [-v] (verbose output)\n"); 239 printf(" [-a cpu#] [-a cpu#]... (affinity)\n"); 240 printf("\n"); 241 } 242 243 int main(int argc, char **argv) 244 { 245 int err; 246 pthread_t *tid_reader, *tid_writer; 247 void *tret; 248 unsigned long long *count_reader; 249 unsigned long long tot_reads = 0, tot_writes = 0; 250 int i, a; 251 unsigned int i_thr; 252 253 if (argc < 4) { 254 show_usage(argv); 255 return -1; 256 } 257 258 err = sscanf(argv[1], "%u", &nr_readers); 259 if (err != 1) { 260 show_usage(argv); 261 return -1; 262 } 263 264 err = sscanf(argv[2], "%u", &nr_writers); 265 if (err != 1) { 266 show_usage(argv); 267 return -1; 268 } 269 270 err = sscanf(argv[3], "%lu", &duration); 271 if (err != 1) { 272 show_usage(argv); 273 return -1; 274 } 275 276 for (i = 4; i < argc; i++) { 277 if (argv[i][0] != '-') 278 continue; 279 switch (argv[i][1]) { 280 case 'r': 281 rcu_debug_yield_enable(RCU_YIELD_READ); 282 break; 283 case 'w': 284 rcu_debug_yield_enable(RCU_YIELD_WRITE); 285 break; 286 case 'a': 287 if (argc < i + 2) { 288 show_usage(argv); 289 return -1; 290 } 291 a = atoi(argv[++i]); 292 cpu_affinities[next_aff++] = a; 293 use_affinity = 1; 294 printf_verbose("Adding CPU %d affinity\n", a); 295 break; 296 case 'b': 297 if (argc < i + 2) { 298 show_usage(argv); 299 return -1; 300 } 301 reclaim_batch = atol(argv[++i]); 302 break; 303 case 'c': 304 if (argc < i + 2) { 305 show_usage(argv); 306 return -1; 307 } 308 rduration = atol(argv[++i]); 309 break; 310 case 'd': 311 if (argc < i + 2) { 312 show_usage(argv); 313 return -1; 314 } 315 wdelay = atol(argv[++i]); 316 break; 317 case 'e': 318 if (argc < i + 2) { 319 show_usage(argv); 320 return -1; 321 } 322 wduration = atol(argv[++i]); 323 break; 324 case 'v': 325 verbose_mode = 1; 326 break; 327 } 328 } 329 330 printf_verbose("running test for %lu seconds, %u readers, %u writers.\n", 331 duration, nr_readers, nr_writers); 332 printf_verbose("Writer delay : %lu loops.\n", wdelay); 333 printf_verbose("Reader duration : %lu loops.\n", rduration); 334 printf_verbose("thread %-6s, tid %lu\n", 335 "main", urcu_get_thread_id()); 336 337 tid_reader = calloc(nr_readers, sizeof(*tid_reader)); 338 tid_writer = calloc(nr_writers, sizeof(*tid_writer)); 339 count_reader = calloc(nr_readers, sizeof(*count_reader)); 340 tot_nr_writes = calloc(nr_writers, sizeof(*tot_nr_writes)); 341 pending_reclaims = calloc(nr_writers, sizeof(*pending_reclaims)); 342 if (reclaim_batch * sizeof(*pending_reclaims[0].queue) 343 < CAA_CACHE_LINE_SIZE) 344 for (i_thr = 0; i_thr < nr_writers; i_thr++) 345 pending_reclaims[i_thr].queue = calloc(1, CAA_CACHE_LINE_SIZE); 346 else 347 for (i_thr = 0; i_thr < nr_writers; i_thr++) 348 pending_reclaims[i_thr].queue = calloc(reclaim_batch, 349 sizeof(*pending_reclaims[i_thr].queue)); 350 for (i_thr = 0; i_thr < nr_writers; i_thr++) 351 pending_reclaims[i_thr].head = pending_reclaims[i_thr].queue; 352 353 next_aff = 0; 354 355 for (i_thr = 0; i_thr < nr_readers; i_thr++) { 356 err = pthread_create(&tid_reader[i_thr], NULL, thr_reader, 357 &count_reader[i_thr]); 358 if (err != 0) 359 exit(1); 360 } 361 for (i_thr = 0; i_thr < nr_writers; i_thr++) { 362 err = pthread_create(&tid_writer[i_thr], NULL, thr_writer, 363 (void *)(long)i_thr); 364 if (err != 0) 365 exit(1); 366 } 367 368 test_for(duration); 369 370 for (i_thr = 0; i_thr < nr_readers; i_thr++) { 371 err = pthread_join(tid_reader[i_thr], &tret); 372 if (err != 0) 373 exit(1); 374 tot_reads += count_reader[i_thr]; 375 } 376 for (i_thr = 0; i_thr < nr_writers; i_thr++) { 377 err = pthread_join(tid_writer[i_thr], &tret); 378 if (err != 0) 379 exit(1); 380 tot_writes += tot_nr_writes[i_thr]; 381 rcu_gc_clear_queue(i_thr); 382 } 383 384 printf_verbose("total number of reads : %llu, writes %llu\n", tot_reads, 385 tot_writes); 386 printf("SUMMARY %-25s testdur %4lu nr_readers %3u rdur %6lu wdur %6lu " 387 "nr_writers %3u " 388 "wdelay %6lu nr_reads %12llu nr_writes %12llu nr_ops %12llu " 389 "batch %ld\n", 390 argv[0], duration, nr_readers, rduration, wduration, 391 nr_writers, wdelay, tot_reads, tot_writes, 392 tot_reads + tot_writes, reclaim_batch); 393 394 free(tid_reader); 395 free(tid_writer); 396 free(count_reader); 397 free(tot_nr_writes); 398 399 for (i_thr = 0; i_thr < nr_writers; i_thr++) 400 free(pending_reclaims[i_thr].queue); 401 free(pending_reclaims); 402 403 return 0; 404 } 405