Home | History | Annotate | Line # | Download | only in raidframe
rf_parityloggingdags.c revision 1.1
      1  1.1  oster /*	$NetBSD: rf_parityloggingdags.c,v 1.1 1998/11/13 04:20:32 oster Exp $	*/
      2  1.1  oster /*
      3  1.1  oster  * Copyright (c) 1995 Carnegie-Mellon University.
      4  1.1  oster  * All rights reserved.
      5  1.1  oster  *
      6  1.1  oster  * Author: William V. Courtright II
      7  1.1  oster  *
      8  1.1  oster  * Permission to use, copy, modify and distribute this software and
      9  1.1  oster  * its documentation is hereby granted, provided that both the copyright
     10  1.1  oster  * notice and this permission notice appear in all copies of the
     11  1.1  oster  * software, derivative works or modified versions, and any portions
     12  1.1  oster  * thereof, and that both notices appear in supporting documentation.
     13  1.1  oster  *
     14  1.1  oster  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
     15  1.1  oster  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
     16  1.1  oster  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
     17  1.1  oster  *
     18  1.1  oster  * Carnegie Mellon requests users of this software to return to
     19  1.1  oster  *
     20  1.1  oster  *  Software Distribution Coordinator  or  Software.Distribution (at) CS.CMU.EDU
     21  1.1  oster  *  School of Computer Science
     22  1.1  oster  *  Carnegie Mellon University
     23  1.1  oster  *  Pittsburgh PA 15213-3890
     24  1.1  oster  *
     25  1.1  oster  * any improvements or extensions that they make and grant Carnegie the
     26  1.1  oster  * rights to redistribute these changes.
     27  1.1  oster  */
     28  1.1  oster 
     29  1.1  oster /*
     30  1.1  oster  * Log: rf_parityloggingdags.c,v
     31  1.1  oster  * Revision 1.27  1996/07/28 20:31:39  jimz
     32  1.1  oster  * i386netbsd port
     33  1.1  oster  * true/false fixup
     34  1.1  oster  *
     35  1.1  oster  * Revision 1.26  1996/07/27  23:36:08  jimz
     36  1.1  oster  * Solaris port of simulator
     37  1.1  oster  *
     38  1.1  oster  * Revision 1.25  1996/07/22  19:52:16  jimz
     39  1.1  oster  * switched node params to RF_DagParam_t, a union of
     40  1.1  oster  * a 64-bit int and a void *, for better portability
     41  1.1  oster  * attempted hpux port, but failed partway through for
     42  1.1  oster  * lack of a single C compiler capable of compiling all
     43  1.1  oster  * source files
     44  1.1  oster  *
     45  1.1  oster  * Revision 1.24  1996/06/11  13:47:21  jimz
     46  1.1  oster  * fix up for in-kernel compilation
     47  1.1  oster  *
     48  1.1  oster  * Revision 1.23  1996/06/07  22:26:27  jimz
     49  1.1  oster  * type-ify which_ru (RF_ReconUnitNum_t)
     50  1.1  oster  *
     51  1.1  oster  * Revision 1.22  1996/06/07  21:33:04  jimz
     52  1.1  oster  * begin using consistent types for sector numbers,
     53  1.1  oster  * stripe numbers, row+col numbers, recon unit numbers
     54  1.1  oster  *
     55  1.1  oster  * Revision 1.21  1996/06/02  17:31:48  jimz
     56  1.1  oster  * Moved a lot of global stuff into array structure, where it belongs.
     57  1.1  oster  * Fixed up paritylogging, pss modules in this manner. Some general
     58  1.1  oster  * code cleanup. Removed lots of dead code, some dead files.
     59  1.1  oster  *
     60  1.1  oster  * Revision 1.20  1996/05/31  22:26:54  jimz
     61  1.1  oster  * fix a lot of mapping problems, memory allocation problems
     62  1.1  oster  * found some weird lock issues, fixed 'em
     63  1.1  oster  * more code cleanup
     64  1.1  oster  *
     65  1.1  oster  * Revision 1.19  1996/05/30  11:29:41  jimz
     66  1.1  oster  * Numerous bug fixes. Stripe lock release code disagreed with the taking code
     67  1.1  oster  * about when stripes should be locked (I made it consistent: no parity, no lock)
     68  1.1  oster  * There was a lot of extra serialization of I/Os which I've removed- a lot of
     69  1.1  oster  * it was to calculate values for the cache code, which is no longer with us.
     70  1.1  oster  * More types, function, macro cleanup. Added code to properly quiesce the array
     71  1.1  oster  * on shutdown. Made a lot of stuff array-specific which was (bogusly) general
     72  1.1  oster  * before. Fixed memory allocation, freeing bugs.
     73  1.1  oster  *
     74  1.1  oster  * Revision 1.18  1996/05/27  18:56:37  jimz
     75  1.1  oster  * more code cleanup
     76  1.1  oster  * better typing
     77  1.1  oster  * compiles in all 3 environments
     78  1.1  oster  *
     79  1.1  oster  * Revision 1.17  1996/05/24  22:17:04  jimz
     80  1.1  oster  * continue code + namespace cleanup
     81  1.1  oster  * typed a bunch of flags
     82  1.1  oster  *
     83  1.1  oster  * Revision 1.16  1996/05/24  04:28:55  jimz
     84  1.1  oster  * release cleanup ckpt
     85  1.1  oster  *
     86  1.1  oster  * Revision 1.15  1996/05/23  21:46:35  jimz
     87  1.1  oster  * checkpoint in code cleanup (release prep)
     88  1.1  oster  * lots of types, function names have been fixed
     89  1.1  oster  *
     90  1.1  oster  * Revision 1.14  1996/05/23  00:33:23  jimz
     91  1.1  oster  * code cleanup: move all debug decls to rf_options.c, all extern
     92  1.1  oster  * debug decls to rf_options.h, all debug vars preceded by rf_
     93  1.1  oster  *
     94  1.1  oster  * Revision 1.13  1996/05/18  19:51:34  jimz
     95  1.1  oster  * major code cleanup- fix syntax, make some types consistent,
     96  1.1  oster  * add prototypes, clean out dead code, et cetera
     97  1.1  oster  *
     98  1.1  oster  * Revision 1.12  1996/05/08  21:01:24  jimz
     99  1.1  oster  * fixed up enum type names that were conflicting with other
    100  1.1  oster  * enums and function names (ie, "panic")
    101  1.1  oster  * future naming trends will be towards RF_ and rf_ for
    102  1.1  oster  * everything raidframe-related
    103  1.1  oster  *
    104  1.1  oster  * Revision 1.11  1996/05/03  19:42:02  wvcii
    105  1.1  oster  * added includes for dag library
    106  1.1  oster  *
    107  1.1  oster  * Revision 1.10  1995/12/12  18:10:06  jimz
    108  1.1  oster  * MIN -> RF_MIN, MAX -> RF_MAX, ASSERT -> RF_ASSERT
    109  1.1  oster  * fix 80-column brain damage in comments
    110  1.1  oster  *
    111  1.1  oster  * Revision 1.9  1995/12/06  20:55:24  wvcii
    112  1.1  oster  * added prototyping
    113  1.1  oster  * fixed bug in dag header numSuccedents count for both small and large dags
    114  1.1  oster  *
    115  1.1  oster  * Revision 1.8  1995/11/30  16:08:01  wvcii
    116  1.1  oster  * added copyright info
    117  1.1  oster  *
    118  1.1  oster  * Revision 1.7  1995/11/07  15:29:05  wvcii
    119  1.1  oster  * reorganized code, adding comments and asserts
    120  1.1  oster  * dag creation routines now generate term node
    121  1.1  oster  * encoded commit point, barrier, and antecedence types into dags
    122  1.1  oster  *
    123  1.1  oster  * Revision 1.6  1995/09/07  15:52:06  jimz
    124  1.1  oster  * noop compile when INCLUDE_PARITYLOGGING not defined
    125  1.1  oster  *
    126  1.1  oster  * Revision 1.5  1995/06/15  13:51:53  robby
    127  1.1  oster  * updated some wrong prototypes (after prototyping rf_dagutils.h)
    128  1.1  oster  *
    129  1.1  oster  * Revision 1.4  1995/06/09  13:15:05  wvcii
    130  1.1  oster  * code is now nonblocking
    131  1.1  oster  *
    132  1.1  oster  * Revision 1.3  95/05/31  13:09:14  wvcii
    133  1.1  oster  * code debug
    134  1.1  oster  *
    135  1.1  oster  * Revision 1.2  1995/05/21  15:34:14  wvcii
    136  1.1  oster  * code debug
    137  1.1  oster  *
    138  1.1  oster  * Revision 1.1  95/05/16  14:36:53  wvcii
    139  1.1  oster  * Initial revision
    140  1.1  oster  *
    141  1.1  oster  *
    142  1.1  oster  */
    143  1.1  oster 
    144  1.1  oster #include "rf_archs.h"
    145  1.1  oster 
    146  1.1  oster #if RF_INCLUDE_PARITYLOGGING > 0
    147  1.1  oster 
    148  1.1  oster /*
    149  1.1  oster   DAGs specific to parity logging are created here
    150  1.1  oster  */
    151  1.1  oster 
    152  1.1  oster #include "rf_types.h"
    153  1.1  oster #include "rf_raid.h"
    154  1.1  oster #include "rf_dag.h"
    155  1.1  oster #include "rf_dagutils.h"
    156  1.1  oster #include "rf_dagfuncs.h"
    157  1.1  oster #include "rf_threadid.h"
    158  1.1  oster #include "rf_debugMem.h"
    159  1.1  oster #include "rf_paritylog.h"
    160  1.1  oster #include "rf_memchunk.h"
    161  1.1  oster #include "rf_general.h"
    162  1.1  oster 
    163  1.1  oster #include "rf_parityloggingdags.h"
    164  1.1  oster 
    165  1.1  oster /******************************************************************************
    166  1.1  oster  *
    167  1.1  oster  * creates a DAG to perform a large-write operation:
    168  1.1  oster  *
    169  1.1  oster  *         / Rod \     / Wnd \
    170  1.1  oster  * H -- NIL- Rod - NIL - Wnd ------ NIL - T
    171  1.1  oster  *         \ Rod /     \ Xor - Lpo /
    172  1.1  oster  *
    173  1.1  oster  * The writes are not done until the reads complete because if they were done in
    174  1.1  oster  * parallel, a failure on one of the reads could leave the parity in an inconsistent
    175  1.1  oster  * state, so that the retry with a new DAG would produce erroneous parity.
    176  1.1  oster  *
    177  1.1  oster  * Note:  this DAG has the nasty property that none of the buffers allocated for reading
    178  1.1  oster  *        old data can be freed until the XOR node fires.  Need to fix this.
    179  1.1  oster  *
    180  1.1  oster  * The last two arguments are the number of faults tolerated, and function for the
    181  1.1  oster  * redundancy calculation. The undo for the redundancy calc is assumed to be null
    182  1.1  oster  *
    183  1.1  oster  *****************************************************************************/
    184  1.1  oster 
    185  1.1  oster void rf_CommonCreateParityLoggingLargeWriteDAG(
    186  1.1  oster   RF_Raid_t              *raidPtr,
    187  1.1  oster   RF_AccessStripeMap_t   *asmap,
    188  1.1  oster   RF_DagHeader_t         *dag_h,
    189  1.1  oster   void                   *bp,
    190  1.1  oster   RF_RaidAccessFlags_t    flags,
    191  1.1  oster   RF_AllocListElem_t     *allocList,
    192  1.1  oster   int                     nfaults,
    193  1.1  oster   int                   (*redFunc)(RF_DagNode_t *))
    194  1.1  oster {
    195  1.1  oster   RF_DagNode_t *nodes, *wndNodes, *rodNodes=NULL, *syncNode, *xorNode, *lpoNode, *blockNode, *unblockNode, *termNode;
    196  1.1  oster   int nWndNodes, nRodNodes, i;
    197  1.1  oster   RF_RaidLayout_t *layoutPtr = &(raidPtr->Layout);
    198  1.1  oster   RF_AccessStripeMapHeader_t *new_asm_h[2];
    199  1.1  oster   int nodeNum, asmNum;
    200  1.1  oster   RF_ReconUnitNum_t which_ru;
    201  1.1  oster   char *sosBuffer, *eosBuffer;
    202  1.1  oster   RF_PhysDiskAddr_t *pda;
    203  1.1  oster   RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
    204  1.1  oster 
    205  1.1  oster   if (rf_dagDebug)
    206  1.1  oster     printf("[Creating parity-logging large-write DAG]\n");
    207  1.1  oster   RF_ASSERT(nfaults == 1); /* this arch only single fault tolerant */
    208  1.1  oster   dag_h->creator = "ParityLoggingLargeWriteDAG";
    209  1.1  oster 
    210  1.1  oster   /* alloc the Wnd nodes, the xor node, and the Lpo node */
    211  1.1  oster   nWndNodes = asmap->numStripeUnitsAccessed;
    212  1.1  oster   RF_CallocAndAdd(nodes, nWndNodes + 6, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
    213  1.1  oster   i = 0;
    214  1.1  oster   wndNodes    = &nodes[i]; i += nWndNodes;
    215  1.1  oster   xorNode     = &nodes[i]; i += 1;
    216  1.1  oster   lpoNode     = &nodes[i]; i += 1;
    217  1.1  oster   blockNode   = &nodes[i]; i += 1;
    218  1.1  oster   syncNode    = &nodes[i]; i += 1;
    219  1.1  oster   unblockNode = &nodes[i]; i += 1;
    220  1.1  oster   termNode    = &nodes[i]; i += 1;
    221  1.1  oster 
    222  1.1  oster   dag_h->numCommitNodes = nWndNodes + 1;
    223  1.1  oster   dag_h->numCommits = 0;
    224  1.1  oster   dag_h->numSuccedents = 1;
    225  1.1  oster 
    226  1.1  oster   rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
    227  1.1  oster   if (nRodNodes > 0)
    228  1.1  oster     RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
    229  1.1  oster 
    230  1.1  oster   /* begin node initialization */
    231  1.1  oster   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes + 1, 0, 0, 0, dag_h, "Nil", allocList);
    232  1.1  oster   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nWndNodes + 1, 0, 0, dag_h, "Nil", allocList);
    233  1.1  oster   rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes + 1, 0, 0, dag_h, "Nil", allocList);
    234  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
    235  1.1  oster 
    236  1.1  oster   /* initialize the Rod nodes */
    237  1.1  oster   for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
    238  1.1  oster     if (new_asm_h[asmNum]) {
    239  1.1  oster       pda = new_asm_h[asmNum]->stripeMap->physInfo;
    240  1.1  oster       while (pda) {
    241  1.1  oster 	rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,rf_DiskReadUndoFunc,rf_GenericWakeupFunc,1,1,4,0, dag_h, "Rod", allocList);
    242  1.1  oster 	rodNodes[nodeNum].params[0].p = pda;
    243  1.1  oster 	rodNodes[nodeNum].params[1].p = pda->bufPtr;
    244  1.1  oster 	rodNodes[nodeNum].params[2].v = parityStripeID;
    245  1.1  oster 	rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    246  1.1  oster 	nodeNum++;
    247  1.1  oster 	pda=pda->next;
    248  1.1  oster       }
    249  1.1  oster     }
    250  1.1  oster   }
    251  1.1  oster   RF_ASSERT(nodeNum == nRodNodes);
    252  1.1  oster 
    253  1.1  oster   /* initialize the wnd nodes */
    254  1.1  oster   pda = asmap->physInfo;
    255  1.1  oster   for (i=0; i < nWndNodes; i++) {
    256  1.1  oster     rf_InitNode(&wndNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
    257  1.1  oster     RF_ASSERT(pda != NULL);
    258  1.1  oster     wndNodes[i].params[0].p = pda;
    259  1.1  oster     wndNodes[i].params[1].p = pda->bufPtr;
    260  1.1  oster     wndNodes[i].params[2].v = parityStripeID;
    261  1.1  oster     wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    262  1.1  oster     pda = pda->next;
    263  1.1  oster   }
    264  1.1  oster 
    265  1.1  oster   /* initialize the redundancy node */
    266  1.1  oster   rf_InitNode(xorNode, rf_wait, RF_TRUE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 1, 2*(nWndNodes+nRodNodes)+1, 1, dag_h, "Xr ", allocList);
    267  1.1  oster   xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
    268  1.1  oster   for (i=0; i < nWndNodes; i++) {
    269  1.1  oster     xorNode->params[2*i+0] = wndNodes[i].params[0];         /* pda */
    270  1.1  oster     xorNode->params[2*i+1] = wndNodes[i].params[1];         /* buf ptr */
    271  1.1  oster   }
    272  1.1  oster   for (i=0; i < nRodNodes; i++) {
    273  1.1  oster     xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];         /* pda */
    274  1.1  oster     xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];         /* buf ptr */
    275  1.1  oster   }
    276  1.1  oster   xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr;  /* xor node needs to get at RAID information */
    277  1.1  oster 
    278  1.1  oster   /* look for an Rod node that reads a complete SU.  If none, alloc a buffer to receive the parity info.
    279  1.1  oster    * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.
    280  1.1  oster    */
    281  1.1  oster   for (i = 0; i < nRodNodes; i++)
    282  1.1  oster     if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
    283  1.1  oster       break;
    284  1.1  oster   if (i == nRodNodes) {
    285  1.1  oster     RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
    286  1.1  oster   }
    287  1.1  oster   else {
    288  1.1  oster     xorNode->results[0] = rodNodes[i].params[1].p;
    289  1.1  oster   }
    290  1.1  oster 
    291  1.1  oster   /* initialize the Lpo node */
    292  1.1  oster   rf_InitNode(lpoNode, rf_wait, RF_FALSE, rf_ParityLogOverwriteFunc, rf_ParityLogOverwriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpo", allocList);
    293  1.1  oster 
    294  1.1  oster   lpoNode->params[0].p = asmap->parityInfo;
    295  1.1  oster   lpoNode->params[1].p = xorNode->results[0];
    296  1.1  oster   RF_ASSERT(asmap->parityInfo->next == NULL);        /* parityInfo must describe entire parity unit */
    297  1.1  oster 
    298  1.1  oster   /* connect nodes to form graph */
    299  1.1  oster 
    300  1.1  oster   /* connect dag header to block node */
    301  1.1  oster   RF_ASSERT(dag_h->numSuccedents == 1);
    302  1.1  oster   RF_ASSERT(blockNode->numAntecedents == 0);
    303  1.1  oster   dag_h->succedents[0] = blockNode;
    304  1.1  oster 
    305  1.1  oster   /* connect the block node to the Rod nodes */
    306  1.1  oster   RF_ASSERT(blockNode->numSuccedents == nRodNodes + 1);
    307  1.1  oster   for (i = 0; i < nRodNodes; i++) {
    308  1.1  oster     RF_ASSERT(rodNodes[i].numAntecedents == 1);
    309  1.1  oster     blockNode->succedents[i] = &rodNodes[i];
    310  1.1  oster     rodNodes[i].antecedents[0] = blockNode;
    311  1.1  oster     rodNodes[i].antType[0] = rf_control;
    312  1.1  oster   }
    313  1.1  oster 
    314  1.1  oster   /* connect the block node to the sync node */
    315  1.1  oster   /* necessary if nRodNodes == 0 */
    316  1.1  oster   RF_ASSERT(syncNode->numAntecedents == nRodNodes + 1);
    317  1.1  oster   blockNode->succedents[nRodNodes] = syncNode;
    318  1.1  oster   syncNode->antecedents[0] = blockNode;
    319  1.1  oster   syncNode->antType[0] = rf_control;
    320  1.1  oster 
    321  1.1  oster   /* connect the Rod nodes to the syncNode */
    322  1.1  oster   for (i = 0; i < nRodNodes; i++) {
    323  1.1  oster     rodNodes[i].succedents[0] = syncNode;
    324  1.1  oster     syncNode->antecedents[1 + i] = &rodNodes[i];
    325  1.1  oster     syncNode->antType[1 + i] = rf_control;
    326  1.1  oster   }
    327  1.1  oster 
    328  1.1  oster   /* connect the sync node to the xor node */
    329  1.1  oster   RF_ASSERT(syncNode->numSuccedents == nWndNodes + 1);
    330  1.1  oster   RF_ASSERT(xorNode->numAntecedents == 1);
    331  1.1  oster   syncNode->succedents[0] = xorNode;
    332  1.1  oster   xorNode->antecedents[0] = syncNode;
    333  1.1  oster   xorNode->antType[0] = rf_trueData; /* carry forward from sync */
    334  1.1  oster 
    335  1.1  oster   /* connect the sync node to the Wnd nodes */
    336  1.1  oster   for (i = 0; i < nWndNodes; i++) {
    337  1.1  oster     RF_ASSERT(wndNodes->numAntecedents == 1);
    338  1.1  oster     syncNode->succedents[1 + i] = &wndNodes[i];
    339  1.1  oster     wndNodes[i].antecedents[0] = syncNode;
    340  1.1  oster     wndNodes[i].antType[0] = rf_control;
    341  1.1  oster   }
    342  1.1  oster 
    343  1.1  oster   /* connect the xor node to the Lpo node */
    344  1.1  oster   RF_ASSERT(xorNode->numSuccedents == 1);
    345  1.1  oster   RF_ASSERT(lpoNode->numAntecedents == 1);
    346  1.1  oster   xorNode->succedents[0] = lpoNode;
    347  1.1  oster   lpoNode->antecedents[0]= xorNode;
    348  1.1  oster   lpoNode->antType[0] = rf_trueData;
    349  1.1  oster 
    350  1.1  oster   /* connect the Wnd nodes to the unblock node */
    351  1.1  oster   RF_ASSERT(unblockNode->numAntecedents == nWndNodes + 1);
    352  1.1  oster   for (i = 0; i < nWndNodes; i++) {
    353  1.1  oster     RF_ASSERT(wndNodes->numSuccedents == 1);
    354  1.1  oster     wndNodes[i].succedents[0] = unblockNode;
    355  1.1  oster     unblockNode->antecedents[i] = &wndNodes[i];
    356  1.1  oster     unblockNode->antType[i] = rf_control;
    357  1.1  oster   }
    358  1.1  oster 
    359  1.1  oster   /* connect the Lpo node to the unblock node */
    360  1.1  oster   RF_ASSERT(lpoNode->numSuccedents == 1);
    361  1.1  oster   lpoNode->succedents[0] = unblockNode;
    362  1.1  oster   unblockNode->antecedents[nWndNodes] = lpoNode;
    363  1.1  oster   unblockNode->antType[nWndNodes] = rf_control;
    364  1.1  oster 
    365  1.1  oster   /* connect unblock node to terminator */
    366  1.1  oster   RF_ASSERT(unblockNode->numSuccedents == 1);
    367  1.1  oster   RF_ASSERT(termNode->numAntecedents == 1);
    368  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
    369  1.1  oster   unblockNode->succedents[0] = termNode;
    370  1.1  oster   termNode->antecedents[0] = unblockNode;
    371  1.1  oster   termNode->antType[0] = rf_control;
    372  1.1  oster }
    373  1.1  oster 
    374  1.1  oster 
    375  1.1  oster 
    376  1.1  oster 
    377  1.1  oster /******************************************************************************
    378  1.1  oster  *
    379  1.1  oster  * creates a DAG to perform a small-write operation (either raid 5 or pq), which is as follows:
    380  1.1  oster  *
    381  1.1  oster  *                                     Header
    382  1.1  oster  *                                       |
    383  1.1  oster  *                                     Block
    384  1.1  oster  *                                 / |  ... \   \
    385  1.1  oster  *                                /  |       \   \
    386  1.1  oster  *                             Rod  Rod      Rod  Rop
    387  1.1  oster  *                             | \ /| \    / |  \/ |
    388  1.1  oster  *                             |    |        |  /\ |
    389  1.1  oster  *                             Wnd  Wnd      Wnd   X
    390  1.1  oster  *                              |    \       /     |
    391  1.1  oster  *                              |     \     /      |
    392  1.1  oster  *                               \     \   /      Lpo
    393  1.1  oster  *                                \     \ /       /
    394  1.1  oster  *                                 +-> Unblock <-+
    395  1.1  oster  *                                       |
    396  1.1  oster  *                                       T
    397  1.1  oster  *
    398  1.1  oster  *
    399  1.1  oster  * R = Read, W = Write, X = Xor, o = old, n = new, d = data, p = parity.
    400  1.1  oster  * When the access spans a stripe unit boundary and is less than one SU in size, there will
    401  1.1  oster  * be two Rop -- X -- Wnp branches.  I call this the "double-XOR" case.
    402  1.1  oster  * The second output from each Rod node goes to the X node.  In the double-XOR
    403  1.1  oster  * case, there are exactly 2 Rod nodes, and each sends one output to one X node.
    404  1.1  oster  * There is one Rod -- Wnd -- T branch for each stripe unit being updated.
    405  1.1  oster  *
    406  1.1  oster  * The block and unblock nodes are unused.  See comment above CreateFaultFreeReadDAG.
    407  1.1  oster  *
    408  1.1  oster  * Note:  this DAG ignores all the optimizations related to making the RMWs atomic.
    409  1.1  oster  *        it also has the nasty property that none of the buffers allocated for reading
    410  1.1  oster  *        old data & parity can be freed until the XOR node fires.  Need to fix this.
    411  1.1  oster  *
    412  1.1  oster  * A null qfuncs indicates single fault tolerant
    413  1.1  oster  *****************************************************************************/
    414  1.1  oster 
    415  1.1  oster void rf_CommonCreateParityLoggingSmallWriteDAG(
    416  1.1  oster   RF_Raid_t             *raidPtr,
    417  1.1  oster   RF_AccessStripeMap_t  *asmap,
    418  1.1  oster   RF_DagHeader_t        *dag_h,
    419  1.1  oster   void                  *bp,
    420  1.1  oster   RF_RaidAccessFlags_t   flags,
    421  1.1  oster   RF_AllocListElem_t    *allocList,
    422  1.1  oster   RF_RedFuncs_t         *pfuncs,
    423  1.1  oster   RF_RedFuncs_t         *qfuncs)
    424  1.1  oster {
    425  1.1  oster   RF_DagNode_t *xorNodes, *blockNode, *unblockNode, *nodes;
    426  1.1  oster   RF_DagNode_t *readDataNodes, *readParityNodes;
    427  1.1  oster   RF_DagNode_t *writeDataNodes, *lpuNodes;
    428  1.1  oster   RF_DagNode_t *unlockDataNodes=NULL, *termNode;
    429  1.1  oster   RF_PhysDiskAddr_t *pda = asmap->physInfo;
    430  1.1  oster   int numDataNodes = asmap->numStripeUnitsAccessed;
    431  1.1  oster   int numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
    432  1.1  oster   int i, j, nNodes, totalNumNodes;
    433  1.1  oster   RF_ReconUnitNum_t which_ru;
    434  1.1  oster   int (*func)(RF_DagNode_t *node), (*undoFunc)(RF_DagNode_t *node);
    435  1.1  oster   int (*qfunc)(RF_DagNode_t *node);
    436  1.1  oster   char *name, *qname;
    437  1.1  oster   RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
    438  1.1  oster   long nfaults = qfuncs ? 2 : 1;
    439  1.1  oster   int lu_flag = (rf_enableAtomicRMW) ? 1 : 0;          /* lock/unlock flag */
    440  1.1  oster 
    441  1.1  oster   if (rf_dagDebug) printf("[Creating parity-logging small-write DAG]\n");
    442  1.1  oster   RF_ASSERT(numDataNodes > 0);
    443  1.1  oster   RF_ASSERT(nfaults == 1);
    444  1.1  oster   dag_h->creator = "ParityLoggingSmallWriteDAG";
    445  1.1  oster 
    446  1.1  oster   /* DAG creation occurs in four steps:
    447  1.1  oster      1. count the number of nodes in the DAG
    448  1.1  oster      2. create the nodes
    449  1.1  oster      3. initialize the nodes
    450  1.1  oster      4. connect the nodes
    451  1.1  oster    */
    452  1.1  oster 
    453  1.1  oster   /* Step 1. compute number of nodes in the graph */
    454  1.1  oster 
    455  1.1  oster   /* number of nodes:
    456  1.1  oster       a read and write for each data unit
    457  1.1  oster       a redundancy computation node for each parity node
    458  1.1  oster       a read and Lpu for each parity unit
    459  1.1  oster       a block and unblock node (2)
    460  1.1  oster       a terminator node
    461  1.1  oster       if atomic RMW
    462  1.1  oster         an unlock node for each data unit, redundancy unit
    463  1.1  oster   */
    464  1.1  oster   totalNumNodes = (2 * numDataNodes) + numParityNodes + (2 * numParityNodes) + 3;
    465  1.1  oster   if (lu_flag)
    466  1.1  oster     totalNumNodes += numDataNodes;
    467  1.1  oster 
    468  1.1  oster   nNodes     = numDataNodes + numParityNodes;
    469  1.1  oster 
    470  1.1  oster   dag_h->numCommitNodes = numDataNodes + numParityNodes;
    471  1.1  oster   dag_h->numCommits = 0;
    472  1.1  oster   dag_h->numSuccedents = 1;
    473  1.1  oster 
    474  1.1  oster   /* Step 2. create the nodes */
    475  1.1  oster   RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
    476  1.1  oster   i = 0;
    477  1.1  oster   blockNode         = &nodes[i]; i += 1;
    478  1.1  oster   unblockNode       = &nodes[i]; i += 1;
    479  1.1  oster   readDataNodes     = &nodes[i]; i += numDataNodes;
    480  1.1  oster   readParityNodes   = &nodes[i]; i += numParityNodes;
    481  1.1  oster   writeDataNodes    = &nodes[i]; i += numDataNodes;
    482  1.1  oster   lpuNodes          = &nodes[i]; i += numParityNodes;
    483  1.1  oster   xorNodes          = &nodes[i]; i += numParityNodes;
    484  1.1  oster   termNode          = &nodes[i]; i += 1;
    485  1.1  oster   if (lu_flag) {
    486  1.1  oster     unlockDataNodes = &nodes[i]; i += numDataNodes;
    487  1.1  oster   }
    488  1.1  oster   RF_ASSERT(i == totalNumNodes);
    489  1.1  oster 
    490  1.1  oster   /* Step 3. initialize the nodes */
    491  1.1  oster   /* initialize block node (Nil) */
    492  1.1  oster   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
    493  1.1  oster 
    494  1.1  oster   /* initialize unblock node (Nil) */
    495  1.1  oster   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nNodes, 0, 0, dag_h, "Nil", allocList);
    496  1.1  oster 
    497  1.1  oster   /* initialize terminatory node (Trm) */
    498  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
    499  1.1  oster 
    500  1.1  oster   /* initialize nodes which read old data (Rod) */
    501  1.1  oster   for (i = 0; i < numDataNodes; i++) {
    502  1.1  oster     rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rod", allocList);
    503  1.1  oster     RF_ASSERT(pda != NULL);
    504  1.1  oster     readDataNodes[i].params[0].p = pda; /* physical disk addr desc */
    505  1.1  oster     readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);  /* buffer to hold old data */
    506  1.1  oster     readDataNodes[i].params[2].v = parityStripeID;
    507  1.1  oster     readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
    508  1.1  oster     pda=pda->next;
    509  1.1  oster     readDataNodes[i].propList[0] = NULL;
    510  1.1  oster     readDataNodes[i].propList[1] = NULL;
    511  1.1  oster   }
    512  1.1  oster 
    513  1.1  oster   /* initialize nodes which read old parity (Rop) */
    514  1.1  oster   pda = asmap->parityInfo; i = 0;
    515  1.1  oster   for (i = 0; i < numParityNodes; i++) {
    516  1.1  oster     RF_ASSERT(pda != NULL);
    517  1.1  oster     rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rop", allocList);
    518  1.1  oster     readParityNodes[i].params[0].p = pda;
    519  1.1  oster     readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);    /* buffer to hold old parity */
    520  1.1  oster     readParityNodes[i].params[2].v = parityStripeID;
    521  1.1  oster     readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    522  1.1  oster     readParityNodes[i].propList[0] = NULL;
    523  1.1  oster     pda=pda->next;
    524  1.1  oster   }
    525  1.1  oster 
    526  1.1  oster   /* initialize nodes which write new data (Wnd) */
    527  1.1  oster   pda = asmap->physInfo;
    528  1.1  oster   for (i=0; i < numDataNodes; i++) {
    529  1.1  oster     RF_ASSERT(pda != NULL);
    530  1.1  oster     rf_InitNode(&writeDataNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, nNodes, 4, 0, dag_h, "Wnd", allocList);
    531  1.1  oster     writeDataNodes[i].params[0].p = pda;                    /* physical disk addr desc */
    532  1.1  oster     writeDataNodes[i].params[1].p = pda->bufPtr;   /* buffer holding new data to be written */
    533  1.1  oster     writeDataNodes[i].params[2].v = parityStripeID;
    534  1.1  oster     writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    535  1.1  oster 
    536  1.1  oster     if (lu_flag) {
    537  1.1  oster       /* initialize node to unlock the disk queue */
    538  1.1  oster       rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
    539  1.1  oster       unlockDataNodes[i].params[0].p = pda; /* physical disk addr desc */
    540  1.1  oster       unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
    541  1.1  oster     }
    542  1.1  oster     pda = pda->next;
    543  1.1  oster   }
    544  1.1  oster 
    545  1.1  oster 
    546  1.1  oster   /* initialize nodes which compute new parity */
    547  1.1  oster   /* we use the simple XOR func in the double-XOR case, and when we're accessing only a portion of one stripe unit.
    548  1.1  oster    * the distinction between the two is that the regular XOR func assumes that the targbuf is a full SU in size,
    549  1.1  oster    * and examines the pda associated with the buffer to decide where within the buffer to XOR the data, whereas
    550  1.1  oster    * the simple XOR func just XORs the data into the start of the buffer.
    551  1.1  oster    */
    552  1.1  oster   if ((numParityNodes==2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
    553  1.1  oster     func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
    554  1.1  oster     if (qfuncs)
    555  1.1  oster       { qfunc = qfuncs->simple; qname = qfuncs->SimpleName;}
    556  1.1  oster   } else {
    557  1.1  oster     func = pfuncs->regular; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->RegularName;
    558  1.1  oster     if (qfuncs) { qfunc = qfuncs->regular; qname = qfuncs->RegularName;}
    559  1.1  oster   }
    560  1.1  oster   /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr  */
    561  1.1  oster   if (numParityNodes==2) {        /* double-xor case */
    562  1.1  oster     for (i=0; i < numParityNodes; i++) {
    563  1.1  oster       rf_InitNode(&xorNodes[i], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, 7, 1, dag_h, name, allocList);  /* no wakeup func for xor */
    564  1.1  oster       xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
    565  1.1  oster       xorNodes[i].params[0] = readDataNodes[i].params[0];
    566  1.1  oster       xorNodes[i].params[1] = readDataNodes[i].params[1];
    567  1.1  oster       xorNodes[i].params[2] = readParityNodes[i].params[0];
    568  1.1  oster       xorNodes[i].params[3] = readParityNodes[i].params[1];
    569  1.1  oster       xorNodes[i].params[4] = writeDataNodes[i].params[0];
    570  1.1  oster       xorNodes[i].params[5] = writeDataNodes[i].params[1];
    571  1.1  oster       xorNodes[i].params[6].p = raidPtr;
    572  1.1  oster       xorNodes[i].results[0] = readParityNodes[i].params[1].p;   /* use old parity buf as target buf */
    573  1.1  oster     }
    574  1.1  oster   }
    575  1.1  oster   else {
    576  1.1  oster     /* there is only one xor node in this case */
    577  1.1  oster     rf_InitNode(&xorNodes[0], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
    578  1.1  oster     xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
    579  1.1  oster     for (i=0; i < numDataNodes + 1; i++) {
    580  1.1  oster       /* set up params related to Rod and Rop nodes */
    581  1.1  oster       xorNodes[0].params[2*i+0] = readDataNodes[i].params[0];    /* pda */
    582  1.1  oster       xorNodes[0].params[2*i+1] = readDataNodes[i].params[1];    /* buffer pointer */
    583  1.1  oster     }
    584  1.1  oster     for (i=0; i < numDataNodes; i++) {
    585  1.1  oster       /* set up params related to Wnd and Wnp nodes */
    586  1.1  oster       xorNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
    587  1.1  oster       xorNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
    588  1.1  oster     }
    589  1.1  oster     xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;  /* xor node needs to get at RAID information */
    590  1.1  oster     xorNodes[0].results[0] = readParityNodes[0].params[1].p;
    591  1.1  oster   }
    592  1.1  oster 
    593  1.1  oster   /* initialize the log node(s) */
    594  1.1  oster   pda = asmap->parityInfo;
    595  1.1  oster   for (i = 0;  i < numParityNodes; i++) {
    596  1.1  oster     RF_ASSERT(pda);
    597  1.1  oster     rf_InitNode(&lpuNodes[i], rf_wait, RF_FALSE, rf_ParityLogUpdateFunc, rf_ParityLogUpdateUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpu", allocList);
    598  1.1  oster     lpuNodes[i].params[0].p = pda;                    /* PhysDiskAddr of parity */
    599  1.1  oster     lpuNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer to parity */
    600  1.1  oster     pda = pda->next;
    601  1.1  oster   }
    602  1.1  oster 
    603  1.1  oster 
    604  1.1  oster   /* Step 4. connect the nodes */
    605  1.1  oster 
    606  1.1  oster   /* connect header to block node */
    607  1.1  oster   RF_ASSERT(dag_h->numSuccedents == 1);
    608  1.1  oster   RF_ASSERT(blockNode->numAntecedents == 0);
    609  1.1  oster   dag_h->succedents[0] = blockNode;
    610  1.1  oster 
    611  1.1  oster   /* connect block node to read old data nodes */
    612  1.1  oster   RF_ASSERT(blockNode->numSuccedents == (numDataNodes + numParityNodes));
    613  1.1  oster   for (i = 0; i < numDataNodes; i++) {
    614  1.1  oster     blockNode->succedents[i] = &readDataNodes[i];
    615  1.1  oster     RF_ASSERT(readDataNodes[i].numAntecedents == 1);
    616  1.1  oster     readDataNodes[i].antecedents[0]= blockNode;
    617  1.1  oster     readDataNodes[i].antType[0] = rf_control;
    618  1.1  oster   }
    619  1.1  oster 
    620  1.1  oster   /* connect block node to read old parity nodes */
    621  1.1  oster   for (i = 0; i < numParityNodes; i++) {
    622  1.1  oster     blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
    623  1.1  oster     RF_ASSERT(readParityNodes[i].numAntecedents == 1);
    624  1.1  oster     readParityNodes[i].antecedents[0] = blockNode;
    625  1.1  oster     readParityNodes[i].antType[0] = rf_control;
    626  1.1  oster   }
    627  1.1  oster 
    628  1.1  oster   /* connect read old data nodes to write new data nodes */
    629  1.1  oster   for (i = 0; i < numDataNodes; i++) {
    630  1.1  oster     RF_ASSERT(readDataNodes[i].numSuccedents == numDataNodes + numParityNodes);
    631  1.1  oster     for (j = 0; j < numDataNodes; j++) {
    632  1.1  oster       RF_ASSERT(writeDataNodes[j].numAntecedents == numDataNodes + numParityNodes);
    633  1.1  oster       readDataNodes[i].succedents[j] = &writeDataNodes[j];
    634  1.1  oster       writeDataNodes[j].antecedents[i] = &readDataNodes[i];
    635  1.1  oster       if (i == j)
    636  1.1  oster 	writeDataNodes[j].antType[i] = rf_antiData;
    637  1.1  oster       else
    638  1.1  oster 	writeDataNodes[j].antType[i] = rf_control;
    639  1.1  oster     }
    640  1.1  oster   }
    641  1.1  oster 
    642  1.1  oster   /* connect read old data nodes to xor nodes */
    643  1.1  oster   for (i = 0; i < numDataNodes; i++)
    644  1.1  oster     for (j = 0; j < numParityNodes; j++){
    645  1.1  oster       RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
    646  1.1  oster       readDataNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
    647  1.1  oster       xorNodes[j].antecedents[i] = &readDataNodes[i];
    648  1.1  oster       xorNodes[j].antType[i] = rf_trueData;
    649  1.1  oster     }
    650  1.1  oster 
    651  1.1  oster   /* connect read old parity nodes to write new data nodes */
    652  1.1  oster   for (i = 0; i < numParityNodes; i++) {
    653  1.1  oster     RF_ASSERT(readParityNodes[i].numSuccedents == numDataNodes + numParityNodes);
    654  1.1  oster     for (j = 0; j < numDataNodes; j++) {
    655  1.1  oster       readParityNodes[i].succedents[j] = &writeDataNodes[j];
    656  1.1  oster       writeDataNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
    657  1.1  oster       writeDataNodes[j].antType[numDataNodes + i] = rf_control;
    658  1.1  oster     }
    659  1.1  oster   }
    660  1.1  oster 
    661  1.1  oster   /* connect read old parity nodes to xor nodes */
    662  1.1  oster   for (i = 0; i < numParityNodes; i++)
    663  1.1  oster     for (j = 0; j < numParityNodes; j++) {
    664  1.1  oster       readParityNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
    665  1.1  oster       xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
    666  1.1  oster       xorNodes[j].antType[numDataNodes + i] = rf_trueData;
    667  1.1  oster     }
    668  1.1  oster 
    669  1.1  oster   /* connect xor nodes to write new parity nodes */
    670  1.1  oster   for (i = 0; i < numParityNodes; i++) {
    671  1.1  oster     RF_ASSERT(xorNodes[i].numSuccedents == 1);
    672  1.1  oster     RF_ASSERT(lpuNodes[i].numAntecedents == 1);
    673  1.1  oster     xorNodes[i].succedents[0] = &lpuNodes[i];
    674  1.1  oster     lpuNodes[i].antecedents[0] = &xorNodes[i];
    675  1.1  oster     lpuNodes[i].antType[0] = rf_trueData;
    676  1.1  oster   }
    677  1.1  oster 
    678  1.1  oster   for (i = 0; i < numDataNodes; i++) {
    679  1.1  oster     if (lu_flag) {
    680  1.1  oster       /* connect write new data nodes to unlock nodes */
    681  1.1  oster       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
    682  1.1  oster       RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
    683  1.1  oster       writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
    684  1.1  oster       unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
    685  1.1  oster       unlockDataNodes[i].antType[0] = rf_control;
    686  1.1  oster 
    687  1.1  oster       /* connect unlock nodes to unblock node */
    688  1.1  oster       RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
    689  1.1  oster       RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
    690  1.1  oster       unlockDataNodes[i].succedents[0] = unblockNode;
    691  1.1  oster       unblockNode->antecedents[i] = &unlockDataNodes[i];
    692  1.1  oster       unblockNode->antType[i] = rf_control;
    693  1.1  oster     }
    694  1.1  oster     else {
    695  1.1  oster       /* connect write new data nodes to unblock node */
    696  1.1  oster       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
    697  1.1  oster       RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
    698  1.1  oster       writeDataNodes[i].succedents[0] = unblockNode;
    699  1.1  oster       unblockNode->antecedents[i] = &writeDataNodes[i];
    700  1.1  oster       unblockNode->antType[i] = rf_control;
    701  1.1  oster     }
    702  1.1  oster   }
    703  1.1  oster 
    704  1.1  oster   /* connect write new parity nodes to unblock node */
    705  1.1  oster   for (i = 0; i < numParityNodes; i++) {
    706  1.1  oster     RF_ASSERT(lpuNodes[i].numSuccedents == 1);
    707  1.1  oster     lpuNodes[i].succedents[0] = unblockNode;
    708  1.1  oster     unblockNode->antecedents[numDataNodes + i] = &lpuNodes[i];
    709  1.1  oster     unblockNode->antType[numDataNodes + i] = rf_control;
    710  1.1  oster   }
    711  1.1  oster 
    712  1.1  oster   /* connect unblock node to terminator */
    713  1.1  oster   RF_ASSERT(unblockNode->numSuccedents == 1);
    714  1.1  oster   RF_ASSERT(termNode->numAntecedents == 1);
    715  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
    716  1.1  oster   unblockNode->succedents[0] = termNode;
    717  1.1  oster   termNode->antecedents[0] = unblockNode;
    718  1.1  oster   termNode->antType[0] = rf_control;
    719  1.1  oster }
    720  1.1  oster 
    721  1.1  oster 
    722  1.1  oster void rf_CreateParityLoggingSmallWriteDAG(
    723  1.1  oster   RF_Raid_t             *raidPtr,
    724  1.1  oster   RF_AccessStripeMap_t  *asmap,
    725  1.1  oster   RF_DagHeader_t        *dag_h,
    726  1.1  oster   void                  *bp,
    727  1.1  oster   RF_RaidAccessFlags_t   flags,
    728  1.1  oster   RF_AllocListElem_t    *allocList,
    729  1.1  oster   RF_RedFuncs_t         *pfuncs,
    730  1.1  oster   RF_RedFuncs_t         *qfuncs)
    731  1.1  oster {
    732  1.1  oster   dag_h->creator = "ParityLoggingSmallWriteDAG";
    733  1.1  oster   rf_CommonCreateParityLoggingSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, &rf_xorFuncs, NULL);
    734  1.1  oster }
    735  1.1  oster 
    736  1.1  oster 
    737  1.1  oster void rf_CreateParityLoggingLargeWriteDAG(
    738  1.1  oster   RF_Raid_t              *raidPtr,
    739  1.1  oster   RF_AccessStripeMap_t   *asmap,
    740  1.1  oster   RF_DagHeader_t         *dag_h,
    741  1.1  oster   void                   *bp,
    742  1.1  oster   RF_RaidAccessFlags_t    flags,
    743  1.1  oster   RF_AllocListElem_t     *allocList,
    744  1.1  oster   int                     nfaults,
    745  1.1  oster   int                   (*redFunc)(RF_DagNode_t *))
    746  1.1  oster {
    747  1.1  oster   dag_h->creator = "ParityLoggingSmallWriteDAG";
    748  1.1  oster   rf_CommonCreateParityLoggingLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 1, rf_RegularXorFunc);
    749  1.1  oster }
    750  1.1  oster 
    751  1.1  oster #endif /* RF_INCLUDE_PARITYLOGGING > 0 */
    752