Home | History | Annotate | Line # | Download | only in io
async.h revision 1.1.1.1
      1 /* Copyright (C) 2018-2019 Free Software Foundation, Inc.
      2    Contributed by Nicolas Koenig
      3 
      4    This file is part of the GNU Fortran runtime library (libgfortran).
      5 
      6    Libgfortran is free software; you can redistribute it and/or modify
      7    it under the terms of the GNU General Public License as published by
      8    the Free Software Foundation; either version 3, or (at your option)
      9    any later version.
     10 
     11    Libgfortran is distributed in the hope that it will be useful,
     12    but WITHOUT ANY WARRANTY; without even the implied warranty of
     13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     14    GNU General Public License for more details.
     15 
     16    Under Section 7 of GPL version 3, you are granted additional
     17    permissions described in the GCC Runtime Library Exception, version
     18    3.1, as published by the Free Software Foundation.
     19 
     20    You should have received a copy of the GNU General Public License and
     21    a copy of the GCC Runtime Library Exception along with this program;
     22    see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
     23    <http://www.gnu.org/licenses/>.  */
     24 
     25 #ifndef ASYNC_H
     26 #define ASYNC_H
     27 
     28 /* Async I/O will not work on targets which do not support
     29    __gthread_cond_t and __gthread_equal / __gthread_self.  Check
     30    this.  */
     31 
     32 #if defined(__GTHREAD_HAS_COND) && defined(__GTHREADS_CXX0X)
     33 #define ASYNC_IO 1
     34 #else
     35 #define ASYNC_IO 0
     36 #endif
     37 
     38 /* Defining DEBUG_ASYNC will enable somewhat verbose debugging
     39    output for async I/O.  */
     40 
     41 #define DEBUG_ASYNC
     42 #undef DEBUG_ASYNC
     43 
     44 #ifdef DEBUG_ASYNC
     45 
     46 /* Define this if you want to use ANSI color escape sequences in your
     47    debugging output.  */
     48 
     49 #define DEBUG_COLOR
     50 
     51 #ifdef DEBUG_COLOR
     52 #define MPREFIX "\033[30;46mM:\033[0m "
     53 #define TPREFIX "\033[37;44mT:\033[0m "
     54 #define RPREFIX "\033[37;41mR:\033[0m "
     55 #define DEBUG_RED "\033[31m"
     56 #define DEBUG_ORANGE "\033[33m"
     57 #define DEBUG_GREEN "\033[32m"
     58 #define DEBUG_DARKRED "\033[31;2m"
     59 #define DEBUG_PURPLE "\033[35m"
     60 #define DEBUG_NORM "\033[0m"
     61 #define DEBUG_REVERSE_RED "\033[41;37m"
     62 #define DEBUG_BLUE "\033[34m"
     63 
     64 #else
     65 
     66 #define MPREFIX "M: "
     67 #define TPREFIX "T: "
     68 #define RPREFIX ""
     69 #define DEBUG_RED ""
     70 #define DEBUG_ORANGE ""
     71 #define DEBUG_GREEN ""
     72 #define DEBUG_DARKRED ""
     73 #define DEBUG_PURPLE ""
     74 #define DEBUG_NORM ""
     75 #define DEBUG_REVERSE_RED ""
     76 #define DEBUG_BLUE ""
     77 
     78 #endif
     79 
     80 #define DEBUG_PRINTF(...) fprintf (stderr,__VA_ARGS__)
     81 
     82 #define IN_DEBUG_QUEUE(mutex) ({		\
     83       __label__ end;				\
     84       aio_lock_debug *curr = aio_debug_head;	\
     85       while (curr) {				\
     86 	if (curr->m == mutex) {			\
     87 	  goto end;				\
     88 	}					\
     89 	curr = curr->next;			\
     90       }						\
     91     end:;					\
     92       curr;					\
     93     })
     94 
     95 #define TAIL_DEBUG_QUEUE ({			\
     96       aio_lock_debug *curr = aio_debug_head;	\
     97       while (curr && curr->next) {		\
     98 	curr = curr->next;			\
     99       }						\
    100       curr;					\
    101     })
    102 
    103 #define CHECK_LOCK(mutex, status) do {					\
    104     aio_lock_debug *curr;						\
    105     INTERN_LOCK (&debug_queue_lock);					\
    106     if (__gthread_mutex_trylock (mutex)) {				\
    107       if ((curr = IN_DEBUG_QUEUE (mutex))) {				\
    108 	sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line); \
    109       } else								\
    110 	sprintf (status, DEBUG_RED "unknown" DEBUG_NORM);			\
    111     }									\
    112     else {								\
    113       __gthread_mutex_unlock (mutex);					\
    114       sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM);			\
    115     }									\
    116     INTERN_UNLOCK (&debug_queue_lock);					\
    117   }while (0)
    118 
    119 #define T_ERROR(func, ...) do {				\
    120     int t_error_temp;					\
    121     t_error_temp = func(__VA_ARGS__);			\
    122     if (t_error_temp)					\
    123       ERROR (t_error_temp, "args: " #__VA_ARGS__ "\n");	\
    124   } while (0)
    125 
    126 #define NOTE(str, ...) do{						\
    127     char note_str[200];							\
    128     sprintf (note_str, "%s" DEBUG_PURPLE "NOTE: " DEBUG_NORM str, aio_prefix, ##__VA_ARGS__); \
    129     DEBUG_PRINTF ("%-90s %20s():%-5d\n", note_str, __FUNCTION__, __LINE__); \
    130   }while (0);
    131 
    132 #define ERROR(errnum, str, ...) do{					\
    133     char note_str[200];							\
    134     sprintf (note_str, "%s" DEBUG_REVERSE_RED "ERROR:" DEBUG_NORM " [%d] " str, aio_prefix, \
    135 	    errnum, ##__VA_ARGS__);					\
    136     DEBUG_PRINTF ("%-68s %s():%-5d\n", note_str, __FUNCTION__, __LINE__);	\
    137   }while (0)
    138 
    139 #define MUTEX_DEBUG_ADD(mutex) do {		\
    140     aio_lock_debug *n;				\
    141     n = malloc (sizeof(aio_lock_debug));	\
    142     n->prev = TAIL_DEBUG_QUEUE;			\
    143     if (n->prev)				\
    144       n->prev->next = n;			\
    145     n->next = NULL;				\
    146     n->line = __LINE__;				\
    147     n->func = __FUNCTION__;			\
    148     n->m = mutex;				\
    149     if (!aio_debug_head) {			\
    150       aio_debug_head = n;			\
    151     }						\
    152   } while (0)
    153 
    154 #define UNLOCK(mutex) do {						\
    155     aio_lock_debug *curr;						\
    156     DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_GREEN "UNLOCK: " DEBUG_NORM #mutex, \
    157 		 __FUNCTION__, __LINE__, (void *) mutex);		\
    158     INTERN_LOCK (&debug_queue_lock);					\
    159     curr = IN_DEBUG_QUEUE (mutex);					\
    160     if (curr)								\
    161       {									\
    162 	if (curr->prev)							\
    163 	  curr->prev->next = curr->next;				\
    164 	if (curr->next) {						\
    165 	  curr->next->prev = curr->prev;				\
    166 	  if (curr == aio_debug_head)					\
    167 	    aio_debug_head = curr->next;				\
    168 	} else {							\
    169 	  if (curr == aio_debug_head)					\
    170 	    aio_debug_head = NULL;					\
    171 	}								\
    172 	free (curr);							\
    173       }									\
    174     INTERN_UNLOCK (&debug_queue_lock);					\
    175     INTERN_UNLOCK (mutex);						\
    176   }while (0)
    177 
    178 #define TRYLOCK(mutex) ({						\
    179 			 char status[200];				\
    180 			 int res;					\
    181 			 aio_lock_debug *curr;				\
    182 			 res = __gthread_mutex_trylock (mutex);		\
    183 			 INTERN_LOCK (&debug_queue_lock);		\
    184 			 if (res) {					\
    185 			   if ((curr = IN_DEBUG_QUEUE (mutex))) {	\
    186 			     sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line);	\
    187 			   } else					\
    188 			     sprintf (status, DEBUG_RED "unknown" DEBUG_NORM);	\
    189 			 }						\
    190 			 else {						\
    191 			   sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM);	\
    192 			   MUTEX_DEBUG_ADD (mutex);			\
    193 			 }						\
    194 			 DEBUG_PRINTF ("%s%-44s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \
    195 				      DEBUG_DARKRED "TRYLOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, \
    196 				      (void *) mutex);			\
    197 			 INTERN_UNLOCK (&debug_queue_lock);		\
    198 			 res;						\
    199     })
    200 
    201 #define LOCK(mutex) do {						\
    202     char status[200];							\
    203     CHECK_LOCK (mutex, status);						\
    204     DEBUG_PRINTF ("%s%-42s prev: %-35s %20s():%-5d %18p\n", aio_prefix,	\
    205 		 DEBUG_RED "LOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, (void *) mutex); \
    206     INTERN_LOCK (mutex);							\
    207     INTERN_LOCK (&debug_queue_lock);					\
    208     MUTEX_DEBUG_ADD (mutex);						\
    209     INTERN_UNLOCK (&debug_queue_lock);					\
    210     DEBUG_PRINTF ("%s" DEBUG_RED "ACQ:" DEBUG_NORM " %-30s %78p\n", aio_prefix, #mutex, mutex); \
    211   } while (0)
    212 
    213 #define DEBUG_LINE(...) __VA_ARGS__
    214 
    215 #else
    216 #define DEBUG_PRINTF(...) {}
    217 #define CHECK_LOCK(au, mutex, status) {}
    218 #define NOTE(str, ...) {}
    219 #define DEBUG_LINE(...)
    220 #define T_ERROR(func, ...) func(__VA_ARGS__)
    221 #define LOCK(mutex) INTERN_LOCK (mutex)
    222 #define UNLOCK(mutex) INTERN_UNLOCK (mutex)
    223 #define TRYLOCK(mutex) (__gthread_mutex_trylock (mutex))
    224 #endif
    225 
    226 #define INTERN_LOCK(mutex) T_ERROR (__gthread_mutex_lock, mutex);
    227 
    228 #define INTERN_UNLOCK(mutex) T_ERROR (__gthread_mutex_unlock, mutex);
    229 
    230 #if ASYNC_IO
    231 
    232 /* au->lock has to be held when calling this macro.  */
    233 
    234 #define SIGNAL(advcond) do{						\
    235     (advcond)->pending = 1;						\
    236     DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE "SIGNAL: " DEBUG_NORM \
    237 		 #advcond, __FUNCTION__, __LINE__, (void *) advcond);	\
    238     T_ERROR (__gthread_cond_broadcast, &(advcond)->signal);			\
    239   } while (0)
    240 
    241 /* Has to be entered with mutex locked.  */
    242 
    243 #define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{		\
    244     __label__ finish;		       					\
    245     DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_BLUE "WAITING: " DEBUG_NORM \
    246 		 #advcond, __FUNCTION__, __LINE__, (void *) advcond);	\
    247     if ((advcond)->pending || (condition))				\
    248       goto finish;							\
    249     while (1)								\
    250       {									\
    251 	int err_ret = __gthread_cond_wait(&(advcond)->signal, mutex);	\
    252 	if (err_ret) internal_error (NULL, "WAIT_SIGNAL_MUTEX failed");	\
    253 	if (condition)							\
    254 	  {								\
    255 	    DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE \
    256 			  "REC: " DEBUG_NORM				\
    257 			  #advcond,  __FUNCTION__, __LINE__, (void *)advcond); \
    258 	    break;				      			\
    259 	  }								\
    260       }									\
    261   finish:								\
    262     (advcond)->pending = 0;						\
    263     UNLOCK (mutex);							\
    264   } while (0)
    265 
    266 /* au->lock has to be held when calling this macro.  */
    267 
    268 #define REVOKE_SIGNAL(advcond) do{		\
    269     (advcond)->pending = 0;			\
    270   } while (0)
    271 
    272 #else
    273 
    274 #define SIGNAL(advcond) do{} while(0)
    275 #define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{} while(0)
    276 #define REVOKE_SIGNAL(advcond) do{} while(0)
    277 
    278 #endif
    279 
    280 #if ASYNC_IO
    281 DEBUG_LINE (extern __thread const char *aio_prefix);
    282 
    283 DEBUG_LINE (typedef struct aio_lock_debug{
    284   __gthread_mutex_t *m;
    285   int line;
    286   const char *func;
    287   struct aio_lock_debug *next;
    288   struct aio_lock_debug *prev;
    289 } aio_lock_debug;)
    290 
    291 DEBUG_LINE (extern aio_lock_debug *aio_debug_head;)
    292 DEBUG_LINE (extern __gthread_mutex_t debug_queue_lock;)
    293 
    294 /* Thread - local storage of the current unit we are looking at. Needed for
    295    error reporting.  */
    296 
    297 extern __thread gfc_unit *thread_unit;
    298 #endif
    299 
    300 enum aio_do {
    301   AIO_INVALID = 0,
    302   AIO_DATA_TRANSFER_INIT,
    303   AIO_TRANSFER_SCALAR,
    304   AIO_TRANSFER_ARRAY,
    305   AIO_WRITE_DONE,
    306   AIO_READ_DONE,
    307   AIO_CLOSE
    308 };
    309 
    310 typedef union transfer_args
    311 {
    312   struct
    313   {
    314     void (*transfer) (struct st_parameter_dt *, bt, void *, int, size_t, size_t);
    315     bt arg_bt;
    316     void *data;
    317     int i;
    318     size_t s1;
    319     size_t s2;
    320   } scalar;
    321   struct
    322   {
    323     gfc_array_char *desc;
    324     int kind;
    325     gfc_charlen_type charlen;
    326   } array;
    327 } transfer_args;
    328 
    329 struct adv_cond
    330 {
    331 #if ASYNC_IO
    332   int pending;
    333   __gthread_cond_t signal;
    334 #endif
    335 };
    336 
    337 typedef struct async_unit
    338 {
    339   __gthread_mutex_t io_lock;   /* Lock for doing actual I/O. */
    340   __gthread_mutex_t lock;      /* Lock for manipulating the queue structure.  */
    341   bool empty;
    342   struct
    343   {
    344     int waiting;
    345     int low;
    346     int high;
    347     struct adv_cond done;
    348   } id;
    349 
    350 #if ASYNC_IO
    351   struct adv_cond work;
    352   struct adv_cond emptysignal;
    353   struct st_parameter_dt *pdt;
    354   pthread_t thread;
    355   struct transfer_queue *head;
    356   struct transfer_queue *tail;
    357 
    358   struct {
    359     const char *message;
    360     st_parameter_common *cmp;
    361     bool has_error;
    362     int last_good_id;
    363     int family;
    364     bool fatal_error;
    365   } error;
    366 #endif
    367 } async_unit;
    368 
    369 void init_async_unit (gfc_unit *);
    370 internal_proto (init_async_unit);
    371 
    372 bool async_wait (st_parameter_common *, async_unit *);
    373 internal_proto (async_wait);
    374 
    375 bool async_wait_id (st_parameter_common *, async_unit *, int);
    376 internal_proto (async_wait_id);
    377 
    378 bool collect_async_errors (st_parameter_common *, async_unit *);
    379 internal_proto (collect_async_errors);
    380 
    381 void async_close (async_unit *);
    382 internal_proto (async_close);
    383 
    384 void enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do);
    385 internal_proto (enqueue_transfer);
    386 
    387 void enqueue_done (async_unit *, enum aio_do type);
    388 internal_proto (enqueue_done);
    389 
    390 int enqueue_done_id (async_unit *, enum aio_do type);
    391 internal_proto (enqueue_done_id);
    392 
    393 void enqueue_init (async_unit *);
    394 internal_proto (enqueue_init);
    395 
    396 void enqueue_data_transfer_init (async_unit *, st_parameter_dt *, int);
    397 internal_proto (enqueue_data_transfer_init);
    398 
    399 void enqueue_close (async_unit *);
    400 internal_proto (enqueue_close);
    401 
    402 #endif
    403