/* $NetBSD: rf_parityloggingdags.c,v 1.1 1998/11/13 04:20:32 oster Exp $ */
2 /*
3 * Copyright (c) 1995 Carnegie-Mellon University.
4 * All rights reserved.
5 *
6 * Author: William V. Courtright II
7 *
8 * Permission to use, copy, modify and distribute this software and
9 * its documentation is hereby granted, provided that both the copyright
10 * notice and this permission notice appear in all copies of the
11 * software, derivative works or modified versions, and any portions
12 * thereof, and that both notices appear in supporting documentation.
13 *
14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17 *
18 * Carnegie Mellon requests users of this software to return to
19 *
20 * Software Distribution Coordinator or Software.Distribution (at) CS.CMU.EDU
21 * School of Computer Science
22 * Carnegie Mellon University
23 * Pittsburgh PA 15213-3890
24 *
25 * any improvements or extensions that they make and grant Carnegie the
26 * rights to redistribute these changes.
27 */
28
29 /*
30 * Log: rf_parityloggingdags.c,v
31 * Revision 1.27 1996/07/28 20:31:39 jimz
32 * i386netbsd port
33 * true/false fixup
34 *
35 * Revision 1.26 1996/07/27 23:36:08 jimz
36 * Solaris port of simulator
37 *
38 * Revision 1.25 1996/07/22 19:52:16 jimz
39 * switched node params to RF_DagParam_t, a union of
40 * a 64-bit int and a void *, for better portability
41 * attempted hpux port, but failed partway through for
42 * lack of a single C compiler capable of compiling all
43 * source files
44 *
45 * Revision 1.24 1996/06/11 13:47:21 jimz
46 * fix up for in-kernel compilation
47 *
48 * Revision 1.23 1996/06/07 22:26:27 jimz
49 * type-ify which_ru (RF_ReconUnitNum_t)
50 *
51 * Revision 1.22 1996/06/07 21:33:04 jimz
52 * begin using consistent types for sector numbers,
53 * stripe numbers, row+col numbers, recon unit numbers
54 *
55 * Revision 1.21 1996/06/02 17:31:48 jimz
56 * Moved a lot of global stuff into array structure, where it belongs.
57 * Fixed up paritylogging, pss modules in this manner. Some general
58 * code cleanup. Removed lots of dead code, some dead files.
59 *
60 * Revision 1.20 1996/05/31 22:26:54 jimz
61 * fix a lot of mapping problems, memory allocation problems
62 * found some weird lock issues, fixed 'em
63 * more code cleanup
64 *
65 * Revision 1.19 1996/05/30 11:29:41 jimz
66 * Numerous bug fixes. Stripe lock release code disagreed with the taking code
67 * about when stripes should be locked (I made it consistent: no parity, no lock)
68 * There was a lot of extra serialization of I/Os which I've removed- a lot of
69 * it was to calculate values for the cache code, which is no longer with us.
70 * More types, function, macro cleanup. Added code to properly quiesce the array
71 * on shutdown. Made a lot of stuff array-specific which was (bogusly) general
72 * before. Fixed memory allocation, freeing bugs.
73 *
74 * Revision 1.18 1996/05/27 18:56:37 jimz
75 * more code cleanup
76 * better typing
77 * compiles in all 3 environments
78 *
79 * Revision 1.17 1996/05/24 22:17:04 jimz
80 * continue code + namespace cleanup
81 * typed a bunch of flags
82 *
83 * Revision 1.16 1996/05/24 04:28:55 jimz
84 * release cleanup ckpt
85 *
86 * Revision 1.15 1996/05/23 21:46:35 jimz
87 * checkpoint in code cleanup (release prep)
88 * lots of types, function names have been fixed
89 *
90 * Revision 1.14 1996/05/23 00:33:23 jimz
91 * code cleanup: move all debug decls to rf_options.c, all extern
92 * debug decls to rf_options.h, all debug vars preceded by rf_
93 *
94 * Revision 1.13 1996/05/18 19:51:34 jimz
95 * major code cleanup- fix syntax, make some types consistent,
96 * add prototypes, clean out dead code, et cetera
97 *
98 * Revision 1.12 1996/05/08 21:01:24 jimz
99 * fixed up enum type names that were conflicting with other
100 * enums and function names (ie, "panic")
101 * future naming trends will be towards RF_ and rf_ for
102 * everything raidframe-related
103 *
104 * Revision 1.11 1996/05/03 19:42:02 wvcii
105 * added includes for dag library
106 *
107 * Revision 1.10 1995/12/12 18:10:06 jimz
108 * MIN -> RF_MIN, MAX -> RF_MAX, ASSERT -> RF_ASSERT
109 * fix 80-column brain damage in comments
110 *
111 * Revision 1.9 1995/12/06 20:55:24 wvcii
112 * added prototyping
113 * fixed bug in dag header numSuccedents count for both small and large dags
114 *
115 * Revision 1.8 1995/11/30 16:08:01 wvcii
116 * added copyright info
117 *
118 * Revision 1.7 1995/11/07 15:29:05 wvcii
119 * reorganized code, adding comments and asserts
120 * dag creation routines now generate term node
121 * encoded commit point, barrier, and antecedence types into dags
122 *
123 * Revision 1.6 1995/09/07 15:52:06 jimz
124 * noop compile when INCLUDE_PARITYLOGGING not defined
125 *
126 * Revision 1.5 1995/06/15 13:51:53 robby
127 * updated some wrong prototypes (after prototyping rf_dagutils.h)
128 *
129 * Revision 1.4 1995/06/09 13:15:05 wvcii
130 * code is now nonblocking
131 *
132 * Revision 1.3 95/05/31 13:09:14 wvcii
133 * code debug
134 *
135 * Revision 1.2 1995/05/21 15:34:14 wvcii
136 * code debug
137 *
138 * Revision 1.1 95/05/16 14:36:53 wvcii
139 * Initial revision
140 *
141 *
142 */
143
144 #include "rf_archs.h"
145
146 #if RF_INCLUDE_PARITYLOGGING > 0
147
148 /*
149 DAGs specific to parity logging are created here
150 */
151
152 #include "rf_types.h"
153 #include "rf_raid.h"
154 #include "rf_dag.h"
155 #include "rf_dagutils.h"
156 #include "rf_dagfuncs.h"
157 #include "rf_threadid.h"
158 #include "rf_debugMem.h"
159 #include "rf_paritylog.h"
160 #include "rf_memchunk.h"
161 #include "rf_general.h"
162
163 #include "rf_parityloggingdags.h"
164
/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- NIL- Rod - NIL - Wnd ------ NIL - T
 *           \ Rod /           \ Xor - Lpo /
 *
 * The writes are not done until the reads complete because if they were done
 * in parallel, a failure on one of the reads could leave the parity in an
 * inconsistent state, so that the retry with a new DAG would produce
 * erroneous parity.
 *
 * Note: this DAG has the nasty property that none of the buffers allocated
 * for reading old data can be freed until the XOR node fires.  Need to fix
 * this.
 *
 * The last two arguments are the number of faults tolerated, and function
 * for the redundancy calculation.  The undo for the redundancy calc is
 * assumed to be null.
 *
 *****************************************************************************/
