      1 /*	$NetBSD: rf_dagffwr.c,v 1.1 1998/11/13 04:20:27 oster Exp $	*/
      2 /*
      3  * Copyright (c) 1995 Carnegie-Mellon University.
      4  * All rights reserved.
      5  *
      6  * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
      7  *
      8  * Permission to use, copy, modify and distribute this software and
      9  * its documentation is hereby granted, provided that both the copyright
     10  * notice and this permission notice appear in all copies of the
     11  * software, derivative works or modified versions, and any portions
     12  * thereof, and that both notices appear in supporting documentation.
     13  *
     14  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
     15  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
     16  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
     17  *
     18  * Carnegie Mellon requests users of this software to return to
     19  *
      20  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
     21  *  School of Computer Science
     22  *  Carnegie Mellon University
     23  *  Pittsburgh PA 15213-3890
     24  *
     25  * any improvements or extensions that they make and grant Carnegie the
     26  * rights to redistribute these changes.
     27  */
     28 
     29 /*
     30  * rf_dagff.c
     31  *
     32  * code for creating fault-free DAGs
     33  *
      34  *
     35  * Log: rf_dagffwr.c,v
     36  * Revision 1.19  1996/07/31 15:35:24  jimz
     37  * evenodd changes; bugfixes for double-degraded archs, generalize
     38  * some formerly PQ-only functions
     39  *
     40  * Revision 1.18  1996/07/28  20:31:39  jimz
     41  * i386netbsd port
     42  * true/false fixup
     43  *
     44  * Revision 1.17  1996/07/27  18:40:24  jimz
     45  * cleanup sweep
     46  *
     47  * Revision 1.16  1996/07/22  19:52:16  jimz
     48  * switched node params to RF_DagParam_t, a union of
     49  * a 64-bit int and a void *, for better portability
     50  * attempted hpux port, but failed partway through for
     51  * lack of a single C compiler capable of compiling all
     52  * source files
     53  *
     54  * Revision 1.15  1996/06/11  01:27:50  jimz
     55  * Fixed bug where diskthread shutdown would crash or hang. This
     56  * turned out to be two distinct bugs:
     57  * (1) [crash] The thread shutdown code wasn't properly waiting for
     58  * all the diskthreads to complete. This caused diskthreads that were
     59  * exiting+cleaning up to unlock a destroyed mutex.
     60  * (2) [hang] TerminateDiskQueues wasn't locking, and DiskIODequeue
     61  * only checked for termination _after_ a wakeup if the queues were
     62  * empty. This was a race where the termination wakeup could be lost
     63  * by the dequeueing thread, and the system would hang waiting for the
     64  * thread to exit, while the thread waited for an I/O or a signal to
     65  * check the termination flag.
     66  *
     67  * Revision 1.14  1996/06/10  22:24:01  wvcii
     68  * added write dags which do not have a commit node and are
     69  * used in forward and backward error recovery experiments.
     70  *
     71  * Revision 1.13  1996/06/07  22:26:27  jimz
     72  * type-ify which_ru (RF_ReconUnitNum_t)
     73  *
     74  * Revision 1.12  1996/06/07  21:33:04  jimz
     75  * begin using consistent types for sector numbers,
     76  * stripe numbers, row+col numbers, recon unit numbers
     77  *
     78  * Revision 1.11  1996/05/31  22:26:54  jimz
     79  * fix a lot of mapping problems, memory allocation problems
     80  * found some weird lock issues, fixed 'em
     81  * more code cleanup
     82  *
     83  * Revision 1.10  1996/05/30  11:29:41  jimz
     84  * Numerous bug fixes. Stripe lock release code disagreed with the taking code
     85  * about when stripes should be locked (I made it consistent: no parity, no lock)
     86  * There was a lot of extra serialization of I/Os which I've removed- a lot of
     87  * it was to calculate values for the cache code, which is no longer with us.
     88  * More types, function, macro cleanup. Added code to properly quiesce the array
     89  * on shutdown. Made a lot of stuff array-specific which was (bogusly) general
     90  * before. Fixed memory allocation, freeing bugs.
     91  *
     92  * Revision 1.9  1996/05/27  18:56:37  jimz
     93  * more code cleanup
     94  * better typing
     95  * compiles in all 3 environments
     96  *
     97  * Revision 1.8  1996/05/24  22:17:04  jimz
     98  * continue code + namespace cleanup
     99  * typed a bunch of flags
    100  *
    101  * Revision 1.7  1996/05/24  04:28:55  jimz
    102  * release cleanup ckpt
    103  *
    104  * Revision 1.6  1996/05/23  21:46:35  jimz
    105  * checkpoint in code cleanup (release prep)
    106  * lots of types, function names have been fixed
    107  *
    108  * Revision 1.5  1996/05/23  00:33:23  jimz
    109  * code cleanup: move all debug decls to rf_options.c, all extern
    110  * debug decls to rf_options.h, all debug vars preceded by rf_
    111  *
    112  * Revision 1.4  1996/05/18  19:51:34  jimz
    113  * major code cleanup- fix syntax, make some types consistent,
    114  * add prototypes, clean out dead code, et cetera
    115  *
    116  * Revision 1.3  1996/05/15  23:23:12  wvcii
    117  * fixed bug in small write read old q node succedent initialization
    118  *
    119  * Revision 1.2  1996/05/08  21:01:24  jimz
    120  * fixed up enum type names that were conflicting with other
    121  * enums and function names (ie, "panic")
    122  * future naming trends will be towards RF_ and rf_ for
    123  * everything raidframe-related
    124  *
    125  * Revision 1.1  1996/05/03  19:20:45  wvcii
    126  * Initial revision
    127  *
    128  */
    129 
    130 #include "rf_types.h"
    131 #include "rf_raid.h"
    132 #include "rf_dag.h"
    133 #include "rf_dagutils.h"
    134 #include "rf_dagfuncs.h"
    135 #include "rf_threadid.h"
    136 #include "rf_debugMem.h"
    137 #include "rf_dagffrd.h"
    138 #include "rf_memchunk.h"
    139 #include "rf_general.h"
    140 #include "rf_dagffwr.h"
    141 
    142 /******************************************************************************
    143  *
    144  * General comments on DAG creation:
    145  *
    146  * All DAGs in this file use roll-away error recovery.  Each DAG has a single
    147  * commit node, usually called "Cmt."  If an error occurs before the Cmt node
    148  * is reached, the execution engine will halt forward execution and work
    149  * backward through the graph, executing the undo functions.  Assuming that
     150  * each node in the graph prior to the Cmt node is undoable and atomic, or
     151  * makes no changes to permanent state, the graph will fail atomically.
    152  * If an error occurs after the Cmt node executes, the engine will roll-forward
    153  * through the graph, blindly executing nodes until it reaches the end.
    154  * If a graph reaches the end, it is assumed to have completed successfully.
    155  *
    156  * A graph has only 1 Cmt node.
    157  *
    158  */
    159 
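/*
 * Editor's sketch (not part of RAIDframe): the roll-away decision above,
 * reduced to standalone C.  The engine, node indices, and names below are
 * hypothetical; only the rule -- undo backward if the failure happens
 * before the commit node executes, roll forward once it has -- comes from
 * the comment above.  Compiled out with #if 0.
 */
#if 0
#include <stddef.h>

enum sketch_recovery { SKETCH_ROLL_BACKWARD, SKETCH_ROLL_FORWARD };

static enum sketch_recovery
sketch_recovery_direction(size_t failed_node, size_t commit_node)
{
	/*
	 * Failures strictly before the commit point undo the graph;
	 * anything at or past the commit point (an assumption for the
	 * commit node itself) is pushed forward to completion.
	 */
	return (failed_node < commit_node) ?
	    SKETCH_ROLL_BACKWARD : SKETCH_ROLL_FORWARD;
}
#endif /* 0 */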
    160 
    161 /******************************************************************************
    162  *
    163  * The following wrappers map the standard DAG creation interface to the
    164  * DAG creation routines.  Additionally, these wrappers enable experimentation
    165  * with new DAG structures by providing an extra level of indirection, allowing
    166  * the DAG creation routines to be replaced at this single point.
    167  */
    168 
    169 
    170 void rf_CreateNonRedundantWriteDAG(
    171   RF_Raid_t             *raidPtr,
    172   RF_AccessStripeMap_t  *asmap,
    173   RF_DagHeader_t        *dag_h,
    174   void                  *bp,
    175   RF_RaidAccessFlags_t   flags,
    176   RF_AllocListElem_t    *allocList,
    177   RF_IoType_t            type)
    178 {
    179   rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    180     RF_IO_TYPE_WRITE);
    181 }
    182 
    183 void rf_CreateRAID0WriteDAG(
    184   RF_Raid_t             *raidPtr,
    185   RF_AccessStripeMap_t  *asmap,
    186   RF_DagHeader_t        *dag_h,
    187   void                  *bp,
    188   RF_RaidAccessFlags_t   flags,
    189   RF_AllocListElem_t    *allocList,
    190   RF_IoType_t            type)
    191 {
    192   rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    193     RF_IO_TYPE_WRITE);
    194 }
    195 
    196 void rf_CreateSmallWriteDAG(
    197   RF_Raid_t             *raidPtr,
    198   RF_AccessStripeMap_t  *asmap,
    199   RF_DagHeader_t        *dag_h,
    200   void                  *bp,
    201   RF_RaidAccessFlags_t   flags,
    202   RF_AllocListElem_t    *allocList)
    203 {
    204 #if RF_FORWARD > 0
    205   rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    206     &rf_xorFuncs, NULL);
    207 #else /* RF_FORWARD > 0 */
    208 #if RF_BACKWARD > 0
    209   rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    210     &rf_xorFuncs, NULL);
    211 #else /* RF_BACKWARD > 0 */
    212   /* "normal" rollaway */
    213   rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    214     &rf_xorFuncs, NULL);
    215 #endif /* RF_BACKWARD > 0 */
    216 #endif /* RF_FORWARD > 0 */
    217 }
    218 
    219 void rf_CreateLargeWriteDAG(
    220   RF_Raid_t             *raidPtr,
    221   RF_AccessStripeMap_t  *asmap,
    222   RF_DagHeader_t        *dag_h,
    223   void                  *bp,
    224   RF_RaidAccessFlags_t   flags,
    225   RF_AllocListElem_t    *allocList)
    226 {
    227 #if RF_FORWARD > 0
    228   rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    229     1, rf_RegularXorFunc, RF_TRUE);
    230 #else /* RF_FORWARD > 0 */
    231 #if RF_BACKWARD > 0
    232   rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    233     1, rf_RegularXorFunc, RF_TRUE);
    234 #else /* RF_BACKWARD > 0 */
    235   /* "normal" rollaway */
    236   rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    237     1, rf_RegularXorFunc, RF_TRUE);
    238 #endif /* RF_BACKWARD > 0 */
    239 #endif /* RF_FORWARD > 0 */
    240 }
    241 
    242 
    243 /******************************************************************************
    244  *
    245  * DAG creation code begins here
    246  */
    247 
    248 
    249 /******************************************************************************
    250  *
    251  * creates a DAG to perform a large-write operation:
    252  *
    253  *           / Rod \           / Wnd \
    254  * H -- block- Rod - Xor - Cmt - Wnd --- T
    255  *           \ Rod /          \  Wnp /
    256  *                             \[Wnq]/
    257  *
    258  * The XOR node also does the Q calculation in the P+Q architecture.
     259  * All nodes before the commit node (Cmt) are assumed to be atomic and
     260  * undoable, or to make no changes to permanent state.
    261  *
    262  * Rod = read old data
    263  * Cmt = commit node
    264  * Wnp = write new parity
    265  * Wnd = write new data
    266  * Wnq = write new "q"
    267  * [] denotes optional segments in the graph
    268  *
    269  * Parameters:  raidPtr   - description of the physical array
    270  *              asmap     - logical & physical addresses for this access
    271  *              bp        - buffer ptr (holds write data)
    272  *              flags     - general flags (e.g. disk locking)
    273  *              allocList - list of memory allocated in DAG creation
    274  *              nfaults   - number of faults array can tolerate
    275  *                          (equal to # redundancy units in stripe)
    276  *              redfuncs  - list of redundancy generating functions
    277  *
    278  *****************************************************************************/
    279 
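/*
 * Editor's worked example (hypothetical numbers): for a full-stripe write
 * on a 4-data-disk RAID 5 array with no unaccessed portion, nWndNodes == 4
 * and nfaults == 1, so the allocation below creates 4 + 4 + 1 = 9 nodes:
 * four Wnd nodes plus the Xor, block (Nil), Cmt, Trm, and Wnp nodes.
 */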
    280 void rf_CommonCreateLargeWriteDAG(
    281   RF_Raid_t             *raidPtr,
    282   RF_AccessStripeMap_t  *asmap,
    283   RF_DagHeader_t        *dag_h,
    284   void                  *bp,
    285   RF_RaidAccessFlags_t   flags,
    286   RF_AllocListElem_t    *allocList,
    287   int                    nfaults,
    288   int                  (*redFunc)(RF_DagNode_t *),
    289   int                    allowBufferRecycle)
    290 {
    291   RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
    292   RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
    293   int nWndNodes, nRodNodes, i, nodeNum, asmNum;
    294   RF_AccessStripeMapHeader_t *new_asm_h[2];
    295   RF_StripeNum_t parityStripeID;
    296   char *sosBuffer, *eosBuffer;
    297   RF_ReconUnitNum_t which_ru;
    298   RF_RaidLayout_t *layoutPtr;
    299   RF_PhysDiskAddr_t *pda;
    300 
    301   layoutPtr = &(raidPtr->Layout);
    302   parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
    303     &which_ru);
    304 
    305   if (rf_dagDebug) {
    306     printf("[Creating large-write DAG]\n");
    307   }
    308   dag_h->creator = "LargeWriteDAG";
    309 
    310   dag_h->numCommitNodes = 1;
    311   dag_h->numCommits = 0;
    312   dag_h->numSuccedents = 1;
    313 
     314   /* alloc the nodes: Wnd, Xor, block, Cmt, Trm, Wnp, and (if nfaults == 2) Wnq */
    315   nWndNodes = asmap->numStripeUnitsAccessed;
    316   RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
    317     (RF_DagNode_t *), allocList);
    318   i = 0;
    319   wndNodes    = &nodes[i]; i += nWndNodes;
    320   xorNode     = &nodes[i]; i += 1;
    321   wnpNode     = &nodes[i]; i += 1;
    322   blockNode   = &nodes[i]; i += 1;
    323   commitNode  = &nodes[i]; i += 1;
    324   termNode    = &nodes[i]; i += 1;
    325   if (nfaults == 2) {
    326     wnqNode   = &nodes[i]; i += 1;
    327   }
    328   else {
    329     wnqNode = NULL;
    330   }
    331   rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
    332     &nRodNodes, &sosBuffer, &eosBuffer, allocList);
    333   if (nRodNodes > 0) {
    334     RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
    335       (RF_DagNode_t *), allocList);
    336   }
    337   else {
    338     rodNodes = NULL;
    339   }
    340 
    341   /* begin node initialization */
    342   if (nRodNodes > 0) {
    343     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    344       NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
    345   }
    346   else {
    347     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    348       NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
    349   }
    350 
    351   rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
    352     nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
    353   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
    354     0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
    355 
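  /*
   * Editor's note on the I/O node parameter convention used throughout
   * this file, as inferred from the calls below: params[0] is the
   * physical disk address (pda), params[1] the data buffer, params[2]
   * the parity stripe ID, and params[3] a value built by
   * RF_CREATE_PARAM3(priority, lock, unlock, which_ru) -- compare the
   * lock/unlock flag usage in the small-write DAG later in this file.
   */
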
    356   /* initialize the Rod nodes */
    357   for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
    358     if (new_asm_h[asmNum]) {
    359       pda = new_asm_h[asmNum]->stripeMap->physInfo;
    360       while (pda) {
    361         rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
     362           rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    363           "Rod", allocList);
    364         rodNodes[nodeNum].params[0].p = pda;
    365         rodNodes[nodeNum].params[1].p = pda->bufPtr;
    366         rodNodes[nodeNum].params[2].v = parityStripeID;
    367         rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    368           0, 0, which_ru);
    369         nodeNum++;
    370         pda = pda->next;
    371       }
    372     }
    373   }
    374   RF_ASSERT(nodeNum == nRodNodes);
    375 
    376   /* initialize the wnd nodes */
    377   pda = asmap->physInfo;
    378   for (i=0; i < nWndNodes; i++) {
    379     rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
    380       rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
    381     RF_ASSERT(pda != NULL);
    382     wndNodes[i].params[0].p = pda;
    383     wndNodes[i].params[1].p = pda->bufPtr;
    384     wndNodes[i].params[2].v = parityStripeID;
    385     wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    386     pda = pda->next;
    387   }
    388 
    389   /* initialize the redundancy node */
    390   if (nRodNodes > 0) {
    391     rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
    392       nRodNodes, 2 * (nWndNodes+nRodNodes) + 1, nfaults, dag_h,
    393       "Xr ", allocList);
    394   }
    395   else {
    396     rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
    397       1, 2 * (nWndNodes+nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
    398   }
    399   xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
    400   for (i=0; i < nWndNodes; i++) {
    401     xorNode->params[2*i+0] = wndNodes[i].params[0];         /* pda */
    402     xorNode->params[2*i+1] = wndNodes[i].params[1];         /* buf ptr */
    403   }
    404   for (i=0; i < nRodNodes; i++) {
    405     xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];  /* pda */
    406     xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];  /* buf ptr */
    407   }
    408   /* xor node needs to get at RAID information */
    409   xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr;
    410 
    411   /*
    412    * Look for an Rod node that reads a complete SU. If none, alloc a buffer
    413    * to receive the parity info. Note that we can't use a new data buffer
    414    * because it will not have gotten written when the xor occurs.
    415    */
    416   if (allowBufferRecycle) {
    417     for (i = 0; i < nRodNodes; i++) {
    418       if (((RF_PhysDiskAddr_t *)rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
    419         break;
    420     }
    421   }
    422   if ((!allowBufferRecycle) || (i == nRodNodes)) {
    423     RF_CallocAndAdd(xorNode->results[0], 1,
    424       rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
    425       (void *), allocList);
    426   }
    427   else {
    428     xorNode->results[0] = rodNodes[i].params[1].p;
    429   }
    430 
    431   /* initialize the Wnp node */
    432   rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
    433     rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
    434   wnpNode->params[0].p = asmap->parityInfo;
    435   wnpNode->params[1].p = xorNode->results[0];
    436   wnpNode->params[2].v = parityStripeID;
    437   wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    438   /* parityInfo must describe entire parity unit */
    439   RF_ASSERT(asmap->parityInfo->next == NULL);
    440 
    441   if (nfaults == 2) {
    442       /*
     443        * We never try to recycle a buffer for the Q calculation
    444        * in addition to the parity. This would cause two buffers
    445        * to get smashed during the P and Q calculation, guaranteeing
    446        * one would be wrong.
    447        */
    448       RF_CallocAndAdd(xorNode->results[1], 1,
    449         rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
    450         (void *),allocList);
    451       rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
    452         rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
    453       wnqNode->params[0].p = asmap->qInfo;
    454       wnqNode->params[1].p = xorNode->results[1];
    455       wnqNode->params[2].v = parityStripeID;
    456       wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    457       /* parityInfo must describe entire parity unit */
    458       RF_ASSERT(asmap->parityInfo->next == NULL);
    459   }
    460 
    461   /*
    462    * Connect nodes to form graph.
    463    */
    464 
    465   /* connect dag header to block node */
    466   RF_ASSERT(blockNode->numAntecedents == 0);
    467   dag_h->succedents[0] = blockNode;
    468 
    469   if (nRodNodes > 0) {
    470     /* connect the block node to the Rod nodes */
    471     RF_ASSERT(blockNode->numSuccedents == nRodNodes);
    472     RF_ASSERT(xorNode->numAntecedents == nRodNodes);
    473     for (i = 0; i < nRodNodes; i++) {
    474       RF_ASSERT(rodNodes[i].numAntecedents == 1);
    475       blockNode->succedents[i] = &rodNodes[i];
    476       rodNodes[i].antecedents[0] = blockNode;
    477       rodNodes[i].antType[0] = rf_control;
    478 
    479       /* connect the Rod nodes to the Xor node */
    480       RF_ASSERT(rodNodes[i].numSuccedents == 1);
    481       rodNodes[i].succedents[0] = xorNode;
    482       xorNode->antecedents[i] = &rodNodes[i];
    483       xorNode->antType[i] = rf_trueData;
    484     }
    485   }
    486   else {
    487     /* connect the block node to the Xor node */
    488     RF_ASSERT(blockNode->numSuccedents == 1);
    489     RF_ASSERT(xorNode->numAntecedents == 1);
    490     blockNode->succedents[0] = xorNode;
    491     xorNode->antecedents[0] = blockNode;
    492     xorNode->antType[0] = rf_control;
    493   }
    494 
    495   /* connect the xor node to the commit node */
    496   RF_ASSERT(xorNode->numSuccedents == 1);
    497   RF_ASSERT(commitNode->numAntecedents == 1);
    498   xorNode->succedents[0] = commitNode;
    499   commitNode->antecedents[0] = xorNode;
    500   commitNode->antType[0] = rf_control;
    501 
    502   /* connect the commit node to the write nodes */
    503   RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
    504   for (i = 0; i < nWndNodes; i++) {
     505     RF_ASSERT(wndNodes[i].numAntecedents == 1);
    506     commitNode->succedents[i] = &wndNodes[i];
    507     wndNodes[i].antecedents[0] = commitNode;
    508     wndNodes[i].antType[0] = rf_control;
    509   }
    510   RF_ASSERT(wnpNode->numAntecedents == 1);
    511   commitNode->succedents[nWndNodes] = wnpNode;
     512   wnpNode->antecedents[0] = commitNode;
    513   wnpNode->antType[0] = rf_trueData;
    514   if (nfaults == 2) {
    515     RF_ASSERT(wnqNode->numAntecedents == 1);
    516     commitNode->succedents[nWndNodes + 1] = wnqNode;
    517     wnqNode->antecedents[0] = commitNode;
    518     wnqNode->antType[0] = rf_trueData;
    519   }
    520 
    521   /* connect the write nodes to the term node */
    522   RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
    523   RF_ASSERT(termNode->numSuccedents == 0);
    524   for (i = 0; i < nWndNodes; i++) {
     525     RF_ASSERT(wndNodes[i].numSuccedents == 1);
    526     wndNodes[i].succedents[0] = termNode;
    527     termNode->antecedents[i] = &wndNodes[i];
    528     termNode->antType[i] = rf_control;
    529   }
    530   RF_ASSERT(wnpNode->numSuccedents == 1);
    531   wnpNode->succedents[0] = termNode;
    532   termNode->antecedents[nWndNodes] = wnpNode;
    533   termNode->antType[nWndNodes] = rf_control;
    534   if (nfaults == 2) {
    535     RF_ASSERT(wnqNode->numSuccedents == 1);
    536     wnqNode->succedents[0] = termNode;
    537     termNode->antecedents[nWndNodes + 1] = wnqNode;
    538     termNode->antType[nWndNodes + 1] = rf_control;
    539   }
    540 }
    541 
    542 /******************************************************************************
    543  *
    544  * creates a DAG to perform a small-write operation (either raid 5 or pq),
    545  * which is as follows:
    546  *
    547  * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
    548  *            \- Rod X      /     \----> Wnd [Und]-/
    549  *           [\- Rod X     /       \---> Wnd [Und]-/]
    550  *           [\- Roq -> Q /         \--> Wnq [Unq]-/]
    551  *
    552  * Rop = read old parity
    553  * Rod = read old data
    554  * Roq = read old "q"
    555  * Cmt = commit node
    556  * Und = unlock data disk
    557  * Unp = unlock parity disk
    558  * Unq = unlock q disk
    559  * Wnp = write new parity
    560  * Wnd = write new data
    561  * Wnq = write new "q"
    562  * [ ] denotes optional segments in the graph
    563  *
    564  * Parameters:  raidPtr   - description of the physical array
    565  *              asmap     - logical & physical addresses for this access
    566  *              bp        - buffer ptr (holds write data)
    567  *              flags     - general flags (e.g. disk locking)
    568  *              allocList - list of memory allocated in DAG creation
    569  *              pfuncs    - list of parity generating functions
    570  *              qfuncs    - list of q generating functions
    571  *
    572  * A null qfuncs indicates single fault tolerant
    573  *****************************************************************************/
    574 
    575 void rf_CommonCreateSmallWriteDAG(
    576   RF_Raid_t             *raidPtr,
    577   RF_AccessStripeMap_t  *asmap,
    578   RF_DagHeader_t        *dag_h,
    579   void                  *bp,
    580   RF_RaidAccessFlags_t   flags,
    581   RF_AllocListElem_t    *allocList,
    582   RF_RedFuncs_t         *pfuncs,
    583   RF_RedFuncs_t         *qfuncs)
    584 {
    585   RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
    586   RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
    587   RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
    588   RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
    589   int i, j, nNodes, totalNumNodes, lu_flag;
    590   RF_ReconUnitNum_t which_ru;
    591   int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
    592   int (*qfunc)(RF_DagNode_t *);
    593   int numDataNodes, numParityNodes;
    594   RF_StripeNum_t parityStripeID;
    595   RF_PhysDiskAddr_t *pda;
    596   char *name, *qname;
    597   long nfaults;
    598 
    599   nfaults = qfuncs ? 2 : 1;
    600   lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
    601 
    602   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
    603     asmap->raidAddress, &which_ru);
    604   pda = asmap->physInfo;
    605   numDataNodes = asmap->numStripeUnitsAccessed;
    606   numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
    607 
    608   if (rf_dagDebug) {
    609     printf("[Creating small-write DAG]\n");
    610   }
    611   RF_ASSERT(numDataNodes > 0);
    612   dag_h->creator = "SmallWriteDAG";
    613 
    614   dag_h->numCommitNodes = 1;
    615   dag_h->numCommits = 0;
    616   dag_h->numSuccedents = 1;
    617 
    618   /*
    619    * DAG creation occurs in four steps:
    620    * 1. count the number of nodes in the DAG
    621    * 2. create the nodes
    622    * 3. initialize the nodes
    623    * 4. connect the nodes
    624    */
    625 
    626   /*
    627    * Step 1. compute number of nodes in the graph
    628    */
    629 
    630   /* number of nodes:
    631    *  a read and write for each data unit
    632    *  a redundancy computation node for each parity node (nfaults * nparity)
    633    *  a read and write for each parity unit
    634    *  a block and commit node (2)
    635    *  a terminate node
    636    *  if atomic RMW
    637    *    an unlock node for each data unit, redundancy unit
    638    */
    639   totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
    640     + (nfaults * 2 * numParityNodes) + 3;
    641   if (lu_flag) {
    642     totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
    643   }
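
  /*
   * Editor's worked example (hypothetical numbers): a RAID 5 small write
   * touching one data unit with parity in a single fragment gives
   * numDataNodes == 1, numParityNodes == 1, nfaults == 1, so
   * totalNumNodes = 2 + 1 + 2 + 3 = 8 (Rod, Wnd, Rop, Wnp, Xor, block,
   * Cmt, Trm); with lu_flag set, two unlock nodes (Und, Unp) raise it
   * to 10.
   */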
    644 
    645   /*
    646    * Step 2. create the nodes
    647    */
    648   RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
    649     (RF_DagNode_t *), allocList);
    650   i = 0;
    651   blockNode        = &nodes[i]; i += 1;
    652   commitNode       = &nodes[i]; i += 1;
    653   readDataNodes    = &nodes[i]; i += numDataNodes;
    654   readParityNodes  = &nodes[i]; i += numParityNodes;
    655   writeDataNodes   = &nodes[i]; i += numDataNodes;
    656   writeParityNodes = &nodes[i]; i += numParityNodes;
    657   xorNodes         = &nodes[i]; i += numParityNodes;
    658   termNode         = &nodes[i]; i += 1;
    659   if (lu_flag) {
    660     unlockDataNodes   = &nodes[i]; i += numDataNodes;
    661     unlockParityNodes = &nodes[i]; i += numParityNodes;
    662   }
    663   else {
    664     unlockDataNodes = unlockParityNodes = NULL;
    665   }
    666   if (nfaults == 2) {
    667     readQNodes     = &nodes[i]; i += numParityNodes;
    668     writeQNodes    = &nodes[i]; i += numParityNodes;
    669     qNodes         = &nodes[i]; i += numParityNodes;
    670     if (lu_flag) {
    671       unlockQNodes    = &nodes[i]; i += numParityNodes;
    672     }
    673     else {
    674       unlockQNodes = NULL;
    675     }
    676   }
    677   else {
    678     readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
    679   }
    680   RF_ASSERT(i == totalNumNodes);
    681 
    682   /*
    683    * Step 3. initialize the nodes
    684    */
    685   /* initialize block node (Nil) */
    686   nNodes     = numDataNodes + (nfaults * numParityNodes);
    687   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    688     NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
    689 
    690   /* initialize commit node (Cmt) */
    691   rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    692     NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
    693 
    694   /* initialize terminate node (Trm) */
    695   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
    696     NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
    697 
    698   /* initialize nodes which read old data (Rod) */
    699   for (i = 0; i < numDataNodes; i++) {
    700     rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
    701       rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
    702       "Rod", allocList);
    703     RF_ASSERT(pda != NULL);
    704     /* physical disk addr desc */
    705     readDataNodes[i].params[0].p = pda;
    706     /* buffer to hold old data */
    707     readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
    708       dag_h, pda, allocList);
    709     readDataNodes[i].params[2].v = parityStripeID;
    710     readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    711       lu_flag, 0, which_ru);
    712     pda = pda->next;
    713     for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
    714       readDataNodes[i].propList[j] = NULL;
    715     }
    716   }
    717 
    718   /* initialize nodes which read old parity (Rop) */
     719   pda = asmap->parityInfo;
    720   for (i = 0; i < numParityNodes; i++) {
    721     RF_ASSERT(pda != NULL);
    722     rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
    723       rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
    724       0, dag_h, "Rop", allocList);
    725     readParityNodes[i].params[0].p = pda;
    726     /* buffer to hold old parity */
    727     readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
    728       dag_h, pda, allocList);
    729     readParityNodes[i].params[2].v = parityStripeID;
    730     readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    731       lu_flag, 0, which_ru);
    732     pda = pda->next;
    733     for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
     734       readParityNodes[i].propList[j] = NULL;
    735     }
    736   }
    737 
    738   /* initialize nodes which read old Q (Roq) */
    739   if (nfaults == 2) {
    740     pda = asmap->qInfo;
    741     for (i = 0; i < numParityNodes; i++) {
    742       RF_ASSERT(pda != NULL);
    743       rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
    744         rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
    745       readQNodes[i].params[0].p = pda;
    746       /* buffer to hold old Q */
    747       readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
    748         allocList);
    749       readQNodes[i].params[2].v = parityStripeID;
    750       readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    751         lu_flag, 0, which_ru);
    752       pda = pda->next;
    753       for (j = 0; j < readQNodes[i].numSuccedents; j++) {
     754         readQNodes[i].propList[j] = NULL;
    755       }
    756     }
    757   }
    758 
    759   /* initialize nodes which write new data (Wnd) */
    760   pda = asmap->physInfo;
    761   for (i=0; i < numDataNodes; i++) {
    762     RF_ASSERT(pda != NULL);
    763     rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
    764       rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    765       "Wnd", allocList);
    766     /* physical disk addr desc */
    767     writeDataNodes[i].params[0].p = pda;
    768     /* buffer holding new data to be written */
    769     writeDataNodes[i].params[1].p = pda->bufPtr;
    770     writeDataNodes[i].params[2].v = parityStripeID;
    771     writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    772       0, 0, which_ru);
    773     if (lu_flag) {
    774       /* initialize node to unlock the disk queue */
    775       rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
    776         rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
    777         "Und", allocList);
    778       /* physical disk addr desc */
    779       unlockDataNodes[i].params[0].p = pda;
    780       unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    781         0, lu_flag, which_ru);
    782     }
    783     pda = pda->next;
    784   }
    785 
    786   /*
    787    * Initialize nodes which compute new parity and Q.
    788    */
    789   /*
    790    * We use the simple XOR func in the double-XOR case, and when
    791    * we're accessing only a portion of one stripe unit. The distinction
    792    * between the two is that the regular XOR func assumes that the targbuf
    793    * is a full SU in size, and examines the pda associated with the buffer
    794    * to decide where within the buffer to XOR the data, whereas
    795    * the simple XOR func just XORs the data into the start of the buffer.
    796    */
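  /*
   * Editor's sketch of the distinction above, with hypothetical byte-wise
   * helpers (kept in a comment since we are inside a function; extracted,
   * it is valid C given <stddef.h> for size_t):
   *
   *	static void
   *	sketch_simple_xor(char *targ, const char *src, size_t len)
   *	{
   *		size_t i;
   *
   *		for (i = 0; i < len; i++)
   *			targ[i] ^= src[i];	// XOR into the start of targ
   *	}
   *
   *	static void
   *	sketch_regular_xor(char *su_targ, size_t off, const char *src,
   *	    size_t len)
   *	{
   *		// off stands in for the offset the pda would supply
   *		// within a full stripe-unit-sized target buffer
   *		sketch_simple_xor(su_targ + off, src, len);
   *	}
   */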
    797   if ((numParityNodes==2) || ((numDataNodes == 1)
    798     && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit)))
    799   {
    800     func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
    801     if (qfuncs) {
    802       qfunc = qfuncs->simple;
    803       qname = qfuncs->SimpleName;
    804     }
    805     else {
    806       qfunc = NULL;
    807       qname = NULL;
    808     }
    809   }
    810   else {
    811     func = pfuncs->regular;
    812     undoFunc = rf_NullNodeUndoFunc;
    813     name = pfuncs->RegularName;
    814     if (qfuncs) {
    815       qfunc = qfuncs->regular;
    816       qname = qfuncs->RegularName;
    817     }
    818     else {
    819       qfunc = NULL;
    820       qname = NULL;
    821     }
    822   }
    823   /*
    824    * Initialize the xor nodes: params are {pda,buf}
    825    * from {Rod,Wnd,Rop} nodes, and raidPtr
    826    */
    827   if (numParityNodes==2) {
    828     /* double-xor case */
    829     for (i=0; i < numParityNodes; i++) {
    830       /* note: no wakeup func for xor */
    831       rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
    832         1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
    833       xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
    834       xorNodes[i].params[0]   = readDataNodes[i].params[0];
    835       xorNodes[i].params[1]   = readDataNodes[i].params[1];
    836       xorNodes[i].params[2]   = readParityNodes[i].params[0];
    837       xorNodes[i].params[3]   = readParityNodes[i].params[1];
    838       xorNodes[i].params[4]   = writeDataNodes[i].params[0];
    839       xorNodes[i].params[5]   = writeDataNodes[i].params[1];
    840       xorNodes[i].params[6].p = raidPtr;
    841       /* use old parity buf as target buf */
    842       xorNodes[i].results[0] = readParityNodes[i].params[1].p;
    843       if (nfaults == 2) {
    844         /* note: no wakeup func for qor */
    845         rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
    846           (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
    847         qNodes[i].params[0]   = readDataNodes[i].params[0];
    848         qNodes[i].params[1]   = readDataNodes[i].params[1];
    849         qNodes[i].params[2]   = readQNodes[i].params[0];
    850         qNodes[i].params[3]   = readQNodes[i].params[1];
    851         qNodes[i].params[4]   = writeDataNodes[i].params[0];
    852         qNodes[i].params[5]   = writeDataNodes[i].params[1];
    853         qNodes[i].params[6].p = raidPtr;
    854         /* use old Q buf as target buf */
    855         qNodes[i].results[0] = readQNodes[i].params[1].p;
    856       }
    857     }
    858   }
    859   else {
    860     /* there is only one xor node in this case */
    861     rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
    862       (numDataNodes + numParityNodes),
    863       (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
    864     xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
    865     for (i=0; i < numDataNodes + 1; i++) {
     866       /* set up params related to Rod and Rop nodes; at i == numDataNodes this reaches readParityNodes[0], which directly follows readDataNodes[] in the contiguous node array allocated in step 2 */
    867       xorNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
    868       xorNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer ptr */
    869     }
    870     for (i=0; i < numDataNodes; i++) {
    871       /* set up params related to Wnd and Wnp nodes */
    872       xorNodes[0].params[2*(numDataNodes+1+i)+0] = /* pda */
    873         writeDataNodes[i].params[0];
    874       xorNodes[0].params[2*(numDataNodes+1+i)+1] = /* buffer ptr */
    875        writeDataNodes[i].params[1];
    876     }
    877     /* xor node needs to get at RAID information */
    878     xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
    879     xorNodes[0].results[0] = readParityNodes[0].params[1].p;
    880     if (nfaults == 2)  {
    881       rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
    882         (numDataNodes + numParityNodes),
    883         (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
    884         qname, allocList);
    885       for (i=0; i<numDataNodes; i++) {
    886         /* set up params related to Rod */
    887         qNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
    888         qNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer ptr */
    889       }
    890       /* and read old q */
    891       qNodes[0].params[2*numDataNodes + 0] = /* pda */
    892         readQNodes[0].params[0];
    893       qNodes[0].params[2*numDataNodes + 1] = /* buffer ptr */
    894         readQNodes[0].params[1];
    895       for (i=0; i < numDataNodes; i++) {
    896         /* set up params related to Wnd nodes */
    897         qNodes[0].params[2*(numDataNodes+1+i)+0] = /* pda */
    898           writeDataNodes[i].params[0];
    899         qNodes[0].params[2*(numDataNodes+1+i)+1] = /* buffer ptr */
    900           writeDataNodes[i].params[1];
    901       }
    902       /* xor node needs to get at RAID information */
    903       qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
    904       qNodes[0].results[0] = readQNodes[0].params[1].p;
    905     }
    906   }
    907 
    908   /* initialize nodes which write new parity (Wnp) */
    909   pda = asmap->parityInfo;
    910   for (i=0;  i < numParityNodes; i++) {
    911     rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
    912       rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    913       "Wnp", allocList);
    914     RF_ASSERT(pda != NULL);
    915     writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
    916     writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for parity write operation */
    917     writeParityNodes[i].params[2].v = parityStripeID;
    918     writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    919       0, 0, which_ru);
    920     if (lu_flag) {
    921       /* initialize node to unlock the disk queue */
    922       rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
    923         rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
    924         "Unp", allocList);
    925       unlockParityNodes[i].params[0].p = pda; /* physical disk addr desc */
    926       unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    927         0, lu_flag, which_ru);
    928     }
    929     pda = pda->next;
    930   }
    931 
    932   /* initialize nodes which write new Q (Wnq) */
    933   if (nfaults == 2) {
    934     pda = asmap->qInfo;
    935     for (i=0;  i < numParityNodes; i++) {
    936       rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
    937         rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    938         "Wnq", allocList);
    939       RF_ASSERT(pda != NULL);
    940       writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
    941       writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for parity write operation */
    942       writeQNodes[i].params[2].v = parityStripeID;
    943       writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    944         0, 0, which_ru);
    945       if (lu_flag) {
    946         /* initialize node to unlock the disk queue */
    947         rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
    948           rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
    949           "Unq", allocList);
    950         unlockQNodes[i].params[0].p = pda; /* physical disk addr desc */
    951         unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    952           0, lu_flag, which_ru);
    953       }
    954       pda = pda->next;
    955     }
    956   }
    957 
    958   /*
    959    * Step 4. connect the nodes.
    960    */
    961 
    962   /* connect header to block node */
    963   dag_h->succedents[0] = blockNode;
    964 
    965   /* connect block node to read old data nodes */
    966   RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
    967   for (i = 0; i < numDataNodes; i++) {
    968     blockNode->succedents[i] = &readDataNodes[i];
    969     RF_ASSERT(readDataNodes[i].numAntecedents == 1);
     970     readDataNodes[i].antecedents[0] = blockNode;
    971     readDataNodes[i].antType[0] = rf_control;
    972   }
    973 
    974   /* connect block node to read old parity nodes */
    975   for (i = 0; i < numParityNodes; i++) {
    976     blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
    977     RF_ASSERT(readParityNodes[i].numAntecedents == 1);
    978     readParityNodes[i].antecedents[0] = blockNode;
    979     readParityNodes[i].antType[0] = rf_control;
    980   }
    981 
    982   /* connect block node to read old Q nodes */
    983   if (nfaults == 2) {
    984     for (i = 0; i < numParityNodes; i++) {
    985       blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
    986       RF_ASSERT(readQNodes[i].numAntecedents == 1);
    987       readQNodes[i].antecedents[0] = blockNode;
    988       readQNodes[i].antType[0] = rf_control;
    989     }
    990   }
    991 
    992   /* connect read old data nodes to xor nodes */
    993   for (i = 0; i < numDataNodes; i++) {
    994     RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
    995     for (j = 0; j < numParityNodes; j++){
    996       RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
    997       readDataNodes[i].succedents[j] = &xorNodes[j];
    998       xorNodes[j].antecedents[i] = &readDataNodes[i];
    999       xorNodes[j].antType[i] = rf_trueData;
   1000     }
   1001   }
   1002 
   1003   /* connect read old data nodes to q nodes */
   1004   if (nfaults == 2) {
   1005     for (i = 0; i < numDataNodes; i++) {
   1006       for (j = 0; j < numParityNodes; j++) {
   1007         RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
   1008         readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
   1009         qNodes[j].antecedents[i] = &readDataNodes[i];
   1010         qNodes[j].antType[i] = rf_trueData;
   1011       }
   1012     }
   1013   }
   1014 
   1015   /* connect read old parity nodes to xor nodes */
   1016   for (i = 0; i < numParityNodes; i++) {
   1017     RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
   1018     for (j = 0; j < numParityNodes; j++) {
   1019       readParityNodes[i].succedents[j] = &xorNodes[j];
   1020       xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
   1021       xorNodes[j].antType[numDataNodes + i] = rf_trueData;
   1022     }
   1023   }
   1024 
   1025   /* connect read old q nodes to q nodes */
   1026   if (nfaults == 2) {
   1027     for (i = 0; i < numParityNodes; i++) {
    1028       RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
   1029       for (j = 0; j < numParityNodes; j++) {
   1030         readQNodes[i].succedents[j] = &qNodes[j];
   1031         qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
   1032         qNodes[j].antType[numDataNodes + i] = rf_trueData;
   1033       }
   1034     }
   1035   }
   1036 
   1037   /* connect xor nodes to commit node */
   1038   RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
   1039   for (i = 0; i < numParityNodes; i++) {
   1040     RF_ASSERT(xorNodes[i].numSuccedents == 1);
   1041     xorNodes[i].succedents[0] = commitNode;
   1042     commitNode->antecedents[i] = &xorNodes[i];
   1043     commitNode->antType[i] = rf_control;
   1044   }
   1045 
   1046   /* connect q nodes to commit node */
   1047   if (nfaults == 2) {
   1048     for (i = 0; i < numParityNodes; i++) {
   1049       RF_ASSERT(qNodes[i].numSuccedents == 1);
   1050       qNodes[i].succedents[0] = commitNode;
   1051       commitNode->antecedents[i + numParityNodes] = &qNodes[i];
   1052       commitNode->antType[i + numParityNodes] = rf_control;
   1053     }
   1054   }
   1055 
   1056   /* connect commit node to write nodes */
   1057   RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
   1058   for (i = 0; i < numDataNodes; i++) {
   1059     RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
   1060     commitNode->succedents[i] = &writeDataNodes[i];
   1061     writeDataNodes[i].antecedents[0] = commitNode;
   1062     writeDataNodes[i].antType[0] = rf_trueData;
   1063   }
   1064   for (i = 0; i < numParityNodes; i++) {
   1065     RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
   1066     commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
   1067     writeParityNodes[i].antecedents[0] = commitNode;
   1068     writeParityNodes[i].antType[0] = rf_trueData;
   1069   }
   1070   if (nfaults == 2) {
   1071     for (i = 0; i < numParityNodes; i++) {
   1072       RF_ASSERT(writeQNodes[i].numAntecedents == 1);
   1073       commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
   1074       writeQNodes[i].antecedents[0] = commitNode;
   1075       writeQNodes[i].antType[0] = rf_trueData;
   1076     }
   1077   }
   1078 
   1079   RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   1080   RF_ASSERT(termNode->numSuccedents == 0);
   1081   for (i = 0; i < numDataNodes; i++) {
   1082     if (lu_flag) {
   1083       /* connect write new data nodes to unlock nodes */
   1084       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   1085       RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
   1086       writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
   1087       unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
   1088       unlockDataNodes[i].antType[0] = rf_control;
   1089 
   1090       /* connect unlock nodes to term node */
   1091       RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
   1092       unlockDataNodes[i].succedents[0] = termNode;
   1093       termNode->antecedents[i] = &unlockDataNodes[i];
   1094       termNode->antType[i] = rf_control;
   1095     }
   1096     else {
   1097       /* connect write new data nodes to term node */
   1098       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   1099       RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   1100       writeDataNodes[i].succedents[0] = termNode;
   1101       termNode->antecedents[i] = &writeDataNodes[i];
   1102       termNode->antType[i] = rf_control;
   1103     }
   1104   }
   1105 
   1106   for (i = 0; i < numParityNodes; i++) {
   1107     if (lu_flag) {
   1108       /* connect write new parity nodes to unlock nodes */
   1109       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   1110       RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
   1111       writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
   1112       unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
   1113       unlockParityNodes[i].antType[0] = rf_control;
   1114 
   1115       /* connect unlock nodes to term node */
   1116       RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
   1117       unlockParityNodes[i].succedents[0] = termNode;
   1118       termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
   1119       termNode->antType[numDataNodes + i] = rf_control;
   1120     }
   1121     else {
   1122       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   1123       writeParityNodes[i].succedents[0] = termNode;
   1124       termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
   1125       termNode->antType[numDataNodes + i] = rf_control;
   1126     }
   1127   }
   1128 
   1129   if (nfaults == 2) {
   1130     for (i = 0; i < numParityNodes; i++) {
   1131       if (lu_flag) {
   1132         /* connect write new Q nodes to unlock nodes */
   1133         RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   1134         RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
   1135         writeQNodes[i].succedents[0] = &unlockQNodes[i];
   1136         unlockQNodes[i].antecedents[0] = &writeQNodes[i];
   1137         unlockQNodes[i].antType[0] = rf_control;
   1138 
    1139         /* connect unlock nodes to term node */
   1140         RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
   1141         unlockQNodes[i].succedents[0] = termNode;
   1142         termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
   1143         termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   1144       }
   1145       else {
   1146         RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   1147         writeQNodes[i].succedents[0] = termNode;
   1148         termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
   1149         termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   1150       }
   1151     }
   1152   }
   1153 }
   1154 
   1155 
   1156 /******************************************************************************
   1157  * create a write graph (fault-free or degraded) for RAID level 1
   1158  *
   1159  * Hdr -> Commit -> Wpd -> Nil -> Trm
   1160  *               -> Wsd ->
   1161  *
   1162  * The "Wpd" node writes data to the primary copy in the mirror pair
   1163  * The "Wsd" node writes data to the secondary copy in the mirror pair
   1164  *
   1165  * Parameters:  raidPtr   - description of the physical array
   1166  *              asmap     - logical & physical addresses for this access
   1167  *              bp        - buffer ptr (holds write data)
   1168  *              flags     - general flags (e.g. disk locking)
   1169  *              allocList - list of memory allocated in DAG creation
   1170  *****************************************************************************/
   1171 
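/*
 * Editor's worked example (hypothetical numbers): a fault-free access
 * whose data and mirror maps each span two stripe units (both pda lists
 * have a ->next) gives nWndNodes == 2 and nWmirNodes == 2, so
 * 2 + 2 + 3 = 7 nodes are allocated below (two Wpd, two Wsd, plus Cmt,
 * Nil, and Trm).  A single failed data or mirror disk drops the matching
 * write node.
 */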
   1172 void rf_CreateRaidOneWriteDAG(
   1173   RF_Raid_t             *raidPtr,
   1174   RF_AccessStripeMap_t  *asmap,
   1175   RF_DagHeader_t        *dag_h,
   1176   void                  *bp,
   1177   RF_RaidAccessFlags_t   flags,
   1178   RF_AllocListElem_t    *allocList)
   1179 {
   1180   RF_DagNode_t *unblockNode, *termNode, *commitNode;
   1181   RF_DagNode_t *nodes, *wndNode, *wmirNode;
   1182   int nWndNodes, nWmirNodes, i;
   1183   RF_ReconUnitNum_t which_ru;
   1184   RF_PhysDiskAddr_t *pda, *pdaP;
   1185   RF_StripeNum_t parityStripeID;
   1186 
   1187   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
   1188     asmap->raidAddress, &which_ru);
   1189   if (rf_dagDebug) {
   1190     printf("[Creating RAID level 1 write DAG]\n");
   1191   }
   1192   dag_h->creator = "RaidOneWriteDAG";
   1193 
   1194   /* 2 implies access not SU aligned */
   1195   nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
   1196   nWndNodes =  (asmap->physInfo->next) ? 2 : 1;
   1197 
    1198   /* in degraded mode, skip the write to the failed copy */
   1199   if (asmap->numDataFailed == 1)
   1200     nWndNodes--;
   1201   if (asmap->numParityFailed == 1)
   1202     nWmirNodes--;
   1203 
   1204   /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock + terminator) */
   1205   RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
   1206     (RF_DagNode_t *), allocList);
   1207   i = 0;
   1208   wndNode     = &nodes[i]; i += nWndNodes;
   1209   wmirNode    = &nodes[i]; i += nWmirNodes;
    1210   commitNode  = &nodes[i]; i += 1;
    1211   unblockNode = &nodes[i]; i += 1;
    1212   termNode    = &nodes[i]; i += 1;
   1213   RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
   1214 
   1215   /* this dag can commit immediately */
   1216   dag_h->numCommitNodes = 1;
   1217   dag_h->numCommits = 0;
   1218   dag_h->numSuccedents = 1;
   1219 
   1220   /* initialize the commit, unblock, and term nodes */
   1221   rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
   1222     NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
   1223   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
   1224     NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
   1225   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
   1226     NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
   1227 
   1228   /* initialize the wnd nodes */
   1229   if (nWndNodes > 0) {
   1230     pda = asmap->physInfo;
   1231     for (i = 0; i < nWndNodes; i++) {
   1232       rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
   1233         rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
   1234       RF_ASSERT(pda != NULL);
   1235       wndNode[i].params[0].p = pda;
   1236       wndNode[i].params[1].p = pda->bufPtr;
   1237       wndNode[i].params[2].v = parityStripeID;
   1238       wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1239       pda = pda->next;
   1240     }
   1241     RF_ASSERT(pda == NULL);
   1242   }
   1243 
   1244   /* initialize the mirror nodes */
   1245   if (nWmirNodes > 0) {
   1246     pda = asmap->physInfo;
   1247     pdaP = asmap->parityInfo;
   1248     for (i = 0; i < nWmirNodes; i++) {
   1249       rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
   1250         rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
   1251       RF_ASSERT(pda != NULL);
   1252       wmirNode[i].params[0].p = pdaP;
   1253       wmirNode[i].params[1].p = pda->bufPtr;
   1254       wmirNode[i].params[2].v = parityStripeID;
   1255       wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1256       pda = pda->next;
   1257       pdaP = pdaP->next;
   1258     }
   1259     RF_ASSERT(pda == NULL);
   1260     RF_ASSERT(pdaP == NULL);
   1261   }
   1262 
   1263   /* link the header node to the commit node */
   1264   RF_ASSERT(dag_h->numSuccedents == 1);
   1265   RF_ASSERT(commitNode->numAntecedents == 0);
   1266   dag_h->succedents[0] = commitNode;
   1267 
   1268   /* link the commit node to the write nodes */
   1269   RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
   1270   for (i = 0; i < nWndNodes; i++) {
   1271     RF_ASSERT(wndNode[i].numAntecedents == 1);
   1272     commitNode->succedents[i] = &wndNode[i];
   1273     wndNode[i].antecedents[0] = commitNode;
   1274     wndNode[i].antType[0] = rf_control;
   1275   }
   1276   for (i = 0; i < nWmirNodes; i++) {
   1277     RF_ASSERT(wmirNode[i].numAntecedents == 1);
   1278     commitNode->succedents[i + nWndNodes] = &wmirNode[i];
   1279     wmirNode[i].antecedents[0] = commitNode;
   1280     wmirNode[i].antType[0] = rf_control;
   1281   }
   1282 
   1283   /* link the write nodes to the unblock node */
   1284   RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
   1285   for (i = 0; i < nWndNodes; i++) {
   1286     RF_ASSERT(wndNode[i].numSuccedents == 1);
   1287     wndNode[i].succedents[0] = unblockNode;
   1288     unblockNode->antecedents[i] = &wndNode[i];
   1289     unblockNode->antType[i] = rf_control;
   1290   }
   1291   for (i = 0; i < nWmirNodes; i++) {
   1292     RF_ASSERT(wmirNode[i].numSuccedents == 1);
   1293     wmirNode[i].succedents[0] = unblockNode;
   1294     unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
   1295     unblockNode->antType[i + nWndNodes] = rf_control;
   1296   }
   1297 
   1298   /* link the unblock node to the term node */
   1299   RF_ASSERT(unblockNode->numSuccedents == 1);
   1300   RF_ASSERT(termNode->numAntecedents == 1);
   1301   RF_ASSERT(termNode->numSuccedents == 0);
   1302   unblockNode->succedents[0] = termNode;
   1303   termNode->antecedents[0] = unblockNode;
   1304   termNode->antType[0] = rf_control;
   1305 }
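         /*
          * The linking stanzas above all follow one pattern: each edge is
          * recorded twice, in the antecedent's succedents[] slot and in the
          * successor's antecedents[]/antType[] slots, and the slot counts
          * must match those passed to rf_InitNode().  A minimal sketch of
          * that pattern as a helper (hypothetical; rf_LinkControl is not a
          * RAIDframe routine):
          */
         #if 0
         static void
         rf_LinkControl(RF_DagNode_t *ant, int si, RF_DagNode_t *succ, int ai)
         {
           ant->succedents[si] = succ;        /* forward edge */
           succ->antecedents[ai] = ant;       /* backward edge */
           succ->antType[ai] = rf_control;    /* control dependence only */
         }

         /* e.g., the commit -> Wnd links above would reduce to: */
         for (i = 0; i < nWndNodes; i++)
           rf_LinkControl(commitNode, i, &wndNode[i], 0);
         #endif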
   1306 
   1307 
   1308 
   1309 /* DAGs which have no commit points.
   1310  *
   1311  * The following DAGs are used in forward and backward error recovery experiments.
    1312  * They are identical to the DAGs above this comment, except that the
    1313  * commit points have been removed.
   1314  */
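         /*
          * Background (general RAIDframe commit semantics): work before the
          * commit point may be rolled back via the undo functions, while work
          * after it must roll forward to completion.  With numCommitNodes ==
          * 0, the graphs below leave that policy to the recovery experiment
          * instead of encoding it in the graph.
          */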
   1315 
   1316 
   1317 
   1318 void rf_CommonCreateLargeWriteDAGFwd(
   1319   RF_Raid_t             *raidPtr,
   1320   RF_AccessStripeMap_t  *asmap,
   1321   RF_DagHeader_t        *dag_h,
   1322   void                  *bp,
   1323   RF_RaidAccessFlags_t   flags,
   1324   RF_AllocListElem_t    *allocList,
   1325   int                    nfaults,
   1326   int                  (*redFunc)(RF_DagNode_t *),
   1327   int                    allowBufferRecycle)
   1328 {
   1329   RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
   1330   RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
   1331   int nWndNodes, nRodNodes, i, nodeNum, asmNum;
   1332   RF_AccessStripeMapHeader_t *new_asm_h[2];
   1333   RF_StripeNum_t parityStripeID;
   1334   char *sosBuffer, *eosBuffer;
   1335   RF_ReconUnitNum_t which_ru;
   1336   RF_RaidLayout_t *layoutPtr;
   1337   RF_PhysDiskAddr_t *pda;
   1338 
   1339   layoutPtr = &(raidPtr->Layout);
   1340   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
   1341 
   1342   if (rf_dagDebug)
   1343     printf("[Creating large-write DAG]\n");
   1344   dag_h->creator = "LargeWriteDAGFwd";
   1345 
   1346   dag_h->numCommitNodes = 0;
   1347   dag_h->numCommits = 0;
   1348   dag_h->numSuccedents = 1;
   1349 
    1350   /* alloc the nodes: Wnd, Xor, Wnp, block, sync, and term (plus Wnq if nfaults == 2) */
   1351   nWndNodes = asmap->numStripeUnitsAccessed;
   1352   RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   1353   i = 0;
   1354   wndNodes    = &nodes[i]; i += nWndNodes;
   1355   xorNode     = &nodes[i]; i += 1;
   1356   wnpNode     = &nodes[i]; i += 1;
   1357   blockNode   = &nodes[i]; i += 1;
    1358   syncNode    = &nodes[i]; i += 1;
   1359   termNode    = &nodes[i]; i += 1;
   1360   if (nfaults == 2) {
   1361     wnqNode   = &nodes[i]; i += 1;
   1362   }
   1363   else {
   1364     wnqNode = NULL;
   1365   }
   1366   rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
   1367   if (nRodNodes > 0) {
   1368     RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   1369   }
   1370   else {
   1371     rodNodes = NULL;
   1372   }
   1373 
   1374   /* begin node initialization */
   1375   if (nRodNodes > 0) {
   1376     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
   1377     rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
   1378   }
   1379   else {
   1380     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
   1381     rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
   1382   }
   1383 
   1384   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
   1385 
   1386   /* initialize the Rod nodes */
   1387   for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
   1388     if (new_asm_h[asmNum]) {
   1389       pda = new_asm_h[asmNum]->stripeMap->physInfo;
   1390       while (pda) {
   1391 	rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
   1392 	rodNodes[nodeNum].params[0].p = pda;
   1393 	rodNodes[nodeNum].params[1].p = pda->bufPtr;
   1394 	rodNodes[nodeNum].params[2].v = parityStripeID;
   1395 	rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1396 	nodeNum++;
   1397 	pda=pda->next;
   1398       }
   1399     }
   1400   }
   1401   RF_ASSERT(nodeNum == nRodNodes);
   1402 
   1403   /* initialize the wnd nodes */
   1404   pda = asmap->physInfo;
   1405   for (i=0; i < nWndNodes; i++) {
   1406     rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
   1407     RF_ASSERT(pda != NULL);
   1408     wndNodes[i].params[0].p = pda;
   1409     wndNodes[i].params[1].p = pda->bufPtr;
   1410     wndNodes[i].params[2].v = parityStripeID;
   1411     wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1412     pda = pda->next;
   1413   }
   1414 
   1415   /* initialize the redundancy node */
    1416   rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, nfaults, 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
   1417   xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
   1418   for (i=0; i < nWndNodes; i++) {
   1419     xorNode->params[2*i+0] = wndNodes[i].params[0];         /* pda */
   1420     xorNode->params[2*i+1] = wndNodes[i].params[1];         /* buf ptr */
   1421   }
   1422   for (i=0; i < nRodNodes; i++) {
   1423     xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];         /* pda */
   1424     xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];         /* buf ptr */
   1425   }
   1426   xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */
   1427 
   1428   /* look for an Rod node that reads a complete SU.  If none, alloc a buffer to receive the parity info.
   1429    * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.
   1430    */
   1431   if (allowBufferRecycle) {
   1432     for (i = 0; i < nRodNodes; i++)
   1433       if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
   1434         break;
   1435   }
   1436   if ((!allowBufferRecycle) || (i == nRodNodes)) {
   1437     RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
   1438   }
   1439   else
   1440     xorNode->results[0] = rodNodes[i].params[1].p;
   1441 
   1442   /* initialize the Wnp node */
   1443   rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
   1444   wnpNode->params[0].p = asmap->parityInfo;
   1445   wnpNode->params[1].p = xorNode->results[0];
   1446   wnpNode->params[2].v = parityStripeID;
   1447   wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1448   RF_ASSERT(asmap->parityInfo->next == NULL);        /* parityInfo must describe entire parity unit */
   1449 
   1450   if (nfaults == 2)
   1451     {
    1452       /* we never try to recycle a buffer for the Q calculation in addition to the parity.
   1453 	 This would cause two buffers to get smashed during the P and Q calculation,
   1454 	 guaranteeing one would be wrong.
   1455       */
   1456       RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
   1457       rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
   1458       wnqNode->params[0].p = asmap->qInfo;
   1459       wnqNode->params[1].p = xorNode->results[1];
   1460       wnqNode->params[2].v = parityStripeID;
   1461       wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    1462       RF_ASSERT(asmap->qInfo->next == NULL);        /* qInfo must describe entire Q unit */
   1463     }
   1464 
   1465 
   1466   /* connect nodes to form graph */
   1467 
   1468   /* connect dag header to block node */
   1469   RF_ASSERT(blockNode->numAntecedents == 0);
   1470   dag_h->succedents[0] = blockNode;
   1471 
   1472   if (nRodNodes > 0) {
   1473     /* connect the block node to the Rod nodes */
   1474     RF_ASSERT(blockNode->numSuccedents == nRodNodes);
   1475     RF_ASSERT(syncNode->numAntecedents == nRodNodes);
   1476     for (i = 0; i < nRodNodes; i++) {
   1477       RF_ASSERT(rodNodes[i].numAntecedents == 1);
   1478       blockNode->succedents[i] = &rodNodes[i];
   1479       rodNodes[i].antecedents[0] = blockNode;
   1480       rodNodes[i].antType[0] = rf_control;
   1481 
   1482       /* connect the Rod nodes to the Nil node */
   1483       RF_ASSERT(rodNodes[i].numSuccedents == 1);
   1484       rodNodes[i].succedents[0] = syncNode;
   1485       syncNode->antecedents[i] = &rodNodes[i];
   1486       syncNode->antType[i] = rf_trueData;
   1487     }
   1488   }
   1489   else {
   1490     /* connect the block node to the Nil node */
   1491     RF_ASSERT(blockNode->numSuccedents == 1);
   1492     RF_ASSERT(syncNode->numAntecedents == 1);
   1493     blockNode->succedents[0] = syncNode;
   1494     syncNode->antecedents[0] = blockNode;
   1495     syncNode->antType[0] = rf_control;
   1496   }
   1497 
   1498   /* connect the sync node to the Wnd nodes */
   1499   RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
   1500   for (i = 0; i < nWndNodes; i++) {
    1501     RF_ASSERT(wndNodes[i].numAntecedents == 1);
   1502     syncNode->succedents[i] = &wndNodes[i];
   1503     wndNodes[i].antecedents[0] = syncNode;
   1504     wndNodes[i].antType[0] = rf_control;
   1505   }
   1506 
   1507   /* connect the sync node to the Xor node */
   1508   RF_ASSERT(xorNode->numAntecedents == 1);
   1509   syncNode->succedents[nWndNodes] = xorNode;
   1510   xorNode->antecedents[0] = syncNode;
   1511   xorNode->antType[0] = rf_control;
   1512 
   1513   /* connect the xor node to the write parity node */
   1514   RF_ASSERT(xorNode->numSuccedents == nfaults);
   1515   RF_ASSERT(wnpNode->numAntecedents == 1);
   1516   xorNode->succedents[0] = wnpNode;
   1517   wnpNode->antecedents[0]= xorNode;
   1518   wnpNode->antType[0] = rf_trueData;
   1519   if (nfaults == 2) {
   1520     RF_ASSERT(wnqNode->numAntecedents == 1);
   1521     xorNode->succedents[1] = wnqNode;
   1522     wnqNode->antecedents[0] = xorNode;
   1523     wnqNode->antType[0] = rf_trueData;
   1524   }
   1525 
   1526   /* connect the write nodes to the term node */
   1527   RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
   1528   RF_ASSERT(termNode->numSuccedents == 0);
   1529   for (i = 0; i < nWndNodes; i++) {
    1530     RF_ASSERT(wndNodes[i].numSuccedents == 1);
   1531     wndNodes[i].succedents[0] = termNode;
   1532     termNode->antecedents[i] = &wndNodes[i];
   1533     termNode->antType[i] = rf_control;
   1534   }
   1535   RF_ASSERT(wnpNode->numSuccedents == 1);
   1536   wnpNode->succedents[0] = termNode;
   1537   termNode->antecedents[nWndNodes] = wnpNode;
   1538   termNode->antType[nWndNodes] = rf_control;
   1539   if (nfaults == 2) {
   1540     RF_ASSERT(wnqNode->numSuccedents == 1);
   1541     wnqNode->succedents[0] = termNode;
   1542     termNode->antecedents[nWndNodes + 1] = wnqNode;
   1543     termNode->antType[nWndNodes + 1] = rf_control;
   1544   }
   1545 }
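         /*
          * The redundancy step above reduces to a byte-wise XOR of every data
          * stripe unit (the Wnd buffers, plus the Rod buffers covering the
          * unaccessed portion of the stripe) into the parity buffer.  A
          * minimal sketch of that arithmetic (hypothetical helper, not the
          * actual redFunc):
          */
         #if 0
         static void
         rf_XorIntoBuf(char *parity, char *data, int nbytes)
         {
           int b;

           for (b = 0; b < nbytes; b++)
             parity[b] ^= data[b];   /* parity accumulates the XOR of all SUs */
         }
         #endif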
   1546 
   1547 
   1548 /******************************************************************************
   1549  *
   1550  * creates a DAG to perform a small-write operation (either raid 5 or pq),
   1551  * which is as follows:
   1552  *
   1553  * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
   1554  *            \- Rod X- Wnd [Und] -------/
   1555  *           [\- Rod X- Wnd [Und] ------/]
   1556  *           [\- Roq - Q --> Wnq [Unq]-/]
   1557  *
   1558  * Rop = read old parity
   1559  * Rod = read old data
   1560  * Roq = read old "q"
   1562  * Und = unlock data disk
   1563  * Unp = unlock parity disk
   1564  * Unq = unlock q disk
   1565  * Wnp = write new parity
   1566  * Wnd = write new data
   1567  * Wnq = write new "q"
   1568  * [ ] denotes optional segments in the graph
   1569  *
   1570  * Parameters:  raidPtr   - description of the physical array
   1571  *              asmap     - logical & physical addresses for this access
   1572  *              bp        - buffer ptr (holds write data)
   1573  *              flags     - general flags (e.g. disk locking)
   1574  *              allocList - list of memory allocated in DAG creation
   1575  *              pfuncs    - list of parity generating functions
   1576  *              qfuncs    - list of q generating functions
   1577  *
    1578  * A null qfuncs indicates a single-fault-tolerant array
   1579  *****************************************************************************/
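         /*
          * The small write computes redundancy by read-modify-write:
          * new parity = old parity XOR old data XOR new data.  E.g., for a
          * single byte with old data 0x0F, new data 0x3C, old parity 0xA5:
          * 0xA5 ^ 0x0F ^ 0x3C == 0x96, the same value a full-stripe
          * recomputation of parity would produce.
          */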
   1580 
   1581 void rf_CommonCreateSmallWriteDAGFwd(
   1582   RF_Raid_t             *raidPtr,
   1583   RF_AccessStripeMap_t  *asmap,
   1584   RF_DagHeader_t        *dag_h,
   1585   void                  *bp,
   1586   RF_RaidAccessFlags_t   flags,
   1587   RF_AllocListElem_t    *allocList,
   1588   RF_RedFuncs_t         *pfuncs,
   1589   RF_RedFuncs_t         *qfuncs)
   1590 {
   1591   RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
   1592   RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
   1593   RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
   1594   RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
   1595   int i, j, nNodes, totalNumNodes, lu_flag;
   1596   RF_ReconUnitNum_t which_ru;
   1597   int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
   1598   int (*qfunc)(RF_DagNode_t *);
   1599   int numDataNodes, numParityNodes;
   1600   RF_StripeNum_t parityStripeID;
   1601   RF_PhysDiskAddr_t *pda;
   1602   char *name, *qname;
   1603   long nfaults;
   1604 
   1605   nfaults = qfuncs ? 2 : 1;
   1606   lu_flag = (rf_enableAtomicRMW) ? 1 : 0;          /* lock/unlock flag */
   1607 
   1608   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
   1609   pda = asmap->physInfo;
   1610   numDataNodes = asmap->numStripeUnitsAccessed;
   1611   numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
   1612 
   1613   if (rf_dagDebug) printf("[Creating small-write DAG]\n");
   1614   RF_ASSERT(numDataNodes > 0);
   1615   dag_h->creator = "SmallWriteDAGFwd";
   1616 
   1617   dag_h->numCommitNodes = 0;
   1618   dag_h->numCommits = 0;
   1619   dag_h->numSuccedents = 1;
   1620 
   1621   qfunc = NULL;
   1622   qname = NULL;
   1623 
   1624   /* DAG creation occurs in four steps:
   1625      1. count the number of nodes in the DAG
   1626      2. create the nodes
   1627      3. initialize the nodes
   1628      4. connect the nodes
   1629    */
   1630 
   1631   /* Step 1. compute number of nodes in the graph */
   1632 
   1633   /* number of nodes:
   1634       a read and write for each data unit
   1635       a redundancy computation node for each parity node (nfaults * nparity)
   1636       a read and write for each parity unit
   1637       a block node
   1638       a terminate node
   1639       if atomic RMW
   1640         an unlock node for each data unit, redundancy unit
   1641   */
   1642   totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
   1643   if (lu_flag)
   1644     totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
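           /* e.g., a two-SU RAID-5 small write with no locking
            * (numDataNodes == 2, numParityNodes == 1, nfaults == 1,
            * lu_flag == 0) gives (2*2) + (1*1) + (1*2*1) + 2 == 9 nodes:
            * block + 2 Rod + 1 Rop + 1 Xor + 2 Wnd + 1 Wnp + term */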
   1645 
   1646 
   1647   /* Step 2. create the nodes */
   1648   RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   1649   i = 0;
   1650   blockNode        = &nodes[i]; i += 1;
   1651   readDataNodes    = &nodes[i]; i += numDataNodes;
   1652   readParityNodes  = &nodes[i]; i += numParityNodes;
   1653   writeDataNodes   = &nodes[i]; i += numDataNodes;
   1654   writeParityNodes = &nodes[i]; i += numParityNodes;
   1655   xorNodes         = &nodes[i]; i += numParityNodes;
   1656   termNode         = &nodes[i]; i += 1;
   1657   if (lu_flag) {
   1658     unlockDataNodes   = &nodes[i]; i += numDataNodes;
   1659     unlockParityNodes = &nodes[i]; i += numParityNodes;
   1660   }
   1661   else {
   1662     unlockDataNodes = unlockParityNodes = NULL;
   1663   }
   1664   if (nfaults == 2) {
   1665     readQNodes     = &nodes[i]; i += numParityNodes;
   1666     writeQNodes    = &nodes[i]; i += numParityNodes;
   1667     qNodes         = &nodes[i]; i += numParityNodes;
   1668     if (lu_flag) {
   1669       unlockQNodes    = &nodes[i]; i += numParityNodes;
   1670     }
   1671     else {
   1672       unlockQNodes = NULL;
   1673     }
   1674   }
   1675   else {
   1676     readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
   1677   }
   1678   RF_ASSERT(i == totalNumNodes);
   1679 
   1680   /* Step 3. initialize the nodes */
   1681   /* initialize block node (Nil) */
   1682   nNodes     = numDataNodes + (nfaults * numParityNodes);
   1683   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
   1684 
   1685   /* initialize terminate node (Trm) */
   1686   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
   1687 
   1688   /* initialize nodes which read old data (Rod) */
   1689   for (i = 0; i < numDataNodes; i++) {
   1690     rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
   1691     RF_ASSERT(pda != NULL);
   1692     readDataNodes[i].params[0].p = pda;  /* physical disk addr desc */
   1693     readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);  /* buffer to hold old data */
   1694     readDataNodes[i].params[2].v = parityStripeID;
   1695     readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
   1696     pda=pda->next;
   1697     for (j = 0; j < readDataNodes[i].numSuccedents; j++)
   1698       readDataNodes[i].propList[j] = NULL;
   1699   }
   1700 
   1701   /* initialize nodes which read old parity (Rop) */
    1702   pda = asmap->parityInfo;
   1703   for (i = 0; i < numParityNodes; i++) {
   1704     RF_ASSERT(pda != NULL);
   1705     rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
   1706     readParityNodes[i].params[0].p = pda;
   1707     readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);    /* buffer to hold old parity */
   1708     readParityNodes[i].params[2].v = parityStripeID;
   1709     readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
   1710     for (j = 0; j < readParityNodes[i].numSuccedents; j++)
    1711       readParityNodes[i].propList[j] = NULL;
   1712     pda=pda->next;
   1713   }
   1714 
   1715   /* initialize nodes which read old Q (Roq) */
   1716   if (nfaults == 2)
   1717     {
   1718       pda = asmap->qInfo;
   1719       for (i = 0; i < numParityNodes; i++) {
   1720 	RF_ASSERT(pda != NULL);
   1721 	rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
   1722 	readQNodes[i].params[0].p = pda;
   1723 	readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
   1724 	readQNodes[i].params[2].v = parityStripeID;
   1725 	readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
   1726 	for (j = 0; j < readQNodes[i].numSuccedents; j++)
    1727 	  readQNodes[i].propList[j] = NULL;
   1728 	pda=pda->next;
   1729       }
   1730     }
   1731 
   1732   /* initialize nodes which write new data (Wnd) */
   1733   pda = asmap->physInfo;
   1734   for (i=0; i < numDataNodes; i++) {
   1735     RF_ASSERT(pda != NULL);
   1736     rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
   1737     writeDataNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1738     writeDataNodes[i].params[1].p = pda->bufPtr;   /* buffer holding new data to be written */
   1739     writeDataNodes[i].params[2].v = parityStripeID;
   1740     writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1741 
   1742     if (lu_flag) {
   1743       /* initialize node to unlock the disk queue */
   1744       rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
   1745       unlockDataNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1746       unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
   1747     }
   1748 
   1749     pda = pda->next;
   1750   }
   1751 
   1752 
   1753   /* initialize nodes which compute new parity and Q */
   1754   /* we use the simple XOR func in the double-XOR case, and when we're accessing only a portion of one stripe unit.
   1755    * the distinction between the two is that the regular XOR func assumes that the targbuf is a full SU in size,
   1756    * and examines the pda associated with the buffer to decide where within the buffer to XOR the data, whereas
   1757    * the simple XOR func just XORs the data into the start of the buffer.
   1758    */
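           /* minimal sketch of the distinction (illustrative; the real funcs
            * live elsewhere in RAIDframe): the simple func does
            *     for (b = 0; b < nbytes; b++) targbuf[b] ^= srcbuf[b];
            * while the regular func treats targbuf as a full SU, derives a
            * byte offset from the source pda's position within that SU, and
            * XORs into targbuf + offset
            */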
   1759   if ((numParityNodes==2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
   1760     func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
   1761     if (qfuncs) {
   1762       qfunc = qfuncs->simple;
   1763       qname = qfuncs->SimpleName;
   1764     }
   1765   }
   1766   else {
   1767     func = pfuncs->regular; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->RegularName;
   1768     if (qfuncs) { qfunc = qfuncs->regular; qname = qfuncs->RegularName;}
   1769   }
   1770   /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr  */
   1771   if (numParityNodes==2) {        /* double-xor case */
   1772     for (i=0; i < numParityNodes; i++) {
   1773       rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList);  /* no wakeup func for xor */
   1774       xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
   1775       xorNodes[i].params[0]   = readDataNodes[i].params[0];
   1776       xorNodes[i].params[1]   = readDataNodes[i].params[1];
   1777       xorNodes[i].params[2]   = readParityNodes[i].params[0];
   1778       xorNodes[i].params[3]   = readParityNodes[i].params[1];
   1779       xorNodes[i].params[4]   = writeDataNodes[i].params[0];
   1780       xorNodes[i].params[5]   = writeDataNodes[i].params[1];
   1781       xorNodes[i].params[6].p = raidPtr;
   1782       xorNodes[i].results[0] = readParityNodes[i].params[1].p;   /* use old parity buf as target buf */
   1783       if (nfaults==2)
   1784 	{
   1785 	  rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList);  /* no wakeup func for xor */
   1786 	  qNodes[i].params[0]   = readDataNodes[i].params[0];
   1787 	  qNodes[i].params[1]   = readDataNodes[i].params[1];
   1788 	  qNodes[i].params[2]   = readQNodes[i].params[0];
   1789 	  qNodes[i].params[3]   = readQNodes[i].params[1];
   1790 	  qNodes[i].params[4]   = writeDataNodes[i].params[0];
   1791 	  qNodes[i].params[5]   = writeDataNodes[i].params[1];
   1792 	  qNodes[i].params[6].p = raidPtr;
   1793 	  qNodes[i].results[0] = readQNodes[i].params[1].p;   /* use old Q buf as target buf */
   1794 	}
   1795     }
   1796   }
   1797   else {
   1798     /* there is only one xor node in this case */
   1799     rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
   1800     xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
   1801     for (i=0; i < numDataNodes + 1; i++) {
    1802       /* set up params for the Rod nodes and the single Rop node; readParityNodes[0] directly follows the Rod nodes in the nodes array, so i == numDataNodes reaches it */
   1803       xorNodes[0].params[2*i+0] = readDataNodes[i].params[0];    /* pda */
   1804       xorNodes[0].params[2*i+1] = readDataNodes[i].params[1];    /* buffer pointer */
   1805     }
   1806     for (i=0; i < numDataNodes; i++) {
   1807       /* set up params related to Wnd and Wnp nodes */
   1808       xorNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
   1809       xorNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
   1810     }
   1811     xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;  /* xor node needs to get at RAID information */
   1812     xorNodes[0].results[0] = readParityNodes[0].params[1].p;
   1813     if (nfaults==2)
   1814       {
   1815 	rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
   1816 	for (i=0; i<numDataNodes; i++) {
   1817 	  /* set up params related to Rod */
   1818 	  qNodes[0].params[2*i+0] = readDataNodes[i].params[0];    /* pda */
   1819 	  qNodes[0].params[2*i+1] = readDataNodes[i].params[1];    /* buffer pointer */
   1820 	}
   1821 	/* and read old q */
   1822 	qNodes[0].params[2*numDataNodes + 0] = readQNodes[0].params[0];    /* pda */
   1823 	qNodes[0].params[2*numDataNodes + 1] = readQNodes[0].params[1];    /* buffer pointer */
   1824 	for (i=0; i < numDataNodes; i++) {
   1825 	  /* set up params related to Wnd nodes */
   1826 	  qNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
   1827 	  qNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
   1828 	}
   1829 	qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;  /* xor node needs to get at RAID information */
   1830 	qNodes[0].results[0] = readQNodes[0].params[1].p;
   1831       }
   1832   }
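           /* resulting parameter layout for the single-xor case with
            * numDataNodes == 2: params[0..1] = Rod0 {pda,buf},
            * params[2..3] = Rod1, params[4..5] = Rop0, params[6..7] = Wnd0,
            * params[8..9] = Wnd1, params[10] = raidPtr -- 2*(2+2+1)+1 == 11
            * parameters, matching the rf_InitNode counts above */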
   1833 
   1834   /* initialize nodes which write new parity (Wnp) */
   1835   pda = asmap->parityInfo;
   1836   for (i=0;  i < numParityNodes; i++) {
   1837     rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
   1838     RF_ASSERT(pda != NULL);
    1839     writeParityNodes[i].params[0].p = pda;                  /* physical disk addr desc */
   1840     writeParityNodes[i].params[1].p = xorNodes[i].results[0];     /* buffer pointer for parity write operation */
   1841     writeParityNodes[i].params[2].v = parityStripeID;
   1842     writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1843 
   1844     if (lu_flag) {
   1845       /* initialize node to unlock the disk queue */
   1846       rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
   1847       unlockParityNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1848       unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
   1849     }
   1850 
   1851     pda = pda->next;
   1852   }
   1853 
   1854   /* initialize nodes which write new Q (Wnq) */
   1855   if (nfaults == 2)
   1856     {
   1857       pda = asmap->qInfo;
   1858       for (i=0;  i < numParityNodes; i++) {
   1859 	rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
   1860 	RF_ASSERT(pda != NULL);
    1861 	writeQNodes[i].params[0].p = pda;                  /* physical disk addr desc */
    1862 	writeQNodes[i].params[1].p = qNodes[i].results[0];     /* buffer pointer for Q write operation */
   1863 	writeQNodes[i].params[2].v = parityStripeID;
   1864 	writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1865 
   1866 	if (lu_flag) {
   1867 	  /* initialize node to unlock the disk queue */
   1868 	  rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
   1869 	  unlockQNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1870 	  unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
   1871 	}
   1872 
   1873 	pda = pda->next;
   1874       }
   1875     }
   1876 
   1877   /* Step 4. connect the nodes */
   1878 
   1879   /* connect header to block node */
   1880   dag_h->succedents[0] = blockNode;
   1881 
   1882   /* connect block node to read old data nodes */
   1883   RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
   1884   for (i = 0; i < numDataNodes; i++) {
   1885     blockNode->succedents[i] = &readDataNodes[i];
   1886     RF_ASSERT(readDataNodes[i].numAntecedents == 1);
   1887     readDataNodes[i].antecedents[0]= blockNode;
   1888     readDataNodes[i].antType[0] = rf_control;
   1889   }
   1890 
   1891   /* connect block node to read old parity nodes */
   1892   for (i = 0; i < numParityNodes; i++) {
   1893     blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
   1894     RF_ASSERT(readParityNodes[i].numAntecedents == 1);
   1895     readParityNodes[i].antecedents[0] = blockNode;
   1896     readParityNodes[i].antType[0] = rf_control;
   1897   }
   1898 
   1899   /* connect block node to read old Q nodes */
   1900   if (nfaults == 2)
   1901     for (i = 0; i < numParityNodes; i++) {
   1902       blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
   1903       RF_ASSERT(readQNodes[i].numAntecedents == 1);
   1904       readQNodes[i].antecedents[0] = blockNode;
   1905       readQNodes[i].antType[0] = rf_control;
   1906     }
   1907 
   1908   /* connect read old data nodes to write new data nodes */
   1909   for (i = 0; i < numDataNodes; i++) {
   1910     RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
   1911     RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
   1912     readDataNodes[i].succedents[0] = &writeDataNodes[i];
   1913     writeDataNodes[i].antecedents[0] = &readDataNodes[i];
   1914     writeDataNodes[i].antType[0] = rf_antiData;
   1915   }
   1916 
   1917   /* connect read old data nodes to xor nodes */
   1918   for (i = 0; i < numDataNodes; i++) {
   1919     for (j = 0; j < numParityNodes; j++){
   1920       RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
   1921       readDataNodes[i].succedents[1 + j] = &xorNodes[j];
   1922       xorNodes[j].antecedents[i] = &readDataNodes[i];
   1923       xorNodes[j].antType[i] = rf_trueData;
   1924     }
   1925   }
   1926 
   1927   /* connect read old data nodes to q nodes */
   1928   if (nfaults == 2)
   1929     for (i = 0; i < numDataNodes; i++)
   1930       for (j = 0; j < numParityNodes; j++){
   1931 	RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
   1932 	readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
   1933 	qNodes[j].antecedents[i] = &readDataNodes[i];
   1934 	qNodes[j].antType[i] = rf_trueData;
   1935       }
   1936 
   1937   /* connect read old parity nodes to xor nodes */
   1938   for (i = 0; i < numParityNodes; i++) {
   1939     for (j = 0; j < numParityNodes; j++) {
   1940       RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
   1941       readParityNodes[i].succedents[j] = &xorNodes[j];
   1942       xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
   1943       xorNodes[j].antType[numDataNodes + i] = rf_trueData;
   1944     }
   1945   }
   1946 
   1947   /* connect read old q nodes to q nodes */
   1948   if (nfaults == 2)
   1949     for (i = 0; i < numParityNodes; i++) {
   1950       for (j = 0; j < numParityNodes; j++) {
   1951 	RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
   1952 	readQNodes[i].succedents[j] = &qNodes[j];
   1953 	qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
   1954 	qNodes[j].antType[numDataNodes + i] = rf_trueData;
   1955       }
   1956     }
   1957 
   1958   /* connect xor nodes to the write new parity nodes */
   1959   for (i = 0; i < numParityNodes; i++) {
   1960     RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
   1961     for (j = 0; j < numParityNodes; j++) {
   1962       RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
   1963       xorNodes[i].succedents[j] = &writeParityNodes[j];
   1964       writeParityNodes[j].antecedents[i] = &xorNodes[i];
   1965       writeParityNodes[j].antType[i] = rf_trueData;
   1966     }
   1967   }
   1968 
   1969   /* connect q nodes to the write new q nodes */
   1970   if (nfaults == 2)
   1971     for (i = 0; i < numParityNodes; i++) {
   1972       RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
   1973       for (j = 0; j < numParityNodes; j++) {
    1974 	RF_ASSERT(qNodes[j].numSuccedents == numParityNodes);
   1975 	qNodes[i].succedents[j] = &writeQNodes[j];
   1976 	writeQNodes[j].antecedents[i] = &qNodes[i];
   1977 	writeQNodes[j].antType[i] = rf_trueData;
   1978       }
   1979     }
   1980 
   1981   RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   1982   RF_ASSERT(termNode->numSuccedents == 0);
   1983   for (i = 0; i < numDataNodes; i++) {
   1984     if (lu_flag) {
   1985       /* connect write new data nodes to unlock nodes */
   1986       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   1987       RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
   1988       writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
   1989       unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
   1990       unlockDataNodes[i].antType[0] = rf_control;
   1991 
   1992       /* connect unlock nodes to term node */
   1993       RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
   1994       unlockDataNodes[i].succedents[0] = termNode;
   1995       termNode->antecedents[i] = &unlockDataNodes[i];
   1996       termNode->antType[i] = rf_control;
   1997     }
   1998     else {
   1999       /* connect write new data nodes to term node */
   2000       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   2001       RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   2002       writeDataNodes[i].succedents[0] = termNode;
   2003       termNode->antecedents[i] = &writeDataNodes[i];
   2004       termNode->antType[i] = rf_control;
   2005     }
   2006   }
   2007 
   2008   for (i = 0; i < numParityNodes; i++) {
   2009     if (lu_flag) {
   2010       /* connect write new parity nodes to unlock nodes */
   2011       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   2012       RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
   2013       writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
   2014       unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
   2015       unlockParityNodes[i].antType[0] = rf_control;
   2016 
   2017       /* connect unlock nodes to term node */
   2018       RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
   2019       unlockParityNodes[i].succedents[0] = termNode;
   2020       termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
   2021       termNode->antType[numDataNodes + i] = rf_control;
   2022     }
   2023     else {
   2024       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   2025       writeParityNodes[i].succedents[0] = termNode;
   2026       termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
   2027       termNode->antType[numDataNodes + i] = rf_control;
   2028     }
   2029   }
   2030 
   2031   if (nfaults == 2)
   2032     for (i = 0; i < numParityNodes; i++) {
   2033       if (lu_flag) {
   2034 	/* connect write new Q nodes to unlock nodes */
   2035 	RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   2036 	RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
   2037 	writeQNodes[i].succedents[0] = &unlockQNodes[i];
   2038 	unlockQNodes[i].antecedents[0] = &writeQNodes[i];
   2039 	unlockQNodes[i].antType[0] = rf_control;
   2040 
    2041 	/* connect unlock nodes to term node */
   2042 	RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
   2043 	unlockQNodes[i].succedents[0] = termNode;
   2044 	termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
   2045 	termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   2046       }
   2047       else {
   2048 	RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   2049 	writeQNodes[i].succedents[0] = termNode;
   2050 	termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
   2051 	termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   2052       }
   2053     }
   2054 }
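         /*
          * How a caller might select single- versus double-fault operation
          * (sketch only; the RF_RedFuncs_t table names below are assumptions,
          * the real tables live in the parity/PQ modules):
          */
         #if 0
         /* RAID-5: single fault tolerant, so qfuncs == NULL */
         rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags,
             allocList, &rf_xorFuncs, NULL);

         /* P+Q: double fault tolerant */
         rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags,
             allocList, &rf_pFuncs, &rf_qFuncs);
         #endif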
   2055 
   2056 
   2057 
   2058 /******************************************************************************
   2059  * create a write graph (fault-free or degraded) for RAID level 1
   2060  *
    2061  * Hdr -> Nil -> Wpd --> Nil -> Trm
    2062  *            \-> Wsd --/
   2063  *
   2064  * The "Wpd" node writes data to the primary copy in the mirror pair
   2065  * The "Wsd" node writes data to the secondary copy in the mirror pair
   2066  *
   2067  * Parameters:  raidPtr   - description of the physical array
   2068  *              asmap     - logical & physical addresses for this access
   2069  *              bp        - buffer ptr (holds write data)
   2070  *              flags     - general flags (e.g. disk locking)
   2071  *              allocList - list of memory allocated in DAG creation
   2072  *****************************************************************************/
   2073 
   2074 void rf_CreateRaidOneWriteDAGFwd(
   2075   RF_Raid_t             *raidPtr,
   2076   RF_AccessStripeMap_t  *asmap,
   2077   RF_DagHeader_t        *dag_h,
   2078   void                  *bp,
   2079   RF_RaidAccessFlags_t   flags,
   2080   RF_AllocListElem_t    *allocList)
   2081 {
   2082   RF_DagNode_t *blockNode, *unblockNode, *termNode;
   2083   RF_DagNode_t *nodes, *wndNode, *wmirNode;
   2084   int nWndNodes, nWmirNodes, i;
   2085   RF_ReconUnitNum_t which_ru;
   2086   RF_PhysDiskAddr_t *pda, *pdaP;
   2087   RF_StripeNum_t parityStripeID;
   2088 
   2089   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
   2090     asmap->raidAddress, &which_ru);
   2091   if (rf_dagDebug) {
   2092     printf("[Creating RAID level 1 write DAG]\n");
    2093   }
    2094   dag_h->creator = "RaidOneWriteDAGFwd";

   2095   nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;  /* 2 implies access not SU aligned */
   2096   nWndNodes =  (asmap->physInfo->next) ? 2 : 1;
   2097 
   2098   /* alloc the Wnd nodes and the Wmir node */
   2099   if (asmap->numDataFailed == 1)
   2100     nWndNodes--;
   2101   if (asmap->numParityFailed == 1)
   2102     nWmirNodes--;
   2103 
   2104   /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock + terminator) */
   2105   RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   2106   i = 0;
   2107   wndNode     = &nodes[i]; i += nWndNodes;
   2108   wmirNode    = &nodes[i]; i += nWmirNodes;
   2109   blockNode   = &nodes[i]; i += 1;
   2110   unblockNode = &nodes[i]; i += 1;
    2111   termNode    = &nodes[i]; i += 1;
   2112   RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
   2113 
    2114   /* this dag contains no commit nodes */
   2115   dag_h->numCommitNodes = 0;
   2116   dag_h->numCommits = 0;
   2117   dag_h->numSuccedents = 1;
   2118 
    2119   /* initialize the block, unblock, and term nodes */
   2120   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
   2121   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
   2122   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
   2123 
   2124   /* initialize the wnd nodes */
   2125   if (nWndNodes > 0) {
   2126     pda = asmap->physInfo;
   2127     for (i = 0; i < nWndNodes; i++) {
   2128       rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
   2129       RF_ASSERT(pda != NULL);
   2130       wndNode[i].params[0].p = pda;
   2131       wndNode[i].params[1].p = pda->bufPtr;
   2132       wndNode[i].params[2].v = parityStripeID;
   2133       wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   2134       pda = pda->next;
   2135     }
   2136     RF_ASSERT(pda == NULL);
   2137   }
   2138 
   2139   /* initialize the mirror nodes */
   2140   if (nWmirNodes > 0) {
   2141     pda = asmap->physInfo;
   2142     pdaP = asmap->parityInfo;
   2143     for (i = 0; i < nWmirNodes; i++) {
   2144       rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
   2145       RF_ASSERT(pda != NULL);
   2146       wmirNode[i].params[0].p = pdaP;
   2147       wmirNode[i].params[1].p = pda->bufPtr;
   2148       wmirNode[i].params[2].v = parityStripeID;
   2149       wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   2150       pda = pda->next;
   2151       pdaP = pdaP->next;
   2152     }
   2153     RF_ASSERT(pda == NULL);
   2154     RF_ASSERT(pdaP == NULL);
   2155   }
   2156 
   2157   /* link the header node to the block node */
   2158   RF_ASSERT(dag_h->numSuccedents == 1);
   2159   RF_ASSERT(blockNode->numAntecedents == 0);
   2160   dag_h->succedents[0] = blockNode;
   2161 
   2162   /* link the block node to the write nodes */
   2163   RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
   2164   for (i = 0; i < nWndNodes; i++) {
   2165     RF_ASSERT(wndNode[i].numAntecedents == 1);
   2166     blockNode->succedents[i] = &wndNode[i];
   2167     wndNode[i].antecedents[0] = blockNode;
   2168     wndNode[i].antType[0] = rf_control;
   2169   }
   2170   for (i = 0; i < nWmirNodes; i++) {
   2171     RF_ASSERT(wmirNode[i].numAntecedents == 1);
   2172     blockNode->succedents[i + nWndNodes] = &wmirNode[i];
   2173     wmirNode[i].antecedents[0] = blockNode;
   2174     wmirNode[i].antType[0] = rf_control;
   2175   }
   2176 
   2177   /* link the write nodes to the unblock node */
   2178   RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
   2179   for (i = 0; i < nWndNodes; i++) {
   2180     RF_ASSERT(wndNode[i].numSuccedents == 1);
   2181     wndNode[i].succedents[0] = unblockNode;
   2182     unblockNode->antecedents[i] = &wndNode[i];
   2183     unblockNode->antType[i] = rf_control;
   2184   }
   2185   for (i = 0; i < nWmirNodes; i++) {
   2186     RF_ASSERT(wmirNode[i].numSuccedents == 1);
   2187     wmirNode[i].succedents[0] = unblockNode;
   2188     unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
   2189     unblockNode->antType[i + nWndNodes] = rf_control;
   2190   }
   2191 
   2192   /* link the unblock node to the term node */
   2193   RF_ASSERT(unblockNode->numSuccedents == 1);
   2194   RF_ASSERT(termNode->numAntecedents == 1);
   2195   RF_ASSERT(termNode->numSuccedents == 0);
   2196   unblockNode->succedents[0] = termNode;
   2197   termNode->antecedents[0] = unblockNode;
   2198   termNode->antType[0] = rf_control;
   2199 
   2200   return;
   2201 }
   2202