/*	$NetBSD: rf_dagffwr.c,v 1.1 1998/11/13 04:20:27 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free write DAGs
 *
 * :
 * Log: rf_dagffwr.c,v
 * Revision 1.19  1996/07/31 15:35:24  jimz
 * evenodd changes; bugfixes for double-degraded archs, generalize
 * some formerly PQ-only functions
 *
 * Revision 1.18  1996/07/28 20:31:39  jimz
 * i386netbsd port
 * true/false fixup
 *
 * Revision 1.17  1996/07/27 18:40:24  jimz
 * cleanup sweep
 *
 * Revision 1.16  1996/07/22 19:52:16  jimz
 * switched node params to RF_DagParam_t, a union of
 * a 64-bit int and a void *, for better portability
 * attempted hpux port, but failed partway through for
 * lack of a single C compiler capable of compiling all
 * source files
 *
 * Revision 1.15  1996/06/11 01:27:50  jimz
 * Fixed bug where diskthread shutdown would crash or hang.  This
 * turned out to be two distinct bugs:
 * (1) [crash] The thread shutdown code wasn't properly waiting for
 * all the diskthreads to complete.  This caused diskthreads that were
 * exiting+cleaning up to unlock a destroyed mutex.
 * (2) [hang] TerminateDiskQueues wasn't locking, and DiskIODequeue
 * only checked for termination _after_ a wakeup if the queues were
 * empty.  This was a race where the termination wakeup could be lost
 * by the dequeueing thread, and the system would hang waiting for the
 * thread to exit, while the thread waited for an I/O or a signal to
 * check the termination flag.
 *
 * Revision 1.14  1996/06/10 22:24:01  wvcii
 * added write dags which do not have a commit node and are
 * used in forward and backward error recovery experiments.
 *
 * Revision 1.13  1996/06/07 22:26:27  jimz
 * type-ify which_ru (RF_ReconUnitNum_t)
 *
 * Revision 1.12  1996/06/07 21:33:04  jimz
 * begin using consistent types for sector numbers,
 * stripe numbers, row+col numbers, recon unit numbers
 *
 * Revision 1.11  1996/05/31 22:26:54  jimz
 * fix a lot of mapping problems, memory allocation problems
 * found some weird lock issues, fixed 'em
 * more code cleanup
 *
 * Revision 1.10  1996/05/30 11:29:41  jimz
 * Numerous bug fixes. Stripe lock release code disagreed with the taking code
 * about when stripes should be locked (I made it consistent: no parity, no lock)
 * There was a lot of extra serialization of I/Os which I've removed- a lot of
 * it was to calculate values for the cache code, which is no longer with us.
 * More types, function, macro cleanup. Added code to properly quiesce the array
 * on shutdown. Made a lot of stuff array-specific which was (bogusly) general
 * before. Fixed memory allocation, freeing bugs.
 *
 * Revision 1.9  1996/05/27 18:56:37  jimz
 * more code cleanup
 * better typing
 * compiles in all 3 environments
 *
 * Revision 1.8  1996/05/24 22:17:04  jimz
 * continue code + namespace cleanup
 * typed a bunch of flags
 *
 * Revision 1.7  1996/05/24 04:28:55  jimz
 * release cleanup ckpt
 *
 * Revision 1.6  1996/05/23 21:46:35  jimz
 * checkpoint in code cleanup (release prep)
 * lots of types, function names have been fixed
 *
 * Revision 1.5  1996/05/23 00:33:23  jimz
 * code cleanup: move all debug decls to rf_options.c, all extern
 * debug decls to rf_options.h, all debug vars preceded by rf_
 *
 * Revision 1.4  1996/05/18 19:51:34  jimz
 * major code cleanup- fix syntax, make some types consistent,
 * add prototypes, clean out dead code, et cetera
 *
 * Revision 1.3  1996/05/15 23:23:12  wvcii
 * fixed bug in small write read old q node succedent initialization
 *
 * Revision 1.2  1996/05/08 21:01:24  jimz
 * fixed up enum type names that were conflicting with other
 * enums and function names (ie, "panic")
 * future naming trends will be towards RF_ and rf_ for
 * everything raidframe-related
 *
 * Revision 1.1  1996/05/03 19:20:45  wvcii
 * Initial revision
 *
 */

#include "rf_types.h"
#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_threadid.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_memchunk.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
 * does not make changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll-forward
 * through the graph, blindly executing nodes until it reaches the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */
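
/*
 * A minimal sketch of that decision (rf_ChooseRollDirection() is a
 * hypothetical helper, not part of the execution engine): the commit
 * counters kept in the DAG header are enough to pick a recovery direction.
 *
 *	static int
 *	rf_ChooseRollDirection(RF_DagHeader_t *dag_h)
 *	{
 *		// before the Cmt node fires, numCommits == 0: undo what has
 *		// run so far (roll backward); after it fires, numCommits ==
 *		// numCommitNodes: blindly execute to the end (roll forward)
 *		return (dag_h->numCommits == dag_h->numCommitNodes);
 *	}
 */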


/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */


void rf_CreateNonRedundantWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  RF_IoType_t            type)
{
  rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    RF_IO_TYPE_WRITE);
}

void rf_CreateRAID0WriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  RF_IoType_t            type)
{
  rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    RF_IO_TYPE_WRITE);
}

void rf_CreateSmallWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList)
{
#if RF_FORWARD > 0
  rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    &rf_xorFuncs, NULL);
#else /* RF_FORWARD > 0 */
#if RF_BACKWARD > 0
  rf_CommonCreateSmallWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    &rf_xorFuncs, NULL);
#else /* RF_BACKWARD > 0 */
  /* "normal" rollaway */
  rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    &rf_xorFuncs, NULL);
#endif /* RF_BACKWARD > 0 */
#endif /* RF_FORWARD > 0 */
}

void rf_CreateLargeWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList)
{
#if RF_FORWARD > 0
  rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    1, rf_RegularXorFunc, RF_TRUE);
#else /* RF_FORWARD > 0 */
#if RF_BACKWARD > 0
  rf_CommonCreateLargeWriteDAGFwd(raidPtr, asmap, dag_h, bp, flags, allocList,
    1, rf_RegularXorFunc, RF_TRUE);
#else /* RF_BACKWARD > 0 */
  /* "normal" rollaway */
  rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
    1, rf_RegularXorFunc, RF_TRUE);
#endif /* RF_BACKWARD > 0 */
#endif /* RF_FORWARD > 0 */
}


/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *           \ Rod /           \ Wnp /
 *                             \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes before the commit node (Cmt) are assumed to be atomic and
 * undoable - or - they make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              nfaults   - number of faults array can tolerate
 *                          (equal to # redundancy units in stripe)
 *              redfuncs  - list of redundancy generating functions
 *
 *****************************************************************************/

void rf_CommonCreateLargeWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  int                    nfaults,
  int                  (*redFunc)(RF_DagNode_t *),
  int                    allowBufferRecycle)
{
  RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
  RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
  int nWndNodes, nRodNodes, i, nodeNum, asmNum;
  RF_AccessStripeMapHeader_t *new_asm_h[2];
  RF_StripeNum_t parityStripeID;
  char *sosBuffer, *eosBuffer;
  RF_ReconUnitNum_t which_ru;
  RF_RaidLayout_t *layoutPtr;
  RF_PhysDiskAddr_t *pda;

  layoutPtr = &(raidPtr->Layout);
  parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
    &which_ru);

  if (rf_dagDebug) {
    printf("[Creating large-write DAG]\n");
  }
  dag_h->creator = "LargeWriteDAG";

  dag_h->numCommitNodes = 1;
  dag_h->numCommits = 0;
  dag_h->numSuccedents = 1;

  /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
  nWndNodes = asmap->numStripeUnitsAccessed;
  RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
    (RF_DagNode_t *), allocList);
  i = 0;
  wndNodes   = &nodes[i]; i += nWndNodes;
  xorNode    = &nodes[i]; i += 1;
  wnpNode    = &nodes[i]; i += 1;
  blockNode  = &nodes[i]; i += 1;
  commitNode = &nodes[i]; i += 1;
  termNode   = &nodes[i]; i += 1;
  if (nfaults == 2) {
    wnqNode = &nodes[i]; i += 1;
  }
  else {
    wnqNode = NULL;
  }
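  /*
   * All of the DAG's nodes are carved out of the single RF_CallocAndAdd
   * allocation above, so one allocList entry covers the whole node set;
   * only the optional Wnq node changes the layout.
   */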
  rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
    &nRodNodes, &sosBuffer, &eosBuffer, allocList);
  if (nRodNodes > 0) {
    RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
      (RF_DagNode_t *), allocList);
  }
  else {
    rodNodes = NULL;
  }

  /* begin node initialization */
  if (nRodNodes > 0) {
    rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
      NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
  }
  else {
    rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
      NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
  }

  rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
    nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
  rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
    0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);

  /* initialize the Rod nodes */
  for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
    if (new_asm_h[asmNum]) {
      pda = new_asm_h[asmNum]->stripeMap->physInfo;
      while (pda) {
        rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
          rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
          "Rod", allocList);
        rodNodes[nodeNum].params[0].p = pda;
        rodNodes[nodeNum].params[1].p = pda->bufPtr;
        rodNodes[nodeNum].params[2].v = parityStripeID;
        rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
          0, 0, which_ru);
        nodeNum++;
        pda = pda->next;
      }
    }
  }
  RF_ASSERT(nodeNum == nRodNodes);

  /* initialize the wnd nodes */
  pda = asmap->physInfo;
  for (i = 0; i < nWndNodes; i++) {
    rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
      rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
    RF_ASSERT(pda != NULL);
    wndNodes[i].params[0].p = pda;
    wndNodes[i].params[1].p = pda->bufPtr;
    wndNodes[i].params[2].v = parityStripeID;
    wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    pda = pda->next;
  }

  /* initialize the redundancy node */
  if (nRodNodes > 0) {
    rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
      nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
      "Xr ", allocList);
  }
  else {
    rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
      1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
  }
  xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
  for (i = 0; i < nWndNodes; i++) {
    xorNode->params[2*i+0] = wndNodes[i].params[0];  /* pda */
    xorNode->params[2*i+1] = wndNodes[i].params[1];  /* buf ptr */
  }
  for (i = 0; i < nRodNodes; i++) {
    xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];  /* pda */
    xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];  /* buf ptr */
  }
  /* xor node needs to get at RAID information */
  xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr;

  /*
   * Look for an Rod node that reads a complete SU.  If none, alloc a buffer
   * to receive the parity info.  Note that we can't use a new data buffer
   * because it will not have gotten written when the xor occurs.
   */
  if (allowBufferRecycle) {
    for (i = 0; i < nRodNodes; i++) {
      if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
        break;
    }
  }
  if ((!allowBufferRecycle) || (i == nRodNodes)) {
    RF_CallocAndAdd(xorNode->results[0], 1,
      rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
      (void *), allocList);
  }
  else {
    xorNode->results[0] = rodNodes[i].params[1].p;
  }
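  /*
   * Either way, xorNode->results[0] now points at a buffer spanning a full
   * stripe unit: recycling is only permitted when some Rod node read an
   * entire SU, so the XOR result can safely overwrite that old-data buffer.
   */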

  /* initialize the Wnp node */
  rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
  wnpNode->params[0].p = asmap->parityInfo;
  wnpNode->params[1].p = xorNode->results[0];
  wnpNode->params[2].v = parityStripeID;
  wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
  /* parityInfo must describe entire parity unit */
  RF_ASSERT(asmap->parityInfo->next == NULL);

  if (nfaults == 2) {
    /*
     * We never try to recycle a buffer for the Q calculation
     * in addition to the parity.  This would cause two buffers
     * to get smashed during the P and Q calculation, guaranteeing
     * one would be wrong.
     */
    RF_CallocAndAdd(xorNode->results[1], 1,
      rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
      (void *), allocList);
    rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
      rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
    wnqNode->params[0].p = asmap->qInfo;
    wnqNode->params[1].p = xorNode->results[1];
    wnqNode->params[2].v = parityStripeID;
    wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
    /* parityInfo must describe entire parity unit */
    RF_ASSERT(asmap->parityInfo->next == NULL);
  }

  /*
   * Connect nodes to form graph.
   */

  /* connect dag header to block node */
  RF_ASSERT(blockNode->numAntecedents == 0);
  dag_h->succedents[0] = blockNode;

  if (nRodNodes > 0) {
    /* connect the block node to the Rod nodes */
    RF_ASSERT(blockNode->numSuccedents == nRodNodes);
    RF_ASSERT(xorNode->numAntecedents == nRodNodes);
    for (i = 0; i < nRodNodes; i++) {
      RF_ASSERT(rodNodes[i].numAntecedents == 1);
      blockNode->succedents[i] = &rodNodes[i];
      rodNodes[i].antecedents[0] = blockNode;
      rodNodes[i].antType[0] = rf_control;

      /* connect the Rod nodes to the Xor node */
      RF_ASSERT(rodNodes[i].numSuccedents == 1);
      rodNodes[i].succedents[0] = xorNode;
      xorNode->antecedents[i] = &rodNodes[i];
      xorNode->antType[i] = rf_trueData;
    }
  }
  else {
    /* connect the block node to the Xor node */
    RF_ASSERT(blockNode->numSuccedents == 1);
    RF_ASSERT(xorNode->numAntecedents == 1);
    blockNode->succedents[0] = xorNode;
    xorNode->antecedents[0] = blockNode;
    xorNode->antType[0] = rf_control;
  }

  /* connect the xor node to the commit node */
  RF_ASSERT(xorNode->numSuccedents == 1);
  RF_ASSERT(commitNode->numAntecedents == 1);
  xorNode->succedents[0] = commitNode;
  commitNode->antecedents[0] = xorNode;
  commitNode->antType[0] = rf_control;

  /* connect the commit node to the write nodes */
  RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
  for (i = 0; i < nWndNodes; i++) {
    RF_ASSERT(wndNodes[i].numAntecedents == 1);
    commitNode->succedents[i] = &wndNodes[i];
    wndNodes[i].antecedents[0] = commitNode;
    wndNodes[i].antType[0] = rf_control;
  }
  RF_ASSERT(wnpNode->numAntecedents == 1);
  commitNode->succedents[nWndNodes] = wnpNode;
  wnpNode->antecedents[0] = commitNode;
  wnpNode->antType[0] = rf_trueData;
  if (nfaults == 2) {
    RF_ASSERT(wnqNode->numAntecedents == 1);
    commitNode->succedents[nWndNodes + 1] = wnqNode;
    wnqNode->antecedents[0] = commitNode;
    wnqNode->antType[0] = rf_trueData;
  }

  /* connect the write nodes to the term node */
  RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
  RF_ASSERT(termNode->numSuccedents == 0);
  for (i = 0; i < nWndNodes; i++) {
    RF_ASSERT(wndNodes[i].numSuccedents == 1);
    wndNodes[i].succedents[0] = termNode;
    termNode->antecedents[i] = &wndNodes[i];
    termNode->antType[i] = rf_control;
  }
  RF_ASSERT(wnpNode->numSuccedents == 1);
  wnpNode->succedents[0] = termNode;
  termNode->antecedents[nWndNodes] = wnpNode;
  termNode->antType[nWndNodes] = rf_control;
  if (nfaults == 2) {
    RF_ASSERT(wnqNode->numSuccedents == 1);
    wnqNode->succedents[0] = termNode;
    termNode->antecedents[nWndNodes + 1] = wnqNode;
    termNode->antType[nWndNodes + 1] = rf_control;
  }
}

/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *         \- Rod X      /  \----> Wnd [Und]-/
 *        [\- Rod X     /    \---> Wnd [Und]-/]
 *        [\- Roq -> Q /      \--> Wnq [Unq]-/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates single fault tolerant
 *****************************************************************************/

void rf_CommonCreateSmallWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList,
  RF_RedFuncs_t         *pfuncs,
  RF_RedFuncs_t         *qfuncs)
{
  RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
  RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
  RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
  RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
  int i, j, nNodes, totalNumNodes, lu_flag;
  RF_ReconUnitNum_t which_ru;
  int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
  int (*qfunc)(RF_DagNode_t *);
  int numDataNodes, numParityNodes;
  RF_StripeNum_t parityStripeID;
  RF_PhysDiskAddr_t *pda;
  char *name, *qname;
  long nfaults;

  nfaults = qfuncs ? 2 : 1;
  lu_flag = (rf_enableAtomicRMW) ? 1 : 0;  /* lock/unlock flag */

  parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
    asmap->raidAddress, &which_ru);
  pda = asmap->physInfo;
  numDataNodes = asmap->numStripeUnitsAccessed;
  numParityNodes = (asmap->parityInfo->next) ? 2 : 1;

  if (rf_dagDebug) {
    printf("[Creating small-write DAG]\n");
  }
  RF_ASSERT(numDataNodes > 0);
  dag_h->creator = "SmallWriteDAG";

  dag_h->numCommitNodes = 1;
  dag_h->numCommits = 0;
  dag_h->numSuccedents = 1;

  /*
   * DAG creation occurs in four steps:
   * 1. count the number of nodes in the DAG
   * 2. create the nodes
   * 3. initialize the nodes
   * 4. connect the nodes
   */

  /*
   * Step 1. compute number of nodes in the graph
   */

  /* number of nodes:
   * a read and write for each data unit
   * a redundancy computation node for each parity node (nfaults * nparity)
   * a read and write for each parity unit
   * a block and commit node (2)
   * a terminate node
   * if atomic RMW
   *   an unlock node for each data unit, redundancy unit
   */
  totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
    + (nfaults * 2 * numParityNodes) + 3;
  if (lu_flag) {
    totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
  }
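  /*
   * Worked example (illustrative numbers, not computed by the code): a
   * RAID-5 small write touching two data units, with a single parity node
   * and locking disabled, gives
   *   totalNumNodes = 2*2 + 1*1 + 1*2*1 + 3 = 10
   * i.e. 2 Rod + 2 Wnd + 1 Xor + 1 Rop + 1 Wnp + block + commit + term.
   */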

  /*
   * Step 2. create the nodes
   */
  RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
    (RF_DagNode_t *), allocList);
  i = 0;
  blockNode        = &nodes[i]; i += 1;
  commitNode       = &nodes[i]; i += 1;
  readDataNodes    = &nodes[i]; i += numDataNodes;
  readParityNodes  = &nodes[i]; i += numParityNodes;
  writeDataNodes   = &nodes[i]; i += numDataNodes;
  writeParityNodes = &nodes[i]; i += numParityNodes;
  xorNodes         = &nodes[i]; i += numParityNodes;
  termNode         = &nodes[i]; i += 1;
  if (lu_flag) {
    unlockDataNodes   = &nodes[i]; i += numDataNodes;
    unlockParityNodes = &nodes[i]; i += numParityNodes;
  }
  else {
    unlockDataNodes = unlockParityNodes = NULL;
  }
  if (nfaults == 2) {
    readQNodes  = &nodes[i]; i += numParityNodes;
    writeQNodes = &nodes[i]; i += numParityNodes;
    qNodes      = &nodes[i]; i += numParityNodes;
    if (lu_flag) {
      unlockQNodes = &nodes[i]; i += numParityNodes;
    }
    else {
      unlockQNodes = NULL;
    }
  }
  else {
    readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
  }
  RF_ASSERT(i == totalNumNodes);

  /*
   * Step 3. initialize the nodes
   */
  /* initialize block node (Nil) */
  nNodes = numDataNodes + (nfaults * numParityNodes);
  rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);

  /* initialize commit node (Cmt) */
  rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);

  /* initialize terminate node (Trm) */
  rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
    NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);

  /* initialize nodes which read old data (Rod) */
  for (i = 0; i < numDataNodes; i++) {
    rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
      rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
      "Rod", allocList);
    RF_ASSERT(pda != NULL);
    /* physical disk addr desc */
    readDataNodes[i].params[0].p = pda;
    /* buffer to hold old data */
    readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
      dag_h, pda, allocList);
    readDataNodes[i].params[2].v = parityStripeID;
    readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
      lu_flag, 0, which_ru);
    pda = pda->next;
    for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
      readDataNodes[i].propList[j] = NULL;
    }
  }

  /* initialize nodes which read old parity (Rop) */
  pda = asmap->parityInfo; i = 0;
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(pda != NULL);
    rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
      rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
      0, dag_h, "Rop", allocList);
    readParityNodes[i].params[0].p = pda;
    /* buffer to hold old parity */
    readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
      dag_h, pda, allocList);
    readParityNodes[i].params[2].v = parityStripeID;
    readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
      lu_flag, 0, which_ru);
    pda = pda->next;
    for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
      readParityNodes[i].propList[j] = NULL;
    }
  }

  /* initialize nodes which read old Q (Roq) */
  if (nfaults == 2) {
    pda = asmap->qInfo;
    for (i = 0; i < numParityNodes; i++) {
      RF_ASSERT(pda != NULL);
      rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
        rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
      readQNodes[i].params[0].p = pda;
      /* buffer to hold old Q */
      readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
        allocList);
      readQNodes[i].params[2].v = parityStripeID;
      readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
        lu_flag, 0, which_ru);
      pda = pda->next;
      for (j = 0; j < readQNodes[i].numSuccedents; j++) {
        readQNodes[i].propList[j] = NULL;
      }
    }
  }

  /* initialize nodes which write new data (Wnd) */
  pda = asmap->physInfo;
  for (i = 0; i < numDataNodes; i++) {
    RF_ASSERT(pda != NULL);
    rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
      rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
      "Wnd", allocList);
    /* physical disk addr desc */
    writeDataNodes[i].params[0].p = pda;
    /* buffer holding new data to be written */
    writeDataNodes[i].params[1].p = pda->bufPtr;
    writeDataNodes[i].params[2].v = parityStripeID;
    writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
      0, 0, which_ru);
    if (lu_flag) {
      /* initialize node to unlock the disk queue */
      rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
        rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
        "Und", allocList);
      /* physical disk addr desc */
      unlockDataNodes[i].params[0].p = pda;
      unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
        0, lu_flag, which_ru);
    }
    pda = pda->next;
  }

  /*
   * Initialize nodes which compute new parity and Q.
   */
  /*
   * We use the simple XOR func in the double-XOR case, and when
   * we're accessing only a portion of one stripe unit.  The distinction
   * between the two is that the regular XOR func assumes that the targbuf
   * is a full SU in size, and examines the pda associated with the buffer
   * to decide where within the buffer to XOR the data, whereas
   * the simple XOR func just XORs the data into the start of the buffer.
   */
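  /*
   * A hedged sketch of that distinction (hypothetical loops and variables,
   * not the actual rf_ simple/regular XOR functions):
   *
   *   // simple: destination offset is always 0
   *   for (k = 0; k < len; k++)
   *     targ[k] ^= src[k];
   *
   *   // regular: destination offset derived from the buffer's pda, so a
   *   // partial-SU source lands at the right place in a full-SU target
   *   off = rf_RaidAddressToByte(raidPtr, pda->startSector % sectorsPerSU);
   *   for (k = 0; k < len; k++)
   *     targ[off + k] ^= src[k];
   */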
  if ((numParityNodes == 2) || ((numDataNodes == 1)
      && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit)))
  {
    func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
    if (qfuncs) {
      qfunc = qfuncs->simple;
      qname = qfuncs->SimpleName;
    }
    else {
      qfunc = NULL;
      qname = NULL;
    }
  }
  else {
    func = pfuncs->regular;
    undoFunc = rf_NullNodeUndoFunc;
    name = pfuncs->RegularName;
    if (qfuncs) {
      qfunc = qfuncs->regular;
      qname = qfuncs->RegularName;
    }
    else {
      qfunc = NULL;
      qname = NULL;
    }
  }
  /*
   * Initialize the xor nodes: params are {pda,buf}
   * from {Rod,Wnd,Rop} nodes, and raidPtr
   */
  if (numParityNodes == 2) {
    /* double-xor case */
    for (i = 0; i < numParityNodes; i++) {
      /* note: no wakeup func for xor */
      rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
        1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
      xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
      xorNodes[i].params[0] = readDataNodes[i].params[0];
      xorNodes[i].params[1] = readDataNodes[i].params[1];
      xorNodes[i].params[2] = readParityNodes[i].params[0];
      xorNodes[i].params[3] = readParityNodes[i].params[1];
      xorNodes[i].params[4] = writeDataNodes[i].params[0];
      xorNodes[i].params[5] = writeDataNodes[i].params[1];
      xorNodes[i].params[6].p = raidPtr;
      /* use old parity buf as target buf */
      xorNodes[i].results[0] = readParityNodes[i].params[1].p;
      if (nfaults == 2) {
        /* note: no wakeup func for qor */
        rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
          (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
        qNodes[i].params[0] = readDataNodes[i].params[0];
        qNodes[i].params[1] = readDataNodes[i].params[1];
        qNodes[i].params[2] = readQNodes[i].params[0];
        qNodes[i].params[3] = readQNodes[i].params[1];
        qNodes[i].params[4] = writeDataNodes[i].params[0];
        qNodes[i].params[5] = writeDataNodes[i].params[1];
        qNodes[i].params[6].p = raidPtr;
        /* use old Q buf as target buf */
        qNodes[i].results[0] = readQNodes[i].params[1].p;
      }
    }
  }
  else {
    /* there is only one xor node in this case */
    rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
      (numDataNodes + numParityNodes),
      (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
    xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
    for (i = 0; i < numDataNodes + 1; i++) {
      /* set up params related to Rod and Rop nodes */
      xorNodes[0].params[2*i+0] = readDataNodes[i].params[0];  /* pda */
      xorNodes[0].params[2*i+1] = readDataNodes[i].params[1];  /* buffer ptr */
    }
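    /*
     * Note: when i == numDataNodes the loop above indexes one element past
     * readDataNodes[]; that element is readParityNodes[0], because Step 2
     * carved both arrays contiguously out of the single nodes[] allocation.
     * The final {pda,buf} pair is therefore the old parity, as the comment
     * says.
     */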
    for (i = 0; i < numDataNodes; i++) {
      /* set up params related to Wnd and Wnp nodes */
      xorNodes[0].params[2*(numDataNodes+1+i)+0] =  /* pda */
        writeDataNodes[i].params[0];
      xorNodes[0].params[2*(numDataNodes+1+i)+1] =  /* buffer ptr */
        writeDataNodes[i].params[1];
    }
    /* xor node needs to get at RAID information */
    xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
    xorNodes[0].results[0] = readParityNodes[0].params[1].p;
    if (nfaults == 2) {
      rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
        (numDataNodes + numParityNodes),
        (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
        qname, allocList);
      for (i = 0; i < numDataNodes; i++) {
        /* set up params related to Rod */
        qNodes[0].params[2*i+0] = readDataNodes[i].params[0];  /* pda */
        qNodes[0].params[2*i+1] = readDataNodes[i].params[1];  /* buffer ptr */
      }
      /* and read old q */
      qNodes[0].params[2*numDataNodes + 0] =  /* pda */
        readQNodes[0].params[0];
      qNodes[0].params[2*numDataNodes + 1] =  /* buffer ptr */
        readQNodes[0].params[1];
      for (i = 0; i < numDataNodes; i++) {
        /* set up params related to Wnd nodes */
        qNodes[0].params[2*(numDataNodes+1+i)+0] =  /* pda */
          writeDataNodes[i].params[0];
        qNodes[0].params[2*(numDataNodes+1+i)+1] =  /* buffer ptr */
          writeDataNodes[i].params[1];
      }
      /* q node needs to get at RAID information */
      qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr;
      qNodes[0].results[0] = readQNodes[0].params[1].p;
    }
  }

  /* initialize nodes which write new parity (Wnp) */
  pda = asmap->parityInfo;
  for (i = 0; i < numParityNodes; i++) {
    rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
      rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
      "Wnp", allocList);
    RF_ASSERT(pda != NULL);
    writeParityNodes[i].params[0].p = pda;  /* physical disk addr desc */
    writeParityNodes[i].params[1].p = xorNodes[i].results[0];  /* buffer pointer for parity write operation */
    writeParityNodes[i].params[2].v = parityStripeID;
    writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
      0, 0, which_ru);
    if (lu_flag) {
      /* initialize node to unlock the disk queue */
      rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
        rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
        "Unp", allocList);
      unlockParityNodes[i].params[0].p = pda;  /* physical disk addr desc */
      unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
        0, lu_flag, which_ru);
    }
    pda = pda->next;
  }

  /* initialize nodes which write new Q (Wnq) */
  if (nfaults == 2) {
    pda = asmap->qInfo;
    for (i = 0; i < numParityNodes; i++) {
      rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
        rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
        "Wnq", allocList);
      RF_ASSERT(pda != NULL);
      writeQNodes[i].params[0].p = pda;  /* physical disk addr desc */
      writeQNodes[i].params[1].p = qNodes[i].results[0];  /* buffer pointer for Q write operation */
      writeQNodes[i].params[2].v = parityStripeID;
      writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
        0, 0, which_ru);
      if (lu_flag) {
        /* initialize node to unlock the disk queue */
        rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
          rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
          "Unq", allocList);
        unlockQNodes[i].params[0].p = pda;  /* physical disk addr desc */
        unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
          0, lu_flag, which_ru);
      }
      pda = pda->next;
    }
  }

  /*
   * Step 4. connect the nodes.
   */

  /* connect header to block node */
  dag_h->succedents[0] = blockNode;

  /* connect block node to read old data nodes */
  RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
  for (i = 0; i < numDataNodes; i++) {
    blockNode->succedents[i] = &readDataNodes[i];
    RF_ASSERT(readDataNodes[i].numAntecedents == 1);
    readDataNodes[i].antecedents[0] = blockNode;
    readDataNodes[i].antType[0] = rf_control;
  }

  /* connect block node to read old parity nodes */
  for (i = 0; i < numParityNodes; i++) {
    blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
    RF_ASSERT(readParityNodes[i].numAntecedents == 1);
    readParityNodes[i].antecedents[0] = blockNode;
    readParityNodes[i].antType[0] = rf_control;
  }

  /* connect block node to read old Q nodes */
  if (nfaults == 2) {
    for (i = 0; i < numParityNodes; i++) {
      blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
      RF_ASSERT(readQNodes[i].numAntecedents == 1);
      readQNodes[i].antecedents[0] = blockNode;
      readQNodes[i].antType[0] = rf_control;
    }
  }

  /* connect read old data nodes to xor nodes */
  for (i = 0; i < numDataNodes; i++) {
    RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
    for (j = 0; j < numParityNodes; j++) {
      RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
      readDataNodes[i].succedents[j] = &xorNodes[j];
      xorNodes[j].antecedents[i] = &readDataNodes[i];
      xorNodes[j].antType[i] = rf_trueData;
    }
  }

  /* connect read old data nodes to q nodes */
  if (nfaults == 2) {
    for (i = 0; i < numDataNodes; i++) {
      for (j = 0; j < numParityNodes; j++) {
        RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
        readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
        qNodes[j].antecedents[i] = &readDataNodes[i];
        qNodes[j].antType[i] = rf_trueData;
      }
    }
  }

  /* connect read old parity nodes to xor nodes */
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
    for (j = 0; j < numParityNodes; j++) {
      readParityNodes[i].succedents[j] = &xorNodes[j];
      xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
      xorNodes[j].antType[numDataNodes + i] = rf_trueData;
    }
  }

  /* connect read old q nodes to q nodes */
  if (nfaults == 2) {
    for (i = 0; i < numParityNodes; i++) {
      RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
      for (j = 0; j < numParityNodes; j++) {
        readQNodes[i].succedents[j] = &qNodes[j];
        qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
        qNodes[j].antType[numDataNodes + i] = rf_trueData;
      }
    }
  }

  /* connect xor nodes to commit node */
  RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(xorNodes[i].numSuccedents == 1);
    xorNodes[i].succedents[0] = commitNode;
    commitNode->antecedents[i] = &xorNodes[i];
    commitNode->antType[i] = rf_control;
  }

  /* connect q nodes to commit node */
  if (nfaults == 2) {
    for (i = 0; i < numParityNodes; i++) {
      RF_ASSERT(qNodes[i].numSuccedents == 1);
      qNodes[i].succedents[0] = commitNode;
      commitNode->antecedents[i + numParityNodes] = &qNodes[i];
      commitNode->antType[i + numParityNodes] = rf_control;
    }
  }

  /* connect commit node to write nodes */
  RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
  for (i = 0; i < numDataNodes; i++) {
    RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
    commitNode->succedents[i] = &writeDataNodes[i];
    writeDataNodes[i].antecedents[0] = commitNode;
    writeDataNodes[i].antType[0] = rf_trueData;
  }
  for (i = 0; i < numParityNodes; i++) {
    RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
    commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
    writeParityNodes[i].antecedents[0] = commitNode;
    writeParityNodes[i].antType[0] = rf_trueData;
  }
  if (nfaults == 2) {
    for (i = 0; i < numParityNodes; i++) {
      RF_ASSERT(writeQNodes[i].numAntecedents == 1);
      commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
      writeQNodes[i].antecedents[0] = commitNode;
      writeQNodes[i].antType[0] = rf_trueData;
    }
  }

  RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
  RF_ASSERT(termNode->numSuccedents == 0);
  for (i = 0; i < numDataNodes; i++) {
    if (lu_flag) {
      /* connect write new data nodes to unlock nodes */
      RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
      RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
      writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
      unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
      unlockDataNodes[i].antType[0] = rf_control;

      /* connect unlock nodes to term node */
      RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
      unlockDataNodes[i].succedents[0] = termNode;
      termNode->antecedents[i] = &unlockDataNodes[i];
      termNode->antType[i] = rf_control;
    }
    else {
      /* connect write new data nodes to term node */
      RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
      RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
      writeDataNodes[i].succedents[0] = termNode;
      termNode->antecedents[i] = &writeDataNodes[i];
      termNode->antType[i] = rf_control;
    }
  }

  for (i = 0; i < numParityNodes; i++) {
    if (lu_flag) {
      /* connect write new parity nodes to unlock nodes */
      RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
      RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
      writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
      unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
      unlockParityNodes[i].antType[0] = rf_control;

      /* connect unlock nodes to term node */
      RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
      unlockParityNodes[i].succedents[0] = termNode;
      termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
      termNode->antType[numDataNodes + i] = rf_control;
    }
    else {
      RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
      writeParityNodes[i].succedents[0] = termNode;
      termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
      termNode->antType[numDataNodes + i] = rf_control;
    }
  }

  if (nfaults == 2) {
    for (i = 0; i < numParityNodes; i++) {
      if (lu_flag) {
        /* connect write new Q nodes to unlock nodes */
        RF_ASSERT(writeQNodes[i].numSuccedents == 1);
        RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
        writeQNodes[i].succedents[0] = &unlockQNodes[i];
        unlockQNodes[i].antecedents[0] = &writeQNodes[i];
        unlockQNodes[i].antType[0] = rf_control;

        /* connect unlock nodes to term node */
        RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
        unlockQNodes[i].succedents[0] = termNode;
        termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
        termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
      }
      else {
        RF_ASSERT(writeQNodes[i].numSuccedents == 1);
        writeQNodes[i].succedents[0] = termNode;
        termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
        termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
      }
    }
  }
}


/******************************************************************************
 * create a write graph (fault-free or degraded) for RAID level 1
 *
 * Hdr -> Commit -> Wpd -> Nil -> Trm
 *               -> Wsd ->
 *
 * The "Wpd" node writes data to the primary copy in the mirror pair
 * The "Wsd" node writes data to the secondary copy in the mirror pair
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *****************************************************************************/

void rf_CreateRaidOneWriteDAG(
  RF_Raid_t             *raidPtr,
  RF_AccessStripeMap_t  *asmap,
  RF_DagHeader_t        *dag_h,
  void                  *bp,
  RF_RaidAccessFlags_t   flags,
  RF_AllocListElem_t    *allocList)
{
  RF_DagNode_t *unblockNode, *termNode, *commitNode;
  RF_DagNode_t *nodes, *wndNode, *wmirNode;
  int nWndNodes, nWmirNodes, i;
  RF_ReconUnitNum_t which_ru;
  RF_PhysDiskAddr_t *pda, *pdaP;
  RF_StripeNum_t parityStripeID;

  parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
    asmap->raidAddress, &which_ru);
  if (rf_dagDebug) {
    printf("[Creating RAID level 1 write DAG]\n");
  }
  dag_h->creator = "RaidOneWriteDAG";

  /* 2 implies access not SU aligned */
  nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
  nWndNodes = (asmap->physInfo->next) ? 2 : 1;

  /* alloc the Wnd nodes and the Wmir node */
  if (asmap->numDataFailed == 1)
    nWndNodes--;
  if (asmap->numParityFailed == 1)
    nWmirNodes--;

  /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock + terminator) */
  RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
    (RF_DagNode_t *), allocList);
  i = 0;
  wndNode     = &nodes[i]; i += nWndNodes;
  wmirNode    = &nodes[i]; i += nWmirNodes;
  commitNode  = &nodes[i]; i += 1;
  unblockNode = &nodes[i]; i += 1;
  termNode    = &nodes[i]; i += 1;
  RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

  /* this dag can commit immediately */
  dag_h->numCommitNodes = 1;
  dag_h->numCommits = 0;
  dag_h->numSuccedents = 1;

  /* initialize the commit, unblock, and term nodes */
  rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
  rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
    NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
  rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
    NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

  /* initialize the wnd nodes */
  if (nWndNodes > 0) {
    pda = asmap->physInfo;
    for (i = 0; i < nWndNodes; i++) {
      rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
        rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
      RF_ASSERT(pda != NULL);
      wndNode[i].params[0].p = pda;
      wndNode[i].params[1].p = pda->bufPtr;
      wndNode[i].params[2].v = parityStripeID;
      wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
      pda = pda->next;
    }
    RF_ASSERT(pda == NULL);
  }

  /* initialize the mirror nodes */
  if (nWmirNodes > 0) {
    pda = asmap->physInfo;
    pdaP = asmap->parityInfo;
    for (i = 0; i < nWmirNodes; i++) {
      rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
        rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
      RF_ASSERT(pda != NULL);
      wmirNode[i].params[0].p = pdaP;
      wmirNode[i].params[1].p = pda->bufPtr;
      wmirNode[i].params[2].v = parityStripeID;
      wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
      pda = pda->next;
      pdaP = pdaP->next;
    }
    RF_ASSERT(pda == NULL);
    RF_ASSERT(pdaP == NULL);
  }

  /* link the header node to the commit node */
  RF_ASSERT(dag_h->numSuccedents == 1);
  RF_ASSERT(commitNode->numAntecedents == 0);
  dag_h->succedents[0] = commitNode;

  /* link the commit node to the write nodes */
  RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
  for (i = 0; i < nWndNodes; i++) {
    RF_ASSERT(wndNode[i].numAntecedents == 1);
    commitNode->succedents[i] = &wndNode[i];
    wndNode[i].antecedents[0] = commitNode;
    wndNode[i].antType[0] = rf_control;
  }
  for (i = 0; i < nWmirNodes; i++) {
    RF_ASSERT(wmirNode[i].numAntecedents == 1);
    commitNode->succedents[i + nWndNodes] = &wmirNode[i];
    wmirNode[i].antecedents[0] = commitNode;
    wmirNode[i].antType[0] = rf_control;
  }

  /* link the write nodes to the unblock node */
  RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
  for (i = 0; i < nWndNodes; i++) {
    RF_ASSERT(wndNode[i].numSuccedents == 1);
    wndNode[i].succedents[0] = unblockNode;
    unblockNode->antecedents[i] = &wndNode[i];
    unblockNode->antType[i] = rf_control;
  }
  for (i = 0; i < nWmirNodes; i++) {
    RF_ASSERT(wmirNode[i].numSuccedents == 1);
    wmirNode[i].succedents[0] = unblockNode;
    unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
    unblockNode->antType[i + nWndNodes] = rf_control;
  }

  /* link the unblock node to the term node */
  RF_ASSERT(unblockNode->numSuccedents == 1);
  RF_ASSERT(termNode->numAntecedents == 1);
  RF_ASSERT(termNode->numSuccedents == 0);
  unblockNode->succedents[0] = termNode;
  termNode->antecedents[0] = unblockNode;
  termNode->antType[0] = rf_control;
}



/* DAGs which have no commit points.
 *
 * The following DAGs are used in forward and backward error recovery experiments.
 * They are identical to the DAGs above this comment with the exception that
 * the commit points have been removed.
 */
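
/*
 * Concretely, these variants set dag_h->numCommitNodes to 0, and in the
 * large-write variant below a plain "Nil" synchronization node stands where
 * the Cmt node was, so the roll-away recovery described at the top of this
 * file does not apply: there is no point of no return.
 */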
1315
1316
1317
1318 void rf_CommonCreateLargeWriteDAGFwd(
1319 RF_Raid_t *raidPtr,
1320 RF_AccessStripeMap_t *asmap,
1321 RF_DagHeader_t *dag_h,
1322 void *bp,
1323 RF_RaidAccessFlags_t flags,
1324 RF_AllocListElem_t *allocList,
1325 int nfaults,
1326 int (*redFunc)(RF_DagNode_t *),
1327 int allowBufferRecycle)
1328 {
1329 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1330 RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1331 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1332 RF_AccessStripeMapHeader_t *new_asm_h[2];
1333 RF_StripeNum_t parityStripeID;
1334 char *sosBuffer, *eosBuffer;
1335 RF_ReconUnitNum_t which_ru;
1336 RF_RaidLayout_t *layoutPtr;
1337 RF_PhysDiskAddr_t *pda;
1338
1339 layoutPtr = &(raidPtr->Layout);
1340 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1341
1342 if (rf_dagDebug)
1343 printf("[Creating large-write DAG]\n");
1344 dag_h->creator = "LargeWriteDAGFwd";
1345
1346 dag_h->numCommitNodes = 0;
1347 dag_h->numCommits = 0;
1348 dag_h->numSuccedents = 1;
1349
1350 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
1351 nWndNodes = asmap->numStripeUnitsAccessed;
1352 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1353 i = 0;
1354 wndNodes = &nodes[i]; i += nWndNodes;
1355 xorNode = &nodes[i]; i += 1;
1356 wnpNode = &nodes[i]; i += 1;
1357 blockNode = &nodes[i]; i += 1;
1358 syncNode = &nodes[i]; i += 1;
1359 termNode = &nodes[i]; i += 1;
1360 if (nfaults == 2) {
1361 wnqNode = &nodes[i]; i += 1;
1362 }
1363 else {
1364 wnqNode = NULL;
1365 }
1366 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1367 if (nRodNodes > 0) {
1368 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1369 }
1370 else {
1371 rodNodes = NULL;
1372 }
1373
1374 /* begin node initialization */
1375 if (nRodNodes > 0) {
1376 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
1377 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
1378 }
1379 else {
1380 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
1381 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList);
1382 }
1383
1384 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
1385
1386 /* initialize the Rod nodes */
1387 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1388 if (new_asm_h[asmNum]) {
1389 pda = new_asm_h[asmNum]->stripeMap->physInfo;
1390 while (pda) {
1391 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
1392 rodNodes[nodeNum].params[0].p = pda;
1393 rodNodes[nodeNum].params[1].p = pda->bufPtr;
1394 rodNodes[nodeNum].params[2].v = parityStripeID;
1395 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1396 nodeNum++;
1397 pda=pda->next;
1398 }
1399 }
1400 }
1401 RF_ASSERT(nodeNum == nRodNodes);
1402
1403 /* initialize the wnd nodes */
1404 pda = asmap->physInfo;
1405 for (i=0; i < nWndNodes; i++) {
1406 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1407 RF_ASSERT(pda != NULL);
1408 wndNodes[i].params[0].p = pda;
1409 wndNodes[i].params[1].p = pda->bufPtr;
1410 wndNodes[i].params[2].v = parityStripeID;
1411 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1412 pda = pda->next;
1413 }
1414
1415 /* initialize the redundancy node */
1416 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
1417 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1418 for (i=0; i < nWndNodes; i++) {
1419 xorNode->params[2*i+0] = wndNodes[i].params[0]; /* pda */
1420 xorNode->params[2*i+1] = wndNodes[i].params[1]; /* buf ptr */
1421 }
1422 for (i=0; i < nRodNodes; i++) {
1423 xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0]; /* pda */
1424 xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1]; /* buf ptr */
1425 }
1426 xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */
1427
1428 /* look for an Rod node that reads a complete SU. If none, alloc a buffer to receive the parity info.
1429 * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.
1430 */
1431 if (allowBufferRecycle) {
1432 for (i = 0; i < nRodNodes; i++)
1433 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1434 break;
1435 }
1436 if ((!allowBufferRecycle) || (i == nRodNodes)) {
1437 RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1438 }
1439 else
1440 xorNode->results[0] = rodNodes[i].params[1].p;
1441
1442 /* initialize the Wnp node */
1443 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
1444 wnpNode->params[0].p = asmap->parityInfo;
1445 wnpNode->params[1].p = xorNode->results[0];
1446 wnpNode->params[2].v = parityStripeID;
1447 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1448 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must describe entire parity unit */
1449
1450 if (nfaults == 2)
1451 {
1452 /* we never try to recycle a buffer for the Q calcuation in addition to the parity.
1453 This would cause two buffers to get smashed during the P and Q calculation,
1454 guaranteeing one would be wrong.
1455 */
1456 RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
1457 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
1458 wnqNode->params[0].p = asmap->qInfo;
1459 wnqNode->params[1].p = xorNode->results[1];
1460 wnqNode->params[2].v = parityStripeID;
1461 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1462 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must describe entire parity unit */
1463 }
1464
1465
1466 /* connect nodes to form graph */
1467
1468 /* connect dag header to block node */
1469 RF_ASSERT(blockNode->numAntecedents == 0);
1470 dag_h->succedents[0] = blockNode;
1471
1472 if (nRodNodes > 0) {
1473 /* connect the block node to the Rod nodes */
1474 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1475 RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1476 for (i = 0; i < nRodNodes; i++) {
1477 RF_ASSERT(rodNodes[i].numAntecedents == 1);
1478 blockNode->succedents[i] = &rodNodes[i];
1479 rodNodes[i].antecedents[0] = blockNode;
1480 rodNodes[i].antType[0] = rf_control;
1481
1482 /* connect the Rod nodes to the Nil node */
1483 RF_ASSERT(rodNodes[i].numSuccedents == 1);
1484 rodNodes[i].succedents[0] = syncNode;
1485 syncNode->antecedents[i] = &rodNodes[i];
1486 syncNode->antType[i] = rf_trueData;
1487 }
1488 }
1489 else {
1490 /* connect the block node to the Nil node */
1491 RF_ASSERT(blockNode->numSuccedents == 1);
1492 RF_ASSERT(syncNode->numAntecedents == 1);
1493 blockNode->succedents[0] = syncNode;
1494 syncNode->antecedents[0] = blockNode;
1495 syncNode->antType[0] = rf_control;
1496 }
1497
1498 /* connect the sync node to the Wnd nodes */
1499 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1500 for (i = 0; i < nWndNodes; i++) {
1501 RF_ASSERT(wndNodes[i].numAntecedents == 1);
1502 syncNode->succedents[i] = &wndNodes[i];
1503 wndNodes[i].antecedents[0] = syncNode;
1504 wndNodes[i].antType[0] = rf_control;
1505 }
1506
1507 /* connect the sync node to the Xor node */
1508 RF_ASSERT(xorNode->numAntecedents == 1);
1509 syncNode->succedents[nWndNodes] = xorNode;
1510 xorNode->antecedents[0] = syncNode;
1511 xorNode->antType[0] = rf_control;
1512
1513 /* connect the xor node to the write parity node */
1514 RF_ASSERT(xorNode->numSuccedents == nfaults);
1515 RF_ASSERT(wnpNode->numAntecedents == 1);
1516 xorNode->succedents[0] = wnpNode;
1517 wnpNode->antecedents[0]= xorNode;
1518 wnpNode->antType[0] = rf_trueData;
1519 if (nfaults == 2) {
1520 RF_ASSERT(wnqNode->numAntecedents == 1);
1521 xorNode->succedents[1] = wnqNode;
1522 wnqNode->antecedents[0] = xorNode;
1523 wnqNode->antType[0] = rf_trueData;
1524 }
1525
1526 /* connect the write nodes to the term node */
1527 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1528 RF_ASSERT(termNode->numSuccedents == 0);
1529 for (i = 0; i < nWndNodes; i++) {
1530 RF_ASSERT(wndNodes[i].numSuccedents == 1);
1531 wndNodes[i].succedents[0] = termNode;
1532 termNode->antecedents[i] = &wndNodes[i];
1533 termNode->antType[i] = rf_control;
1534 }
1535 RF_ASSERT(wnpNode->numSuccedents == 1);
1536 wnpNode->succedents[0] = termNode;
1537 termNode->antecedents[nWndNodes] = wnpNode;
1538 termNode->antType[nWndNodes] = rf_control;
1539 if (nfaults == 2) {
1540 RF_ASSERT(wnqNode->numSuccedents == 1);
1541 wnqNode->succedents[0] = termNode;
1542 termNode->antecedents[nWndNodes + 1] = wnqNode;
1543 termNode->antType[nWndNodes + 1] = rf_control;
1544 }
1545 }
1546
1547
1548 /******************************************************************************
1549 *
1550 * creates a DAG to perform a small-write operation (either raid 5 or pq),
1551 * which is as follows:
1552 *
1553 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1554 *            \- Rod X- Wnd [Und] -------/
1555 *           [\- Rod X- Wnd [Und] ------/]
1556 *           [\- Roq - Q --> Wnq [Unq] -/]
1557 *
1558 * Rop = read old parity
1559 * Rod = read old data
1560 * Roq = read old "q"
1562 * Und = unlock data disk
1563 * Unp = unlock parity disk
1564 * Unq = unlock q disk
1565 * Wnp = write new parity
1566 * Wnd = write new data
1567 * Wnq = write new "q"
1568 * [ ] denotes optional segments in the graph
1569 *
1570 * Parameters: raidPtr - description of the physical array
1571 * asmap - logical & physical addresses for this access
1572 * bp - buffer ptr (holds write data)
1573 * flags - general flags (e.g. disk locking)
1574 * allocList - list of memory allocated in DAG creation
1575 * pfuncs - list of parity generating functions
1576 * qfuncs - list of q generating functions
1577 *
1578 * A null qfuncs indicates a single-fault-tolerant array
1579 *****************************************************************************/
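
 /* A concrete instance, for orientation: a fault-free small write touching one
  * data unit with SU-aligned parity and no atomic RMW gives numDataNodes = 1,
  * numParityNodes = 1, nfaults = 1, so the code below builds
  *
  *     Hdr -> Nil -> {Rod, Rop};  Rod -> Wnd;  {Rod, Rop} -> Xor -> Wnp;
  *     {Wnd, Wnp} -> Trm
  *
  * where Rod feeds both its Wnd (an anti-dependence) and the Xor (true data).
  */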
1580
1581 void rf_CommonCreateSmallWriteDAGFwd(
1582 RF_Raid_t *raidPtr,
1583 RF_AccessStripeMap_t *asmap,
1584 RF_DagHeader_t *dag_h,
1585 void *bp,
1586 RF_RaidAccessFlags_t flags,
1587 RF_AllocListElem_t *allocList,
1588 RF_RedFuncs_t *pfuncs,
1589 RF_RedFuncs_t *qfuncs)
1590 {
1591 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1592 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1593 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1594 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1595 int i, j, nNodes, totalNumNodes, lu_flag;
1596 RF_ReconUnitNum_t which_ru;
1597 int (*func)(RF_DagNode_t *), (*undoFunc)(RF_DagNode_t *);
1598 int (*qfunc)(RF_DagNode_t *);
1599 int numDataNodes, numParityNodes;
1600 RF_StripeNum_t parityStripeID;
1601 RF_PhysDiskAddr_t *pda;
1602 char *name, *qname;
1603 long nfaults;
1604
1605 nfaults = qfuncs ? 2 : 1;
1606 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */
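
 /* Note on RF_CREATE_PARAM3 as used throughout this file: judging from the
  * call sites below, it packs (priority, lock, unlock, which_ru). The
  * Rod/Rop/Roq reads pass lu_flag in the lock slot to acquire the disk queue
  * lock, and the Und/Unp/Unq nodes pass it in the unlock slot to release it. */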
1607
1608 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
1609 pda = asmap->physInfo;
1610 numDataNodes = asmap->numStripeUnitsAccessed;
1611 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1612
1613 if (rf_dagDebug) printf("[Creating small-write DAG]\n");
1614 RF_ASSERT(numDataNodes > 0);
1615 dag_h->creator = "SmallWriteDAGFwd";
1616
1617 dag_h->numCommitNodes = 0;
1618 dag_h->numCommits = 0;
1619 dag_h->numSuccedents = 1;
1620
1621 qfunc = NULL;
1622 qname = NULL;
1623
1624 /* DAG creation occurs in four steps:
1625 1. count the number of nodes in the DAG
1626 2. create the nodes
1627 3. initialize the nodes
1628 4. connect the nodes
1629 */
1630
1631 /* Step 1. compute number of nodes in the graph */
1632
1633 /* number of nodes:
1634 a read and write for each data unit
1635 a redundancy computation node for each parity node (nfaults * nparity)
1636 a read and write for each parity unit (and for each Q unit, when nfaults == 2)
1637 a block node
1638 a terminate node
1639 if atomic RMW
1640 an unlock node for each data unit, redundancy unit
1641 */
1642 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;
1643 if (lu_flag)
1644 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
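
 /* Sanity check of the arithmetic: a two-data-unit access whose parity access
  * is not SU-aligned gives numDataNodes = 2, numParityNodes = 2, nfaults = 1,
  * so without locking totalNumNodes = (2*2) + (1*2) + (1*2*2) + 2 = 12, which
  * matches the carving below: block + 2 Rod + 2 Rop + 2 Wnd + 2 Wnp + 2 Xor
  * + term. */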
1645
1646
1647 /* Step 2. create the nodes */
1648 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
1649 i = 0;
1650 blockNode = &nodes[i]; i += 1;
1651 readDataNodes = &nodes[i]; i += numDataNodes;
1652 readParityNodes = &nodes[i]; i += numParityNodes;
1653 writeDataNodes = &nodes[i]; i += numDataNodes;
1654 writeParityNodes = &nodes[i]; i += numParityNodes;
1655 xorNodes = &nodes[i]; i += numParityNodes;
1656 termNode = &nodes[i]; i += 1;
1657 if (lu_flag) {
1658 unlockDataNodes = &nodes[i]; i += numDataNodes;
1659 unlockParityNodes = &nodes[i]; i += numParityNodes;
1660 }
1661 else {
1662 unlockDataNodes = unlockParityNodes = NULL;
1663 }
1664 if (nfaults == 2) {
1665 readQNodes = &nodes[i]; i += numParityNodes;
1666 writeQNodes = &nodes[i]; i += numParityNodes;
1667 qNodes = &nodes[i]; i += numParityNodes;
1668 if (lu_flag) {
1669 unlockQNodes = &nodes[i]; i += numParityNodes;
1670 }
1671 else {
1672 unlockQNodes = NULL;
1673 }
1674 }
1675 else {
1676 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1677 }
1678 RF_ASSERT(i == totalNumNodes);
1679
1680 /* Step 3. initialize the nodes */
1681 /* initialize block node (Nil) */
1682 nNodes = numDataNodes + (nfaults * numParityNodes);
1683 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
1684
1685 /* initialize terminate node (Trm) */
1686 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
1687
1688 /* initialize nodes which read old data (Rod) */
1689 for (i = 0; i < numDataNodes; i++) {
1690 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
1691 RF_ASSERT(pda != NULL);
1692 readDataNodes[i].params[0].p = pda; /* physical disk addr desc */
1693 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old data */
1694 readDataNodes[i].params[2].v = parityStripeID;
1695 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1696 pda=pda->next;
1697 for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1698 readDataNodes[i].propList[j] = NULL;
1699 }
1700
1701 /* initialize nodes which read old parity (Rop) */
1702 pda = asmap->parityInfo;
1703 for (i = 0; i < numParityNodes; i++) {
1704 RF_ASSERT(pda != NULL);
1705 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1706 readParityNodes[i].params[0].p = pda;
1707 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old parity */
1708 readParityNodes[i].params[2].v = parityStripeID;
1709 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1710 for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1711 readParityNodes[i].propList[j] = NULL;
1712 pda=pda->next;
1713 }
1714
1715 /* initialize nodes which read old Q (Roq) */
1716 if (nfaults == 2)
1717 {
1718 pda = asmap->qInfo;
1719 for (i = 0; i < numParityNodes; i++) {
1720 RF_ASSERT(pda != NULL);
1721 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
1722 readQNodes[i].params[0].p = pda;
1723 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */
1724 readQNodes[i].params[2].v = parityStripeID;
1725 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1726 for (j = 0; j < readQNodes[i].numSuccedents; j++)
1727 readQNodes[i].propList[j] = NULL;
1728 pda=pda->next;
1729 }
1730 }
1731
1732 /* initialize nodes which write new data (Wnd) */
1733 pda = asmap->physInfo;
1734 for (i=0; i < numDataNodes; i++) {
1735 RF_ASSERT(pda != NULL);
1736 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1737 writeDataNodes[i].params[0].p = pda; /* physical disk addr desc */
1738 writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new data to be written */
1739 writeDataNodes[i].params[2].v = parityStripeID;
1740 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1741
1742 if (lu_flag) {
1743 /* initialize node to unlock the disk queue */
1744 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
1745 unlockDataNodes[i].params[0].p = pda; /* physical disk addr desc */
1746 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1747 }
1748
1749 pda = pda->next;
1750 }
1751
1752
1753 /* initialize nodes which compute new parity and Q */
1754 /* we use the simple XOR func in the double-XOR case, and when we're accessing only a portion of one stripe unit.
1755 * the distinction between the two is that the regular XOR func assumes that the targbuf is a full SU in size,
1756 * and examines the pda associated with the buffer to decide where within the buffer to XOR the data, whereas
1757 * the simple XOR func just XORs the data into the start of the buffer.
1758 */
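 /* A minimal sketch of the distinction (illustrative only, not the actual
  * RAIDframe functions): the simple func is essentially
  *
  *     for (i = 0; i < len; i++)
  *         targbuf[i] ^= srcbuf[i];                     (xor at offset 0)
  *
  * while the regular func first derives the source's byte offset within the
  * full-SU target buffer from its pda, e.g. something like
  *
  *     off = rf_RaidAddressToByte(raidPtr,
  *               pda->startSector % raidPtr->Layout.sectorsPerStripeUnit);
  *     for (i = 0; i < len; i++)
  *         targbuf[off + i] ^= srcbuf[i];
  */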
1759 if ((numParityNodes==2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
1760 func = pfuncs->simple; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->SimpleName;
1761 if (qfuncs) {
1762 qfunc = qfuncs->simple;
1763 qname = qfuncs->SimpleName;
1764 }
1765 }
1766 else {
1767 func = pfuncs->regular; undoFunc = rf_NullNodeUndoFunc; name = pfuncs->RegularName;
1768 if (qfuncs) { qfunc = qfuncs->regular; qname = qfuncs->RegularName;}
1769 }
1770 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr */
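 /* For the single-xor case built in the else-branch below, the layout is:
  *   params[0 .. 2*numDataNodes+1]                    {pda, buf} pairs for the Rod nodes, then one pair for Rop
  *   params[2*(numDataNodes+1) .. 4*numDataNodes+1]   {pda, buf} pairs for the Wnd nodes
  *   params[2*(numDataNodes+numDataNodes+1)]          raidPtr
  * accounting for all (2 * (numDataNodes + numDataNodes + 1) + 1) params. */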
1771 if (numParityNodes==2) { /* double-xor case */
1772 for (i=0; i < numParityNodes; i++) {
1773 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for xor */
1774 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1775 xorNodes[i].params[0] = readDataNodes[i].params[0];
1776 xorNodes[i].params[1] = readDataNodes[i].params[1];
1777 xorNodes[i].params[2] = readParityNodes[i].params[0];
1778 xorNodes[i].params[3] = readParityNodes[i].params[1];
1779 xorNodes[i].params[4] = writeDataNodes[i].params[0];
1780 xorNodes[i].params[5] = writeDataNodes[i].params[1];
1781 xorNodes[i].params[6].p = raidPtr;
1782 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as target buf */
1783 if (nfaults==2)
1784 {
1785 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for xor */
1786 qNodes[i].params[0] = readDataNodes[i].params[0];
1787 qNodes[i].params[1] = readDataNodes[i].params[1];
1788 qNodes[i].params[2] = readQNodes[i].params[0];
1789 qNodes[i].params[3] = readQNodes[i].params[1];
1790 qNodes[i].params[4] = writeDataNodes[i].params[0];
1791 qNodes[i].params[5] = writeDataNodes[i].params[1];
1792 qNodes[i].params[6].p = raidPtr;
1793 qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as target buf */
1794 }
1795 }
1796 }
1797 else {
1798 /* there is only one xor node in this case */
1799 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
1800 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1801 for (i=0; i < numDataNodes + 1; i++) {
1802 /* set up params related to Rod and Rop nodes; at i == numDataNodes this picks up readParityNodes[0], which immediately follows readDataNodes in the nodes array */
1803 xorNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
1804 xorNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer pointer */
1805 }
1806 for (i=0; i < numDataNodes; i++) {
1807 /* set up params related to Wnd nodes */
1808 xorNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
1809 xorNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
1810 }
1811 xorNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr; /* xor node needs to get at RAID information */
1812 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1813 if (nfaults==2)
1814 {
1815 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList);
1816 for (i=0; i<numDataNodes; i++) {
1817 /* set up params related to Rod */
1818 qNodes[0].params[2*i+0] = readDataNodes[i].params[0]; /* pda */
1819 qNodes[0].params[2*i+1] = readDataNodes[i].params[1]; /* buffer pointer */
1820 }
1821 /* and read old q */
1822 qNodes[0].params[2*numDataNodes + 0] = readQNodes[0].params[0]; /* pda */
1823 qNodes[0].params[2*numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */
1824 for (i=0; i < numDataNodes; i++) {
1825 /* set up params related to Wnd nodes */
1826 qNodes[0].params[2*(numDataNodes+1+i)+0] = writeDataNodes[i].params[0]; /* pda */
1827 qNodes[0].params[2*(numDataNodes+1+i)+1] = writeDataNodes[i].params[1]; /* buffer pointer */
1828 }
1829 qNodes[0].params[2*(numDataNodes+numDataNodes+1)].p = raidPtr; /* q node needs to get at RAID information */
1830 qNodes[0].results[0] = readQNodes[0].params[1].p;
1831 }
1832 }
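
 /* Rationale for recycling the old-parity (and old-Q) buffer as the target:
  * the small write computes new parity as (old data xor new data xor old
  * parity), so once the redundancy node has consumed the old contents the
  * buffer is dead and can hold the result that Wnp/Wnq will write out. */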
1833
1834 /* initialize nodes which write new parity (Wnp) */
1835 pda = asmap->parityInfo;
1836 for (i=0; i < numParityNodes; i++) {
1837 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList);
1838 RF_ASSERT(pda != NULL);
1839 writeParityNodes[i].params[0].p = pda; /* physical disk addr desc */
1840 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for parity write operation */
1841 writeParityNodes[i].params[2].v = parityStripeID;
1842 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1843
1844 if (lu_flag) {
1845 /* initialize node to unlock the disk queue */
1846 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList);
1847 unlockParityNodes[i].params[0].p = pda; /* physical disk addr desc */
1848 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1849 }
1850
1851 pda = pda->next;
1852 }
1853
1854 /* initialize nodes which write new Q (Wnq) */
1855 if (nfaults == 2)
1856 {
1857 pda = asmap->qInfo;
1858 for (i=0; i < numParityNodes; i++) {
1859 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList);
1860 RF_ASSERT(pda != NULL);
1861 writeQNodes[i].params[0].p = pda; /* physical disk addr desc */
1862 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for parity write operation */
1863 writeQNodes[i].params[2].v = parityStripeID;
1864 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1865
1866 if (lu_flag) {
1867 /* initialize node to unlock the disk queue */
1868 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList);
1869 unlockQNodes[i].params[0].p = pda; /* physical disk addr desc */
1870 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
1871 }
1872
1873 pda = pda->next;
1874 }
1875 }
1876
1877 /* Step 4. connect the nodes */
1878
1879 /* connect header to block node */
1880 dag_h->succedents[0] = blockNode;
1881
1882 /* connect block node to read old data nodes */
1883 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
1884 for (i = 0; i < numDataNodes; i++) {
1885 blockNode->succedents[i] = &readDataNodes[i];
1886 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
1887 readDataNodes[i].antecedents[0]= blockNode;
1888 readDataNodes[i].antType[0] = rf_control;
1889 }
1890
1891 /* connect block node to read old parity nodes */
1892 for (i = 0; i < numParityNodes; i++) {
1893 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
1894 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
1895 readParityNodes[i].antecedents[0] = blockNode;
1896 readParityNodes[i].antType[0] = rf_control;
1897 }
1898
1899 /* connect block node to read old Q nodes */
1900 if (nfaults == 2)
1901 for (i = 0; i < numParityNodes; i++) {
1902 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
1903 RF_ASSERT(readQNodes[i].numAntecedents == 1);
1904 readQNodes[i].antecedents[0] = blockNode;
1905 readQNodes[i].antType[0] = rf_control;
1906 }
1907
1908 /* connect read old data nodes to write new data nodes */
1909 for (i = 0; i < numDataNodes; i++) {
1910 RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
1911 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1912 readDataNodes[i].succedents[0] = &writeDataNodes[i];
1913 writeDataNodes[i].antecedents[0] = &readDataNodes[i];
1914 writeDataNodes[i].antType[0] = rf_antiData;
1915 }
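
 /* The Rod -> Wnd arc is an anti-dependence: the new-data write must not
  * begin until the old data has been captured, but no data flows along the
  * arc. The Rod -> Xor arcs below, by contrast, carry true data into the
  * parity update. */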
1916
1917 /* connect read old data nodes to xor nodes */
1918 for (i = 0; i < numDataNodes; i++) {
1919 for (j = 0; j < numParityNodes; j++){
1920 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
1921 readDataNodes[i].succedents[1 + j] = &xorNodes[j];
1922 xorNodes[j].antecedents[i] = &readDataNodes[i];
1923 xorNodes[j].antType[i] = rf_trueData;
1924 }
1925 }
1926
1927 /* connect read old data nodes to q nodes */
1928 if (nfaults == 2)
1929 for (i = 0; i < numDataNodes; i++)
1930 for (j = 0; j < numParityNodes; j++){
1931 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
1932 readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
1933 qNodes[j].antecedents[i] = &readDataNodes[i];
1934 qNodes[j].antType[i] = rf_trueData;
1935 }
1936
1937 /* connect read old parity nodes to xor nodes */
1938 for (i = 0; i < numParityNodes; i++) {
1939 for (j = 0; j < numParityNodes; j++) {
1940 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
1941 readParityNodes[i].succedents[j] = &xorNodes[j];
1942 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
1943 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
1944 }
1945 }
1946
1947 /* connect read old q nodes to q nodes */
1948 if (nfaults == 2)
1949 for (i = 0; i < numParityNodes; i++) {
1950 for (j = 0; j < numParityNodes; j++) {
1951 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
1952 readQNodes[i].succedents[j] = &qNodes[j];
1953 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
1954 qNodes[j].antType[numDataNodes + i] = rf_trueData;
1955 }
1956 }
1957
1958 /* connect xor nodes to the write new parity nodes */
1959 for (i = 0; i < numParityNodes; i++) {
1960 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
1961 for (j = 0; j < numParityNodes; j++) {
1962 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
1963 xorNodes[i].succedents[j] = &writeParityNodes[j];
1964 writeParityNodes[j].antecedents[i] = &xorNodes[i];
1965 writeParityNodes[j].antType[i] = rf_trueData;
1966 }
1967 }
1968
1969 /* connect q nodes to the write new q nodes */
1970 if (nfaults == 2)
1971 for (i = 0; i < numParityNodes; i++) {
1972 RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
1973 for (j = 0; j < numParityNodes; j++) {
1974 RF_ASSERT(qNodes[j].numSuccedents == numParityNodes);
1975 qNodes[i].succedents[j] = &writeQNodes[j];
1976 writeQNodes[j].antecedents[i] = &qNodes[i];
1977 writeQNodes[j].antType[i] = rf_trueData;
1978 }
1979 }
1980
1981 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
1982 RF_ASSERT(termNode->numSuccedents == 0);
1983 for (i = 0; i < numDataNodes; i++) {
1984 if (lu_flag) {
1985 /* connect write new data nodes to unlock nodes */
1986 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1987 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1988 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1989 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1990 unlockDataNodes[i].antType[0] = rf_control;
1991
1992 /* connect unlock nodes to term node */
1993 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1994 unlockDataNodes[i].succedents[0] = termNode;
1995 termNode->antecedents[i] = &unlockDataNodes[i];
1996 termNode->antType[i] = rf_control;
1997 }
1998 else {
1999 /* connect write new data nodes to term node */
2000 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2001 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
2002 writeDataNodes[i].succedents[0] = termNode;
2003 termNode->antecedents[i] = &writeDataNodes[i];
2004 termNode->antType[i] = rf_control;
2005 }
2006 }
2007
2008 for (i = 0; i < numParityNodes; i++) {
2009 if (lu_flag) {
2010 /* connect write new parity nodes to unlock nodes */
2011 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2012 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
2013 writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
2014 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
2015 unlockParityNodes[i].antType[0] = rf_control;
2016
2017 /* connect unlock nodes to term node */
2018 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
2019 unlockParityNodes[i].succedents[0] = termNode;
2020 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
2021 termNode->antType[numDataNodes + i] = rf_control;
2022 }
2023 else {
2024 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2025 writeParityNodes[i].succedents[0] = termNode;
2026 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
2027 termNode->antType[numDataNodes + i] = rf_control;
2028 }
2029 }
2030
2031 if (nfaults == 2)
2032 for (i = 0; i < numParityNodes; i++) {
2033 if (lu_flag) {
2034 /* connect write new Q nodes to unlock nodes */
2035 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2036 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
2037 writeQNodes[i].succedents[0] = &unlockQNodes[i];
2038 unlockQNodes[i].antecedents[0] = &writeQNodes[i];
2039 unlockQNodes[i].antType[0] = rf_control;
2040
2041 /* connect unlock nodes to term node */
2042 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
2043 unlockQNodes[i].succedents[0] = termNode;
2044 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
2045 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
2046 }
2047 else {
2048 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2049 writeQNodes[i].succedents[0] = termNode;
2050 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
2051 termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
2052 }
2053 }
2054 }
2055
2056
2057
2058 /******************************************************************************
2059 * create a write graph (fault-free or degraded) for RAID level 1
2060 *
2061 * Hdr -> Nil -> Wpd -> Nil -> Trm
2062 *            -> Wsd ->
2063 *
2064 * The "Wpd" node writes data to the primary copy in the mirror pair
2065 * The "Wsd" node writes data to the secondary copy in the mirror pair
2066 *
2067 * Parameters: raidPtr - description of the physical array
2068 * asmap - logical & physical addresses for this access
2069 * bp - buffer ptr (holds write data)
2070 * flags - general flags (e.g. disk locking)
2071 * allocList - list of memory allocated in DAG creation
2072 *****************************************************************************/
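
 /* For an SU-aligned, fault-free access (no pda chaining, no failed disks),
  * nWndNodes = nWmirNodes = 1 and the code below builds
  *
  *     Hdr -> Nil -> {Wpd, Wsd} -> Nil -> Trm
  *
  * In degraded mode the write to the failed half of the pair is simply
  * dropped. */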
2073
2074 void rf_CreateRaidOneWriteDAGFwd(
2075 RF_Raid_t *raidPtr,
2076 RF_AccessStripeMap_t *asmap,
2077 RF_DagHeader_t *dag_h,
2078 void *bp,
2079 RF_RaidAccessFlags_t flags,
2080 RF_AllocListElem_t *allocList)
2081 {
2082 RF_DagNode_t *blockNode, *unblockNode, *termNode;
2083 RF_DagNode_t *nodes, *wndNode, *wmirNode;
2084 int nWndNodes, nWmirNodes, i;
2085 RF_ReconUnitNum_t which_ru;
2086 RF_PhysDiskAddr_t *pda, *pdaP;
2087 RF_StripeNum_t parityStripeID;
2088
2089 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2090 asmap->raidAddress, &which_ru);
2091 if (rf_dagDebug) {
2092 printf("[Creating RAID level 1 write DAG]\n");
2093 }
2094
2095 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; /* 2 implies access not SU aligned */
2096 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2097
2098 /* degraded mode: omit the write to a failed primary (data) or secondary (mirror) copy */
2099 if (asmap->numDataFailed == 1)
2100 nWndNodes--;
2101 if (asmap->numParityFailed == 1)
2102 nWmirNodes--;
2103
2104 /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock + terminator) */
2105 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2106 i = 0;
2107 wndNode = &nodes[i]; i += nWndNodes;
2108 wmirNode = &nodes[i]; i += nWmirNodes;
2109 blockNode = &nodes[i]; i += 1;
2110 unblockNode = &nodes[i]; i += 1;
2111 termNode = &nodes[i]; i += 1;
2112 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2113
2114 /* this dag can commit immediately */
2115 dag_h->numCommitNodes = 0;
2116 dag_h->numCommits = 0;
2117 dag_h->numSuccedents = 1;
2118
2119 /* initialize the block, unblock, and term nodes */
2120 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList);
2121 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
2122 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2123
2124 /* initialize the wnd nodes */
2125 if (nWndNodes > 0) {
2126 pda = asmap->physInfo;
2127 for (i = 0; i < nWndNodes; i++) {
2128 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
2129 RF_ASSERT(pda != NULL);
2130 wndNode[i].params[0].p = pda;
2131 wndNode[i].params[1].p = pda->bufPtr;
2132 wndNode[i].params[2].v = parityStripeID;
2133 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2134 pda = pda->next;
2135 }
2136 RF_ASSERT(pda == NULL);
2137 }
2138
2139 /* initialize the mirror nodes */
2140 if (nWmirNodes > 0) {
2141 pda = asmap->physInfo;
2142 pdaP = asmap->parityInfo;
2143 for (i = 0; i < nWmirNodes; i++) {
2144 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
2145 RF_ASSERT(pda != NULL);
2146 wmirNode[i].params[0].p = pdaP;
2147 wmirNode[i].params[1].p = pda->bufPtr;
2148 wmirNode[i].params[2].v = parityStripeID;
2149 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
2150 pda = pda->next;
2151 pdaP = pdaP->next;
2152 }
2153 RF_ASSERT(pda == NULL);
2154 RF_ASSERT(pdaP == NULL);
2155 }
2156
2157 /* link the header node to the block node */
2158 RF_ASSERT(dag_h->numSuccedents == 1);
2159 RF_ASSERT(blockNode->numAntecedents == 0);
2160 dag_h->succedents[0] = blockNode;
2161
2162 /* link the block node to the write nodes */
2163 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2164 for (i = 0; i < nWndNodes; i++) {
2165 RF_ASSERT(wndNode[i].numAntecedents == 1);
2166 blockNode->succedents[i] = &wndNode[i];
2167 wndNode[i].antecedents[0] = blockNode;
2168 wndNode[i].antType[0] = rf_control;
2169 }
2170 for (i = 0; i < nWmirNodes; i++) {
2171 RF_ASSERT(wmirNode[i].numAntecedents == 1);
2172 blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2173 wmirNode[i].antecedents[0] = blockNode;
2174 wmirNode[i].antType[0] = rf_control;
2175 }
2176
2177 /* link the write nodes to the unblock node */
2178 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2179 for (i = 0; i < nWndNodes; i++) {
2180 RF_ASSERT(wndNode[i].numSuccedents == 1);
2181 wndNode[i].succedents[0] = unblockNode;
2182 unblockNode->antecedents[i] = &wndNode[i];
2183 unblockNode->antType[i] = rf_control;
2184 }
2185 for (i = 0; i < nWmirNodes; i++) {
2186 RF_ASSERT(wmirNode[i].numSuccedents == 1);
2187 wmirNode[i].succedents[0] = unblockNode;
2188 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2189 unblockNode->antType[i + nWndNodes] = rf_control;
2190 }
2191
2192 /* link the unblock node to the term node */
2193 RF_ASSERT(unblockNode->numSuccedents == 1);
2194 RF_ASSERT(termNode->numAntecedents == 1);
2195 RF_ASSERT(termNode->numSuccedents == 0);
2196 unblockNode->succedents[0] = termNode;
2197 termNode->antecedents[0] = unblockNode;
2198 termNode->antType[0] = rf_control;
2199
2200 return;
2201 }
2202