/*	$NetBSD: rf_parityloggingdags.c,v 1.2 1999/01/26 02:34:00 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

#include "rf_archs.h"

#if RF_INCLUDE_PARITYLOGGING > 0

/*
 * DAGs specific to parity logging are created here.
 */

#include "rf_types.h"
#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_threadid.h"
#include "rf_debugMem.h"
#include "rf_paritylog.h"
#include "rf_memchunk.h"
#include "rf_general.h"

#include "rf_parityloggingdags.h"

/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- NIL - Rod - NIL - Wnd ------- NIL - T
 *           \ Rod /          \ Xor - Lpo /
 *
 * The writes are not started until all the reads have completed: if they
 * were issued in parallel, a failure on one of the reads could leave the
 * parity in an inconsistent state, so the retry with a new DAG would
 * produce erroneous parity.
 *
 * Note: this DAG has the nasty property that none of the buffers allocated
 * for reading old data can be freed until the XOR node fires.  Need to fix
 * this.
 *
 * The last two arguments are the number of faults tolerated and the function
 * used for the redundancy calculation.  The undo for the redundancy
 * calculation is assumed to be null.
 *
 *****************************************************************************/
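
/*
 * Editor's sketch: every edge in the DAGs below is recorded twice, once in
 * the parent's succedents[] array and once in the child's antecedents[] and
 * antType[] arrays.  A hypothetical helper (example_connect is not part of
 * RAIDframe, and the antType parameter type is assumed) capturing the idiom
 * used throughout this file, kept under #if 0 so it is never compiled:
 */
#if 0
static void
example_connect(RF_DagNode_t *parent, int pslot,
    RF_DagNode_t *child, int aslot, RF_AntecedentType_t antType)
{
	parent->succedents[pslot] = child;	/* forward edge */
	child->antecedents[aslot] = parent;	/* matching back edge */
	child->antType[aslot] = antType;	/* rf_control, rf_trueData, ... */
}
#endif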

void rf_CommonCreateParityLoggingLargeWriteDAG(
	RF_Raid_t             *raidPtr,
	RF_AccessStripeMap_t  *asmap,
	RF_DagHeader_t        *dag_h,
	void                  *bp,
	RF_RaidAccessFlags_t   flags,
	RF_AllocListElem_t    *allocList,
	int                    nfaults,
	int                  (*redFunc)(RF_DagNode_t *))
{
	RF_DagNode_t *nodes, *wndNodes, *rodNodes = NULL, *syncNode, *xorNode,
	    *lpoNode, *blockNode, *unblockNode, *termNode;
	int nWndNodes, nRodNodes, i;
	RF_RaidLayout_t *layoutPtr = &(raidPtr->Layout);
	RF_AccessStripeMapHeader_t *new_asm_h[2];
	int nodeNum, asmNum;
	RF_ReconUnitNum_t which_ru;
	char *sosBuffer, *eosBuffer;
	RF_PhysDiskAddr_t *pda;
	RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);

	if (rf_dagDebug)
		printf("[Creating parity-logging large-write DAG]\n");
	RF_ASSERT(nfaults == 1);	/* this architecture is only single-fault tolerant */
	dag_h->creator = "ParityLoggingLargeWriteDAG";

	/* alloc the Wnd nodes, the xor node, and the Lpo node */
	nWndNodes = asmap->numStripeUnitsAccessed;
	RF_CallocAndAdd(nodes, nWndNodes + 6, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
	i = 0;
	wndNodes = &nodes[i]; i += nWndNodes;
	xorNode = &nodes[i]; i += 1;
	lpoNode = &nodes[i]; i += 1;
	blockNode = &nodes[i]; i += 1;
	syncNode = &nodes[i]; i += 1;
	unblockNode = &nodes[i]; i += 1;
	termNode = &nodes[i]; i += 1;

	dag_h->numCommitNodes = nWndNodes + 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
	if (nRodNodes > 0)
		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);

	/* begin node initialization */
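	/*
	 * For reference when reading the rf_InitNode calls below (argument
	 * order assumed from rf_dagutils.h): node, initial status, commit
	 * flag, do func, undo func, wakeup func, then the counts nSucc,
	 * nAnte, nParam, nResult, and finally the DAG header, node name,
	 * and allocation list.
	 */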
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes + 1, 0, 0, 0, dag_h, "Nil", allocList);
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nWndNodes + 1, 0, 0, dag_h, "Nil", allocList);
	rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes + 1, 0, 0, dag_h, "Nil", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

	/* initialize the Rod nodes */
	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
		if (new_asm_h[asmNum]) {
			pda = new_asm_h[asmNum]->stripeMap->physInfo;
			while (pda) {
				rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);
				rodNodes[nodeNum].params[0].p = pda;
				rodNodes[nodeNum].params[1].p = pda->bufPtr;
				rodNodes[nodeNum].params[2].v = parityStripeID;
				rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
				nodeNum++;
				pda = pda->next;
			}
		}
	}
	RF_ASSERT(nodeNum == nRodNodes);

	/* initialize the Wnd nodes */
	pda = asmap->physInfo;
	for (i = 0; i < nWndNodes; i++) {
		rf_InitNode(&wndNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
		RF_ASSERT(pda != NULL);
		wndNodes[i].params[0].p = pda;
		wndNodes[i].params[1].p = pda->bufPtr;
		wndNodes[i].params[2].v = parityStripeID;
		wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		pda = pda->next;
	}

	/* initialize the redundancy node */
	rf_InitNode(xorNode, rf_wait, RF_TRUE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 1, 2 * (nWndNodes + nRodNodes) + 1, 1, dag_h, "Xr ", allocList);
	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
	for (i = 0; i < nWndNodes; i++) {
		xorNode->params[2 * i + 0] = wndNodes[i].params[0];	/* pda */
		xorNode->params[2 * i + 1] = wndNodes[i].params[1];	/* buf ptr */
	}
	for (i = 0; i < nRodNodes; i++) {
		xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];	/* pda */
		xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];	/* buf ptr */
	}
	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;	/* xor node needs to get at RAID information */

	/*
	 * Look for a Rod node that reads a complete SU.  If none exists,
	 * alloc a buffer to receive the parity info.  Note that we can't
	 * use a new data buffer because it will not have been written
	 * when the xor occurs.
	 */
	for (i = 0; i < nRodNodes; i++)
		if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
			break;
	if (i == nRodNodes) {
		RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);
	} else {
		xorNode->results[0] = rodNodes[i].params[1].p;
	}

	/* initialize the Lpo node */
	rf_InitNode(lpoNode, rf_wait, RF_FALSE, rf_ParityLogOverwriteFunc, rf_ParityLogOverwriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpo", allocList);

	lpoNode->params[0].p = asmap->parityInfo;
	lpoNode->params[1].p = xorNode->results[0];
	RF_ASSERT(asmap->parityInfo->next == NULL);	/* parityInfo must describe entire parity unit */

	/* connect nodes to form graph */

	/* connect dag header to block node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	/* connect the block node to the Rod nodes */
	RF_ASSERT(blockNode->numSuccedents == nRodNodes + 1);
	for (i = 0; i < nRodNodes; i++) {
		RF_ASSERT(rodNodes[i].numAntecedents == 1);
		blockNode->succedents[i] = &rodNodes[i];
		rodNodes[i].antecedents[0] = blockNode;
		rodNodes[i].antType[0] = rf_control;
	}

	/* connect the block node to the sync node (necessary if nRodNodes == 0) */
	RF_ASSERT(syncNode->numAntecedents == nRodNodes + 1);
	blockNode->succedents[nRodNodes] = syncNode;
	syncNode->antecedents[0] = blockNode;
	syncNode->antType[0] = rf_control;

	/* connect the Rod nodes to the sync node */
	for (i = 0; i < nRodNodes; i++) {
		rodNodes[i].succedents[0] = syncNode;
		syncNode->antecedents[1 + i] = &rodNodes[i];
		syncNode->antType[1 + i] = rf_control;
	}

	/* connect the sync node to the xor node */
	RF_ASSERT(syncNode->numSuccedents == nWndNodes + 1);
	RF_ASSERT(xorNode->numAntecedents == 1);
	syncNode->succedents[0] = xorNode;
	xorNode->antecedents[0] = syncNode;
	xorNode->antType[0] = rf_trueData;	/* carry forward from sync */

	/* connect the sync node to the Wnd nodes */
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numAntecedents == 1);
		syncNode->succedents[1 + i] = &wndNodes[i];
		wndNodes[i].antecedents[0] = syncNode;
		wndNodes[i].antType[0] = rf_control;
	}

	/* connect the xor node to the Lpo node */
	RF_ASSERT(xorNode->numSuccedents == 1);
	RF_ASSERT(lpoNode->numAntecedents == 1);
	xorNode->succedents[0] = lpoNode;
	lpoNode->antecedents[0] = xorNode;
	lpoNode->antType[0] = rf_trueData;

	/* connect the Wnd nodes to the unblock node */
	RF_ASSERT(unblockNode->numAntecedents == nWndNodes + 1);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numSuccedents == 1);
		wndNodes[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNodes[i];
		unblockNode->antType[i] = rf_control;
	}

	/* connect the Lpo node to the unblock node */
	RF_ASSERT(lpoNode->numSuccedents == 1);
	lpoNode->succedents[0] = unblockNode;
	unblockNode->antecedents[nWndNodes] = lpoNode;
	unblockNode->antType[nWndNodes] = rf_control;

	/* connect unblock node to terminator */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}



/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either RAID 5 or PQ),
 * which is as follows:
 *
 *                            Header
 *                              |
 *                            Block
 *                       / |  ... \   \
 *                      /  |       \   \
 *                    Rod Rod      Rod  Rop
 *                     | \ /| \   / |  \/ |
 *                     |  | |  \ /  |  /\ |
 *                    Wnd Wnd Wnd   X
 *                     |    \ /     |
 *                     |     \      |
 *                      \     \    Lpo
 *                       \     \   /
 *                        +-> Unblock <-+
 *                              |
 *                              T
 *
 * R = Read, W = Write, X = Xor, o = old, n = new, d = data, p = parity.
 * When the access spans a stripe unit boundary and is less than one SU in
 * size, there will be two Rop -- X -- Lpo branches.  I call this the
 * "double-XOR" case.
 * The second output from each Rod node goes to the X node.  In the
 * double-XOR case, there are exactly 2 Rod nodes, and each sends one output
 * to one X node.
 * There is one Rod -- Wnd -- T branch for each stripe unit being updated.
 *
 * The block and unblock nodes are unused.  See comment above
 * CreateFaultFreeReadDAG.
 *
 * Note: this DAG ignores all the optimizations related to making the RMWs
 * atomic.  It also has the nasty property that none of the buffers allocated
 * for reading old data & parity can be freed until the XOR node fires.
 * Need to fix this.
 *
 * A null qfuncs indicates single fault tolerance.
 *****************************************************************************/
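
/*
 * Editor's sketch: whether the double-XOR case applies can be read straight
 * off the access stripe map; a chained parityInfo means the access crosses a
 * stripe unit boundary, so two parity ranges must be updated.  This mirrors
 * the numParityNodes computation below.  Hypothetical helper, never compiled:
 */
#if 0
static int
example_is_double_xor(const RF_AccessStripeMap_t *asmap)
{
	/* two parity ranges => two Rop -- X -- Lpo branches */
	return (asmap->parityInfo->next != NULL);
}
#endif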

void rf_CommonCreateParityLoggingSmallWriteDAG(
	RF_Raid_t             *raidPtr,
	RF_AccessStripeMap_t  *asmap,
	RF_DagHeader_t        *dag_h,
	void                  *bp,
	RF_RaidAccessFlags_t   flags,
	RF_AllocListElem_t    *allocList,
	RF_RedFuncs_t         *pfuncs,
	RF_RedFuncs_t         *qfuncs)
{
	RF_DagNode_t *xorNodes, *blockNode, *unblockNode, *nodes;
	RF_DagNode_t *readDataNodes, *readParityNodes;
	RF_DagNode_t *writeDataNodes, *lpuNodes;
	RF_DagNode_t *unlockDataNodes = NULL, *termNode;
	RF_PhysDiskAddr_t *pda = asmap->physInfo;
	int numDataNodes = asmap->numStripeUnitsAccessed;
	int numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
	int i, j, nNodes, totalNumNodes;
	RF_ReconUnitNum_t which_ru;
	int (*func)(RF_DagNode_t *node), (*undoFunc)(RF_DagNode_t *node);
	int (*qfunc)(RF_DagNode_t *node);
	char *name, *qname;
	RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);
	long nfaults = qfuncs ? 2 : 1;
	int lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* lock/unlock flag */

	if (rf_dagDebug)
		printf("[Creating parity-logging small-write DAG]\n");
	RF_ASSERT(numDataNodes > 0);
	RF_ASSERT(nfaults == 1);
	dag_h->creator = "ParityLoggingSmallWriteDAG";

	/* DAG creation occurs in four steps:
	   1. count the number of nodes in the DAG
	   2. create the nodes
	   3. initialize the nodes
	   4. connect the nodes */

	/* Step 1. compute number of nodes in the graph */

	/* number of nodes:
	   a read and write for each data unit
	   a redundancy computation node for each parity node
	   a read and Lpu for each parity unit
	   a block and unblock node (2)
	   a terminator node
	   if atomic RMW
	     an unlock node for each data unit, redundancy unit */
	totalNumNodes = (2 * numDataNodes) + numParityNodes + (2 * numParityNodes) + 3;
	if (lu_flag)
		totalNumNodes += numDataNodes;
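	/*
	 * Editor's worked example: a small write touching two data units in
	 * a stripe whose parity lies in a single range (numDataNodes == 2,
	 * numParityNodes == 1), with atomic RMW disabled, gives
	 *	totalNumNodes = (2 * 2) + 1 + (2 * 1) + 3 = 10
	 * i.e. 2 Rod + 2 Wnd, 1 Xor, 1 Rop + 1 Lpu, 2 Nil, and 1 Trm node.
	 */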

	nNodes = numDataNodes + numParityNodes;

	dag_h->numCommitNodes = numDataNodes + numParityNodes;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* Step 2. create the nodes */
	RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
	i = 0;
	blockNode = &nodes[i]; i += 1;
	unblockNode = &nodes[i]; i += 1;
	readDataNodes = &nodes[i]; i += numDataNodes;
	readParityNodes = &nodes[i]; i += numParityNodes;
	writeDataNodes = &nodes[i]; i += numDataNodes;
	lpuNodes = &nodes[i]; i += numParityNodes;
	xorNodes = &nodes[i]; i += numParityNodes;
	termNode = &nodes[i]; i += 1;
	if (lu_flag) {
		unlockDataNodes = &nodes[i]; i += numDataNodes;
	}
	RF_ASSERT(i == totalNumNodes);

	/* Step 3. initialize the nodes */
	/* initialize block node (Nil) */
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);

	/* initialize unblock node (Nil) */
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nNodes, 0, 0, dag_h, "Nil", allocList);

	/* initialize terminator node (Trm) */
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

	/* initialize nodes which read old data (Rod) */
	for (i = 0; i < numDataNodes; i++) {
		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rod", allocList);
		RF_ASSERT(pda != NULL);
		readDataNodes[i].params[0].p = pda;	/* physical disk addr desc */
		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);	/* buffer to hold old data */
		readDataNodes[i].params[2].v = parityStripeID;
		readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
		pda = pda->next;
		readDataNodes[i].propList[0] = NULL;
		readDataNodes[i].propList[1] = NULL;
	}

	/* initialize nodes which read old parity (Rop) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rop", allocList);
		readParityNodes[i].params[0].p = pda;
		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);	/* buffer to hold old parity */
		readParityNodes[i].params[2].v = parityStripeID;
		readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		readParityNodes[i].propList[0] = NULL;
		pda = pda->next;
	}

	/* initialize nodes which write new data (Wnd) */
	pda = asmap->physInfo;
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&writeDataNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, nNodes, 4, 0, dag_h, "Wnd", allocList);
		writeDataNodes[i].params[0].p = pda;	/* physical disk addr desc */
		writeDataNodes[i].params[1].p = pda->bufPtr;	/* buffer holding new data to be written */
		writeDataNodes[i].params[2].v = parityStripeID;
		writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);

		if (lu_flag) {
			/* initialize node to unlock the disk queue */
			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList);
			unlockDataNodes[i].params[0].p = pda;	/* physical disk addr desc */
			unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
		}
		pda = pda->next;
	}

	/* initialize nodes which compute new parity */
	/*
	 * We use the simple XOR func in the double-XOR case, and when we're
	 * accessing only a portion of one stripe unit.  The distinction
	 * between the two is that the regular XOR func assumes the target
	 * buffer is a full SU in size, and examines the pda associated with
	 * the buffer to decide where within the buffer to XOR the data,
	 * whereas the simple XOR func just XORs the data into the start of
	 * the buffer.
	 */
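	/*
	 * Editor's sketch of that distinction (assumed semantics, not the
	 * actual rf_* implementations): the simple XOR is effectively
	 *
	 *	for (i = 0; i < len; i++)
	 *		dest[i] ^= src[i];
	 *
	 * while the regular XOR first offsets dest by the byte position
	 * that the pda's start sector occupies within a full stripe unit
	 * before folding in the source data.
	 */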
	if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
		func = pfuncs->simple;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->SimpleName;
		if (qfuncs) {
			qfunc = qfuncs->simple;
			qname = qfuncs->SimpleName;
		}
	} else {
		func = pfuncs->regular;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->RegularName;
		if (qfuncs) {
			qfunc = qfuncs->regular;
			qname = qfuncs->RegularName;
		}
	}

	/* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} nodes, and raidPtr */
	if (numParityNodes == 2) {	/* double-xor case */
		for (i = 0; i < numParityNodes; i++) {
			rf_InitNode(&xorNodes[i], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, 7, 1, dag_h, name, allocList);	/* no wakeup func for xor */
			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
			xorNodes[i].params[0] = readDataNodes[i].params[0];
			xorNodes[i].params[1] = readDataNodes[i].params[1];
			xorNodes[i].params[2] = readParityNodes[i].params[0];
			xorNodes[i].params[3] = readParityNodes[i].params[1];
			xorNodes[i].params[4] = writeDataNodes[i].params[0];
			xorNodes[i].params[5] = writeDataNodes[i].params[1];
			xorNodes[i].params[6].p = raidPtr;
			xorNodes[i].results[0] = readParityNodes[i].params[1].p;	/* use old parity buf as target buf */
		}
	} else {
		/* there is only one xor node in this case */
		rf_InitNode(&xorNodes[0], rf_wait, RF_TRUE, func, undoFunc, NULL, 1, nNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
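		/*
		 * Note (editor's comment): the final iteration of the loop
		 * below, i == numDataNodes, reads past the Rod nodes and
		 * picks up readParityNodes[0]'s {pda, buffer} pair; this
		 * works because readParityNodes immediately follows
		 * readDataNodes in the nodes array carved out in Step 2.
		 */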
		for (i = 0; i < numDataNodes + 1; i++) {
			/* set up params related to Rod and Rop nodes */
			xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer pointer */
		}
		for (i = 0; i < numDataNodes; i++) {
			/* set up params related to Wnd nodes */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1];	/* buffer pointer */
		}
		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;	/* xor node needs to get at RAID information */
		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
	}

	/* initialize the log node(s) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(pda);
		rf_InitNode(&lpuNodes[i], rf_wait, RF_FALSE, rf_ParityLogUpdateFunc, rf_ParityLogUpdateUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpu", allocList);
		lpuNodes[i].params[0].p = pda;	/* PhysDiskAddr of parity */
		lpuNodes[i].params[1].p = xorNodes[i].results[0];	/* buffer pointer to parity */
		pda = pda->next;
	}

	/* Step 4. connect the nodes */

	/* connect header to block node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	/* connect block node to read old data nodes */
	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + numParityNodes));
	for (i = 0; i < numDataNodes; i++) {
		blockNode->succedents[i] = &readDataNodes[i];
		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
		readDataNodes[i].antecedents[0] = blockNode;
		readDataNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old parity nodes */
	for (i = 0; i < numParityNodes; i++) {
		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
		readParityNodes[i].antecedents[0] = blockNode;
		readParityNodes[i].antType[0] = rf_control;
	}

	/* connect read old data nodes to write new data nodes */
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(readDataNodes[i].numSuccedents == numDataNodes + numParityNodes);
		for (j = 0; j < numDataNodes; j++) {
			RF_ASSERT(writeDataNodes[j].numAntecedents == numDataNodes + numParityNodes);
			readDataNodes[i].succedents[j] = &writeDataNodes[j];
			writeDataNodes[j].antecedents[i] = &readDataNodes[i];
			if (i == j)
				writeDataNodes[j].antType[i] = rf_antiData;
			else
				writeDataNodes[j].antType[i] = rf_control;
		}
	}

	/* connect read old data nodes to xor nodes */
	for (i = 0; i < numDataNodes; i++)
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
			readDataNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
			xorNodes[j].antecedents[i] = &readDataNodes[i];
			xorNodes[j].antType[i] = rf_trueData;
		}

	/* connect read old parity nodes to write new data nodes */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(readParityNodes[i].numSuccedents == numDataNodes + numParityNodes);
		for (j = 0; j < numDataNodes; j++) {
			readParityNodes[i].succedents[j] = &writeDataNodes[j];
			writeDataNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
			writeDataNodes[j].antType[numDataNodes + i] = rf_control;
		}
	}

	/* connect read old parity nodes to xor nodes */
	for (i = 0; i < numParityNodes; i++)
		for (j = 0; j < numParityNodes; j++) {
			readParityNodes[i].succedents[numDataNodes + j] = &xorNodes[j];
			xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
		}

	/* connect xor nodes to parity log update nodes */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(xorNodes[i].numSuccedents == 1);
		RF_ASSERT(lpuNodes[i].numAntecedents == 1);
		xorNodes[i].succedents[0] = &lpuNodes[i];
		lpuNodes[i].antecedents[0] = &xorNodes[i];
		lpuNodes[i].antType[0] = rf_trueData;
	}

	for (i = 0; i < numDataNodes; i++) {
		if (lu_flag) {
			/* connect write new data nodes to unlock nodes */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
			unlockDataNodes[i].antType[0] = rf_control;

			/* connect unlock nodes to unblock node */
			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
			RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
			unlockDataNodes[i].succedents[0] = unblockNode;
			unblockNode->antecedents[i] = &unlockDataNodes[i];
			unblockNode->antType[i] = rf_control;
		} else {
			/* connect write new data nodes to unblock node */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			RF_ASSERT(unblockNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
			writeDataNodes[i].succedents[0] = unblockNode;
			unblockNode->antecedents[i] = &writeDataNodes[i];
			unblockNode->antType[i] = rf_control;
		}
	}

	/* connect parity log update nodes to unblock node */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(lpuNodes[i].numSuccedents == 1);
		lpuNodes[i].succedents[0] = unblockNode;
		unblockNode->antecedents[numDataNodes + i] = &lpuNodes[i];
		unblockNode->antType[numDataNodes + i] = rf_control;
	}

	/* connect unblock node to terminator */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}


void rf_CreateParityLoggingSmallWriteDAG(
	RF_Raid_t             *raidPtr,
	RF_AccessStripeMap_t  *asmap,
	RF_DagHeader_t        *dag_h,
	void                  *bp,
	RF_RaidAccessFlags_t   flags,
	RF_AllocListElem_t    *allocList,
	RF_RedFuncs_t         *pfuncs,
	RF_RedFuncs_t         *qfuncs)
{
	dag_h->creator = "ParityLoggingSmallWriteDAG";
	rf_CommonCreateParityLoggingSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, &rf_xorFuncs, NULL);
}


void rf_CreateParityLoggingLargeWriteDAG(
	RF_Raid_t             *raidPtr,
	RF_AccessStripeMap_t  *asmap,
	RF_DagHeader_t        *dag_h,
	void                  *bp,
	RF_RaidAccessFlags_t   flags,
	RF_AllocListElem_t    *allocList,
	int                    nfaults,
	int                  (*redFunc)(RF_DagNode_t *))
{
	dag_h->creator = "ParityLoggingLargeWriteDAG";
	rf_CommonCreateParityLoggingLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 1, rf_RegularXorFunc);
}

#endif /* RF_INCLUDE_PARITYLOGGING > 0 */