Home | History | Annotate | Line # | Download | only in io
async.h revision 1.1
      1  1.1  mrg /* Copyright (C) 2018-2019 Free Software Foundation, Inc.
      2  1.1  mrg    Contributed by Nicolas Koenig
      3  1.1  mrg 
      4  1.1  mrg    This file is part of the GNU Fortran runtime library (libgfortran).
      5  1.1  mrg 
      6  1.1  mrg    Libgfortran is free software; you can redistribute it and/or modify
      7  1.1  mrg    it under the terms of the GNU General Public License as published by
      8  1.1  mrg    the Free Software Foundation; either version 3, or (at your option)
      9  1.1  mrg    any later version.
     10  1.1  mrg 
     11  1.1  mrg    Libgfortran is distributed in the hope that it will be useful,
     12  1.1  mrg    but WITHOUT ANY WARRANTY; without even the implied warranty of
     13  1.1  mrg    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     14  1.1  mrg    GNU General Public License for more details.
     15  1.1  mrg 
     16  1.1  mrg    Under Section 7 of GPL version 3, you are granted additional
     17  1.1  mrg    permissions described in the GCC Runtime Library Exception, version
     18  1.1  mrg    3.1, as published by the Free Software Foundation.
     19  1.1  mrg 
     20  1.1  mrg    You should have received a copy of the GNU General Public License and
     21  1.1  mrg    a copy of the GCC Runtime Library Exception along with this program;
     22  1.1  mrg    see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
     23  1.1  mrg    <http://www.gnu.org/licenses/>.  */
     24  1.1  mrg 
     25  1.1  mrg #ifndef ASYNC_H
     26  1.1  mrg #define ASYNC_H
     27  1.1  mrg 
     28  1.1  mrg /* Async I/O will not work on targets which do not support
     29  1.1  mrg    __gthread_cond_t and __gthread_equal / __gthread_self.  Check
     30  1.1  mrg    this.  */
     31  1.1  mrg 
     32  1.1  mrg #if defined(__GTHREAD_HAS_COND) && defined(__GTHREADS_CXX0X)
     33  1.1  mrg #define ASYNC_IO 1
     34  1.1  mrg #else
     35  1.1  mrg #define ASYNC_IO 0
     36  1.1  mrg #endif
     37  1.1  mrg 
     38  1.1  mrg /* Defining DEBUG_ASYNC will enable somewhat verbose debugging
     39  1.1  mrg    output for async I/O.  */
     40  1.1  mrg 
     41  1.1  mrg #define DEBUG_ASYNC
     42  1.1  mrg #undef DEBUG_ASYNC
     43  1.1  mrg 
     44  1.1  mrg #ifdef DEBUG_ASYNC
     45  1.1  mrg 
     46  1.1  mrg /* Define this if you want to use ANSI color escape sequences in your
     47  1.1  mrg    debugging output.  */
     48  1.1  mrg 
     49  1.1  mrg #define DEBUG_COLOR
     50  1.1  mrg 
     51  1.1  mrg #ifdef DEBUG_COLOR
     52  1.1  mrg #define MPREFIX "\033[30;46mM:\033[0m "
     53  1.1  mrg #define TPREFIX "\033[37;44mT:\033[0m "
     54  1.1  mrg #define RPREFIX "\033[37;41mR:\033[0m "
     55  1.1  mrg #define DEBUG_RED "\033[31m"
     56  1.1  mrg #define DEBUG_ORANGE "\033[33m"
     57  1.1  mrg #define DEBUG_GREEN "\033[32m"
     58  1.1  mrg #define DEBUG_DARKRED "\033[31;2m"
     59  1.1  mrg #define DEBUG_PURPLE "\033[35m"
     60  1.1  mrg #define DEBUG_NORM "\033[0m"
     61  1.1  mrg #define DEBUG_REVERSE_RED "\033[41;37m"
     62  1.1  mrg #define DEBUG_BLUE "\033[34m"
     63  1.1  mrg 
     64  1.1  mrg #else
     65  1.1  mrg 
     66  1.1  mrg #define MPREFIX "M: "
     67  1.1  mrg #define TPREFIX "T: "
     68  1.1  mrg #define RPREFIX ""
     69  1.1  mrg #define DEBUG_RED ""
     70  1.1  mrg #define DEBUG_ORANGE ""
     71  1.1  mrg #define DEBUG_GREEN ""
     72  1.1  mrg #define DEBUG_DARKRED ""
     73  1.1  mrg #define DEBUG_PURPLE ""
     74  1.1  mrg #define DEBUG_NORM ""
     75  1.1  mrg #define DEBUG_REVERSE_RED ""
     76  1.1  mrg #define DEBUG_BLUE ""
     77  1.1  mrg 
     78  1.1  mrg #endif
     79  1.1  mrg 
     80  1.1  mrg #define DEBUG_PRINTF(...) fprintf (stderr,__VA_ARGS__)
     81  1.1  mrg 
     82  1.1  mrg #define IN_DEBUG_QUEUE(mutex) ({		\
     83  1.1  mrg       __label__ end;				\
     84  1.1  mrg       aio_lock_debug *curr = aio_debug_head;	\
     85  1.1  mrg       while (curr) {				\
     86  1.1  mrg 	if (curr->m == mutex) {			\
     87  1.1  mrg 	  goto end;				\
     88  1.1  mrg 	}					\
     89  1.1  mrg 	curr = curr->next;			\
     90  1.1  mrg       }						\
     91  1.1  mrg     end:;					\
     92  1.1  mrg       curr;					\
     93  1.1  mrg     })
     94  1.1  mrg 
     95  1.1  mrg #define TAIL_DEBUG_QUEUE ({			\
     96  1.1  mrg       aio_lock_debug *curr = aio_debug_head;	\
     97  1.1  mrg       while (curr && curr->next) {		\
     98  1.1  mrg 	curr = curr->next;			\
     99  1.1  mrg       }						\
    100  1.1  mrg       curr;					\
    101  1.1  mrg     })
    102  1.1  mrg 
    103  1.1  mrg #define CHECK_LOCK(mutex, status) do {					\
    104  1.1  mrg     aio_lock_debug *curr;						\
    105  1.1  mrg     INTERN_LOCK (&debug_queue_lock);					\
    106  1.1  mrg     if (__gthread_mutex_trylock (mutex)) {				\
    107  1.1  mrg       if ((curr = IN_DEBUG_QUEUE (mutex))) {				\
    108  1.1  mrg 	sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line); \
    109  1.1  mrg       } else								\
    110  1.1  mrg 	sprintf (status, DEBUG_RED "unknown" DEBUG_NORM);			\
    111  1.1  mrg     }									\
    112  1.1  mrg     else {								\
    113  1.1  mrg       __gthread_mutex_unlock (mutex);					\
    114  1.1  mrg       sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM);			\
    115  1.1  mrg     }									\
    116  1.1  mrg     INTERN_UNLOCK (&debug_queue_lock);					\
    117  1.1  mrg   }while (0)
    118  1.1  mrg 
    119  1.1  mrg #define T_ERROR(func, ...) do {				\
    120  1.1  mrg     int t_error_temp;					\
    121  1.1  mrg     t_error_temp = func(__VA_ARGS__);			\
    122  1.1  mrg     if (t_error_temp)					\
    123  1.1  mrg       ERROR (t_error_temp, "args: " #__VA_ARGS__ "\n");	\
    124  1.1  mrg   } while (0)
    125  1.1  mrg 
    126  1.1  mrg #define NOTE(str, ...) do{						\
    127  1.1  mrg     char note_str[200];							\
    128  1.1  mrg     sprintf (note_str, "%s" DEBUG_PURPLE "NOTE: " DEBUG_NORM str, aio_prefix, ##__VA_ARGS__); \
    129  1.1  mrg     DEBUG_PRINTF ("%-90s %20s():%-5d\n", note_str, __FUNCTION__, __LINE__); \
    130  1.1  mrg   }while (0);
    131  1.1  mrg 
    132  1.1  mrg #define ERROR(errnum, str, ...) do{					\
    133  1.1  mrg     char note_str[200];							\
    134  1.1  mrg     sprintf (note_str, "%s" DEBUG_REVERSE_RED "ERROR:" DEBUG_NORM " [%d] " str, aio_prefix, \
    135  1.1  mrg 	    errnum, ##__VA_ARGS__);					\
    136  1.1  mrg     DEBUG_PRINTF ("%-68s %s():%-5d\n", note_str, __FUNCTION__, __LINE__);	\
    137  1.1  mrg   }while (0)
    138  1.1  mrg 
    139  1.1  mrg #define MUTEX_DEBUG_ADD(mutex) do {		\
    140  1.1  mrg     aio_lock_debug *n;				\
    141  1.1  mrg     n = malloc (sizeof(aio_lock_debug));	\
    142  1.1  mrg     n->prev = TAIL_DEBUG_QUEUE;			\
    143  1.1  mrg     if (n->prev)				\
    144  1.1  mrg       n->prev->next = n;			\
    145  1.1  mrg     n->next = NULL;				\
    146  1.1  mrg     n->line = __LINE__;				\
    147  1.1  mrg     n->func = __FUNCTION__;			\
    148  1.1  mrg     n->m = mutex;				\
    149  1.1  mrg     if (!aio_debug_head) {			\
    150  1.1  mrg       aio_debug_head = n;			\
    151  1.1  mrg     }						\
    152  1.1  mrg   } while (0)
    153  1.1  mrg 
    154  1.1  mrg #define UNLOCK(mutex) do {						\
    155  1.1  mrg     aio_lock_debug *curr;						\
    156  1.1  mrg     DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_GREEN "UNLOCK: " DEBUG_NORM #mutex, \
    157  1.1  mrg 		 __FUNCTION__, __LINE__, (void *) mutex);		\
    158  1.1  mrg     INTERN_LOCK (&debug_queue_lock);					\
    159  1.1  mrg     curr = IN_DEBUG_QUEUE (mutex);					\
    160  1.1  mrg     if (curr)								\
    161  1.1  mrg       {									\
    162  1.1  mrg 	if (curr->prev)							\
    163  1.1  mrg 	  curr->prev->next = curr->next;				\
    164  1.1  mrg 	if (curr->next) {						\
    165  1.1  mrg 	  curr->next->prev = curr->prev;				\
    166  1.1  mrg 	  if (curr == aio_debug_head)					\
    167  1.1  mrg 	    aio_debug_head = curr->next;				\
    168  1.1  mrg 	} else {							\
    169  1.1  mrg 	  if (curr == aio_debug_head)					\
    170  1.1  mrg 	    aio_debug_head = NULL;					\
    171  1.1  mrg 	}								\
    172  1.1  mrg 	free (curr);							\
    173  1.1  mrg       }									\
    174  1.1  mrg     INTERN_UNLOCK (&debug_queue_lock);					\
    175  1.1  mrg     INTERN_UNLOCK (mutex);						\
    176  1.1  mrg   }while (0)
    177  1.1  mrg 
    178  1.1  mrg #define TRYLOCK(mutex) ({						\
    179  1.1  mrg 			 char status[200];				\
    180  1.1  mrg 			 int res;					\
    181  1.1  mrg 			 aio_lock_debug *curr;				\
    182  1.1  mrg 			 res = __gthread_mutex_trylock (mutex);		\
    183  1.1  mrg 			 INTERN_LOCK (&debug_queue_lock);		\
    184  1.1  mrg 			 if (res) {					\
    185  1.1  mrg 			   if ((curr = IN_DEBUG_QUEUE (mutex))) {	\
    186  1.1  mrg 			     sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line);	\
    187  1.1  mrg 			   } else					\
    188  1.1  mrg 			     sprintf (status, DEBUG_RED "unknown" DEBUG_NORM);	\
    189  1.1  mrg 			 }						\
    190  1.1  mrg 			 else {						\
    191  1.1  mrg 			   sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM);	\
    192  1.1  mrg 			   MUTEX_DEBUG_ADD (mutex);			\
    193  1.1  mrg 			 }						\
    194  1.1  mrg 			 DEBUG_PRINTF ("%s%-44s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \
    195  1.1  mrg 				      DEBUG_DARKRED "TRYLOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, \
    196  1.1  mrg 				      (void *) mutex);			\
    197  1.1  mrg 			 INTERN_UNLOCK (&debug_queue_lock);		\
    198  1.1  mrg 			 res;						\
    199  1.1  mrg     })
    200  1.1  mrg 
    201  1.1  mrg #define LOCK(mutex) do {						\
    202  1.1  mrg     char status[200];							\
    203  1.1  mrg     CHECK_LOCK (mutex, status);						\
    204  1.1  mrg     DEBUG_PRINTF ("%s%-42s prev: %-35s %20s():%-5d %18p\n", aio_prefix,	\
    205  1.1  mrg 		 DEBUG_RED "LOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, (void *) mutex); \
    206  1.1  mrg     INTERN_LOCK (mutex);							\
    207  1.1  mrg     INTERN_LOCK (&debug_queue_lock);					\
    208  1.1  mrg     MUTEX_DEBUG_ADD (mutex);						\
    209  1.1  mrg     INTERN_UNLOCK (&debug_queue_lock);					\
    210  1.1  mrg     DEBUG_PRINTF ("%s" DEBUG_RED "ACQ:" DEBUG_NORM " %-30s %78p\n", aio_prefix, #mutex, mutex); \
    211  1.1  mrg   } while (0)
    212  1.1  mrg 
    213  1.1  mrg #define DEBUG_LINE(...) __VA_ARGS__
    214  1.1  mrg 
    215  1.1  mrg #else
    216  1.1  mrg #define DEBUG_PRINTF(...) {}
    217  1.1  mrg #define CHECK_LOCK(au, mutex, status) {}
    218  1.1  mrg #define NOTE(str, ...) {}
    219  1.1  mrg #define DEBUG_LINE(...)
    220  1.1  mrg #define T_ERROR(func, ...) func(__VA_ARGS__)
    221  1.1  mrg #define LOCK(mutex) INTERN_LOCK (mutex)
    222  1.1  mrg #define UNLOCK(mutex) INTERN_UNLOCK (mutex)
    223  1.1  mrg #define TRYLOCK(mutex) (__gthread_mutex_trylock (mutex))
    224  1.1  mrg #endif
    225  1.1  mrg 
    226  1.1  mrg #define INTERN_LOCK(mutex) T_ERROR (__gthread_mutex_lock, mutex);
    227  1.1  mrg 
    228  1.1  mrg #define INTERN_UNLOCK(mutex) T_ERROR (__gthread_mutex_unlock, mutex);
    229  1.1  mrg 
    230  1.1  mrg #if ASYNC_IO
    231  1.1  mrg 
    232  1.1  mrg /* au->lock has to be held when calling this macro.  */
    233  1.1  mrg 
    234  1.1  mrg #define SIGNAL(advcond) do{						\
    235  1.1  mrg     (advcond)->pending = 1;						\
    236  1.1  mrg     DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE "SIGNAL: " DEBUG_NORM \
    237  1.1  mrg 		 #advcond, __FUNCTION__, __LINE__, (void *) advcond);	\
    238  1.1  mrg     T_ERROR (__gthread_cond_broadcast, &(advcond)->signal);			\
    239  1.1  mrg   } while (0)
    240  1.1  mrg 
    241  1.1  mrg /* Has to be entered with mutex locked.  */
    242  1.1  mrg 
    243  1.1  mrg #define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{		\
    244  1.1  mrg     __label__ finish;		       					\
    245  1.1  mrg     DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_BLUE "WAITING: " DEBUG_NORM \
    246  1.1  mrg 		 #advcond, __FUNCTION__, __LINE__, (void *) advcond);	\
    247  1.1  mrg     if ((advcond)->pending || (condition))				\
    248  1.1  mrg       goto finish;							\
    249  1.1  mrg     while (1)								\
    250  1.1  mrg       {									\
    251  1.1  mrg 	int err_ret = __gthread_cond_wait(&(advcond)->signal, mutex);	\
    252  1.1  mrg 	if (err_ret) internal_error (NULL, "WAIT_SIGNAL_MUTEX failed");	\
    253  1.1  mrg 	if (condition)							\
    254  1.1  mrg 	  {								\
    255  1.1  mrg 	    DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE \
    256  1.1  mrg 			  "REC: " DEBUG_NORM				\
    257  1.1  mrg 			  #advcond,  __FUNCTION__, __LINE__, (void *)advcond); \
    258  1.1  mrg 	    break;				      			\
    259  1.1  mrg 	  }								\
    260  1.1  mrg       }									\
    261  1.1  mrg   finish:								\
    262  1.1  mrg     (advcond)->pending = 0;						\
    263  1.1  mrg     UNLOCK (mutex);							\
    264  1.1  mrg   } while (0)
    265  1.1  mrg 
    266  1.1  mrg /* au->lock has to be held when calling this macro.  */
    267  1.1  mrg 
    268  1.1  mrg #define REVOKE_SIGNAL(advcond) do{		\
    269  1.1  mrg     (advcond)->pending = 0;			\
    270  1.1  mrg   } while (0)
    271  1.1  mrg 
    272  1.1  mrg #else
    273  1.1  mrg 
    274  1.1  mrg #define SIGNAL(advcond) do{} while(0)
    275  1.1  mrg #define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{} while(0)
    276  1.1  mrg #define REVOKE_SIGNAL(advcond) do{} while(0)
    277  1.1  mrg 
    278  1.1  mrg #endif
    279  1.1  mrg 
    280  1.1  mrg #if ASYNC_IO
    281  1.1  mrg DEBUG_LINE (extern __thread const char *aio_prefix);
    282  1.1  mrg 
    283  1.1  mrg DEBUG_LINE (typedef struct aio_lock_debug{
    284  1.1  mrg   __gthread_mutex_t *m;
    285  1.1  mrg   int line;
    286  1.1  mrg   const char *func;
    287  1.1  mrg   struct aio_lock_debug *next;
    288  1.1  mrg   struct aio_lock_debug *prev;
    289  1.1  mrg } aio_lock_debug;)
    290  1.1  mrg 
    291  1.1  mrg DEBUG_LINE (extern aio_lock_debug *aio_debug_head;)
    292  1.1  mrg DEBUG_LINE (extern __gthread_mutex_t debug_queue_lock;)
    293  1.1  mrg 
    294  1.1  mrg /* Thread - local storage of the current unit we are looking at. Needed for
    295  1.1  mrg    error reporting.  */
    296  1.1  mrg 
    297  1.1  mrg extern __thread gfc_unit *thread_unit;
    298  1.1  mrg #endif
    299  1.1  mrg 
    300  1.1  mrg enum aio_do {
    301  1.1  mrg   AIO_INVALID = 0,
    302  1.1  mrg   AIO_DATA_TRANSFER_INIT,
    303  1.1  mrg   AIO_TRANSFER_SCALAR,
    304  1.1  mrg   AIO_TRANSFER_ARRAY,
    305  1.1  mrg   AIO_WRITE_DONE,
    306  1.1  mrg   AIO_READ_DONE,
    307  1.1  mrg   AIO_CLOSE
    308  1.1  mrg };
    309  1.1  mrg 
    310  1.1  mrg typedef union transfer_args
    311  1.1  mrg {
    312  1.1  mrg   struct
    313  1.1  mrg   {
    314  1.1  mrg     void (*transfer) (struct st_parameter_dt *, bt, void *, int, size_t, size_t);
    315  1.1  mrg     bt arg_bt;
    316  1.1  mrg     void *data;
    317  1.1  mrg     int i;
    318  1.1  mrg     size_t s1;
    319  1.1  mrg     size_t s2;
    320  1.1  mrg   } scalar;
    321  1.1  mrg   struct
    322  1.1  mrg   {
    323  1.1  mrg     gfc_array_char *desc;
    324  1.1  mrg     int kind;
    325  1.1  mrg     gfc_charlen_type charlen;
    326  1.1  mrg   } array;
    327  1.1  mrg } transfer_args;
    328  1.1  mrg 
    329  1.1  mrg struct adv_cond
    330  1.1  mrg {
    331  1.1  mrg #if ASYNC_IO
    332  1.1  mrg   int pending;
    333  1.1  mrg   __gthread_cond_t signal;
    334  1.1  mrg #endif
    335  1.1  mrg };
    336  1.1  mrg 
    337  1.1  mrg typedef struct async_unit
    338  1.1  mrg {
    339  1.1  mrg   __gthread_mutex_t io_lock;   /* Lock for doing actual I/O. */
    340  1.1  mrg   __gthread_mutex_t lock;      /* Lock for manipulating the queue structure.  */
    341  1.1  mrg   bool empty;
    342  1.1  mrg   struct
    343  1.1  mrg   {
    344  1.1  mrg     int waiting;
    345  1.1  mrg     int low;
    346  1.1  mrg     int high;
    347  1.1  mrg     struct adv_cond done;
    348  1.1  mrg   } id;
    349  1.1  mrg 
    350  1.1  mrg #if ASYNC_IO
    351  1.1  mrg   struct adv_cond work;
    352  1.1  mrg   struct adv_cond emptysignal;
    353  1.1  mrg   struct st_parameter_dt *pdt;
    354  1.1  mrg   pthread_t thread;
    355  1.1  mrg   struct transfer_queue *head;
    356  1.1  mrg   struct transfer_queue *tail;
    357  1.1  mrg 
    358  1.1  mrg   struct {
    359  1.1  mrg     const char *message;
    360  1.1  mrg     st_parameter_common *cmp;
    361  1.1  mrg     bool has_error;
    362  1.1  mrg     int last_good_id;
    363  1.1  mrg     int family;
    364  1.1  mrg     bool fatal_error;
    365  1.1  mrg   } error;
    366  1.1  mrg #endif
    367  1.1  mrg } async_unit;
    368  1.1  mrg 
    369  1.1  mrg void init_async_unit (gfc_unit *);
    370  1.1  mrg internal_proto (init_async_unit);
    371  1.1  mrg 
    372  1.1  mrg bool async_wait (st_parameter_common *, async_unit *);
    373  1.1  mrg internal_proto (async_wait);
    374  1.1  mrg 
    375  1.1  mrg bool async_wait_id (st_parameter_common *, async_unit *, int);
    376  1.1  mrg internal_proto (async_wait_id);
    377  1.1  mrg 
    378  1.1  mrg bool collect_async_errors (st_parameter_common *, async_unit *);
    379  1.1  mrg internal_proto (collect_async_errors);
    380  1.1  mrg 
    381  1.1  mrg void async_close (async_unit *);
    382  1.1  mrg internal_proto (async_close);
    383  1.1  mrg 
    384  1.1  mrg void enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do);
    385  1.1  mrg internal_proto (enqueue_transfer);
    386  1.1  mrg 
    387  1.1  mrg void enqueue_done (async_unit *, enum aio_do type);
    388  1.1  mrg internal_proto (enqueue_done);
    389  1.1  mrg 
    390  1.1  mrg int enqueue_done_id (async_unit *, enum aio_do type);
    391  1.1  mrg internal_proto (enqueue_done_id);
    392  1.1  mrg 
    393  1.1  mrg void enqueue_init (async_unit *);
    394  1.1  mrg internal_proto (enqueue_init);
    395  1.1  mrg 
    396  1.1  mrg void enqueue_data_transfer_init (async_unit *, st_parameter_dt *, int);
    397  1.1  mrg internal_proto (enqueue_data_transfer_init);
    398  1.1  mrg 
    399  1.1  mrg void enqueue_close (async_unit *);
    400  1.1  mrg internal_proto (enqueue_close);
    401  1.1  mrg 
    402  1.1  mrg #endif
    403