Lines Matching refs:au
98 async_unit *au = u->au;
99 LOCK (&au->lock);
101 au->thread = __gthread_self ();
104 /* Main loop. At this point, au->lock is always held. */
105 WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
106 LOCK (&au->lock);
107 ctq = au->head;
114 if (!au->error.has_error)
116 UNLOCK (&au->lock);
122 st_write_done_worker (au->pdt, false);
123 UNLOCK (&au->io_lock);
128 st_read_done_worker (au->pdt, false);
129 UNLOCK (&au->io_lock);
134 LOCK (&au->io_lock);
135 update_pdt (&au->pdt, ctq->new_pdt);
136 data_transfer_init_worker (au->pdt, ctq->read_flag);
141 ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
152 transfer_array_inner (au->pdt, ctq->arg.array.desc,
160 LOCK (&au->lock);
167 LOCK (&au->lock);
168 if (unlikely (au->error.has_error))
169 au->error.last_good_id = au->id.low - 1;
175 UNLOCK (&au->io_lock);
184 NOTE ("Next ctq, current id: %d", au->id.low);
185 if (ctq->has_id && au->id.waiting == au->id.low++)
186 SIGNAL (&au->id.done);
190 au->tail = NULL;
191 au->head = NULL;
192 au->empty = 1;
193 SIGNAL (&au->emptysignal);
196 au->tail = NULL;
197 au->head = NULL;
198 au->empty = 1;
199 SIGNAL (&au->emptysignal);
201 UNLOCK (&au->lock);
208 free_async_unit (async_unit *au)
210 if (au->tail)
213 destroy_adv_cond (&au->work);
214 destroy_adv_cond (&au->emptysignal);
215 destroy_adv_cond (&au->id.done);
216 T_ERROR (__gthread_mutex_destroy, &au->lock);
217 free (au);
230 nonzero on failure. It also sets u->au. */
235 async_unit *au;
238 u->au = NULL;
242 au = (async_unit *) xmalloc (sizeof (async_unit));
243 u->au = au;
244 init_adv_cond (&au->work);
245 init_adv_cond (&au->emptysignal);
246 __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
247 __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
248 LOCK (&au->lock);
249 T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
250 au->pdt = NULL;
251 au->head = NULL;
252 au->tail = NULL;
253 au->empty = true;
254 au->id.waiting = -1;
255 au->id.low = 0;
256 au->id.high = 0;
257 au->error.fatal_error = 0;
258 au->error.has_error = 0;
259 au->error.last_good_id = 0;
260 init_adv_cond (&au->id.done);
261 UNLOCK (&au->lock);
267 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
273 LOCK (&au->lock);
274 if (!au->tail)
275 au->head = tq;
277 au->tail->next = tq;
278 au->tail = tq;
279 REVOKE_SIGNAL (&(au->emptysignal));
280 au->empty = false;
281 SIGNAL (&au->work);
282 UNLOCK (&au->lock);
288 enqueue_done_id (async_unit *au, enum aio_do type)
295 LOCK (&au->lock);
296 if (!au->tail)
297 au->head = tq;
299 au->tail->next = tq;
300 au->tail = tq;
301 REVOKE_SIGNAL (&(au->emptysignal));
302 au->empty = false;
303 ret = au->id.high++;
305 SIGNAL (&au->work);
306 UNLOCK (&au->lock);
313 enqueue_done (async_unit *au, enum aio_do type)
318 LOCK (&au->lock);
319 if (!au->tail)
320 au->head = tq;
322 au->tail->next = tq;
323 au->tail = tq;
324 REVOKE_SIGNAL (&(au->emptysignal));
325 au->empty = false;
326 SIGNAL (&au->work);
327 UNLOCK (&au->lock);
333 enqueue_close (async_unit *au)
338 LOCK (&au->lock);
339 if (!au->tail)
340 au->head = tq;
342 au->tail->next = tq;
343 au->tail = tq;
344 REVOKE_SIGNAL (&(au->emptysignal));
345 au->empty = false;
346 SIGNAL (&au->work);
347 UNLOCK (&au->lock);
354 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
368 LOCK (&au->lock);
370 if (!au->tail)
371 au->head = tq;
373 au->tail->next = tq;
374 au->tail = tq;
375 REVOKE_SIGNAL (&(au->emptysignal));
376 au->empty = false;
377 SIGNAL (&au->work);
378 UNLOCK (&au->lock);
385 collect_async_errors (st_parameter_common *cmp, async_unit *au)
387 bool has_error = au->error.has_error;
391 if (generate_error_common (cmp, au->error.family, au->error.message))
393 au->error.has_error = 0;
394 au->error.cmp = NULL;
399 au->error.fatal_error = true;
410 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
414 if (au == NULL)
418 cmp = au->error.cmp;
420 if (au->error.has_error)
422 if (i <= au->error.last_good_id)
425 return collect_async_errors (cmp, au);
428 LOCK (&au->lock);
429 if (i > au->id.high)
432 UNLOCK (&au->lock);
437 if (au->id.waiting < i)
438 au->id.waiting = i;
439 SIGNAL (&(au->work));
440 WAIT_SIGNAL_MUTEX (&(au->id.done),
441 (au->id.low >= au->id.waiting || au->empty), &au->lock);
442 LOCK (&au->lock);
443 ret = collect_async_errors (cmp, au);
444 UNLOCK (&au->lock);
451 async_wait (st_parameter_common *cmp, async_unit *au)
455 if (au == NULL)
459 cmp = au->error.cmp;
461 LOCK (&(au->lock));
462 SIGNAL (&(au->work));
464 if (au->empty)
466 ret = collect_async_errors (cmp, au);
467 UNLOCK (&au->lock);
471 WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
472 ret = collect_async_errors (cmp, au);
479 async_close (async_unit *au)
481 if (au == NULL)
485 enqueue_close (au);
486 T_ERROR (__gthread_join, au->thread, NULL);
487 free_async_unit (au);
492 /* Only set u->au to NULL so no async I/O will happen. */
497 u->au = NULL;
504 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
512 enqueue_done_id (async_unit *au, enum aio_do type)
520 enqueue_done (async_unit *au, enum aio_do type)
528 enqueue_close (async_unit *au)
536 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
544 collect_async_errors (st_parameter_common *cmp, async_unit *au)
552 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
560 async_wait (st_parameter_common *cmp, async_unit *au)
568 async_close (async_unit *au)