Home | History | Annotate | Line # | Download | only in raidframe
rf_dagffwr.c revision 1.1
      1  1.1  oster /*	$NetBSD: rf_dagffwr.c,v 1.1 1998/11/13 04:20:27 oster Exp $	*/
      2  1.1  oster /*
      3  1.1  oster  * Copyright (c) 1995 Carnegie-Mellon University.
      4  1.1  oster  * All rights reserved.
      5  1.1  oster  *
      6  1.1  oster  * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
      7  1.1  oster  *
      8  1.1  oster  * Permission to use, copy, modify and distribute this software and
      9  1.1  oster  * its documentation is hereby granted, provided that both the copyright
     10  1.1  oster  * notice and this permission notice appear in all copies of the
     11  1.1  oster  * software, derivative works or modified versions, and any portions
     12  1.1  oster  * thereof, and that both notices appear in supporting documentation.
     13  1.1  oster  *
     14  1.1  oster  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
     15  1.1  oster  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
     16  1.1  oster  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
     17  1.1  oster  *
     18  1.1  oster  * Carnegie Mellon requests users of this software to return to
     19  1.1  oster  *
     20  1.1  oster  *  Software Distribution Coordinator  or  Software.Distribution (at) CS.CMU.EDU
     21  1.1  oster  *  School of Computer Science
     22  1.1  oster  *  Carnegie Mellon University
     23  1.1  oster  *  Pittsburgh PA 15213-3890
     24  1.1  oster  *
     25  1.1  oster  * any improvements or extensions that they make and grant Carnegie the
     26  1.1  oster  * rights to redistribute these changes.
     27  1.1  oster  */
     28  1.1  oster 
     29  1.1  oster /*
     30  1.1  oster  * rf_dagffwr.c
     31  1.1  oster  *
     32  1.1  oster  * code for creating fault-free DAGs
     33  1.1  oster  *
     34  1.1  oster  * :
     35  1.1  oster  * Log: rf_dagffwr.c,v
     36  1.1  oster  * Revision 1.19  1996/07/31 15:35:24  jimz
     37  1.1  oster  * evenodd changes; bugfixes for double-degraded archs, generalize
     38  1.1  oster  * some formerly PQ-only functions
     39  1.1  oster  *
     40  1.1  oster  * Revision 1.18  1996/07/28  20:31:39  jimz
     41  1.1  oster  * i386netbsd port
     42  1.1  oster  * true/false fixup
     43  1.1  oster  *
     44  1.1  oster  * Revision 1.17  1996/07/27  18:40:24  jimz
     45  1.1  oster  * cleanup sweep
     46  1.1  oster  *
     47  1.1  oster  * Revision 1.16  1996/07/22  19:52:16  jimz
     48  1.1  oster  * switched node params to RF_DagParam_t, a union of
     49  1.1  oster  * a 64-bit int and a void *, for better portability
     50  1.1  oster  * attempted hpux port, but failed partway through for
     51  1.1  oster  * lack of a single C compiler capable of compiling all
     52  1.1  oster  * source files
     53  1.1  oster  *
     54  1.1  oster  * Revision 1.15  1996/06/11  01:27:50  jimz
     55  1.1  oster  * Fixed bug where diskthread shutdown would crash or hang. This
     56  1.1  oster  * turned out to be two distinct bugs:
     57  1.1  oster  * (1) [crash] The thread shutdown code wasn't properly waiting for
     58  1.1  oster  * all the diskthreads to complete. This caused diskthreads that were
     59  1.1  oster  * exiting+cleaning up to unlock a destroyed mutex.
     60  1.1  oster  * (2) [hang] TerminateDiskQueues wasn't locking, and DiskIODequeue
     61  1.1  oster  * only checked for termination _after_ a wakeup if the queues were
     62  1.1  oster  * empty. This was a race where the termination wakeup could be lost
     63  1.1  oster  * by the dequeueing thread, and the system would hang waiting for the
     64  1.1  oster  * thread to exit, while the thread waited for an I/O or a signal to
     65  1.1  oster  * check the termination flag.
     66  1.1  oster  *
     67  1.1  oster  * Revision 1.14  1996/06/10  22:24:01  wvcii
     68  1.1  oster  * added write dags which do not have a commit node and are
     69  1.1  oster  * used in forward and backward error recovery experiments.
     70  1.1  oster  *
     71  1.1  oster  * Revision 1.13  1996/06/07  22:26:27  jimz
     72  1.1  oster  * type-ify which_ru (RF_ReconUnitNum_t)
     73  1.1  oster  *
     74  1.1  oster  * Revision 1.12  1996/06/07  21:33:04  jimz
     75  1.1  oster  * begin using consistent types for sector numbers,
     76  1.1  oster  * stripe numbers, row+col numbers, recon unit numbers
     77  1.1  oster  *
     78  1.1  oster  * Revision 1.11  1996/05/31  22:26:54  jimz
     79  1.1  oster  * fix a lot of mapping problems, memory allocation problems
     80  1.1  oster  * found some weird lock issues, fixed 'em
     81  1.1  oster  * more code cleanup
     82  1.1  oster  *
     83  1.1  oster  * Revision 1.10  1996/05/30  11:29:41  jimz
     84  1.1  oster  * Numerous bug fixes. Stripe lock release code disagreed with the taking code
     85  1.1  oster  * about when stripes should be locked (I made it consistent: no parity, no lock)
     86  1.1  oster  * There was a lot of extra serialization of I/Os which I've removed- a lot of
     87  1.1  oster  * it was to calculate values for the cache code, which is no longer with us.
     88  1.1  oster  * More types, function, macro cleanup. Added code to properly quiesce the array
     89  1.1  oster  * on shutdown. Made a lot of stuff array-specific which was (bogusly) general
     90  1.1  oster  * before. Fixed memory allocation, freeing bugs.
     91  1.1  oster  *
     92  1.1  oster  * Revision 1.9  1996/05/27  18:56:37  jimz
     93  1.1  oster  * more code cleanup
     94  1.1  oster  * better typing
     95  1.1  oster  * compiles in all 3 environments
     96  1.1  oster  *
     97  1.1  oster  * Revision 1.8  1996/05/24  22:17:04  jimz
     98  1.1  oster  * continue code + namespace cleanup
     99  1.1  oster  * typed a bunch of flags
    100  1.1  oster  *
    101  1.1  oster  * Revision 1.7  1996/05/24  04:28:55  jimz
    102  1.1  oster  * release cleanup ckpt
    103  1.1  oster  *
    104  1.1  oster  * Revision 1.6  1996/05/23  21:46:35  jimz
    105  1.1  oster  * checkpoint in code cleanup (release prep)
    106  1.1  oster  * lots of types, function names have been fixed
    107  1.1  oster  *
    108  1.1  oster  * Revision 1.5  1996/05/23  00:33:23  jimz
    109  1.1  oster  * code cleanup: move all debug decls to rf_options.c, all extern
    110  1.1  oster  * debug decls to rf_options.h, all debug vars preceded by rf_
    111  1.1  oster  *
    112  1.1  oster  * Revision 1.4  1996/05/18  19:51:34  jimz
    113  1.1  oster  * major code cleanup- fix syntax, make some types consistent,
    114  1.1  oster  * add prototypes, clean out dead code, et cetera
    115  1.1  oster  *
    116  1.1  oster  * Revision 1.3  1996/05/15  23:23:12  wvcii
    117  1.1  oster  * fixed bug in small write read old q node succedent initialization
    118  1.1  oster  *
    119  1.1  oster  * Revision 1.2  1996/05/08  21:01:24  jimz
    120  1.1  oster  * fixed up enum type names that were conflicting with other
    121  1.1  oster  * enums and function names (ie, "panic")
    122  1.1  oster  * future naming trends will be towards RF_ and rf_ for
    123  1.1  oster  * everything raidframe-related
    124  1.1  oster  *
    125  1.1  oster  * Revision 1.1  1996/05/03  19:20:45  wvcii
    126  1.1  oster  * Initial revision
    127  1.1  oster  *
    128  1.1  oster  */
    129  1.1  oster 
    130  1.1  oster #include "rf_types.h"
    131  1.1  oster #include "rf_raid.h"
    132  1.1  oster #include "rf_dag.h"
    133  1.1  oster #include "rf_dagutils.h"
    134  1.1  oster #include "rf_dagfuncs.h"
    135  1.1  oster #include "rf_threadid.h"
    136  1.1  oster #include "rf_debugMem.h"
    137  1.1  oster #include "rf_dagffrd.h"
    138  1.1  oster #include "rf_memchunk.h"
    139  1.1  oster #include "rf_general.h"
    140  1.1  oster #include "rf_dagffwr.h"
    141  1.1  oster 
    142  1.1  oster /******************************************************************************
    143  1.1  oster  *
    144  1.1  oster  * General comments on DAG creation:
    145  1.1  oster  *
    146  1.1  oster  * All DAGs in this file use roll-away error recovery.  Each DAG has a single
    147  1.1  oster  * commit node, usually called "Cmt."  If an error occurs before the Cmt node
    148  1.1  oster  * is reached, the execution engine will halt forward execution and work
    149  1.1  oster  * backward through the graph, executing the undo functions.  Assuming that
    150  1.1  oster  * each node in the graph prior to the Cmt node is undoable and atomic - or -
    151  1.1  oster  * does not make changes to permanent state, the graph will fail atomically.
    152  1.1  oster  * If an error occurs after the Cmt node executes, the engine will roll-forward
    153  1.1  oster  * through the graph, blindly executing nodes until it reaches the end.
    154  1.1  oster  * If a graph reaches the end, it is assumed to have completed successfully.
    155  1.1  oster  *
    156  1.1  oster  * A graph has only 1 Cmt node.
    157  1.1  oster  *
    158  1.1  oster  */
    159  1.1  oster 
    160  1.1  oster 
    161  1.1  oster /******************************************************************************
    162  1.1  oster  *
    163  1.1  oster  * The following wrappers map the standard DAG creation interface to the
    164  1.1  oster  * DAG creation routines.  Additionally, these wrappers enable experimentation
    165  1.1  oster  * with new DAG structures by providing an extra level of indirection, allowing
    166  1.1  oster  * the DAG creation routines to be replaced at this single point.
    167  1.1  oster  */
    168  1.1  oster /* Write-DAG entry point for non-redundant arrays: forwards its arguments to
    169  1.1  oster  * rf_CreateNonredundantDAG() with the I/O type fixed to RF_IO_TYPE_WRITE. */
    170  1.1  oster void rf_CreateNonRedundantWriteDAG(
    171  1.1  oster   RF_Raid_t             *raidPtr,
    172  1.1  oster   RF_AccessStripeMap_t  *asmap,
    173  1.1  oster   RF_DagHeader_t        *dag_h,
    174  1.1  oster   void                  *bp,
    175  1.1  oster   RF_RaidAccessFlags_t   flags,
    176  1.1  oster   RF_AllocListElem_t    *allocList,
    177  1.1  oster   RF_IoType_t            type)      /* ignored: the write type is hard-wired below */
    178  1.1  oster {
    179  1.1  oster   rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    180  1.1  oster     RF_IO_TYPE_WRITE);
    181  1.1  oster }
    182  1.1  oster /* RAID 0 write DAG: forwards to rf_CreateNonredundantDAG() as RF_IO_TYPE_WRITE. */
    183  1.1  oster void rf_CreateRAID0WriteDAG(
    184  1.1  oster   RF_Raid_t             *raidPtr,
    185  1.1  oster   RF_AccessStripeMap_t  *asmap,
    186  1.1  oster   RF_DagHeader_t        *dag_h,
    187  1.1  oster   void                  *bp,
    188  1.1  oster   RF_RaidAccessFlags_t   flags,
    189  1.1  oster   RF_AllocListElem_t    *allocList,
    190  1.1  oster   RF_IoType_t            type)      /* ignored: the write type is hard-wired below */
    191  1.1  oster {
    192  1.1  oster   rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    193  1.1  oster     RF_IO_TYPE_WRITE);
    194  1.1  oster }
    195  1.1  oster /* Small-write DAG wrapper: parity via rf_xorFuncs, no Q functions (NULL qfuncs). */
    196  1.1  oster void rf_CreateSmallWriteDAG(
    197  1.1  oster   RF_Raid_t             *raidPtr,
    198  1.1  oster   RF_AccessStripeMap_t  *asmap,
    199  1.1  oster   RF_DagHeader_t        *dag_h,
    200  1.1  oster   void                  *bp,
    201  1.1  oster   RF_RaidAccessFlags_t   flags,
    202  1.1  oster   RF_AllocListElem_t    *allocList)
    203  1.1  oster {
    204  1.1  oster #if RF_FORWARD > 0
    205  1.1  oster   rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    206  1.1  oster     &rf_xorFuncs, NULL);
    207  1.1  oster #else /* RF_FORWARD > 0 */
    208  1.1  oster #if RF_BACKWARD > 0
    209  1.1  oster   rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    210  1.1  oster     &rf_xorFuncs, NULL);   /* same Fwd builder as the RF_FORWARD branch */
    211  1.1  oster #else /* RF_BACKWARD > 0 */
    212  1.1  oster   /* "normal" rollaway */
    213  1.1  oster   rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    214  1.1  oster     &rf_xorFuncs, NULL);
    215  1.1  oster #endif /* RF_BACKWARD > 0 */
    216  1.1  oster #endif /* RF_FORWARD > 0 */
    217  1.1  oster }
    218  1.1  oster /* Large-write DAG wrapper: always passes 1, rf_RegularXorFunc and RF_TRUE. */
    219  1.1  oster void rf_CreateLargeWriteDAG(
    220  1.1  oster   RF_Raid_t             *raidPtr,
    221  1.1  oster   RF_AccessStripeMap_t  *asmap,
    222  1.1  oster   RF_DagHeader_t        *dag_h,
    223  1.1  oster   void                  *bp,
    224  1.1  oster   RF_RaidAccessFlags_t   flags,
    225  1.1  oster   RF_AllocListElem_t    *allocList)
    226  1.1  oster {
    227  1.1  oster #if RF_FORWARD > 0
    228  1.1  oster   rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    229  1.1  oster     1, rf_RegularXorFunc, RF_TRUE);
    230  1.1  oster #else /* RF_FORWARD > 0 */
    231  1.1  oster #if RF_BACKWARD > 0
    232  1.1  oster   rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    233  1.1  oster     1, rf_RegularXorFunc, RF_TRUE);   /* same Fwd builder as the RF_FORWARD branch */
    234  1.1  oster #else /* RF_BACKWARD > 0 */
    235  1.1  oster   /* "normal" rollaway: nfaults = 1, redFunc = XOR, allowBufferRecycle = RF_TRUE */
    236  1.1  oster   rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    237  1.1  oster     1, rf_RegularXorFunc, RF_TRUE);
    238  1.1  oster #endif /* RF_BACKWARD > 0 */
    239  1.1  oster #endif /* RF_FORWARD > 0 */
    240  1.1  oster }
    241  1.1  oster 
    242  1.1  oster 
    243  1.1  oster /******************************************************************************
    244  1.1  oster  *
    245  1.1  oster  * DAG creation code begins here
    246  1.1  oster  */
    247  1.1  oster 
    248  1.1  oster 
    249  1.1  oster /******************************************************************************
    250  1.1  oster  *
    251  1.1  oster  * creates a DAG to perform a large-write operation:
    252  1.1  oster  *
    253  1.1  oster  *           / Rod \           / Wnd \
    254  1.1  oster  * H -- block- Rod - Xor - Cmt - Wnd --- T
    255  1.1  oster  *           \ Rod /          \  Wnp /
    256  1.1  oster  *                             \[Wnq]/
    257  1.1  oster  *
    258  1.1  oster  * The XOR node also does the Q calculation in the P+Q architecture.
    259  1.1  oster  * All nodes are before the commit node (Cmt) are assumed to be atomic and
    260  1.1  oster  * undoable - or - they make no changes to permanent state.
    261  1.1  oster  *
    262  1.1  oster  * Rod = read old data
    263  1.1  oster  * Cmt = commit node
    264  1.1  oster  * Wnp = write new parity
    265  1.1  oster  * Wnd = write new data
    266  1.1  oster  * Wnq = write new "q"
    267  1.1  oster  * [] denotes optional segments in the graph
    268  1.1  oster  *
    269  1.1  oster  * Parameters:  raidPtr   - description of the physical array
    270  1.1  oster  *              asmap     - logical & physical addresses for this access
    271  1.1  oster  *              bp        - buffer ptr (holds write data)
    272  1.1  oster  *              flags     - general flags (e.g. disk locking)
    273  1.1  oster  *              allocList - list of memory allocated in DAG creation
    274  1.1  oster  *              nfaults   - number of faults array can tolerate
    275  1.1  oster  *                          (equal to # redundancy units in stripe)
    276  1.1  oster  *              redfuncs  - list of redundancy generating functions
    277  1.1  oster  *
    278  1.1  oster  *****************************************************************************/
    279  1.1  oster 
    280  1.1  oster void rf_CommonCreateLargeWriteDAG(
    281  1.1  oster   RF_Raid_t             *raidPtr,
    282  1.1  oster   RF_AccessStripeMap_t  *asmap,
    283  1.1  oster   RF_DagHeader_t        *dag_h,
    284  1.1  oster   void                  *bp,
    285  1.1  oster   RF_RaidAccessFlags_t   flags,
    286  1.1  oster   RF_AllocListElem_t    *allocList,
    287  1.1  oster   int                    nfaults,
    288  1.1  oster   int                  (*redFunc)(RF_DagNode_t *),
    289  1.1  oster   int                    allowBufferRecycle)
    290  1.1  oster {
    291  1.1  oster   RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
    292  1.1  oster   RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
    293  1.1  oster   int nWndNodes, nRodNodes, i, nodeNum, asmNum;
    294  1.1  oster   RF_AccessStripeMapHeader_t *new_asm_h[2];
    295  1.1  oster   RF_StripeNum_t parityStripeID;
    296  1.1  oster   char *sosBuffer, *eosBuffer;
    297  1.1  oster   RF_ReconUnitNum_t which_ru;
    298  1.1  oster   RF_RaidLayout_t *layoutPtr;
    299  1.1  oster   RF_PhysDiskAddr_t *pda;
    300  1.1  oster 
    301  1.1  oster   layoutPtr = &(raidPtr->Layout);
    302  1.1  oster   parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
    303  1.1  oster     &which_ru);
    304  1.1  oster 
    305  1.1  oster   if (rf_dagDebug) {
    306  1.1  oster     printf("[Creating large-write DAG]\n");
    307  1.1  oster   }
    308  1.1  oster   dag_h->creator = "LargeWriteDAG";
    309  1.1  oster 
    310  1.1  oster   dag_h->numCommitNodes = 1;
    311  1.1  oster   dag_h->numCommits = 0;
    312  1.1  oster   dag_h->numSuccedents = 1;
    313  1.1  oster 
    314  1.1  oster   /* alloc the nodes: Wnd, xor, commit, block, term, and  Wnp */
    315  1.1  oster   nWndNodes = asmap->numStripeUnitsAccessed;
    316  1.1  oster   RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
    317  1.1  oster     (RF_DagNode_t *), allocList);
    318  1.1  oster   i = 0;
    319  1.1  oster   wndNodes    = &nodes[i]; i += nWndNodes;
    320  1.1  oster   xorNode     = &nodes[i]; i += 1;
    321  1.1  oster   wnpNode     = &nodes[i]; i += 1;
    322  1.1  oster   blockNode   = &nodes[i]; i += 1;
    323  1.1  oster   commitNode  = &nodes[i]; i += 1;
    324  1.1  oster   termNode    = &nodes[i]; i += 1;
    325  1.1  oster   if (nfaults == 2) {
    326  1.1  oster     wnqNode   = &nodes[i]; i += 1;
    327  1.1  oster   }
    328  1.1  oster   else {
    329  1.1  oster     wnqNode = NULL;
    330  1.1  oster   }
    331  1.1  oster   rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
    332  1.1  oster     &nRodNodes, &sosBuffer, &eosBuffer, allocList);
    333  1.1  oster   if (nRodNodes > 0) {
    334  1.1  oster     RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
    335  1.1  oster       (RF_DagNode_t *), allocList);
    336  1.1  oster   }
    337  1.1  oster   else {
    338  1.1  oster     rodNodes = NULL;
    339  1.1  oster   }
    340  1.1  oster 
    341  1.1  oster   /* begin node initialization */
    342  1.1  oster   if (nRodNodes > 0) {
    343  1.1  oster     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    344  1.1  oster       NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
    345  1.1  oster   }
    346  1.1  oster   else {
    347  1.1  oster     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    348  1.1  oster       NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
    349  1.1  oster   }
    350  1.1  oster 
    351  1.1  oster   rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
    352  1.1  oster     nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
    353  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
    354  1.1  oster     0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
    355  1.1  oster 
    356  1.1  oster   /* initialize the Rod nodes */
    357  1.1  oster   for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
    358  1.1  oster     if (new_asm_h[asmNum]) {
    359  1.1  oster       pda = new_asm_h[asmNum]->stripeMap->physInfo;
    360  1.1  oster       while (pda) {
    361  1.1  oster         rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
    362  1.1  oster           rf_DiskReadUndoFunc,rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    363  1.1  oster           "Rod", allocList);
    364  1.1  oster         rodNodes[nodeNum].params[0].p = pda;
    365  1.1  oster         rodNodes[nodeNum].params[1].p = pda->bufPtr;
    366  1.1  oster         rodNodes[nodeNum].params[2].v = parityStripeID;
    367  1.1  oster         rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    368  1.1  oster           0, 0, which_ru);
    369  1.1  oster         nodeNum++;
    370  1.1  oster         pda = pda->next;
    371  1.1  oster       }
    372  1.1  oster     }
    373  1.1  oster   }
    374  1.1  oster   RF_ASSERT(nodeNum == nRodNodes);
    375  1.1  oster 
    376  1.1  oster   /* initialize the wnd nodes */
    377  1.1  oster   pda = asmap->physInfo;
    378  1.1  oster   for (i=0; i < nWndNodes; i++) {
    379  1.1  oster     rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
    380  1.1  oster       rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
    381  1.1  oster     RF_ASSERT(pda != NULL);
    382  1.1  oster     wndNodes[i].params[0].p = pda;
    383  1.1  oster     wndNodes[i].params[1].p = pda->bufPtr;
    384  1.1  oster     wndNodes[i].params[2].v = parityStripeID;
    385  1.1  oster     wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    386  1.1  oster     pda = pda->next;
    387  1.1  oster   }
    388  1.1  oster 
    389  1.1  oster   /* initialize the redundancy node */
    390  1.1  oster   if (nRodNodes > 0) {
    391  1.1  oster     rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
    392  1.1  oster       nRodNodes, 2 * (nWndNodes+nRodNodes) + 1, nfaults, dag_h,
    393  1.1  oster       "Xr ", allocList);
    394  1.1  oster   }
    395  1.1  oster   else {
    396  1.1  oster     rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
    397  1.1  oster       1, 2 * (nWndNodes+nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
    398  1.1  oster   }
    399  1.1  oster   xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
    400  1.1  oster   for (i=0; i < nWndNodes; i++) {
    401  1.1  oster     xorNode->params[2*i+0] = wndNodes[i].params[0];         /* pda */
    402  1.1  oster     xorNode->params[2*i+1] = wndNodes[i].params[1];         /* buf ptr */
    403  1.1  oster   }
    404  1.1  oster   for (i=0; i < nRodNodes; i++) {
    405  1.1  oster     xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];  /* pda */
    406  1.1  oster     xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];  /* buf ptr */
    407  1.1  oster   }
    408  1.1  oster   /* xor node needs to get at RAID information */
    409  1.1  oster   xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr;
    410  1.1  oster 
    411  1.1  oster   /*
    412  1.1  oster    * Look for an Rod node that reads a complete SU. If none, alloc a buffer
    413  1.1  oster    * to receive the parity info. Note that we can't use a new data buffer
    414  1.1  oster    * because it will not have gotten written when the xor occurs.
    415  1.1  oster    */
    416  1.1  oster   if (allowBufferRecycle) {
    417  1.1  oster     for (i = 0; i < nRodNodes; i++) {
    418  1.1  oster       if (((RF_PhysDiskAddr_t *)rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
    419  1.1  oster         break;
    420  1.1  oster     }
    421  1.1  oster   }
    422  1.1  oster   if ((!allowBufferRecycle) || (i == nRodNodes)) {
    423  1.1  oster     RF_CallocAndAdd(xorNode->results[0], 1,
    424  1.1  oster       rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
    425  1.1  oster       (void *), allocList);
    426  1.1  oster   }
    427  1.1  oster   else {
    428  1.1  oster     xorNode->results[0] = rodNodes[i].params[1].p;
    429  1.1  oster   }
    430  1.1  oster 
    431  1.1  oster   /* initialize the Wnp node */
    432  1.1  oster   rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
    433  1.1  oster     rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
    434  1.1  oster   wnpNode->params[0].p = asmap->parityInfo;
    435  1.1  oster   wnpNode->params[1].p = xorNode->results[0];
    436  1.1  oster   wnpNode->params[2].v = parityStripeID;
    437  1.1  oster   wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    438  1.1  oster   /* parityInfo must describe entire parity unit */
    439  1.1  oster   RF_ASSERT(asmap->parityInfo->next == NULL);
    440  1.1  oster 
    441  1.1  oster   if (nfaults == 2) {
    442  1.1  oster       /*
    443  1.1  oster        * We never try to recycle a buffer for the Q calcuation
    444  1.1  oster        * in addition to the parity. This would cause two buffers
    445  1.1  oster        * to get smashed during the P and Q calculation, guaranteeing
    446  1.1  oster        * one would be wrong.
    447  1.1  oster        */
    448  1.1  oster       RF_CallocAndAdd(xorNode->results[1], 1,
    449  1.1  oster         rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
    450  1.1  oster         (void *),allocList);
    451  1.1  oster       rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
    452  1.1  oster         rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
    453  1.1  oster       wnqNode->params[0].p = asmap->qInfo;
    454  1.1  oster       wnqNode->params[1].p = xorNode->results[1];
    455  1.1  oster       wnqNode->params[2].v = parityStripeID;
    456  1.1  oster       wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    457  1.1  oster       /* parityInfo must describe entire parity unit */
    458  1.1  oster       RF_ASSERT(asmap->parityInfo->next == NULL);
    459  1.1  oster   }
    460  1.1  oster 
    461  1.1  oster   /*
    462  1.1  oster    * Connect nodes to form graph.
    463  1.1  oster    */
    464  1.1  oster 
    465  1.1  oster   /* connect dag header to block node */
    466  1.1  oster   RF_ASSERT(blockNode->numAntecedents == 0);
    467  1.1  oster   dag_h->succedents[0] = blockNode;
    468  1.1  oster 
    469  1.1  oster   if (nRodNodes > 0) {
    470  1.1  oster     /* connect the block node to the Rod nodes */
    471  1.1  oster     RF_ASSERT(blockNode->numSuccedents == nRodNodes);
    472  1.1  oster     RF_ASSERT(xorNode->numAntecedents == nRodNodes);
    473  1.1  oster     for (i = 0; i < nRodNodes; i++) {
    474  1.1  oster       RF_ASSERT(rodNodes[i].numAntecedents == 1);
    475  1.1  oster       blockNode->succedents[i] = &rodNodes[i];
    476  1.1  oster       rodNodes[i].antecedents[0] = blockNode;
    477  1.1  oster       rodNodes[i].antType[0] = rf_control;
    478  1.1  oster 
    479  1.1  oster       /* connect the Rod nodes to the Xor node */
    480  1.1  oster       RF_ASSERT(rodNodes[i].numSuccedents == 1);
    481  1.1  oster       rodNodes[i].succedents[0] = xorNode;
    482  1.1  oster       xorNode->antecedents[i] = &rodNodes[i];
    483  1.1  oster       xorNode->antType[i] = rf_trueData;
    484  1.1  oster     }
    485  1.1  oster   }
    486  1.1  oster   else {
    487  1.1  oster     /* connect the block node to the Xor node */
    488  1.1  oster     RF_ASSERT(blockNode->numSuccedents == 1);
    489  1.1  oster     RF_ASSERT(xorNode->numAntecedents == 1);
    490  1.1  oster     blockNode->succedents[0] = xorNode;
    491  1.1  oster     xorNode->antecedents[0] = blockNode;
    492  1.1  oster     xorNode->antType[0] = rf_control;
    493  1.1  oster   }
    494  1.1  oster 
    495  1.1  oster   /* connect the xor node to the commit node */
    496  1.1  oster   RF_ASSERT(xorNode->numSuccedents == 1);
    497  1.1  oster   RF_ASSERT(commitNode->numAntecedents == 1);
    498  1.1  oster   xorNode->succedents[0] = commitNode;
    499  1.1  oster   commitNode->antecedents[0] = xorNode;
    500  1.1  oster   commitNode->antType[0] = rf_control;
    501  1.1  oster 
    502  1.1  oster   /* connect the commit node to the write nodes */
    503  1.1  oster   RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
    504  1.1  oster   for (i = 0; i < nWndNodes; i++) {
    505  1.1  oster     RF_ASSERT(wndNodes->numAntecedents == 1);
    506  1.1  oster     commitNode->succedents[i] = &wndNodes[i];
    507  1.1  oster     wndNodes[i].antecedents[0] = commitNode;
    508  1.1  oster     wndNodes[i].antType[0] = rf_control;
    509  1.1  oster   }
    510  1.1  oster   RF_ASSERT(wnpNode->numAntecedents == 1);
    511  1.1  oster   commitNode->succedents[nWndNodes] = wnpNode;
    512  1.1  oster   wnpNode->antecedents[0]= commitNode;
    513  1.1  oster   wnpNode->antType[0] = rf_trueData;
    514  1.1  oster   if (nfaults == 2) {
    515  1.1  oster     RF_ASSERT(wnqNode->numAntecedents == 1);
    516  1.1  oster     commitNode->succedents[nWndNodes + 1] = wnqNode;
    517  1.1  oster     wnqNode->antecedents[0] = commitNode;
    518  1.1  oster     wnqNode->antType[0] = rf_trueData;
    519  1.1  oster   }
    520  1.1  oster 
    521  1.1  oster   /* connect the write nodes to the term node */
    522  1.1  oster   RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
    523  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
    524  1.1  oster   for (i = 0; i < nWndNodes; i++) {
    525  1.1  oster     RF_ASSERT(wndNodes->numSuccedents == 1);
    526  1.1  oster     wndNodes[i].succedents[0] = termNode;
    527  1.1  oster     termNode->antecedents[i] = &wndNodes[i];
    528  1.1  oster     termNode->antType[i] = rf_control;
    529  1.1  oster   }
    530  1.1  oster   RF_ASSERT(wnpNode->numSuccedents == 1);
    531  1.1  oster   wnpNode->succedents[0] = termNode;
    532  1.1  oster   termNode->antecedents[nWndNodes] = wnpNode;
    533  1.1  oster   termNode->antType[nWndNodes] = rf_control;
    534  1.1  oster   if (nfaults == 2) {
    535  1.1  oster     RF_ASSERT(wnqNode->numSuccedents == 1);
    536  1.1  oster     wnqNode->succedents[0] = termNode;
    537  1.1  oster     termNode->antecedents[nWndNodes + 1] = wnqNode;
    538  1.1  oster     termNode->antType[nWndNodes + 1] = rf_control;
    539  1.1  oster   }
    540  1.1  oster }
    541  1.1  oster 
    542  1.1  oster /******************************************************************************
    543  1.1  oster  *
    544  1.1  oster  * creates a DAG to perform a small-write operation (either raid 5 or pq),
    545  1.1  oster  * which is as follows:
    546  1.1  oster  *
    547  1.1  oster  * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
    548  1.1  oster  *            \- Rod X      /     \----> Wnd [Und]-/
    549  1.1  oster  *           [\- Rod X     /       \---> Wnd [Und]-/]
    550  1.1  oster  *           [\- Roq -> Q /         \--> Wnq [Unq]-/]
    551  1.1  oster  *
    552  1.1  oster  * Rop = read old parity
    553  1.1  oster  * Rod = read old data
    554  1.1  oster  * Roq = read old "q"
    555  1.1  oster  * Cmt = commit node
    556  1.1  oster  * Und = unlock data disk
    557  1.1  oster  * Unp = unlock parity disk
    558  1.1  oster  * Unq = unlock q disk
    559  1.1  oster  * Wnp = write new parity
    560  1.1  oster  * Wnd = write new data
    561  1.1  oster  * Wnq = write new "q"
    562  1.1  oster  * [ ] denotes optional segments in the graph
    563  1.1  oster  *
    564  1.1  oster  * Parameters:  raidPtr   - description of the physical array
    565  1.1  oster  *              asmap     - logical & physical addresses for this access
    566  1.1  oster  *              bp        - buffer ptr (holds write data)
    567  1.1  oster  *              flags     - general flags (e.g. disk locking)
    568  1.1  oster  *              allocList - list of memory allocated in DAG creation
    569  1.1  oster  *              pfuncs    - list of parity generating functions
    570  1.1  oster  *              qfuncs    - list of q generating functions
    571  1.1  oster  *
    572  1.1  oster  * A NULL qfuncs indicates a single-fault-tolerant array (no Q nodes are created)
    573  1.1  oster  *****************************************************************************/
    574  1.1  oster 
    575  1.1  oster void rf_CommonCreateSmallWriteDAG(
    576  1.1  oster   RF_Raid_t             *raidPtr,
    577  1.1  oster   RF_AccessStripeMap_t  *asmap,
    578  1.1  oster   RF_DagHeader_t        *dag_h,
    579  1.1  oster   void                  *bp,
    580  1.1  oster   RF_RaidAccessFlags_t   flags,
    581  1.1  oster   RF_AllocListElem_t    *allocList,
    582  1.1  oster   RF_RedFuncs_t         *pfuncs,
    583  1.1  oster   RF_RedFuncs_t         *qfuncs)
    584  1.1  oster {
    585  1.1  oster   RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
    586  1.1  oster   RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
    587  1.1  oster   RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
    588  1.1  oster   RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
    589  1.1  oster   int i, j, nNodes, totalNumNodes, lu_flag;
    590  1.1  oster   RF_ReconUnitNum_t which_ru;
    591  1.1  oster   int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
    592  1.1  oster   int (*qfunc)(RF_DagNode_t *);
    593  1.1  oster   int numDataNodes, numParityNodes;
    594  1.1  oster   RF_StripeNum_t parityStripeID;
    595  1.1  oster   RF_PhysDiskAddr_t *pda;
    596  1.1  oster   char *name, *qname;
    597  1.1  oster   long nfaults;
    598  1.1  oster 
    599  1.1  oster   nfaults = qfuncs ? 2 : 1;
    600  1.1  oster   lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
    601  1.1  oster 
    602  1.1  oster   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
    603  1.1  oster     asmap->raidAddress, &which_ru);
    604  1.1  oster   pda = asmap->physInfo;
    605  1.1  oster   numDataNodes = asmap->numStripeUnitsAccessed;
    606  1.1  oster   numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
    607  1.1  oster 
    608  1.1  oster   if (rf_dagDebug) {
    609  1.1  oster     printf("[Creating small-write DAG]\n");
    610  1.1  oster   }
    611  1.1  oster   RF_ASSERT(numDataNodes > 0);
    612  1.1  oster   dag_h->creator = "SmallWriteDAG";
    613  1.1  oster 
    614  1.1  oster   dag_h->numCommitNodes = 1;
    615  1.1  oster   dag_h->numCommits = 0;
    616  1.1  oster   dag_h->numSuccedents = 1;
    617  1.1  oster 
    618  1.1  oster   /*
    619  1.1  oster    * DAG creation occurs in four steps:
    620  1.1  oster    * 1. count the number of nodes in the DAG
    621  1.1  oster    * 2. create the nodes
    622  1.1  oster    * 3. initialize the nodes
    623  1.1  oster    * 4. connect the nodes
    624  1.1  oster    */
    625  1.1  oster 
    626  1.1  oster   /*
    627  1.1  oster    * Step 1. compute number of nodes in the graph
    628  1.1  oster    */
    629  1.1  oster 
    630  1.1  oster   /* number of nodes:
    631  1.1  oster    *  a read and write for each data unit
    632  1.1  oster    *  a redundancy computation node for each parity node (nfaults * nparity)
    633  1.1  oster    *  a read and write for each parity unit
    634  1.1  oster    *  a block and commit node (2)
    635  1.1  oster    *  a terminate node
    636  1.1  oster    *  if atomic RMW
    637  1.1  oster    *    an unlock node for each data unit, redundancy unit
    638  1.1  oster    */
    639  1.1  oster   totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
    640  1.1  oster     + (nfaults * 2 * numParityNodes) + 3;
    641  1.1  oster   if (lu_flag) {
    642  1.1  oster     totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
    643  1.1  oster   }
    644  1.1  oster 
    645  1.1  oster   /*
    646  1.1  oster    * Step 2. create the nodes
    647  1.1  oster    */
    648  1.1  oster   RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
    649  1.1  oster     (RF_DagNode_t *), allocList);
    650  1.1  oster   i = 0;
    651  1.1  oster   blockNode        = &nodes[i]; i += 1;
    652  1.1  oster   commitNode       = &nodes[i]; i += 1;
    653  1.1  oster   readDataNodes    = &nodes[i]; i += numDataNodes;
    654  1.1  oster   readParityNodes  = &nodes[i]; i += numParityNodes;
    655  1.1  oster   writeDataNodes   = &nodes[i]; i += numDataNodes;
    656  1.1  oster   writeParityNodes = &nodes[i]; i += numParityNodes;
    657  1.1  oster   xorNodes         = &nodes[i]; i += numParityNodes;
    658  1.1  oster   termNode         = &nodes[i]; i += 1;
    659  1.1  oster   if (lu_flag) {
    660  1.1  oster     unlockDataNodes   = &nodes[i]; i += numDataNodes;
    661  1.1  oster     unlockParityNodes = &nodes[i]; i += numParityNodes;
    662  1.1  oster   }
    663  1.1  oster   else {
    664  1.1  oster     unlockDataNodes = unlockParityNodes = NULL;
    665  1.1  oster   }
    666  1.1  oster   if (nfaults == 2) {
    667  1.1  oster     readQNodes     = &nodes[i]; i += numParityNodes;
    668  1.1  oster     writeQNodes    = &nodes[i]; i += numParityNodes;
    669  1.1  oster     qNodes         = &nodes[i]; i += numParityNodes;
    670  1.1  oster     if (lu_flag) {
    671  1.1  oster       unlockQNodes    = &nodes[i]; i += numParityNodes;
    672  1.1  oster     }
    673  1.1  oster     else {
    674  1.1  oster       unlockQNodes = NULL;
    675  1.1  oster     }
    676  1.1  oster   }
    677  1.1  oster   else {
    678  1.1  oster     readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
    679  1.1  oster   }
    680  1.1  oster   RF_ASSERT(i == totalNumNodes);
    681  1.1  oster 
    682  1.1  oster   /*
    683  1.1  oster    * Step 3. initialize the nodes
    684  1.1  oster    */
    685  1.1  oster   /* initialize block node (Nil) */
    686  1.1  oster   nNodes     = numDataNodes + (nfaults * numParityNodes);
    687  1.1  oster   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    688  1.1  oster     NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
    689  1.1  oster 
    690  1.1  oster   /* initialize commit node (Cmt) */
    691  1.1  oster   rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    692  1.1  oster     NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
    693  1.1  oster 
    694  1.1  oster   /* initialize terminate node (Trm) */
    695  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
    696  1.1  oster     NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
    697  1.1  oster 
    698  1.1  oster   /* initialize nodes which read old data (Rod) */
    699  1.1  oster   for (i = 0; i < numDataNodes; i++) {
    700  1.1  oster     rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
    701  1.1  oster       rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
    702  1.1  oster       "Rod", allocList);
    703  1.1  oster     RF_ASSERT(pda != NULL);
    704  1.1  oster     /* physical disk addr desc */
    705  1.1  oster     readDataNodes[i].params[0].p = pda;
    706  1.1  oster     /* buffer to hold old data */
    707  1.1  oster     readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
    708  1.1  oster       dag_h, pda, allocList);
    709  1.1  oster     readDataNodes[i].params[2].v = parityStripeID;
    710  1.1  oster     readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    711  1.1  oster       lu_flag, 0, which_ru);
    712  1.1  oster     pda = pda->next;
    713  1.1  oster     for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
    714  1.1  oster       readDataNodes[i].propList[j] = NULL;
    715  1.1  oster     }
    716  1.1  oster   }
    717  1.1  oster 
    718  1.1  oster   /* initialize nodes which read old parity (Rop) */
    719  1.1  oster   pda = asmap->parityInfo; i = 0;
    720  1.1  oster   for (i = 0; i < numParityNodes; i++) {
    721  1.1  oster     RF_ASSERT(pda != NULL);
    722  1.1  oster     rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
    723  1.1  oster       rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
    724  1.1  oster       0, dag_h, "Rop", allocList);
    725  1.1  oster     readParityNodes[i].params[0].p = pda;
    726  1.1  oster     /* buffer to hold old parity */
    727  1.1  oster     readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
    728  1.1  oster       dag_h, pda, allocList);
    729  1.1  oster     readParityNodes[i].params[2].v = parityStripeID;
    730  1.1  oster     readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    731  1.1  oster       lu_flag, 0, which_ru);
    732  1.1  oster     pda = pda->next;
    733  1.1  oster     for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
    734  1.1  oster       readParityNodes[i].propList[0] = NULL;
    735  1.1  oster     }
    736  1.1  oster   }
    737  1.1  oster 
    738  1.1  oster   /* initialize nodes which read old Q (Roq) */
    739  1.1  oster   if (nfaults == 2) {
    740  1.1  oster     pda = asmap->qInfo;
    741  1.1  oster     for (i = 0; i < numParityNodes; i++) {
    742  1.1  oster       RF_ASSERT(pda != NULL);
    743  1.1  oster       rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
    744  1.1  oster         rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
    745  1.1  oster       readQNodes[i].params[0].p = pda;
    746  1.1  oster       /* buffer to hold old Q */
    747  1.1  oster       readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
    748  1.1  oster         allocList);
    749  1.1  oster       readQNodes[i].params[2].v = parityStripeID;
    750  1.1  oster       readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    751  1.1  oster         lu_flag, 0, which_ru);
    752  1.1  oster       pda = pda->next;
    753  1.1  oster       for (j = 0; j < readQNodes[i].numSuccedents; j++) {
    754  1.1  oster         readQNodes[i].propList[0] = NULL;
    755  1.1  oster       }
    756  1.1  oster     }
    757  1.1  oster   }
    758  1.1  oster 
    759  1.1  oster   /* initialize nodes which write new data (Wnd) */
    760  1.1  oster   pda = asmap->physInfo;
    761  1.1  oster   for (i=0; i < numDataNodes; i++) {
    762  1.1  oster     RF_ASSERT(pda != NULL);
    763  1.1  oster     rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
    764  1.1  oster       rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    765  1.1  oster       "Wnd", allocList);
    766  1.1  oster     /* physical disk addr desc */
    767  1.1  oster     writeDataNodes[i].params[0].p = pda;
    768  1.1  oster     /* buffer holding new data to be written */
    769  1.1  oster     writeDataNodes[i].params[1].p = pda->bufPtr;
    770  1.1  oster     writeDataNodes[i].params[2].v = parityStripeID;
    771  1.1  oster     writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    772  1.1  oster       0, 0, which_ru);
    773  1.1  oster     if (lu_flag) {
    774  1.1  oster       /* initialize node to unlock the disk queue */
    775  1.1  oster       rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
    776  1.1  oster         rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
    777  1.1  oster         "Und", allocList);
    778  1.1  oster       /* physical disk addr desc */
    779  1.1  oster       unlockDataNodes[i].params[0].p = pda;
    780  1.1  oster       unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    781  1.1  oster         0, lu_flag, which_ru);
    782  1.1  oster     }
    783  1.1  oster     pda = pda->next;
    784  1.1  oster   }
    785  1.1  oster 
    786  1.1  oster   /*
    787  1.1  oster    * Initialize nodes which compute new parity and Q.
    788  1.1  oster    */
    789  1.1  oster   /*
    790  1.1  oster    * We use the simple XOR func in the double-XOR case, and when
    791  1.1  oster    * we're accessing only a portion of one stripe unit. The distinction
    792  1.1  oster    * between the two is that the regular XOR func assumes that the targbuf
    793  1.1  oster    * is a full SU in size, and examines the pda associated with the buffer
    794  1.1  oster    * to decide where within the buffer to XOR the data, whereas
    795  1.1  oster    * the simple XOR func just XORs the data into the start of the buffer.
    796  1.1  oster    */
    797  1.1  oster   if ((numParityNodes==2) || ((numDataNodes == 1)
    798  1.1  oster     && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit)))
    799  1.1  oster   {
    800  1.1  oster     func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
    801  1.1  oster     if (qfuncs) {
    802  1.1  oster       qfunc = qfuncs->simple;
    803  1.1  oster       qname = qfuncs->SimpleName;
    804  1.1  oster     }
    805  1.1  oster     else {
    806  1.1  oster       qfunc = NULL;
    807  1.1  oster       qname = NULL;
    808  1.1  oster     }
    809  1.1  oster   }
    810  1.1  oster   else {
    811  1.1  oster     func = pfuncs->regular;
    812  1.1  oster     undoFunc = rf_NullNodeUndoFunc;
    813  1.1  oster     name = pfuncs->RegularName;
    814  1.1  oster     if (qfuncs) {
    815  1.1  oster       qfunc = qfuncs->regular;
    816  1.1  oster       qname = qfuncs->RegularName;
    817  1.1  oster     }
    818  1.1  oster     else {
    819  1.1  oster       qfunc = NULL;
    820  1.1  oster       qname = NULL;
    821  1.1  oster     }
    822  1.1  oster   }
    823  1.1  oster   /*
    824  1.1  oster    * Initialize the xor nodes: params are {pda,buf}
    825  1.1  oster    * from {Rod,Wnd,Rop} nodes, and raidPtr
    826  1.1  oster    */
    827  1.1  oster   if (numParityNodes==2) {
    828  1.1  oster     /* double-xor case */
    829  1.1  oster     for (i=0; i < numParityNodes; i++) {
    830  1.1  oster       /* note: no wakeup func for xor */
    831  1.1  oster       rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
    832  1.1  oster         1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
    833  1.1  oster       xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
    834  1.1  oster       xorNodes[i].params[0]   = readDataNodes[i].params[0];
    835  1.1  oster       xorNodes[i].params[1]   = readDataNodes[i].params[1];
    836  1.1  oster       xorNodes[i].params[2]   = readParityNodes[i].params[0];
    837  1.1  oster       xorNodes[i].params[3]   = readParityNodes[i].params[1];
    838  1.1  oster       xorNodes[i].params[4]   = writeDataNodes[i].params[0];
    839  1.1  oster       xorNodes[i].params[5]   = writeDataNodes[i].params[1];
    840  1.1  oster       xorNodes[i].params[6].p = raidPtr;
    841  1.1  oster       /* use old parity buf as target buf */
    842  1.1  oster       xorNodes[i].results[0] = readParityNodes[i].params[1].p;
    843  1.1  oster       if (nfaults == 2) {
    844  1.1  oster         /* note: no wakeup func for qor */
    845  1.1  oster         rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
    846  1.1  oster           (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
    847  1.1  oster         qNodes[i].params[0]   = readDataNodes[i].params[0];
    848  1.1  oster         qNodes[i].params[1]   = readDataNodes[i].params[1];
    849  1.1  oster         qNodes[i].params[2]   = readQNodes[i].params[0];
    850  1.1  oster         qNodes[i].params[3]   = readQNodes[i].params[1];
    851  1.1  oster         qNodes[i].params[4]   = writeDataNodes[i].params[0];
    852  1.1  oster         qNodes[i].params[5]   = writeDataNodes[i].params[1];
    853  1.1  oster         qNodes[i].params[6].p = raidPtr;
    854  1.1  oster         /* use old Q buf as target buf */
    855  1.1  oster         qNodes[i].results[0] = readQNodes[i].params[1].p;
    856  1.1  oster       }
    857  1.1  oster     }
    858  1.1  oster   }
    859  1.1  oster   else {
    860  1.1  oster     /* there is only one xor node in this case */
    861  1.1  oster     rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
    862  1.1  oster       (numDataNodes + numParityNodes),
    863  1.1  oster       (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
    864  1.1  oster     xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
    865  1.1  oster     for (i=0; i < numDataNodes + 1; i++) {
    866  1.1  oster       /* set up params related to Rod and Rop nodes */
    867  1.1  oster       xorNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
    868  1.1  oster       xorNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer ptr */
    869  1.1  oster     }
    870  1.1  oster     for (i=0; i < numDataNodes; i++) {
    871  1.1  oster       /* set up params related to Wnd and Wnp nodes */
    872  1.1  oster       xorNodes[0].params[2*(numDataNodes+1+i)+0] = /* pda */
    873  1.1  oster         writeDataNodes[i].params[0];
    874  1.1  oster       xorNodes[0].params[2*(numDataNodes+1+i)+1] = /* buffer ptr */
    875  1.1  oster        writeDataNodes[i].params[1];
    876  1.1  oster     }
    877  1.1  oster     /* xor node needs to get at RAID information */
    878  1.1  oster     xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
    879  1.1  oster     xorNodes[0].results[0] = readParityNodes[0].params[1].p;
    880  1.1  oster     if (nfaults == 2)  {
    881  1.1  oster       rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
    882  1.1  oster         (numDataNodes + numParityNodes),
    883  1.1  oster         (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
    884  1.1  oster         qname, allocList);
    885  1.1  oster       for (i=0; i<numDataNodes; i++) {
    886  1.1  oster         /* set up params related to Rod */
    887  1.1  oster         qNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
    888  1.1  oster         qNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer ptr */
    889  1.1  oster       }
    890  1.1  oster       /* and read old q */
    891  1.1  oster       qNodes[0].params[2*numDataNodes + 0] = /* pda */
    892  1.1  oster         readQNodes[0].params[0];
    893  1.1  oster       qNodes[0].params[2*numDataNodes + 1] = /* buffer ptr */
    894  1.1  oster         readQNodes[0].params[1];
    895  1.1  oster       for (i=0; i < numDataNodes; i++) {
    896  1.1  oster         /* set up params related to Wnd nodes */
    897  1.1  oster         qNodes[0].params[2*(numDataNodes+1+i)+0] = /* pda */
    898  1.1  oster           writeDataNodes[i].params[0];
    899  1.1  oster         qNodes[0].params[2*(numDataNodes+1+i)+1] = /* buffer ptr */
    900  1.1  oster           writeDataNodes[i].params[1];
    901  1.1  oster       }
    902  1.1  oster       /* xor node needs to get at RAID information */
    903  1.1  oster       qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
    904  1.1  oster       qNodes[0].results[0] = readQNodes[0].params[1].p;
    905  1.1  oster     }
    906  1.1  oster   }
    907  1.1  oster 
    908  1.1  oster   /* initialize nodes which write new parity (Wnp) */
    909  1.1  oster   pda = asmap->parityInfo;
    910  1.1  oster   for (i=0;  i < numParityNodes; i++) {
    911  1.1  oster     rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
    912  1.1  oster       rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    913  1.1  oster       "Wnp", allocList);
    914  1.1  oster     RF_ASSERT(pda != NULL);
    915  1.1  oster     writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
    916  1.1  oster     writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for parity write operation */
    917  1.1  oster     writeParityNodes[i].params[2].v = parityStripeID;
    918  1.1  oster     writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    919  1.1  oster       0, 0, which_ru);
    920  1.1  oster     if (lu_flag) {
    921  1.1  oster       /* initialize node to unlock the disk queue */
    922  1.1  oster       rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
    923  1.1  oster         rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
    924  1.1  oster         "Unp", allocList);
    925  1.1  oster       unlockParityNodes[i].params[0].p = pda; /* physical disk addr desc */
    926  1.1  oster       unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    927  1.1  oster         0, lu_flag, which_ru);
    928  1.1  oster     }
    929  1.1  oster     pda = pda->next;
    930  1.1  oster   }
    931  1.1  oster 
    932  1.1  oster   /* initialize nodes which write new Q (Wnq) */
    933  1.1  oster   if (nfaults == 2) {
    934  1.1  oster     pda = asmap->qInfo;
    935  1.1  oster     for (i=0;  i < numParityNodes; i++) {
    936  1.1  oster       rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
    937  1.1  oster         rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
    938  1.1  oster         "Wnq", allocList);
    939  1.1  oster       RF_ASSERT(pda != NULL);
    940  1.1  oster       writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) filled in by xor node */
    941  1.1  oster       writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for parity write operation */
    942  1.1  oster       writeQNodes[i].params[2].v = parityStripeID;
    943  1.1  oster       writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    944  1.1  oster         0, 0, which_ru);
    945  1.1  oster       if (lu_flag) {
    946  1.1  oster         /* initialize node to unlock the disk queue */
    947  1.1  oster         rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
    948  1.1  oster           rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
    949  1.1  oster           "Unq", allocList);
    950  1.1  oster         unlockQNodes[i].params[0].p = pda; /* physical disk addr desc */
    951  1.1  oster         unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
    952  1.1  oster           0, lu_flag, which_ru);
    953  1.1  oster       }
    954  1.1  oster       pda = pda->next;
    955  1.1  oster     }
    956  1.1  oster   }
    957  1.1  oster 
    958  1.1  oster   /*
    959  1.1  oster    * Step 4. connect the nodes.
    960  1.1  oster    */
    961  1.1  oster 
    962  1.1  oster   /* connect header to block node */
    963  1.1  oster   dag_h->succedents[0] = blockNode;
    964  1.1  oster 
    965  1.1  oster   /* connect block node to read old data nodes */
    966  1.1  oster   RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
    967  1.1  oster   for (i = 0; i < numDataNodes; i++) {
    968  1.1  oster     blockNode->succedents[i] = &readDataNodes[i];
    969  1.1  oster     RF_ASSERT(readDataNodes[i].numAntecedents == 1);
    970  1.1  oster     readDataNodes[i].antecedents[0]= blockNode;
    971  1.1  oster     readDataNodes[i].antType[0] = rf_control;
    972  1.1  oster   }
    973  1.1  oster 
    974  1.1  oster   /* connect block node to read old parity nodes */
    975  1.1  oster   for (i = 0; i < numParityNodes; i++) {
    976  1.1  oster     blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
    977  1.1  oster     RF_ASSERT(readParityNodes[i].numAntecedents == 1);
    978  1.1  oster     readParityNodes[i].antecedents[0] = blockNode;
    979  1.1  oster     readParityNodes[i].antType[0] = rf_control;
    980  1.1  oster   }
    981  1.1  oster 
    982  1.1  oster   /* connect block node to read old Q nodes */
    983  1.1  oster   if (nfaults == 2) {
    984  1.1  oster     for (i = 0; i < numParityNodes; i++) {
    985  1.1  oster       blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
    986  1.1  oster       RF_ASSERT(readQNodes[i].numAntecedents == 1);
    987  1.1  oster       readQNodes[i].antecedents[0] = blockNode;
    988  1.1  oster       readQNodes[i].antType[0] = rf_control;
    989  1.1  oster     }
    990  1.1  oster   }
    991  1.1  oster 
    992  1.1  oster   /* connect read old data nodes to xor nodes */
    993  1.1  oster   for (i = 0; i < numDataNodes; i++) {
    994  1.1  oster     RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
    995  1.1  oster     for (j = 0; j < numParityNodes; j++){
    996  1.1  oster       RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
    997  1.1  oster       readDataNodes[i].succedents[j] = &xorNodes[j];
    998  1.1  oster       xorNodes[j].antecedents[i] = &readDataNodes[i];
    999  1.1  oster       xorNodes[j].antType[i] = rf_trueData;
   1000  1.1  oster     }
   1001  1.1  oster   }
   1002  1.1  oster 
   1003  1.1  oster   /* connect read old data nodes to q nodes */
   1004  1.1  oster   if (nfaults == 2) {
   1005  1.1  oster     for (i = 0; i < numDataNodes; i++) {
   1006  1.1  oster       for (j = 0; j < numParityNodes; j++) {
   1007  1.1  oster         RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
   1008  1.1  oster         readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
   1009  1.1  oster         qNodes[j].antecedents[i] = &readDataNodes[i];
   1010  1.1  oster         qNodes[j].antType[i] = rf_trueData;
   1011  1.1  oster       }
   1012  1.1  oster     }
   1013  1.1  oster   }
   1014  1.1  oster 
   1015  1.1  oster   /* connect read old parity nodes to xor nodes */
   1016  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1017  1.1  oster     RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
   1018  1.1  oster     for (j = 0; j < numParityNodes; j++) {
   1019  1.1  oster       readParityNodes[i].succedents[j] = &xorNodes[j];
   1020  1.1  oster       xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
   1021  1.1  oster       xorNodes[j].antType[numDataNodes + i] = rf_trueData;
   1022  1.1  oster     }
   1023  1.1  oster   }
   1024  1.1  oster 
   1025  1.1  oster   /* connect read old q nodes to q nodes */
   1026  1.1  oster   if (nfaults == 2) {
   1027  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   1028  1.1  oster       RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
   1029  1.1  oster       for (j = 0; j < numParityNodes; j++) {
   1030  1.1  oster         readQNodes[i].succedents[j] = &qNodes[j];
   1031  1.1  oster         qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
   1032  1.1  oster         qNodes[j].antType[numDataNodes + i] = rf_trueData;
   1033  1.1  oster       }
   1034  1.1  oster     }
   1035  1.1  oster   }
   1036  1.1  oster 
   1037  1.1  oster   /* connect xor nodes to commit node */
   1038  1.1  oster   RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
   1039  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1040  1.1  oster     RF_ASSERT(xorNodes[i].numSuccedents == 1);
   1041  1.1  oster     xorNodes[i].succedents[0] = commitNode;
   1042  1.1  oster     commitNode->antecedents[i] = &xorNodes[i];
   1043  1.1  oster     commitNode->antType[i] = rf_control;
   1044  1.1  oster   }
   1045  1.1  oster 
   1046  1.1  oster   /* connect q nodes to commit node */
   1047  1.1  oster   if (nfaults == 2) {
   1048  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   1049  1.1  oster       RF_ASSERT(qNodes[i].numSuccedents == 1);
   1050  1.1  oster       qNodes[i].succedents[0] = commitNode;
   1051  1.1  oster       commitNode->antecedents[i + numParityNodes] = &qNodes[i];
   1052  1.1  oster       commitNode->antType[i + numParityNodes] = rf_control;
   1053  1.1  oster     }
   1054  1.1  oster   }
   1055  1.1  oster 
   1056  1.1  oster   /* connect commit node to write nodes */
   1057  1.1  oster   RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
   1058  1.1  oster   for (i = 0; i < numDataNodes; i++) {
   1059  1.1  oster     RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
   1060  1.1  oster     commitNode->succedents[i] = &writeDataNodes[i];
   1061  1.1  oster     writeDataNodes[i].antecedents[0] = commitNode;
   1062  1.1  oster     writeDataNodes[i].antType[0] = rf_trueData;
   1063  1.1  oster   }
   1064  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1065  1.1  oster     RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
   1066  1.1  oster     commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
   1067  1.1  oster     writeParityNodes[i].antecedents[0] = commitNode;
   1068  1.1  oster     writeParityNodes[i].antType[0] = rf_trueData;
   1069  1.1  oster   }
   1070  1.1  oster   if (nfaults == 2) {
   1071  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   1072  1.1  oster       RF_ASSERT(writeQNodes[i].numAntecedents == 1);
   1073  1.1  oster       commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
   1074  1.1  oster       writeQNodes[i].antecedents[0] = commitNode;
   1075  1.1  oster       writeQNodes[i].antType[0] = rf_trueData;
   1076  1.1  oster     }
   1077  1.1  oster   }
   1078  1.1  oster 
   1079  1.1  oster   RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   1080  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
   1081  1.1  oster   for (i = 0; i < numDataNodes; i++) {
   1082  1.1  oster     if (lu_flag) {
   1083  1.1  oster       /* connect write new data nodes to unlock nodes */
   1084  1.1  oster       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   1085  1.1  oster       RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
   1086  1.1  oster       writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
   1087  1.1  oster       unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
   1088  1.1  oster       unlockDataNodes[i].antType[0] = rf_control;
   1089  1.1  oster 
   1090  1.1  oster       /* connect unlock nodes to term node */
   1091  1.1  oster       RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
   1092  1.1  oster       unlockDataNodes[i].succedents[0] = termNode;
   1093  1.1  oster       termNode->antecedents[i] = &unlockDataNodes[i];
   1094  1.1  oster       termNode->antType[i] = rf_control;
   1095  1.1  oster     }
   1096  1.1  oster     else {
   1097  1.1  oster       /* connect write new data nodes to term node */
   1098  1.1  oster       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   1099  1.1  oster       RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   1100  1.1  oster       writeDataNodes[i].succedents[0] = termNode;
   1101  1.1  oster       termNode->antecedents[i] = &writeDataNodes[i];
   1102  1.1  oster       termNode->antType[i] = rf_control;
   1103  1.1  oster     }
   1104  1.1  oster   }
   1105  1.1  oster 
   1106  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1107  1.1  oster     if (lu_flag) {
   1108  1.1  oster       /* connect write new parity nodes to unlock nodes */
   1109  1.1  oster       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   1110  1.1  oster       RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
   1111  1.1  oster       writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
   1112  1.1  oster       unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
   1113  1.1  oster       unlockParityNodes[i].antType[0] = rf_control;
   1114  1.1  oster 
   1115  1.1  oster       /* connect unlock nodes to term node */
   1116  1.1  oster       RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
   1117  1.1  oster       unlockParityNodes[i].succedents[0] = termNode;
   1118  1.1  oster       termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
   1119  1.1  oster       termNode->antType[numDataNodes + i] = rf_control;
   1120  1.1  oster     }
   1121  1.1  oster     else {
   1122  1.1  oster       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   1123  1.1  oster       writeParityNodes[i].succedents[0] = termNode;
   1124  1.1  oster       termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
   1125  1.1  oster       termNode->antType[numDataNodes + i] = rf_control;
   1126  1.1  oster     }
   1127  1.1  oster   }
   1128  1.1  oster 
   1129  1.1  oster   if (nfaults == 2) {
   1130  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   1131  1.1  oster       if (lu_flag) {
   1132  1.1  oster         /* connect write new Q nodes to unlock nodes */
   1133  1.1  oster         RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   1134  1.1  oster         RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
   1135  1.1  oster         writeQNodes[i].succedents[0] = &unlockQNodes[i];
   1136  1.1  oster         unlockQNodes[i].antecedents[0] = &writeQNodes[i];
   1137  1.1  oster         unlockQNodes[i].antType[0] = rf_control;
   1138  1.1  oster 
   1139  1.1  oster         /* connect unlock nodes to term node */
   1140  1.1  oster         RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
   1141  1.1  oster         unlockQNodes[i].succedents[0] = termNode;
   1142  1.1  oster         termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
   1143  1.1  oster         termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   1144  1.1  oster       }
   1145  1.1  oster       else {
   1146  1.1  oster         RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   1147  1.1  oster         writeQNodes[i].succedents[0] = termNode;
   1148  1.1  oster         termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
   1149  1.1  oster         termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   1150  1.1  oster       }
   1151  1.1  oster     }
   1152  1.1  oster   }
   1153  1.1  oster }
   1154  1.1  oster 
   1155  1.1  oster 
   1156  1.1  oster /******************************************************************************
   1157  1.1  oster  * Create a write graph (fault-free or degraded) for RAID level 1.
   1158  1.1  oster  *
   1159  1.1  oster  * Hdr -> Commit -> Wpd -> Nil -> Trm
   1160  1.1  oster  *               -> Wsd ->
   1161  1.1  oster  *
   1162  1.1  oster  * The "Wpd" nodes write data to the primary copy in the mirror pair (asmap->physInfo).
   1163  1.1  oster  * The "Wsd" nodes write the same buffers to the secondary copy (asmap->parityInfo).
   1164  1.1  oster  * A copy counted as failed (numDataFailed / numParityFailed == 1) gets one fewer write node.
   1165  1.1  oster  * Parameters:  raidPtr   - description of the physical array
   1166  1.1  oster  *              asmap     - logical & physical addresses for this access
   1167  1.1  oster  *              dag_h     - DAG header to fill in (creator, commit counts, succedents)
   1168  1.1  oster  *              bp, flags - buffer ptr / general flags; not referenced by this function
   1169  1.1  oster  *              allocList - list of memory allocated in DAG creation
   1170  1.1  oster  *****************************************************************************/
   1171  1.1  oster 
   1172  1.1  oster void rf_CreateRaidOneWriteDAG(
   1173  1.1  oster   RF_Raid_t             *raidPtr,
   1174  1.1  oster   RF_AccessStripeMap_t  *asmap,
   1175  1.1  oster   RF_DagHeader_t        *dag_h,
   1176  1.1  oster   void                  *bp,
   1177  1.1  oster   RF_RaidAccessFlags_t   flags,
   1178  1.1  oster   RF_AllocListElem_t    *allocList)
   1179  1.1  oster {
   1180  1.1  oster   RF_DagNode_t *unblockNode, *termNode, *commitNode;
   1181  1.1  oster   RF_DagNode_t *nodes, *wndNode, *wmirNode;
   1182  1.1  oster   int nWndNodes, nWmirNodes, i;
   1183  1.1  oster   RF_ReconUnitNum_t which_ru;
   1184  1.1  oster   RF_PhysDiskAddr_t *pda, *pdaP;  /* cursors over the physInfo / parityInfo lists */
   1185  1.1  oster   RF_StripeNum_t parityStripeID;
   1186  1.1  oster 
   1187  1.1  oster   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
   1188  1.1  oster     asmap->raidAddress, &which_ru);  /* also fills in which_ru, used in params[3] below */
   1189  1.1  oster   if (rf_dagDebug) {
   1190  1.1  oster     printf("[Creating RAID level 1 write DAG]\n");
   1191  1.1  oster   }
   1192  1.1  oster   dag_h->creator = "RaidOneWriteDAG";
   1193  1.1  oster 
   1194  1.1  oster   /* 2 if the pda list has a second entry, which implies the access is not SU aligned */
   1195  1.1  oster   nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
   1196  1.1  oster   nWndNodes =  (asmap->physInfo->next) ? 2 : 1;
   1197  1.1  oster 
   1198  1.1  oster   /* degraded case: a failed primary (data) or secondary (mirror) copy gets one fewer write node */
   1199  1.1  oster   if (asmap->numDataFailed == 1)
   1200  1.1  oster     nWndNodes--;
   1201  1.1  oster   if (asmap->numParityFailed == 1)
   1202  1.1  oster     nWmirNodes--;
   1203  1.1  oster 
   1204  1.1  oster   /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock + terminator) */
   1205  1.1  oster   RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
   1206  1.1  oster     (RF_DagNode_t *), allocList);
   1207  1.1  oster   i = 0;  /* carve the array in order: wnd nodes, mirror nodes, commit, unblock, term */
   1208  1.1  oster   wndNode     = &nodes[i]; i += nWndNodes;
   1209  1.1  oster   wmirNode    = &nodes[i]; i += nWmirNodes;
   1210  1.1  oster   commitNode   = &nodes[i]; i += 1;
   1211  1.1  oster   unblockNode = &nodes[i]; i += 1;
   1212  1.1  oster   termNode = &nodes[i]; i += 1;
   1213  1.1  oster   RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
   1214  1.1  oster 
   1215  1.1  oster   /* this dag can commit immediately */
   1216  1.1  oster   dag_h->numCommitNodes = 1;
   1217  1.1  oster   dag_h->numCommits = 0;
   1218  1.1  oster   dag_h->numSuccedents = 1;
   1219  1.1  oster 
   1220  1.1  oster   /* initialize the commit, unblock, and term nodes: commit fans out to every write
   1221  1.1  oster    * node, unblock collects every write node, term follows unblock (see asserts below) */
   1222  1.1  oster   rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
   1223  1.1  oster     NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
   1224  1.1  oster   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
   1225  1.1  oster     NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
   1226  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
   1227  1.1  oster     NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
   1228  1.1  oster 
   1229  1.1  oster   /* initialize the wnd ("Wpd") nodes, walking the asmap->physInfo list */
   1230  1.1  oster   if (nWndNodes > 0) {
   1231  1.1  oster     pda = asmap->physInfo;
   1232  1.1  oster     for (i = 0; i < nWndNodes; i++) {
   1233  1.1  oster       rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
   1234  1.1  oster         rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
   1235  1.1  oster       RF_ASSERT(pda != NULL);
   1236  1.1  oster       wndNode[i].params[0].p = pda;          /* physical disk address to write */
   1237  1.1  oster       wndNode[i].params[1].p = pda->bufPtr;  /* buffer attached to that pda */
   1238  1.1  oster       wndNode[i].params[2].v = parityStripeID;
   1239  1.1  oster       wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1240  1.1  oster       pda = pda->next;
   1241  1.1  oster     }
   1242  1.1  oster     RF_ASSERT(pda == NULL);  /* the whole physInfo list must have been consumed */
   1243  1.1  oster   }
   1244  1.1  oster 
   1245  1.1  oster   /* initialize the mirror ("Wsd") nodes: target is the parityInfo pda, data comes from the physInfo pda's buffer */
   1246  1.1  oster   if (nWmirNodes > 0) {
   1247  1.1  oster     pda = asmap->physInfo;
   1248  1.1  oster     pdaP = asmap->parityInfo;
   1249  1.1  oster     for (i = 0; i < nWmirNodes; i++) {
   1250  1.1  oster       rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
   1251  1.1  oster         rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
   1252  1.1  oster       RF_ASSERT(pda != NULL);
   1253  1.1  oster       wmirNode[i].params[0].p = pdaP;         /* mirror-side disk address */
   1254  1.1  oster       wmirNode[i].params[1].p = pda->bufPtr;  /* same buffer the primary write uses */
   1255  1.1  oster       wmirNode[i].params[2].v = parityStripeID;
   1256  1.1  oster       wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1257  1.1  oster       pda = pda->next;
   1258  1.1  oster       pdaP = pdaP->next;
   1259  1.1  oster     }
   1260  1.1  oster     RF_ASSERT(pda == NULL);   /* both lists are expected to end together */
   1261  1.1  oster     RF_ASSERT(pdaP == NULL);
   1262  1.1  oster   }
   1263  1.1  oster 
   1264  1.1  oster   /* link the header node to the commit node */
   1265  1.1  oster   RF_ASSERT(dag_h->numSuccedents == 1);
   1266  1.1  oster   RF_ASSERT(commitNode->numAntecedents == 0);
   1267  1.1  oster   dag_h->succedents[0] = commitNode;
   1268  1.1  oster 
   1269  1.1  oster   /* link the commit node to the write nodes (wnd nodes first, mirror nodes after them) */
   1270  1.1  oster   RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
   1271  1.1  oster   for (i = 0; i < nWndNodes; i++) {
   1272  1.1  oster     RF_ASSERT(wndNode[i].numAntecedents == 1);
   1273  1.1  oster     commitNode->succedents[i] = &wndNode[i];
   1274  1.1  oster     wndNode[i].antecedents[0] = commitNode;
   1275  1.1  oster     wndNode[i].antType[0] = rf_control;
   1276  1.1  oster   }
   1277  1.1  oster   for (i = 0; i < nWmirNodes; i++) {
   1278  1.1  oster     RF_ASSERT(wmirNode[i].numAntecedents == 1);
   1279  1.1  oster     commitNode->succedents[i + nWndNodes] = &wmirNode[i];
   1280  1.1  oster     wmirNode[i].antecedents[0] = commitNode;
   1281  1.1  oster     wmirNode[i].antType[0] = rf_control;
   1282  1.1  oster   }
   1283  1.1  oster 
   1284  1.1  oster   /* link the write nodes to the unblock node (same slot layout as on the commit node) */
   1285  1.1  oster   RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
   1286  1.1  oster   for (i = 0; i < nWndNodes; i++) {
   1287  1.1  oster     RF_ASSERT(wndNode[i].numSuccedents == 1);
   1288  1.1  oster     wndNode[i].succedents[0] = unblockNode;
   1289  1.1  oster     unblockNode->antecedents[i] = &wndNode[i];
   1290  1.1  oster     unblockNode->antType[i] = rf_control;
   1291  1.1  oster   }
   1292  1.1  oster   for (i = 0; i < nWmirNodes; i++) {
   1293  1.1  oster     RF_ASSERT(wmirNode[i].numSuccedents == 1);
   1294  1.1  oster     wmirNode[i].succedents[0] = unblockNode;
   1295  1.1  oster     unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
   1296  1.1  oster     unblockNode->antType[i + nWndNodes] = rf_control;
   1297  1.1  oster   }
   1298  1.1  oster 
   1299  1.1  oster   /* link the unblock node to the term node */
   1300  1.1  oster   RF_ASSERT(unblockNode->numSuccedents == 1);
   1301  1.1  oster   RF_ASSERT(termNode->numAntecedents == 1);
   1302  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
   1303  1.1  oster   unblockNode->succedents[0] = termNode;
   1304  1.1  oster   termNode->antecedents[0] = unblockNode;
   1305  1.1  oster   termNode->antType[0] = rf_control;
   1306  1.1  oster }
   1306  1.1  oster 
   1307  1.1  oster 
   1308  1.1  oster 
   1309  1.1  oster /* DAGs which have no commit points.
   1310  1.1  oster  *
   1311  1.1  oster  * The following DAGs are used in forward and backward error recovery experiments.
   1312  1.1  oster  * They are identical to the DAGs above this comment with the exception that
   1313  1.1  oster  * the commit points have been removed.
   1314  1.1  oster  */
   1315  1.1  oster 
   1316  1.1  oster 
   1317  1.1  oster 
   1318  1.1  oster void rf_CommonCreateLargeWriteDAGFwd(
   1319  1.1  oster   RF_Raid_t             *raidPtr,
   1320  1.1  oster   RF_AccessStripeMap_t  *asmap,
   1321  1.1  oster   RF_DagHeader_t        *dag_h,
   1322  1.1  oster   void                  *bp,
   1323  1.1  oster   RF_RaidAccessFlags_t   flags,
   1324  1.1  oster   RF_AllocListElem_t    *allocList,
   1325  1.1  oster   int                    nfaults,
   1326  1.1  oster   int                  (*redFunc)(RF_DagNode_t *),
   1327  1.1  oster   int                    allowBufferRecycle)
   1328  1.1  oster {
   1329  1.1  oster   RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
   1330  1.1  oster   RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
   1331  1.1  oster   int nWndNodes, nRodNodes, i, nodeNum, asmNum;
   1332  1.1  oster   RF_AccessStripeMapHeader_t *new_asm_h[2];
   1333  1.1  oster   RF_StripeNum_t parityStripeID;
   1334  1.1  oster   char *sosBuffer, *eosBuffer;
   1335  1.1  oster   RF_ReconUnitNum_t which_ru;
   1336  1.1  oster   RF_RaidLayout_t *layoutPtr;
   1337  1.1  oster   RF_PhysDiskAddr_t *pda;
   1338  1.1  oster 
   1339  1.1  oster   layoutPtr = &(raidPtr->Layout);
   1340  1.1  oster   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
   1341  1.1  oster 
   1342  1.1  oster   if (rf_dagDebug)
   1343  1.1  oster     printf("[Creating large-write DAG]\n");
   1344  1.1  oster   dag_h->creator = "LargeWriteDAGFwd";
   1345  1.1  oster 
   1346  1.1  oster   dag_h->numCommitNodes = 0;
   1347  1.1  oster   dag_h->numCommits = 0;
   1348  1.1  oster   dag_h->numSuccedents = 1;
   1349  1.1  oster 
   1350  1.1  oster   /* alloc the nodes: Wnd, xor, commit, block, term, and  Wnp */
   1351  1.1  oster   nWndNodes = asmap->numStripeUnitsAccessed;
   1352  1.1  oster   RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   1353  1.1  oster   i = 0;
   1354  1.1  oster   wndNodes    = &nodes[i]; i += nWndNodes;
   1355  1.1  oster   xorNode     = &nodes[i]; i += 1;
   1356  1.1  oster   wnpNode     = &nodes[i]; i += 1;
   1357  1.1  oster   blockNode   = &nodes[i]; i += 1;
   1358  1.1  oster   syncNode  = &nodes[i]; i += 1;
   1359  1.1  oster   termNode    = &nodes[i]; i += 1;
   1360  1.1  oster   if (nfaults == 2) {
   1361  1.1  oster     wnqNode   = &nodes[i]; i += 1;
   1362  1.1  oster   }
   1363  1.1  oster   else {
   1364  1.1  oster     wnqNode = NULL;
   1365  1.1  oster   }
   1366  1.1  oster   rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
   1367  1.1  oster   if (nRodNodes > 0) {
   1368  1.1  oster     RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   1369  1.1  oster   }
   1370  1.1  oster   else {
   1371  1.1  oster     rodNodes = NULL;
   1372  1.1  oster   }
   1373  1.1  oster 
   1374  1.1  oster   /* begin node initialization */
   1375  1.1  oster   if (nRodNodes > 0) {
   1376  1.1  oster     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
   1377  1.1  oster     rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
   1378  1.1  oster   }
   1379  1.1  oster   else {
   1380  1.1  oster     rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
   1381  1.1  oster     rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
   1382  1.1  oster   }
   1383  1.1  oster 
   1384  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
   1385  1.1  oster 
   1386  1.1  oster   /* initialize the Rod nodes */
   1387  1.1  oster   for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
   1388  1.1  oster     if (new_asm_h[asmNum]) {
   1389  1.1  oster       pda = new_asm_h[asmNum]->stripeMap->physInfo;
   1390  1.1  oster       while (pda) {
   1391  1.1  oster 	rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
   1392  1.1  oster 	rodNodes[nodeNum].params[0].p = pda;
   1393  1.1  oster 	rodNodes[nodeNum].params[1].p = pda->bufPtr;
   1394  1.1  oster 	rodNodes[nodeNum].params[2].v = parityStripeID;
   1395  1.1  oster 	rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1396  1.1  oster 	nodeNum++;
   1397  1.1  oster 	pda=pda->next;
   1398  1.1  oster       }
   1399  1.1  oster     }
   1400  1.1  oster   }
   1401  1.1  oster   RF_ASSERT(nodeNum == nRodNodes);
   1402  1.1  oster 
   1403  1.1  oster   /* initialize the wnd nodes */
   1404  1.1  oster   pda = asmap->physInfo;
   1405  1.1  oster   for (i=0; i < nWndNodes; i++) {
   1406  1.1  oster     rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
   1407  1.1  oster     RF_ASSERT(pda != NULL);
   1408  1.1  oster     wndNodes[i].params[0].p = pda;
   1409  1.1  oster     wndNodes[i].params[1].p = pda->bufPtr;
   1410  1.1  oster     wndNodes[i].params[2].v = parityStripeID;
   1411  1.1  oster     wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1412  1.1  oster     pda = pda->next;
   1413  1.1  oster   }
   1414  1.1  oster 
   1415  1.1  oster   /* initialize the redundancy node */
   1416  1.1  oster   rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
   1417  1.1  oster   xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
   1418  1.1  oster   for (i=0; i < nWndNodes; i++) {
   1419  1.1  oster     xorNode->params[2*i+0] = wndNodes[i].params[0];         /* pda */
   1420  1.1  oster     xorNode->params[2*i+1] = wndNodes[i].params[1];         /* buf ptr */
   1421  1.1  oster   }
   1422  1.1  oster   for (i=0; i < nRodNodes; i++) {
   1423  1.1  oster     xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];         /* pda */
   1424  1.1  oster     xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];         /* buf ptr */
   1425  1.1  oster   }
   1426  1.1  oster   xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */
   1427  1.1  oster 
   1428  1.1  oster   /* look for an Rod node that reads a complete SU.  If none, alloc a buffer to receive the parity info.
   1429  1.1  oster    * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.
   1430  1.1  oster    */
   1431  1.1  oster   if (allowBufferRecycle) {
   1432  1.1  oster     for (i = 0; i < nRodNodes; i++)
   1433  1.1  oster       if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
   1434  1.1  oster         break;
   1435  1.1  oster   }
   1436  1.1  oster   if ((!allowBufferRecycle) || (i == nRodNodes)) {
   1437  1.1  oster     RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
   1438  1.1  oster   }
   1439  1.1  oster   else
   1440  1.1  oster     xorNode->results[0] = rodNodes[i].params[1].p;
   1441  1.1  oster 
   1442  1.1  oster   /* initialize the Wnp node */
   1443  1.1  oster   rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
   1444  1.1  oster   wnpNode->params[0].p = asmap->parityInfo;
   1445  1.1  oster   wnpNode->params[1].p = xorNode->results[0];
   1446  1.1  oster   wnpNode->params[2].v = parityStripeID;
   1447  1.1  oster   wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1448  1.1  oster   RF_ASSERT(asmap->parityInfo->next == NULL);        /* parityInfo must describe entire parity unit */
   1449  1.1  oster 
   1450  1.1  oster   if (nfaults == 2)
   1451  1.1  oster     {
   1452  1.1  oster       /* we never try to recycle a buffer for the Q calcuation in addition to the parity.
   1453  1.1  oster 	 This would cause two buffers to get smashed during the P and Q calculation,
   1454  1.1  oster 	 guaranteeing one would be wrong.
   1455  1.1  oster       */
   1456  1.1  oster       RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
   1457  1.1  oster       rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
   1458  1.1  oster       wnqNode->params[0].p = asmap->qInfo;
   1459  1.1  oster       wnqNode->params[1].p = xorNode->results[1];
   1460  1.1  oster       wnqNode->params[2].v = parityStripeID;
   1461  1.1  oster       wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1462  1.1  oster       RF_ASSERT(asmap->parityInfo->next == NULL);        /* parityInfo must describe entire parity unit */
   1463  1.1  oster     }
   1464  1.1  oster 
   1465  1.1  oster 
   1466  1.1  oster   /* connect nodes to form graph */
   1467  1.1  oster 
   1468  1.1  oster   /* connect dag header to block node */
   1469  1.1  oster   RF_ASSERT(blockNode->numAntecedents == 0);
   1470  1.1  oster   dag_h->succedents[0] = blockNode;
   1471  1.1  oster 
   1472  1.1  oster   if (nRodNodes > 0) {
   1473  1.1  oster     /* connect the block node to the Rod nodes */
   1474  1.1  oster     RF_ASSERT(blockNode->numSuccedents == nRodNodes);
   1475  1.1  oster     RF_ASSERT(syncNode->numAntecedents == nRodNodes);
   1476  1.1  oster     for (i = 0; i < nRodNodes; i++) {
   1477  1.1  oster       RF_ASSERT(rodNodes[i].numAntecedents == 1);
   1478  1.1  oster       blockNode->succedents[i] = &rodNodes[i];
   1479  1.1  oster       rodNodes[i].antecedents[0] = blockNode;
   1480  1.1  oster       rodNodes[i].antType[0] = rf_control;
   1481  1.1  oster 
   1482  1.1  oster       /* connect the Rod nodes to the Nil node */
   1483  1.1  oster       RF_ASSERT(rodNodes[i].numSuccedents == 1);
   1484  1.1  oster       rodNodes[i].succedents[0] = syncNode;
   1485  1.1  oster       syncNode->antecedents[i] = &rodNodes[i];
   1486  1.1  oster       syncNode->antType[i] = rf_trueData;
   1487  1.1  oster     }
   1488  1.1  oster   }
   1489  1.1  oster   else {
   1490  1.1  oster     /* connect the block node to the Nil node */
   1491  1.1  oster     RF_ASSERT(blockNode->numSuccedents == 1);
   1492  1.1  oster     RF_ASSERT(syncNode->numAntecedents == 1);
   1493  1.1  oster     blockNode->succedents[0] = syncNode;
   1494  1.1  oster     syncNode->antecedents[0] = blockNode;
   1495  1.1  oster     syncNode->antType[0] = rf_control;
   1496  1.1  oster   }
   1497  1.1  oster 
   1498  1.1  oster   /* connect the sync node to the Wnd nodes */
   1499  1.1  oster   RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
   1500  1.1  oster   for (i = 0; i < nWndNodes; i++) {
   1501  1.1  oster     RF_ASSERT(wndNodes->numAntecedents == 1);
   1502  1.1  oster     syncNode->succedents[i] = &wndNodes[i];
   1503  1.1  oster     wndNodes[i].antecedents[0] = syncNode;
   1504  1.1  oster     wndNodes[i].antType[0] = rf_control;
   1505  1.1  oster   }
   1506  1.1  oster 
   1507  1.1  oster   /* connect the sync node to the Xor node */
   1508  1.1  oster   RF_ASSERT(xorNode->numAntecedents == 1);
   1509  1.1  oster   syncNode->succedents[nWndNodes] = xorNode;
   1510  1.1  oster   xorNode->antecedents[0] = syncNode;
   1511  1.1  oster   xorNode->antType[0] = rf_control;
   1512  1.1  oster 
   1513  1.1  oster   /* connect the xor node to the write parity node */
   1514  1.1  oster   RF_ASSERT(xorNode->numSuccedents == nfaults);
   1515  1.1  oster   RF_ASSERT(wnpNode->numAntecedents == 1);
   1516  1.1  oster   xorNode->succedents[0] = wnpNode;
   1517  1.1  oster   wnpNode->antecedents[0]= xorNode;
   1518  1.1  oster   wnpNode->antType[0] = rf_trueData;
   1519  1.1  oster   if (nfaults == 2) {
   1520  1.1  oster     RF_ASSERT(wnqNode->numAntecedents == 1);
   1521  1.1  oster     xorNode->succedents[1] = wnqNode;
   1522  1.1  oster     wnqNode->antecedents[0] = xorNode;
   1523  1.1  oster     wnqNode->antType[0] = rf_trueData;
   1524  1.1  oster   }
   1525  1.1  oster 
   1526  1.1  oster   /* connect the write nodes to the term node */
   1527  1.1  oster   RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
   1528  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
   1529  1.1  oster   for (i = 0; i < nWndNodes; i++) {
   1530  1.1  oster     RF_ASSERT(wndNodes->numSuccedents == 1);
   1531  1.1  oster     wndNodes[i].succedents[0] = termNode;
   1532  1.1  oster     termNode->antecedents[i] = &wndNodes[i];
   1533  1.1  oster     termNode->antType[i] = rf_control;
   1534  1.1  oster   }
   1535  1.1  oster   RF_ASSERT(wnpNode->numSuccedents == 1);
   1536  1.1  oster   wnpNode->succedents[0] = termNode;
   1537  1.1  oster   termNode->antecedents[nWndNodes] = wnpNode;
   1538  1.1  oster   termNode->antType[nWndNodes] = rf_control;
   1539  1.1  oster   if (nfaults == 2) {
   1540  1.1  oster     RF_ASSERT(wnqNode->numSuccedents == 1);
   1541  1.1  oster     wnqNode->succedents[0] = termNode;
   1542  1.1  oster     termNode->antecedents[nWndNodes + 1] = wnqNode;
   1543  1.1  oster     termNode->antType[nWndNodes + 1] = rf_control;
   1544  1.1  oster   }
   1545  1.1  oster }
   1546  1.1  oster 
   1547  1.1  oster 
   1548  1.1  oster /******************************************************************************
   1549  1.1  oster  *
   1550  1.1  oster  * creates a DAG to perform a small-write operation (either raid 5 or pq),
   1551  1.1  oster  * which is as follows:
   1552  1.1  oster  *
   1553  1.1  oster  * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
   1554  1.1  oster  *            \- Rod X- Wnd [Und] -------/
   1555  1.1  oster  *           [\- Rod X- Wnd [Und] ------/]
   1556  1.1  oster  *           [\- Roq - Q --> Wnq [Unq]-/]
   1557  1.1  oster  *
   1558  1.1  oster  * Rop = read old parity
   1559  1.1  oster  * Rod = read old data
   1560  1.1  oster  * Roq = read old "q"
   1561  1.1  oster  * Cmt = commit node (not used here: this Fwd variant has no commit node)
   1562  1.1  oster  * Und = unlock data disk
   1563  1.1  oster  * Unp = unlock parity disk
   1564  1.1  oster  * Unq = unlock q disk
   1565  1.1  oster  * Wnp = write new parity
   1566  1.1  oster  * Wnd = write new data
   1567  1.1  oster  * Wnq = write new "q"
   1568  1.1  oster  * [ ] denotes optional segments in the graph
   1569  1.1  oster  *
   1570  1.1  oster  * Parameters:  raidPtr   - description of the physical array
   1571  1.1  oster  *              asmap     - logical & physical addresses for this access
   1572  1.1  oster  *              bp        - buffer ptr (holds write data)
   1573  1.1  oster  *              flags     - general flags (e.g. disk locking)
   1574  1.1  oster  *              allocList - list of memory allocated in DAG creation
   1575  1.1  oster  *              pfuncs    - list of parity generating functions
   1576  1.1  oster  *              qfuncs    - list of q generating functions
   1577  1.1  oster  *
   1578  1.1  oster  * A null qfuncs indicates single fault tolerant
   1579  1.1  oster  *****************************************************************************/
   1580  1.1  oster 
   1581  1.1  oster void rf_CommonCreateSmallWriteDAGFwd(
   1582  1.1  oster   RF_Raid_t             *raidPtr,
   1583  1.1  oster   RF_AccessStripeMap_t  *asmap,
   1584  1.1  oster   RF_DagHeader_t        *dag_h,
   1585  1.1  oster   void                  *bp,
   1586  1.1  oster   RF_RaidAccessFlags_t   flags,
   1587  1.1  oster   RF_AllocListElem_t    *allocList,
   1588  1.1  oster   RF_RedFuncs_t         *pfuncs,
   1589  1.1  oster   RF_RedFuncs_t         *qfuncs)
   1590  1.1  oster {
   1591  1.1  oster   RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
   1592  1.1  oster   RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
   1593  1.1  oster   RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
   1594  1.1  oster   RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
   1595  1.1  oster   int i, j, nNodes, totalNumNodes, lu_flag;
   1596  1.1  oster   RF_ReconUnitNum_t which_ru;
   1597  1.1  oster   int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
   1598  1.1  oster   int (*qfunc)(RF_DagNode_t *);
   1599  1.1  oster   int numDataNodes, numParityNodes;
   1600  1.1  oster   RF_StripeNum_t parityStripeID;
   1601  1.1  oster   RF_PhysDiskAddr_t *pda;
   1602  1.1  oster   char *name, *qname;
   1603  1.1  oster   long nfaults;
   1604  1.1  oster 
   1605  1.1  oster   nfaults = qfuncs ? 2 : 1;
   1606  1.1  oster   lu_flag = (rf_enableAtomicRMW) ? 1 : 0;          /* lock/unlock flag */
   1607  1.1  oster 
   1608  1.1  oster   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
   1609  1.1  oster   pda = asmap->physInfo;
   1610  1.1  oster   numDataNodes = asmap->numStripeUnitsAccessed;
   1611  1.1  oster   numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
   1612  1.1  oster 
   1613  1.1  oster   if (rf_dagDebug) printf("[Creating small-write DAG]\n");
   1614  1.1  oster   RF_ASSERT(numDataNodes > 0);
   1615  1.1  oster   dag_h->creator = "SmallWriteDAGFwd";
   1616  1.1  oster 
   1617  1.1  oster   dag_h->numCommitNodes = 0;
   1618  1.1  oster   dag_h->numCommits = 0;
   1619  1.1  oster   dag_h->numSuccedents = 1;
   1620  1.1  oster 
   1621  1.1  oster   qfunc = NULL;
   1622  1.1  oster   qname = NULL;
   1623  1.1  oster 
   1624  1.1  oster   /* DAG creation occurs in four steps:
   1625  1.1  oster      1. count the number of nodes in the DAG
   1626  1.1  oster      2. create the nodes
   1627  1.1  oster      3. initialize the nodes
   1628  1.1  oster      4. connect the nodes
   1629  1.1  oster    */
   1630  1.1  oster 
   1631  1.1  oster   /* Step 1. compute number of nodes in the graph */
   1632  1.1  oster 
   1633  1.1  oster   /* number of nodes:
   1634  1.1  oster       a read and write for each data unit
   1635  1.1  oster       a redundancy computation node for each parity node (nfaults * nparity)
   1636  1.1  oster       a read and write for each parity unit
   1637  1.1  oster       a block node
   1638  1.1  oster       a terminate node
   1639  1.1  oster       if atomic RMW
   1640  1.1  oster         an unlock node for each data unit, redundancy unit
   1641  1.1  oster   */
   1642  1.1  oster   totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
   1643  1.1  oster   if (lu_flag)
   1644  1.1  oster     totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
   1645  1.1  oster 
   1646  1.1  oster 
   1647  1.1  oster   /* Step 2. create the nodes */
   1648  1.1  oster   RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   1649  1.1  oster   i = 0;
   1650  1.1  oster   blockNode        = &nodes[i]; i += 1;
   1651  1.1  oster   readDataNodes    = &nodes[i]; i += numDataNodes;
   1652  1.1  oster   readParityNodes  = &nodes[i]; i += numParityNodes;
   1653  1.1  oster   writeDataNodes   = &nodes[i]; i += numDataNodes;
   1654  1.1  oster   writeParityNodes = &nodes[i]; i += numParityNodes;
   1655  1.1  oster   xorNodes         = &nodes[i]; i += numParityNodes;
   1656  1.1  oster   termNode         = &nodes[i]; i += 1;
   1657  1.1  oster   if (lu_flag) {
   1658  1.1  oster     unlockDataNodes   = &nodes[i]; i += numDataNodes;
   1659  1.1  oster     unlockParityNodes = &nodes[i]; i += numParityNodes;
   1660  1.1  oster   }
   1661  1.1  oster   else {
   1662  1.1  oster     unlockDataNodes = unlockParityNodes = NULL;
   1663  1.1  oster   }
   1664  1.1  oster   if (nfaults == 2) {
   1665  1.1  oster     readQNodes     = &nodes[i]; i += numParityNodes;
   1666  1.1  oster     writeQNodes    = &nodes[i]; i += numParityNodes;
   1667  1.1  oster     qNodes         = &nodes[i]; i += numParityNodes;
   1668  1.1  oster     if (lu_flag) {
   1669  1.1  oster       unlockQNodes    = &nodes[i]; i += numParityNodes;
   1670  1.1  oster     }
   1671  1.1  oster     else {
   1672  1.1  oster       unlockQNodes = NULL;
   1673  1.1  oster     }
   1674  1.1  oster   }
   1675  1.1  oster   else {
   1676  1.1  oster     readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
   1677  1.1  oster   }
   1678  1.1  oster   RF_ASSERT(i == totalNumNodes);
   1679  1.1  oster 
   1680  1.1  oster   /* Step 3. initialize the nodes */
   1681  1.1  oster   /* initialize block node (Nil) */
   1682  1.1  oster   nNodes     = numDataNodes + (nfaults * numParityNodes);
   1683  1.1  oster   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
   1684  1.1  oster 
   1685  1.1  oster   /* initialize terminate node (Trm) */
   1686  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
   1687  1.1  oster 
   1688  1.1  oster   /* initialize nodes which read old data (Rod) */
   1689  1.1  oster   for (i = 0; i < numDataNodes; i++) {
   1690  1.1  oster     rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
   1691  1.1  oster     RF_ASSERT(pda != NULL);
   1692  1.1  oster     readDataNodes[i].params[0].p = pda;  /* physical disk addr desc */
   1693  1.1  oster     readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);  /* buffer to hold old data */
   1694  1.1  oster     readDataNodes[i].params[2].v = parityStripeID;
   1695  1.1  oster     readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
   1696  1.1  oster     pda=pda->next;
   1697  1.1  oster     for (j = 0; j < readDataNodes[i].numSuccedents; j++)
   1698  1.1  oster       readDataNodes[i].propList[j] = NULL;
   1699  1.1  oster   }
   1700  1.1  oster 
   1701  1.1  oster   /* initialize nodes which read old parity (Rop) */
   1702  1.1  oster   pda = asmap->parityInfo; i = 0;
   1703  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1704  1.1  oster     RF_ASSERT(pda != NULL);
   1705  1.1  oster     rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
   1706  1.1  oster     readParityNodes[i].params[0].p = pda;
   1707  1.1  oster     readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);    /* buffer to hold old parity */
   1708  1.1  oster     readParityNodes[i].params[2].v = parityStripeID;
   1709  1.1  oster     readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
   1710  1.1  oster     for (j = 0; j < readParityNodes[i].numSuccedents; j++)
   1711  1.1  oster       readParityNodes[i].propList[0] = NULL;
   1712  1.1  oster     pda=pda->next;
   1713  1.1  oster   }
   1714  1.1  oster 
   1715  1.1  oster   /* initialize nodes which read old Q (Roq) */
   1716  1.1  oster   if (nfaults == 2)
   1717  1.1  oster     {
   1718  1.1  oster       pda = asmap->qInfo;
   1719  1.1  oster       for (i = 0; i < numParityNodes; i++) {
   1720  1.1  oster 	RF_ASSERT(pda != NULL);
   1721  1.1  oster 	rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
   1722  1.1  oster 	readQNodes[i].params[0].p = pda;
   1723  1.1  oster 	readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
   1724  1.1  oster 	readQNodes[i].params[2].v = parityStripeID;
   1725  1.1  oster 	readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
   1726  1.1  oster 	for (j = 0; j < readQNodes[i].numSuccedents; j++)
   1727  1.1  oster 	  readQNodes[i].propList[0] = NULL;
   1728  1.1  oster 	pda=pda->next;
   1729  1.1  oster       }
   1730  1.1  oster     }
   1731  1.1  oster 
   1732  1.1  oster   /* initialize nodes which write new data (Wnd) */
   1733  1.1  oster   pda = asmap->physInfo;
   1734  1.1  oster   for (i=0; i < numDataNodes; i++) {
   1735  1.1  oster     RF_ASSERT(pda != NULL);
   1736  1.1  oster     rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
   1737  1.1  oster     writeDataNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1738  1.1  oster     writeDataNodes[i].params[1].p = pda->bufPtr;   /* buffer holding new data to be written */
   1739  1.1  oster     writeDataNodes[i].params[2].v = parityStripeID;
   1740  1.1  oster     writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1741  1.1  oster 
   1742  1.1  oster     if (lu_flag) {
   1743  1.1  oster       /* initialize node to unlock the disk queue */
   1744  1.1  oster       rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
   1745  1.1  oster       unlockDataNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1746  1.1  oster       unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
   1747  1.1  oster     }
   1748  1.1  oster 
   1749  1.1  oster     pda = pda->next;
   1750  1.1  oster   }
   1751  1.1  oster 
   1752  1.1  oster 
   1753  1.1  oster   /* initialize nodes which compute new parity and Q */
   1754  1.1  oster   /* we use the simple XOR func in the double-XOR case, and when we're accessing only a portion of one stripe unit.
   1755  1.1  oster    * the distinction between the two is that the regular XOR func assumes that the targbuf is a full SU in size,
   1756  1.1  oster    * and examines the pda associated with the buffer to decide where within the buffer to XOR the data, whereas
   1757  1.1  oster    * the simple XOR func just XORs the data into the start of the buffer.
   1758  1.1  oster    */
   1759  1.1  oster   if ((numParityNodes==2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
   1760  1.1  oster     func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
   1761  1.1  oster     if (qfuncs) {
   1762  1.1  oster       qfunc = qfuncs->simple;
   1763  1.1  oster       qname = qfuncs->SimpleName;
   1764  1.1  oster     }
   1765  1.1  oster   }
   1766  1.1  oster   else {
   1767  1.1  oster     func = pfuncs->regular; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->RegularName;
   1768  1.1  oster     if (qfuncs) { qfunc = qfuncs->regular; qname = qfuncs->RegularName;}
   1769  1.1  oster   }
   1770  1.1  oster   /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr  */
   1771  1.1  oster   if (numParityNodes==2) {        /* double-xor case */
   1772  1.1  oster     for (i=0; i < numParityNodes; i++) {
   1773  1.1  oster       rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList);  /* no wakeup func for xor */
   1774  1.1  oster       xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
   1775  1.1  oster       xorNodes[i].params[0]   = readDataNodes[i].params[0];
   1776  1.1  oster       xorNodes[i].params[1]   = readDataNodes[i].params[1];
   1777  1.1  oster       xorNodes[i].params[2]   = readParityNodes[i].params[0];
   1778  1.1  oster       xorNodes[i].params[3]   = readParityNodes[i].params[1];
   1779  1.1  oster       xorNodes[i].params[4]   = writeDataNodes[i].params[0];
   1780  1.1  oster       xorNodes[i].params[5]   = writeDataNodes[i].params[1];
   1781  1.1  oster       xorNodes[i].params[6].p = raidPtr;
   1782  1.1  oster       xorNodes[i].results[0] = readParityNodes[i].params[1].p;   /* use old parity buf as target buf */
   1783  1.1  oster       if (nfaults==2)
   1784  1.1  oster 	{
   1785  1.1  oster 	  rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList);  /* no wakeup func for xor */
   1786  1.1  oster 	  qNodes[i].params[0]   = readDataNodes[i].params[0];
   1787  1.1  oster 	  qNodes[i].params[1]   = readDataNodes[i].params[1];
   1788  1.1  oster 	  qNodes[i].params[2]   = readQNodes[i].params[0];
   1789  1.1  oster 	  qNodes[i].params[3]   = readQNodes[i].params[1];
   1790  1.1  oster 	  qNodes[i].params[4]   = writeDataNodes[i].params[0];
   1791  1.1  oster 	  qNodes[i].params[5]   = writeDataNodes[i].params[1];
   1792  1.1  oster 	  qNodes[i].params[6].p = raidPtr;
   1793  1.1  oster 	  qNodes[i].results[0] = readQNodes[i].params[1].p;   /* use old Q buf as target buf */
   1794  1.1  oster 	}
   1795  1.1  oster     }
   1796  1.1  oster   }
   1797  1.1  oster   else {
   1798  1.1  oster     /* there is only one xor node in this case */
   1799  1.1  oster     rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
   1800  1.1  oster     xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
   1801  1.1  oster     for (i=0; i < numDataNodes + 1; i++) {
   1802  1.1  oster       /* set up params related to Rod and Rop nodes */
   1803  1.1  oster       xorNodes[0].params[2*i+0] = readDataNodes[i].params[0];    /* pda */
   1804  1.1  oster       xorNodes[0].params[2*i+1] = readDataNodes[i].params[1];    /* buffer pointer */
   1805  1.1  oster     }
   1806  1.1  oster     for (i=0; i < numDataNodes; i++) {
   1807  1.1  oster       /* set up params related to Wnd and Wnp nodes */
   1808  1.1  oster       xorNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
   1809  1.1  oster       xorNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
   1810  1.1  oster     }
   1811  1.1  oster     xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;  /* xor node needs to get at RAID information */
   1812  1.1  oster     xorNodes[0].results[0] = readParityNodes[0].params[1].p;
   1813  1.1  oster     if (nfaults==2)
   1814  1.1  oster       {
   1815  1.1  oster 	rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
   1816  1.1  oster 	for (i=0; i<numDataNodes; i++) {
   1817  1.1  oster 	  /* set up params related to Rod */
   1818  1.1  oster 	  qNodes[0].params[2*i+0] = readDataNodes[i].params[0];    /* pda */
   1819  1.1  oster 	  qNodes[0].params[2*i+1] = readDataNodes[i].params[1];    /* buffer pointer */
   1820  1.1  oster 	}
   1821  1.1  oster 	/* and read old q */
   1822  1.1  oster 	qNodes[0].params[2*numDataNodes + 0] = readQNodes[0].params[0];    /* pda */
   1823  1.1  oster 	qNodes[0].params[2*numDataNodes + 1] = readQNodes[0].params[1];    /* buffer pointer */
   1824  1.1  oster 	for (i=0; i < numDataNodes; i++) {
   1825  1.1  oster 	  /* set up params related to Wnd nodes */
   1826  1.1  oster 	  qNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
   1827  1.1  oster 	  qNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
   1828  1.1  oster 	}
   1829  1.1  oster 	qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;  /* xor node needs to get at RAID information */
   1830  1.1  oster 	qNodes[0].results[0] = readQNodes[0].params[1].p;
   1831  1.1  oster       }
   1832  1.1  oster   }
   1833  1.1  oster 
   1834  1.1  oster   /* initialize nodes which write new parity (Wnp) */
   1835  1.1  oster   pda = asmap->parityInfo;
   1836  1.1  oster   for (i=0;  i < numParityNodes; i++) {
   1837  1.1  oster     rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
   1838  1.1  oster     RF_ASSERT(pda != NULL);
   1839  1.1  oster     writeParityNodes[i].params[0].p = pda;                  /* param 1 (bufPtr) filled in by xor node */
   1840  1.1  oster     writeParityNodes[i].params[1].p = xorNodes[i].results[0];     /* buffer pointer for parity write operation */
   1841  1.1  oster     writeParityNodes[i].params[2].v = parityStripeID;
   1842  1.1  oster     writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1843  1.1  oster 
   1844  1.1  oster     if (lu_flag) {
   1845  1.1  oster       /* initialize node to unlock the disk queue */
   1846  1.1  oster       rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
   1847  1.1  oster       unlockParityNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1848  1.1  oster       unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
   1849  1.1  oster     }
   1850  1.1  oster 
   1851  1.1  oster     pda = pda->next;
   1852  1.1  oster   }
   1853  1.1  oster 
   1854  1.1  oster   /* initialize nodes which write new Q (Wnq) */
   1855  1.1  oster   if (nfaults == 2)
   1856  1.1  oster     {
   1857  1.1  oster       pda = asmap->qInfo;
   1858  1.1  oster       for (i=0;  i < numParityNodes; i++) {
   1859  1.1  oster 	rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
   1860  1.1  oster 	RF_ASSERT(pda != NULL);
   1861  1.1  oster 	writeQNodes[i].params[0].p = pda;                  /* param 1 (bufPtr) filled in by xor node */
   1862  1.1  oster 	writeQNodes[i].params[1].p = qNodes[i].results[0];     /* buffer pointer for parity write operation */
   1863  1.1  oster 	writeQNodes[i].params[2].v = parityStripeID;
   1864  1.1  oster 	writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   1865  1.1  oster 
   1866  1.1  oster 	if (lu_flag) {
   1867  1.1  oster 	  /* initialize node to unlock the disk queue */
   1868  1.1  oster 	  rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
   1869  1.1  oster 	  unlockQNodes[i].params[0].p = pda;                    /* physical disk addr desc */
   1870  1.1  oster 	  unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
   1871  1.1  oster 	}
   1872  1.1  oster 
   1873  1.1  oster 	pda = pda->next;
   1874  1.1  oster       }
   1875  1.1  oster     }
   1876  1.1  oster 
   1877  1.1  oster   /* Step 4. connect the nodes */
   1878  1.1  oster 
   1879  1.1  oster   /* connect header to block node */
   1880  1.1  oster   dag_h->succedents[0] = blockNode;
   1881  1.1  oster 
   1882  1.1  oster   /* connect block node to read old data nodes */
   1883  1.1  oster   RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
   1884  1.1  oster   for (i = 0; i < numDataNodes; i++) {
   1885  1.1  oster     blockNode->succedents[i] = &readDataNodes[i];
   1886  1.1  oster     RF_ASSERT(readDataNodes[i].numAntecedents == 1);
   1887  1.1  oster     readDataNodes[i].antecedents[0]= blockNode;
   1888  1.1  oster     readDataNodes[i].antType[0] = rf_control;
   1889  1.1  oster   }
   1890  1.1  oster 
   1891  1.1  oster   /* connect block node to read old parity nodes */
   1892  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1893  1.1  oster     blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
   1894  1.1  oster     RF_ASSERT(readParityNodes[i].numAntecedents == 1);
   1895  1.1  oster     readParityNodes[i].antecedents[0] = blockNode;
   1896  1.1  oster     readParityNodes[i].antType[0] = rf_control;
   1897  1.1  oster   }
   1898  1.1  oster 
   1899  1.1  oster   /* connect block node to read old Q nodes */
   1900  1.1  oster   if (nfaults == 2)
   1901  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   1902  1.1  oster       blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
   1903  1.1  oster       RF_ASSERT(readQNodes[i].numAntecedents == 1);
   1904  1.1  oster       readQNodes[i].antecedents[0] = blockNode;
   1905  1.1  oster       readQNodes[i].antType[0] = rf_control;
   1906  1.1  oster     }
   1907  1.1  oster 
   1908  1.1  oster   /* connect read old data nodes to write new data nodes */
   1909  1.1  oster   for (i = 0; i < numDataNodes; i++) {
   1910  1.1  oster     RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
   1911  1.1  oster     RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
   1912  1.1  oster     readDataNodes[i].succedents[0] = &writeDataNodes[i];
   1913  1.1  oster     writeDataNodes[i].antecedents[0] = &readDataNodes[i];
   1914  1.1  oster     writeDataNodes[i].antType[0] = rf_antiData;
   1915  1.1  oster   }
   1916  1.1  oster 
   1917  1.1  oster   /* connect read old data nodes to xor nodes */
   1918  1.1  oster   for (i = 0; i < numDataNodes; i++) {
   1919  1.1  oster     for (j = 0; j < numParityNodes; j++){
   1920  1.1  oster       RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
   1921  1.1  oster       readDataNodes[i].succedents[1 + j] = &xorNodes[j];
   1922  1.1  oster       xorNodes[j].antecedents[i] = &readDataNodes[i];
   1923  1.1  oster       xorNodes[j].antType[i] = rf_trueData;
   1924  1.1  oster     }
   1925  1.1  oster   }
   1926  1.1  oster 
   1927  1.1  oster   /* connect read old data nodes to q nodes */
   1928  1.1  oster   if (nfaults == 2)
   1929  1.1  oster     for (i = 0; i < numDataNodes; i++)
   1930  1.1  oster       for (j = 0; j < numParityNodes; j++){
   1931  1.1  oster 	RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
   1932  1.1  oster 	readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
   1933  1.1  oster 	qNodes[j].antecedents[i] = &readDataNodes[i];
   1934  1.1  oster 	qNodes[j].antType[i] = rf_trueData;
   1935  1.1  oster       }
   1936  1.1  oster 
   1937  1.1  oster   /* connect read old parity nodes to xor nodes */
   1938  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1939  1.1  oster     for (j = 0; j < numParityNodes; j++) {
   1940  1.1  oster       RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
   1941  1.1  oster       readParityNodes[i].succedents[j] = &xorNodes[j];
   1942  1.1  oster       xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
   1943  1.1  oster       xorNodes[j].antType[numDataNodes + i] = rf_trueData;
   1944  1.1  oster     }
   1945  1.1  oster   }
   1946  1.1  oster 
   1947  1.1  oster   /* connect read old q nodes to q nodes */
   1948  1.1  oster   if (nfaults == 2)
   1949  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   1950  1.1  oster       for (j = 0; j < numParityNodes; j++) {
   1951  1.1  oster 	RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
   1952  1.1  oster 	readQNodes[i].succedents[j] = &qNodes[j];
   1953  1.1  oster 	qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
   1954  1.1  oster 	qNodes[j].antType[numDataNodes + i] = rf_trueData;
   1955  1.1  oster       }
   1956  1.1  oster     }
   1957  1.1  oster 
   1958  1.1  oster   /* connect xor nodes to the write new parity nodes */
   1959  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   1960  1.1  oster     RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
   1961  1.1  oster     for (j = 0; j < numParityNodes; j++) {
   1962  1.1  oster       RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
   1963  1.1  oster       xorNodes[i].succedents[j] = &writeParityNodes[j];
   1964  1.1  oster       writeParityNodes[j].antecedents[i] = &xorNodes[i];
   1965  1.1  oster       writeParityNodes[j].antType[i] = rf_trueData;
   1966  1.1  oster     }
   1967  1.1  oster   }
   1968  1.1  oster 
   1969  1.1  oster   /* connect q nodes to the write new q nodes */
   1970  1.1  oster   if (nfaults == 2)
   1971  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   1972  1.1  oster       RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
   1973  1.1  oster       for (j = 0; j < numParityNodes; j++) {
   1974  1.1  oster 	RF_ASSERT(qNodes[j].numSuccedents == 1);
   1975  1.1  oster 	qNodes[i].succedents[j] = &writeQNodes[j];
   1976  1.1  oster 	writeQNodes[j].antecedents[i] = &qNodes[i];
   1977  1.1  oster 	writeQNodes[j].antType[i] = rf_trueData;
   1978  1.1  oster       }
   1979  1.1  oster     }
   1980  1.1  oster 
   1981  1.1  oster   RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   1982  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
   1983  1.1  oster   for (i = 0; i < numDataNodes; i++) {
   1984  1.1  oster     if (lu_flag) {
   1985  1.1  oster       /* connect write new data nodes to unlock nodes */
   1986  1.1  oster       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   1987  1.1  oster       RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
   1988  1.1  oster       writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
   1989  1.1  oster       unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
   1990  1.1  oster       unlockDataNodes[i].antType[0] = rf_control;
   1991  1.1  oster 
   1992  1.1  oster       /* connect unlock nodes to term node */
   1993  1.1  oster       RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
   1994  1.1  oster       unlockDataNodes[i].succedents[0] = termNode;
   1995  1.1  oster       termNode->antecedents[i] = &unlockDataNodes[i];
   1996  1.1  oster       termNode->antType[i] = rf_control;
   1997  1.1  oster     }
   1998  1.1  oster     else {
   1999  1.1  oster       /* connect write new data nodes to term node */
   2000  1.1  oster       RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
   2001  1.1  oster       RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
   2002  1.1  oster       writeDataNodes[i].succedents[0] = termNode;
   2003  1.1  oster       termNode->antecedents[i] = &writeDataNodes[i];
   2004  1.1  oster       termNode->antType[i] = rf_control;
   2005  1.1  oster     }
   2006  1.1  oster   }
   2007  1.1  oster 
   2008  1.1  oster   for (i = 0; i < numParityNodes; i++) {
   2009  1.1  oster     if (lu_flag) {
   2010  1.1  oster       /* connect write new parity nodes to unlock nodes */
   2011  1.1  oster       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   2012  1.1  oster       RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
   2013  1.1  oster       writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
   2014  1.1  oster       unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
   2015  1.1  oster       unlockParityNodes[i].antType[0] = rf_control;
   2016  1.1  oster 
   2017  1.1  oster       /* connect unlock nodes to term node */
   2018  1.1  oster       RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
   2019  1.1  oster       unlockParityNodes[i].succedents[0] = termNode;
   2020  1.1  oster       termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
   2021  1.1  oster       termNode->antType[numDataNodes + i] = rf_control;
   2022  1.1  oster     }
   2023  1.1  oster     else {
   2024  1.1  oster       RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
   2025  1.1  oster       writeParityNodes[i].succedents[0] = termNode;
   2026  1.1  oster       termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
   2027  1.1  oster       termNode->antType[numDataNodes + i] = rf_control;
   2028  1.1  oster     }
   2029  1.1  oster   }
   2030  1.1  oster 
   2031  1.1  oster   if (nfaults == 2)
   2032  1.1  oster     for (i = 0; i < numParityNodes; i++) {
   2033  1.1  oster       if (lu_flag) {
   2034  1.1  oster 	/* connect write new Q nodes to unlock nodes */
   2035  1.1  oster 	RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   2036  1.1  oster 	RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
   2037  1.1  oster 	writeQNodes[i].succedents[0] = &unlockQNodes[i];
   2038  1.1  oster 	unlockQNodes[i].antecedents[0] = &writeQNodes[i];
   2039  1.1  oster 	unlockQNodes[i].antType[0] = rf_control;
   2040  1.1  oster 
   2041  1.1  oster 	/* connect unlock nodes to term node */
   2042  1.1  oster 	RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
   2043  1.1  oster 	unlockQNodes[i].succedents[0] = termNode;
   2044  1.1  oster 	termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
   2045  1.1  oster 	termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   2046  1.1  oster       }
   2047  1.1  oster       else {
   2048  1.1  oster 	RF_ASSERT(writeQNodes[i].numSuccedents == 1);
   2049  1.1  oster 	writeQNodes[i].succedents[0] = termNode;
   2050  1.1  oster 	termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
   2051  1.1  oster 	termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
   2052  1.1  oster       }
   2053  1.1  oster     }
   2054  1.1  oster }
   2055  1.1  oster 
   2056  1.1  oster 
   2057  1.1  oster 
   2058  1.1  oster /******************************************************************************
   2059  1.1  oster  * create a write graph (fault-free or degraded) for RAID level 1
   2060  1.1  oster  *
   2061  1.1  oster  * Hdr -> Nil -> Wpd -> Nil -> Trm
   2062  1.1  oster  *            -> Wsd ->
   2063  1.1  oster  *
   2064  1.1  oster  * The "Wpd" node writes data to the primary copy in the mirror pair
   2065  1.1  oster  * The "Wsd" node writes data to the secondary copy in the mirror pair
   2066  1.1  oster  *
   2067  1.1  oster  * Parameters:  raidPtr   - description of the physical array
   2068  1.1  oster  *              asmap     - logical & physical addresses for this access
   2069  1.1  oster  *              bp        - buffer ptr (holds write data)
   2070  1.1  oster  *              flags     - general flags (e.g. disk locking)
   2071  1.1  oster  *              allocList - list of memory allocated in DAG creation
   2072  1.1  oster  *****************************************************************************/
   2073  1.1  oster 
   2074  1.1  oster void rf_CreateRaidOneWriteDAGFwd(
   2075  1.1  oster   RF_Raid_t             *raidPtr,
   2076  1.1  oster   RF_AccessStripeMap_t  *asmap,
   2077  1.1  oster   RF_DagHeader_t        *dag_h,
   2078  1.1  oster   void                  *bp,
   2079  1.1  oster   RF_RaidAccessFlags_t   flags,
   2080  1.1  oster   RF_AllocListElem_t    *allocList)
   2081  1.1  oster {
   2082  1.1  oster   RF_DagNode_t *blockNode, *unblockNode, *termNode;
   2083  1.1  oster   RF_DagNode_t *nodes, *wndNode, *wmirNode;
   2084  1.1  oster   int nWndNodes, nWmirNodes, i;
   2085  1.1  oster   RF_ReconUnitNum_t which_ru;
   2086  1.1  oster   RF_PhysDiskAddr_t *pda, *pdaP;
   2087  1.1  oster   RF_StripeNum_t parityStripeID;
   2088  1.1  oster 
   2089  1.1  oster   parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
   2090  1.1  oster     asmap->raidAddress, &which_ru);
   2091  1.1  oster   if (rf_dagDebug) {
   2092  1.1  oster     printf("[Creating RAID level 1 write DAG]\n");
   2093  1.1  oster   }
   2094  1.1  oster 
   2095  1.1  oster   nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;  /* 2 implies access not SU aligned */
   2096  1.1  oster   nWndNodes =  (asmap->physInfo->next) ? 2 : 1;
   2097  1.1  oster 
   2098  1.1  oster   /* alloc the Wnd nodes and the Wmir node */
   2099  1.1  oster   if (asmap->numDataFailed == 1)
   2100  1.1  oster     nWndNodes--;
   2101  1.1  oster   if (asmap->numParityFailed == 1)
   2102  1.1  oster     nWmirNodes--;
   2103  1.1  oster 
   2104  1.1  oster   /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock + terminator) */
   2105  1.1  oster   RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
   2106  1.1  oster   i = 0;
   2107  1.1  oster   wndNode     = &nodes[i]; i += nWndNodes;
   2108  1.1  oster   wmirNode    = &nodes[i]; i += nWmirNodes;
   2109  1.1  oster   blockNode   = &nodes[i]; i += 1;
   2110  1.1  oster   unblockNode = &nodes[i]; i += 1;
   2111  1.1  oster   termNode = &nodes[i]; i += 1;
   2112  1.1  oster   RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
   2113  1.1  oster 
   2114  1.1  oster   /* this dag can commit immediately */
   2115  1.1  oster   dag_h->numCommitNodes = 0;
   2116  1.1  oster   dag_h->numCommits = 0;
   2117  1.1  oster   dag_h->numSuccedents = 1;
   2118  1.1  oster 
   2119  1.1  oster   /* initialize the block, unblock, and term nodes */
   2120  1.1  oster   rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
   2121  1.1  oster   rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
   2122  1.1  oster   rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
   2123  1.1  oster 
   2124  1.1  oster   /* initialize the wnd nodes */
   2125  1.1  oster   if (nWndNodes > 0) {
   2126  1.1  oster     pda = asmap->physInfo;
   2127  1.1  oster     for (i = 0; i < nWndNodes; i++) {
   2128  1.1  oster       rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
   2129  1.1  oster       RF_ASSERT(pda != NULL);
   2130  1.1  oster       wndNode[i].params[0].p = pda;
   2131  1.1  oster       wndNode[i].params[1].p = pda->bufPtr;
   2132  1.1  oster       wndNode[i].params[2].v = parityStripeID;
   2133  1.1  oster       wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   2134  1.1  oster       pda = pda->next;
   2135  1.1  oster     }
   2136  1.1  oster     RF_ASSERT(pda == NULL);
   2137  1.1  oster   }
   2138  1.1  oster 
   2139  1.1  oster   /* initialize the mirror nodes */
   2140  1.1  oster   if (nWmirNodes > 0) {
   2141  1.1  oster     pda = asmap->physInfo;
   2142  1.1  oster     pdaP = asmap->parityInfo;
   2143  1.1  oster     for (i = 0; i < nWmirNodes; i++) {
   2144  1.1  oster       rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
   2145  1.1  oster       RF_ASSERT(pda != NULL);
   2146  1.1  oster       wmirNode[i].params[0].p = pdaP;
   2147  1.1  oster       wmirNode[i].params[1].p = pda->bufPtr;
   2148  1.1  oster       wmirNode[i].params[2].v = parityStripeID;
   2149  1.1  oster       wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
   2150  1.1  oster       pda = pda->next;
   2151  1.1  oster       pdaP = pdaP->next;
   2152  1.1  oster     }
   2153  1.1  oster     RF_ASSERT(pda == NULL);
   2154  1.1  oster     RF_ASSERT(pdaP == NULL);
   2155  1.1  oster   }
   2156  1.1  oster 
   2157  1.1  oster   /* link the header node to the block node */
   2158  1.1  oster   RF_ASSERT(dag_h->numSuccedents == 1);
   2159  1.1  oster   RF_ASSERT(blockNode->numAntecedents == 0);
   2160  1.1  oster   dag_h->succedents[0] = blockNode;
   2161  1.1  oster 
   2162  1.1  oster   /* link the block node to the write nodes */
   2163  1.1  oster   RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
   2164  1.1  oster   for (i = 0; i < nWndNodes; i++) {
   2165  1.1  oster     RF_ASSERT(wndNode[i].numAntecedents == 1);
   2166  1.1  oster     blockNode->succedents[i] = &wndNode[i];
   2167  1.1  oster     wndNode[i].antecedents[0] = blockNode;
   2168  1.1  oster     wndNode[i].antType[0] = rf_control;
   2169  1.1  oster   }
   2170  1.1  oster   for (i = 0; i < nWmirNodes; i++) {
   2171  1.1  oster     RF_ASSERT(wmirNode[i].numAntecedents == 1);
   2172  1.1  oster     blockNode->succedents[i + nWndNodes] = &wmirNode[i];
   2173  1.1  oster     wmirNode[i].antecedents[0] = blockNode;
   2174  1.1  oster     wmirNode[i].antType[0] = rf_control;
   2175  1.1  oster   }
   2176  1.1  oster 
   2177  1.1  oster   /* link the write nodes to the unblock node */
   2178  1.1  oster   RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
   2179  1.1  oster   for (i = 0; i < nWndNodes; i++) {
   2180  1.1  oster     RF_ASSERT(wndNode[i].numSuccedents == 1);
   2181  1.1  oster     wndNode[i].succedents[0] = unblockNode;
   2182  1.1  oster     unblockNode->antecedents[i] = &wndNode[i];
   2183  1.1  oster     unblockNode->antType[i] = rf_control;
   2184  1.1  oster   }
   2185  1.1  oster   for (i = 0; i < nWmirNodes; i++) {
   2186  1.1  oster     RF_ASSERT(wmirNode[i].numSuccedents == 1);
   2187  1.1  oster     wmirNode[i].succedents[0] = unblockNode;
   2188  1.1  oster     unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
   2189  1.1  oster     unblockNode->antType[i + nWndNodes] = rf_control;
   2190  1.1  oster   }
   2191  1.1  oster 
   2192  1.1  oster   /* link the unblock node to the term node */
   2193  1.1  oster   RF_ASSERT(unblockNode->numSuccedents == 1);
   2194  1.1  oster   RF_ASSERT(termNode->numAntecedents == 1);
   2195  1.1  oster   RF_ASSERT(termNode->numSuccedents == 0);
   2196  1.1  oster   unblockNode->succedents[0] = termNode;
   2197  1.1  oster   termNode->antecedents[0] = unblockNode;
   2198  1.1  oster   termNode->antType[0] = rf_control;
   2199  1.1  oster 
   2200  1.1  oster   return;
   2201  1.1  oster }
   2202