Home | History | Annotate | Line # | Download | only in raidframe
rf_parityloggingdags.c revision 1.1
      1 /*	$NetBSD: rf_parityloggingdags.c,v 1.1 1998/11/13 04:20:32 oster Exp $	*/
      2 /*
      3  * Copyright (c) 1995 Carnegie-Mellon University.
      4  * All rights reserved.
      5  *
      6  * Author: William V. Courtright II
      7  *
      8  * Permission to use, copy, modify and distribute this software and
      9  * its documentation is hereby granted, provided that both the copyright
     10  * notice and this permission notice appear in all copies of the
     11  * software, derivative works or modified versions, and any portions
     12  * thereof, and that both notices appear in supporting documentation.
     13  *
     14  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
     15  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
     16  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
     17  *
     18  * Carnegie Mellon requests users of this software to return to
     19  *
     20  *  Software Distribution Coordinator  or  Software.Distribution (at) CS.CMU.EDU
     21  *  School of Computer Science
     22  *  Carnegie Mellon University
     23  *  Pittsburgh PA 15213-3890
     24  *
     25  * any improvements or extensions that they make and grant Carnegie the
     26  * rights to redistribute these changes.
     27  */
     28 
     29 /*
     30  * Log: rf_parityloggingdags.c,v
     31  * Revision 1.27  1996/07/28 20:31:39  jimz
     32  * i386netbsd port
     33  * true/false fixup
     34  *
     35  * Revision 1.26  1996/07/27  23:36:08  jimz
     36  * Solaris port of simulator
     37  *
     38  * Revision 1.25  1996/07/22  19:52:16  jimz
     39  * switched node params to RF_DagParam_t, a union of
     40  * a 64-bit int and a void *, for better portability
     41  * attempted hpux port, but failed partway through for
     42  * lack of a single C compiler capable of compiling all
     43  * source files
     44  *
     45  * Revision 1.24  1996/06/11  13:47:21  jimz
     46  * fix up for in-kernel compilation
     47  *
     48  * Revision 1.23  1996/06/07  22:26:27  jimz
     49  * type-ify which_ru (RF_ReconUnitNum_t)
     50  *
     51  * Revision 1.22  1996/06/07  21:33:04  jimz
     52  * begin using consistent types for sector numbers,
     53  * stripe numbers, row+col numbers, recon unit numbers
     54  *
     55  * Revision 1.21  1996/06/02  17:31:48  jimz
     56  * Moved a lot of global stuff into array structure, where it belongs.
     57  * Fixed up paritylogging, pss modules in this manner. Some general
     58  * code cleanup. Removed lots of dead code, some dead files.
     59  *
     60  * Revision 1.20  1996/05/31  22:26:54  jimz
     61  * fix a lot of mapping problems, memory allocation problems
     62  * found some weird lock issues, fixed 'em
     63  * more code cleanup
     64  *
     65  * Revision 1.19  1996/05/30  11:29:41  jimz
     66  * Numerous bug fixes. Stripe lock release code disagreed with the taking code
     67  * about when stripes should be locked (I made it consistent: no parity, no lock)
     68  * There was a lot of extra serialization of I/Os which I've removed- a lot of
     69  * it was to calculate values for the cache code, which is no longer with us.
     70  * More types, function, macro cleanup. Added code to properly quiesce the array
     71  * on shutdown. Made a lot of stuff array-specific which was (bogusly) general
     72  * before. Fixed memory allocation, freeing bugs.
     73  *
     74  * Revision 1.18  1996/05/27  18:56:37  jimz
     75  * more code cleanup
     76  * better typing
     77  * compiles in all 3 environments
     78  *
     79  * Revision 1.17  1996/05/24  22:17:04  jimz
     80  * continue code + namespace cleanup
     81  * typed a bunch of flags
     82  *
     83  * Revision 1.16  1996/05/24  04:28:55  jimz
     84  * release cleanup ckpt
     85  *
     86  * Revision 1.15  1996/05/23  21:46:35  jimz
     87  * checkpoint in code cleanup (release prep)
     88  * lots of types, function names have been fixed
     89  *
     90  * Revision 1.14  1996/05/23  00:33:23  jimz
     91  * code cleanup: move all debug decls to rf_options.c, all extern
     92  * debug decls to rf_options.h, all debug vars preceded by rf_
     93  *
     94  * Revision 1.13  1996/05/18  19:51:34  jimz
     95  * major code cleanup- fix syntax, make some types consistent,
     96  * add prototypes, clean out dead code, et cetera
     97  *
     98  * Revision 1.12  1996/05/08  21:01:24  jimz
     99  * fixed up enum type names that were conflicting with other
    100  * enums and function names (ie, "panic")
    101  * future naming trends will be towards RF_ and rf_ for
    102  * everything raidframe-related
    103  *
    104  * Revision 1.11  1996/05/03  19:42:02  wvcii
    105  * added includes for dag library
    106  *
    107  * Revision 1.10  1995/12/12  18:10:06  jimz
    108  * MIN -> RF_MIN, MAX -> RF_MAX, ASSERT -> RF_ASSERT
    109  * fix 80-column brain damage in comments
    110  *
    111  * Revision 1.9  1995/12/06  20:55:24  wvcii
    112  * added prototyping
    113  * fixed bug in dag header numSuccedents count for both small and large dags
    114  *
    115  * Revision 1.8  1995/11/30  16:08:01  wvcii
    116  * added copyright info
    117  *
    118  * Revision 1.7  1995/11/07  15:29:05  wvcii
    119  * reorganized code, adding comments and asserts
    120  * dag creation routines now generate term node
    121  * encoded commit point, barrier, and antecedence types into dags
    122  *
    123  * Revision 1.6  1995/09/07  15:52:06  jimz
    124  * noop compile when INCLUDE_PARITYLOGGING not defined
    125  *
    126  * Revision 1.5  1995/06/15  13:51:53  robby
    127  * updated some wrong prototypes (after prototyping rf_dagutils.h)
    128  *
    129  * Revision 1.4  1995/06/09  13:15:05  wvcii
    130  * code is now nonblocking
    131  *
    132  * Revision 1.3  95/05/31  13:09:14  wvcii
    133  * code debug
    134  *
    135  * Revision 1.2  1995/05/21  15:34:14  wvcii
    136  * code debug
    137  *
    138  * Revision 1.1  95/05/16  14:36:53  wvcii
    139  * Initial revision
    140  *
    141  *
    142  */
    143 
    144 #include "rf_archs.h"
    145 
    146 #if RF_INCLUDE_PARITYLOGGING > 0
    147 
    148 /*
    149   DAGs specific to parity logging are created here
    150  */
    151 
    152 #include "rf_types.h"
    153 #include "rf_raid.h"
    154 #include "rf_dag.h"
    155 #include "rf_dagutils.h"
    156 #include "rf_dagfuncs.h"
    157 #include "rf_threadid.h"
    158 #include "rf_debugMem.h"
    159 #include "rf_paritylog.h"
    160 #include "rf_memchunk.h"
    161 #include "rf_general.h"
    162 
    163 #include "rf_parityloggingdags.h"
    164 
    165 /******************************************************************************
    166  *
    167  * creates a DAG to perform a large-write operation:
    168  *
    169  *         / Rod \     / Wnd \
    170  * H -- NIL- Rod - NIL - Wnd ------ NIL - T
    171  *         \ Rod /     \ Xor - Lpo /
    172  *
    173  * The writes are not done until the reads complete because if they were done in
    174  * parallel, a failure on one of the reads could leave the parity in an inconsistent
    175  * state, so that the retry with a new DAG would produce erroneous parity.
    176  *
    177  * Note:  this DAG has the nasty property that none of the buffers allocated for reading
    178  *        old data can be freed until the XOR node fires.  Need to fix this.
    179  *
    180  * The last two arguments are the number of faults tolerated, and function for the
    181  * redundancy calculation. The undo for the redundancy calc is assumed to be null
    182  *
    183  *****************************************************************************/
    184 
    185 void rf_CommonCreateParityLoggingLargeWriteDAG(
    186   RF_Raid_t              *raidPtr,
    187   RF_AccessStripeMap_t   *asmap,
    188   RF_DagHeader_t         *dag_h,
    189   void                   *bp,
    190   RF_RaidAccessFlags_t    flags,
    191   RF_AllocListElem_t     *allocList,
    192   int                     nfaults,
    193   int                   (*redFunc)(RF_DagNode_t *))
    194 {
    195   RF_DagNode_t *nodes, *wndNodes, *rodNodes=NULL, *syncNode, *xorNode, *lpoNode, *blockNode, *unblockNode, *termNode;
    196   int nWndNodes, nRodNodes, i;
    197   RF_RaidLayout_t *layoutPtr = &(raidPtr->Layout);
    198   RF_AccessStripeMapHeader_t *new_asm_h[2];
    199   int nodeNum, asmNum;
    200   RF_ReconUnitNum_t which_ru;
    201   char *sosBuffer, *eosBuffer;
    202   RF_PhysDiskAddr_t *pda;
    203   RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
    204 
    205   if (rf_dagDebug)
    206     printf("[Creating parity-logging large-write DAG]\n");
    207   RF_ASSERT(nfaults == 1); /* this arch only single fault tolerant */
    208   dag_h->creator = "ParityLoggingLargeWriteDAG";
    209 
    210   /* alloc the Wnd nodes, the xor node, and the Lpo node */
    211   nWndNodes = asmap->numStripeUnitsAccessed;
    212   RF_CallocAndAdd(nodes, nWndNodes + 6, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
    213   i = 0;
    214   wndNodes    = &nodes[i]; i += nWndNodes;
    215   xorNode     = &nodes[i]; i += 1;
    216   lpoNode     = &nodes[i]; i += 1;
    217   blockNode   = &nodes[i]; i += 1;
    218   syncNode    = &nodes[i]; i += 1;
    219   unblockNode = &nodes[i]; i += 1;
    220   termNode    = &nodes[i]; i += 1;
    221 
    222   dag_h->numCommitNodes = nWndNodes + 1;
    223   dag_h->numCommits = 0;
    224   dag_h->numSuccedents = 1;
    225 
    226   rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
    227   if (nRodNodes > 0)
    228     RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
    229 
    230   /* begin node initialization */
    231   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes + 1, 0, 0, 0, dag_h, "Nil", allocList);
    232   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nWndNodes + 1, 0, 0, dag_h, "Nil", allocList);
    233   rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes + 1, 0, 0, dag_h, "Nil", allocList);
    234   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
    235 
    236   /* initialize the Rod nodes */
    237   for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
    238     if (new_asm_h[asmNum]) {
    239       pda = new_asm_h[asmNum]->stripeMap->physInfo;
    240       while (pda) {
    241 	rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,rf_DiskReadUndoFunc,rf_GenericWakeupFunc,1,1,4,0, dag_h, "Rod", allocList);
    242 	rodNodes[nodeNum].params[0].p = pda;
    243 	rodNodes[nodeNum].params[1].p = pda->bufPtr;
    244 	rodNodes[nodeNum].params[2].v = parityStripeID;
    245 	rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    246 	nodeNum++;
    247 	pda=pda->next;
    248       }
    249     }
    250   }
    251   RF_ASSERT(nodeNum == nRodNodes);
    252 
    253   /* initialize the wnd nodes */
    254   pda = asmap->physInfo;
    255   for (i=0; i < nWndNodes; i++) {
    256     rf_InitNode(&wndNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
    257     RF_ASSERT(pda != NULL);
    258     wndNodes[i].params[0].p = pda;
    259     wndNodes[i].params[1].p = pda->bufPtr;
    260     wndNodes[i].params[2].v = parityStripeID;
    261     wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    262     pda = pda->next;
    263   }
    264 
    265   /* initialize the redundancy node */
    266   rf_InitNode(xorNode, rf_wait, RF_TRUE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 1, 2*(nWndNodes+nRodNodes)+1, 1, dag_h, "Xr ", allocList);
    267   xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
    268   for (i=0; i < nWndNodes; i++) {
    269     xorNode->params[2*i+0] = wndNodes[i].params[0];         /* pda */
    270     xorNode->params[2*i+1] = wndNodes[i].params[1];         /* buf ptr */
    271   }
    272   for (i=0; i < nRodNodes; i++) {
    273     xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];         /* pda */
    274     xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];         /* buf ptr */
    275   }
    276   xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr;  /* xor node needs to get at RAID information */
    277 
    278   /* look for an Rod node that reads a complete SU.  If none, alloc a buffer to receive the parity info.
    279    * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.
    280    */
    281   for (i = 0; i < nRodNodes; i++)
    282     if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
    283       break;
    284   if (i == nRodNodes) {
    285     RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
    286   }
    287   else {
    288     xorNode->results[0] = rodNodes[i].params[1].p;
    289   }
    290 
    291   /* initialize the Lpo node */
    292   rf_InitNode(lpoNode, rf_wait, RF_FALSE, rf_ParityLogOverwriteFunc, rf_ParityLogOverwriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpo", allocList);
    293 
    294   lpoNode->params[0].p = asmap->parityInfo;
    295   lpoNode->params[1].p = xorNode->results[0];
    296   RF_ASSERT(asmap->parityInfo->next == NULL);        /* parityInfo must describe entire parity unit */
    297 
    298   /* connect nodes to form graph */
    299 
    300   /* connect dag header to block node */
    301   RF_ASSERT(dag_h->numSuccedents == 1);
    302   RF_ASSERT(blockNode->numAntecedents == 0);
    303   dag_h->succedents[0] = blockNode;
    304 
    305   /* connect the block node to the Rod nodes */
    306   RF_ASSERT(blockNode->numSuccedents == nRodNodes + 1);
    307   for (i = 0; i < nRodNodes; i++) {
    308     RF_ASSERT(rodNodes[i].numAntecedents == 1);
    309     blockNode->succedents[i] = &rodNodes[i];
    310     rodNodes[i].antecedents[0] = blockNode;
    311     rodNodes[i].antType[0] = rf_control;
    312   }
    313 
    314   /* connect the block node to the sync node */
    315   /* necessary if nRodNodes == 0 */
    316   RF_ASSERT(syncNode->numAntecedents == nRodNodes + 1);
    317   blockNode->succedents[nRodNodes] = syncNode;
    318   syncNode->antecedents[0] = blockNode;
    319   syncNode->antType[0] = rf_control;
    320 
    321   /* connect the Rod nodes to the syncNode */
    322   for (i = 0; i < nRodNodes; i++) {
    323     rodNodes[i].succedents[0] = syncNode;
    324     syncNode->antecedents[1 + i] = &rodNodes[i];
    325     syncNode->antType[1 + i] = rf_control;
    326   }
    327 
    328   /* connect the sync node to the xor node */
    329   RF_ASSERT(syncNode->numSuccedents == nWndNodes + 1);
    330   RF_ASSERT(xorNode->numAntecedents == 1);
    331   syncNode->succedents[0] = xorNode;
    332   xorNode->antecedents[0] = syncNode;
    333   xorNode->antType[0] = rf_trueData; /* carry forward from sync */
    334 
    335   /* connect the sync node to the Wnd nodes */
    336   for (i = 0; i < nWndNodes; i++) {
    337     RF_ASSERT(wndNodes->numAntecedents == 1);
    338     syncNode->succedents[1 + i] = &wndNodes[i];
    339     wndNodes[i].antecedents[0] = syncNode;
    340     wndNodes[i].antType[0] = rf_control;
    341   }
    342 
    343   /* connect the xor node to the Lpo node */
    344   RF_ASSERT(xorNode->numSuccedents == 1);
    345   RF_ASSERT(lpoNode->numAntecedents == 1);
    346   xorNode->succedents[0] = lpoNode;
    347   lpoNode->antecedents[0]= xorNode;
    348   lpoNode->antType[0] = rf_trueData;
    349 
    350   /* connect the Wnd nodes to the unblock node */
    351   RF_ASSERT(unblockNode->numAntecedents == nWndNodes + 1);
    352   for (i = 0; i < nWndNodes; i++) {
    353     RF_ASSERT(wndNodes->numSuccedents == 1);
    354     wndNodes[i].succedents[0] = unblockNode;
    355     unblockNode->antecedents[i] = &wndNodes[i];
    356     unblockNode->antType[i] = rf_control;
    357   }
    358 
    359   /* connect the Lpo node to the unblock node */
    360   RF_ASSERT(lpoNode->numSuccedents == 1);
    361   lpoNode->succedents[0] = unblockNode;
    362   unblockNode->antecedents[nWndNodes] = lpoNode;
    363   unblockNode->antType[nWndNodes] = rf_control;
    364 
    365   /* connect unblock node to terminator */
    366   RF_ASSERT(unblockNode->numSuccedents == 1);
    367   RF_ASSERT(termNode->numAntecedents == 1);
    368   RF_ASSERT(termNode->numSuccedents == 0);
    369   unblockNode->succedents[0] = termNode;
    370   termNode->antecedents[0] = unblockNode;
    371   termNode->antType[0] = rf_control;
    372 }
    373 
    374 
    375 
    376 
    377 /******************************************************************************
    378  *
    379  * creates a DAG to perform a small-write operation (either raid 5 or pq), which is as follows:
    380  *
    381  *                                     Header
    382  *                                       |
    383  *                                     Block
    384  *                                 / |  ... \   \
    385  *                                /  |       \   \
    386  *                             Rod  Rod      Rod  Rop
    387  *                             | \ /| \    / |  \/ |
    388  *                             |    |        |  /\ |
    389  *                             Wnd  Wnd      Wnd   X
    390  *                              |    \       /     |
    391  *                              |     \     /      |
    392  *                               \     \   /      Lpu
    393  *                                \     \ /       /
    394  *                                 +-> Unblock <-+
    395  *                                       |
    396  *                                       T
    397  *
    398  *
    399  * R = Read, W = Write, X = Xor, o = old, n = new, d = data, p = parity.
    400  * When the access spans a stripe unit boundary and is less than one SU in size, there will
    401  * be two Rop -- X -- Lpu branches.  I call this the "double-XOR" case.
    402  * The second output from each Rod node goes to the X node.  In the double-XOR
    403  * case, there are exactly 2 Rod nodes, and each sends one output to one X node.
    404  * There is one Rod -- Wnd -- T branch for each stripe unit being updated.
    405  *
    406  * The block and unblock nodes are unused.  See comment above CreateFaultFreeReadDAG.
    407  *
    408  * Note:  this DAG ignores all the optimizations related to making the RMWs atomic.
    409  *        it also has the nasty property that none of the buffers allocated for reading
    410  *        old data & parity can be freed until the XOR node fires.  Need to fix this.
    411  *
    412  * A null qfuncs indicates single fault tolerant
    413  *****************************************************************************/
    414 
    415 void rf_CommonCreateParityLoggingSmallWriteDAG(
    416   RF_Raid_t             *raidPtr,
    417   RF_AccessStripeMap_t  *asmap,
    418   RF_DagHeader_t        *dag_h,
    419   void                  *bp,
    420   RF_RaidAccessFlags_t   flags,
    421   RF_AllocListElem_t    *allocList,
    422   RF_RedFuncs_t         *pfuncs,
    423   RF_RedFuncs_t         *qfuncs)
    424 {
    425   RF_DagNode_t *xorNodes, *blockNode, *unblockNode, *nodes;
    426   RF_DagNode_t *readDataNodes, *readParityNodes;
    427   RF_DagNode_t *writeDataNodes, *lpuNodes;
    428   RF_DagNode_t *unlockDataNodes=NULL, *termNode;
    429   RF_PhysDiskAddr_t *pda = asmap->physInfo;
    430   int numDataNodes = asmap->numStripeUnitsAccessed;
    431   int numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
    432   int i, j, nNodes, totalNumNodes;
    433   RF_ReconUnitNum_t which_ru;
    434   int (*func)(RF_DagNode_t *node), (*undoFunc)(RF_DagNode_t *node);
    435   int (*qfunc)(RF_DagNode_t *node);
    436   char *name, *qname;
    437   RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
    438   long nfaults = qfuncs ? 2 : 1;
    439   int lu_flag = (rf_enableAtomicRMW) ? 1 : 0;          /* lock/unlock flag */
    440 
    441   if (rf_dagDebug) printf("[Creating parity-logging small-write DAG]\n");
    442   RF_ASSERT(numDataNodes > 0);
    443   RF_ASSERT(nfaults == 1);
    444   dag_h->creator = "ParityLoggingSmallWriteDAG";
    445 
    446   /* DAG creation occurs in four steps:
    447      1. count the number of nodes in the DAG
    448      2. create the nodes
    449      3. initialize the nodes
    450      4. connect the nodes
    451    */
    452 
    453   /* Step 1. compute number of nodes in the graph */
    454 
    455   /* number of nodes:
    456       a read and write for each data unit
    457       a redundancy computation node for each parity node
    458       a read and Lpu for each parity unit
    459       a block and unblock node (2)
    460       a terminator node
    461       if atomic RMW
    462         an unlock node for each data unit, redundancy unit
    463   */
    464   totalNumNodes = (2 * numDataNodes) + numParityNodes + (2 * numParityNodes) + 3;
    465   if (lu_flag)
    466     totalNumNodes += numDataNodes;
    467 
    468   nNodes     = numDataNodes + numParityNodes;
    469 
    470   dag_h->numCommitNodes = numDataNodes + numParityNodes;
    471   dag_h->numCommits = 0;
    472   dag_h->numSuccedents = 1;
    473 
    474   /* Step 2. create the nodes */
    475   RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
    476   i = 0;
    477   blockNode         = &nodes[i]; i += 1;
    478   unblockNode       = &nodes[i]; i += 1;
    479   readDataNodes     = &nodes[i]; i += numDataNodes;
    480   readParityNodes   = &nodes[i]; i += numParityNodes;
    481   writeDataNodes    = &nodes[i]; i += numDataNodes;
    482   lpuNodes          = &nodes[i]; i += numParityNodes;
    483   xorNodes          = &nodes[i]; i += numParityNodes;
    484   termNode          = &nodes[i]; i += 1;
    485   if (lu_flag) {
    486     unlockDataNodes = &nodes[i]; i += numDataNodes;
    487   }
    488   RF_ASSERT(i == totalNumNodes);
    489 
    490   /* Step 3. initialize the nodes */
    491   /* initialize block node (Nil) */
    492   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
    493 
    494   /* initialize unblock node (Nil) */
    495   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nNodes, 0, 0, dag_h, "Nil", allocList);
    496 
    497   /* initialize terminator node (Trm) */
    498   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
    499 
    500   /* initialize nodes which read old data (Rod) */
    501   for (i = 0; i < numDataNodes; i++) {
    502     rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rod", allocList);
    503     RF_ASSERT(pda != NULL);
    504     readDataNodes[i].params[0].p = pda; /* physical disk addr desc */
    505     readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);  /* buffer to hold old data */
    506     readDataNodes[i].params[2].v = parityStripeID;
    507     readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
    508     pda=pda->next;
    509     readDataNodes[i].propList[0] = NULL;
    510     readDataNodes[i].propList[1] = NULL;
    511   }
    512 
    513   /* initialize nodes which read old parity (Rop) */
    514   pda = asmap->parityInfo; i = 0;
    515   for (i = 0; i < numParityNodes; i++) {
    516     RF_ASSERT(pda != NULL);
    517     rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rop", allocList);
    518     readParityNodes[i].params[0].p = pda;
    519     readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);    /* buffer to hold old parity */
    520     readParityNodes[i].params[2].v = parityStripeID;
    521     readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    522     readParityNodes[i].propList[0] = NULL;
    523     pda=pda->next;
    524   }
    525 
    526   /* initialize nodes which write new data (Wnd) */
    527   pda = asmap->physInfo;
    528   for (i=0; i < numDataNodes; i++) {
    529     RF_ASSERT(pda != NULL);
    530     rf_InitNode(&writeDataNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, nNodes, 4, 0, dag_h, "Wnd", allocList);
    531     writeDataNodes[i].params[0].p = pda;                    /* physical disk addr desc */
    532     writeDataNodes[i].params[1].p = pda->bufPtr;   /* buffer holding new data to be written */
    533     writeDataNodes[i].params[2].v = parityStripeID;
    534     writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    535 
    536     if (lu_flag) {
    537       /* initialize node to unlock the disk queue */
    538       rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
    539       unlockDataNodes[i].params[0].p = pda; /* physical disk addr desc */
    540       unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
    541     }
    542     pda = pda->next;
    543   }
    544 
    545 
    546   /* initialize nodes which compute new parity */
    547   /* we use the simple XOR func in the double-XOR case, and when we're accessing only a portion of one stripe unit.
    548    * the distinction between the two is that the regular XOR func assumes that the targbuf is a full SU in size,
    549    * and examines the pda associated with the buffer to decide where within the buffer to XOR the data, whereas
    550    * the simple XOR func just XORs the data into the start of the buffer.
    551    */
    552   if ((numParityNodes==2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
    553     func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
    554     if (qfuncs)
    555       { qfunc = qfuncs->simple; qname = qfuncs->SimpleName;}
    556   } else {
    557     func = pfuncs->regular; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->RegularName;
    558     if (qfuncs) { qfunc = qfuncs->regular; qname = qfuncs->RegularName;}
    559   }
    560   /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr  */
    561   if (numParityNodes==2) {        /* double-xor case */
    562     for (i=0; i < numParityNodes; i++) {
    563       rf_InitNode(&xorNodes[i], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, 7, 1, dag_h, name, allocList);  /* no wakeup func for xor */
    564       xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
    565       xorNodes[i].params[0] = readDataNodes[i].params[0];
    566       xorNodes[i].params[1] = readDataNodes[i].params[1];
    567       xorNodes[i].params[2] = readParityNodes[i].params[0];
    568       xorNodes[i].params[3] = readParityNodes[i].params[1];
    569       xorNodes[i].params[4] = writeDataNodes[i].params[0];
    570       xorNodes[i].params[5] = writeDataNodes[i].params[1];
    571       xorNodes[i].params[6].p = raidPtr;
    572       xorNodes[i].results[0] = readParityNodes[i].params[1].p;   /* use old parity buf as target buf */
    573     }
    574   }
    575   else {
    576     /* there is only one xor node in this case */
    577     rf_InitNode(&xorNodes[0], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
    578     xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
    579     for (i=0; i < numDataNodes + 1; i++) {
    580       /* set up params related to Rod and Rop nodes */
    581       xorNodes[0].params[2*i+0] = readDataNodes[i].params[0];    /* pda */
    582       xorNodes[0].params[2*i+1] = readDataNodes[i].params[1];    /* buffer pointer */
    583     }
    584     for (i=0; i < numDataNodes; i++) {
    585       /* set up params related to Wnd and Wnp nodes */
    586       xorNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
    587       xorNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
    588     }
    589     xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;  /* xor node needs to get at RAID information */
    590     xorNodes[0].results[0] = readParityNodes[0].params[1].p;
    591   }
    592 
    593   /* initialize the log node(s) */
    594   pda = asmap->parityInfo;
    595   for (i = 0;  i < numParityNodes; i++) {
    596     RF_ASSERT(pda);
    597     rf_InitNode(&lpuNodes[i], rf_wait, RF_FALSE, rf_ParityLogUpdateFunc, rf_ParityLogUpdateUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpu", allocList);
    598     lpuNodes[i].params[0].p = pda;                    /* PhysDiskAddr of parity */
    599     lpuNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer to parity */
    600     pda = pda->next;
    601   }
    602 
    603 
    604   /* Step 4. connect the nodes */
    605 
    606   /* connect header to block node */
    607   RF_ASSERT(dag_h->numSuccedents == 1);
    608   RF_ASSERT(blockNode->numAntecedents == 0);
    609   dag_h->succedents[0] = blockNode;
    610 
    611   /* connect block node to read old data nodes */
    612   RF_ASSERT(blockNode->numSuccedents == (numDataNodes + numParityNodes));
    613   for (i = 0; i < numDataNodes; i++) {
    614     blockNode->succedents[i] = &readDataNodes[i];
    615     RF_ASSERT(readDataNodes[i].numAntecedents == 1);
    616     readDataNodes[i].antecedents[0]= blockNode;
    617     readDataNodes[i].antType[0] = rf_control;
    618   }
    619 
    620   /* connect block node to read old parity nodes */
    621   for (i = 0; i < numParityNodes; i++) {
    622     blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
    623     RF_ASSERT(readParityNodes[i].numAntecedents == 1);
    624     readParityNodes[i].antecedents[0] = blockNode;
    625     readParityNodes[i].antType[0] = rf_control;
    626   }
    627 
    628   /* connect read old data nodes to write new data nodes */
    629   for (i = 0; i < numDataNodes; i++) {
    630     RF_ASSERT(readDataNodes[i].numSuccedents == numDataNodes + numParityNodes);
    631     for (j = 0; j < numDataNodes; j++) {
    632       RF_ASSERT(writeDataNodes[j].numAntecedents == numDataNodes + numParityNodes);
    633       readDataNodes[i].succedents[j] = &writeDataNodes[j];
    634       writeDataNodes[j].antecedents[i] = &readDataNodes[i];
    635       if (i == j)
    636 	writeDataNodes[j].antType[i] = rf_antiData;
    637       else
    638 	writeDataNodes[j].antType[i] = rf_control;
    639     }
    640   }
    641 
    642   /* connect read old data nodes to xor nodes */
    643   for (i = 0; i < numDataNodes; i++)
    644     for (j = 0; j < numParityNodes; j++){
    645       RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
    646       readDataNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
    647       xorNodes[j].antecedents[i] = &readDataNodes[i];
    648       xorNodes[j].antType[i] = rf_trueData;
    649     }
    650 
    651   /* connect read old parity nodes to write new data nodes */
    652   for (i = 0; i < numParityNodes; i++) {
    653     RF_ASSERT(readParityNodes[i].numSuccedents == numDataNodes + numParityNodes);
    654     for (j = 0; j < numDataNodes; j++) {
    655       readParityNodes[i].succedents[j] = &writeDataNodes[j];
    656       writeDataNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
    657       writeDataNodes[j].antType[numDataNodes + i] = rf_control;
    658     }
    659   }
    660 
    661   /* connect read old parity nodes to xor nodes */
    662   for (i = 0; i < numParityNodes; i++)
    663     for (j = 0; j < numParityNodes; j++) {
    664       readParityNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
    665       xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
    666       xorNodes[j].antType[numDataNodes + i] = rf_trueData;
    667     }
    668 
    669   /* connect xor nodes to write new parity nodes */
    670   for (i = 0; i < numParityNodes; i++) {
    671     RF_ASSERT(xorNodes[i].numSuccedents == 1);
    672     RF_ASSERT(lpuNodes[i].numAntecedents == 1);
    673     xorNodes[i].succedents[0] = &lpuNodes[i];
    674     lpuNodes[i].antecedents[0] = &xorNodes[i];
    675     lpuNodes[i].antType[0] = rf_trueData;
    676   }
    677 
    678   for (i = 0; i < numDataNodes; i++) {
    679     if (lu_flag) {
    680       /* connect write new data nodes to unlock nodes */
    681       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
    682       RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
    683       writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
    684       unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
    685       unlockDataNodes[i].antType[0] = rf_control;
    686 
    687       /* connect unlock nodes to unblock node */
    688       RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
    689       RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
    690       unlockDataNodes[i].succedents[0] = unblockNode;
    691       unblockNode->antecedents[i] = &unlockDataNodes[i];
    692       unblockNode->antType[i] = rf_control;
    693     }
    694     else {
    695       /* connect write new data nodes to unblock node */
    696       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
    697       RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
    698       writeDataNodes[i].succedents[0] = unblockNode;
    699       unblockNode->antecedents[i] = &writeDataNodes[i];
    700       unblockNode->antType[i] = rf_control;
    701     }
    702   }
    703 
    704   /* connect write new parity nodes to unblock node */
    705   for (i = 0; i < numParityNodes; i++) {
    706     RF_ASSERT(lpuNodes[i].numSuccedents == 1);
    707     lpuNodes[i].succedents[0] = unblockNode;
    708     unblockNode->antecedents[numDataNodes + i] = &lpuNodes[i];
    709     unblockNode->antType[numDataNodes + i] = rf_control;
    710   }
    711 
    712   /* connect unblock node to terminator */
    713   RF_ASSERT(unblockNode->numSuccedents == 1);
    714   RF_ASSERT(termNode->numAntecedents == 1);
    715   RF_ASSERT(termNode->numSuccedents == 0);
    716   unblockNode->succedents[0] = termNode;
    717   termNode->antecedents[0] = unblockNode;
    718   termNode->antType[0] = rf_control;
    719 }
    720 
    721 
    722 void rf_CreateParityLoggingSmallWriteDAG(
    723   RF_Raid_t             *raidPtr,
    724   RF_AccessStripeMap_t  *asmap,
    725   RF_DagHeader_t        *dag_h,
    726   void                  *bp,
    727   RF_RaidAccessFlags_t   flags,
    728   RF_AllocListElem_t    *allocList,
    729   RF_RedFuncs_t         *pfuncs,
    730   RF_RedFuncs_t         *qfuncs)
    731 {
    732   dag_h->creator = "ParityLoggingSmallWriteDAG";
    733   rf_CommonCreateParityLoggingSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, &rf_xorFuncs, NULL);
    734 }
    735 
    736 
    737 void rf_CreateParityLoggingLargeWriteDAG(
    738   RF_Raid_t              *raidPtr,
    739   RF_AccessStripeMap_t   *asmap,
    740   RF_DagHeader_t         *dag_h,
    741   void                   *bp,
    742   RF_RaidAccessFlags_t    flags,
    743   RF_AllocListElem_t     *allocList,
    744   int                     nfaults,
    745   int                   (*redFunc)(RF_DagNode_t *))
    746 {
    747   dag_h->creator = "ParityLoggingSmallWriteDAG";
    748   rf_CommonCreateParityLoggingLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 1, rf_RegularXorFunc);
    749 }
    750 
    751 #endif /* RF_INCLUDE_PARITYLOGGING > 0 */
    752