184
185 void rf_CommonCreateParityLoggingLargeWriteDAG(
186 RF_Raid_t *raidPtr,
187 RF_AccessStripeMap_t *asmap,
188 RF_DagHeader_t *dag_h,
189 void *bp,
190 RF_RaidAccessFlags_t flags,
191 RF_AllocListElem_t *allocList,
192 int nfaults,
193 int (*redFunc)(RF_DagNode_t *))
194 {
195 RF_DagNode_t *nodes, *wndNodes, *rodNodes=NULL, *syncNode, *xorNode, *lpoNode, *blockNode, *unblockNode, *termNode;
196 int nWndNodes, nRodNodes, i;
197 RF_RaidLayout_t *layoutPtr = &(raidPtr->Layout);
198 RF_AccessStripeMapHeader_t *new_asm_h[2];
199 int nodeNum, asmNum;
200 RF_ReconUnitNum_t which_ru;
201 char *sosBuffer, *eosBuffer;
202 RF_PhysDiskAddr_t *pda;
203 RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
204
205 if (rf_dagDebug)
206 printf("[Creating parity-logging large-write DAG]\n");
207 RF_ASSERT(nfaults == 1); /* this arch only single fault tolerant */
208 dag_h->creator = "ParityLoggingLargeWriteDAG";
209
210 /* alloc the Wnd nodes, the xor node, and the Lpo node */
211 nWndNodes = asmap->numStripeUnitsAccessed;
212 RF_CallocAndAdd(nodes, nWndNodes + 6, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
213 i = 0;
214 wndNodes = &nodes[i]; i += nWndNodes;
215 xorNode = &nodes[i]; i += 1;
216 lpoNode = &nodes[i]; i += 1;
217 blockNode = &nodes[i]; i += 1;
218 syncNode = &nodes[i]; i += 1;
219 unblockNode = &nodes[i]; i += 1;
220 termNode = &nodes[i]; i += 1;
221
222 dag_h->numCommitNodes = nWndNodes + 1;
223 dag_h->numCommits = 0;
224 dag_h->numSuccedents = 1;
225
226 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
227 if (nRodNodes > 0)
228 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
229
230 /* begin node initialization */
231 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes + 1, 0, 0, 0, dag_h, "Nil", allocList);
232 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nWndNodes + 1, 0, 0, dag_h, "Nil", allocList);
233 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes + 1, 0, 0, dag_h, "Nil", allocList);
234 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
235
236 /* initialize the Rod nodes */
237 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
238 if (new_asm_h[asmNum]) {
239 pda = new_asm_h[asmNum]->stripeMap->physInfo;
240 while (pda) {
241 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,rf_DiskReadUndoFunc,rf_GenericWakeupFunc,1,1,4,0, dag_h, "Rod", allocList);
242 rodNodes[nodeNum].params[0].p = pda;
243 rodNodes[nodeNum].params[1].p = pda->bufPtr;
244 rodNodes[nodeNum].params[2].v = parityStripeID;
245 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
246 nodeNum++;
247 pda=pda->next;
248 }
249 }
250 }
251 RF_ASSERT(nodeNum == nRodNodes);
252
253 /* initialize the wnd nodes */
254 pda = asmap->physInfo;
255 for (i=0; i < nWndNodes; i++) {
256 rf_InitNode(&wndNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
257 RF_ASSERT(pda != NULL);
258 wndNodes[i].params[0].p = pda;
259 wndNodes[i].params[1].p = pda->bufPtr;
260 wndNodes[i].params[2].v = parityStripeID;
261 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
262 pda = pda->next;
263 }
264
265 /* initialize the redundancy node */
266 rf_InitNode(xorNode, rf_wait, RF_TRUE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 1, 2*(nWndNodes+nRodNodes)+1, 1, dag_h, "Xr ", allocList);
267 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
268 for (i=0; i < nWndNodes; i++) {
269 xorNode->params[2*i+0] = wndNodes[i].params[0]; /* pda */
270 xorNode->params[2*i+1] = wndNodes[i].params[1]; /* buf ptr */
271 }
272 for (i=0; i < nRodNodes; i++) {
273 xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0]; /* pda */
274 xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1]; /* buf ptr */
275 }
276 xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */
277
278 /* look for an Rod node that reads a complete SU. If none, alloc a buffer to receive the parity info.
279 * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.
280 */
281 for (i = 0; i < nRodNodes; i++)
282 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
283 break;
284 if (i == nRodNodes) {
285 RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
286 }
287 else {
288 xorNode->results[0] = rodNodes[i].params[1].p;
289 }
290
291 /* initialize the Lpo node */
292 rf_InitNode(lpoNode, rf_wait, RF_FALSE, rf_ParityLogOverwriteFunc, rf_ParityLogOverwriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpo", allocList);
293
294 lpoNode->params[0].p = asmap->parityInfo;
295 lpoNode->params[1].p = xorNode->results[0];
296 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must describe entire parity unit */
297
298 /* connect nodes to form graph */
299
300 /* connect dag header to block node */
301 RF_ASSERT(dag_h->numSuccedents == 1);
302 RF_ASSERT(blockNode->numAntecedents == 0);
303 dag_h->succedents[0] = blockNode;
304
305 /* connect the block node to the Rod nodes */
306 RF_ASSERT(blockNode->numSuccedents == nRodNodes + 1);
307 for (i = 0; i < nRodNodes; i++) {
308 RF_ASSERT(rodNodes[i].numAntecedents == 1);
309 blockNode->succedents[i] = &rodNodes[i];
310 rodNodes[i].antecedents[0] = blockNode;
311 rodNodes[i].antType[0] = rf_control;
312 }
313
314 /* connect the block node to the sync node */
315 /* necessary if nRodNodes == 0 */
316 RF_ASSERT(syncNode->numAntecedents == nRodNodes + 1);
317 blockNode->succedents[nRodNodes] = syncNode;
318 syncNode->antecedents[0] = blockNode;
319 syncNode->antType[0] = rf_control;
320
321 /* connect the Rod nodes to the syncNode */
322 for (i = 0; i < nRodNodes; i++) {
323 rodNodes[i].succedents[0] = syncNode;
324 syncNode->antecedents[1 + i] = &rodNodes[i];
325 syncNode->antType[1 + i] = rf_control;
326 }
327
328 /* connect the sync node to the xor node */
329 RF_ASSERT(syncNode->numSuccedents == nWndNodes + 1);
330 RF_ASSERT(xorNode->numAntecedents == 1);
331 syncNode->succedents[0] = xorNode;
332 xorNode->antecedents[0] = syncNode;
333 xorNode->antType[0] = rf_trueData; /* carry forward from sync */
334
335 /* connect the sync node to the Wnd nodes */
336 for (i = 0; i < nWndNodes; i++) {
337 RF_ASSERT(wndNodes->numAntecedents == 1);
338 syncNode->succedents[1 + i] = &wndNodes[i];
339 wndNodes[i].antecedents[0] = syncNode;
340 wndNodes[i].antType[0] = rf_control;
341 }
342
343 /* connect the xor node to the Lpo node */
344 RF_ASSERT(xorNode->numSuccedents == 1);
345 RF_ASSERT(lpoNode->numAntecedents == 1);
346 xorNode->succedents[0] = lpoNode;
347 lpoNode->antecedents[0]= xorNode;
348 lpoNode->antType[0] = rf_trueData;
349
350 /* connect the Wnd nodes to the unblock node */
351 RF_ASSERT(unblockNode->numAntecedents == nWndNodes + 1);
352 for (i = 0; i < nWndNodes; i++) {
353 RF_ASSERT(wndNodes->numSuccedents == 1);
354 wndNodes[i].succedents[0] = unblockNode;
355 unblockNode->antecedents[i] = &wndNodes[i];
356 unblockNode->antType[i] = rf_control;
357 }
358
359 /* connect the Lpo node to the unblock node */
360 RF_ASSERT(lpoNode->numSuccedents == 1);
361 lpoNode->succedents[0] = unblockNode;
362 unblockNode->antecedents[nWndNodes] = lpoNode;
363 unblockNode->antType[nWndNodes] = rf_control;
364
365 /* connect unblock node to terminator */
366 RF_ASSERT(unblockNode->numSuccedents == 1);
367 RF_ASSERT(termNode->numAntecedents == 1);
368 RF_ASSERT(termNode->numSuccedents == 0);
369 unblockNode->succedents[0] = termNode;
370 termNode->antecedents[0] = unblockNode;
371 termNode->antType[0] = rf_control;
372 }
373
374
375
376
377 /******************************************************************************
378 *
379 * creates a DAG to perform a small-write operation (either raid 5 or pq), which is as follows:
380 *
381 * Header
382 * |
383 * Block
384 * / | ... \ \
385 * / | \ \
386 * Rod Rod Rod Rop
387 * | \ /| \ / | \/ |
388 * | | | /\ |
389 * Wnd Wnd Wnd X
390 * | \ / |
391 * | \ / |
392 * \ \ / Lpo
393 * \ \ / /
394 * +-> Unblock <-+
395 * |
396 * T
397 *
398 *
399 * R = Read, W = Write, X = Xor, o = old, n = new, d = data, p = parity.
400 * When the access spans a stripe unit boundary and is less than one SU in size, there will
401 * be two Rop -- X -- Wnp branches. I call this the "double-XOR" case.
402 * The second output from each Rod node goes to the X node. In the double-XOR
403 * case, there are exactly 2 Rod nodes, and each sends one output to one X node.
404 * There is one Rod -- Wnd -- T branch for each stripe unit being updated.
405 *
406 * The block and unblock nodes are unused. See comment above CreateFaultFreeReadDAG.
407 *
408 * Note: this DAG ignores all the optimizations related to making the RMWs atomic.
409 * it also has the nasty property that none of the buffers allocated for reading
410 * old data & parity can be freed until the XOR node fires. Need to fix this.
411 *
412 * A null qfuncs indicates single fault tolerant
413 *****************************************************************************/
414
415 void rf_CommonCreateParityLoggingSmallWriteDAG(
416 RF_Raid_t *raidPtr,
417 RF_AccessStripeMap_t *asmap,
418 RF_DagHeader_t *dag_h,
419 void *bp,
420 RF_RaidAccessFlags_t flags,
421 RF_AllocListElem_t *allocList,
422 RF_RedFuncs_t *pfuncs,
423 RF_RedFuncs_t *qfuncs)
424 {
425 RF_DagNode_t *xorNodes, *blockNode, *unblockNode, *nodes;
426 RF_DagNode_t *readDataNodes, *readParityNodes;
427 RF_DagNode_t *writeDataNodes, *lpuNodes;
428 RF_DagNode_t *unlockDataNodes=NULL, *termNode;
429 RF_PhysDiskAddr_t *pda = asmap->physInfo;
430 int numDataNodes = asmap->numStripeUnitsAccessed;
431 int numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
432 int i, j, nNodes, totalNumNodes;
433 RF_ReconUnitNum_t which_ru;
434 int (*func)(RF_DagNode_t *node), (*undoFunc)(RF_DagNode_t *node);
435 int (*qfunc)(RF_DagNode_t *node);
436 char *name, *qname;
437 RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
438 long nfaults = qfuncs ? 2 : 1;
439 int lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
440
441 if (rf_dagDebug) printf("[Creating parity-logging small-write DAG]\n");
442 RF_ASSERT(numDataNodes > 0);
443 RF_ASSERT(nfaults == 1);
444 dag_h->creator = "ParityLoggingSmallWriteDAG";
445
446 /* DAG creation occurs in three steps:
447 1. count the number of nodes in the DAG
448 2. create the nodes
449 3. initialize the nodes
450 4. connect the nodes
451 */
452
453 /* Step 1. compute number of nodes in the graph */
454
455 /* number of nodes:
456 a read and write for each data unit
457 a redundancy computation node for each parity node
458 a read and Lpu for each parity unit
459 a block and unblock node (2)
460 a terminator node
461 if atomic RMW
462 an unlock node for each data unit, redundancy unit
463 */
464 totalNumNodes = (2 * numDataNodes) + numParityNodes + (2 * numParityNodes) + 3;
465 if (lu_flag)
466 totalNumNodes += numDataNodes;
467
468 nNodes = numDataNodes + numParityNodes;
469
470 dag_h->numCommitNodes = numDataNodes + numParityNodes;
471 dag_h->numCommits = 0;
472 dag_h->numSuccedents = 1;
473
474 /* Step 2. create the nodes */
475 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
476 i = 0;
477 blockNode = &nodes[i]; i += 1;
478 unblockNode = &nodes[i]; i += 1;
479 readDataNodes = &nodes[i]; i += numDataNodes;
480 readParityNodes = &nodes[i]; i += numParityNodes;
481 writeDataNodes = &nodes[i]; i += numDataNodes;
482 lpuNodes = &nodes[i]; i += numParityNodes;
483 xorNodes = &nodes[i]; i += numParityNodes;
484 termNode = &nodes[i]; i += 1;
485 if (lu_flag) {
486 unlockDataNodes = &nodes[i]; i += numDataNodes;
487 }
488 RF_ASSERT(i == totalNumNodes);
489
490 /* Step 3. initialize the nodes */
491 /* initialize block node (Nil) */
492 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
493
494 /* initialize unblock node (Nil) */
495 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nNodes, 0, 0, dag_h, "Nil", allocList);
496
497 /* initialize terminatory node (Trm) */
498 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
499
500 /* initialize nodes which read old data (Rod) */
501 for (i = 0; i < numDataNodes; i++) {
502 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rod", allocList);
503 RF_ASSERT(pda != NULL);
504 readDataNodes[i].params[0].p = pda; /* physical disk addr desc */
505 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old data */
506 readDataNodes[i].params[2].v = parityStripeID;
507 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
508 pda=pda->next;
509 readDataNodes[i].propList[0] = NULL;
510 readDataNodes[i].propList[1] = NULL;
511 }
512
513 /* initialize nodes which read old parity (Rop) */
514 pda = asmap->parityInfo; i = 0;
515 for (i = 0; i < numParityNodes; i++) {
516 RF_ASSERT(pda != NULL);
517 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rop", allocList);
518 readParityNodes[i].params[0].p = pda;
519 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old parity */
520 readParityNodes[i].params[2].v = parityStripeID;
521 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
522 readParityNodes[i].propList[0] = NULL;
523 pda=pda->next;
524 }
525
526 /* initialize nodes which write new data (Wnd) */
527 pda = asmap->physInfo;
528 for (i=0; i < numDataNodes; i++) {
529 RF_ASSERT(pda != NULL);
530 rf_InitNode(&writeDataNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, nNodes, 4, 0, dag_h, "Wnd", allocList);
531 writeDataNodes[i].params[0].p = pda; /* physical disk addr desc */
532 writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new data to be written */
533 writeDataNodes[i].params[2].v = parityStripeID;
534 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
535
536 if (lu_flag) {
537 /* initialize node to unlock the disk queue */
538 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
539 unlockDataNodes[i].params[0].p = pda; /* physical disk addr desc */
540 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
541 }
542 pda = pda->next;
543 }
544
545
546 /* initialize nodes which compute new parity */
547 /* we use the simple XOR func in the double-XOR case, and when we're accessing only a portion of one stripe unit.
548 * the distinction between the two is that the regular XOR func assumes that the targbuf is a full SU in size,
549 * and examines the pda associated with the buffer to decide where within the buffer to XOR the data, whereas
550 * the simple XOR func just XORs the data into the start of the buffer.
551 */
552 if ((numParityNodes==2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
553 func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
554 if (qfuncs)
555 { qfunc = qfuncs->simple; qname = qfuncs->SimpleName;}
556 } else {
557 func = pfuncs->regular; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->RegularName;
558 if (qfuncs) { qfunc = qfuncs->regular; qname = qfuncs->RegularName;}
559 }
560 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr */
561 if (numParityNodes==2) { /* double-xor case */
562 for (i=0; i < numParityNodes; i++) {
563 rf_InitNode(&xorNodes[i], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for xor */
564 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
565 xorNodes[i].params[0] = readDataNodes[i].params[0];
566 xorNodes[i].params[1] = readDataNodes[i].params[1];
567 xorNodes[i].params[2] = readParityNodes[i].params[0];
568 xorNodes[i].params[3] = readParityNodes[i].params[1];
569 xorNodes[i].params[4] = writeDataNodes[i].params[0];
570 xorNodes[i].params[5] = writeDataNodes[i].params[1];
571 xorNodes[i].params[6].p = raidPtr;
572 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as target buf */
573 }
574 }
575 else {
576 /* there is only one xor node in this case */
577 rf_InitNode(&xorNodes[0], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
578 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
579 for (i=0; i < numDataNodes + 1; i++) {
580 /* set up params related to Rod and Rop nodes */
581 xorNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
582 xorNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer pointer */
583 }
584 for (i=0; i < numDataNodes; i++) {
585 /* set up params related to Wnd and Wnp nodes */
586 xorNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
587 xorNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
588 }
589 xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr; /* xor node needs to get at RAID information */
590 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
591 }
592
593 /* initialize the log node(s) */
594 pda = asmap->parityInfo;
595 for (i = 0; i < numParityNodes; i++) {
596 RF_ASSERT(pda);
597 rf_InitNode(&lpuNodes[i], rf_wait, RF_FALSE, rf_ParityLogUpdateFunc, rf_ParityLogUpdateUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpu", allocList);
598 lpuNodes[i].params[0].p = pda; /* PhysDiskAddr of parity */
599 lpuNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer to parity */
600 pda = pda->next;
601 }
602
603
604 /* Step 4. connect the nodes */
605
606 /* connect header to block node */
607 RF_ASSERT(dag_h->numSuccedents == 1);
608 RF_ASSERT(blockNode->numAntecedents == 0);
609 dag_h->succedents[0] = blockNode;
610
611 /* connect block node to read old data nodes */
612 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + numParityNodes));
613 for (i = 0; i < numDataNodes; i++) {
614 blockNode->succedents[i] = &readDataNodes[i];
615 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
616 readDataNodes[i].antecedents[0]= blockNode;
617 readDataNodes[i].antType[0] = rf_control;
618 }
619
620 /* connect block node to read old parity nodes */
621 for (i = 0; i < numParityNodes; i++) {
622 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
623 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
624 readParityNodes[i].antecedents[0] = blockNode;
625 readParityNodes[i].antType[0] = rf_control;
626 }
627
628 /* connect read old data nodes to write new data nodes */
629 for (i = 0; i < numDataNodes; i++) {
630 RF_ASSERT(readDataNodes[i].numSuccedents == numDataNodes + numParityNodes);
631 for (j = 0; j < numDataNodes; j++) {
632 RF_ASSERT(writeDataNodes[j].numAntecedents == numDataNodes + numParityNodes);
633 readDataNodes[i].succedents[j] = &writeDataNodes[j];
634 writeDataNodes[j].antecedents[i] = &readDataNodes[i];
635 if (i == j)
636 writeDataNodes[j].antType[i] = rf_antiData;
637 else
638 writeDataNodes[j].antType[i] = rf_control;
639 }
640 }
641
642 /* connect read old data nodes to xor nodes */
643 for (i = 0; i < numDataNodes; i++)
644 for (j = 0; j < numParityNodes; j++){
645 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
646 readDataNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
647 xorNodes[j].antecedents[i] = &readDataNodes[i];
648 xorNodes[j].antType[i] = rf_trueData;
649 }
650
651 /* connect read old parity nodes to write new data nodes */
652 for (i = 0; i < numParityNodes; i++) {
653 RF_ASSERT(readParityNodes[i].numSuccedents == numDataNodes + numParityNodes);
654 for (j = 0; j < numDataNodes; j++) {
655 readParityNodes[i].succedents[j] = &writeDataNodes[j];
656 writeDataNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
657 writeDataNodes[j].antType[numDataNodes + i] = rf_control;
658 }
659 }
660
661 /* connect read old parity nodes to xor nodes */
662 for (i = 0; i < numParityNodes; i++)
663 for (j = 0; j < numParityNodes; j++) {
664 readParityNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
665 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
666 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
667 }
668
669 /* connect xor nodes to write new parity nodes */
670 for (i = 0; i < numParityNodes; i++) {
671 RF_ASSERT(xorNodes[i].numSuccedents == 1);
672 RF_ASSERT(lpuNodes[i].numAntecedents == 1);
673 xorNodes[i].succedents[0] = &lpuNodes[i];
674 lpuNodes[i].antecedents[0] = &xorNodes[i];
675 lpuNodes[i].antType[0] = rf_trueData;
676 }
677
678 for (i = 0; i < numDataNodes; i++) {
679 if (lu_flag) {
680 /* connect write new data nodes to unlock nodes */
681 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
682 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
683 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
684 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
685 unlockDataNodes[i].antType[0] = rf_control;
686
687 /* connect unlock nodes to unblock node */
688 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
689 RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
690 unlockDataNodes[i].succedents[0] = unblockNode;
691 unblockNode->antecedents[i] = &unlockDataNodes[i];
692 unblockNode->antType[i] = rf_control;
693 }
694 else {
695 /* connect write new data nodes to unblock node */
696 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
697 RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
698 writeDataNodes[i].succedents[0] = unblockNode;
699 unblockNode->antecedents[i] = &writeDataNodes[i];
700 unblockNode->antType[i] = rf_control;
701 }
702 }
703
704 /* connect write new parity nodes to unblock node */
705 for (i = 0; i < numParityNodes; i++) {
706 RF_ASSERT(lpuNodes[i].numSuccedents == 1);
707 lpuNodes[i].succedents[0] = unblockNode;
708 unblockNode->antecedents[numDataNodes + i] = &lpuNodes[i];
709 unblockNode->antType[numDataNodes + i] = rf_control;
710 }
711
712 /* connect unblock node to terminator */
713 RF_ASSERT(unblockNode->numSuccedents == 1);
714 RF_ASSERT(termNode->numAntecedents == 1);
715 RF_ASSERT(termNode->numSuccedents == 0);
716 unblockNode->succedents[0] = termNode;
717 termNode->antecedents[0] = unblockNode;
718 termNode->antType[0] = rf_control;
719 }
720
721
722 void rf_CreateParityLoggingSmallWriteDAG(
723 RF_Raid_t *raidPtr,
724 RF_AccessStripeMap_t *asmap,
725 RF_DagHeader_t *dag_h,
726 void *bp,
727 RF_RaidAccessFlags_t flags,
728 RF_AllocListElem_t *allocList,
729 RF_RedFuncs_t *pfuncs,
730 RF_RedFuncs_t *qfuncs)
731 {
732 dag_h->creator = "ParityLoggingSmallWriteDAG";
733 rf_CommonCreateParityLoggingSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, &rf_xorFuncs, NULL);
734 }
735
736
737 void rf_CreateParityLoggingLargeWriteDAG(
738 RF_Raid_t *raidPtr,
739 RF_AccessStripeMap_t *asmap,
740 RF_DagHeader_t *dag_h,
741 void *bp,
742 RF_RaidAccessFlags_t flags,
743 RF_AllocListElem_t *allocList,
744 int nfaults,
745 int (*redFunc)(RF_DagNode_t *))
746 {
747 dag_h->creator = "ParityLoggingSmallWriteDAG";
748 rf_CommonCreateParityLoggingLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 1, rf_RegularXorFunc);
749 }
750
751 #endif /* RF_INCLUDE_PARITYLOGGING > 0 */
752